001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.ExecutorService;
026import javax.xml.bind.annotation.XmlAccessType;
027import javax.xml.bind.annotation.XmlAccessorType;
028import javax.xml.bind.annotation.XmlAttribute;
029import javax.xml.bind.annotation.XmlElement;
030import javax.xml.bind.annotation.XmlElementRef;
031import javax.xml.bind.annotation.XmlRootElement;
032import javax.xml.bind.annotation.XmlTransient;
033
034import org.apache.camel.Predicate;
035import org.apache.camel.Processor;
036import org.apache.camel.processor.CamelInternalProcessor;
037import org.apache.camel.processor.OnCompletionProcessor;
038import org.apache.camel.spi.Metadata;
039import org.apache.camel.spi.RouteContext;
040
041/**
042 * Route to be executed when normal route processing completes
043 *
044 * @version 
045 */
046@Metadata(label = "configuration")
047@XmlRootElement(name = "onCompletion")
048@XmlAccessorType(XmlAccessType.FIELD)
049public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
050    @XmlAttribute @Metadata(defaultValue = "AfterConsumer")
051    private OnCompletionMode mode;
052    @XmlAttribute
053    private Boolean onCompleteOnly;
054    @XmlAttribute
055    private Boolean onFailureOnly;
056    @XmlElement(name = "onWhen")
057    private WhenDefinition onWhen;
058    @XmlAttribute
059    private Boolean parallelProcessing;
060    @XmlAttribute
061    private String executorServiceRef;
062    @XmlAttribute(name = "useOriginalMessage")
063    private Boolean useOriginalMessagePolicy;
064    @XmlElementRef
065    private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
066    @XmlTransient
067    private ExecutorService executorService;
068    @XmlTransient
069    private Boolean routeScoped;
070    // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors
071    @XmlTransient
072    private final Map<String, Processor> onCompletions = new HashMap<String, Processor>();
073
074    public OnCompletionDefinition() {
075    }
076
077    public boolean isRouteScoped() {
078        // is context scoped by default
079        return routeScoped != null ? routeScoped : false;
080    }
081
082    public Processor getOnCompletion(String routeId) {
083        return onCompletions.get(routeId);
084    }
085
086    public Collection<Processor> getOnCompletions() {
087        return onCompletions.values();
088    }
089
090    @Override
091    public String toString() {
092        return "onCompletion[" + getOutputs() + "]";
093    }
094
095    @Override
096    public String getLabel() {
097        return "onCompletion";
098    }
099
100    @Override
101    public boolean isAbstract() {
102        return true;
103    }
104
105    @Override
106    public boolean isTopLevelOnly() {
107        return true;
108    }
109
110    @Override
111    public Processor createProcessor(RouteContext routeContext) throws Exception {
112        // assign whether this was a route scoped onCompletion or not
113        // we need to know this later when setting the parent, as only route scoped should have parent
114        // Note: this logic can possible be removed when the Camel routing engine decides at runtime
115        // to apply onCompletion in a more dynamic fashion than current code base
116        // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime
117        if (routeScoped == null) {
118            routeScoped = super.getParent() != null;
119        }
120
121        boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly();
122        boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly();
123        boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
124        boolean original = getUseOriginalMessagePolicy() != null && getUseOriginalMessagePolicy();
125
126        if (isOnCompleteOnly && isOnFailureOnly) {
127            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
128        }
129
130        String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
131
132        Processor childProcessor = this.createChildProcessor(routeContext, true);
133
134        // wrap the on completion route in a unit of work processor
135        CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
136        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
137        internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
138
139        onCompletions.put(routeId, internal);
140
141        Predicate when = null;
142        if (onWhen != null) {
143            when = onWhen.getExpression().createPredicate(routeContext);
144        }
145
146        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
147        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, isParallelProcessing);
148
149        // should be after consumer by default
150        boolean afterConsumer = mode == null || mode == OnCompletionMode.AfterConsumer;
151
152        OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal,
153                threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when, original, afterConsumer);
154        return answer;
155    }
156
157    /**
158     * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition.
159     * <p/>
160     * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>.
161     * Hence we remove all existing as they are global.
162     *
163     * @param definition the parent definition that is the route
164     */
165    public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) {
166        for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) {
167            ProcessorDefinition<?> out = it.next();
168            if (out instanceof OnCompletionDefinition) {
169                it.remove();
170            }
171        }
172    }
173
174    @Override
175    public ProcessorDefinition<?> end() {
176        // pop parent block, as we added our self as block to parent when synchronized was defined in the route
177        getParent().popBlock();
178        return super.end();
179    }
180
181    /**
182     * Sets the mode to be after route is done (default due backwards compatible).
183     * <p/>
184     * This executes the on completion work <i>after</i> the route consumer have written response
185     * back to the callee (if its InOut mode).
186     *
187     * @return the builder
188     */
189    public OnCompletionDefinition modeAfterConsumer() {
190        setMode(OnCompletionMode.AfterConsumer);
191        return this;
192    }
193
194    /**
195     * Sets the mode to be before consumer is done.
196     * <p/>
197     * This allows the on completion work to execute <i>before</i> the route consumer, writes any response
198     * back to the callee (if its InOut mode).
199     *
200     * @return the builder
201     */
202    public OnCompletionDefinition modeBeforeConsumer() {
203        setMode(OnCompletionMode.BeforeConsumer);
204        return this;
205    }
206
207    /**
208     * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors).
209     *
210     * @return the builder
211     */
212    public OnCompletionDefinition onCompleteOnly() {
213        boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly();
214        if (isOnFailureOnly) {
215            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
216        }
217        // must define return type as OutputDefinition and not this type to avoid end user being able
218        // to invoke onFailureOnly/onCompleteOnly more than once
219        setOnCompleteOnly(Boolean.TRUE);
220        setOnFailureOnly(Boolean.FALSE);
221        return this;
222    }
223
224    /**
225     * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message).
226     *
227     * @return the builder
228     */
229    public OnCompletionDefinition onFailureOnly() {
230        boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly();
231        if (isOnCompleteOnly) {
232            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
233        }
234        // must define return type as OutputDefinition and not this type to avoid end user being able
235        // to invoke onFailureOnly/onCompleteOnly more than once
236        setOnCompleteOnly(Boolean.FALSE);
237        setOnFailureOnly(Boolean.TRUE);
238        return this;
239    }
240
241    /**
242     * Sets an additional predicate that should be true before the onCompletion is triggered.
243     * <p/>
244     * To be used for fine grained controlling whether a completion callback should be invoked or not
245     *
246     * @param predicate predicate that determines true or false
247     * @return the builder
248     */
249    public OnCompletionDefinition onWhen(Predicate predicate) {
250        setOnWhen(new WhenDefinition(predicate));
251        return this;
252    }
253
254    /**
255     * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion.
256     * <p/>
257     * By default this feature is off.
258     *
259     * @return the builder
260     */
261    public OnCompletionDefinition useOriginalBody() {
262        setUseOriginalMessagePolicy(Boolean.TRUE);
263        return this;
264    }
265
266    /**
267     * To use a custom Thread Pool to be used for parallel processing.
268     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
269     */
270    public OnCompletionDefinition executorService(ExecutorService executorService) {
271        setExecutorService(executorService);
272        return this;
273    }
274
275    /**
276     * Refers to a custom Thread Pool to be used for parallel processing.
277     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
278     */
279    public OnCompletionDefinition executorServiceRef(String executorServiceRef) {
280        setExecutorServiceRef(executorServiceRef);
281        return this;
282    }
283
284    /**
285     * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool.
286     * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route.
287     *
288     * @return the builder
289     */
290    public OnCompletionDefinition parallelProcessing() {
291        setParallelProcessing(true);
292        return this;
293    }
294
295    public List<ProcessorDefinition<?>> getOutputs() {
296        return outputs;
297    }
298
299    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
300        this.outputs = outputs;
301    }
302
303    public boolean isOutputSupported() {
304        return true;
305    }
306
307    public OnCompletionMode getMode() {
308        return mode;
309    }
310
311    /**
312     * Sets the on completion mode.
313     * <p/>
314     * The default value is AfterConsumer
315     */
316    public void setMode(OnCompletionMode mode) {
317        this.mode = mode;
318    }
319
320    public Boolean getOnCompleteOnly() {
321        return onCompleteOnly;
322    }
323
324    public void setOnCompleteOnly(Boolean onCompleteOnly) {
325        this.onCompleteOnly = onCompleteOnly;
326    }
327
328    public Boolean getOnFailureOnly() {
329        return onFailureOnly;
330    }
331
332    public void setOnFailureOnly(Boolean onFailureOnly) {
333        this.onFailureOnly = onFailureOnly;
334    }
335
336    public WhenDefinition getOnWhen() {
337        return onWhen;
338    }
339
340    public void setOnWhen(WhenDefinition onWhen) {
341        this.onWhen = onWhen;
342    }
343
344    public ExecutorService getExecutorService() {
345        return executorService;
346    }
347
348    public void setExecutorService(ExecutorService executorService) {
349        this.executorService = executorService;
350    }
351
352    public String getExecutorServiceRef() {
353        return executorServiceRef;
354    }
355
356    public void setExecutorServiceRef(String executorServiceRef) {
357        this.executorServiceRef = executorServiceRef;
358    }
359
360    public Boolean getUseOriginalMessagePolicy() {
361        return useOriginalMessagePolicy;
362    }
363
364    /**
365     * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion.
366     * <p/>
367     * By default this feature is off.
368     */
369    public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) {
370        this.useOriginalMessagePolicy = useOriginalMessagePolicy;
371    }
372
373    public Boolean getParallelProcessing() {
374        return parallelProcessing;
375    }
376
377    public void setParallelProcessing(Boolean parallelProcessing) {
378        this.parallelProcessing = parallelProcessing;
379    }
380
381}