001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.mock;
018
019import java.io.File;
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.CopyOnWriteArrayList;
030import java.util.concurrent.CopyOnWriteArraySet;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.TimeUnit;
033
034import org.apache.camel.AsyncCallback;
035import org.apache.camel.CamelContext;
036import org.apache.camel.Component;
037import org.apache.camel.Consumer;
038import org.apache.camel.Endpoint;
039import org.apache.camel.Exchange;
040import org.apache.camel.ExchangePattern;
041import org.apache.camel.Expression;
042import org.apache.camel.Handler;
043import org.apache.camel.Message;
044import org.apache.camel.Predicate;
045import org.apache.camel.Processor;
046import org.apache.camel.Producer;
047import org.apache.camel.builder.ProcessorBuilder;
048import org.apache.camel.impl.DefaultAsyncProducer;
049import org.apache.camel.impl.DefaultEndpoint;
050import org.apache.camel.impl.InterceptSendToEndpoint;
051import org.apache.camel.spi.BrowsableEndpoint;
052import org.apache.camel.spi.Metadata;
053import org.apache.camel.spi.UriEndpoint;
054import org.apache.camel.spi.UriParam;
055import org.apache.camel.spi.UriPath;
056import org.apache.camel.util.CamelContextHelper;
057import org.apache.camel.util.CaseInsensitiveMap;
058import org.apache.camel.util.ExchangeHelper;
059import org.apache.camel.util.ExpressionComparator;
060import org.apache.camel.util.FileUtil;
061import org.apache.camel.util.ObjectHelper;
062import org.apache.camel.util.StopWatch;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * A Mock endpoint which provides a literate, fluent API for testing routes
068 * using a <a href="http://jmock.org/">JMock style</a> API.
069 * <p/>
 * The mock endpoint has two sets of methods:
071 * <ul>
072 *   <li>expectedXXX or expectsXXX - To set pre conditions, before the test is executed</li>
073 *   <li>assertXXX - To assert assertions, after the test has been executed</li>
074 * </ul>
 * It's <b>important</b> to know the difference between the two sets. The former is used to
