001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.seda;
018
019import java.util.ArrayList;
020import java.util.HashSet;
021import java.util.List;
022import java.util.Set;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.CopyOnWriteArraySet;
025import java.util.concurrent.ExecutorService;
026
027import org.apache.camel.Component;
028import org.apache.camel.Consumer;
029import org.apache.camel.Exchange;
030import org.apache.camel.Message;
031import org.apache.camel.MultipleConsumersSupport;
032import org.apache.camel.PollingConsumer;
033import org.apache.camel.Processor;
034import org.apache.camel.Producer;
035import org.apache.camel.WaitForTaskToComplete;
036import org.apache.camel.api.management.ManagedAttribute;
037import org.apache.camel.api.management.ManagedOperation;
038import org.apache.camel.api.management.ManagedResource;
039import org.apache.camel.impl.DefaultEndpoint;
040import org.apache.camel.processor.MulticastProcessor;
041import org.apache.camel.spi.BrowsableEndpoint;
042import org.apache.camel.spi.Metadata;
043import org.apache.camel.spi.UriEndpoint;
044import org.apache.camel.spi.UriParam;
045import org.apache.camel.spi.UriPath;
046import org.apache.camel.util.EndpointHelper;
047import org.apache.camel.util.MessageHelper;
048import org.apache.camel.util.ServiceHelper;
049import org.apache.camel.util.URISupport;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * An implementation of the <a
055 * href="http://camel.apache.org/queue.html">Queue components</a> for
056 * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
057 */
058@ManagedResource(description = "Managed SedaEndpoint")
059@UriEndpoint(scheme = "seda", syntax = "seda:name", consumerClass = SedaConsumer.class, label = "core,endpoint")
060public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
061    private static final Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class);
062    private volatile BlockingQueue<Exchange> queue;
063    private final Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
064    private final Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
065    private volatile MulticastProcessor consumerMulticastProcessor;
066    private volatile boolean multicastStarted;
067    private volatile ExecutorService multicastExecutor;
068
069    @UriPath(description = "Name of queue") @Metadata(required = "true")
070    private String name;
071    @UriParam(defaultValue = "" + Integer.MAX_VALUE)
072    private int size = Integer.MAX_VALUE;
073
074    @UriParam(label = "consumer", defaultValue = "1")
075    private int concurrentConsumers = 1;
076    @UriParam(label = "consumer")
077    private boolean multipleConsumers;
078    @UriParam(label = "consumer")
079    private boolean purgeWhenStopping;
080    @UriParam(label = "consumer", defaultValue = "1000")
081    private int pollTimeout = 1000;
082
083    @UriParam(label = "producer", defaultValue = "IfReplyExpected")
084    private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
085    @UriParam(label = "producer", defaultValue = "30000")
086    private long timeout = 30000;
087    @UriParam(label = "producer")
088    private boolean blockWhenFull;
089    @UriParam(label = "producer")
090    private boolean failIfNoConsumers;
091
092    private BlockingQueueFactory<Exchange> queueFactory;
093
094    public SedaEndpoint() {
095        queueFactory = new LinkedBlockingQueueFactory<Exchange>();
096    }
097
098    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
099        this(endpointUri, component, queue, 1);
100    }
101
102    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
103        this(endpointUri, component, concurrentConsumers);
104        this.queue = queue;
105        if (queue != null) {
106            this.size = queue.remainingCapacity();
107        }
108        queueFactory = new LinkedBlockingQueueFactory<Exchange>();
109        getComponent().registerQueue(this, queue);
110    }
111
112    public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
113        this(endpointUri, component, concurrentConsumers);
114        this.queueFactory = queueFactory;
115    }
116
117    private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) {
118        super(endpointUri, component);
119        this.concurrentConsumers = concurrentConsumers;
120    }
121
122    @Override
123    public SedaComponent getComponent() {
124        return (SedaComponent) super.getComponent();
125    }
126
127    public Producer createProducer() throws Exception {
128        return new SedaProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull());
129    }
130
131    public Consumer createConsumer(Processor processor) throws Exception {
132        if (getComponent() != null) {
133            // all consumers must match having the same multipleConsumers options
134            String key = getComponent().getQueueKey(getEndpointUri());
135            QueueReference ref = getComponent().getQueueReference(key);
136            if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) {
137                // there is already a multiple consumers, so make sure they matches
138                throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers "
139                        + ref.getMultipleConsumers() + " does not match given multiple consumers " + multipleConsumers);
140            }
141        }
142
143        Consumer answer = createNewConsumer(processor);
144        configureConsumer(answer);
145        return answer;
146    }
147
148    protected SedaConsumer createNewConsumer(Processor processor) {
149        return new SedaConsumer(this, processor);
150    }
151
152    @Override
153    public PollingConsumer createPollingConsumer() throws Exception {
154        SedaPollingConsumer answer = new SedaPollingConsumer(this);
155        configureConsumer(answer);
156        return answer;
157    }
158
159    public synchronized BlockingQueue<Exchange> getQueue() {
160        if (queue == null) {
161            // prefer to lookup queue from component, so if this endpoint is re-created or re-started
162            // then the existing queue from the component can be used, so new producers and consumers
163            // can use the already existing queue referenced from the component
164            if (getComponent() != null) {
165                // use null to indicate default size (= use what the existing queue has been configured with)
166                Integer size = getSize() == Integer.MAX_VALUE ? null : getSize();
167                QueueReference ref = getComponent().getOrCreateQueue(this, size, isMultipleConsumers(), queueFactory);
168                queue = ref.getQueue();
169                String key = getComponent().getQueueKey(getEndpointUri());
170                LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() !=  null ? ref.getSize() : Integer.MAX_VALUE});
171                // and set the size we are using
172                if (ref.getSize() != null) {
173                    setSize(ref.getSize());
174                }
175            } else {
176                // fallback and create queue (as this endpoint has no component)
177                queue = createQueue();
178                LOG.info("Endpoint {} is using queue: {} with size: {}", new Object[]{this, getEndpointUri(), getSize()});
179            }
180        }
181        return queue;
182    }
183
184    protected BlockingQueue<Exchange> createQueue() {
185        if (size > 0) {
186            return queueFactory.create(size);
187        } else {
188            return queueFactory.create();
189        }
190    }
191
192    /**
193     * Get's the {@link QueueReference} for the this endpoint.
194     * @return the reference, or <tt>null</tt> if no queue reference exists.
195     */
196    public synchronized QueueReference getQueueReference() {
197        String key = getComponent().getQueueKey(getEndpointUri());
198        QueueReference ref = getComponent().getQueueReference(key);
199        return ref;
200    }
201
202    protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception {
203        if (!multicastStarted && consumerMulticastProcessor != null) {
204            // only start it on-demand to avoid starting it during stopping
205            ServiceHelper.startService(consumerMulticastProcessor);
206            multicastStarted = true;
207        }
208        return consumerMulticastProcessor;
209    }
210
211    protected synchronized void updateMulticastProcessor() throws Exception {
212        // only needed if we support multiple consumers
213        if (!isMultipleConsumersSupported()) {
214            return;
215        }
216
217        // stop old before we create a new
218        if (consumerMulticastProcessor != null) {
219            ServiceHelper.stopService(consumerMulticastProcessor);
220            consumerMulticastProcessor = null;
221        }
222
223        int size = getConsumers().size();
224        if (size >= 1) {
225            if (multicastExecutor == null) {
226                // create multicast executor as we need it when we have more than 1 processor
227                multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, URISupport.sanitizeUri(getEndpointUri()) + "(multicast)");
228            }
229            // create list of consumers to multicast to
230            List<Processor> processors = new ArrayList<Processor>(size);
231            for (SedaConsumer consumer : getConsumers()) {
232                processors.add(consumer.getProcessor());
233            }
234            // create multicast processor
235            multicastStarted = false;
236            consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null,
237                                                                true, multicastExecutor, false, false, false, 
238                                                                0, null, false, false);
239        }
240    }
241
242    /**
243     * Define the queue instance which will be used by seda endpoint.
244     * <p/>
245     * This option is only for rare use-cases where you want to use a custom queue instance.
246     */
247    public void setQueue(BlockingQueue<Exchange> queue) {
248        this.queue = queue;
249        this.size = queue.remainingCapacity();
250    }
251
252    @ManagedAttribute(description = "Queue max capacity")
253    public int getSize() {
254        return size;
255    }
256
257    /**
258     * The maximum capacity of the SEDA queue (i.e., the number of messages it can hold).
259     */
260    public void setSize(int size) {
261        this.size = size;
262    }
263
264    @ManagedAttribute(description = "Current queue size")
265    public int getCurrentQueueSize() {
266        return queue.size();
267    }
268
269    /**
270     * Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted.
271     * By default, an exception will be thrown stating that the queue is full.
272     * By enabling this option, the calling thread will instead block and wait until the message can be accepted.
273     */
274    public void setBlockWhenFull(boolean blockWhenFull) {
275        this.blockWhenFull = blockWhenFull;
276    }
277
278    @ManagedAttribute(description = "Whether the caller will block sending to a full queue")
279    public boolean isBlockWhenFull() {
280        return blockWhenFull;
281    }
282
283    /**
284     * Number of concurrent threads processing exchanges.
285     */
286    public void setConcurrentConsumers(int concurrentConsumers) {
287        this.concurrentConsumers = concurrentConsumers;
288    }
289
290    @ManagedAttribute(description = "Number of concurrent consumers")
291    public int getConcurrentConsumers() {
292        return concurrentConsumers;
293    }
294
295    public WaitForTaskToComplete getWaitForTaskToComplete() {
296        return waitForTaskToComplete;
297    }
298
299    /**
300     * Option to specify whether the caller should wait for the async task to complete or not before continuing.
301     * The following three options are supported: Always, Never or IfReplyExpected.
302     * The first two values are self-explanatory.
303     * The last value, IfReplyExpected, will only wait if the message is Request Reply based.
304     * The default option is IfReplyExpected.
305     */
306    public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
307        this.waitForTaskToComplete = waitForTaskToComplete;
308    }
309
310    @ManagedAttribute
311    public long getTimeout() {
312        return timeout;
313    }
314
315    /**
316     * Timeout (in milliseconds) before a SEDA producer will stop waiting for an asynchronous task to complete.
317     * You can disable timeout by using 0 or a negative value.
318     */
319    public void setTimeout(long timeout) {
320        this.timeout = timeout;
321    }
322
323    @ManagedAttribute
324    public boolean isFailIfNoConsumers() {
325        return failIfNoConsumers;
326    }
327
328    /**
329     * Whether the producer should fail by throwing an exception, when sending to a SEDA queue with no active consumers.
330     */
331    public void setFailIfNoConsumers(boolean failIfNoConsumers) {
332        this.failIfNoConsumers = failIfNoConsumers;
333    }
334
335    @ManagedAttribute
336    public boolean isMultipleConsumers() {
337        return multipleConsumers;
338    }
339
340    /**
341     * Specifies whether multiple consumers are allowed. If enabled, you can use SEDA for Publish-Subscribe messaging.
342     * That is, you can send a message to the SEDA queue and have each consumer receive a copy of the message.
343     * When enabled, this option should be specified on every consumer endpoint.
344     */
345    public void setMultipleConsumers(boolean multipleConsumers) {
346        this.multipleConsumers = multipleConsumers;
347    }
348
349    @ManagedAttribute
350    public int getPollTimeout() {
351        return pollTimeout;
352    }
353
354    /**
355     * The timeout used when polling. When a timeout occurs, the consumer can check whether it is allowed to continue running.
356     * Setting a lower value allows the consumer to react more quickly upon shutdown.
357     */
358    public void setPollTimeout(int pollTimeout) {
359        this.pollTimeout = pollTimeout;
360    }
361
362    @ManagedAttribute
363    public boolean isPurgeWhenStopping() {
364        return purgeWhenStopping;
365    }
366
367    /**
368     * Whether to purge the task queue when stopping the consumer/route.
369     * This allows to stop faster, as any pending messages on the queue is discarded.
370     */
371    public void setPurgeWhenStopping(boolean purgeWhenStopping) {
372        this.purgeWhenStopping = purgeWhenStopping;
373    }
374
375    @ManagedAttribute(description = "Singleton")
376    public boolean isSingleton() {
377        return true;
378    }
379
380    /**
381     * Returns the current pending exchanges
382     */
383    public List<Exchange> getExchanges() {
384        return new ArrayList<Exchange>(getQueue());
385    }
386
387    @ManagedAttribute
388    public boolean isMultipleConsumersSupported() {
389        return isMultipleConsumers();
390    }
391
392    /**
393     * Purges the queue
394     */
395    @ManagedOperation(description = "Purges the seda queue")
396    public void purgeQueue() {
397        LOG.debug("Purging queue with {} exchanges", queue.size());
398        queue.clear();
399    }
400
401    /**
402     * Returns the current active consumers on this endpoint
403     */
404    public Set<SedaConsumer> getConsumers() {
405        return new HashSet<SedaConsumer>(consumers);
406    }
407
408    /**
409     * Returns the current active producers on this endpoint
410     */
411    public Set<SedaProducer> getProducers() {
412        return new HashSet<SedaProducer>(producers);
413    }
414
415    @ManagedOperation(description = "Current number of Exchanges in Queue")
416    public long queueSize() {
417        return getExchanges().size();
418    }
419
420    @ManagedOperation(description = "Get Exchange from queue by index")
421    public String browseExchange(Integer index) {
422        List<Exchange> exchanges = getExchanges();
423        if (index >= exchanges.size()) {
424            return null;
425        }
426        Exchange exchange = exchanges.get(index);
427        if (exchange == null) {
428            return null;
429        }
430        // must use java type with JMX such as java.lang.String
431        return exchange.toString();
432    }
433
434    @ManagedOperation(description = "Get message body from queue by index")
435    public String browseMessageBody(Integer index) {
436        List<Exchange> exchanges = getExchanges();
437        if (index >= exchanges.size()) {
438            return null;
439        }
440        Exchange exchange = exchanges.get(index);
441        if (exchange == null) {
442            return null;
443        }
444
445        // must use java type with JMX such as java.lang.String
446        String body;
447        if (exchange.hasOut()) {
448            body = exchange.getOut().getBody(String.class);
449        } else {
450            body = exchange.getIn().getBody(String.class);
451        }
452
453        return body;
454    }
455
456    @ManagedOperation(description = "Get message as XML from queue by index")
457    public String browseMessageAsXml(Integer index, Boolean includeBody) {
458        List<Exchange> exchanges = getExchanges();
459        if (index >= exchanges.size()) {
460            return null;
461        }
462        Exchange exchange = exchanges.get(index);
463        if (exchange == null) {
464            return null;
465        }
466
467        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
468        String xml = MessageHelper.dumpAsXml(msg, includeBody);
469
470        return xml;
471    }
472
473    @ManagedOperation(description = "Gets all the messages as XML from the queue")
474    public String browseAllMessagesAsXml(Boolean includeBody) {
475        return browseRangeMessagesAsXml(0, Integer.MAX_VALUE, includeBody);
476    }
477
478    @ManagedOperation(description = "Gets the range of messages as XML from the queue")
479    public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
480        return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody);
481    }
482
483    @ManagedAttribute(description = "Camel context ID")
484    public String getCamelId() {
485        return getCamelContext().getName();
486    }
487
488    @ManagedAttribute(description = "Camel ManagementName")
489    public String getCamelManagementName() {
490        return getCamelContext().getManagementName();
491    }
492
493    @ManagedAttribute(description = "Endpoint URI", mask = true)
494    public String getEndpointUri() {
495        return super.getEndpointUri();
496    }
497
498    @ManagedAttribute(description = "Endpoint service state")
499    public String getState() {
500        return getStatus().name();
501    }
502
503    void onStarted(SedaProducer producer) {
504        producers.add(producer);
505    }
506
507    void onStopped(SedaProducer producer) {
508        producers.remove(producer);
509    }
510
511    void onStarted(SedaConsumer consumer) throws Exception {
512        consumers.add(consumer);
513        if (isMultipleConsumers()) {
514            updateMulticastProcessor();
515        }
516    }
517
518    void onStopped(SedaConsumer consumer) throws Exception {
519        consumers.remove(consumer);
520        if (isMultipleConsumers()) {
521            updateMulticastProcessor();
522        }
523    }
524
525    public boolean hasConsumers() {
526        return this.consumers.size() > 0;
527    }
528
529    @Override
530    protected void doStart() throws Exception {
531        super.doStart();
532
533        // force creating queue when starting
534        if (queue == null) {
535            queue = getQueue();
536        }
537
538        // special for unit testing where we can set a system property to make seda poll faster
539        // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project
540        String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout());
541        setPollTimeout(Integer.valueOf(override));
542    }
543
544    @Override
545    public void stop() throws Exception {
546        if (getConsumers().isEmpty()) {
547            super.stop();
548        } else {
549            LOG.debug("There is still active consumers.");
550        }
551    }
552
553    @Override
554    public void shutdown() throws Exception {
555        if (shutdown.get()) {
556            LOG.trace("Service already shut down");
557            return;
558        }
559
560        // notify component we are shutting down this endpoint
561        if (getComponent() != null) {
562            getComponent().onShutdownEndpoint(this);
563        }
564
565        if (getConsumers().isEmpty()) {
566            super.shutdown();
567        } else {
568            LOG.debug("There is still active consumers.");
569        }
570    }
571
572    @Override
573    protected void doShutdown() throws Exception {
574        // shutdown thread pool if it was in use
575        if (multicastExecutor != null) {
576            getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
577            multicastExecutor = null;
578        }
579
580        // clear queue, as we are shutdown, so if re-created then the queue must be updated
581        queue = null;
582    }
583
584}