001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Collections;
022import java.util.Comparator;
023import java.util.HashSet;
024import java.util.LinkedHashSet;
025import java.util.List;
026import java.util.Locale;
027import java.util.Set;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Future;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033
034import org.apache.camel.CamelContext;
035import org.apache.camel.CamelContextAware;
036import org.apache.camel.Consumer;
037import org.apache.camel.Route;
038import org.apache.camel.Service;
039import org.apache.camel.ShutdownRoute;
040import org.apache.camel.ShutdownRunningTask;
041import org.apache.camel.SuspendableService;
042import org.apache.camel.spi.InflightRepository;
043import org.apache.camel.spi.RouteStartupOrder;
044import org.apache.camel.spi.ShutdownAware;
045import org.apache.camel.spi.ShutdownPrepared;
046import org.apache.camel.spi.ShutdownStrategy;
047import org.apache.camel.support.ServiceSupport;
048import org.apache.camel.util.EventHelper;
049import org.apache.camel.util.ObjectHelper;
050import org.apache.camel.util.ServiceHelper;
051import org.apache.camel.util.StopWatch;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Default {@link org.apache.camel.spi.ShutdownStrategy} which uses graceful shutdown.
057 * <p/>
 * Graceful shutdown ensures that any inflight and pending messages will be taken into account
 * and it will wait until these exchanges have been completed.
060 * <p/>
061 * This strategy will perform graceful shutdown in two steps:
062 * <ul>
063 *     <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li>
064 *     <li>Forced - After a given period of time, a timeout occurred and if there are still pending
065 *     exchanges to complete, then a more aggressive forced strategy is performed.</li>
066 * </ul>
067 * The idea by the <tt>graceful</tt> shutdown strategy, is to stop taking in more new messages,
068 * and allow any existing inflight messages to complete. Then when there is no more inflight messages
069 * then the routes can be fully shutdown. This mean that if there is inflight messages then we will have
070 * to wait for these messages to complete. If they do not complete after a period of time, then a
071 * timeout triggers. And then a more aggressive strategy takes over.
072 * <p/>
073 * The idea by the <tt>forced</tt> shutdown strategy, is to stop continue processing messages.
074 * And force routes and its services to shutdown now. There is a risk when shutting down now,
075 * that some resources is not properly shutdown, which can cause side effects. The timeout value
076 * is by default 300 seconds, but can be customized.
077 * <p/>
 * As this strategy will politely wait until all exchanges have been completed it can potentially wait
 * for a long time, which is why a timeout value can be set. When the timeout triggers you can also
 * specify whether the remaining consumers should be shutdown now or ignored.
