001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.loadbalancer;
018
019import java.util.List;
020import java.util.concurrent.RejectedExecutionException;
021import java.util.concurrent.atomic.AtomicInteger;
022
023import org.apache.camel.AsyncCallback;
024import org.apache.camel.AsyncProcessor;
025import org.apache.camel.CamelContext;
026import org.apache.camel.CamelContextAware;
027import org.apache.camel.Exchange;
028import org.apache.camel.Processor;
029import org.apache.camel.Traceable;
030import org.apache.camel.util.AsyncProcessorConverterHelper;
031
032public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware {
033    private static final int STATE_CLOSED = 0;
034    private static final int STATE_HALF_OPEN = 1;
035    private static final int STATE_OPEN = 2;
036
037    private final List<Class<?>> exceptions;
038    private CamelContext camelContext;
039    private int threshold;
040    private long halfOpenAfter;
041    private long lastFailure;
042    private AtomicInteger failures = new AtomicInteger();
043    private AtomicInteger state = new AtomicInteger(STATE_CLOSED);
044
045    public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) {
046        this.exceptions = exceptions;
047    }
048
049    public CircuitBreakerLoadBalancer() {
050        this.exceptions = null;
051    }
052
053    public void setHalfOpenAfter(long halfOpenAfter) {
054        this.halfOpenAfter = halfOpenAfter;
055    }
056
057    public void setThreshold(int threshold) {
058        this.threshold = threshold;
059    }
060
061    @Override
062    public CamelContext getCamelContext() {
063        return camelContext;
064    }
065
066    @Override
067    public void setCamelContext(CamelContext camelContext) {
068        this.camelContext = camelContext;
069    }
070
071    public List<Class<?>> getExceptions() {
072        return exceptions;
073    }
074
075    protected boolean hasFailed(Exchange exchange) {
076        boolean answer = false;
077
078        if (exchange.getException() != null) {
079            if (exceptions == null || exceptions.isEmpty()) {
080                answer = true;
081            } else {
082                for (Class<?> exception : exceptions) {
083                    if (exchange.getException(exception) != null) {
084                        answer = true;
085                        break;
086                    }
087                }
088            }
089        }
090        return answer;
091    }
092
093    @Override
094    public boolean isRunAllowed() {
095        boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
096        if (forceShutdown) {
097            log.trace("Run not allowed as ShutdownStrategy is forcing shutting down");
098        }
099        return !forceShutdown && super.isRunAllowed();
100    }
101
102    public boolean process(final Exchange exchange, final AsyncCallback callback) {
103
104        // can we still run
105        if (!isRunAllowed()) {
106            log.trace("Run not allowed, will reject executing exchange: {}", exchange);
107            if (exchange.getException() == null) {
108                exchange.setException(new RejectedExecutionException("Run is not allowed"));
109            }
110            callback.done(true);
111            return true;
112        }
113
114        return calculateState(exchange, callback);
115    }
116
117    private boolean calculateState(final Exchange exchange, final AsyncCallback callback) {
118        boolean output = false;
119        if (state.get() == STATE_HALF_OPEN) {
120            if (failures.get() == 0) {
121                output = closeCircuit(exchange, callback);
122            } else {
123                output = openCircuit(exchange, callback);
124            }
125        } else if (state.get() == STATE_OPEN) {
126            if (failures.get() >= threshold && System.currentTimeMillis() - lastFailure < halfOpenAfter) {
127                output = openCircuit(exchange, callback);
128            } else {
129                output = halfOpenCircuit(exchange, callback);
130            }
131        } else if (state.get() == STATE_CLOSED) {
132            if (failures.get() >= threshold && System.currentTimeMillis() - lastFailure < halfOpenAfter) {
133                output = openCircuit(exchange, callback);
134            } else if (failures.get() >= threshold && System.currentTimeMillis() - lastFailure >= halfOpenAfter) {
135                output = halfOpenCircuit(exchange, callback);
136            } else {
137                output = closeCircuit(exchange, callback);
138            }
139        } else {
140            throw new IllegalStateException("Unrecognised circuitBreaker state " + state.get());
141        }
142        return output;
143    }
144
145    private boolean openCircuit(final Exchange exchange, final AsyncCallback callback) {
146        boolean output = rejectExchange(exchange, callback);
147        state.set(STATE_OPEN);
148        logState();
149        return output;
150    }
151
152    private boolean halfOpenCircuit(final Exchange exchange, final AsyncCallback callback) {
153        boolean output = executeProcessor(exchange, callback);
154        state.set(STATE_HALF_OPEN);
155        logState();
156        return output;
157    }
158
159    private boolean closeCircuit(final Exchange exchange, final AsyncCallback callback) {
160        boolean output = executeProcessor(exchange, callback);
161        state.set(STATE_CLOSED);
162        logState();
163        return output;
164    }
165
166    private void logState() {
167        log.debug("State {}, failures {}, closed since {}", new Object[]{state.get(), failures.get(), System.currentTimeMillis() - lastFailure});
168    }
169
170    private boolean executeProcessor(final Exchange exchange, final AsyncCallback callback) {
171        Processor processor = getProcessors().get(0);
172        if (processor == null) {
173            throw new IllegalStateException("No processors could be chosen to process CircuitBreaker");
174        }
175
176        AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
177        // Added a callback for processing the exchange in the callback
178        boolean sync = albp.process(exchange, new CircuitBreakerCallback(exchange, callback));
179
180        // We need to check the exception here as albp is use sync call
181        if (sync) {
182            boolean failed = hasFailed(exchange);
183            if (!failed) {
184                failures.set(0);
185            } else {
186                failures.incrementAndGet();
187                lastFailure = System.currentTimeMillis();
188            }
189        } else {
190            // CircuitBreakerCallback can take care of failure check of the
191            // exchange
192            log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
193            return false;
194        }
195
196        log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
197        callback.done(true);
198        return true;
199    }
200
201    private boolean rejectExchange(final Exchange exchange, final AsyncCallback callback) {
202        exchange.setException(new RejectedExecutionException("CircuitBreaker Open: failures: " + failures + ", lastFailure: " + lastFailure));
203        /*
204         * If the circuit opens, we have to prevent the execution of any
205         * processor. The failures count can be set to 0.
206         */
207        failures.set(0);
208        callback.done(true);
209        return true;
210    }
211
212    public String toString() {
213        return "CircuitBreakerLoadBalancer[" + getProcessors() + "]";
214    }
215
216    public String getTraceLabel() {
217        return "circuitbreaker";
218    }
219
220    class CircuitBreakerCallback implements AsyncCallback {
221        private final AsyncCallback callback;
222        private final Exchange exchange;
223
224        CircuitBreakerCallback(Exchange exchange, AsyncCallback callback) {
225            this.callback = callback;
226            this.exchange = exchange;
227        }
228
229        @Override
230        public void done(boolean doneSync) {
231            if (!doneSync) {
232                boolean failed = hasFailed(exchange);
233                if (!failed) {
234                    failures.set(0);
235                } else {
236                    failures.incrementAndGet();
237                    lastFailure = System.currentTimeMillis();
238                }
239            }
240            callback.done(doneSync);
241        }
242
243    }
244}