001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.List;
022    import java.util.concurrent.Callable;
023    import java.util.concurrent.CompletionService;
024    import java.util.concurrent.ExecutionException;
025    import java.util.concurrent.ExecutorCompletionService;
026    import java.util.concurrent.ExecutorService;
027    import java.util.concurrent.Future;
028    import java.util.concurrent.TimeUnit;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    import java.util.concurrent.atomic.AtomicInteger;
031    
032    import org.apache.camel.CamelExchangeException;
033    import org.apache.camel.Exchange;
034    import org.apache.camel.Navigate;
035    import org.apache.camel.Processor;
036    import org.apache.camel.Producer;
037    import org.apache.camel.builder.ErrorHandlerBuilder;
038    import org.apache.camel.impl.ServiceSupport;
039    import org.apache.camel.processor.aggregate.AggregationStrategy;
040    import org.apache.camel.spi.RouteContext;
041    import org.apache.camel.spi.TracedRouteNodes;
042    import org.apache.camel.util.ExchangeHelper;
043    import org.apache.camel.util.ObjectHelper;
044    import org.apache.camel.util.ServiceHelper;
045    import org.apache.camel.util.concurrent.AtomicExchange;
046    import org.apache.camel.util.concurrent.ExecutorServiceHelper;
047    import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
048    import org.apache.commons.logging.Log;
049    import org.apache.commons.logging.LogFactory;
050    
051    import static org.apache.camel.util.ObjectHelper.notNull;
052    
053    /**
054     * Implements the Multicast pattern to send a message exchange to a number of
055     * endpoints, each endpoint receiving a copy of the message exchange.
056     *
057     * @see Pipeline
058     * @version $Revision: 17823 $
059     */
060    public class MulticastProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
061    
062        private static final int DEFAULT_THREADPOOL_SIZE = 10;
063        private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
064    
065        /**
066         * Class that represent each step in the multicast route to do
067         */
068        static class ProcessorExchangePair {
069            private final Processor processor;
070            private final Processor prepared;
071            private final Exchange exchange;
072    
073            /**
074             * Private constructor as you must use the static creator
075             * {@link org.apache.camel.processor.MulticastProcessor#createProcessorExchangePair(org.apache.camel.Processor,
076             *        org.apache.camel.Exchange)} which prepares the processor before its ready to be used.
077             *
078             * @param processor  the original processor
079             * @param prepared   the prepared processor
080             * @param exchange   the exchange
081             */
082            private ProcessorExchangePair(Processor processor, Processor prepared, Exchange exchange) {
083                this.processor = processor;
084                this.prepared = prepared;
085                this.exchange = exchange;
086            }
087    
088            public Processor getProcessor() {
089                return processor;
090            }
091    
092            public Processor getPrepared() {
093                return prepared;
094            }
095    
096            public Exchange getExchange() {
097                return exchange;
098            }
099        }
100    
101        private Collection<Processor> processors;
102        private final AggregationStrategy aggregationStrategy;
103        private final boolean isParallelProcessing;
104        private final boolean streaming;
105        private final boolean stopOnException;
106        private ExecutorService executorService;
107    
108        public MulticastProcessor(Collection<Processor> processors) {
109            this(processors, null);
110        }
111    
112        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
113            this(processors, aggregationStrategy, false, null, false, false);
114        }
115        
116        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy,
117                                  boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
118            notNull(processors, "processors");
119            this.processors = processors;
120            this.aggregationStrategy = aggregationStrategy;
121            this.isParallelProcessing = parallelProcessing;
122            this.executorService = executorService;
123            this.streaming = streaming;
124            this.stopOnException = stopOnException;
125    
126            if (isParallelProcessing()) {
127                if (this.executorService == null) {
128                    // setup default executor as parallel processing requires an executor
129                    this.executorService = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Multicast", true);
130                }
131            }
132        }
133    
134        @Override
135        public String toString() {
136            return "Multicast[" + getProcessors() + "]";
137        }
138    
139        public String getTraceLabel() {
140            return "multicast";
141        }
142    
143        public void process(Exchange exchange) throws Exception {
144            final AtomicExchange result = new AtomicExchange();
145            final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
146    
147            // multicast uses fine grained error handling on the output processors
148            // so use try .. catch to cater for this
149            try {
150                if (isParallelProcessing()) {
151                    doProcessParallel(result, pairs, isStreaming());
152                } else {
153                    doProcessSequential(result, pairs);
154                }
155    
156                if (result.get() != null) {
157                    ExchangeHelper.copyResults(exchange, result.get());
158                }
159            } catch (Exception e) {
160                // multicast uses error handling on its output processors and they have tried to redeliver
161                // so we shall signal back to the other error handlers that we are exhausted and they should not
162                // also try to redeliver as we will then do that twice
163                exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
164                exchange.setException(e);
165            }
166        }
167    
168        protected void doProcessParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs, boolean streaming) throws InterruptedException, ExecutionException {
169            final CompletionService<Exchange> completion;
170            final AtomicBoolean running = new AtomicBoolean(true);
171    
172            if (streaming) {
173                // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
174                completion = new ExecutorCompletionService<Exchange>(executorService);
175            } else {
176                // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
177                completion = new SubmitOrderedCompletionService<Exchange>(executorService);
178            }
179    
180            final AtomicInteger total =  new AtomicInteger(0);
181    
182            for (ProcessorExchangePair pair : pairs) {
183                final Processor processor = pair.getProcessor();
184                final Processor prepared = pair.getPrepared();
185                final Exchange subExchange = pair.getExchange();
186                updateNewExchange(subExchange, total.intValue(), pairs);
187    
188                completion.submit(new Callable<Exchange>() {
189                    public Exchange call() throws Exception {
190                        if (!running.get()) {
191                            // do not start processing the task if we are not running
192                            return subExchange;
193                        }
194    
195                        doProcess(processor, prepared, subExchange);
196    
197                        // should we stop in case of an exception occurred during processing?
198                        if (stopOnException && subExchange.getException() != null) {
199                            // signal to stop running
200                            running.set(false);
201                            throw new CamelExchangeException("Parallel processing failed for number " + total.intValue(), subExchange, subExchange.getException());
202                        }
203    
204                        if (LOG.isTraceEnabled()) {
205                            LOG.trace("Parallel processing complete for exchange: " + subExchange);
206                        }
207                        return subExchange;
208                    }
209                });
210    
211                total.incrementAndGet();
212            }
213    
214            for (int i = 0; i < total.intValue(); i++) {
215                Future<Exchange> future = completion.take();
216                Exchange subExchange = future.get();
217                if (aggregationStrategy != null) {
218                    doAggregate(result, subExchange);
219                }
220            }
221    
222            if (LOG.isDebugEnabled()) {
223                LOG.debug("Done parallel processing " + total + " exchanges");
224            }
225        }
226    
227        protected void doProcessSequential(AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws Exception {
228            int total = 0;
229    
230            for (ProcessorExchangePair pair : pairs) {
231                Processor processor = pair.getProcessor();
232                Processor prepared = pair.getPrepared();
233                Exchange subExchange = pair.getExchange();
234                updateNewExchange(subExchange, total, pairs);
235    
236                doProcess(processor, prepared, subExchange);
237    
238                // should we stop in case of an exception occurred during processing?
239                if (stopOnException && subExchange.getException() != null) {
240                    throw new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException());
241                }
242    
243                if (LOG.isTraceEnabled()) {
244                    LOG.trace("Sequential processing complete for number " + total + " exchange: " + subExchange);
245                }
246    
247                if (aggregationStrategy != null) {
248                    doAggregate(result, subExchange);
249                }
250                total++;
251            }
252    
253            if (LOG.isDebugEnabled()) {
254                LOG.debug("Done sequential processing " + total + " exchanges");
255            }
256        }
257    
258        private void doProcess(Processor processor, Processor prepared, Exchange exchange) {
259            TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
260    
261            try {
262                // prepare tracing starting from a new block
263                if (traced != null) {
264                    traced.pushBlock();
265                }
266    
267                // let the prepared process it
268                prepared.process(exchange);
269            } catch (Exception e) {
270                exchange.setException(e);
271            } finally {
272                // pop the block so by next round we have the same staring point and thus the tracing looks accurate
273                if (traced != null) {
274                    traced.popBlock();
275                }
276            }
277        }
278    
279        /**
280         * Aggregate the {@link Exchange} with the current result
281         *
282         * @param result the current result
283         * @param exchange the exchange to be added to the result
284         */
285        protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) {
286            if (aggregationStrategy != null) {
287                // prepare the exchanges for aggregation
288                Exchange oldExchange = result.get();
289                ExchangeHelper.prepareAggregation(oldExchange, exchange);
290                result.set(aggregationStrategy.aggregate(oldExchange, exchange));
291            }
292        }
293    
294        protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs) {
295            exchange.setProperty(Exchange.MULTICAST_INDEX, index);
296        }
297    
298        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
299            List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
300    
301            for (Processor processor : processors) {
302                Exchange copy = exchange.copy();
303                result.add(createProcessorExchangePair(processor, copy));
304            }
305    
306            return result;
307        }
308    
309        /**
310         * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be send out.
311         * <p/>
312         * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they
313         * need to be specially prepared before use.
314         *
315         * @param processor  the processor
316         * @param exchange   the exchange
317         * @return prepared for use
318         */
319        protected static ProcessorExchangePair createProcessorExchangePair(Processor processor, Exchange exchange) {
320            Processor prepared = processor;
321    
322            // set property which endpoint we send to
323            setToEndpoint(exchange, prepared);
324    
325            // rework error handling to support fine grained error handling
326            if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) {
327                // wrap the producer in error handler so we have fine grained error handling on
328                // the output side instead of the input side
329                // this is needed to support redelivery on that output alone and not doing redelivery
330                // for the entire multicast block again which will start from scratch again
331                RouteContext routeContext = exchange.getUnitOfWork().getRouteContext();
332                ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder();
333                // create error handler (create error handler directly to keep it light weight,
334                // instead of using ProcessorDefinition.wrapInErrorHandler)
335                try {
336                    prepared = builder.createErrorHandler(routeContext, prepared);
337                } catch (Exception e) {
338                    throw ObjectHelper.wrapRuntimeCamelException(e);
339                }
340            }
341    
342            return new ProcessorExchangePair(processor, prepared, exchange);
343        }
344    
345        protected void doStop() throws Exception {
346            if (executorService != null) {
347                executorService.shutdown();
348                executorService.awaitTermination(0, TimeUnit.SECONDS);
349                executorService = null;
350            }
351            ServiceHelper.stopServices(processors);
352        }
353    
354        protected void doStart() throws Exception {
355            ServiceHelper.startServices(processors);
356        }
357        
358        private static void setToEndpoint(Exchange exchange, Processor processor) {
359            if (processor instanceof Producer) {
360                Producer producer = (Producer) processor;
361                exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
362            }
363        }
364    
365        /**
366         * Is the multicast processor working in streaming mode?
367         * 
368         * In streaming mode:
369         * <ul>
370         * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
371         * <li>for parallel processing, we start aggregating responses as they get send back to the processor;
372         * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li>
373         * </ul>
374         */
375        public boolean isStreaming() {
376            return streaming;
377        }
378    
379        /**
380         * Should the multicast processor stop processing further exchanges in case of an exception occurred?
381         */
382        public boolean isStopOnException() {
383            return stopOnException;
384        }
385    
386        /**
387         * Returns the producers to multicast to
388         */
389        public Collection<Processor> getProcessors() {
390            return processors;
391        }
392    
393        public AggregationStrategy getAggregationStrategy() {
394            return aggregationStrategy;
395        }
396    
397        public boolean isParallelProcessing() {
398            return isParallelProcessing;
399        }
400    
401        public ExecutorService getExecutorService() {
402            return executorService;
403        }
404    
405        public void setExecutorService(ExecutorService executorService) {
406            this.executorService = executorService;
407        }
408    
409        public List<Processor> next() {
410            if (!hasNext()) {
411                return null;
412            }
413            return new ArrayList<Processor>(processors);
414        }
415    
416        public boolean hasNext() {
417            return processors != null && !processors.isEmpty();
418        }
419    }