001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.concurrent.ExecutorService;
020import java.util.concurrent.ScheduledExecutorService;
021import javax.xml.bind.annotation.XmlAccessType;
022import javax.xml.bind.annotation.XmlAccessorType;
023import javax.xml.bind.annotation.XmlAttribute;
024import javax.xml.bind.annotation.XmlRootElement;
025import javax.xml.bind.annotation.XmlTransient;
026
027import org.apache.camel.Expression;
028import org.apache.camel.Processor;
029import org.apache.camel.builder.ExpressionBuilder;
030import org.apache.camel.model.language.ExpressionDefinition;
031import org.apache.camel.processor.Throttler;
032import org.apache.camel.spi.Metadata;
033import org.apache.camel.spi.RouteContext;
034import org.apache.camel.util.ObjectHelper;
035
036/**
037 * Controls the rate at which messages are passed to the next node in the route
038 *
039 * @version 
040 */
041@Metadata(label = "eip,routing")
042@XmlRootElement(name = "throttle")
043@XmlAccessorType(XmlAccessType.FIELD)
044public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
045    // TODO: Camel 3.0 Should not support outputs
046
047    @XmlTransient
048    private ExecutorService executorService;
049    @XmlAttribute
050    private String executorServiceRef;
051    @XmlAttribute @Metadata(defaultValue = "1000")
052    private Long timePeriodMillis;
053    @XmlAttribute
054    private Boolean asyncDelayed;
055    @XmlAttribute @Metadata(defaultValue = "true")
056    private Boolean callerRunsWhenRejected;
057    @XmlAttribute
058    private Boolean rejectExecution;
059    
060    public ThrottleDefinition() {
061    }
062
063    public ThrottleDefinition(Expression maximumRequestsPerPeriod) {
064        super(maximumRequestsPerPeriod);
065    }
066
067    @Override
068    public String toString() {
069        return "Throttle[" + description() + " -> " + getOutputs() + "]";
070    }
071    
072    protected String description() {
073        return getExpression() + " request per " + getTimePeriodMillis() + " millis";
074    }
075
076    @Override
077    public String getLabel() {
078        return "throttle[" + description() + "]";
079    }
080
081    @Override
082    public Processor createProcessor(RouteContext routeContext) throws Exception {
083        Processor childProcessor = this.createChildProcessor(routeContext, true);
084
085        boolean async = getAsyncDelayed() != null && getAsyncDelayed();
086        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, async);
087        ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, async);
088        
089        // should be default 1000 millis
090        long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
091
092        // max requests per period is mandatory
093        Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext);
094        if (maxRequestsExpression == null) {
095            throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
096        }
097
098        boolean reject = getRejectExecution() != null && getRejectExecution();
099        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject);
100
101        answer.setAsyncDelayed(async);
102        if (getCallerRunsWhenRejected() == null) {
103            // should be true by default
104            answer.setCallerRunsWhenRejected(true);
105        } else {
106            answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
107        }
108        return answer;
109    }
110
111    private Expression createMaxRequestsPerPeriodExpression(RouteContext routeContext) {
112        if (getExpression() != null) {
113            if (ObjectHelper.isNotEmpty(getExpression().getExpression()) || getExpression().getExpressionValue() != null) {
114                return getExpression().createExpression(routeContext);
115            } 
116        } 
117        return null;
118    }
119    
120    // Fluent API
121    // -------------------------------------------------------------------------
122    /**
123     * Sets the time period during which the maximum request count is valid for
124     *
125     * @param timePeriodMillis  period in millis
126     * @return the builder
127     */
128    public ThrottleDefinition timePeriodMillis(long timePeriodMillis) {
129        setTimePeriodMillis(timePeriodMillis);
130        return this;
131    }
132    
133    /**
134     * Sets the time period during which the maximum request count per period
135     *
136     * @param maximumRequestsPerPeriod  the maximum request count number per time period
137     * @return the builder
138     */
139    public ThrottleDefinition maximumRequestsPerPeriod(Long maximumRequestsPerPeriod) {
140        setExpression(ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod)));
141        return this;
142    }
143
144    /**
145     * Whether or not the caller should run the task when it was rejected by the thread pool.
146     * <p/>
147     * Is by default <tt>true</tt>
148     *
149     * @param callerRunsWhenRejected whether or not the caller should run
150     * @return the builder
151     */
152    public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
153        setCallerRunsWhenRejected(callerRunsWhenRejected);
154        return this;
155    }
156
157    /**
158     * Enables asynchronous delay which means the thread will <b>no</b> block while delaying.
159     *
160     * @return the builder
161     */
162    public ThrottleDefinition asyncDelayed() {
163        setAsyncDelayed(true);
164        return this;
165    }
166    
167    /**
168     * Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit
169     * <p/>
170     * Is by default <tt>false</tt>
171     *
172     * @param rejectExecution throw the RejectExecutionException if the exchange exceeds the request limit 
173     * @return the builder
174     */
175    public ThrottleDefinition rejectExecution(boolean rejectExecution) {
176        setRejectExecution(rejectExecution);
177        return this;
178    }
179
180    /**
181     * Sets the ExecutorService which could be used by throttle definition
182     *
183     * @param executorService  
184     * @return the builder
185     */
186    public ThrottleDefinition executorService(ExecutorService executorService) {
187        setExecutorService(executorService);
188        return this;
189    }
190
191    /**
192     * Sets the ExecutorService which could be used by throttle definition
193     *
194     * @param executorServiceRef the reference id of the Executor Service  
195     * @return the builder
196     */
197    public ThrottleDefinition executorServiceRef(String executorServiceRef) {
198        setExecutorServiceRef(executorServiceRef);
199        return this;
200    }
201
202    // Properties
203    // -------------------------------------------------------------------------
204
205    /**
206     * Expression to configure the maximum number of messages to throttle per request
207     */
208    @Override
209    public void setExpression(ExpressionDefinition expression) {
210        // override to include javadoc what the expression is used for
211        super.setExpression(expression);
212    }
213
214    public Long getTimePeriodMillis() {
215        return timePeriodMillis;
216    }
217
218    public void setTimePeriodMillis(Long timePeriodMillis) {
219        this.timePeriodMillis = timePeriodMillis;
220    }
221
222    public Boolean getAsyncDelayed() {
223        return asyncDelayed;
224    }
225
226    public void setAsyncDelayed(Boolean asyncDelayed) {
227        this.asyncDelayed = asyncDelayed;
228    }
229
230    public Boolean getCallerRunsWhenRejected() {
231        return callerRunsWhenRejected;
232    }
233
234    public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
235        this.callerRunsWhenRejected = callerRunsWhenRejected;
236    }
237
238    public ExecutorService getExecutorService() {
239        return executorService;
240    }
241
242    public void setExecutorService(ExecutorService executorService) {
243        this.executorService = executorService;
244    }
245
246    public String getExecutorServiceRef() {
247        return executorServiceRef;
248    }
249
250    public void setExecutorServiceRef(String executorServiceRef) {
251        this.executorServiceRef = executorServiceRef;
252    }
253    
254    public Boolean getRejectExecution() {
255        return rejectExecution;
256    }
257
258    public void setRejectExecution(Boolean rejectExecution) {
259        this.rejectExecution = rejectExecution;
260    }
261}