081 * <p/>
082 * Will by default use a timeout of 300 seconds (5 minutes) by which it will shutdown now the remaining consumers.
083 * This ensures that when shutting down Camel it at some point eventually will shutdown.
084 * This behavior can of course be configured using the {@link #setTimeout(long)} and
085 * {@link #setShutdownNowOnTimeout(boolean)} methods.
086 * <p/>
 * Routes will by default be shutdown in the reverse order of which they were started.
088 * You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method.
089 * <p/>
 * After route consumers have been shutdown, then any {@link ShutdownPrepared} services on the routes
 * are prepared for shutdown, by invoking {@link ShutdownPrepared#prepareShutdown(boolean)} with
 * <tt>force=false</tt>.
093 * <p/>
094 * Then if a timeout occurred and the strategy has been configured with shutdown-now on timeout, then
095 * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown
096 * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt>
097 * on the services. This allows the services to know they should force shutdown now.
098 * <p/>
 * When a timeout occurred and a forced shutdown is happening, then there may be threads/tasks which are
 * still inflight and which may be rejected from continuing to be routed. By default this can cause WARN and ERRORs
 * to be logged. The option {@link #setSuppressLoggingOnTimeout(boolean)} can be used to suppress these
 * logs, so they are logged at TRACE level instead.
103 * <p/>
 * Also when a timeout occurred then information about the inflight exchanges is logged, if {@link #isLogInflightExchangesOnTimeout()}
 * is enabled (it is by default). This allows end users to know where these inflight exchanges currently are in the route(s),
 * and for how long they have been inflight.
107 * <p/>
108 * This information can also be obtained from the {@link org.apache.camel.spi.InflightRepository}
109 * at all time during runtime.
110 *
111 * @version
112 */
113public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
114    private static final Logger LOG = LoggerFactory.getLogger(DefaultShutdownStrategy.class);
115
116    private CamelContext camelContext;
117    private ExecutorService executor;
118    private long timeout = 5 * 60;
119    private TimeUnit timeUnit = TimeUnit.SECONDS;
120    private boolean shutdownNowOnTimeout = true;
121    private boolean shutdownRoutesInReverseOrder = true;
122    private boolean suppressLoggingOnTimeout;
123    private boolean logInflightExchangesOnTimeout = true;
124
125    private volatile boolean forceShutdown;
126    private final AtomicBoolean timeoutOccurred = new AtomicBoolean();
127    private volatile Future<?> currentShutdownTaskFuture;
128
129    public DefaultShutdownStrategy() {
130    }
131
132    public DefaultShutdownStrategy(CamelContext camelContext) {
133        this.camelContext = camelContext;
134    }
135
136    public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
137        shutdown(context, routes, getTimeout(), getTimeUnit());
138    }
139
140    @Override
141    public void shutdownForced(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
142        doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, true);
143    }
144
145    public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
146        doShutdown(context, routes, getTimeout(), getTimeUnit(), true, false, false);
147    }
148
149    public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
150        doShutdown(context, routes, timeout, timeUnit, false, false, false);
151    }
152
153    public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
154        List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
155        routes.add(route);
156        return doShutdown(context, routes, timeout, timeUnit, false, abortAfterTimeout, false);
157    }
158
159    public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
160        doShutdown(context, routes, timeout, timeUnit, true, false, false);
161    }
162
163    protected boolean doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
164                                 boolean suspendOnly, boolean abortAfterTimeout, boolean forceShutdown) throws Exception {
165
166        // just return if no routes to shutdown
167        if (routes.isEmpty()) {
168            return true;
169        }
170
171        StopWatch watch = new StopWatch();
172
173        // at first sort according to route startup order
174        List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes);
175        Collections.sort(routesOrdered, new Comparator<RouteStartupOrder>() {
176            public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
177                return o1.getStartupOrder() - o2.getStartupOrder();
178            }
179        });
180        if (shutdownRoutesInReverseOrder) {
181            Collections.reverse(routesOrdered);
182        }
183
184        LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
185
186        // use another thread to perform the shutdowns so we can support timeout
187        timeoutOccurred.set(false);
188        currentShutdownTaskFuture = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred));
189        try {
190            currentShutdownTaskFuture.get(timeout, timeUnit);
191        } catch (ExecutionException e) {
192            // unwrap execution exception
193            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
194        } catch (Exception e) {
195            // either timeout or interrupted exception was thrown so this is okay
196            // as interrupted would mean cancel was called on the currentShutdownTaskFuture to signal a forced timeout
197
198            // we hit a timeout, so set the flag
199            timeoutOccurred.set(true);
200
201            // timeout then cancel the task
202            currentShutdownTaskFuture.cancel(true);
203
204            // signal we are forcing shutdown now, since timeout occurred
205            this.forceShutdown = forceShutdown;
206
207            // if set, stop processing and return false to indicate that the shutdown is aborting
208            if (!forceShutdown && abortAfterTimeout) {
209                LOG.warn("Timeout occurred during graceful shutdown. Aborting the shutdown now."
210                        + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");
211
212                // we attempt to force shutdown so lets log the current inflight exchanges which are affected
213                logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());
214
215                return false;
216            } else {
217                if (forceShutdown || shutdownNowOnTimeout) {
218                    LOG.warn("Timeout occurred during graceful shutdown. Forcing the routes to be shutdown now."
219                            + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");
220
221                    // we attempt to force shutdown so lets log the current inflight exchanges which are affected
222                    logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());
223
224                    // force the routes to shutdown now
225                    shutdownRoutesNow(routesOrdered);
226
227                    // now the route consumers has been shutdown, then prepare route services for shutdown now (forced)
228                    for (RouteStartupOrder order : routes) {
229                        for (Service service : order.getServices()) {
230                            prepareShutdown(service, true, true, isSuppressLoggingOnTimeout());
231                        }
232                    }
233                } else {
234                    LOG.warn("Timeout occurred during graceful shutdown. Will ignore shutting down the remainder routes."
235                            + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");
236
237                    logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());
238                }
239            }
240        } finally {
241            currentShutdownTaskFuture = null;
242        }
243
244        // convert to seconds as its easier to read than a big milli seconds number
245        long seconds = TimeUnit.SECONDS.convert(watch.stop(), TimeUnit.MILLISECONDS);
246
247        LOG.info("Graceful shutdown of " + routesOrdered.size() + " routes completed in " + seconds + " seconds");
248        return true;
249    }
250
251    @Override
252    public boolean forceShutdown(Service service) {
253        return forceShutdown;
254    }
255
256    @Override
257    public boolean hasTimeoutOccurred() {
258        return timeoutOccurred.get();
259    }
260
261    public void setTimeout(long timeout) {
262        if (timeout <= 0) {
263            throw new IllegalArgumentException("Timeout must be a positive value");
264        }
265        this.timeout = timeout;
266    }
267
268    public long getTimeout() {
269        return timeout;
270    }
271
272    public void setTimeUnit(TimeUnit timeUnit) {
273        this.timeUnit = timeUnit;
274    }
275
276    public TimeUnit getTimeUnit() {
277        return timeUnit;
278    }
279
280    public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
281        this.shutdownNowOnTimeout = shutdownNowOnTimeout;
282    }
283
284    public boolean isShutdownNowOnTimeout() {
285        return shutdownNowOnTimeout;
286    }
287
288    public boolean isShutdownRoutesInReverseOrder() {
289        return shutdownRoutesInReverseOrder;
290    }
291
292    public void setShutdownRoutesInReverseOrder(boolean shutdownRoutesInReverseOrder) {
293        this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder;
294    }
295
296    public boolean isSuppressLoggingOnTimeout() {
297        return suppressLoggingOnTimeout;
298    }
299
300    public void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout) {
301        this.suppressLoggingOnTimeout = suppressLoggingOnTimeout;
302    }
303
304    public boolean isLogInflightExchangesOnTimeout() {
305        return logInflightExchangesOnTimeout;
306    }
307
308    public void setLogInflightExchangesOnTimeout(boolean logInflightExchangesOnTimeout) {
309        this.logInflightExchangesOnTimeout = logInflightExchangesOnTimeout;
310    }
311
312    public CamelContext getCamelContext() {
313        return camelContext;
314    }
315
316    public void setCamelContext(CamelContext camelContext) {
317        this.camelContext = camelContext;
318    }
319
320    public Future<?> getCurrentShutdownTaskFuture() {
321        return currentShutdownTaskFuture;
322    }
323
324    /**
325     * Shutdown all the consumers immediately.
326     *
327     * @param routes the routes to shutdown
328     */
329    protected void shutdownRoutesNow(List<RouteStartupOrder> routes) {
330        for (RouteStartupOrder order : routes) {
331
332            // set the route to shutdown as fast as possible by stopping after
333            // it has completed its current task
334            ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask();
335            if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) {
336                LOG.debug("Changing shutdownRunningTask from {} to " +  ShutdownRunningTask.CompleteCurrentTaskOnly
337                    + " on route {} to shutdown faster", current, order.getRoute().getId());
338                order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
339            }
340
341            for (Consumer consumer : order.getInputs()) {
342                shutdownNow(consumer);
343            }
344        }
345    }
346
347    /**
348     * Shutdown all the consumers immediately.
349     *
350     * @param consumers the consumers to shutdown
351     */
352    protected void shutdownNow(List<Consumer> consumers) {
353        for (Consumer consumer : consumers) {
354            shutdownNow(consumer);
355        }
356    }
357
358    /**
359     * Shutdown the consumer immediately.
360     *
361     * @param consumer the consumer to shutdown
362     */
363    protected static void shutdownNow(Consumer consumer) {
364        LOG.trace("Shutting down: {}", consumer);
365
366        // allow us to do custom work before delegating to service helper
367        try {
368            ServiceHelper.stopService(consumer);
369        } catch (Throwable e) {
370            LOG.warn("Error occurred while shutting down route: " + consumer + ". This exception will be ignored.", e);
371            // fire event
372            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
373        }
374
375        LOG.trace("Shutdown complete for: {}", consumer);
376    }
377
378    /**
379     * Suspends/stops the consumer immediately.
380     *
381     * @param consumer the consumer to suspend
382     */
383    protected static void suspendNow(Consumer consumer) {
384        LOG.trace("Suspending: {}", consumer);
385
386        // allow us to do custom work before delegating to service helper
387        try {
388            ServiceHelper.suspendService(consumer);
389        } catch (Throwable e) {
390            LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
391            // fire event
392            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
393        }
394
395        LOG.trace("Suspend complete for: {}", consumer);
396    }
397
398    private ExecutorService getExecutorService() {
399        if (executor == null) {
400            // use a thread pool that allow to terminate idle threads so they do not hang around forever
401            executor = camelContext.getExecutorServiceManager().newThreadPool(this, "ShutdownTask", 0, 1);
402        }
403        return executor;
404    }
405
406    @Override
407    protected void doStart() throws Exception {
408        ObjectHelper.notNull(camelContext, "CamelContext");
409        // reset option
410        forceShutdown = false;
411        timeoutOccurred.set(false);
412    }
413
414    @Override
415    protected void doStop() throws Exception {
416        // noop
417    }
418
419    @Override
420    protected void doShutdown() throws Exception {
421        if (executor != null) {
422            // force shutting down as we are shutting down Camel
423            camelContext.getExecutorServiceManager().shutdownNow(executor);
424            // should clear executor so we can restart by creating a new thread pool
425            executor = null;
426        }
427    }
428
429    /**
430     * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method
431     * on the service if it implement this interface.
432     *
433     * @param service the service
434     * @param forced  whether to force shutdown
435     * @param includeChildren whether to prepare the child of the service as well
436     */
437    private static void prepareShutdown(Service service, boolean forced, boolean includeChildren, boolean suppressLogging) {
438        Set<Service> list;
439        if (includeChildren) {
440            // include error handlers as we want to prepare them for shutdown as well
441            list = ServiceHelper.getChildServices(service, true);
442        } else {
443            list = new LinkedHashSet<Service>(1);
444            list.add(service);
445        }
446
447        for (Service child : list) {
448            if (child instanceof ShutdownPrepared) {
449                try {
450                    LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
451                    ((ShutdownPrepared) child).prepareShutdown(forced);
452                } catch (Exception e) {
453                    if (suppressLogging) {
454                        LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
455                    } else {
456                        LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
457                    }
458                }
459            }
460        }
461    }
462
463    /**
464     * Holder for deferred consumers
465     */
466    static class ShutdownDeferredConsumer {
467        private final Route route;
468        private final Consumer consumer;
469
470        ShutdownDeferredConsumer(Route route, Consumer consumer) {
471            this.route = route;
472            this.consumer = consumer;
473        }
474
475        Route getRoute() {
476            return route;
477        }
478
479        Consumer getConsumer() {
480            return consumer;
481        }
482    }
483
484    /**
485     * Shutdown task which shutdown all the routes in a graceful manner.
486     */
487    static class ShutdownTask implements Runnable {
488
489        private final CamelContext context;
490        private final List<RouteStartupOrder> routes;
491        private final boolean suspendOnly;
492        private final boolean abortAfterTimeout;
493        private final long timeout;
494        private final TimeUnit timeUnit;
495        private final AtomicBoolean timeoutOccurred;
496
497        public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
498                            boolean suspendOnly, boolean abortAfterTimeout, AtomicBoolean timeoutOccurred) {
499            this.context = context;
500            this.routes = routes;
501            this.suspendOnly = suspendOnly;
502            this.abortAfterTimeout = abortAfterTimeout;
503            this.timeout = timeout;
504            this.timeUnit = timeUnit;
505            this.timeoutOccurred = timeoutOccurred;
506        }
507
508        public void run() {
509            // the strategy in this run method is to
510            // 1) go over the routes and shutdown those routes which can be shutdown asap
511            //    some routes will be deferred to shutdown at the end, as they are needed
512            //    by other routes so they can complete their tasks
513            // 2) wait until all inflight and pending exchanges has been completed
514            // 3) shutdown the deferred routes
515
516            LOG.debug("There are {} routes to {}", routes.size(), suspendOnly ? "suspend" : "shutdown");
517
518            // list of deferred consumers to shutdown when all exchanges has been completed routed
519            // and thus there are no more inflight exchanges so they can be safely shutdown at that time
520            List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
521            for (RouteStartupOrder order : routes) {
522
523                ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
524                ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();
525
526                if (LOG.isTraceEnabled()) {
527                    LOG.trace("{}{} with options [{},{}]",
528                            new Object[]{suspendOnly ? "Suspending route: " : "Shutting down route: ",
529                                order.getRoute().getId(), shutdownRoute, shutdownRunningTask});
530                }
531
532                for (Consumer consumer : order.getInputs()) {
533
534                    boolean suspend = false;
535
536                    // assume we should shutdown if we are not deferred
537                    boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
538
539                    if (shutdown) {
540                        // if we are to shutdown then check whether we can suspend instead as its a more
541                        // gentle way to graceful shutdown
542
543                        // some consumers do not support shutting down so let them decide
544                        // if a consumer is suspendable then prefer to use that and then shutdown later
545                        if (consumer instanceof ShutdownAware) {
546                            shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
547                        }
548                        if (shutdown && consumer instanceof SuspendableService) {
549                            // we prefer to suspend over shutdown
550                            suspend = true;
551                        }
552                    }
553
554                    // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
555                    if (suspend) {
556                        // only suspend it and then later shutdown it
557                        suspendNow(consumer);
558                        // add it to the deferred list so the route will be shutdown later
559                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
560                        LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
561                    } else if (shutdown) {
562                        shutdownNow(consumer);
563                        LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
564                    } else {
565                        // we will stop it later, but for now it must run to be able to help all inflight messages
566                        // be safely completed
567                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
568                        LOG.debug("Route: " + order.getRoute().getId() + (suspendOnly ? " shutdown deferred." : " suspension deferred."));
569                    }
570                }
571            }
572
573            // notify the services we intend to shutdown
574            for (RouteStartupOrder order : routes) {
575                for (Service service : order.getServices()) {
576                    // skip the consumer as we handle that specially
577                    if (service instanceof Consumer) {
578                        continue;
579                    }
580                    prepareShutdown(service, false, true, false);
581                }
582            }
583
584            // wait till there are no more pending and inflight messages
585            boolean done = false;
586            long loopDelaySeconds = 1;
587            long loopCount = 0;
588            while (!done && !timeoutOccurred.get()) {
589                int size = 0;
590                for (RouteStartupOrder order : routes) {
591                    int inflight = context.getInflightRepository().size(order.getRoute().getId());
592                    for (Consumer consumer : order.getInputs()) {
593                        // include any additional pending exchanges on some consumers which may have internal
594                        // memory queues such as seda
595                        if (consumer instanceof ShutdownAware) {
596                            inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
597                        }
598                    }
599                    if (inflight > 0) {
600                        size += inflight;
601                        LOG.trace("{} inflight and pending exchanges for route: {}", inflight, order.getRoute().getId());
602                    }
603                }
604                if (size > 0) {
605                    try {
606                        LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in "
607                             + (TimeUnit.SECONDS.convert(timeout, timeUnit) - (loopCount++ * loopDelaySeconds)) + " seconds.");
608
609                        // log verbose if DEBUG logging is enabled
610                        logInflightExchanges(context, routes, false);
611
612                        Thread.sleep(loopDelaySeconds * 1000);
613                    } catch (InterruptedException e) {
614                        if (abortAfterTimeout) {
615                            LOG.warn("Interrupted while waiting during graceful shutdown, will abort.");
616                            return;
617                        } else {
618                            LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now.");
619                            break;
620                        }
621                    }
622                } else {
623                    done = true;
624                }
625            }
626
627            // prepare for shutdown
628            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
629                Consumer consumer = deferred.getConsumer();
630                if (consumer instanceof ShutdownAware) {
631                    LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
632                    boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
633                    boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
634                    prepareShutdown(consumer, forced, false, suppress);
635                    LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
636                }
637            }
638
639            // now all messages has been completed then stop the deferred consumers
640            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
641                Consumer consumer = deferred.getConsumer();
642                if (suspendOnly) {
643                    suspendNow(consumer);
644                    LOG.info("Route: {} suspend complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
645                } else {
646                    shutdownNow(consumer);
647                    LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
648                }
649            }
650
651            // now the route consumers has been shutdown, then prepare route services for shutdown
652            for (RouteStartupOrder order : routes) {
653                for (Service service : order.getServices()) {
654                    boolean forced = context.getShutdownStrategy().forceShutdown(service);
655                    boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
656                    prepareShutdown(service, forced, true, suppress);
657                }
658            }
659        }
660
661    }
662
663    /**
664     * Logs information about the inflight exchanges
665     *
666     * @param infoLevel <tt>true</tt> to log at INFO level, <tt>false</tt> to log at DEBUG level
667     */
668    protected static void logInflightExchanges(CamelContext camelContext, List<RouteStartupOrder> routes, boolean infoLevel) {
669        // check if we need to log
670        if (!infoLevel && !LOG.isDebugEnabled()) {
671            return;
672        }
673
674        Collection<InflightRepository.InflightExchange> inflights = camelContext.getInflightRepository().browse();
675        int size = inflights.size();
676        if (size == 0) {
677            return;
678        }
679
680        // filter so inflight must start from any of the routes
681        Set<String> routeIds = new HashSet<String>();
682        for (RouteStartupOrder route : routes) {
683            routeIds.add(route.getRoute().getId());
684        }
685        Collection<InflightRepository.InflightExchange> filtered = new ArrayList<InflightRepository.InflightExchange>();
686        for (InflightRepository.InflightExchange inflight : inflights) {
687            String routeId = inflight.getExchange().getFromRouteId();
688            if (routeIds.contains(routeId)) {
689                filtered.add(inflight);
690            }
691        }
692
693        size = filtered.size();
694        if (size == 0) {
695            return;
696        }
697
698        StringBuilder sb = new StringBuilder("There are " + size + " inflight exchanges:");
699        for (InflightRepository.InflightExchange inflight : filtered) {
700            sb.append("\n\tInflightExchange: [exchangeId=").append(inflight.getExchange().getExchangeId())
701                    .append(", fromRouteId=").append(inflight.getExchange().getFromRouteId())
702                    .append(", routeId=").append(inflight.getRouteId())
703                    .append(", nodeId=").append(inflight.getNodeId())
704                    .append(", elapsed=").append(inflight.getElapsed())
705                    .append(", duration=").append(inflight.getDuration())
706                    .append("]");
707        }
708
709        if (infoLevel) {
710            LOG.info(sb.toString());
711        } else {
712            LOG.debug(sb.toString());
713        }
714    }
715
716}