001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Collections;
022import java.util.Comparator;
023import java.util.Date;
024import java.util.List;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import org.apache.camel.Endpoint;
030import org.apache.camel.Exchange;
031import org.apache.camel.MessageHistory;
032import org.apache.camel.spi.InflightRepository;
033import org.apache.camel.support.ServiceSupport;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Default {@link org.apache.camel.spi.InflightRepository}.
039 *
040 * @version 
041 */
042public class DefaultInflightRepository extends ServiceSupport implements InflightRepository {
043
044    private static final Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class);
045    private final ConcurrentMap<String, Exchange> inflight = new ConcurrentHashMap<String, Exchange>();
046    private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<String, AtomicInteger>();
047
048    public void add(Exchange exchange) {
049        inflight.put(exchange.getExchangeId(), exchange);
050    }
051
052    public void remove(Exchange exchange) {
053        inflight.remove(exchange.getExchangeId());
054    }
055
056    public void add(Exchange exchange, String routeId) {
057        AtomicInteger existing = routeCount.putIfAbsent(routeId, new AtomicInteger(1));
058        if (existing != null) {
059            existing.incrementAndGet();
060        }
061    }
062
063    public void remove(Exchange exchange, String routeId) {
064        AtomicInteger existing = routeCount.get(routeId);
065        if (existing != null) {
066            existing.decrementAndGet();
067        }
068    }
069
070    public int size() {
071        return inflight.size();
072    }
073
074    @Deprecated
075    public int size(Endpoint endpoint) {
076        return 0;
077    }
078
079    @Override
080    public void removeRoute(String routeId) {
081        routeCount.remove(routeId);
082    }
083
084    @Override
085    public int size(String routeId) {
086        AtomicInteger existing = routeCount.get(routeId);
087        return existing != null ? existing.get() : 0;
088    }
089
090    @Override
091    public Collection<InflightExchange> browse() {
092        return browse(-1, false);
093    }
094
095    @Override
096    public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) {
097        List<InflightExchange> answer = new ArrayList<InflightExchange>();
098
099        List<Exchange> values = new ArrayList<Exchange>(inflight.values());
100        if (sortByLongestDuration) {
101            Collections.sort(values, new Comparator<Exchange>() {
102                @Override
103                public int compare(Exchange e1, Exchange e2) {
104                    long d1 = getExchangeDuration(e1);
105                    long d2 = getExchangeDuration(e2);
106                    return Long.compare(d1, d2);
107                }
108            });
109        } else {
110            // else sort by exchange id
111            Collections.sort(values, new Comparator<Exchange>() {
112                @Override
113                public int compare(Exchange e1, Exchange e2) {
114                    return e1.getExchangeId().compareTo(e2.getExchangeId());
115                }
116            });
117        }
118
119        for (Exchange exchange : values) {
120            answer.add(new InflightExchangeEntry(exchange));
121            if (limit > 0 && answer.size() >= limit) {
122                break;
123            }
124        }
125        return Collections.unmodifiableCollection(answer);
126    }
127
128    @Override
129    protected void doStart() throws Exception {
130    }
131
132    @Override
133    protected void doStop() throws Exception {
134        int count = size();
135        if (count > 0) {
136            LOG.warn("Shutting down while there are still " + count + " inflight exchanges.");
137        } else {
138            LOG.debug("Shutting down with no inflight exchanges.");
139        }
140        routeCount.clear();
141    }
142
143    private static long getExchangeDuration(Exchange exchange) {
144        long duration = 0;
145        Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
146        if (created != null) {
147            duration = System.currentTimeMillis() - created.getTime();
148        }
149        return duration;
150    }
151
152    private static final class InflightExchangeEntry implements InflightExchange {
153
154        private final Exchange exchange;
155
156        private InflightExchangeEntry(Exchange exchange) {
157            this.exchange = exchange;
158        }
159
160        @Override
161        public Exchange getExchange() {
162            return exchange;
163        }
164
165        @Override
166        public long getDuration() {
167            return DefaultInflightRepository.getExchangeDuration(exchange);
168        }
169
170        @Override
171        @SuppressWarnings("unchecked")
172        public long getElapsed() {
173            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
174            if (list == null || list.isEmpty()) {
175                return 0;
176            }
177
178            // get latest entry
179            MessageHistory history = list.get(list.size() - 1);
180            if (history != null) {
181                return history.getElapsed();
182            } else {
183                return 0;
184            }
185        }
186
187        @Override
188        @SuppressWarnings("unchecked")
189        public String getNodeId() {
190            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
191            if (list == null || list.isEmpty()) {
192                return null;
193            }
194
195            // get latest entry
196            MessageHistory history = list.get(list.size() - 1);
197            if (history != null) {
198                return history.getNode().getId();
199            } else {
200                return null;
201            }
202        }
203
204        @Override
205        @SuppressWarnings("unchecked")
206        public String getRouteId() {
207            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
208            if (list == null || list.isEmpty()) {
209                return null;
210            }
211
212            // get latest entry
213            MessageHistory history = list.get(list.size() - 1);
214            if (history != null) {
215                return history.getRouteId();
216            } else {
217                return null;
218            }
219        }
220
221        @Override
222        public String toString() {
223            return "InflightExchangeEntry[exchangeId=" + exchange.getExchangeId() + "]";
224        }
225    }
226
227}