/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.camel.component.dataset;
018
019import java.util.concurrent.atomic.AtomicInteger;
020
021import org.apache.camel.Component;
022import org.apache.camel.Consumer;
023import org.apache.camel.Exchange;
024import org.apache.camel.Message;
025import org.apache.camel.Processor;
026import org.apache.camel.Service;
027import org.apache.camel.component.mock.MockEndpoint;
028import org.apache.camel.processor.ThroughputLogger;
029import org.apache.camel.spi.Metadata;
030import org.apache.camel.spi.UriEndpoint;
031import org.apache.camel.spi.UriParam;
032import org.apache.camel.spi.UriPath;
033import org.apache.camel.util.CamelLogger;
034import org.apache.camel.util.ExchangeHelper;
035import org.apache.camel.util.ObjectHelper;
036import org.apache.camel.util.URISupport;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * Endpoint for DataSet.
042 *
043 * @version 
044 */
045@UriEndpoint(scheme = "dataset", syntax = "dataset:name", consumerClass = DataSetConsumer.class, label = "core,testing")
046public class DataSetEndpoint extends MockEndpoint implements Service {
047    private final transient Logger log;
048    private final AtomicInteger receivedCounter = new AtomicInteger();
049    @UriPath(name = "name", description = "Name of DataSet to lookup in the registry") @Metadata(required = "true")
050    private volatile DataSet dataSet;
051    @UriParam(defaultValue = "0")
052    private int minRate;
053    @UriParam(defaultValue = "3")
054    private long produceDelay = 3;
055    @UriParam(defaultValue = "0")
056    private long consumeDelay;
057    @UriParam(defaultValue = "0")
058    private long preloadSize;
059    @UriParam(defaultValue = "1000")
060    private long initialDelay = 1000;
061
062    @Deprecated
063    public DataSetEndpoint() {
064        this.log = LoggerFactory.getLogger(DataSetEndpoint.class);
065        // optimize as we dont need to copy the exchange
066        setCopyOnExchange(false);
067    }
068
069    public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) {
070        super(endpointUri, component);
071        this.dataSet = dataSet;
072        this.log = LoggerFactory.getLogger(endpointUri);
073        // optimize as we dont need to copy the exchange
074        setCopyOnExchange(false);
075    }
076
077    public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
078        if (!ObjectHelper.equal(expected, actual)) {
079            throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on " + exchange + " with headers: " + exchange.getIn().getHeaders());
080        }
081    }
082
083    @Override
084    public Consumer createConsumer(Processor processor) throws Exception {
085        Consumer answer = new DataSetConsumer(this, processor);
086        configureConsumer(answer);
087        return answer;
088    }
089
090    @Override
091    public void reset() {
092        super.reset();
093        receivedCounter.set(0);
094    }
095
096    @Override
097    public int getReceivedCounter() {
098        return receivedCounter.get();
099    }
100
101    /**
102     * Creates a message exchange for the given index in the {@link DataSet}
103     */
104    public Exchange createExchange(long messageIndex) throws Exception {
105        Exchange exchange = createExchange();
106        getDataSet().populateMessage(exchange, messageIndex);
107
108        Message in = exchange.getIn();
109        in.setHeader(Exchange.DATASET_INDEX, messageIndex);
110
111        return exchange;
112    }
113
114    @Override
115    protected void waitForCompleteLatch(long timeout) throws InterruptedException {
116        super.waitForCompleteLatch(timeout);
117
118        if (minRate > 0) {
119            int count = getReceivedCounter();
120            do {
121                // wait as long as we get a decent message rate
122                super.waitForCompleteLatch(1000L);
123                count = getReceivedCounter() - count;
124            } while (count >= minRate);
125        }
126    }
127
128    // Properties
129    //-------------------------------------------------------------------------
130
131    public DataSet getDataSet() {
132        return dataSet;
133    }
134
135    public void setDataSet(DataSet dataSet) {
136        this.dataSet = dataSet;
137    }
138
139    public int getMinRate() {
140        return minRate;
141    }
142
143    /**
144     * Wait until the DataSet contains at least this number of messages
145     */
146    public void setMinRate(int minRate) {
147        this.minRate = minRate;
148    }
149
150    public long getPreloadSize() {
151        return preloadSize;
152    }
153
154    /**
155     * Sets how many messages should be preloaded (sent) before the route completes its initialization
156     */
157    public void setPreloadSize(long preloadSize) {
158        this.preloadSize = preloadSize;
159    }
160
161    public long getConsumeDelay() {
162        return consumeDelay;
163    }
164
165    /**
166     * Allows a delay to be specified which causes consumers to pause - to simulate slow consumers
167     */
168    public void setConsumeDelay(long consumeDelay) {
169        this.consumeDelay = consumeDelay;
170    }
171
172    public long getProduceDelay() {
173        return produceDelay;
174    }
175
176    /**
177     * Allows a delay to be specified which causes producers to pause - to simulate slow producers
178     */
179    public void setProduceDelay(long produceDelay) {
180        this.produceDelay = produceDelay;
181    }
182
183    public long getInitialDelay() {
184        return initialDelay;
185    }
186
187    /**
188     * Time period in millis to wait before starting sending messages.
189     */
190    public void setInitialDelay(long initialDelay) {
191        this.initialDelay = initialDelay;
192    }
193
194    // Implementation methods
195    //-------------------------------------------------------------------------
196
197    @Override
198    protected void performAssertions(Exchange actual, Exchange copy) throws Exception {
199        int receivedCount = receivedCounter.incrementAndGet();
200        long index = receivedCount - 1;
201        Exchange expected = createExchange(index);
202
203        // now let's assert that they are the same
204        if (log.isDebugEnabled()) {
205            log.debug("Received message: {} (DataSet index={}) = {}",
206                    new Object[]{index, copy.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class), copy});
207        }
208
209        assertMessageExpected(index, expected, copy);
210
211        if (consumeDelay > 0) {
212            Thread.sleep(consumeDelay);
213        }
214    }
215
216    protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
217        long actualCounter = ExchangeHelper.getMandatoryHeader(actual, Exchange.DATASET_INDEX, Long.class);
218        assertEquals("Header: " + Exchange.DATASET_INDEX, index, actualCounter, actual);
219
220        getDataSet().assertMessageExpected(this, expected, actual, index);
221    }
222
223    protected ThroughputLogger createReporter() {
224        // must sanitize uri to avoid logging sensitive information
225        String uri = URISupport.sanitizeUri(getEndpointUri());
226        CamelLogger logger = new CamelLogger(uri);
227        ThroughputLogger answer = new ThroughputLogger(logger, (int) this.getDataSet().getReportCount());
228        answer.setAction("Received");
229        return answer;
230    }
231
232    @Override
233    protected void doStart() throws Exception {
234        super.doStart();
235
236        long size = getDataSet().getSize();
237        expectedMessageCount((int) size);
238        if (reporter == null) {
239            reporter = createReporter();
240        }
241        log.info(this + " expecting " + size + " messages");
242    }
243
244}