001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import javax.xml.bind.annotation.XmlAccessType;
022import javax.xml.bind.annotation.XmlAccessorType;
023import javax.xml.bind.annotation.XmlElement;
024import javax.xml.bind.annotation.XmlElementRef;
025import javax.xml.bind.annotation.XmlElements;
026import javax.xml.bind.annotation.XmlRootElement;
027import javax.xml.bind.annotation.XmlTransient;
028
029import org.apache.camel.Expression;
030import org.apache.camel.Processor;
031import org.apache.camel.model.config.BatchResequencerConfig;
032import org.apache.camel.model.config.ResequencerConfig;
033import org.apache.camel.model.config.StreamResequencerConfig;
034import org.apache.camel.model.language.ExpressionDefinition;
035import org.apache.camel.processor.CamelInternalProcessor;
036import org.apache.camel.processor.Resequencer;
037import org.apache.camel.processor.StreamResequencer;
038import org.apache.camel.processor.resequencer.ExpressionResultComparator;
039import org.apache.camel.spi.Metadata;
040import org.apache.camel.spi.Required;
041import org.apache.camel.spi.RouteContext;
042import org.apache.camel.util.CamelContextHelper;
043import org.apache.camel.util.ObjectHelper;
044
045/**
046 * Resequences (re-order) messages based on an expression
047 *
048 * @version 
049 */
050@Metadata(label = "eip,routing")
051@XmlRootElement(name = "resequence")
052@XmlAccessorType(XmlAccessType.FIELD)
053public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> {
054    @Metadata(required = "false")
055    @XmlElements({
056    @XmlElement(name = "batch-config", type = BatchResequencerConfig.class),
057    @XmlElement(name = "stream-config", type = StreamResequencerConfig.class)}
058    )
059    private ResequencerConfig resequencerConfig;
060    @XmlTransient
061    private BatchResequencerConfig batchConfig;
062    @XmlTransient
063    private StreamResequencerConfig streamConfig;
064    @XmlElementRef
065    @Required
066    private ExpressionDefinition expression;
067    @XmlElementRef
068    private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
069
070    public ResequenceDefinition() {
071    }
072
073    public List<ProcessorDefinition<?>> getOutputs() {
074        return outputs;
075    }
076
077    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
078        this.outputs = outputs;
079    }
080
081    @Override
082    public boolean isOutputSupported() {
083        return true;
084    }
085
086    // Fluent API
087    // -------------------------------------------------------------------------
088    /**
089     * Configures the stream-based resequencing algorithm using the default
090     * configuration.
091     *
092     * @return the builder
093     */
094    public ResequenceDefinition stream() {
095        return stream(StreamResequencerConfig.getDefault());
096    }
097
098    /**
099     * Configures the batch-based resequencing algorithm using the default
100     * configuration.
101     *
102     * @return the builder
103     */
104    public ResequenceDefinition batch() {
105        return batch(BatchResequencerConfig.getDefault());
106    }
107
108    /**
109     * Configures the stream-based resequencing algorithm using the given
110     * {@link StreamResequencerConfig}.
111     *
112     * @param config  the config
113     * @return the builder
114     */
115    public ResequenceDefinition stream(StreamResequencerConfig config) {
116        this.streamConfig = config;
117        this.batchConfig = null;
118        return this;
119    }
120
121    /**
122     * Configures the batch-based resequencing algorithm using the given
123     * {@link BatchResequencerConfig}.
124     *
125     * @param config  the config
126     * @return the builder
127     */
128    public ResequenceDefinition batch(BatchResequencerConfig config) {
129        this.batchConfig = config;
130        this.streamConfig = null;
131        return this;
132    }
133
134    /**
135     * Sets the timeout
136     * @param timeout  timeout in millis
137     * @return the builder
138     */
139    public ResequenceDefinition timeout(long timeout) {
140        if (streamConfig != null) {
141            streamConfig.setTimeout(timeout);
142        } else {
143            // initialize batch mode as its default mode
144            if (batchConfig == null) {
145                batch();
146            }
147            batchConfig.setBatchTimeout(timeout);
148        }
149        return this;
150    }
151
152    /**
153     * Sets the rejectOld flag to throw an error when a message older than the last delivered message is processed
154     * @return the builder
155     */
156    public ResequenceDefinition rejectOld() {
157        if (streamConfig == null) {
158            throw new IllegalStateException("rejectOld() only supported for stream resequencer");
159        }
160        streamConfig.setRejectOld(true);
161        return this;
162    }
163
164    /**
165     * Sets the in batch size for number of exchanges received
166     * @param batchSize  the batch size
167     * @return the builder
168     */
169    public ResequenceDefinition size(int batchSize) {
170        if (streamConfig != null) {
171            throw new IllegalStateException("size() only supported for batch resequencer");
172        }
173        // initialize batch mode as its default mode
174        if (batchConfig == null) {
175            batch();
176        }
177        batchConfig.setBatchSize(batchSize);
178        return this;
179    }
180
181    /**
182     * Sets the capacity for the stream resequencer
183     *
184     * @param capacity  the capacity
185     * @return the builder
186     */
187    public ResequenceDefinition capacity(int capacity) {
188        if (streamConfig == null) {
189            throw new IllegalStateException("capacity() only supported for stream resequencer");
190        }
191        streamConfig.setCapacity(capacity);
192        return this;
193
194    }
195
196    /**
197     * Enables duplicates for the batch resequencer mode
198     * @return the builder
199     */
200    public ResequenceDefinition allowDuplicates() {
201        if (streamConfig != null) {
202            throw new IllegalStateException("allowDuplicates() only supported for batch resequencer");
203        }
204        // initialize batch mode as its default mode
205        if (batchConfig == null) {
206            batch();
207        }
208        batchConfig.setAllowDuplicates(true);
209        return this;
210    }
211
212    /**
213     * Enables reverse mode for the batch resequencer mode.
214     * <p/>
215     * This means the expression for determine the sequence order will be reversed.
216     * Can be used for Z..A or 9..0 ordering.
217     *
218     * @return the builder
219     */
220    public ResequenceDefinition reverse() {
221        if (streamConfig != null) {
222            throw new IllegalStateException("reverse() only supported for batch resequencer");
223        }
224        // initialize batch mode as its default mode
225        if (batchConfig == null) {
226            batch();
227        }
228        batchConfig.setReverse(true);
229        return this;
230    }
231
232    /**
233     * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored.
234     *
235     * @return builder
236     */
237    public ResequenceDefinition ignoreInvalidExchanges() {
238        if (streamConfig != null) {
239            streamConfig.setIgnoreInvalidExchanges(true);
240        } else {
241            // initialize batch mode as its default mode
242            if (batchConfig == null) {
243                batch();
244            }
245            batchConfig.setIgnoreInvalidExchanges(true);
246        }
247        return this;
248    }
249
250    /**
251     * Sets the comparator to use for stream resequencer
252     *
253     * @param comparator  the comparator
254     * @return the builder
255     */
256    public ResequenceDefinition comparator(ExpressionResultComparator comparator) {
257        if (streamConfig == null) {
258            throw new IllegalStateException("comparator() only supported for stream resequencer");
259        }
260        streamConfig.setComparator(comparator);
261        return this;
262    }
263
264    @Override
265    public String toString() {
266        return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]";
267    }
268    
269    @Override
270    public String getLabel() {
271        return "resequencer[" + (getExpression() != null ? getExpression().getLabel() : "") + "]";
272    }
273
274    public ResequencerConfig getResequencerConfig() {
275        return resequencerConfig;
276    }
277
278    /**
279     * To configure the resequencer in using either batch or stream configuration. Will by default use batch configuration.
280     */
281    public void setResequencerConfig(ResequencerConfig resequencerConfig) {
282        this.resequencerConfig = resequencerConfig;
283    }
284
285    public BatchResequencerConfig getBatchConfig() {
286        if (batchConfig == null && resequencerConfig != null && resequencerConfig instanceof BatchResequencerConfig) {
287            return (BatchResequencerConfig) resequencerConfig;
288        }
289        return batchConfig;
290    }
291
292    public StreamResequencerConfig getStreamConfig() {
293        if (streamConfig == null && resequencerConfig != null && resequencerConfig instanceof StreamResequencerConfig) {
294            return (StreamResequencerConfig) resequencerConfig;
295        }
296        return streamConfig;
297    }
298
299    public void setBatchConfig(BatchResequencerConfig batchConfig) {
300        this.batchConfig = batchConfig;
301    }
302
303    public void setStreamConfig(StreamResequencerConfig streamConfig) {
304        this.streamConfig = streamConfig;
305    }
306
307    public ExpressionDefinition getExpression() {
308        return expression;
309    }
310
311    /**
312     * Expression to use for re-ordering the messages, such as a header with a sequence number
313     */
314    public void setExpression(ExpressionDefinition expression) {
315        this.expression = expression;
316    }
317
318    @Override
319    public Processor createProcessor(RouteContext routeContext) throws Exception {
320        // if configured from XML then streamConfig has been set with the configuration
321        if (resequencerConfig != null) {
322            if (resequencerConfig instanceof StreamResequencerConfig) {
323                streamConfig = (StreamResequencerConfig) resequencerConfig;
324            } else {
325                batchConfig = (BatchResequencerConfig) resequencerConfig;
326            }
327        }
328
329        if (streamConfig != null) {
330            return createStreamResequencer(routeContext, streamConfig);
331        } else {
332            if (batchConfig == null) {
333                // default as batch mode
334                batch();
335            }
336            return createBatchResequencer(routeContext, batchConfig);
337        }
338    }
339
340    /**
341     * Creates a batch {@link Resequencer} instance applying the given <code>config</code>.
342     * 
343     * @param routeContext route context.
344     * @param config batch resequencer configuration.
345     * @return the configured batch resequencer.
346     * @throws Exception can be thrown
347     */
348    @SuppressWarnings("deprecation")
349    protected Resequencer createBatchResequencer(RouteContext routeContext,
350                                                 BatchResequencerConfig config) throws Exception {
351        Processor processor = this.createChildProcessor(routeContext, true);
352        Expression expression = getExpression().createExpression(routeContext);
353
354        // and wrap in unit of work
355        String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
356        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
357        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
358        internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
359
360        ObjectHelper.notNull(config, "config", this);
361        ObjectHelper.notNull(expression, "expression", this);
362
363        boolean isReverse = config.getReverse() != null && config.getReverse();
364        boolean isAllowDuplicates = config.getAllowDuplicates() != null && config.getAllowDuplicates();
365
366        Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), internal, expression, isAllowDuplicates, isReverse);
367        resequencer.setBatchSize(config.getBatchSize());
368        resequencer.setBatchTimeout(config.getBatchTimeout());
369        if (config.getIgnoreInvalidExchanges() != null) {
370            resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
371        }
372        return resequencer;
373    }
374
375    /**
376     * Creates a {@link StreamResequencer} instance applying the given <code>config</code>.
377     * 
378     * @param routeContext route context.
379     * @param config stream resequencer configuration.
380     * @return the configured stream resequencer.
381     * @throws Exception can be thrwon
382     */
383    protected StreamResequencer createStreamResequencer(RouteContext routeContext,
384                                                        StreamResequencerConfig config) throws Exception {
385        Processor processor = this.createChildProcessor(routeContext, true);
386        Expression expression = getExpression().createExpression(routeContext);
387
388        // and wrap in unit of work
389        String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
390        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
391        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
392        internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
393
394        ObjectHelper.notNull(config, "config", this);
395        ObjectHelper.notNull(expression, "expression", this);
396
397        ExpressionResultComparator comparator;
398        if (config.getComparatorRef() != null) {
399            comparator = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), config.getComparatorRef(), ExpressionResultComparator.class);
400        } else {
401            comparator = config.getComparator();
402        }
403        comparator.setExpression(expression);
404
405        StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator);
406        resequencer.setTimeout(config.getTimeout());
407        resequencer.setCapacity(config.getCapacity());
408        resequencer.setRejectOld(config.getRejectOld());
409        if (config.getIgnoreInvalidExchanges() != null) {
410            resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
411        }
412        return resequencer;
413    }
414
415}