001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.concurrent.Callable;
020 import java.util.concurrent.RejectedExecutionException;
021 import java.util.concurrent.ScheduledExecutorService;
022 import java.util.concurrent.TimeUnit;
023
024 import org.apache.camel.AsyncCallback;
025 import org.apache.camel.AsyncProcessor;
026 import org.apache.camel.CamelContext;
027 import org.apache.camel.Exchange;
028 import org.apache.camel.LoggingLevel;
029 import org.apache.camel.Message;
030 import org.apache.camel.Predicate;
031 import org.apache.camel.Processor;
032 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
033 import org.apache.camel.model.OnExceptionDefinition;
034 import org.apache.camel.util.AsyncProcessorHelper;
035 import org.apache.camel.util.EventHelper;
036 import org.apache.camel.util.ExchangeHelper;
037 import org.apache.camel.util.MessageHelper;
038 import org.apache.camel.util.ObjectHelper;
039 import org.apache.camel.util.ServiceHelper;
040
041 /**
042 * Base redeliverable error handler that also supports a final dead letter queue in case
043 * all redelivery attempts fail.
044 * <p/>
045 * This implementation should contain all the error handling logic and the sub classes
046 * should only configure it according to what they support.
047 *
048 * @version $Revision: 21784 $
049 */
050 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
051
052 private static ScheduledExecutorService executorService;
053 protected final CamelContext camelContext;
054 protected final Processor deadLetter;
055 protected final String deadLetterUri;
056 protected final Processor output;
057 protected final AsyncProcessor outputAsync;
058 protected final Processor redeliveryProcessor;
059 protected final RedeliveryPolicy redeliveryPolicy;
060 protected final Predicate handledPolicy;
061 protected final Predicate retryWhilePolicy;
062 protected final Logger logger;
063 protected final boolean useOriginalMessagePolicy;
064
065 /**
066 * Contains the current redelivery data
067 */
068 protected class RedeliveryData {
069 boolean sync = true;
070 int redeliveryCounter;
071 long redeliveryDelay;
072 Predicate retryWhilePredicate = retryWhilePolicy;
073 boolean redeliverFromSync;
074
075 // default behavior which can be overloaded on a per exception basis
076 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
077 Processor deadLetterProcessor = deadLetter;
078 Processor failureProcessor;
079 Processor onRedeliveryProcessor = redeliveryProcessor;
080 Predicate handledPredicate = handledPolicy;
081 Predicate continuedPredicate;
082 boolean useOriginalInMessage = useOriginalMessagePolicy;
083 boolean asyncDelayedRedelivery = redeliveryPolicy.isAsyncDelayedRedelivery();
084 }
085
086 /**
087 * Tasks which performs asynchronous redelivery attempts, and being triggered by a
088 * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
089 * has to be delayed before a redelivery attempt is performed.
090 */
091 private class AsyncRedeliveryTask implements Callable<Boolean> {
092
093 private final Exchange exchange;
094 private final AsyncCallback callback;
095 private final RedeliveryData data;
096
097 public AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
098 this.exchange = exchange;
099 this.callback = callback;
100 this.data = data;
101 }
102
103 public Boolean call() throws Exception {
104 // prepare for redelivery
105 prepareExchangeForRedelivery(exchange);
106
107 // letting onRedeliver be executed at first
108 deliverToOnRedeliveryProcessor(exchange, data);
109
110 if (log.isTraceEnabled()) {
111 log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " -> " + outputAsync + " for Exchange: " + exchange);
112 }
113
114 // emmit event we are doing redelivery
115 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
116
117 // process the exchange (also redelivery)
118 boolean sync;
119 if (data.redeliverFromSync) {
120 // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
121 // this error handler, which means we have to invoke the callback with false, to have the callback
122 // be notified when we are done
123 sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
124 public void done(boolean doneSync) {
125 if (log.isTraceEnabled()) {
126 log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done sync: " + doneSync);
127 }
128
129 // mark we are in sync mode now
130 data.sync = false;
131
132 // only process if the exchange hasn't failed
133 // and it has not been handled by the error processor
134 if (isDone(exchange)) {
135 callback.done(false);
136 return;
137 }
138
139 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
140 processAsyncErrorHandler(exchange, callback, data);
141 }
142 });
143 } else {
144 // this redelivery task was scheduled from asynchronous, which means we should only
145 // handle when the asynchronous task was done
146 sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
147 public void done(boolean doneSync) {
148 if (log.isTraceEnabled()) {
149 log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done sync: " + doneSync);
150 }
151
152 // this callback should only handle the async case
153 if (doneSync) {
154 return;
155 }
156
157 // mark we are in async mode now
158 data.sync = false;
159
160 // only process if the exchange hasn't failed
161 // and it has not been handled by the error processor
162 if (isDone(exchange)) {
163 callback.done(doneSync);
164 return;
165 }
166 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
167 processAsyncErrorHandler(exchange, callback, data);
168 }
169 });
170 }
171
172 return sync;
173 }
174 }
175
/**
 * Creates the error handler.
 *
 * @param camelContext             the camel context, must not be <tt>null</tt>
 * @param output                   the processor guarded by this error handler
 * @param logger                   logger used to report failed deliveries, may be <tt>null</tt> to not log
 * @param redeliveryProcessor      default processor invoked before each redelivery attempt, may be <tt>null</tt>
 * @param redeliveryPolicy         the default redelivery policy, must not be <tt>null</tt>
 * @param handledPolicy            default predicate deciding whether an exhausted exchange is handled
 * @param deadLetter               default failure processor
 * @param deadLetterUri            uri of the dead letter
 * @param useOriginalMessagePolicy whether the original IN message is given to the failure processor
 * @param retryWhile               default predicate passed to the redelivery policy when deciding to retry
 */
