001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.direct;
018
019import java.util.HashMap;
020import java.util.Map;
021
022import org.apache.camel.Component;
023import org.apache.camel.Consumer;
024import org.apache.camel.Processor;
025import org.apache.camel.Producer;
026import org.apache.camel.impl.DefaultEndpoint;
027import org.apache.camel.spi.Metadata;
028import org.apache.camel.spi.UriEndpoint;
029import org.apache.camel.spi.UriParam;
030import org.apache.camel.spi.UriPath;
031import org.apache.camel.util.ObjectHelper;
032
033/**
034 * Represents a direct endpoint that synchronously invokes the consumer of the
035 * endpoint when a producer sends a message to it.
036 *
037 * @version 
038 */
039@UriEndpoint(scheme = "direct", syntax = "direct:name", consumerClass = DirectConsumer.class, label = "core,endpoint")
040public class DirectEndpoint extends DefaultEndpoint {
041
042    private volatile Map<String, DirectConsumer> consumers;
043
044    @UriPath(description = "Name of direct endpoint") @Metadata(required = "true")
045    private String name;
046
047    @UriParam(label = "producer")
048    private boolean block;
049    @UriParam(label = "producer", defaultValue = "30000")
050    private long timeout = 30000L;
051
052    public DirectEndpoint() {
053        this.consumers = new HashMap<String, DirectConsumer>();
054    }
055
056    public DirectEndpoint(String endpointUri, Component component) {
057        this(endpointUri, component, new HashMap<String, DirectConsumer>());
058    }
059
060    public DirectEndpoint(String uri, Component component, Map<String, DirectConsumer> consumers) {
061        super(uri, component);
062        this.consumers = consumers;
063    }
064
065    public Producer createProducer() throws Exception {
066        if (block) {
067            return new DirectBlockingProducer(this);
068        } else {
069            return new DirectProducer(this);
070        }
071    }
072
073    public Consumer createConsumer(Processor processor) throws Exception {
074        Consumer answer = new DirectConsumer(this, processor);
075        configureConsumer(answer);
076        return answer;
077    }
078
079    public boolean isSingleton() {
080        return true;
081    }
082
083    public void addConsumer(DirectConsumer consumer) {
084        String key = consumer.getEndpoint().getKey();
085        consumers.put(key, consumer);
086    }
087
088    public void removeConsumer(DirectConsumer consumer) {
089        String key = consumer.getEndpoint().getKey();
090        consumers.remove(key);
091    }
092
093    public boolean hasConsumer(DirectConsumer consumer) {
094        String key = consumer.getEndpoint().getKey();
095        return consumers.containsKey(key);
096    }
097
098    public DirectConsumer getConsumer() {
099        String key = getKey();
100        return consumers.get(key);
101    }
102
103    public boolean isBlock() {
104        return block;
105    }
106
107    /**
108     * If sending a message to a direct endpoint which has no active consumer,
109     * then we can tell the producer to block and wait for the consumer to become active.
110     */
111    public void setBlock(boolean block) {
112        this.block = block;
113    }
114
115    public long getTimeout() {
116        return timeout;
117    }
118
119    /**
120     * The timeout value to use if block is enabled.
121     *
122     * @param timeout the timeout value
123     */
124    public void setTimeout(long timeout) {
125        this.timeout = timeout;
126    }
127
128    protected String getKey() {
129        String uri = getEndpointUri();
130        if (uri.indexOf('?') != -1) {
131            return ObjectHelper.before(uri, "?");
132        } else {
133            return uri;
134        }
135    }
136}