076 * set expectations before the test is being started (eg before the mock receives messages).
077 * The latter is used after the test has been executed, to verify the expectations; or
078 * other assertions which you can perform after the test has been completed.
079 * <p/>
080 * <b>Beware:</b> If you want to expect a mock does not receive any messages, by calling
081 * {@link #setExpectedMessageCount(int)} with <tt>0</tt>, then take extra care,
 * as <tt>0</tt> matches when the test starts, so you need to set an assert period
083 * to let the test run for a while to make sure there are still no messages arrived; for
084 * that use {@link #setAssertPeriod(long)}.
085 * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier
086 * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks.
087 * This allows you to not use a fixed assert period, to speedup testing times.
088 *
089 * @version 
090 */
091@UriEndpoint(scheme = "mock", syntax = "mock:name", producerOnly = true, label = "core,testing")
092public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(MockEndpoint.class);
    // must be volatile so changes are visible between the thread which performs the assertions
    // and the threads which process the exchanges when routing messages in Camel
    protected volatile Processor reporter;
    
    // processor assigned by whenAnyExchangeReceived, returnReplyBody and returnReplyHeader
    private volatile Processor defaultProcessor;
    // processors keyed by message index, registered through whenExchangeReceived
    private volatile Map<Integer, Processor> processors;
    private volatile List<Exchange> receivedExchanges;
    // any non-null entry makes doAssertIsSatisfied fail
    private volatile List<Throwable> failures;
    // checks which doAssertIsSatisfied runs one after the other
    private volatile List<Runnable> tests;
    private volatile CountDownLatch latch;
    // lower bound on the received count, verified in doAssertIsSatisfied when it is 0 or more
    private volatile int expectedMinimumCount;
    // expected/actual bodies; both are (re)assigned by the expectedBodiesReceived* methods
    private volatile List<?> expectedBodyValues;
    private volatile List<Object> actualBodyValues;
    // expected headers by name; created lazily by expectedHeaderReceived
    private volatile Map<String, Object> expectedHeaderValues;
    private volatile Map<String, Object> actualHeaderValues;
    // expected exchange properties by name; created lazily by expectedPropertyReceived
    private volatile Map<String, Object> expectedPropertyValues;
    private volatile Map<String, Object> actualPropertyValues;

    private volatile int counter;

    @UriPath(description = "Name of mock endpoint") @Metadata(required = "true")
    private String name;
    // exact number of messages expected; doAssertIsSatisfied only enforces it when it is 0 or more
    @UriParam(defaultValue = "-1")
    private int expectedCount;
    // millis passed by assertIsSatisfied() as the timeout used when no messages are expected
    @UriParam(defaultValue = "0")
    private long sleepForEmptyTest;
    @UriParam(defaultValue = "0")
    private long resultWaitTime;
    @UriParam(defaultValue = "0")
    private long resultMinimumWaitTime;
    // millis to wait before assertIsSatisfied(long) re-asserts; only used when greater than 0
    @UriParam(defaultValue = "0")
    private long assertPeriod;
    @UriParam(defaultValue = "-1")
    private int retainFirst;
    @UriParam(defaultValue = "-1")
    private int retainLast;
    @UriParam(defaultValue = "true")
    private boolean copyOnExchange = true;
132
    /**
     * Creates a mock endpoint for the given URI that belongs to the given component,
     * and then runs <tt>init()</tt>.
     *
     * @param endpointUri the uri of this endpoint
     * @param component   the component this endpoint belongs to
     */
    public MockEndpoint(String endpointUri, Component component) {
        super(endpointUri, component);
        init();
    }
137
    /**
     * Creates a mock endpoint for the given URI without a component, and then runs <tt>init()</tt>.
     * <p/>
     * NOTE(review): this constructor is marked deprecated; presumably
     * {@link #MockEndpoint(String, Component)} is the intended replacement - confirm.
     *
     * @param endpointUri the uri of this endpoint
     */
    @Deprecated
    public MockEndpoint(String endpointUri) {
        super(endpointUri);
        init();
    }
143
    /**
     * Creates a mock endpoint without an URI, by delegating to the single-argument
     * constructor with a <tt>null</tt> endpoint uri.
     */
    public MockEndpoint() {
        this(null);
    }
147
148    /**
149     * A helper method to resolve the mock endpoint of the given URI on the given context
150     *
151     * @param context the camel context to try resolve the mock endpoint from
152     * @param uri the uri of the endpoint to resolve
153     * @return the endpoint
154     */
155    public static MockEndpoint resolve(CamelContext context, String uri) {
156        return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
157    }
158
159    public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
160        long start = System.currentTimeMillis();
161        long left = unit.toMillis(timeout);
162        long end = start + left;
163        for (MockEndpoint endpoint : endpoints) {
164            if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
165                throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
166            }
167            left = end - System.currentTimeMillis();
168            if (left <= 0) {
169                left = 0;
170            }
171        }
172    }
173
174    public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
175        assertWait(timeout, unit, endpoints);
176        for (MockEndpoint endpoint : endpoints) {
177            endpoint.assertIsSatisfied();
178        }
179    }
180
181    public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
182        for (MockEndpoint endpoint : endpoints) {
183            endpoint.assertIsSatisfied();
184        }
185    }
186
187
188    /**
189     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
190     * in the given context are valid
191     *
192     * @param context the camel context used to find all the available endpoints to be asserted
193     */
194    public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
195        ObjectHelper.notNull(context, "camelContext");
196        Collection<Endpoint> endpoints = context.getEndpoints();
197        for (Endpoint endpoint : endpoints) {
198            // if the endpoint was intercepted we should get the delegate
199            if (endpoint instanceof InterceptSendToEndpoint) {
200                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
201            }
202            if (endpoint instanceof MockEndpoint) {
203                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
204                mockEndpoint.assertIsSatisfied();
205            }
206        }
207    }
208
209    /**
210     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
211     * in the given context are valid
212     *
213     * @param context the camel context used to find all the available endpoints to be asserted
214     * @param timeout timeout
215     * @param unit    time unit
216     */
217    public static void assertIsSatisfied(CamelContext context, long timeout, TimeUnit unit) throws InterruptedException {
218        ObjectHelper.notNull(context, "camelContext");
219        ObjectHelper.notNull(unit, "unit");
220        Collection<Endpoint> endpoints = context.getEndpoints();
221        long millis = unit.toMillis(timeout);
222        for (Endpoint endpoint : endpoints) {
223            // if the endpoint was intercepted we should get the delegate
224            if (endpoint instanceof InterceptSendToEndpoint) {
225                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
226            }
227            if (endpoint instanceof MockEndpoint) {
228                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
229                mockEndpoint.setResultWaitTime(millis);
230                mockEndpoint.assertIsSatisfied();
231            }
232        }
233    }
234
235    /**
236     * Sets the assert period on all the expectations on any {@link MockEndpoint} instances registered
237     * in the given context.
238     *
239     * @param context the camel context used to find all the available endpoints
240     * @param period the period in millis
241     */
242    public static void setAssertPeriod(CamelContext context, long period) {
243        ObjectHelper.notNull(context, "camelContext");
244        Collection<Endpoint> endpoints = context.getEndpoints();
245        for (Endpoint endpoint : endpoints) {
246            // if the endpoint was intercepted we should get the delegate
247            if (endpoint instanceof InterceptSendToEndpoint) {
248                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
249            }
250            if (endpoint instanceof MockEndpoint) {
251                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
252                mockEndpoint.setAssertPeriod(period);
253            }
254        }
255    }
256
257    /**
258     * Reset all mock endpoints
259     *
260     * @param context the camel context used to find all the available endpoints to reset
261     */
262    public static void resetMocks(CamelContext context) {
263        ObjectHelper.notNull(context, "camelContext");
264        Collection<Endpoint> endpoints = context.getEndpoints();
265        for (Endpoint endpoint : endpoints) {
266            // if the endpoint was intercepted we should get the delegate
267            if (endpoint instanceof InterceptSendToEndpoint) {
268                endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
269            }
270            if (endpoint instanceof MockEndpoint) {
271                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
272                mockEndpoint.reset();
273            }
274        }
275    }
276
277    public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
278        for (MockEndpoint endpoint : endpoints) {
279            endpoint.setExpectedMessageCount(count);
280        }
281    }
282
283    public List<Exchange> getExchanges() {
284        return getReceivedExchanges();
285    }
286
287    public Consumer createConsumer(Processor processor) throws Exception {
288        throw new UnsupportedOperationException("You cannot consume from this endpoint");
289    }
290
291    public Producer createProducer() throws Exception {
292        return new DefaultAsyncProducer(this) {
293            public boolean process(Exchange exchange, AsyncCallback callback) {
294                onExchange(exchange);
295                callback.done(true);
296                return true;
297            }
298        };
299    }
300
    /**
     * Re-runs <tt>init()</tt>, the same initialization the constructors perform.
     */
    public void reset() {
        init();
    }
304
305
306    // Testing API
307    // -------------------------------------------------------------------------
308
    /**
     * Handles the incoming exchange by passing it to <tt>onExchange</tt>.
     * <p/>
     * This method turns this mock endpoint into a bean which you can use
     * in the Camel routes, which allows you to inject MockEndpoint as beans
     * in your routes and use the features of the mock to control the bean.
     *
     * @param exchange  the exchange
     * @throws Exception can be thrown
     */
    @Handler
    public void handle(Exchange exchange) throws Exception {
        onExchange(exchange);
    }
323
324    /**
325     * Set the processor that will be invoked when the index
326     * message is received.
327     */
328    public void whenExchangeReceived(int index, Processor processor) {
329        this.processors.put(index, processor);
330    }
331
332    /**
333     * Set the processor that will be invoked when the some message
334     * is received.
335     *
336     * This processor could be overwritten by
337     * {@link #whenExchangeReceived(int, Processor)} method.
338     */
339    public void whenAnyExchangeReceived(Processor processor) {
340        this.defaultProcessor = processor;
341    }
342    
343    /**
344     * Set the expression which value will be set to the message body
345     * @param expression which is use to set the message body 
346     */
347    public void returnReplyBody(Expression expression) {
348        this.defaultProcessor = ProcessorBuilder.setBody(expression);
349    }
350    
351    /**
352     * Set the expression which value will be set to the message header
353     * @param headerName that will be set value
354     * @param expression which is use to set the message header 
355     */
356    public void returnReplyHeader(String headerName, Expression expression) {
357        this.defaultProcessor = ProcessorBuilder.setHeader(headerName, expression);
358    }
359    
360
    /**
     * Validates that all the available expectations on this endpoint are
     * satisfied; or throw an exception.
     * <p/>
     * Delegates to {@link #assertIsSatisfied(long)} using the configured
     * <tt>sleepForEmptyTest</tt> value as the timeout.
     */
    public void assertIsSatisfied() throws InterruptedException {
        assertIsSatisfied(sleepForEmptyTest);
    }
