001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.io.Closeable;
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Scanner;
026    import java.util.concurrent.ExecutorService;
027    
028    import org.apache.camel.AsyncCallback;
029    import org.apache.camel.AsyncProcessor;
030    import org.apache.camel.CamelContext;
031    import org.apache.camel.Exchange;
032    import org.apache.camel.Expression;
033    import org.apache.camel.Message;
034    import org.apache.camel.Processor;
035    import org.apache.camel.processor.aggregate.AggregationStrategy;
036    import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
037    import org.apache.camel.util.CollectionHelper;
038    import org.apache.camel.util.IOHelper;
039    import org.apache.camel.util.ObjectHelper;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    import static org.apache.camel.util.ObjectHelper.notNull;
044    
045    /**
046     * Implements a dynamic <a
047     * href="http://camel.apache.org/splitter.html">Splitter</a> pattern
048     * where an expression is evaluated to iterate through each of the parts of a
049     * message and then each part is then send to some endpoint.
050     *
051     * @version $Revision: 20167 $
052     */
053    public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable {
054        private static final transient Log LOG = LogFactory.getLog(Splitter.class);
055    
056        private final Expression expression;
057    
058        public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
059            this(camelContext, expression, destination, aggregationStrategy, false, null, false, false);
060        }
061    
062        public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
063                        boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
064            super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, streaming, stopOnException);
065    
066            this.expression = expression;
067            notNull(expression, "expression");
068            notNull(destination, "destination");
069        }
070    
071        @Override
072        public String toString() {
073            return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]";
074        }
075    
076        @Override
077        public String getTraceLabel() {
078            return "split[" + expression + "]";
079        }
080    
081        @Override
082        public boolean process(Exchange exchange, final AsyncCallback callback) {
083            final AggregationStrategy strategy = getAggregationStrategy();
084    
085            // if no custom aggregation strategy is being used then fallback to keep the original
086            // and propagate exceptions which is done by a per exchange specific aggregation strategy
087            // to ensure it supports async routing
088            if (strategy == null) {
089                UseOriginalAggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
090                exchange.setProperty(Exchange.AGGREGATION_STRATEGY, original);
091            }
092    
093            return super.process(exchange, callback);
094        }
095    
096        @Override
097        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
098            Object value = expression.evaluate(exchange, Object.class);
099    
100            if (isStreaming()) {
101                return createProcessorExchangePairsIterable(exchange, value);
102            } else {
103                return createProcessorExchangePairsList(exchange, value);
104            }
105        }
106    
107        @SuppressWarnings("unchecked")
108        private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) {
109            final Iterator iterator = ObjectHelper.createIterator(value);
110            return new Iterable() {
111    
112                public Iterator iterator() {
113                    return new Iterator() {
114                        private int index;
115                        private boolean closed;
116    
117                        public boolean hasNext() {
118                            if (closed) {
119                                return false;
120                            }
121    
122                            boolean answer = iterator.hasNext();
123                            if (!answer) {
124                                // we are now closed
125                                closed = true;
126                                // nothing more so we need to close the expression value in case it needs to be
127                                if (value instanceof Closeable) {
128                                    IOHelper.close((Closeable) value, value.getClass().getName(), LOG);
129                                } else if (value instanceof Scanner) {
130                                    // special for Scanner as it does not implement Closeable
131                                    ((Scanner) value).close();
132                                }
133                            }
134                            return answer;
135                        }
136    
137                        public Object next() {
138                            Object part = iterator.next();
139                            Exchange newExchange = exchange.copy();
140                            if (part instanceof Message) {
141                                newExchange.setIn((Message)part);
142                            } else {
143                                Message in = newExchange.getIn();
144                                in.setBody(part);
145                            }
146                            return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange);
147                        }
148    
149                        public void remove() {
150                            throw new UnsupportedOperationException("Remove is not supported by this iterator");
151                        }
152                    };
153                }
154    
155            };
156        }
157    
158        private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {
159            List<ProcessorExchangePair> result;
160            Integer collectionSize = CollectionHelper.size(value);
161            if (collectionSize != null) {
162                result = new ArrayList<ProcessorExchangePair>(collectionSize);
163            } else {
164                result = new ArrayList<ProcessorExchangePair>();
165            }
166    
167            int index = 0;
168            Iterator<Object> iter = ObjectHelper.createIterator(value);
169            while (iter.hasNext()) {
170                Object part = iter.next();
171                Exchange newExchange = exchange.copy();
172                if (part instanceof Message) {
173                    newExchange.setIn((Message)part);
174                } else {
175                    Message in = newExchange.getIn();
176                    in.setBody(part);
177                }
178                result.add(createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange));
179            }
180            return result;
181        }
182    
183        @Override
184        protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
185                                         Iterator<ProcessorExchangePair> it) {
186            exchange.setProperty(Exchange.SPLIT_INDEX, index);
187            if (allPairs instanceof Collection) {
188                exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>)allPairs).size());
189            }
190            if (it.hasNext()) {
191                exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
192            } else {
193                exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
194            }
195        }
196    
197        public Expression getExpression() {
198            return expression;
199        }
200    }