001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.idempotent;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.atomic.AtomicLong;
022
023import org.apache.camel.AsyncCallback;
024import org.apache.camel.AsyncProcessor;
025import org.apache.camel.Exchange;
026import org.apache.camel.Expression;
027import org.apache.camel.Navigate;
028import org.apache.camel.Processor;
029import org.apache.camel.spi.ExchangeIdempotentRepository;
030import org.apache.camel.spi.IdempotentRepository;
031import org.apache.camel.support.ServiceSupport;
032import org.apache.camel.util.AsyncProcessorConverterHelper;
033import org.apache.camel.util.AsyncProcessorHelper;
034import org.apache.camel.util.ServiceHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * An implementation of the <a
040 * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
041 * <p/>
042 * This implementation supports idempotent repositories implemented as
043 * <ul>
044 *     <li>IdempotentRepository</li>
045 *     <li>ExchangeIdempotentRepository</li>
046 * </ul>
047 *
048 * @see org.apache.camel.spi.IdempotentRepository
049 * @see org.apache.camel.spi.ExchangeIdempotentRepository
050 */
051public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor> {
052    private static final Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class);
053    private final Expression messageIdExpression;
054    private final AsyncProcessor processor;
055    private final IdempotentRepository<String> idempotentRepository;
056    private final boolean eager;
057    private final boolean skipDuplicate;
058    private final boolean removeOnFailure;
059    private final AtomicLong duplicateMessageCount = new AtomicLong();
060
061    public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
062                              boolean eager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) {
063        this.messageIdExpression = messageIdExpression;
064        this.idempotentRepository = idempotentRepository;
065        this.eager = eager;
066        this.skipDuplicate = skipDuplicate;
067        this.removeOnFailure = removeOnFailure;
068        this.processor = AsyncProcessorConverterHelper.convert(processor);
069    }
070
071    @Override
072    public String toString() {
073        return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]";
074    }
075
076    public void process(Exchange exchange) throws Exception {
077        AsyncProcessorHelper.process(this, exchange);
078    }
079
080    public boolean process(Exchange exchange, AsyncCallback callback) {
081        final String messageId = messageIdExpression.evaluate(exchange, String.class);
082        if (messageId == null) {
083            exchange.setException(new NoMessageIdException(exchange, messageIdExpression));
084            callback.done(true);
085            return true;
086        }
087
088        boolean newKey;
089        if (eager) {
090            // add the key to the repository
091            if (idempotentRepository instanceof ExchangeIdempotentRepository) {
092                newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId);
093            } else {
094                newKey = idempotentRepository.add(messageId);
095            }
096        } else {
097            // check if we already have the key
098            if (idempotentRepository instanceof ExchangeIdempotentRepository) {
099                newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId);
100            } else {
101                newKey = !idempotentRepository.contains(messageId);
102            }
103        }
104
105
106        if (!newKey) {
107            // mark the exchange as duplicate
108            exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
109
110            // we already have this key so its a duplicate message
111            onDuplicate(exchange, messageId);
112
113            if (skipDuplicate) {
114                // if we should skip duplicate then we are done
115                LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange);
116                callback.done(true);
117                return true;
118            }
119        }
120
121        // register our on completion callback
122        exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure));
123
124        // process the exchange
125        return processor.process(exchange, callback);
126    }
127
128    public List<Processor> next() {
129        if (!hasNext()) {
130            return null;
131        }
132        List<Processor> answer = new ArrayList<Processor>(1);
133        answer.add(processor);
134        return answer;
135    }
136
137    public boolean hasNext() {
138        return processor != null;
139    }
140
141    // Properties
142    // -------------------------------------------------------------------------
143    public Expression getMessageIdExpression() {
144        return messageIdExpression;
145    }
146
147    public IdempotentRepository<String> getIdempotentRepository() {
148        return idempotentRepository;
149    }
150
151    public Processor getProcessor() {
152        return processor;
153    }
154
155    public long getDuplicateMessageCount() {
156        return duplicateMessageCount.get();
157    }
158
159    // Implementation methods
160    // -------------------------------------------------------------------------
161
162    protected void doStart() throws Exception {
163        ServiceHelper.startServices(processor);
164    }
165
166    protected void doStop() throws Exception {
167        ServiceHelper.stopServices(processor);
168    }
169
170    /**
171     * Resets the duplicate message counter to <code>0L</code>.
172     */
173    public void resetDuplicateMessageCount() {
174        duplicateMessageCount.set(0L);
175    }
176
177    private void onDuplicate(Exchange exchange, String messageId) {
178        duplicateMessageCount.incrementAndGet();
179
180        onDuplicateMessage(exchange, messageId);
181    }
182
183    /**
184     * A strategy method to allow derived classes to overload the behaviour of
185     * processing a duplicate message
186     *
187     * @param exchange  the exchange
188     * @param messageId the message ID of this exchange
189     */
190    protected void onDuplicateMessage(Exchange exchange, String messageId) {
191        // noop
192    }
193
194}