368
369    /**
370     * Validates that all the available expectations on this endpoint are
371     * satisfied; or throw an exception
372     *
373     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
374     *                should wait for the test to be true
375     */
376    public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
377        LOG.info("Asserting: " + this + " is satisfied");
378        doAssertIsSatisfied(timeoutForEmptyEndpoints);
379        if (assertPeriod > 0) {
380            // if an assert period was set then re-assert again to ensure the assertion is still valid
381            Thread.sleep(assertPeriod);
382            LOG.info("Re-asserting: " + this + " is satisfied after " + assertPeriod + " millis");
383            // do not use timeout when we re-assert
384            doAssertIsSatisfied(0);
385        }
386    }
387
388    protected void doAssertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
389        if (expectedCount == 0) {
390            if (timeoutForEmptyEndpoints > 0) {
391                LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
392                Thread.sleep(timeoutForEmptyEndpoints);
393            }
394            assertEquals("Received message count", expectedCount, getReceivedCounter());
395        } else if (expectedCount > 0) {
396            if (expectedCount != getReceivedCounter()) {
397                waitForCompleteLatch();
398            }
399            assertEquals("Received message count", expectedCount, getReceivedCounter());
400        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
401            waitForCompleteLatch();
402        }
403
404        if (expectedMinimumCount >= 0) {
405            int receivedCounter = getReceivedCounter();
406            assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
407        }
408
409        for (Runnable test : tests) {
410            test.run();
411        }
412
413        for (Throwable failure : failures) {
414            if (failure != null) {
415                LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
416                fail("Failed due to caught exception: " + failure);
417            }
418        }
419    }
420
421    /**
422     * Validates that the assertions fail on this endpoint
423     */
424    public void assertIsNotSatisfied() throws InterruptedException {
425        boolean failed = false;
426        try {
427            assertIsSatisfied();
428            // did not throw expected error... fail!
429            failed = true;
430        } catch (AssertionError e) {
431            LOG.info("Caught expected failure: " + e);
432        }
433        if (failed) {
434            // fail() throws the AssertionError to indicate the test failed. 
435            fail("Expected assertion failure but test succeeded!");
436        }
437    }
438
439    /**
440     * Validates that the assertions fail on this endpoint
441
442     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
443     *        should wait for the test to be true
444     */
445    public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
446        boolean failed = false;
447        try {
448            assertIsSatisfied(timeoutForEmptyEndpoints);
449            // did not throw expected error... fail!
450            failed = true;
451        } catch (AssertionError e) {
452            LOG.info("Caught expected failure: " + e);
453        }
454        if (failed) { 
455            // fail() throws the AssertionError to indicate the test failed. 
456            fail("Expected assertion failure but test succeeded!");
457        }
458    }    
459    
    /**
     * Specifies the expected number of message exchanges that should be
     * received by this endpoint.
     * <p/>
     * If you want to assert that <b>exactly</b> n messages arrive at this mock
     * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details.
     * <p/>
     * Delegates to <tt>setExpectedMessageCount</tt>.
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     * @see #setAssertPeriod(long)
     */
    public void expectedMessageCount(int expectedCount) {
        setExpectedMessageCount(expectedCount);
    }
474
475    /**
476     * Sets a grace period after which the mock endpoint will re-assert
477     * to ensure the preliminary assertion is still valid.
478     * <p/>
479     * This is used for example to assert that <b>exactly</b> a number of messages 
480     * arrives. For example if {@link #expectedMessageCount(int)} was set to 5, then
481     * the assertion is satisfied when 5 or more message arrives. To ensure that
482     * exactly 5 messages arrives, then you would need to wait a little period
483     * to ensure no further message arrives. This is what you can use this
484     * {@link #setAssertPeriod(long)} method for.
485     * <p/>
486     * By default this period is disabled.
487     *
488     * @param period grace period in millis
489     */
490    public void setAssertPeriod(long period) {
491        this.assertPeriod = period;
492    }
493
    /**
     * Specifies the minimum number of expected message exchanges that should be
     * received by this endpoint.
     * <p/>
     * Delegates to <tt>setMinimumExpectedMessageCount</tt>.
     *
     * @param expectedCount the minimum number of message exchanges that should be
     *                expected by this endpoint
     */
    public void expectedMinimumMessageCount(int expectedCount) {
        setMinimumExpectedMessageCount(expectedCount);
    }