public RedeliveryErrorHandler(CamelContext camelContext, Processor output, Logger logger, Processor redeliveryProcessor,
                              RedeliveryPolicy redeliveryPolicy, Predicate handledPolicy, Processor deadLetter,
                              String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile) {
    // mandatory collaborators
    ObjectHelper.notNull(camelContext, "CamelContext", this);
    ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
    this.camelContext = camelContext;
    this.redeliveryPolicy = redeliveryPolicy;

    // what we guard, and where failures end up
    this.output = output;
    this.outputAsync = AsyncProcessorTypeConverter.convert(output);
    this.deadLetter = deadLetter;
    this.deadLetterUri = deadLetterUri;

    // optional behavior
    this.logger = logger;
    this.redeliveryProcessor = redeliveryProcessor;
    this.handledPolicy = handledPolicy;
    this.retryWhilePolicy = retryWhile;
    this.useOriginalMessagePolicy = useOriginalMessagePolicy;
}
194
195 public boolean supportTransacted() {
196 return false;
197 }
198
199 public void process(Exchange exchange) throws Exception {
200 if (output == null) {
201 // no output then just return
202 return;
203 }
204 AsyncProcessorHelper.process(this, exchange);
205 }
206
207 public boolean process(Exchange exchange, final AsyncCallback callback) {
208 return processErrorHandler(exchange, callback, new RedeliveryData());
209 }
210
211 /**
212 * Process the exchange using redelivery error handling.
213 */
214 protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
215
216 // use looping to have redelivery attempts
217 while (true) {
218
219 // can we still run
220 if (!isRunAllowed()) {
221 if (exchange.getException() == null) {
222 exchange.setException(new RejectedExecutionException());
223 }
224 // we cannot process so invoke callback
225 callback.done(data.sync);
226 return data.sync;
227 }
228
229 // did previous processing cause an exception?
230 boolean handle = shouldHandleException(exchange);
231 if (handle) {
232 handleException(exchange, data);
233 }
234
235 // compute if we should redeliver or not
236 boolean shouldRedeliver = shouldRedeliver(exchange, data);
237 if (!shouldRedeliver) {
238 // no we should not redeliver to the same output so either try an onException (if any given)
239 // or the dead letter queue
240 Processor target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
241 // deliver to the failure processor (either an on exception or dead letter queue
242 boolean sync = deliverToFailureProcessor(target, exchange, data, callback);
243 // we are breaking out
244 return sync;
245 }
246
247 if (data.redeliveryCounter > 0) {
248 // calculate delay
249 data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
250
251 if (data.redeliveryDelay > 0) {
252 // okay there is a delay so create a scheduled task to have it executed in the future
253
254 if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
255 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
256 // have it being executed in the future, or immediately
257 // we are continuing asynchronously
258
259 // mark we are routing async from now and that this redelivery task came from a synchronous routing
260 data.sync = false;
261 data.redeliverFromSync = true;
262 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
263
264 // schedule the redelivery task
265 if (log.isTraceEnabled()) {
266 log.trace("Scheduling redelivery task to run in " + data.redeliveryDelay + " millis for exchangeId: " + exchange.getExchangeId());
267 }
268 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
269
270 return false;
271 } else {
272 // async delayed redelivery was disabled or we are transacted so we must be synchronous
273 // as the transaction manager requires to execute in the same thread context
274 try {
275 data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
276 } catch (InterruptedException e) {
277 // we was interrupted so break out
278 exchange.setException(e);
279 // mark the exchange to stop continue routing when interrupted
280 // as we do not want to continue routing (for example a task has been cancelled)
281 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
282 callback.done(data.sync);
283 return data.sync;
284 }
285 }
286 }
287
288 // prepare for redelivery
289 prepareExchangeForRedelivery(exchange);
290
291 // letting onRedeliver be executed
292 deliverToOnRedeliveryProcessor(exchange, data);
293
294 // emmit event we are doing redelivery
295 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
296 }
297
298 // process the exchange (also redelivery)
299 boolean sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
300 public void done(boolean sync) {
301 // this callback should only handle the async case
302 if (sync) {
303 return;
304 }
305
306 // mark we are in async mode now
307 data.sync = false;
308
309 // if we are done then notify callback and exit
310 if (isDone(exchange)) {
311 callback.done(sync);
312 return;
313 }
314
315 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
316 // method which takes care of this in a asynchronous manner
317 processAsyncErrorHandler(exchange, callback, data);
318 }
319 });
320
321 if (!sync) {
322 // the remainder of the Exchange is being processed asynchronously so we should return
323 return false;
324 }
325 // we continue to route synchronously
326
327 // if we are done then notify callback and exit
328 boolean done = isDone(exchange);
329 if (done) {
330 callback.done(true);
331 return true;
332 }
333
334 // error occurred so loop back around.....
335 }
336 }
337
338 /**
339 * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
340 * <p/>
341 * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
342 * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
343 * in terms of logic.
344 */
345 protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
346 // can we still run
347 if (!isRunAllowed()) {
348 if (exchange.getException() == null) {
349 exchange.setException(new RejectedExecutionException());
350 }
351 callback.done(data.sync);
352 return;
353 }
354
355 // did previous processing cause an exception?
356 boolean handle = shouldHandleException(exchange);
357 if (handle) {
358 handleException(exchange, data);
359 }
360
361 // compute if we should redeliver or not
362 boolean shouldRedeliver = shouldRedeliver(exchange, data);
363 if (!shouldRedeliver) {
364 // no we should not redeliver to the same output so either try an onException (if any given)
365 // or the dead letter queue
366 Processor target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
367 // deliver to the failure processor (either an on exception or dead letter queue
368 deliverToFailureProcessor(target, exchange, data, callback);
369 // we are breaking out
370 return;
371 }
372
373 if (data.redeliveryCounter > 0) {
374 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
375 // have it being executed in the future, or immediately
376 // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
377 // to ensure the callback will continue routing from where we left
378 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
379
380 // calculate the redelivery delay
381 data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
382 if (data.redeliveryDelay > 0) {
383 // schedule the redelivery task
384 if (log.isTraceEnabled()) {
385 log.trace("Scheduling redelivery task to run in " + data.redeliveryDelay + " millis for exchangeId: " + exchange.getExchangeId());
386 }
387 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
388 } else {
389 // execute the task immediately
390 executorService.submit(task);
391 }
392 }
393 }
394
395 /**
396 * Strategy whether the exchange has an exception that we should try to handle.
397 * <p/>
398 * Standard implementations should just look for an exception.
399 */
400 protected boolean shouldHandleException(Exchange exchange) {
401 return exchange.getException() != null;
402 }
403
404 /**
405 * Strategy to determine if the exchange is done so we can continue
406 */
407 protected boolean isDone(Exchange exchange) {
408 boolean answer = isCancelledOrInterrupted(exchange);
409
410 // only done if the exchange hasn't failed
411 // and it has not been handled by the failure processor
412 // or we are exhausted
413 if (!answer) {
414 answer = exchange.getException() == null
415 || ExchangeHelper.isFailureHandled(exchange)
416 || ExchangeHelper.isRedeliveryExhausted(exchange);
417 }
418
419 if (log.isTraceEnabled()) {
420 log.trace("Is exchangeId: " + exchange.getExchangeId() + " done? " + answer);
421 }
422 return answer;
423 }
424
425 /**
426 * Strategy to determine if the exchange was cancelled or interrupted
427 */
428 protected boolean isCancelledOrInterrupted(Exchange exchange) {
429 boolean answer = false;
430
431 if (ExchangeHelper.isInterrupted(exchange)) {
432 // mark the exchange to stop continue routing when interrupted
433 // as we do not want to continue routing (for example a task has been cancelled)
434 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
435 answer = true;
436 }
437
438 if (log.isTraceEnabled()) {
439 log.trace("Is exchangeId: " + exchange.getExchangeId() + " interrupted? " + answer);
440 }
441 return answer;
442 }
443
444 /**
445 * Returns the output processor
446 */
447 public Processor getOutput() {
448 return output;
449 }
450
451 /**
452 * Returns the dead letter that message exchanges will be sent to if the
453 * redelivery attempts fail
454 */
455 public Processor getDeadLetter() {
456 return deadLetter;
457 }
458
459 public String getDeadLetterUri() {
460 return deadLetterUri;
461 }
462
463 public boolean isUseOriginalMessagePolicy() {
464 return useOriginalMessagePolicy;
465 }
466
467 public RedeliveryPolicy getRedeliveryPolicy() {
468 return redeliveryPolicy;
469 }
470
471 public Logger getLogger() {
472 return logger;
473 }
474
475 protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data) {
476 Exception caught = exchange.getException();
477
478 // continue is a kind of redelivery so reuse the logic to prepare
479 prepareExchangeForRedelivery(exchange);
480 // its continued then remove traces of redelivery attempted and caught exception
481 exchange.getIn().removeHeader(Exchange.REDELIVERED);
482 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
483 // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
484
485 // create log message
486 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId();
487 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
488 msg = msg + ". Handled and continue routing.";
489
490 // log that we failed but want to continue
491 logFailedDelivery(false, false, true, exchange, msg, data, null);
492 }
493
494 protected void prepareExchangeForRedelivery(Exchange exchange) {
495 // okay we will give it another go so clear the exception so we can try again
496 exchange.setException(null);
497
498 // clear rollback flags
499 exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
500
501 // reset cached streams so they can be read again
502 MessageHelper.resetStreamCache(exchange.getIn());
503 }
504
505 protected void handleException(Exchange exchange, RedeliveryData data) {
506 Exception e = exchange.getException();
507
508 // store the original caused exception in a property, so we can restore it later
509 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
510
511 // find the error handler to use (if any)
512 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
513 if (exceptionPolicy != null) {
514 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
515 data.handledPredicate = exceptionPolicy.getHandledPolicy();
516 data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
517 data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
518 data.useOriginalInMessage = exceptionPolicy.isUseOriginalMessage();
519 data.asyncDelayedRedelivery = exceptionPolicy.isAsyncDelayedRedelivery();
520
521 // route specific failure handler?
522 Processor processor = exceptionPolicy.getErrorHandler();
523 if (processor != null) {
524 data.failureProcessor = processor;
525 }
526 // route specific on redelivery?
527 processor = exceptionPolicy.getOnRedelivery();
528 if (processor != null) {
529 data.onRedeliveryProcessor = processor;
530 }
531 }
532
533 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
534 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
535 logFailedDelivery(true, false, false, exchange, msg, data, e);
536
537 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
538 }
539
540 /**
541 * Gives an optional configure redelivery processor a chance to process before the Exchange
542 * will be redelivered. This can be used to alter the Exchange.
543 */
544 protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
545 if (data.onRedeliveryProcessor == null) {
546 return;
547 }
548
549 if (log.isTraceEnabled()) {
550 log.trace("Redelivery processor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange
551 + " before its redelivered");
552 }
553
554 // run this synchronously as its just a Processor
555 try {
556 data.onRedeliveryProcessor.process(exchange);
557 } catch (Throwable e) {
558 exchange.setException(e);
559 }
560 log.trace("Redelivery processor done");
561 }
562
563 /**
564 * All redelivery attempts failed so move the exchange to the dead letter queue
565 */
566 protected boolean deliverToFailureProcessor(final Processor processor, final Exchange exchange,
567 final RedeliveryData data, final AsyncCallback callback) {
568 boolean sync = true;
569
570 Exception caught = exchange.getException();
571
572 // we did not success with the redelivery so now we let the failure processor handle it
573 // clear exception as we let the failure processor handle it
574 exchange.setException(null);
575
576 boolean handled = false;
577 // regard both handled or continued as being handled
578 if (shouldHandled(exchange, data) || shouldContinue(exchange, data)) {
579 // its handled then remove traces of redelivery attempted
580 exchange.getIn().removeHeader(Exchange.REDELIVERED);
581 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
582 handled = true;
583 } else {
584 // must decrement the redelivery counter as we didn't process the redelivery but is
585 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
586 decrementRedeliveryCounter(exchange);
587 }
588
589 // is the a failure processor to process the Exchange
590 if (processor != null) {
591
592 // reset cached streams so they can be read again
593 MessageHelper.resetStreamCache(exchange.getIn());
594
595 // prepare original IN body if it should be moved instead of current body
596 if (data.useOriginalInMessage) {
597 if (log.isTraceEnabled()) {
598 log.trace("Using the original IN message instead of current");
599 }
600
601 Message original = exchange.getUnitOfWork().getOriginalInMessage();
602 exchange.setIn(original);
603 }
604
605 if (log.isTraceEnabled()) {
606 log.trace("Failure processor " + processor + " is processing Exchange: " + exchange);
607 }
608
609 // store the last to endpoint as the failure endpoint
610 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
611
612 // the failure processor could also be asynchronous
613 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(processor);
614 sync = AsyncProcessorHelper.process(afp, exchange, new AsyncCallback() {
615 public void done(boolean sync) {
616 if (log.isTraceEnabled()) {
617 log.trace("Failure processor done: " + processor + " processing Exchange: " + exchange);
618 }
619 try {
620 prepareExchangeAfterFailure(exchange, data);
621 // fire event as we had a failure processor to handle it, which there is a event for
622 boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null;
623 EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel);
624 } finally {
625 // if the fault was handled asynchronously, this should be reflected in the callback as well
626 data.sync &= sync;
627 callback.done(data.sync);
628 }
629 }
630 });
631 } else {
632 try {
633 // no processor but we need to prepare after failure as well
634 prepareExchangeAfterFailure(exchange, data);
635 } finally {
636 // callback we are done
637 callback.done(data.sync);
638 }
639 }
640
641 // create log message
642 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId();
643 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
644 if (processor != null) {
645 msg = msg + ". Processed by failure processor: " + processor;
646 }
647
648 // log that we failed delivery as we are exhausted
649 logFailedDelivery(false, handled, false, exchange, msg, data, null);
650
651 return sync;
652 }
653
654 protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data) {
655 // we could not process the exchange so we let the failure processor handled it
656 ExchangeHelper.setFailureHandled(exchange);
657
658 // honor if already set a handling
659 boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
660 if (alreadySet) {
661 boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
662 if (log.isTraceEnabled()) {
663 log.trace("This exchange has already been marked for handling: " + handled);
664 }
665 if (handled) {
666 exchange.setException(null);
667 } else {
668 // exception not handled, put exception back in the exchange
669 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
670 // and put failure endpoint back as well
671 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
672 }
673 return;
674 }
675
676 if (shouldHandled(exchange, data)) {
677 if (log.isTraceEnabled()) {
678 log.trace("This exchange is handled so its marked as not failed: " + exchange);
679 }
680 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
681 } else if (shouldContinue(exchange, data)) {
682 if (log.isTraceEnabled()) {
683 log.trace("This exchange is continued: " + exchange);
684 }
685 // okay we want to continue then prepare the exchange for that as well
686 prepareExchangeForContinue(exchange, data);
687 } else {
688 if (log.isTraceEnabled()) {
689 log.trace("This exchange is not handled or continued so its marked as failed: " + exchange);
690 }
691 // exception not handled, put exception back in the exchange
692 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
693 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
694 // and put failure endpoint back as well
695 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
696 }
697 }
698
699 private void logFailedDelivery(boolean shouldRedeliver, boolean handled, boolean continued, Exchange exchange, String message, RedeliveryData data, Throwable e) {
700 if (logger == null) {
701 return;
702 }
703
704 if (handled && !data.currentRedeliveryPolicy.isLogHandled()) {
705 // do not log handled
706 return;
707 }
708
709 if (continued && !data.currentRedeliveryPolicy.isLogContinued()) {
710 // do not log handled
711 return;
712 }
713
714 if (shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
715 // do not log retry attempts
716 return;
717 }
718
719 if (!shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
720 // do not log exhausted
721 return;
722 }
723
724 LoggingLevel newLogLevel;
725 boolean logStrackTrace;
726 if (shouldRedeliver) {
727 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
728 logStrackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
729 } else {
730 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
731 logStrackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
732 }
733 if (e == null) {
734 e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
735 }
736
737 if (exchange.isRollbackOnly()) {
738 String msg = "Rollback exchange";
739 if (exchange.getException() != null) {
740 msg = msg + " due: " + exchange.getException().getMessage();
741 }
742 if (newLogLevel == LoggingLevel.ERROR || newLogLevel == LoggingLevel.FATAL) {
743 // log intended rollback on maximum WARN level (no ERROR or FATAL)
744 logger.log(msg, LoggingLevel.WARN);
745 } else {
746 // otherwise use the desired logging level
747 logger.log(msg, newLogLevel);
748 }
749 } else if (e != null && logStrackTrace) {
750 logger.log(message, e, newLogLevel);
751 } else {
752 logger.log(message, newLogLevel);
753 }
754 }
755
756 /**
757 * Determines whether or not to we should try to redeliver
758 *
759 * @param exchange the current exchange
760 * @param data the redelivery data
761 * @return <tt>true</tt> to redeliver, or <tt>false</tt> to exhaust.
762 */
763 private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
764 // if marked as rollback only then do not redeliver
765 boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
766 if (rollbackOnly) {
767 if (log.isTraceEnabled()) {
768 log.trace("This exchange is marked as rollback only, should not be redelivered: " + exchange);
769 }
770 return false;
771 }
772 return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
773 }
774
775 /**
776 * Determines whether or not to continue if we are exhausted.
777 *
778 * @param exchange the current exchange
779 * @param data the redelivery data
780 * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
781 */
782 private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
783 if (data.continuedPredicate != null) {
784 return data.continuedPredicate.matches(exchange);
785 }
786 // do not continue by default
787 return false;
788 }
789
790 /**
791 * Determines whether or not to handle if we are exhausted.
792 *
793 * @param exchange the current exchange
794 * @param data the redelivery data
795 * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
796 */
797 private boolean shouldHandled(Exchange exchange, RedeliveryData data) {
798 if (data.handledPredicate != null) {
799 return data.handledPredicate.matches(exchange);
800 }
801 // do not handle by default
802 return false;
803 }
804
805 /**
806 * Increments the redelivery counter and adds the redelivered flag if the
807 * message has been redelivered
808 */
809 private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
810 Message in = exchange.getIn();
811 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
812 int next = 1;
813 if (counter != null) {
814 next = counter + 1;
815 }
816 in.setHeader(Exchange.REDELIVERY_COUNTER, next);
817 in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
818 return next;
819 }
820
821 /**
822 * Prepares the redelivery counter and boolean flag for the failure handle processor
823 */
824 private void decrementRedeliveryCounter(Exchange exchange) {
825 Message in = exchange.getIn();
826 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
827 if (counter != null) {
828 int prev = counter - 1;
829 in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
830 // set boolean flag according to counter
831 in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
832 } else {
833 // not redelivered
834 in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
835 in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
836 }
837 }
838
839 @Override
840 protected void doStart() throws Exception {
841 ServiceHelper.startServices(output, outputAsync, deadLetter);
842 // use a shared scheduler
843 if (executorService == null || executorService.isShutdown()) {
844 // camel context will shutdown the executor when it shutdown so no need to shut it down when stopping
845 executorService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "ErrorHandlerRedeliveryTask");
846 }
847 }
848
849 @Override
850 protected void doStop() throws Exception {
851 ServiceHelper.stopServices(deadLetter, output, outputAsync);
852 }
853
854 }