001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.browse;
018
019import java.util.List;
020import java.util.concurrent.CopyOnWriteArrayList;
021
022import org.apache.camel.Component;
023import org.apache.camel.Consumer;
024import org.apache.camel.Exchange;
025import org.apache.camel.Processor;
026import org.apache.camel.Producer;
027import org.apache.camel.impl.DefaultEndpoint;
028import org.apache.camel.impl.DefaultProducer;
029import org.apache.camel.processor.loadbalancer.LoadBalancer;
030import org.apache.camel.processor.loadbalancer.LoadBalancerConsumer;
031import org.apache.camel.processor.loadbalancer.TopicLoadBalancer;
032import org.apache.camel.spi.BrowsableEndpoint;
033import org.apache.camel.spi.Metadata;
034import org.apache.camel.spi.UriEndpoint;
035import org.apache.camel.spi.UriPath;
036
037/**
038 * An endpoint which maintains a {@link List} of {@link Exchange} instances
039 * which can be useful for tooling, debugging and visualising routes.
040 *
041 * @version 
042 */
043@UriEndpoint(scheme = "browse", syntax = "browse:name", label = "core,monitoring")
044public class BrowseEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
045
046    @UriPath(description = "A name which can be any string to uniquely identify the endpoint") @Metadata(required = "true")
047    private String name;
048
049    private List<Exchange> exchanges;
050    private final LoadBalancer loadBalancer = new TopicLoadBalancer();
051
052    public BrowseEndpoint() {
053    }
054
055    public BrowseEndpoint(String uri, Component component) {
056        super(uri, component);
057    }
058
059    public boolean isSingleton() {
060        return true;
061    }
062
063    public List<Exchange> getExchanges() {
064        if (exchanges == null) {
065            exchanges = createExchangeList();
066        }
067        return exchanges;
068    }
069
070    public Producer createProducer() throws Exception {
071        return new DefaultProducer(this) {
072            public void process(Exchange exchange) throws Exception {
073                onExchange(exchange);
074            }
075        };
076    }
077
078    public Consumer createConsumer(Processor processor) throws Exception {
079        Consumer answer = new LoadBalancerConsumer(this, processor, loadBalancer);
080        configureConsumer(answer);
081        return answer;
082    }
083
084    public String getName() {
085        return name;
086    }
087
088    public void setName(String name) {
089        this.name = name;
090    }
091
092    protected List<Exchange> createExchangeList() {
093        return new CopyOnWriteArrayList<Exchange>();
094    }
095
096    /**
097     * Invoked on a message exchange being sent by a producer
098     *
099     * @param exchange the exchange
100     * @throws Exception is thrown if failed to process the exchange
101     */
102    protected void onExchange(Exchange exchange) throws Exception {
103        getExchanges().add(exchange);
104
105        // now fire any consumers
106        loadBalancer.process(exchange);
107    }
108
109    @Override
110    protected void doStart() throws Exception {
111        exchanges = createExchangeList();
112        super.doStart();
113    }
114
115    @Override
116    protected void doStop() throws Exception {
117        if (exchanges != null) {
118            exchanges.clear();
119            exchanges = null;
120        }
121        super.doStop();
122    }
123}