504
505    /**
506     * Sets an expectation that the given header name & value are received by this endpoint
507     * <p/>
508     * You can set multiple expectations for different header names.
509     * If you set a value of <tt>null</tt> that means we accept either the header is absent, or its value is <tt>null</tt>
510     */
511    public void expectedHeaderReceived(final String name, final Object value) {
512        if (expectedHeaderValues == null) {
513            expectedHeaderValues = new CaseInsensitiveMap();
514            // we just wants to expects to be called once
515            expects(new Runnable() {
516                public void run() {
517                    for (int i = 0; i < getReceivedExchanges().size(); i++) {
518                        Exchange exchange = getReceivedExchange(i);
519                        for (Map.Entry<String, Object> entry : expectedHeaderValues.entrySet()) {
520                            String key = entry.getKey();
521                            Object expectedValue = entry.getValue();
522
523                            // we accept that an expectedValue of null also means that the header may be absent
524                            if (expectedValue != null) {
525                                assertTrue("Exchange " + i + " has no headers", exchange.getIn().hasHeaders());
526                                boolean hasKey = exchange.getIn().getHeaders().containsKey(key);
527                                assertTrue("No header with name " + key + " found for message: " + i, hasKey);
528                            }
529
530                            Object actualValue = exchange.getIn().getHeader(key);
531                            actualValue = extractActualValue(exchange, actualValue, expectedValue);
532
533                            assertEquals("Header with name " + key + " for message: " + i, expectedValue, actualValue);
534                        }
535                    }
536                }
537            });
538        }
539        expectedHeaderValues.put(name, value);
540    }
541
542    /**
543     * Adds an expectation that the given header values are received by this
544     * endpoint in any order
545     */
546    public void expectedHeaderValuesReceivedInAnyOrder(final String name, final List<?> values) {
547        expectedMessageCount(values.size());
548
549        expects(new Runnable() {
550            public void run() {
551                // these are the expected values to find
552                final Set<Object> actualHeaderValues = new CopyOnWriteArraySet<Object>(values);
553
554                for (int i = 0; i < getReceivedExchanges().size(); i++) {
555                    Exchange exchange = getReceivedExchange(i);
556
557                    Object actualValue = exchange.getIn().getHeader(name);
558                    for (Object expectedValue : actualHeaderValues) {
559                        actualValue = extractActualValue(exchange, actualValue, expectedValue);
560                        // remove any found values
561                        actualHeaderValues.remove(actualValue);
562                    }
563                }
564
565                // should be empty, as we should find all the values
566                assertTrue("Expected " + values.size() + " headers with key[" + name + "], received " + (values.size() - actualHeaderValues.size())
567                        + " headers. Expected header values: " + actualHeaderValues, actualHeaderValues.isEmpty());
568            }
569        });
570    }
571
572    /**
573     * Adds an expectation that the given header values are received by this
574     * endpoint in any order
575     */
576    public void expectedHeaderValuesReceivedInAnyOrder(String name, Object... values) {
577        List<Object> valueList = new ArrayList<Object>();
578        valueList.addAll(Arrays.asList(values));
579        expectedHeaderValuesReceivedInAnyOrder(name, valueList);
580    }
581
582    /**
583     * Sets an expectation that the given property name & value are received by this endpoint
584     * <p/>
585     * You can set multiple expectations for different property names.
586     * If you set a value of <tt>null</tt> that means we accept either the property is absent, or its value is <tt>null</tt>
587     */
588    public void expectedPropertyReceived(final String name, final Object value) {
589        if (expectedPropertyValues == null) {
590            expectedPropertyValues = new ConcurrentHashMap<String, Object>();
591        }
592        if (value != null) {
593            // ConcurrentHashMap cannot store null values
594            expectedPropertyValues.put(name, value);
595        }
596
597        expects(new Runnable() {
598            public void run() {
599                for (int i = 0; i < getReceivedExchanges().size(); i++) {
600                    Exchange exchange = getReceivedExchange(i);
601                    for (Map.Entry<String, Object> entry : expectedPropertyValues.entrySet()) {
602                        String key = entry.getKey();
603                        Object expectedValue = entry.getValue();
604
605                        // we accept that an expectedValue of null also means that the header may be absent
606                        if (expectedValue != null) {
607                            assertTrue("Exchange " + i + " has no properties", !exchange.getProperties().isEmpty());
608                            boolean hasKey = exchange.getProperties().containsKey(key);
609                            assertTrue("No property with name " + key + " found for message: " + i, hasKey);
610                        }
611
612                        Object actualValue = exchange.getProperty(key);
613                        actualValue = extractActualValue(exchange, actualValue, expectedValue);
614
615                        assertEquals("Property with name " + key + " for message: " + i, expectedValue, actualValue);
616                    }
617                }
618            }
619        });
620    }
621
622    /**
623     * Adds an expectation that the given body values are received by this
624     * endpoint in the specified order
625     */
626    public void expectedBodiesReceived(final List<?> bodies) {
627        expectedMessageCount(bodies.size());
628        this.expectedBodyValues = bodies;
629        this.actualBodyValues = new ArrayList<Object>();
630
631        expects(new Runnable() {
632            public void run() {
633                for (int i = 0; i < expectedBodyValues.size(); i++) {
634                    Exchange exchange = getReceivedExchange(i);
635                    assertTrue("No exchange received for counter: " + i, exchange != null);
636
637                    Object expectedBody = expectedBodyValues.get(i);
638                    Object actualBody = null;
639                    if (i < actualBodyValues.size()) {
640                        actualBody = actualBodyValues.get(i);
641                    }
642                    actualBody = extractActualValue(exchange, actualBody, expectedBody);
643
644                    assertEquals("Body of message: " + i, expectedBody, actualBody);
645                }
646            }
647        });
648    }
649
650    private Object extractActualValue(Exchange exchange, Object actualValue, Object expectedValue) {
651        if (actualValue == null) {
652            return null;
653        }
654
655        if (actualValue instanceof Expression) {
656            actualValue = ((Expression)actualValue).evaluate(exchange, expectedValue != null ? expectedValue.getClass() : Object.class);
657        } else if (actualValue instanceof Predicate) {
658            actualValue = ((Predicate)actualValue).matches(exchange);
659        } else if (expectedValue != null) {
660            String from = actualValue.getClass().getName();
661            String to = expectedValue.getClass().getName();
662            actualValue = getCamelContext().getTypeConverter().convertTo(expectedValue.getClass(), exchange, actualValue);
663            assertTrue("There is no type conversion possible from " + from + " to " + to, actualValue != null);
664        }
665        return actualValue;
666    }
667
668    /**
669     * Sets an expectation that the given predicates matches the received messages by this endpoint
670     */
671    public void expectedMessagesMatches(Predicate... predicates) {
672        for (int i = 0; i < predicates.length; i++) {
673            final int messageIndex = i;
674            final Predicate predicate = predicates[i];
675            final AssertionClause clause = new AssertionClause(this) {
676                public void run() {
677                    addPredicate(predicate);
678                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
679                }
680            };
681            expects(clause);
682        }
683    }
684
685    /**
686     * Sets an expectation that the given body values are received by this endpoint
687     */
688    public void expectedBodiesReceived(Object... bodies) {
689        List<Object> bodyList = new ArrayList<Object>();
690        bodyList.addAll(Arrays.asList(bodies));
691        expectedBodiesReceived(bodyList);
692    }
693
694    /**
695     * Adds an expectation that the given body value are received by this endpoint
696     */
697    public AssertionClause expectedBodyReceived() {
698        expectedMessageCount(1);
699        final AssertionClause clause = new AssertionClause(this) {
700            public void run() {
701                Exchange exchange = getReceivedExchange(0);
702                assertTrue("No exchange received for counter: " + 0, exchange != null);
703
704                Object actualBody = exchange.getIn().getBody();
705                Expression exp = createExpression(getCamelContext());
706                Object expectedBody = exp.evaluate(exchange, Object.class);
707
708                assertEquals("Body of message: " + 0, expectedBody, actualBody);
709            }
710        };
711        expects(clause);
712        return clause;
713    }
714
715    /**
716     * Adds an expectation that the given body values are received by this
717     * endpoint in any order
718     */
719    public void expectedBodiesReceivedInAnyOrder(final List<?> bodies) {
720        expectedMessageCount(bodies.size());
721        this.expectedBodyValues = bodies;
722        this.actualBodyValues = new ArrayList<Object>();
723
724        expects(new Runnable() {
725            public void run() {
726                List<Object> actualBodyValuesSet = new ArrayList<Object>(actualBodyValues);
727                for (int i = 0; i < expectedBodyValues.size(); i++) {
728                    Exchange exchange = getReceivedExchange(i);
729                    assertTrue("No exchange received for counter: " + i, exchange != null);
730
731                    Object expectedBody = expectedBodyValues.get(i);
732                    assertTrue("Message with body " + expectedBody + " was expected but not found in " + actualBodyValuesSet, actualBodyValuesSet.remove(expectedBody));
733                }
734            }
735        });
736    }
737
738    /**
739     * Adds an expectation that the given body values are received by this
740     * endpoint in any order
741     */
742    public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
743        List<Object> bodyList = new ArrayList<Object>();
744        bodyList.addAll(Arrays.asList(bodies));
745        expectedBodiesReceivedInAnyOrder(bodyList);
746    }
747
748    /**
749     * Adds an expectation that a file exists with the given name
750     *
751     * @param name name of file, will cater for / and \ on different OS platforms
752     */
753    public void expectedFileExists(final String name) {
754        expectedFileExists(name, null);
755    }
756
757    /**
758     * Adds an expectation that a file exists with the given name
759     * <p/>
760     * Will wait at most 5 seconds while checking for the existence of the file.
761     *
762     * @param name name of file, will cater for / and \ on different OS platforms
763     * @param content content of file to compare, can be <tt>null</tt> to not compare content
764     */
765    public void expectedFileExists(final String name, final String content) {
766        final File file = new File(FileUtil.normalizePath(name));
767
768        expects(new Runnable() {
769            public void run() {
770                // wait at most 5 seconds for the file to exists
771                final long timeout = System.currentTimeMillis() + 5000;
772
773                boolean stop = false;
774                while (!stop && !file.exists()) {
775                    try {
776                        Thread.sleep(50);
777                    } catch (InterruptedException e) {
778                        // ignore
779                    }
780                    stop = System.currentTimeMillis() > timeout;
781                }
782
783                assertTrue("The file should exists: " + name, file.exists());
784
785                if (content != null) {
786                    String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
787                    assertEquals("Content of file: " + name, content, body);
788                }
789            }
790        });
791    }
792
793    /**
794     * Adds an expectation that messages received should have the given exchange pattern
795     */
796    public void expectedExchangePattern(final ExchangePattern exchangePattern) {
797        expectedMessagesMatches(new Predicate() {
798            public boolean matches(Exchange exchange) {
799                return exchange.getPattern().equals(exchangePattern);
800            }
801        });
802    }
803
804    /**
805     * Adds an expectation that messages received should have ascending values
806     * of the given expression such as a user generated counter value
807     */
808    public void expectsAscending(final Expression expression) {
809        expects(new Runnable() {
810            public void run() {
811                assertMessagesAscending(expression);
812            }
813        });
814    }
815
816    /**
817     * Adds an expectation that messages received should have ascending values
818     * of the given expression such as a user generated counter value
819     */
820    public AssertionClause expectsAscending() {
821        final AssertionClause clause = new AssertionClause(this) {
822            public void run() {
823                assertMessagesAscending(createExpression(getCamelContext()));
824            }
825        };
826        expects(clause);
827        return clause;
828    }
829
830    /**
831     * Adds an expectation that messages received should have descending values
832     * of the given expression such as a user generated counter value
833     */
834    public void expectsDescending(final Expression expression) {
835        expects(new Runnable() {
836            public void run() {
837                assertMessagesDescending(expression);
838            }
839        });
840    }
841
842    /**
843     * Adds an expectation that messages received should have descending values
844     * of the given expression such as a user generated counter value
845     */
846    public AssertionClause expectsDescending() {
847        final AssertionClause clause = new AssertionClause(this) {
848            public void run() {
849                assertMessagesDescending(createExpression(getCamelContext()));
850            }
851        };
852        expects(clause);
853        return clause;
854    }
855
856    /**
857     * Adds an expectation that no duplicate messages should be received using
858     * the expression to determine the message ID
859     *
860     * @param expression the expression used to create a unique message ID for
861     *                message comparison (which could just be the message
862     *                payload if the payload can be tested for uniqueness using
863     *                {@link Object#equals(Object)} and
864     *                {@link Object#hashCode()}
865     */
866    public void expectsNoDuplicates(final Expression expression) {
867        expects(new Runnable() {
868            public void run() {
869                assertNoDuplicates(expression);
870            }
871        });
872    }
873
874    /**
875     * Adds an expectation that no duplicate messages should be received using
876     * the expression to determine the message ID
877     */
878    public AssertionClause expectsNoDuplicates() {
879        final AssertionClause clause = new AssertionClause(this) {
880            public void run() {
881                assertNoDuplicates(createExpression(getCamelContext()));
882            }
883        };
884        expects(clause);
885        return clause;
886    }
887
888    /**
889     * Asserts that the messages have ascending values of the given expression
890     */
891    public void assertMessagesAscending(Expression expression) {
892        assertMessagesSorted(expression, true);
893    }
894
895    /**
896     * Asserts that the messages have descending values of the given expression
897     */
898    public void assertMessagesDescending(Expression expression) {
899        assertMessagesSorted(expression, false);
900    }
901
902    protected void assertMessagesSorted(Expression expression, boolean ascending) {
903        String type = ascending ? "ascending" : "descending";
904        ExpressionComparator comparator = new ExpressionComparator(expression);
905        List<Exchange> list = getReceivedExchanges();
906        for (int i = 1; i < list.size(); i++) {
907            int j = i - 1;
908            Exchange e1 = list.get(j);
909            Exchange e2 = list.get(i);
910            int result = comparator.compare(e1, e2);
911            if (result == 0) {
912                fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: "
913                    + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
914            } else {
915                if (!ascending) {
916                    result = result * -1;
917                }
918                if (result > 0) {
919                    fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class)
920                        + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: "
921                        + expression + ". Exchanges: " + e1 + " and " + e2);
922                }
923            }
924        }
925    }
926
927    public void assertNoDuplicates(Expression expression) {
928        Map<Object, Exchange> map = new HashMap<Object, Exchange>();
929        List<Exchange> list = getReceivedExchanges();
930        for (int i = 0; i < list.size(); i++) {
931            Exchange e2 = list.get(i);
932            Object key = expression.evaluate(e2, Object.class);
933            Exchange e1 = map.get(key);
934            if (e1 != null) {
935                fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
936            } else {
937                map.put(key, e2);
938            }
939        }
940    }
941
942    /**
943     * Adds the expectation which will be invoked when enough messages are received
944     */
945    public void expects(Runnable runnable) {
946        tests.add(runnable);
947    }
948
949    /**
950     * Adds an assertion to the given message index
951     *
952     * @param messageIndex the number of the message
953     * @return the assertion clause
954     */
955    public AssertionClause message(final int messageIndex) {
956        final AssertionClause clause = new AssertionClause(this) {
957            public void run() {
958                applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
959            }
960        };
961        expects(clause);
962        return clause;
963    }
964
965    /**
966     * Adds an assertion to all the received messages
967     *
968     * @return the assertion clause
969     */
970    public AssertionClause allMessages() {
971        final AssertionClause clause = new AssertionClause(this) {
972            public void run() {
973                List<Exchange> list = getReceivedExchanges();
974                int index = 0;
975                for (Exchange exchange : list) {
976                    applyAssertionOn(MockEndpoint.this, index++, exchange);
977                }
978            }
979        };
980        expects(clause);
981        return clause;
982    }
983
984    /**
985     * Asserts that the given index of message is received (starting at zero)
986     */
987    public Exchange assertExchangeReceived(int index) {
988        int count = getReceivedCounter();
989        assertTrue("Not enough messages received. Was: " + count, count > index);
990        return getReceivedExchange(index);
991    }
992
993    // Properties
994    // -------------------------------------------------------------------------
995
996    public String getName() {
997        return name;
998    }
999
    /**
     * Sets the name of this endpoint.
     *
     * @param name the new name
     */
    public void setName(String name) {
        this.name = name;
    }
