001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.ArrayList;
020 import java.util.Collection;
021 import java.util.List;
022 import java.util.concurrent.Callable;
023 import java.util.concurrent.CompletionService;
024 import java.util.concurrent.ExecutionException;
025 import java.util.concurrent.ExecutorCompletionService;
026 import java.util.concurrent.ExecutorService;
027 import java.util.concurrent.Future;
028 import java.util.concurrent.TimeUnit;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.concurrent.atomic.AtomicInteger;
031
032 import org.apache.camel.CamelExchangeException;
033 import org.apache.camel.Exchange;
034 import org.apache.camel.Navigate;
035 import org.apache.camel.Processor;
036 import org.apache.camel.Producer;
037 import org.apache.camel.builder.ErrorHandlerBuilder;
038 import org.apache.camel.impl.ServiceSupport;
039 import org.apache.camel.processor.aggregate.AggregationStrategy;
040 import org.apache.camel.spi.RouteContext;
041 import org.apache.camel.spi.TracedRouteNodes;
042 import org.apache.camel.util.ExchangeHelper;
043 import org.apache.camel.util.ObjectHelper;
044 import org.apache.camel.util.ServiceHelper;
045 import org.apache.camel.util.concurrent.AtomicExchange;
046 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
047 import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
048 import org.apache.commons.logging.Log;
049 import org.apache.commons.logging.LogFactory;
050
051 import static org.apache.camel.util.ObjectHelper.notNull;
052
053 /**
054 * Implements the Multicast pattern to send a message exchange to a number of
055 * endpoints, each endpoint receiving a copy of the message exchange.
056 *
057 * @see Pipeline
058 * @version $Revision: 17823 $
059 */
060 public class MulticastProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
061
062 private static final int DEFAULT_THREADPOOL_SIZE = 10;
063 private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
064
065 /**
066 * Class that represent each step in the multicast route to do
067 */
068 static class ProcessorExchangePair {
069 private final Processor processor;
070 private final Processor prepared;
071 private final Exchange exchange;
072
073 /**
074 * Private constructor as you must use the static creator
075 * {@link org.apache.camel.processor.MulticastProcessor#createProcessorExchangePair(org.apache.camel.Processor,
076 * org.apache.camel.Exchange)} which prepares the processor before its ready to be used.
077 *
078 * @param processor the original processor
079 * @param prepared the prepared processor
080 * @param exchange the exchange
081 */
082 private ProcessorExchangePair(Processor processor, Processor prepared, Exchange exchange) {
083 this.processor = processor;
084 this.prepared = prepared;
085 this.exchange = exchange;
086 }
087
088 public Processor getProcessor() {
089 return processor;
090 }
091
092 public Processor getPrepared() {
093 return prepared;
094 }
095
096 public Exchange getExchange() {
097 return exchange;
098 }
099 }
100
101 private Collection<Processor> processors;
102 private final AggregationStrategy aggregationStrategy;
103 private final boolean isParallelProcessing;
104 private final boolean streaming;
105 private final boolean stopOnException;
106 private ExecutorService executorService;
107
108 public MulticastProcessor(Collection<Processor> processors) {
109 this(processors, null);
110 }
111
112 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
113 this(processors, aggregationStrategy, false, null, false, false);
114 }
115
116 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy,
117 boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
118 notNull(processors, "processors");
119 this.processors = processors;
120 this.aggregationStrategy = aggregationStrategy;
121 this.isParallelProcessing = parallelProcessing;
122 this.executorService = executorService;
123 this.streaming = streaming;
124 this.stopOnException = stopOnException;
125
126 if (isParallelProcessing()) {
127 if (this.executorService == null) {
128 // setup default executor as parallel processing requires an executor
129 this.executorService = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Multicast", true);
130 }
131 }
132 }
133
134 @Override
135 public String toString() {
136 return "Multicast[" + getProcessors() + "]";
137 }
138
139 public String getTraceLabel() {
140 return "multicast";
141 }
142
143 public void process(Exchange exchange) throws Exception {
144 final AtomicExchange result = new AtomicExchange();
145 final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
146
147 // multicast uses fine grained error handling on the output processors
148 // so use try .. catch to cater for this
149 try {
150 if (isParallelProcessing()) {
151 doProcessParallel(result, pairs, isStreaming());
152 } else {
153 doProcessSequential(result, pairs);
154 }
155
156 if (result.get() != null) {
157 ExchangeHelper.copyResults(exchange, result.get());
158 }
159 } catch (Exception e) {
160 // multicast uses error handling on its output processors and they have tried to redeliver
161 // so we shall signal back to the other error handlers that we are exhausted and they should not
162 // also try to redeliver as we will then do that twice
163 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
164 exchange.setException(e);
165 }
166 }
167
168 protected void doProcessParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs, boolean streaming) throws InterruptedException, ExecutionException {
169 final CompletionService<Exchange> completion;
170 final AtomicBoolean running = new AtomicBoolean(true);
171
172 if (streaming) {
173 // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
174 completion = new ExecutorCompletionService<Exchange>(executorService);
175 } else {
176 // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
177 completion = new SubmitOrderedCompletionService<Exchange>(executorService);
178 }
179
180 final AtomicInteger total = new AtomicInteger(0);
181
182 for (ProcessorExchangePair pair : pairs) {
183 final Processor processor = pair.getProcessor();
184 final Processor prepared = pair.getPrepared();
185 final Exchange subExchange = pair.getExchange();
186 updateNewExchange(subExchange, total.intValue(), pairs);
187
188 completion.submit(new Callable<Exchange>() {
189 public Exchange call() throws Exception {
190 if (!running.get()) {
191 // do not start processing the task if we are not running
192 return subExchange;
193 }
194
195 doProcess(processor, prepared, subExchange);
196
197 // should we stop in case of an exception occurred during processing?
198 if (stopOnException && subExchange.getException() != null) {
199 // signal to stop running
200 running.set(false);
201 throw new CamelExchangeException("Parallel processing failed for number " + total.intValue(), subExchange, subExchange.getException());
202 }
203
204 if (LOG.isTraceEnabled()) {
205 LOG.trace("Parallel processing complete for exchange: " + subExchange);
206 }
207 return subExchange;
208 }
209 });
210
211 total.incrementAndGet();
212 }
213
214 for (int i = 0; i < total.intValue(); i++) {
215 Future<Exchange> future = completion.take();
216 Exchange subExchange = future.get();
217 if (aggregationStrategy != null) {
218 doAggregate(result, subExchange);
219 }
220 }
221
222 if (LOG.isDebugEnabled()) {
223 LOG.debug("Done parallel processing " + total + " exchanges");
224 }
225 }
226
227 protected void doProcessSequential(AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws Exception {
228 int total = 0;
229
230 for (ProcessorExchangePair pair : pairs) {
231 Processor processor = pair.getProcessor();
232 Processor prepared = pair.getPrepared();
233 Exchange subExchange = pair.getExchange();
234 updateNewExchange(subExchange, total, pairs);
235
236 doProcess(processor, prepared, subExchange);
237
238 // should we stop in case of an exception occurred during processing?
239 if (stopOnException && subExchange.getException() != null) {
240 throw new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException());
241 }
242
243 if (LOG.isTraceEnabled()) {
244 LOG.trace("Sequential processing complete for number " + total + " exchange: " + subExchange);
245 }
246
247 if (aggregationStrategy != null) {
248 doAggregate(result, subExchange);
249 }
250 total++;
251 }
252
253 if (LOG.isDebugEnabled()) {
254 LOG.debug("Done sequential processing " + total + " exchanges");
255 }
256 }
257
258 private void doProcess(Processor processor, Processor prepared, Exchange exchange) {
259 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
260
261 try {
262 // prepare tracing starting from a new block
263 if (traced != null) {
264 traced.pushBlock();
265 }
266
267 // let the prepared process it
268 prepared.process(exchange);
269 } catch (Exception e) {
270 exchange.setException(e);
271 } finally {
272 // pop the block so by next round we have the same staring point and thus the tracing looks accurate
273 if (traced != null) {
274 traced.popBlock();
275 }
276 }
277 }
278
279 /**
280 * Aggregate the {@link Exchange} with the current result
281 *
282 * @param result the current result
283 * @param exchange the exchange to be added to the result
284 */
285 protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) {
286 if (aggregationStrategy != null) {
287 // prepare the exchanges for aggregation
288 Exchange oldExchange = result.get();
289 ExchangeHelper.prepareAggregation(oldExchange, exchange);
290 result.set(aggregationStrategy.aggregate(oldExchange, exchange));
291 }
292 }
293
294 protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs) {
295 exchange.setProperty(Exchange.MULTICAST_INDEX, index);
296 }
297
298 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
299 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
300
301 for (Processor processor : processors) {
302 Exchange copy = exchange.copy();
303 result.add(createProcessorExchangePair(processor, copy));
304 }
305
306 return result;
307 }
308
309 /**
310 * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be send out.
311 * <p/>
312 * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they
313 * need to be specially prepared before use.
314 *
315 * @param processor the processor
316 * @param exchange the exchange
317 * @return prepared for use
318 */
319 protected static ProcessorExchangePair createProcessorExchangePair(Processor processor, Exchange exchange) {
320 Processor prepared = processor;
321
322 // set property which endpoint we send to
323 setToEndpoint(exchange, prepared);
324
325 // rework error handling to support fine grained error handling
326 if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) {
327 // wrap the producer in error handler so we have fine grained error handling on
328 // the output side instead of the input side
329 // this is needed to support redelivery on that output alone and not doing redelivery
330 // for the entire multicast block again which will start from scratch again
331 RouteContext routeContext = exchange.getUnitOfWork().getRouteContext();
332 ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder();
333 // create error handler (create error handler directly to keep it light weight,
334 // instead of using ProcessorDefinition.wrapInErrorHandler)
335 try {
336 prepared = builder.createErrorHandler(routeContext, prepared);
337 } catch (Exception e) {
338 throw ObjectHelper.wrapRuntimeCamelException(e);
339 }
340 }
341
342 return new ProcessorExchangePair(processor, prepared, exchange);
343 }
344
345 protected void doStop() throws Exception {
346 if (executorService != null) {
347 executorService.shutdown();
348 executorService.awaitTermination(0, TimeUnit.SECONDS);
349 executorService = null;
350 }
351 ServiceHelper.stopServices(processors);
352 }
353
354 protected void doStart() throws Exception {
355 ServiceHelper.startServices(processors);
356 }
357
358 private static void setToEndpoint(Exchange exchange, Processor processor) {
359 if (processor instanceof Producer) {
360 Producer producer = (Producer) processor;
361 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
362 }
363 }
364
365 /**
366 * Is the multicast processor working in streaming mode?
367 *
368 * In streaming mode:
369 * <ul>
370 * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
371 * <li>for parallel processing, we start aggregating responses as they get send back to the processor;
372 * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li>
373 * </ul>
374 */
375 public boolean isStreaming() {
376 return streaming;
377 }
378
379 /**
380 * Should the multicast processor stop processing further exchanges in case of an exception occurred?
381 */
382 public boolean isStopOnException() {
383 return stopOnException;
384 }
385
386 /**
387 * Returns the producers to multicast to
388 */
389 public Collection<Processor> getProcessors() {
390 return processors;
391 }
392
393 public AggregationStrategy getAggregationStrategy() {
394 return aggregationStrategy;
395 }
396
397 public boolean isParallelProcessing() {
398 return isParallelProcessing;
399 }
400
401 public ExecutorService getExecutorService() {
402 return executorService;
403 }
404
405 public void setExecutorService(ExecutorService executorService) {
406 this.executorService = executorService;
407 }
408
409 public List<Processor> next() {
410 if (!hasNext()) {
411 return null;
412 }
413 return new ArrayList<Processor>(processors);
414 }
415
416 public boolean hasNext() {
417 return processors != null && !processors.isEmpty();
418 }
419 }