001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.HashMap;
020import java.util.Map;
021
022import org.apache.camel.CamelContext;
023import org.apache.camel.CamelContextAware;
024import org.apache.camel.Component;
025import org.apache.camel.Consumer;
026import org.apache.camel.Endpoint;
027import org.apache.camel.EndpointConfiguration;
028import org.apache.camel.Exchange;
029import org.apache.camel.ExchangePattern;
030import org.apache.camel.PollingConsumer;
031import org.apache.camel.ResolveEndpointFailedException;
032import org.apache.camel.spi.HasId;
033import org.apache.camel.spi.UriParam;
034import org.apache.camel.support.ServiceSupport;
035import org.apache.camel.util.EndpointHelper;
036import org.apache.camel.util.IntrospectionSupport;
037import org.apache.camel.util.ObjectHelper;
038import org.apache.camel.util.URISupport;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A default endpoint useful for implementation inheritance.
044 * <p/>
045 * Components which leverages <a
046 * href="http://camel.apache.org/asynchronous-routing-engine.html">asynchronous
047 * processing model</a> should check the {@link #isSynchronous()} to determine
048 * if asynchronous processing is allowed. The <tt>synchronous</tt> option on the
049 * endpoint allows Camel end users to dictate whether they want the asynchronous
050 * model or not. The option is default <tt>false</tt> which means asynchronous
051 * processing is allowed.
052 * 
053 * @version
054 */
055public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware {
056
057    private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class);
058    private String endpointUri;
059    private EndpointConfiguration endpointConfiguration;
060    private CamelContext camelContext;
061    private Component component;
062    @UriParam(defaultValue = "InOnly", description = "Sets the default exchange pattern when creating an exchange")
063    private ExchangePattern exchangePattern = ExchangePattern.InOnly;
064    // option to allow end user to dictate whether async processing should be
065    // used or not (if possible)
066    @UriParam(defaultValue = "false",
067            description = "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).")
068    private boolean synchronous;
069    private final String id = EndpointHelper.createEndpointId();
070    private Map<String, Object> consumerProperties;
071    private int pollingConsumerQueueSize = 1000;
072    private boolean pollingConsumerBlockWhenFull = true;
073
074    /**
075     * Constructs a fully-initialized DefaultEndpoint instance. This is the
076     * preferred method of constructing an object from Java code (as opposed to
077     * Spring beans, etc.).
078     * 
079     * @param endpointUri the full URI used to create this endpoint
080     * @param component the component that created this endpoint
081     */
082    protected DefaultEndpoint(String endpointUri, Component component) {
083        this.camelContext = component == null ? null : component.getCamelContext();
084        this.component = component;
085        this.setEndpointUri(endpointUri);
086    }
087
088    /**
089     * Constructs a DefaultEndpoint instance which has <b>not</b> been created
090     * using a {@link Component}.
091     * <p/>
092     * <b>Note:</b> It is preferred to create endpoints using the associated
093     * component.
094     * 
095     * @param endpointUri the full URI used to create this endpoint
096     * @param camelContext the Camel Context in which this endpoint is operating
097     */
098    @Deprecated
099    protected DefaultEndpoint(String endpointUri, CamelContext camelContext) {
100        this(endpointUri);
101        this.camelContext = camelContext;
102    }
103
104    /**
105     * Constructs a partially-initialized DefaultEndpoint instance.
106     * <p/>
107     * <b>Note:</b> It is preferred to create endpoints using the associated
108     * component.
109     * 
110     * @param endpointUri the full URI used to create this endpoint
111     */
112    @Deprecated
113    protected DefaultEndpoint(String endpointUri) {
114        this.setEndpointUri(endpointUri);
115    }
116
117    /**
118     * Constructs a partially-initialized DefaultEndpoint instance. Useful when
119     * creating endpoints manually (e.g., as beans in Spring).
120     * <p/>
121     * Please note that the endpoint URI must be set through properties (or
122     * overriding {@link #createEndpointUri()} if one uses this constructor.
123     * <p/>
124     * <b>Note:</b> It is preferred to create endpoints using the associated
125     * component.
126     */
127    protected DefaultEndpoint() {
128    }
129
130    public int hashCode() {
131        return getEndpointUri().hashCode() * 37 + 1;
132    }
133
134    @Override
135    public boolean equals(Object object) {
136        if (object instanceof DefaultEndpoint) {
137            DefaultEndpoint that = (DefaultEndpoint)object;
138            return ObjectHelper.equal(this.getEndpointUri(), that.getEndpointUri());
139        }
140        return false;
141    }
142
143    @Override
144    public String toString() {
145        String value = null;
146        try {
147            value = getEndpointUri();
148        } catch (RuntimeException e) {
149            // ignore any exception and use null for building the string value
150        }
151        return String.format("Endpoint[%s]", URISupport.sanitizeUri(value));
152    }
153
154    /**
155     * Returns a unique String ID which can be used for aliasing without having
156     * to use the whole URI which is not unique
157     */
158    public String getId() {
159        return id;
160    }
161
162    public String getEndpointUri() {
163        if (endpointUri == null) {
164            endpointUri = createEndpointUri();
165            if (endpointUri == null) {
166                throw new IllegalArgumentException("endpointUri is not specified and " + getClass().getName()
167                    + " does not implement createEndpointUri() to create a default value");
168            }
169        }
170        return endpointUri;
171    }
172
173    public EndpointConfiguration getEndpointConfiguration() {
174        if (endpointConfiguration == null) {
175            endpointConfiguration = createEndpointConfiguration(getEndpointUri());
176        }
177        return endpointConfiguration;
178    }
179
180    /**
181     * Sets a custom {@link EndpointConfiguration}
182     *
183     * @param endpointConfiguration a custom endpoint configuration to be used.
184     */
185    public void setEndpointConfiguration(EndpointConfiguration endpointConfiguration) {
186        this.endpointConfiguration = endpointConfiguration;
187    }
188
189    public String getEndpointKey() {
190        if (isLenientProperties()) {
191            // only use the endpoint uri without parameters as the properties is
192            // lenient
193            String uri = getEndpointUri();
194            if (uri.indexOf('?') != -1) {
195                return ObjectHelper.before(uri, "?");
196            } else {
197                return uri;
198            }
199        } else {
200            // use the full endpoint uri
201            return getEndpointUri();
202        }
203    }
204
205    public CamelContext getCamelContext() {
206        return camelContext;
207    }
208
209    /**
210     * Returns the component that created this endpoint.
211     * 
212     * @return the component that created this endpoint, or <tt>null</tt> if
213     *         none set
214     */
215    public Component getComponent() {
216        return component;
217    }
218
219    public void setCamelContext(CamelContext camelContext) {
220        this.camelContext = camelContext;
221    }
222
223    public PollingConsumer createPollingConsumer() throws Exception {
224        // should not call configurePollingConsumer when its EventDrivenPollingConsumer
225        LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} and blockWhenFull: {}", getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull());
226        EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize());
227        consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull());
228        return consumer;
229    }
230
231    public Exchange createExchange(Exchange exchange) {
232        return exchange.copy();
233    }
234
235    public Exchange createExchange() {
236        return createExchange(getExchangePattern());
237    }
238
239    public Exchange createExchange(ExchangePattern pattern) {
240        return new DefaultExchange(this, pattern);
241    }
242
243    /**
244     * Returns the default exchange pattern to use when creating an exchange.
245     */
246    public ExchangePattern getExchangePattern() {
247        return exchangePattern;
248    }
249
250    /**
251     * Sets the default exchange pattern when creating an exchange.
252     */
253    public void setExchangePattern(ExchangePattern exchangePattern) {
254        this.exchangePattern = exchangePattern;
255    }
256
257    /**
258     * Returns whether synchronous processing should be strictly used.
259     */
260    public boolean isSynchronous() {
261        return synchronous;
262    }
263
264    /**
265     * Sets whether synchronous processing should be strictly used, or Camel is
266     * allowed to use asynchronous processing (if supported).
267     * 
268     * @param synchronous <tt>true</tt> to enforce synchronous processing
269     */
270    public void setSynchronous(boolean synchronous) {
271        this.synchronous = synchronous;
272    }
273
274    /**
275     * Gets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
276     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
277     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
278     * <p/>
279     * The default value is <tt>1000</tt>
280     */
281    public int getPollingConsumerQueueSize() {
282        return pollingConsumerQueueSize;
283    }
284
285    /**
286     * Sets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
287     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
288     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
289     * <p/>
290     * The default value is <tt>1000</tt>
291     */
292    public void setPollingConsumerQueueSize(int pollingConsumerQueueSize) {
293        this.pollingConsumerQueueSize = pollingConsumerQueueSize;
294    }
295
296    /**
297     * Whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
298     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
299     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
300     * <p/>
301     * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown
302     * when trying to add to the queue, and its full.
303     * <p/>
304     * The default value is <tt>true</tt> which will block the producer queue until the queue has space.
305     */
306    public boolean isPollingConsumerBlockWhenFull() {
307        return pollingConsumerBlockWhenFull;
308    }
309
310    /**
311     * Set whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
312     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
313     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
314     * <p/>
315     * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown
316     * when trying to add to the queue, and its full.
317     * <p/>
318     * The default value is <tt>true</tt> which will block the producer queue until the queue has space.
319     */
320    public void setPollingConsumerBlockWhenFull(boolean pollingConsumerBlockWhenFull) {
321        this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull;
322    }
323
324    public void configureProperties(Map<String, Object> options) {
325        Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
326        if (consumerProperties != null && !consumerProperties.isEmpty()) {
327            setConsumerProperties(consumerProperties);
328        }
329    }
330
331    /**
332     * Sets the bean properties on the given bean.
333     * <p/>
334     * This is the same logical implementation as {@link DefaultComponent#setProperties(Object, java.util.Map)}
335     *
336     * @param bean  the bean
337     * @param parameters  properties to set
338     */
339    protected void setProperties(Object bean, Map<String, Object> parameters) throws Exception {
340        // set reference properties first as they use # syntax that fools the regular properties setter
341        EndpointHelper.setReferenceProperties(getCamelContext(), bean, parameters);
342        EndpointHelper.setProperties(getCamelContext(), bean, parameters);
343    }
344
345    /**
346     * A factory method to lazily create the endpointUri if none is specified
347     */
348    protected String createEndpointUri() {
349        return null;
350    }
351
352    /**
353     * A factory method to lazily create the endpoint configuration if none is specified
354     */
355    protected EndpointConfiguration createEndpointConfiguration(String uri) {
356        // using this factory method to be backwards compatible with the old code
357        if (getComponent() != null) {
358            // prefer to use component endpoint configuration
359            try {
360                return getComponent().createConfiguration(uri);
361            } catch (Exception e) {
362                throw ObjectHelper.wrapRuntimeCamelException(e);
363            }
364        } else if (getCamelContext() != null) {
365            // fallback and use a mapped endpoint configuration
366            return new MappedEndpointConfiguration(getCamelContext(), uri);
367        }
368        // not configuration possible
369        return null;
370    }
371
372    /**
373     * Sets the endpointUri if it has not been specified yet via some kind of
374     * dependency injection mechanism. This allows dependency injection
375     * frameworks such as Spring or Guice to set the default endpoint URI in
376     * cases where it has not been explicitly configured using the name/context
377     * in which an Endpoint is created.
378     */
379    public void setEndpointUriIfNotSpecified(String value) {
380        if (endpointUri == null) {
381            setEndpointUri(value);
382        }
383    }
384
385    /**
386     * Sets the URI that created this endpoint.
387     */
388    protected void setEndpointUri(String endpointUri) {
389        this.endpointUri = endpointUri;
390    }
391
392    public boolean isLenientProperties() {
393        // default should be false for most components
394        return false;
395    }
396
397    public Map<String, Object> getConsumerProperties() {
398        if (consumerProperties == null) {
399            // must create empty if none exists
400            consumerProperties = new HashMap<String, Object>();
401        }
402        return consumerProperties;
403    }
404
405    public void setConsumerProperties(Map<String, Object> consumerProperties) {
406        // append consumer properties
407        if (consumerProperties != null && !consumerProperties.isEmpty()) {
408            if (this.consumerProperties == null) {
409                this.consumerProperties = new HashMap<String, Object>(consumerProperties);
410            } else {
411                this.consumerProperties.putAll(consumerProperties);
412            }
413        }
414    }
415
416    protected void configureConsumer(Consumer consumer) throws Exception {
417        if (consumerProperties != null) {
418            // use a defensive copy of the consumer properties as the methods below will remove the used properties
419            // and in case we restart routes, we need access to the original consumer properties again
420            Map<String, Object> copy = new HashMap<String, Object>(consumerProperties);
421
422            // set reference properties first as they use # syntax that fools the regular properties setter
423            EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy);
424            EndpointHelper.setProperties(getCamelContext(), consumer, copy);
425
426            // special consumer.bridgeErrorHandler option
427            Object bridge = copy.remove("bridgeErrorHandler");
428            if (bridge != null && "true".equals(bridge)) {
429                if (consumer instanceof DefaultConsumer) {
430                    DefaultConsumer defaultConsumer = (DefaultConsumer) consumer;
431                    defaultConsumer.setExceptionHandler(new BridgeExceptionHandlerToErrorHandler(defaultConsumer));
432                } else {
433                    throw new IllegalArgumentException("Option consumer.bridgeErrorHandler is only supported by endpoints,"
434                            + " having their consumer extend DefaultConsumer. The consumer is a " + consumer.getClass().getName() + " class.");
435                }
436            }
437
438            if (!this.isLenientProperties() && copy.size() > 0) {
439                throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size()
440                    + " parameters that couldn't be set on the endpoint consumer."
441                    + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint."
442                    + " Unknown consumer parameters=[" + copy + "]");
443            }
444        }
445    }
446
447    protected void configurePollingConsumer(PollingConsumer consumer) throws Exception {
448        configureConsumer(consumer);
449    }
450
451    @Override
452    protected void doStart() throws Exception {
453        // noop
454    }
455
456    @Override
457    protected void doStop() throws Exception {
458        // noop
459    }
460}