1003
1004    public List<Throwable> getFailures() {
1005        return failures;
1006    }
1007
1008    public int getReceivedCounter() {
1009        return counter;
1010    }
1011
1012    public List<Exchange> getReceivedExchanges() {
1013        return receivedExchanges;
1014    }
1015
1016    public int getExpectedCount() {
1017        return expectedCount;
1018    }
1019
1020    public long getSleepForEmptyTest() {
1021        return sleepForEmptyTest;
1022    }
1023
    /**
     * Allows a sleep to be specified, to wait and check that this endpoint really
     * is empty when {@link #expectedMessageCount(int)} is called with zero.
     *
     * @param sleepForEmptyTest the milliseconds to sleep for to determine that
     *                this endpoint really is empty
     */
    public void setSleepForEmptyTest(long sleepForEmptyTest) {
        this.sleepForEmptyTest = sleepForEmptyTest;
    }
1034
1035    public long getResultWaitTime() {
1036        return resultWaitTime;
1037    }
1038
    /**
     * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
     * wait on a latch until it is satisfied.
     *
     * @param resultWaitTime the maximum wait time in milliseconds
     */
    public void setResultWaitTime(long resultWaitTime) {
        this.resultWaitTime = resultWaitTime;
    }
1046
    /**
     * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
     * wait on a latch until it is satisfied.
     *
     * @param resultMinimumWaitTime the minimum expected wait time in milliseconds
     */
    public void setResultMinimumWaitTime(long resultMinimumWaitTime) {
        this.resultMinimumWaitTime = resultMinimumWaitTime;
    }
