001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.Callable;
020    import java.util.concurrent.RejectedExecutionException;
021    import java.util.concurrent.ScheduledExecutorService;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.AsyncCallback;
025    import org.apache.camel.AsyncProcessor;
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.LoggingLevel;
029    import org.apache.camel.Message;
030    import org.apache.camel.Predicate;
031    import org.apache.camel.Processor;
032    import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
033    import org.apache.camel.model.OnExceptionDefinition;
034    import org.apache.camel.util.AsyncProcessorHelper;
035    import org.apache.camel.util.EventHelper;
036    import org.apache.camel.util.ExchangeHelper;
037    import org.apache.camel.util.MessageHelper;
038    import org.apache.camel.util.ObjectHelper;
039    import org.apache.camel.util.ServiceHelper;
040    
041    /**
042     * Base redeliverable error handler that also supports a final dead letter queue in case
043     * all redelivery attempts fail.
044     * <p/>
045     * This implementation should contain all the error handling logic and the sub classes
046     * should only configure it according to what they support.
047     *
048     * @version $Revision: 21784 $
049     */
050    public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
051    
052        private static ScheduledExecutorService executorService;
053        protected final CamelContext camelContext;
054        protected final Processor deadLetter;
055        protected final String deadLetterUri;
056        protected final Processor output;
057        protected final AsyncProcessor outputAsync;
058        protected final Processor redeliveryProcessor;
059        protected final RedeliveryPolicy redeliveryPolicy;
060        protected final Predicate handledPolicy;
061        protected final Predicate retryWhilePolicy;
062        protected final Logger logger;
063        protected final boolean useOriginalMessagePolicy;
064    
065        /**
066         * Contains the current redelivery data
067         */
068        protected class RedeliveryData {
069            boolean sync = true;
070            int redeliveryCounter;
071            long redeliveryDelay;
072            Predicate retryWhilePredicate = retryWhilePolicy;
073            boolean redeliverFromSync;
074    
075            // default behavior which can be overloaded on a per exception basis
076            RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
077            Processor deadLetterProcessor = deadLetter;
078            Processor failureProcessor;
079            Processor onRedeliveryProcessor = redeliveryProcessor;
080            Predicate handledPredicate = handledPolicy;
081            Predicate continuedPredicate;
082            boolean useOriginalInMessage = useOriginalMessagePolicy;
083            boolean asyncDelayedRedelivery = redeliveryPolicy.isAsyncDelayedRedelivery();
084        }
085    
086        /**
087         * Tasks which performs asynchronous redelivery attempts, and being triggered by a
088         * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
089         * has to be delayed before a redelivery attempt is performed. 
090         */
091        private class AsyncRedeliveryTask implements Callable<Boolean> {
092    
093            private final Exchange exchange;
094            private final AsyncCallback callback;
095            private final RedeliveryData data;
096    
097            public AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
098                this.exchange = exchange;
099                this.callback = callback;
100                this.data = data;
101            }
102    
103            public Boolean call() throws Exception {
104                // prepare for redelivery
105                prepareExchangeForRedelivery(exchange);
106    
107                // letting onRedeliver be executed at first
108                deliverToOnRedeliveryProcessor(exchange, data);
109    
110                if (log.isTraceEnabled()) {
111                    log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " -> " + outputAsync + " for Exchange: " + exchange);
112                }
113    
114                // emmit event we are doing redelivery
115                EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
116    
117                // process the exchange (also redelivery)
118                boolean sync;
119                if (data.redeliverFromSync) {
120                    // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
121                    // this error handler, which means we have to invoke the callback with false, to have the callback
122                    // be notified when we are done
123                    sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
124                        public void done(boolean doneSync) {
125                            if (log.isTraceEnabled()) {
126                                log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done sync: " + doneSync);
127                            }
128    
129                            // mark we are in sync mode now
130                            data.sync = false;
131    
132                            // only process if the exchange hasn't failed
133                            // and it has not been handled by the error processor
134                            if (isDone(exchange)) {
135                                callback.done(false);
136                                return;
137                            }
138    
139                            // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
140                            processAsyncErrorHandler(exchange, callback, data);
141                        }
142                    });
143                } else {
144                    // this redelivery task was scheduled from asynchronous, which means we should only
145                    // handle when the asynchronous task was done
146                    sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
147                        public void done(boolean doneSync) {
148                            if (log.isTraceEnabled()) {
149                                log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done sync: " + doneSync);
150                            }
151    
152                            // this callback should only handle the async case
153                            if (doneSync) {
154                                return;
155                            }
156    
157                            // mark we are in async mode now
158                            data.sync = false;
159    
160                            // only process if the exchange hasn't failed
161                            // and it has not been handled by the error processor
162                            if (isDone(exchange)) {
163                                callback.done(doneSync);
164                                return;
165                            }
166                            // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
167                            processAsyncErrorHandler(exchange, callback, data);
168                        }
169                    });
170                }
171    
172                return sync;
173            }
174        }
175    
176        public RedeliveryErrorHandler(CamelContext camelContext, Processor output, Logger logger, Processor redeliveryProcessor,
177                                      RedeliveryPolicy redeliveryPolicy, Predicate handledPolicy, Processor deadLetter,
178                                      String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile) {
179            ObjectHelper.notNull(camelContext, "CamelContext", this);
180            ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
181    
182            this.camelContext = camelContext;
183            this.redeliveryProcessor = redeliveryProcessor;
184            this.deadLetter = deadLetter;
185            this.output = output;
186            this.outputAsync = AsyncProcessorTypeConverter.convert(output);
187            this.redeliveryPolicy = redeliveryPolicy;
188            this.logger = logger;
189            this.deadLetterUri = deadLetterUri;
190            this.handledPolicy = handledPolicy;
191            this.useOriginalMessagePolicy = useOriginalMessagePolicy;
192            this.retryWhilePolicy = retryWhile;
193        }
194    
195        public boolean supportTransacted() {
196            return false;
197        }
198    
199        public void process(Exchange exchange) throws Exception {
200            if (output == null) {
201                // no output then just return
202                return;
203            }
204            AsyncProcessorHelper.process(this, exchange);
205        }
206    
207        public boolean process(Exchange exchange, final AsyncCallback callback) {
208            return processErrorHandler(exchange, callback, new RedeliveryData());
209        }
210    
211        /**
212         * Process the exchange using redelivery error handling.
213         */
214        protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
215    
216            // use looping to have redelivery attempts
217            while (true) {
218    
219                // can we still run
220                if (!isRunAllowed()) {
221                    if (exchange.getException() == null) {
222                        exchange.setException(new RejectedExecutionException());
223                    }
224                    // we cannot process so invoke callback
225                    callback.done(data.sync);
226                    return data.sync;
227                }
228    
229                // did previous processing cause an exception?
230                boolean handle = shouldHandleException(exchange);
231                if (handle) {
232                    handleException(exchange, data);
233                }
234    
235                // compute if we should redeliver or not
236                boolean shouldRedeliver = shouldRedeliver(exchange, data);
237                if (!shouldRedeliver) {
238                    // no we should not redeliver to the same output so either try an onException (if any given)
239                    // or the dead letter queue
240                    Processor target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
241                    // deliver to the failure processor (either an on exception or dead letter queue
242                    boolean sync = deliverToFailureProcessor(target, exchange, data, callback);
243                    // we are breaking out
244                    return sync;
245                }
246    
247                if (data.redeliveryCounter > 0) {
248                    // calculate delay
249                    data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
250    
251                    if (data.redeliveryDelay > 0) {
252                        // okay there is a delay so create a scheduled task to have it executed in the future
253    
254                        if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
255                            // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
256                            // have it being executed in the future, or immediately
257                            // we are continuing asynchronously
258    
259                            // mark we are routing async from now and that this redelivery task came from a synchronous routing
260                            data.sync = false;
261                            data.redeliverFromSync = true;
262                            AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
263    
264                            // schedule the redelivery task
265                            if (log.isTraceEnabled()) {
266                                log.trace("Scheduling redelivery task to run in " + data.redeliveryDelay + " millis for exchangeId: " + exchange.getExchangeId());
267                            }
268                            executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
269    
270                            return false;
271                        } else {
272                            // async delayed redelivery was disabled or we are transacted so we must be synchronous
273                            // as the transaction manager requires to execute in the same thread context
274                            try {
275                                data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
276                            } catch (InterruptedException e) {
277                                // we was interrupted so break out
278                                exchange.setException(e);
279                                // mark the exchange to stop continue routing when interrupted
280                                // as we do not want to continue routing (for example a task has been cancelled)
281                                exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
282                                callback.done(data.sync);
283                                return data.sync;
284                            }
285                        }
286                    }
287    
288                    // prepare for redelivery
289                    prepareExchangeForRedelivery(exchange);
290    
291                    // letting onRedeliver be executed
292                    deliverToOnRedeliveryProcessor(exchange, data);
293    
294                    // emmit event we are doing redelivery
295                    EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
296                }
297    
298                // process the exchange (also redelivery)
299                boolean sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
300                    public void done(boolean sync) {
301                        // this callback should only handle the async case
302                        if (sync) {
303                            return;
304                        }
305    
306                        // mark we are in async mode now
307                        data.sync = false;
308    
309                        // if we are done then notify callback and exit
310                        if (isDone(exchange)) {
311                            callback.done(sync);
312                            return;
313                        }
314    
315                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
316                        // method which takes care of this in a asynchronous manner
317                        processAsyncErrorHandler(exchange, callback, data);
318                    }
319                });
320    
321                if (!sync) {
322                    // the remainder of the Exchange is being processed asynchronously so we should return
323                    return false;
324                }
325                // we continue to route synchronously
326    
327                // if we are done then notify callback and exit
328                boolean done = isDone(exchange);
329                if (done) {
330                    callback.done(true);
331                    return true;
332                }
333    
334                // error occurred so loop back around.....
335            }
336        }
337    
338        /**
339         * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
340         * <p/>
341         * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
342         * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
343         * in terms of logic.
344         */
345        protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
346            // can we still run
347            if (!isRunAllowed()) {
348                if (exchange.getException() == null) {
349                    exchange.setException(new RejectedExecutionException());
350                }
351                callback.done(data.sync);
352                return;
353            }
354    
355            // did previous processing cause an exception?
356            boolean handle = shouldHandleException(exchange);
357            if (handle) {
358                handleException(exchange, data);
359            }
360    
361            // compute if we should redeliver or not
362            boolean shouldRedeliver = shouldRedeliver(exchange, data);
363            if (!shouldRedeliver) {
364                // no we should not redeliver to the same output so either try an onException (if any given)
365                // or the dead letter queue
366                Processor target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
367                // deliver to the failure processor (either an on exception or dead letter queue
368                deliverToFailureProcessor(target, exchange, data, callback);
369                // we are breaking out
370                return;
371            }
372    
373            if (data.redeliveryCounter > 0) {
374                // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
375                // have it being executed in the future, or immediately
376                // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
377                // to ensure the callback will continue routing from where we left
378                AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
379    
380                // calculate the redelivery delay
381                data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
382                if (data.redeliveryDelay > 0) {
383                    // schedule the redelivery task
384                    if (log.isTraceEnabled()) {
385                        log.trace("Scheduling redelivery task to run in " + data.redeliveryDelay + " millis for exchangeId: " + exchange.getExchangeId());
386                    }
387                    executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
388                } else {
389                    // execute the task immediately
390                    executorService.submit(task);
391                }
392            }
393        }
394    
395        /**
396         * Strategy whether the exchange has an exception that we should try to handle.
397         * <p/>
398         * Standard implementations should just look for an exception.
399         */
400        protected boolean shouldHandleException(Exchange exchange) {
401            return exchange.getException() != null;
402        }
403    
404        /**
405         * Strategy to determine if the exchange is done so we can continue
406         */
407        protected boolean isDone(Exchange exchange) {
408            boolean answer = isCancelledOrInterrupted(exchange);
409    
410            // only done if the exchange hasn't failed
411            // and it has not been handled by the failure processor
412            // or we are exhausted
413            if (!answer) {
414                answer = exchange.getException() == null
415                    || ExchangeHelper.isFailureHandled(exchange)
416                    || ExchangeHelper.isRedeliveryExhausted(exchange);
417            }
418    
419            if (log.isTraceEnabled()) {
420                log.trace("Is exchangeId: " + exchange.getExchangeId() + " done? " + answer);
421            }
422            return answer;
423        }
424    
425        /**
426         * Strategy to determine if the exchange was cancelled or interrupted
427         */
428        protected boolean isCancelledOrInterrupted(Exchange exchange) {
429            boolean answer = false;
430    
431            if (ExchangeHelper.isInterrupted(exchange)) {
432                // mark the exchange to stop continue routing when interrupted
433                // as we do not want to continue routing (for example a task has been cancelled)
434                exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
435                answer = true;
436            }
437    
438            if (log.isTraceEnabled()) {
439                log.trace("Is exchangeId: " + exchange.getExchangeId() + " interrupted? " + answer);
440            }
441            return answer;
442        }
443    
444        /**
445         * Returns the output processor
446         */
447        public Processor getOutput() {
448            return output;
449        }
450    
451        /**
452         * Returns the dead letter that message exchanges will be sent to if the
453         * redelivery attempts fail
454         */
455        public Processor getDeadLetter() {
456            return deadLetter;
457        }
458    
459        public String getDeadLetterUri() {
460            return deadLetterUri;
461        }
462    
463        public boolean isUseOriginalMessagePolicy() {
464            return useOriginalMessagePolicy;
465        }
466    
467        public RedeliveryPolicy getRedeliveryPolicy() {
468            return redeliveryPolicy;
469        }
470    
471        public Logger getLogger() {
472            return logger;
473        }
474    
475        protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data) {
476            Exception caught = exchange.getException();
477    
478            // continue is a kind of redelivery so reuse the logic to prepare
479            prepareExchangeForRedelivery(exchange);
480            // its continued then remove traces of redelivery attempted and caught exception
481            exchange.getIn().removeHeader(Exchange.REDELIVERED);
482            exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
483            // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
484    
485            // create log message
486            String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId();
487            msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
488            msg = msg + ". Handled and continue routing.";
489    
490            // log that we failed but want to continue
491            logFailedDelivery(false, false, true, exchange, msg, data, null);
492        }
493    
494        protected void prepareExchangeForRedelivery(Exchange exchange) {
495            // okay we will give it another go so clear the exception so we can try again
496            exchange.setException(null);
497    
498            // clear rollback flags
499            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
500    
501            // reset cached streams so they can be read again
502            MessageHelper.resetStreamCache(exchange.getIn());
503        }
504    
505        protected void handleException(Exchange exchange, RedeliveryData data) {
506            Exception e = exchange.getException();
507    
508            // store the original caused exception in a property, so we can restore it later
509            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
510    
511            // find the error handler to use (if any)
512            OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
513            if (exceptionPolicy != null) {
514                data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
515                data.handledPredicate = exceptionPolicy.getHandledPolicy();
516                data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
517                data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
518                data.useOriginalInMessage = exceptionPolicy.isUseOriginalMessage();
519                data.asyncDelayedRedelivery = exceptionPolicy.isAsyncDelayedRedelivery();
520    
521                // route specific failure handler?
522                Processor processor = exceptionPolicy.getErrorHandler();
523                if (processor != null) {
524                    data.failureProcessor = processor;
525                }
526                // route specific on redelivery?
527                processor = exceptionPolicy.getOnRedelivery();
528                if (processor != null) {
529                    data.onRedeliveryProcessor = processor;
530                }
531            }
532    
533            String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
534                    + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
535            logFailedDelivery(true, false, false, exchange, msg, data, e);
536    
537            data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
538        }
539    
540        /**
541         * Gives an optional configure redelivery processor a chance to process before the Exchange
542         * will be redelivered. This can be used to alter the Exchange.
543         */
544        protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
545            if (data.onRedeliveryProcessor == null) {
546                return;
547            }
548    
549            if (log.isTraceEnabled()) {
550                log.trace("Redelivery processor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange
551                        + " before its redelivered");
552            }
553    
554            // run this synchronously as its just a Processor
555            try {
556                data.onRedeliveryProcessor.process(exchange);
557            } catch (Throwable e) {
558                exchange.setException(e);
559            }
560            log.trace("Redelivery processor done");
561        }
562    
563        /**
564         * All redelivery attempts failed so move the exchange to the dead letter queue
565         */
566        protected boolean deliverToFailureProcessor(final Processor processor, final Exchange exchange,
567                                                    final RedeliveryData data, final AsyncCallback callback) {
568            boolean sync = true;
569    
570            Exception caught = exchange.getException();
571    
572            // we did not success with the redelivery so now we let the failure processor handle it
573            // clear exception as we let the failure processor handle it
574            exchange.setException(null);
575    
576            boolean handled = false;
577            // regard both handled or continued as being handled
578            if (shouldHandled(exchange, data) || shouldContinue(exchange, data)) {
579                // its handled then remove traces of redelivery attempted
580                exchange.getIn().removeHeader(Exchange.REDELIVERED);
581                exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
582                handled = true;
583            } else {
584                // must decrement the redelivery counter as we didn't process the redelivery but is
585                // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
586                decrementRedeliveryCounter(exchange);
587            }
588    
589            // is the a failure processor to process the Exchange
590            if (processor != null) {
591    
592                // reset cached streams so they can be read again
593                MessageHelper.resetStreamCache(exchange.getIn());
594    
595                // prepare original IN body if it should be moved instead of current body
596                if (data.useOriginalInMessage) {
597                    if (log.isTraceEnabled()) {
598                        log.trace("Using the original IN message instead of current");
599                    }
600    
601                    Message original = exchange.getUnitOfWork().getOriginalInMessage();
602                    exchange.setIn(original);
603                }
604    
605                if (log.isTraceEnabled()) {
606                    log.trace("Failure processor " + processor + " is processing Exchange: " + exchange);
607                }
608    
609                // store the last to endpoint as the failure endpoint
610                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
611    
612                // the failure processor could also be asynchronous
613                AsyncProcessor afp = AsyncProcessorTypeConverter.convert(processor);
614                sync = AsyncProcessorHelper.process(afp, exchange, new AsyncCallback() {
615                    public void done(boolean sync) {
616                        if (log.isTraceEnabled()) {
617                            log.trace("Failure processor done: " + processor + " processing Exchange: " + exchange);
618                        }
619                        try {
620                            prepareExchangeAfterFailure(exchange, data);
621                            // fire event as we had a failure processor to handle it, which there is a event for
622                            boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null;
623                            EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel);
624                        } finally {
625                            // if the fault was handled asynchronously, this should be reflected in the callback as well
626                            data.sync &= sync;
627                            callback.done(data.sync);
628                        }
629                    }
630                });
631            } else {
632                try {
633                    // no processor but we need to prepare after failure as well
634                    prepareExchangeAfterFailure(exchange, data);
635                } finally {
636                    // callback we are done
637                    callback.done(data.sync);
638                }
639            }
640    
641            // create log message
642            String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId();
643            msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
644            if (processor != null) {
645                msg = msg + ". Processed by failure processor: " + processor;
646            }
647    
648            // log that we failed delivery as we are exhausted
649            logFailedDelivery(false, handled, false, exchange, msg, data, null);
650    
651            return sync;
652        }
653    
654        protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data) {
655            // we could not process the exchange so we let the failure processor handled it
656            ExchangeHelper.setFailureHandled(exchange);
657    
658            // honor if already set a handling
659            boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
660            if (alreadySet) {
661                boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
662                if (log.isTraceEnabled()) {
663                    log.trace("This exchange has already been marked for handling: " + handled);
664                }
665                if (handled) {
666                    exchange.setException(null);
667                } else {
668                    // exception not handled, put exception back in the exchange
669                    exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
670                    // and put failure endpoint back as well
671                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
672                }
673                return;
674            }
675    
676            if (shouldHandled(exchange, data)) {
677                if (log.isTraceEnabled()) {
678                    log.trace("This exchange is handled so its marked as not failed: " + exchange);
679                }
680                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
681            } else if (shouldContinue(exchange, data)) {
682                if (log.isTraceEnabled()) {
683                    log.trace("This exchange is continued: " + exchange);
684                }
685                // okay we want to continue then prepare the exchange for that as well
686                prepareExchangeForContinue(exchange, data);
687            } else {
688                if (log.isTraceEnabled()) {
689                    log.trace("This exchange is not handled or continued so its marked as failed: " + exchange);
690                }
691                // exception not handled, put exception back in the exchange
692                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
693                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
694                // and put failure endpoint back as well
695                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
696            }
697        }
698    
699        private void logFailedDelivery(boolean shouldRedeliver, boolean handled, boolean continued, Exchange exchange, String message, RedeliveryData data, Throwable e) {
700            if (logger == null) {
701                return;
702            }
703    
704            if (handled && !data.currentRedeliveryPolicy.isLogHandled()) {
705                // do not log handled
706                return;
707            }
708    
709            if (continued && !data.currentRedeliveryPolicy.isLogContinued()) {
710                // do not log handled
711                return;
712            }
713    
714            if (shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
715                // do not log retry attempts
716                return;
717            }
718    
719            if (!shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
720                // do not log exhausted
721                return;
722            }
723    
724            LoggingLevel newLogLevel;
725            boolean logStrackTrace;
726            if (shouldRedeliver) {
727                newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
728                logStrackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
729            } else {
730                newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
731                logStrackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
732            }
733            if (e == null) {
734                e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
735            }
736    
737            if (exchange.isRollbackOnly()) {
738                String msg = "Rollback exchange";
739                if (exchange.getException() != null) {
740                    msg = msg + " due: " + exchange.getException().getMessage();
741                }
742                if (newLogLevel == LoggingLevel.ERROR || newLogLevel == LoggingLevel.FATAL) {
743                    // log intended rollback on maximum WARN level (no ERROR or FATAL)
744                    logger.log(msg, LoggingLevel.WARN);
745                } else {
746                    // otherwise use the desired logging level
747                    logger.log(msg, newLogLevel);
748                }
749            } else if (e != null && logStrackTrace) {
750                logger.log(message, e, newLogLevel);
751            } else {
752                logger.log(message, newLogLevel);
753            }
754        }
755    
756        /**
757         * Determines whether or not to we should try to redeliver
758         *
759         * @param exchange the current exchange
760         * @param data     the redelivery data
761         * @return <tt>true</tt> to redeliver, or <tt>false</tt> to exhaust.
762         */
763        private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
764            // if marked as rollback only then do not redeliver
765            boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
766            if (rollbackOnly) {
767                if (log.isTraceEnabled()) {
768                    log.trace("This exchange is marked as rollback only, should not be redelivered: " + exchange);
769                }
770                return false;
771            }
772            return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
773        }
774    
775        /**
776         * Determines whether or not to continue if we are exhausted.
777         *
778         * @param exchange the current exchange
779         * @param data     the redelivery data
780         * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
781         */
782        private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
783            if (data.continuedPredicate != null) {
784                return data.continuedPredicate.matches(exchange);
785            }
786            // do not continue by default
787            return false;
788        }
789    
790        /**
791         * Determines whether or not to handle if we are exhausted.
792         *
793         * @param exchange the current exchange
794         * @param data     the redelivery data
795         * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
796         */
797        private boolean shouldHandled(Exchange exchange, RedeliveryData data) {
798            if (data.handledPredicate != null) {
799                return data.handledPredicate.matches(exchange);
800            }
801            // do not handle by default
802            return false;
803        }
804    
805        /**
806         * Increments the redelivery counter and adds the redelivered flag if the
807         * message has been redelivered
808         */
809        private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
810            Message in = exchange.getIn();
811            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
812            int next = 1;
813            if (counter != null) {
814                next = counter + 1;
815            }
816            in.setHeader(Exchange.REDELIVERY_COUNTER, next);
817            in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
818            return next;
819        }
820    
821        /**
822         * Prepares the redelivery counter and boolean flag for the failure handle processor
823         */
824        private void decrementRedeliveryCounter(Exchange exchange) {
825            Message in = exchange.getIn();
826            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
827            if (counter != null) {
828                int prev = counter - 1;
829                in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
830                // set boolean flag according to counter
831                in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
832            } else {
833                // not redelivered
834                in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
835                in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
836            }
837        }
838    
839        @Override
840        protected void doStart() throws Exception {
841            ServiceHelper.startServices(output, outputAsync, deadLetter);
842            // use a shared scheduler
843            if (executorService == null || executorService.isShutdown()) {
844                // camel context will shutdown the executor when it shutdown so no need to shut it down when stopping
845                executorService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "ErrorHandlerRedeliveryTask");
846            }
847        }
848    
849        @Override
850        protected void doStop() throws Exception {
851            ServiceHelper.stopServices(deadLetter, output, outputAsync);
852        }
853    
854    }