001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ThreadPoolExecutor;
028import javax.management.JMException;
029import javax.management.MalformedObjectNameException;
030import javax.management.ObjectName;
031
032import org.apache.camel.CamelContext;
033import org.apache.camel.CamelContextAware;
034import org.apache.camel.Channel;
035import org.apache.camel.Component;
036import org.apache.camel.Consumer;
037import org.apache.camel.Endpoint;
038import org.apache.camel.ErrorHandlerFactory;
039import org.apache.camel.ManagementStatisticsLevel;
040import org.apache.camel.Processor;
041import org.apache.camel.Producer;
042import org.apache.camel.Route;
043import org.apache.camel.Service;
044import org.apache.camel.StartupListener;
045import org.apache.camel.TimerListener;
046import org.apache.camel.VetoCamelContextStartException;
047import org.apache.camel.api.management.PerformanceCounter;
048import org.apache.camel.impl.ConsumerCache;
049import org.apache.camel.impl.DefaultCamelContext;
050import org.apache.camel.impl.DefaultEndpointRegistry;
051import org.apache.camel.impl.EventDrivenConsumerRoute;
052import org.apache.camel.impl.ProducerCache;
053import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
054import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
055import org.apache.camel.management.mbean.ManagedBacklogDebugger;
056import org.apache.camel.management.mbean.ManagedBacklogTracer;
057import org.apache.camel.management.mbean.ManagedCamelContext;
058import org.apache.camel.management.mbean.ManagedConsumerCache;
059import org.apache.camel.management.mbean.ManagedEndpoint;
060import org.apache.camel.management.mbean.ManagedEndpointRegistry;
061import org.apache.camel.management.mbean.ManagedInflightRepository;
062import org.apache.camel.management.mbean.ManagedProducerCache;
063import org.apache.camel.management.mbean.ManagedRestRegistry;
064import org.apache.camel.management.mbean.ManagedRoute;
065import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry;
066import org.apache.camel.management.mbean.ManagedService;
067import org.apache.camel.management.mbean.ManagedStreamCachingStrategy;
068import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
069import org.apache.camel.management.mbean.ManagedTracer;
070import org.apache.camel.management.mbean.ManagedTypeConverterRegistry;
071import org.apache.camel.model.AOPDefinition;
072import org.apache.camel.model.InterceptDefinition;
073import org.apache.camel.model.OnCompletionDefinition;
074import org.apache.camel.model.OnExceptionDefinition;
075import org.apache.camel.model.PolicyDefinition;
076import org.apache.camel.model.ProcessorDefinition;
077import org.apache.camel.model.ProcessorDefinitionHelper;
078import org.apache.camel.model.RouteDefinition;
079import org.apache.camel.processor.CamelInternalProcessor;
080import org.apache.camel.processor.interceptor.BacklogDebugger;
081import org.apache.camel.processor.interceptor.BacklogTracer;
082import org.apache.camel.processor.interceptor.Tracer;
083import org.apache.camel.spi.AsyncProcessorAwaitManager;
084import org.apache.camel.spi.EventNotifier;
085import org.apache.camel.spi.InflightRepository;
086import org.apache.camel.spi.LifecycleStrategy;
087import org.apache.camel.spi.ManagementAgent;
088import org.apache.camel.spi.ManagementAware;
089import org.apache.camel.spi.ManagementNameStrategy;
090import org.apache.camel.spi.ManagementObjectStrategy;
091import org.apache.camel.spi.ManagementStrategy;
092import org.apache.camel.spi.RestRegistry;
093import org.apache.camel.spi.RouteContext;
094import org.apache.camel.spi.RuntimeEndpointRegistry;
095import org.apache.camel.spi.StreamCachingStrategy;
096import org.apache.camel.spi.TypeConverterRegistry;
097import org.apache.camel.spi.UnitOfWork;
098import org.apache.camel.support.ServiceSupport;
099import org.apache.camel.support.TimerListenerManager;
100import org.apache.camel.util.KeyValueHolder;
101import org.apache.camel.util.ObjectHelper;
102import org.slf4j.Logger;
103import org.slf4j.LoggerFactory;
104
105/**
106 * Default JMX managed lifecycle strategy that registered objects using the configured
107 * {@link org.apache.camel.spi.ManagementStrategy}.
108 *
109 * @see org.apache.camel.spi.ManagementStrategy
110 * @version 
111 */
112@SuppressWarnings("deprecation")
113public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware {
114
115    private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class);
116    // the wrapped processors is for performance counters, which are in use for the created routes
117    // when a route is removed, we should remove the associated processors from this map
118    private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors =
119            new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>();
120    private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>();
121    private final TimerListenerManager timerListenerManager = new TimerListenerManager();
122    private final TimerListenerManagerStartupListener timerManagerStartupListener = new TimerListenerManagerStartupListener();
123    private volatile CamelContext camelContext;
124    private volatile ManagedCamelContext camelContextMBean;
125    private volatile boolean initialized;
126    private final Set<String> knowRouteIds = new HashSet<String>();
127    private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>();
128    private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<BacklogTracer, ManagedBacklogTracer>();
129    private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<BacklogDebugger, ManagedBacklogDebugger>();
130    private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>();
131
132    public DefaultManagementLifecycleStrategy() {
133    }
134
135    public DefaultManagementLifecycleStrategy(CamelContext camelContext) {
136        this.camelContext = camelContext;
137    }
138
139    public CamelContext getCamelContext() {
140        return camelContext;
141    }
142
143    public void setCamelContext(CamelContext camelContext) {
144        this.camelContext = camelContext;
145    }
146
147    public void onContextStart(CamelContext context) throws VetoCamelContextStartException {
148        Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
149
150        String name = context.getName();
151        String managementName = context.getManagementNameStrategy().getName();
152
153        try {
154            boolean done = false;
155            while (!done) {
156                ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name);
157                boolean exists = getManagementStrategy().isManaged(mc, on);
158                if (!exists) {
159                    done = true;
160                } else {
161                    // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name
162                    boolean fixed = false;
163                    // if we use the default name strategy we can find a free name to use
164                    String newName = findFreeName(mc, context.getManagementNameStrategy(), name);
165                    if (newName != null) {
166                        // use this as the fixed name
167                        fixed = true;
168                        done = true;
169                        managementName = newName;
170                    }
171                    // we could not fix it so veto starting camel
172                    if (!fixed) {
173                        throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered."
174                            + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context);
175                    } else {
176                        LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName
177                            + " due to clash with an existing name already registered in MBeanServer.");
178                    }
179                }
180            }
181        } catch (VetoCamelContextStartException e) {
182            // rethrow veto
183            throw e;
184        } catch (Exception e) {
185            // must rethrow to allow CamelContext fallback to non JMX agent to allow
186            // Camel to continue to run
187            throw ObjectHelper.wrapRuntimeCamelException(e);
188        }
189
190        // set the name we are going to use
191        if (context instanceof DefaultCamelContext) {
192            ((DefaultCamelContext) context).setManagementName(managementName);
193        }
194
195        try {
196            manageObject(mc);
197        } catch (Exception e) {
198            // must rethrow to allow CamelContext fallback to non JMX agent to allow
199            // Camel to continue to run
200            throw ObjectHelper.wrapRuntimeCamelException(e);
201        }
202
203        // yes we made it and are initialized
204        initialized = true;
205
206        if (mc instanceof ManagedCamelContext) {
207            camelContextMBean = (ManagedCamelContext) mc;
208        }
209
210        // register any pre registered now that we are initialized
211        enlistPreRegisteredServices();
212    }
213
214    private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException {
215        // we cannot find a free name for fixed named strategies
216        if (strategy.isFixedName()) {
217            return null;
218        }
219
220        // okay try to find a free name
221        boolean done = false;
222        String newName = null;
223        while (!done) {
224            // compute the next name
225            newName = strategy.getNextName();
226            ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name);
227            done = !getManagementStrategy().isManaged(mc, on);
228            if (LOG.isTraceEnabled()) {
229                LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done});
230            }
231        }
232        return newName;
233    }
234
235    /**
236     * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)}
237     * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be
238     * enlisted first.
239     * <p/>
240     * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as
241     * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those
242     * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started.
243     */
244    private void enlistPreRegisteredServices() {
245        if (preServices.isEmpty()) {
246            return;
247        }
248
249        LOG.debug("Registering {} pre registered services", preServices.size());
250        for (PreRegisterService pre : preServices) {
251            if (pre.getComponent() != null) {
252                onComponentAdd(pre.getName(), pre.getComponent());
253            } else if (pre.getEndpoint() != null) {
254                onEndpointAdd(pre.getEndpoint());
255            } else if (pre.getService() != null) {
256                onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute());
257            }
258        }
259
260        // we are done so clear the list
261        preServices.clear();
262    }
263
264    public void onContextStop(CamelContext context) {
265        // the agent hasn't been started
266        if (!initialized) {
267            return;
268        }
269        try {
270            Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
271            // the context could have been removed already
272            if (getManagementStrategy().isManaged(mc, null)) {
273                unmanageObject(mc);
274            }
275        } catch (Exception e) {
276            LOG.warn("Could not unregister CamelContext MBean", e);
277        }
278
279        camelContextMBean = null;
280    }
281
282    public void onComponentAdd(String name, Component component) {
283        // always register components as there are only a few of those
284        if (!initialized) {
285            // pre register so we can register later when we have been initialized
286            PreRegisterService pre = new PreRegisterService();
287            pre.onComponentAdd(name, component);
288            preServices.add(pre);
289            return;
290        }
291        try {
292            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
293            manageObject(mc);
294        } catch (Exception e) {
295            LOG.warn("Could not register Component MBean", e);
296        }
297    }
298
299    public void onComponentRemove(String name, Component component) {
300        // the agent hasn't been started
301        if (!initialized) {
302            return;
303        }
304        try {
305            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
306            unmanageObject(mc);
307        } catch (Exception e) {
308            LOG.warn("Could not unregister Component MBean", e);
309        }
310    }
311
312    /**
313     * If the endpoint is an instance of ManagedResource then register it with the
314     * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
315     * register that with the mbean server.
316     *
317     * @param endpoint the Endpoint attempted to be added
318     */
319    public void onEndpointAdd(Endpoint endpoint) {
320        if (!initialized) {
321            // pre register so we can register later when we have been initialized
322            PreRegisterService pre = new PreRegisterService();
323            pre.onEndpointAdd(endpoint);
324            preServices.add(pre);
325            return;
326        }
327
328        if (!shouldRegister(endpoint, null)) {
329            // avoid registering if not needed
330            return;
331        }
332
333        try {
334            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
335            if (me == null) {
336                // endpoint should not be managed
337                return;
338            }
339            manageObject(me);
340        } catch (Exception e) {
341            LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
342        }
343    }
344
345    public void onEndpointRemove(Endpoint endpoint) {
346        // the agent hasn't been started
347        if (!initialized) {
348            return;
349        }
350
351        try {
352            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
353            unmanageObject(me);
354        } catch (Exception e) {
355            LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
356        }
357    }
358
359    public void onServiceAdd(CamelContext context, Service service, Route route) {
360        if (!initialized) {
361            // pre register so we can register later when we have been initialized
362            PreRegisterService pre = new PreRegisterService();
363            pre.onServiceAdd(context, service, route);
364            preServices.add(pre);
365            return;
366        }
367
368        // services can by any kind of misc type but also processors
369        // so we have special logic when its a processor
370
371        if (!shouldRegister(service, route)) {
372            // avoid registering if not needed
373            return;
374        }
375
376        Object managedObject = getManagedObjectForService(context, service, route);
377        if (managedObject == null) {
378            // service should not be managed
379            return;
380        }
381
382        // skip already managed services, for example if a route has been restarted
383        if (getManagementStrategy().isManaged(managedObject, null)) {
384            LOG.trace("The service is already managed: {}", service);
385            return;
386        }
387
388        try {
389            manageObject(managedObject);
390        } catch (Exception e) {
391            LOG.warn("Could not register service: " + service + " as Service MBean.", e);
392        }
393    }
394
395    public void onServiceRemove(CamelContext context, Service service, Route route) {
396        // the agent hasn't been started
397        if (!initialized) {
398            return;
399        }
400
401        Object managedObject = getManagedObjectForService(context, service, route);
402        if (managedObject != null) {
403            try {
404                unmanageObject(managedObject);
405            } catch (Exception e) {
406                LOG.warn("Could not unregister service: " + service + " as Service MBean.", e);
407            }
408        }
409    }
410
411    @SuppressWarnings("unchecked")
412    private Object getManagedObjectForService(CamelContext context, Service service, Route route) {
413        // skip channel, UoW and dont double wrap instrumentation
414        if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) {
415            return null;
416        }
417
418        Object answer = null;
419
420        if (service instanceof ManagementAware) {
421            return ((ManagementAware<Service>) service).getManagedObject(service);
422        } else if (service instanceof Tracer) {
423            // special for tracer
424            Tracer tracer = (Tracer) service;
425            ManagedTracer mt = managedTracers.get(tracer);
426            if (mt == null) {
427                mt = new ManagedTracer(context, tracer);
428                mt.init(getManagementStrategy());
429                managedTracers.put(tracer, mt);
430            }
431            return mt;
432        } else if (service instanceof BacklogTracer) {
433            // special for backlog tracer
434            BacklogTracer backlogTracer = (BacklogTracer) service;
435            ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer);
436            if (mt == null) {
437                mt = new ManagedBacklogTracer(context, backlogTracer);
438                mt.init(getManagementStrategy());
439                managedBacklogTracers.put(backlogTracer, mt);
440            }
441            return mt;
442        } else if (service instanceof BacklogDebugger) {
443            // special for backlog debugger
444            BacklogDebugger backlogDebugger = (BacklogDebugger) service;
445            ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger);
446            if (md == null) {
447                md = new ManagedBacklogDebugger(context, backlogDebugger);
448                md.init(getManagementStrategy());
449                managedBacklogDebuggers.put(backlogDebugger, md);
450            }
451            return md;
452        } else if (service instanceof EventNotifier) {
453            answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
454        } else if (service instanceof Producer) {
455            answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
456        } else if (service instanceof Consumer) {
457            answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
458        } else if (service instanceof Processor) {
459            // special for processors as we need to do some extra work
460            return getManagedObjectForProcessor(context, (Processor) service, route);
461        } else if (service instanceof ThrottlingInflightRoutePolicy) {
462            answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
463        } else if (service instanceof ConsumerCache) {
464            answer = new ManagedConsumerCache(context, (ConsumerCache) service);
465        } else if (service instanceof ProducerCache) {
466            answer = new ManagedProducerCache(context, (ProducerCache) service);
467        } else if (service instanceof DefaultEndpointRegistry) {
468            answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service);
469        } else if (service instanceof TypeConverterRegistry) {
470            answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
471        } else if (service instanceof RestRegistry) {
472            answer = new ManagedRestRegistry(context, (RestRegistry) service);
473        } else if (service instanceof InflightRepository) {
474            answer = new ManagedInflightRepository(context, (InflightRepository) service);
475        } else if (service instanceof AsyncProcessorAwaitManager) {
476            answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service);
477        } else if (service instanceof RuntimeEndpointRegistry) {
478            answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service);
479        } else if (service instanceof StreamCachingStrategy) {
480            answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service);
481        } else if (service != null) {
482            // fallback as generic service
483            answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
484        }
485
486        if (answer != null && answer instanceof ManagedService) {
487            ManagedService ms = (ManagedService) answer;
488            ms.setRoute(route);
489            ms.init(getManagementStrategy());
490        }
491
492        return answer;
493    }
494
495    private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
496        // a bit of magic here as the processors we want to manage have already been registered
497        // in the wrapped processors map when Camel have instrumented the route on route initialization
498        // so the idea is now to only manage the processors from the map
499        KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
500        if (holder == null) {
501            // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
502            return null;
503        }
504
505        // get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
506        Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
507        // only manage if we have a name for it as otherwise we do not want to manage it anyway
508        if (managedObject != null) {
509            // is it a performance counter then we need to set our counter
510            if (managedObject instanceof PerformanceCounter) {
511                InstrumentationProcessor counter = holder.getValue();
512                if (counter != null) {
513                    // change counter to us
514                    counter.setCounter(managedObject);
515                }
516            }
517        }
518
519        return managedObject;
520    }
521
522    public void onRoutesAdd(Collection<Route> routes) {
523        for (Route route : routes) {
524
525            // if we are starting CamelContext or either of the two options has been
526            // enabled, then enlist the route as a known route
527            if (getCamelContext().getStatus().isStarting()
528                || getManagementStrategy().getManagementAgent().getRegisterAlways()
529                || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) {
530                // register as known route id
531                knowRouteIds.add(route.getId());
532            }
533
534            if (!shouldRegister(route, route)) {
535                // avoid registering if not needed, skip to next route
536                continue;
537            }
538
539            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
540
541            // skip already managed routes, for example if the route has been restarted
542            if (getManagementStrategy().isManaged(mr, null)) {
543                LOG.trace("The route is already managed: {}", route);
544                continue;
545            }
546
547            // get the wrapped instrumentation processor from this route
548            // and set me as the counter
549            if (route instanceof EventDrivenConsumerRoute) {
550                EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route;
551                Processor processor = edcr.getProcessor();
552                if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
553                    CamelInternalProcessor internal = (CamelInternalProcessor) processor;
554                    ManagedRoute routeMBean = (ManagedRoute) mr;
555
556                    CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
557                    if (task != null) {
558                        // we need to wrap the counter with the camel context so we get stats updated on the context as well
559                        if (camelContextMBean != null) {
560                            CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
561                            task.setCounter(wrapper);
562                        } else {
563                            task.setCounter(routeMBean);
564                        }
565                    }
566                }
567            }
568
569            try {
570                manageObject(mr);
571            } catch (JMException e) {
572                LOG.warn("Could not register Route MBean", e);
573            } catch (Exception e) {
574                LOG.warn("Could not create Route MBean", e);
575            }
576        }
577    }
578
579    public void onRoutesRemove(Collection<Route> routes) {
580        // the agent hasn't been started
581        if (!initialized) {
582            return;
583        }
584
585        for (Route route : routes) {
586            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
587
588            // skip unmanaged routes
589            if (!getManagementStrategy().isManaged(mr, null)) {
590                LOG.trace("The route is not managed: {}", route);
591                continue;
592            }
593
594            try {
595                unmanageObject(mr);
596            } catch (Exception e) {
597                LOG.warn("Could not unregister Route MBean", e);
598            }
599
600            // remove from known routes ids, as the route has been removed
601            knowRouteIds.remove(route.getId());
602        }
603
604        // after the routes has been removed, we should clear the wrapped processors as we no longer need them
605        // as they were just a provisional map used during creation of routes
606        removeWrappedProcessorsForRoutes(routes);
607    }
608
609    public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
610        if (!shouldRegister(errorHandler, null)) {
611            // avoid registering if not needed
612            return;
613        }
614
615        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
616
617        // skip already managed services, for example if a route has been restarted
618        if (getManagementStrategy().isManaged(me, null)) {
619            LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder);
620            return;
621        }
622
623        try {
624            manageObject(me);
625        } catch (Exception e) {
626            LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e);
627        }
628    }
629
630    public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
631        if (!initialized) {
632            return;
633        }
634
635        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
636        if (me != null) {
637            try {
638                unmanageObject(me);
639            } catch (Exception e) {
640                LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e);
641            }
642        }
643    }
644
645    public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id,
646                                String sourceId, String routeId, String threadPoolProfileId) {
647
648        if (!shouldRegister(threadPool, null)) {
649            // avoid registering if not needed
650            return;
651        }
652
653        Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
654
655        // skip already managed services, for example if a route has been restarted
656        if (getManagementStrategy().isManaged(mtp, null)) {
657            LOG.trace("The thread pool is already managed: {}", threadPool);
658            return;
659        }
660
661        try {
662            manageObject(mtp);
663            // store a reference so we can unmanage from JMX when the thread pool is removed
664            // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool
665            managedThreadPools.put(threadPool, mtp);
666        } catch (Exception e) {
667            LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
668        }
669    }
670
671    public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) {
672        if (!initialized) {
673            return;
674        }
675
676        // lookup the thread pool and remove it from JMX
677        Object mtp = managedThreadPools.remove(threadPool);
678        if (mtp != null) {
679            // skip unmanaged routes
680            if (!getManagementStrategy().isManaged(mtp, null)) {
681                LOG.trace("The thread pool is not managed: {}", threadPool);
682                return;
683            }
684
685            try {
686                unmanageObject(mtp);
687            } catch (Exception e) {
688                LOG.warn("Could not unregister ThreadPool MBean", e);
689            }
690        }
691    }
692
693    public void onRouteContextCreate(RouteContext routeContext) {
694        if (!initialized) {
695            return;
696        }
697
698        // Create a map (ProcessorType -> PerformanceCounter)
699        // to be passed to InstrumentationInterceptStrategy.
700        Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters =
701                new HashMap<ProcessorDefinition<?>, PerformanceCounter>();
702
703        // Each processor in a route will have its own performance counter.
704        // These performance counter will be embedded to InstrumentationProcessor
705        // and wrap the appropriate processor by InstrumentationInterceptStrategy.
706        RouteDefinition route = routeContext.getRoute();
707
708        // register performance counters for all processors and its children
709        for (ProcessorDefinition<?> processor : route.getOutputs()) {
710            registerPerformanceCounters(routeContext, processor, registeredCounters);
711        }
712
713        // set this managed intercept strategy that executes the JMX instrumentation for performance metrics
714        // so our registered counters can be used for fine grained performance instrumentation
715        routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors));
716    }
717
718    /**
719     * Removes the wrapped processors for the given routes, as they are no longer in use.
720     * <p/>
721     * This is needed to avoid accumulating memory, if a lot of routes is being added and removed.
722     *
723     * @param routes the routes
724     */
725    private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
726        // loop the routes, and remove the route associated wrapped processors, as they are no longer in use
727        for (Route route : routes) {
728            String id = route.getId();
729
730            Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
731            while (it.hasNext()) {
732                KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next();
733                RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
734                if (def != null && id.equals(def.getId())) {
735                    it.remove();
736                }
737            }
738        }
739        
740    }
741
742    private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor,
743                                             Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) {
744
745        // traverse children if any exists
746        List<ProcessorDefinition<?>> children = processor.getOutputs();
747        for (ProcessorDefinition<?> child : children) {
748            registerPerformanceCounters(routeContext, child, registeredCounters);
749        }
750
751        // skip processors that should not be registered
752        if (!registerProcessor(processor)) {
753            return;
754        }
755
756        // okay this is a processor we would like to manage so create the
757        // a delegate performance counter that acts as the placeholder in the interceptor
758        // that then delegates to the real mbean which we register later in the onServiceAdd method
759        DelegatePerformanceCounter pc = new DelegatePerformanceCounter();
760        // set statistics enabled depending on the option
761        boolean enabled = camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.All;
762        pc.setStatisticsEnabled(enabled);
763
764        // and add it as a a registered counter that will be used lazy when Camel
765        // does the instrumentation of the route and adds the InstrumentationProcessor
766        // that does the actual performance metrics gatherings at runtime
767        registeredCounters.put(processor, pc);
768    }
769
770    /**
771     * Should the given processor be registered.
772     */
773    protected boolean registerProcessor(ProcessorDefinition<?> processor) {
774        // skip on exception
775        if (processor instanceof OnExceptionDefinition) {
776            return false;
777        }
778        // skip on completion
779        if (processor instanceof OnCompletionDefinition) {
780            return false;
781        }
782        // skip intercept
783        if (processor instanceof InterceptDefinition) {
784            return false;
785        }
786        // skip aop
787        if (processor instanceof AOPDefinition) {
788            return false;
789        }
790        // skip policy
791        if (processor instanceof PolicyDefinition) {
792            return false;
793        }
794
795        // only if custom id assigned
796        if (getManagementStrategy().isOnlyManageProcessorWithCustomId()) {
797            return processor.hasCustomIdAssigned();
798        }
799
800        // use customer filter
801        return getManagementStrategy().manageProcessor(processor);
802    }
803
804    private ManagementStrategy getManagementStrategy() {
805        ObjectHelper.notNull(camelContext, "CamelContext");
806        return camelContext.getManagementStrategy();
807    }
808
809    private ManagementObjectStrategy getManagementObjectStrategy() {
810        ObjectHelper.notNull(camelContext, "CamelContext");
811        return camelContext.getManagementStrategy().getManagementObjectStrategy();
812    }
813
814    /**
815     * Strategy for managing the object
816     *
817     * @param me the managed object
818     * @throws Exception is thrown if error registering the object for management
819     */
820    protected void manageObject(Object me) throws Exception {
821        getManagementStrategy().manageObject(me);
822        if (me instanceof TimerListener) {
823            TimerListener timer = (TimerListener) me;
824            timerListenerManager.addTimerListener(timer);
825        }
826    }
827
828    /**
829     * Un-manages the object.
830     *
831     * @param me the managed object
832     * @throws Exception is thrown if error unregistering the managed object
833     */
834    protected void unmanageObject(Object me) throws Exception {
835        if (me instanceof TimerListener) {
836            TimerListener timer = (TimerListener) me;
837            timerListenerManager.removeTimerListener(timer);
838        }
839        getManagementStrategy().unmanageObject(me);
840    }
841
842    /**
843     * Whether or not to register the mbean.
844     * <p/>
845     * The {@link ManagementAgent} has options which controls when to register.
846     * This allows us to only register mbeans accordingly. For example by default any
847     * dynamic endpoints is not registered. This avoids to register excessive mbeans, which
848     * most often is not desired.
849     *
850     * @param service the object to register
851     * @param route   an optional route the mbean is associated with, can be <tt>null</tt>
852     * @return <tt>true</tt> to register, <tt>false</tt> to skip registering
853     */
854    protected boolean shouldRegister(Object service, Route route) {
855        // the agent hasn't been started
856        if (!initialized) {
857            return false;
858        }
859
860        LOG.trace("Checking whether to register {} from route: {}", service, route);
861
862        ManagementAgent agent = getManagementStrategy().getManagementAgent();
863        if (agent == null) {
864            // do not register if no agent
865            return false;
866        }
867
868        // always register if we are starting CamelContext
869        if (getCamelContext().getStatus().isStarting()) {
870            return true;
871        }
872
873        // always register if we are setting up routes
874        if (getCamelContext().isSetupRoutes()) {
875            return true;
876        }
877
878        // register if always is enabled
879        if (agent.getRegisterAlways()) {
880            return true;
881        }
882
883        // is it a known route then always accept
884        if (route != null && knowRouteIds.contains(route.getId())) {
885            return true;
886        }
887
888        // only register if we are starting a new route, and current thread is in starting routes mode
889        if (agent.getRegisterNewRoutes()) {
890            // no specific route, then fallback to see if this thread is starting routes
891            // which is kept as state on the camel context
892            return getCamelContext().isStartingRoutes();
893        }
894
895        return false;
896    }
897
898    @Override
899    protected void doStart() throws Exception {
900        ObjectHelper.notNull(camelContext, "CamelContext");
901
902        // defer starting the timer manager until CamelContext has been fully started
903        camelContext.addStartupListener(timerManagerStartupListener);
904    }
905
906    private final class TimerListenerManagerStartupListener implements StartupListener {
907
908        @Override
909        public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
910            // we are disabled either if configured explicit, or if level is off
911            boolean disabled = !camelContext.getManagementStrategy().isLoadStatisticsEnabled()
912                    || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off;
913
914            LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled");
915            if (!disabled) {
916                // must use 1 sec interval as the load statistics is based on 1 sec calculations
917                timerListenerManager.setInterval(1000);
918                // we have to defer enlisting timer lister manager as a service until CamelContext has been started
919                getCamelContext().addService(timerListenerManager);
920            }
921        }
922    }
923
924    @Override
925    protected void doStop() throws Exception {
926        initialized = false;
927        knowRouteIds.clear();
928        preServices.clear();
929        wrappedProcessors.clear();
930        managedTracers.clear();
931        managedBacklogTracers.clear();
932        managedBacklogDebuggers.clear();
933        managedThreadPools.clear();
934    }
935
936    /**
937     * Class which holds any pre registration details.
938     *
939     * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices()
940     */
941    private static final class PreRegisterService {
942
943        private String name;
944        private Component component;
945        private Endpoint endpoint;
946        private CamelContext camelContext;
947        private Service service;
948        private Route route;
949
950        public void onComponentAdd(String name, Component component) {
951            this.name = name;
952            this.component = component;
953        }
954
955        public void onEndpointAdd(Endpoint endpoint) {
956            this.endpoint = endpoint;
957        }
958
959        public void onServiceAdd(CamelContext camelContext, Service service, Route route) {
960            this.camelContext = camelContext;
961            this.service = service;
962            this.route = route;
963        }
964
965        public String getName() {
966            return name;
967        }
968
969        public Component getComponent() {
970            return component;
971        }
972
973        public Endpoint getEndpoint() {
974            return endpoint;
975        }
976
977        public CamelContext getCamelContext() {
978            return camelContext;
979        }
980
981        public Service getService() {
982            return service;
983        }
984
985        public Route getRoute() {
986            return route;
987        }
988    }
989
990}
991