1054
    /**
     * Sets the minimum expected wait time by delegating to {@link #setResultMinimumWaitTime(long)}.
     *
     * @param resultMinimumWaitTime the minimum expected wait time in milliseconds
     * @deprecated use {@link #setResultMinimumWaitTime(long)}
     */
    @Deprecated
    public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
        setResultMinimumWaitTime(resultMinimumWaitTime);
    }
1062
    /**
     * Specifies the expected number of message exchanges that should be
     * received by this endpoint.
     * <p/>
     * <b>Beware:</b> If you want to expect <tt>0</tt> messages, then take extra care,
     * as <tt>0</tt> matches when the test starts, so you need to set an assert period time
     * to let the test run for a while to make sure no messages have arrived; for
     * that use {@link #setAssertPeriod(long)}.
     * An alternative is to use <a href="http://camel.apache.org/notifybuilder.html">NotifyBuilder</a>, and use the notifier
     * to know when Camel is done routing some messages, before you call the {@link #assertIsSatisfied()} method on the mocks.
     * This allows you to not use a fixed assert period, to speed up testing times.
     * <p/>
     * If you want to assert that <b>exactly</b> n messages arrive at this mock
     * endpoint, then see also the {@link #setAssertPeriod(long)} method for further details.
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     * @see #setAssertPeriod(long)
     */
    public void setExpectedCount(int expectedCount) {
        // same as setExpectedMessageCount
        setExpectedMessageCount(expectedCount);
    }
