001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.directvm;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Map;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.camel.Endpoint;
027import org.apache.camel.impl.UriEndpointComponent;
028import org.apache.camel.spi.Metadata;
029
030/**
031 * The <a href="http://camel.apache.org/direct-vm.html">Direct VM Component</a> manages {@link DirectVmEndpoint} and holds the list of named direct-vm endpoints.
032 */
033public class DirectVmComponent extends UriEndpointComponent {
034
035    private static final AtomicInteger START_COUNTER = new AtomicInteger();
036
037    // must keep a map of consumers on the component to ensure endpoints can lookup old consumers
038    // later in case the DirectVmEndpoint was re-created due the old was evicted from the endpoints LRUCache
039    // on DefaultCamelContext
040    private static final ConcurrentMap<String, DirectVmConsumer> CONSUMERS = new ConcurrentHashMap<String, DirectVmConsumer>();
041    private boolean block;
042    @Metadata(defaultValue = "30000")
043    private long timeout = 30000L;
044
045    public DirectVmComponent() {
046        super(DirectVmEndpoint.class);
047    }
048
049    /**
050     * Gets all the consumer endpoints.
051     *
052     * @return consumer endpoints
053     */
054    public static Collection<Endpoint> getConsumerEndpoints() {
055        Collection<Endpoint> endpoints = new ArrayList<Endpoint>(CONSUMERS.size());
056        for (DirectVmConsumer consumer : CONSUMERS.values()) {
057            endpoints.add(consumer.getEndpoint());
058        }
059        return endpoints;
060    }
061
062    @Override
063    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
064        DirectVmEndpoint answer = new DirectVmEndpoint(uri, this);
065        answer.setBlock(block);
066        answer.setTimeout(timeout);
067        answer.configureProperties(parameters);
068        return answer;
069    }
070
071    public DirectVmConsumer getConsumer(DirectVmEndpoint endpoint) {
072        String key = getConsumerKey(endpoint.getEndpointUri());
073        return CONSUMERS.get(key);
074    }
075
076    public void addConsumer(DirectVmEndpoint endpoint, DirectVmConsumer consumer) {
077        String key = getConsumerKey(endpoint.getEndpointUri());
078        DirectVmConsumer existing = CONSUMERS.putIfAbsent(key, consumer);
079        if (existing != null) {
080            String contextId = existing.getEndpoint().getCamelContext().getName();
081            throw new IllegalStateException("A consumer " + existing + " already exists from CamelContext: " + contextId + ". Multiple consumers not supported");
082        }
083    }
084
085    public void removeConsumer(DirectVmEndpoint endpoint, DirectVmConsumer consumer) {
086        String key = getConsumerKey(endpoint.getEndpointUri());
087        CONSUMERS.remove(key);
088    }
089
090    private static String getConsumerKey(String uri) {
091        if (uri.contains("?")) {
092            // strip parameters
093            uri = uri.substring(0, uri.indexOf('?'));
094        }
095        return uri;
096    }
097
098    @Override
099    protected void doStart() throws Exception {
100        super.doStart();
101        START_COUNTER.incrementAndGet();
102    }
103
104    @Override
105    protected void doStop() throws Exception {
106        if (START_COUNTER.decrementAndGet() <= 0) {
107            // clear queues when no more direct-vm components in use
108            CONSUMERS.clear();
109        }
110        super.doStop();
111    }
112
113    public boolean isBlock() {
114        return block;
115    }
116
117    /**
118     * If sending a message to a direct endpoint which has no active consumer,
119     * then we can tell the producer to block and wait for the consumer to become active.
120     */
121    public void setBlock(boolean block) {
122        this.block = block;
123    }
124
125    public long getTimeout() {
126        return timeout;
127    }
128
129    /**
130     * The timeout value to use if block is enabled.
131     */
132    public void setTimeout(long timeout) {
133        this.timeout = timeout;
134    }
135}