001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.ScheduledExecutorService;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.camel.CamelContext;
025import org.apache.camel.Component;
026import org.apache.camel.LoggingLevel;
027import org.apache.camel.PollingConsumer;
028import org.apache.camel.ResolveEndpointFailedException;
029import org.apache.camel.spi.PollingConsumerPollStrategy;
030import org.apache.camel.spi.ScheduledPollConsumerScheduler;
031import org.apache.camel.spi.UriParam;
032import org.apache.camel.util.CamelContextHelper;
033import org.apache.camel.util.EndpointHelper;
034import org.apache.camel.util.IntrospectionSupport;
035
036/**
037 * A base class for {@link org.apache.camel.Endpoint} which creates a {@link ScheduledPollConsumer}
038 *
039 * @version 
040 */
041public abstract class ScheduledPollEndpoint extends DefaultEndpoint {
042
043    private static final String SPRING_SCHEDULER = "org.apache.camel.spring.pollingconsumer.SpringScheduledPollConsumerScheduler";
044    private static final String QUARTZ_2_SCHEDULER = "org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerScheduler";
045
046    // if adding more options then align with org.apache.camel.impl.ScheduledPollConsumer
047    @UriParam(defaultValue = "true", label = "consumer")
048    private boolean startScheduler = true;
049    @UriParam(defaultValue = "1000", label = "consumer")
050    private long initialDelay = 1000;
051    @UriParam(defaultValue = "500", label = "consumer")
052    private long delay = 500;
053    @UriParam(defaultValue = "MILLISECONDS", label = "consumer")
054    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
055    @UriParam(defaultValue = "true", label = "consumer")
056    private boolean useFixedDelay = true;
057    @UriParam(label = "consumer")
058    private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
059    @UriParam(defaultValue = "TRACE", label = "consumer")
060    private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
061    @UriParam(label = "consumer")
062    private boolean sendEmptyMessageWhenIdle;
063    @UriParam(label = "consumer")
064    private boolean greedy;
065    @UriParam(enums = "spring,quartz2", label = "consumer")
066    private ScheduledPollConsumerScheduler scheduler;
067    private String schedulerName; // used when configuring scheduler using a string value
068    @UriParam(label = "consumer")
069    private Map<String, Object> schedulerProperties;
070    @UriParam(label = "consumer")
071    private ScheduledExecutorService scheduledExecutorService;
072    @UriParam(label = "consumer")
073    private int backoffMultiplier;
074    @UriParam(label = "consumer")
075    private int backoffIdleThreshold;
076    @UriParam(label = "consumer")
077    private int backoffErrorThreshold;
078
079    protected ScheduledPollEndpoint(String endpointUri, Component component) {
080        super(endpointUri, component);
081    }
082
083    @Deprecated
084    protected ScheduledPollEndpoint(String endpointUri, CamelContext context) {
085        super(endpointUri, context);
086    }
087
088    @Deprecated
089    protected ScheduledPollEndpoint(String endpointUri) {
090        super(endpointUri);
091    }
092
093    protected ScheduledPollEndpoint() {
094    }
095
096    public void configureProperties(Map<String, Object> options) {
097        super.configureProperties(options);
098        configureScheduledPollConsumerProperties(options, getConsumerProperties());
099    }
100
101    protected void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) {
102        // special for scheduled poll consumers as we want to allow end users to configure its options
103        // from the URI parameters without the consumer. prefix
104        Map<String, Object> schedulerProperties = IntrospectionSupport.extractProperties(options, "scheduler.");
105        if (schedulerProperties != null && !schedulerProperties.isEmpty()) {
106            setSchedulerProperties(schedulerProperties);
107        }
108
109        if (scheduler == null && schedulerName != null) {
110            // special for scheduler if its "spring"
111            if ("spring".equals(schedulerName)) {
112                try {
113                    Class<? extends ScheduledPollConsumerScheduler> clazz = getCamelContext().getClassResolver().resolveMandatoryClass(SPRING_SCHEDULER, ScheduledPollConsumerScheduler.class);
114                    setScheduler(getCamelContext().getInjector().newInstance(clazz));
115                } catch (ClassNotFoundException e) {
116                    throw new IllegalArgumentException("Cannot load " + SPRING_SCHEDULER + " from classpath. Make sure camel-spring.jar is on the classpath.", e);
117                }
118            } else if ("quartz2".equals(schedulerName)) {
119                try {
120                    Class<? extends ScheduledPollConsumerScheduler> clazz = getCamelContext().getClassResolver().resolveMandatoryClass(QUARTZ_2_SCHEDULER, ScheduledPollConsumerScheduler.class);
121                    setScheduler(getCamelContext().getInjector().newInstance(clazz));
122                } catch (ClassNotFoundException e) {
123                    throw new IllegalArgumentException("Cannot load " + QUARTZ_2_SCHEDULER + " from classpath. Make sure camel-quarz2.jar is on the classpath.", e);
124                }
125            } else {
126                setScheduler(CamelContextHelper.mandatoryLookup(getCamelContext(), schedulerName, ScheduledPollConsumerScheduler.class));
127            }
128        }
129    }
130
131    @Override
132    protected void configurePollingConsumer(PollingConsumer consumer) throws Exception {
133        Map<String, Object> copy = new HashMap<String, Object>(getConsumerProperties());
134        Map<String, Object> throwaway = new HashMap<String, Object>();
135
136        // filter out unwanted options which is intended for the scheduled poll consumer
137        // as these options are not supported on the polling consumer
138        configureScheduledPollConsumerProperties(copy, throwaway);
139
140        // set reference properties first as they use # syntax that fools the regular properties setter
141        EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy);
142        EndpointHelper.setProperties(getCamelContext(), consumer, copy);
143
144        if (!isLenientProperties() && copy.size() > 0) {
145            throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size()
146                    + " parameters that couldn't be set on the endpoint polling consumer."
147                    + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint."
148                    + " Unknown consumer parameters=[" + copy + "]");
149        }
150    }
151
152    protected void initConsumerProperties() {
153        // must setup consumer properties before we are ready to start
154        Map<String, Object> options = getConsumerProperties();
155        if (!options.containsKey("startScheduler")) {
156            options.put("startScheduler", isStartScheduler());
157        }
158        if (!options.containsKey("initialDelay")) {
159            options.put("initialDelay", getInitialDelay());
160        }
161        if (!options.containsKey("delay")) {
162            options.put("delay", getDelay());
163        }
164        if (!options.containsKey("timeUnit")) {
165            options.put("timeUnit", getTimeUnit());
166        }
167        if (!options.containsKey("useFixedDelay")) {
168            options.put("useFixedDelay", isUseFixedDelay());
169        }
170        if (!options.containsKey("pollStrategy")) {
171            options.put("pollStrategy", getPollStrategy());
172        }
173        if (!options.containsKey("runLoggingLevel")) {
174            options.put("runLoggingLevel", getRunLoggingLevel());
175        }
176        if (!options.containsKey("sendEmptyMessageWhenIdle")) {
177            options.put("sendEmptyMessageWhenIdle", isSendEmptyMessageWhenIdle());
178        }
179        if (!options.containsKey("greedy")) {
180            options.put("greedy", isGreedy());
181        }
182        if (!options.containsKey("scheduler")) {
183            options.put("scheduler", getScheduler());
184        }
185        if (!options.containsKey("schedulerProperties")) {
186            options.put("schedulerProperties", getSchedulerProperties());
187        }
188        if (!options.containsKey("scheduledExecutorService")) {
189            options.put("scheduledExecutorService", getScheduledExecutorService());
190        }
191        if (!options.containsKey("backoffMultiplier")) {
192            options.put("backoffMultiplier", getBackoffMultiplier());
193        }
194        if (!options.containsKey("backoffIdleThreshold")) {
195            options.put("backoffIdleThreshold", getBackoffIdleThreshold());
196        }
197        if (!options.containsKey("backoffErrorThreshold")) {
198            options.put("backoffErrorThreshold", getBackoffErrorThreshold());
199        }
200    }
201
202    @Override
203    protected void doStart() throws Exception {
204        initConsumerProperties();
205        super.doStart();
206    }
207
208    @Override
209    protected void doStop() throws Exception {
210        super.doStop();
211        // noop
212    }
213
214    public boolean isStartScheduler() {
215        return startScheduler;
216    }
217
218    /**
219     * Whether the scheduler should be auto started.
220     */
221    public void setStartScheduler(boolean startScheduler) {
222        this.startScheduler = startScheduler;
223    }
224
225    public long getInitialDelay() {
226        return initialDelay;
227    }
228
229    /**
230     * Milliseconds before the first poll starts.
231     */
232    public void setInitialDelay(long initialDelay) {
233        this.initialDelay = initialDelay;
234    }
235
236    public long getDelay() {
237        return delay;
238    }
239
240    /**
241     * Milliseconds before the next poll.
242     */
243    public void setDelay(long delay) {
244        this.delay = delay;
245    }
246
247    public TimeUnit getTimeUnit() {
248        return timeUnit;
249    }
250
251    /**
252     * Time unit for initialDelay and delay options.
253     */
254    public void setTimeUnit(TimeUnit timeUnit) {
255        this.timeUnit = timeUnit;
256    }
257
258    public boolean isUseFixedDelay() {
259        return useFixedDelay;
260    }
261
262    /**
263     * Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.
264     */
265    public void setUseFixedDelay(boolean useFixedDelay) {
266        this.useFixedDelay = useFixedDelay;
267    }
268
269    public PollingConsumerPollStrategy getPollStrategy() {
270        return pollStrategy;
271    }
272
273    /**
274     * A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation
275     * to control error handling usually occurred during the poll operation before an Exchange have been created
276     * and being routed in Camel. In other words the error occurred while the polling was gathering information,
277     * for instance access to a file network failed so Camel cannot access it to scan for files.
278     * The default implementation will log the caused exception at WARN level and ignore it.
279     */
280    public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
281        this.pollStrategy = pollStrategy;
282        // we are allowed to change poll strategy
283    }
284
285    public LoggingLevel getRunLoggingLevel() {
286        return runLoggingLevel;
287    }
288
289    /**
290     * The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.
291     */
292    public void setRunLoggingLevel(LoggingLevel runLoggingLevel) {
293        this.runLoggingLevel = runLoggingLevel;
294    }
295
296    public boolean isSendEmptyMessageWhenIdle() {
297        return sendEmptyMessageWhenIdle;
298    }
299
300    /**
301     * If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead.
302     */
303    public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
304        this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
305    }
306
307    public boolean isGreedy() {
308        return greedy;
309    }
310
311    /**
312     * If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.
313     */
314    public void setGreedy(boolean greedy) {
315        this.greedy = greedy;
316    }
317
318    public ScheduledPollConsumerScheduler getScheduler() {
319        return scheduler;
320    }
321
322    /**
323     * Allow to plugin a custom org.apache.camel.spi.ScheduledPollConsumerScheduler to use as the scheduler for
324     * firing when the polling consumer runs. The default implementation uses the ScheduledExecutorService and
325     * there is a Quartz2, and Spring based which supports CRON expressions.
326     *
327     * Notice: If using a custom scheduler then the options for initialDelay, useFixedDelay, timeUnit,
328     * and scheduledExecutorService may not be in use. Use the text quartz2 to refer to use the Quartz2 scheduler;
329     * and use the text spring to use the Spring based; and use the text #myScheduler to refer to a custom scheduler
330     * by its id in the Registry. See Quartz2 page for an example.
331     */
332    public void setScheduler(ScheduledPollConsumerScheduler scheduler) {
333        this.scheduler = scheduler;
334    }
335
336    /**
337     * Allow to plugin a custom org.apache.camel.spi.ScheduledPollConsumerScheduler to use as the scheduler for
338     * firing when the polling consumer runs. This option is used for referring to one of the built-in schedulers
339     * either <tt>spring</tt>, or <tt>quartz2</tt>.
340     */
341    public void setScheduler(String schedulerName) {
342        this.schedulerName = schedulerName;
343    }
344
345    public Map<String, Object> getSchedulerProperties() {
346        return schedulerProperties;
347    }
348
349    /**
350     * To configure additional properties when using a custom scheduler or any of the Quartz2, Spring based scheduler.
351     */
352    public void setSchedulerProperties(Map<String, Object> schedulerProperties) {
353        this.schedulerProperties = schedulerProperties;
354    }
355
356    public ScheduledExecutorService getScheduledExecutorService() {
357        return scheduledExecutorService;
358    }
359
360    /**
361     * Allows for configuring a custom/shared thread pool to use for the consumer.
362     * By default each consumer has its own single threaded thread pool.
363     * This option allows you to share a thread pool among multiple consumers.
364     */
365    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
366        this.scheduledExecutorService = scheduledExecutorService;
367    }
368
369    public int getBackoffMultiplier() {
370        return backoffMultiplier;
371    }
372
373    /**
374     * To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row.
375     * The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again.
376     * When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.
377     */
378    public void setBackoffMultiplier(int backoffMultiplier) {
379        this.backoffMultiplier = backoffMultiplier;
380    }
381
382    public int getBackoffIdleThreshold() {
383        return backoffIdleThreshold;
384    }
385
386    /**
387     * The number of subsequent idle polls that should happen before the backoffMultipler should kick-in.
388     */
389    public void setBackoffIdleThreshold(int backoffIdleThreshold) {
390        this.backoffIdleThreshold = backoffIdleThreshold;
391    }
392
393    public int getBackoffErrorThreshold() {
394        return backoffErrorThreshold;
395    }
396
397    /**
398     * The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in.
399     */
400    public void setBackoffErrorThreshold(int backoffErrorThreshold) {
401        this.backoffErrorThreshold = backoffErrorThreshold;
402    }
403
404}