1085
1086    /**
1087     * @see #setExpectedCount(int)
1088     */
1089    public void setExpectedMessageCount(int expectedCount) {
1090        this.expectedCount = expectedCount;
1091        if (expectedCount <= 0) {
1092            latch = null;
1093        } else {
1094            latch = new CountDownLatch(expectedCount);
1095        }
1096    }
1097
1098    /**
1099     * Specifies the minimum number of expected message exchanges that should be
1100     * received by this endpoint
1101     *
1102     * @param expectedCount the number of message exchanges that should be
1103     *                expected by this endpoint
1104     */
1105    public void setMinimumExpectedMessageCount(int expectedCount) {
1106        this.expectedMinimumCount = expectedCount;
1107        if (expectedCount <= 0) {
1108            latch = null;
1109        } else {
1110            latch = new CountDownLatch(expectedMinimumCount);
1111        }
1112    }
1113
1114    public Processor getReporter() {
1115        return reporter;
1116    }
1117
    /**
     * Allows a processor to be added to the endpoint to report on the progress of the test.
     *
     * @param reporter the processor to use as reporter
     */
    public void setReporter(Processor reporter) {
        this.reporter = reporter;
    }
1124
    /**
     * Specifies to only retain the first n received {@link Exchange}s.
     * <p/>
     * This is used when testing with big data, to reduce memory consumption by not storing
     * copies of every {@link Exchange} this mock endpoint receives.
     * <p/>
     * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()}
     * will still return the actual number of received {@link Exchange}s. For example
     * if we have received 5000 {@link Exchange}s, and have configured to only retain the first
     * 10 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt>
     * but there are only the first 10 {@link Exchange}s in the {@link #getExchanges()} and
     * {@link #getReceivedExchanges()} methods.
     * <p/>
     * When using this method, then some of the other expectation methods are not supported,
     * for example the {@link #expectedBodiesReceived(Object...)} sets an expectation on the first
     * number of bodies received.
     * <p/>
     * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods,
     * to limit both the first and last received.
     *
     * @param retainFirst  to limit and only keep the first n received {@link Exchange}s, use
     *                     <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all.
     * @see #setRetainLast(int)
     */
    public void setRetainFirst(int retainFirst) {
        this.retainFirst = retainFirst;
    }
1152
    /**
     * Specifies to only retain the last n received {@link Exchange}s.
     * <p/>
     * This is used when testing with big data, to reduce memory consumption by not storing
     * copies of every {@link Exchange} this mock endpoint receives.
     * <p/>
     * <b>Important:</b> When using this limitation, then the {@link #getReceivedCounter()}
     * will still return the actual number of received {@link Exchange}s. For example
     * if we have received 5000 {@link Exchange}s, and have configured to only retain the last
     * 20 {@link Exchange}s, then the {@link #getReceivedCounter()} will still return <tt>5000</tt>
     * but there are only the last 20 {@link Exchange}s in the {@link #getExchanges()} and
     * {@link #getReceivedExchanges()} methods.
     * <p/>
     * When using this method, then some of the other expectation methods are not supported,
     * for example the {@link #expectedBodiesReceived(Object...)} sets an expectation on the first
     * number of bodies received.
     * <p/>
     * You can configure both {@link #setRetainFirst(int)} and {@link #setRetainLast(int)} methods,
     * to limit both the first and last received.
     *
     * @param retainLast  to limit and only keep the last n received {@link Exchange}s, use
     *                     <tt>0</tt> to not retain any messages, or <tt>-1</tt> to retain all.
     * @see #setRetainFirst(int)
     */
    public void setRetainLast(int retainLast) {
        this.retainLast = retainLast;
    }
1180
1181    public boolean isCopyOnExchange() {
1182        return copyOnExchange;
1183    }
1184
    /**
     * Sets whether to make a deep copy of the incoming {@link Exchange} when received at this mock endpoint.
     * <p/>
     * Is by default <tt>true</tt>.
     *
     * @param copyOnExchange <tt>true</tt> to copy incoming exchanges, <tt>false</tt> to use them as is
     */
    public void setCopyOnExchange(boolean copyOnExchange) {
        this.copyOnExchange = copyOnExchange;
    }
1193
1194    // Implementation methods
1195    // -------------------------------------------------------------------------
1196    private void init() {
1197        expectedCount = -1;
1198        counter = 0;
1199        defaultProcessor = null;
1200        processors = new HashMap<Integer, Processor>();
1201        receivedExchanges = new CopyOnWriteArrayList<Exchange>();
1202        failures = new CopyOnWriteArrayList<Throwable>();
1203        tests = new CopyOnWriteArrayList<Runnable>();
1204        latch = null;
1205        sleepForEmptyTest = 0;
1206        resultWaitTime = 0;
1207        resultMinimumWaitTime = 0L;
1208        assertPeriod = 0L;
1209        expectedMinimumCount = -1;
1210        expectedBodyValues = null;
1211        actualBodyValues = new ArrayList<Object>();
1212        expectedHeaderValues = null;
1213        actualHeaderValues = null;
1214        expectedPropertyValues = null;
1215        actualPropertyValues = null;
1216        retainFirst = -1;
1217        retainLast = -1;
1218    }
1219
1220    protected synchronized void onExchange(Exchange exchange) {
1221        try {
1222            if (reporter != null) {
1223                reporter.process(exchange);
1224            }
1225            Exchange copy = exchange;
1226            if (copyOnExchange) {
1227                // copy the exchange so the mock stores the copy and not the actual exchange
1228                copy = ExchangeHelper.createCopy(exchange, true);
1229            }
1230            performAssertions(exchange, copy);
1231        } catch (Throwable e) {
1232            // must catch java.lang.Throwable as AssertionError extends java.lang.Error
1233            failures.add(e);
1234        } finally {
1235            // make sure latch is counted down to avoid test hanging forever
1236            if (latch != null) {
1237                latch.countDown();
1238            }
1239        }
1240    }
1241
    /**
     * Performs the assertions on the incoming exchange.
     * <p/>
     * Records the headers, properties and body of the copy for the expectations that
     * have been configured, stores the copy as a received exchange, increments the
     * received counter, and then lets the processor registered for that counter value
     * (or else the default processor) process the actual exchange.
     *
     * @param exchange   the actual exchange
     * @param copy       a copy of the exchange (only store this)
     * @throws Exception can be thrown if something went wrong
     */
    protected void performAssertions(Exchange exchange, Exchange copy) throws Exception {
        Message in = copy.getIn();
        Object actualBody = in.getBody();

        // headers are only collected when header expectations have been set
        if (expectedHeaderValues != null) {
            if (actualHeaderValues == null) {
                actualHeaderValues = new CaseInsensitiveMap();
            }
            if (in.hasHeaders()) {
                // headers of all received messages are merged into the same map
                actualHeaderValues.putAll(in.getHeaders());
            }
        }

        // properties are only collected when property expectations have been set
        if (expectedPropertyValues != null) {
            if (actualPropertyValues == null) {
                actualPropertyValues = new ConcurrentHashMap<String, Object>();
            }
            // properties of all received exchanges are merged into the same map
            actualPropertyValues.putAll(copy.getProperties());
        }

        if (expectedBodyValues != null) {
            // the number of bodies recorded so far is the index of the expected body to compare with
            int index = actualBodyValues.size();
            // bodies are only recorded while there are expected bodies left
            if (expectedBodyValues.size() > index) {
                Object expectedBody = expectedBodyValues.get(index);
                if (expectedBody != null) {
                    // prefer to convert body early, for example when using files
                    // we need to read the content at this time
                    Object body = in.getBody(expectedBody.getClass());
                    if (body != null) {
                        actualBody = body;
                    }
                }
                actualBodyValues.add(actualBody);
            }
        }

        // let counter be 0 index-based in the logs
        if (LOG.isDebugEnabled()) {
            String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody;
            if (copy.getIn().hasHeaders()) {
                msg += " and headers:" + copy.getIn().getHeaders();
            }
            LOG.debug(msg);
        }

        // record timestamp when exchange was received
        copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date());

        // add a copy of the received exchange
        addReceivedExchange(copy);
        // and then increment counter after adding received exchange
        ++counter;

        // the counter has just been incremented, so the first message looks up the processor with key 1
        Processor processor = processors.get(getReceivedCounter()) != null
                ? processors.get(getReceivedCounter()) : defaultProcessor;

        if (processor != null) {
            try {
                // must process the incoming exchange and NOT the copy as the idea
                // is the end user can manipulate the exchange
                processor.process(exchange);
            } catch (Exception e) {
                // set exceptions on exchange so we can throw exceptions to simulate errors
                exchange.setException(e);
            }
        }
    }
1316
    /**
     * Adds the received exchange, honoring the retain first and retain last limitations.
     * <p/>
     * Nothing is stored when both limitations are <tt>0</tt>, and everything is stored
     * when both are negative. Otherwise the exchange is stored if it is within the
     * retain first limitation, or as one of the last received if a retain last limitation
     * is configured; an exchange matching neither is not stored.
     *
     * @param copy  a copy of the received exchange
     * @see #setRetainFirst(int)
     * @see #setRetainLast(int)
     */
    protected void addReceivedExchange(Exchange copy) {
        if (retainFirst == 0 && retainLast == 0) {
            // do not retain any messages at all
        } else if (retainFirst < 0 && retainLast < 0) {
            // no limitation so keep them all
            receivedExchanges.add(copy);
        } else {
            // okay there is some sort of limitations, so figure out what to retain
            // NOTE: the counter is compared as is; the caller in this class increments it after this method returns
            if (retainFirst > 0 && counter < retainFirst) {
                // store a copy as its within the retain first limitation
                receivedExchanges.add(copy);
            } else if (retainLast > 0) {
                // remove the oldest from the last retained boundary,
                int index = receivedExchanges.size() - retainLast;
                if (index >= 0) {
                    // but must be outside the first range as well
                    // otherwise we should not remove the oldest
                    if (retainFirst <= 0 || retainFirst <= index) {
                        receivedExchanges.remove(index);
                    }
                }
                // store a copy of the last n'th received
                receivedExchanges.add(copy);
            }
        }
    }
1348
1349    protected void waitForCompleteLatch() throws InterruptedException {
1350        if (latch == null) {
1351            fail("Should have a latch!");
1352        }
1353
1354        StopWatch watch = new StopWatch();
1355        waitForCompleteLatch(resultWaitTime);
1356        long delta = watch.stop();
1357        LOG.debug("Took {} millis to complete latch", delta);
1358
1359        if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
1360            fail("Expected minimum " + resultMinimumWaitTime
1361                + " millis waiting on the result, but was faster with " + delta + " millis.");
1362        }
1363    }
1364
1365    protected void waitForCompleteLatch(long timeout) throws InterruptedException {
1366        // Wait for a default 10 seconds if resultWaitTime is not set
1367        long waitTime = timeout == 0 ? 10000L : timeout;
1368
1369        // now let's wait for the results
1370        LOG.debug("Waiting on the latch for: " + timeout + " millis");
1371        latch.await(waitTime, TimeUnit.MILLISECONDS);
1372    }
1373
1374    protected void assertEquals(String message, Object expectedValue, Object actualValue) {
1375        if (!ObjectHelper.equal(expectedValue, actualValue)) {
1376            fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
1377        }
1378    }
1379
1380    protected void assertTrue(String message, boolean predicate) {
1381        if (!predicate) {
1382            fail(message);
1383        }
1384    }
1385
1386    protected void fail(Object message) {
1387        if (LOG.isDebugEnabled()) {
1388            List<Exchange> list = getReceivedExchanges();
1389            int index = 0;
1390            for (Exchange exchange : list) {
1391                LOG.debug("{} failed and received[{}]: {}", new Object[]{getEndpointUri(), ++index, exchange});
1392            }
1393        }
1394        throw new AssertionError(getEndpointUri() + " " + message);
1395    }
1396
1397    public int getExpectedMinimumCount() {
1398        return expectedMinimumCount;
1399    }
1400
1401    public void await() throws InterruptedException {
1402        if (latch != null) {
1403            latch.await();
1404        }
1405    }
1406
1407    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
1408        if (latch != null) {
1409            return latch.await(timeout, unit);
1410        }
1411        return true;
1412    }
1413
    /**
     * Whether this endpoint is a singleton.
     *
     * @return always <tt>true</tt>
     */
    public boolean isSingleton() {
        return true;
    }
1417
    /**
     * Whether this endpoint is lenient about its properties.
     *
     * @return always <tt>true</tt>
     */
    public boolean isLenientProperties() {
        return true;
    }
1421
1422    private Exchange getReceivedExchange(int index) {
1423        if (index <= receivedExchanges.size() - 1) {
1424            return receivedExchanges.get(index);
1425        } else {
1426            return null;
1427        }
1428    }
1429
1430}