Implementing Routes

Version 2.0

July 2011
Legal Notices

Updated: 19 Aug 2011

Table of Contents

I. General Routing Concepts
1. Route Building Blocks
Router DSL
Message exchanges
Languages for expressions and predicates
2. Pipeline Processing
3. Multiple Inputs
Multiple independent inputs
Segmented routes
4. Exception Handling
onException Clause
doTry, doCatch, and doFinally
5. Bean Integration
Accessing beans in a route
Initializing parameters using method signatures
Initializing parameters using annotations
6. Aspect Oriented Programming
7. Transforming Message Content
Simple transformations
Marshalling and unmarshalling
8. Property Placeholders
9. Threading Model
10. Controlling Start-Up and Shutdown of Routes
Changing route start behavior
Changing route shutdown
11. Scheduled Route Policy
Overview of Scheduled Route Policies
Simple Scheduled Route Policy
Cron Scheduled Route Policy
II. Enterprise Integration Patterns
12. Introducing Enterprise Integration Patterns
13. Messaging Systems
Message Channel
Message Endpoint
Pipes and Filters
Message Router
Message Translator
14. Messaging Channels
Point-to-Point Channel
Publish-Subscribe Channel
Dead Letter Channel
Guaranteed Delivery
Message Bus
15. Message Construction
Correlation Identifier
Return Address
16. Message Routing
Content-Based Router
Message Filter
Recipient List
Routing Slip
Load Balancer
Composed Message Processor
Dynamic Router
17. Message Transformation
Content Enricher
Content Filter
Claim Check
18. Messaging Endpoints
Messaging Mapper
Event Driven Consumer
Polling Consumer
Competing Consumers
Message Dispatcher
Selective Consumer
Durable Subscriber
Idempotent Consumer
Transactional Client
Messaging Gateway
Service Activator
19. System Management
Wire Tap
III. Appendices
A. Scheduled Route Policy Properties
Simple Scheduled Route Policy
Cron Scheduled Route Policy

List of Figures

1.1. Basic route diagram
2.1. Processor Modifying an In Message
2.2. Processor Creating an Out Message
2.3. Sample Pipeline for InOnly Exchanges
2.4. Sample Pipeline for InOut Exchanges
3.1. Processing Multiple Inputs with Segmented Routes
13.1. Message Pattern
13.2. Message Channel Pattern
13.3. Message Endpoint Pattern
13.4. Pipes and Filters Pattern
13.5. Pipeline for InOut Exchanges
13.6. Pipeline for InOnly Exchanges
13.7. Message Router Pattern
13.8. Message Translator Pattern
14.1. Point to Point Channel Pattern
14.2. Publish Subscribe Channel Pattern
14.3. Dead Letter Channel Pattern
14.4. Guaranteed Delivery Pattern
14.5. Message Bus Pattern
15.1. Correlation Identifier Pattern
16.1. Content-Based Router Pattern
16.2. Message Filter Pattern
16.3. Recipient List Pattern
16.4. Splitter Pattern
16.5. Aggregator Pattern
16.6. Aggregator Implementation
16.7. Recoverable Aggregation Repository
16.8. Resequencer Pattern
16.9. Routing Slip Pattern
16.10. Multicast Pattern
16.11. Composed Message Processor Pattern
16.12. Scatter-Gather Pattern
16.13. Dynamic Router Pattern
17.1. Content Enricher Pattern
17.2. Content Filter Pattern
17.3. Normalizer Pattern
17.4. Claim Check Pattern
18.1. Event Driven Consumer Pattern
18.2. Polling Consumer Pattern
18.3. Competing Consumers Pattern
18.4. Message Dispatcher Pattern
18.5. Selective Consumer Pattern
18.6. Durable Subscriber Pattern
18.7. Transactional Client Pattern
18.8. Messaging Gateway Pattern
18.9. Service Activator Pattern
19.1. Wire Tap Pattern

List of Tables

1.1. Apache Camel processors
1.2. Elements for Expression and Predicate Languages
5.1. Basic bean annotations
5.2. Expression language annotations
7.1. Simple transformation patterns
9.1. Processor Threading Options
9.2. Default Thread Pool Profile Settings
9.3. Thread Pool Builder Options
12.1. Messaging Systems
12.2. Messaging Channels
12.3. Message Construction
12.4. Message Routing
12.5. Message Transformation
12.6. Messaging Endpoints
12.7. System Management
14.1. Redelivery Policy Settings
14.2. Dead Letter Redelivery Headers
16.1. Aggregated Exchange Properties
16.2. Redelivered Exchange Properties
16.3. Aggregator Options
16.4. Batch Resequencer Options
16.5. Failover load balancing options
16.6. Weighted options

List of Examples

1.1. Specifying the DSL schema location
1.2. A simple route
1.3. Simple filter
1.4. Adding HTTP component dependency
1.5. JMS consumer endpoint
1.6. JMS producer endpoint
1.7. 45 minute timer
1.8. Timers
1.9. Simple choice processor
1.10. Filter processor
1.11. Simple throttler
1.12. Implementing a custom processor
1.13. Invoking a custom processor
2.1. Processor that modifies the In Message
2.2. Processor that creates an Out message
2.3. InOnly pipeline
2.4. InOut pipeline
2.5. Alternate pipeline syntax
3.1. Multiple independent inputs
3.2. Separate routes
3.3. Segmented route using a direct endpoint
3.4. Segmented route using a SEDA endpoint
3.5. Segmented route using a VM endpoint
4.1. onException clause syntax
4.2. Simple exception trap
4.3. Series of onException clauses
4.4. Trapping multiple exceptions with a single clause
4.5. Using the original message in an onException clause
4.6. Setting the maximum number of redelivery tries
4.7. Specifying the redelivery policy as a bean
4.8. Controlling redelivery with retryWhile
4.9. Using onWhen to control exception trapping
4.10. Suppressing the rethrowing of an exception
4.11. Continuing route processing after an exception
4.12. Sending a response from onException
4.13. Route scoped onException clause
4.14. Catching exceptions with a try, catch, finally
4.15. Rethrowing an exception in a doCatch
4.16. Conditional exception catching
5.1. Using a Java bean
5.2. Bean method for processing a message body
5.3. Bean method for processing an exchange
5.4. Using basic annotations to inject data into a bean
5.5. Using XPath to inject data into a bean
5.6. Using the @Bean annotation to inject data into a bean
6.1. Route using AOP
7.1. Simple transformation of messages
7.2. Unmarshalling from Java
7.3. Unmarshalling JAXB data
7.4. Unmarshalling from XMLBeans
8.1. Sample Property File
10.1. Disabling route auto start up
10.2. Startup order
10.3. Shutting down a file consumer
11.1. Simple scheduled route policy
11.2. Simple scheduled route
11.3. Cron scheduled route policy
11.4. Cron scheduled route
16.1. Messaging Client Sample
16.2. Looping based on a constant
16.3. Looping based on an expression
18.1. Filtering Duplicate Messages with an In-memory Cache

You define routes using the DSL inside a camelContext element. Figure 1.1 shows an overview of the basic syntax for defining a route.

A route always starts with a route element. A camelContext can contain multiple route definitions.

The first child of the route element is a from element, which specifies the source of messages (the consumer endpoint) for the route. The endpoint is configured using the required uri attribute.


You can also define a route by starting the rule with a special processor type (such as intercept, exception, or errorHandler).

The body of the route is made up of an arbitrarily long chain of processors. The filter in the diagram is an example of a processor. Processors usually include predicates that describe the action performed by the processor.

A route typically ends with a to element, which specifies the target for the messages (the producer endpoint) that pass through the route. The endpoint is configured using the required uri attribute.


It is not always necessary to end a route with a to element. There are alternative ways of specifying the message target in a route.

Example 1.2 shows the XML for a simple route that includes a filter. The route receives messages from the endpoint seda:x1 and filters them using an XPath expression. If a message meets the filter's criteria, it is sent on to the route's target endpoint. If it does not, it is dropped.
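Although the example listing is not reproduced here, a route of this kind might be sketched as follows (the XPath expression and the target endpoint name are illustrative):

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="seda:x1"/>
        <filter>
            <!-- illustrative predicate: only matching messages pass the filter -->
            <xpath>/order[@urgent='true']</xpath>
            <to uri="seda:x2"/>
        </filter>
    </route>
</camelContext>
```

Note that in the XML DSL, the target to element is nested inside the filter element, so it is reached only when the predicate evaluates to true.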

The main parts of an Exchange object are: the In message; the Out message (for InOut exchanges); the exchange properties; the exception, if one was raised during processing; and the message exchange pattern (MEP), which specifies whether the exchange follows InOnly or InOut semantics.

Each URI scheme maps to an Apache Camel component. An Apache Camel component is an endpoint factory. To use a particular type of endpoint, you must deploy the corresponding Apache Camel component along with the route. For example, to use JMS endpoints, you would deploy the JMS component.

Apache Camel provides a large variety of different components that enable you to integrate your application with various transport protocols and third-party products. For example, some of the more commonly used components are: File, JMS, CXF (Web services), HTTP, Jetty, Direct, and Mock. For the full list of supported components, see Component Reference.

Most of the Apache Camel components are packaged separately from the Camel core. If you use Maven to build your application, you can easily add a component (and its third-party dependencies) to your application simply by adding a dependency on the relevant component artifact. Example 1.4 shows the dependency needed to add the HTTP component to a project.
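For instance, the HTTP component dependency looks something like the following (the version property name is illustrative; use the Camel version your project is built against):

```xml
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-http</artifactId>
    <version>${camel-version}</version>
</dependency>
```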

The following components are built into the camel-core artifact, so they are always available:

  • Bean

  • Browse

  • Dataset

  • Direct

  • File

  • Log

  • Mock

  • Properties

  • Ref

  • SEDA

  • Timer

  • VM

To enable the router to do something more interesting than simply connecting a consumer endpoint to a producer endpoint, you can add processors to your route. A processor is a node you can insert into a routing rule to perform arbitrary processing of messages that flow through the rule. Fuse Mediation Router provides a wide variety of different processors, as shown in Table 1.1.

Table 1.1. Apache Camel processors

Each entry names the XML DSL element, followed by a description of the processor.

aggregate
    Aggregator EIP: Creates an aggregator, which combines multiple incoming exchanges into a single exchange.

aop
    Use Aspect Oriented Programming (AOP) to do work before and after a specified sub-route. See Aspect Oriented Programming.

bean
    Process the current exchange by invoking a method on a Java object (or bean). See Bean Integration.

choice
    Content Based Router EIP: Selects a particular sub-route based on the exchange content, using when and otherwise clauses.

convertBodyTo
    Converts the In message body to the specified type.

delay
    Delayer EIP: Delays the propagation of the exchange to the latter part of the route.

doTry
    Creates a try/catch block for handling exceptions, using doCatch, doFinally, and end clauses.

enrich
    Content Enricher EIP: Combines the current exchange with data requested from a specified producer endpoint URI.

filter
    Message Filter EIP: Uses a predicate expression to filter incoming exchanges.

idempotentConsumer
    Idempotent Consumer EIP: Implements a strategy to suppress duplicate messages.

@inheritErrorHandler
    Boolean option that can be used to disable the inherited error handler on a particular route node.

inOnly
    Either sets the current exchange's MEP to InOnly (if no arguments) or sends the exchange as an InOnly to the specified endpoint(s).

inOut
    Either sets the current exchange's MEP to InOut (if no arguments) or sends the exchange as an InOut to the specified endpoint(s).

loadBalance
    Load Balancer EIP: Implements load balancing over a collection of endpoints.

log
    Logs a message to the console.

loop
    Loop EIP: Repeatedly resends each exchange to the latter part of the route.

@markRollbackOnly
    (Transactions) Marks the current transaction for rollback only (no exception is raised). This option is set as a boolean attribute on the rollback element.

@markRollbackOnlyLast
    (Transactions) If one or more transactions have previously been associated with this thread and then suspended, this command marks the latest transaction for rollback only (no exception is raised). This option is set as a boolean attribute on the rollback element.

marshal
    Transforms the In message body into a low-level or binary format using the specified data format, in preparation for sending over a particular transport protocol. See Marshalling and unmarshalling.

multicast
    Multicast EIP: Multicasts the current exchange to multiple destinations, where each destination gets its own copy of the exchange.

onCompletion
    Defines a sub-route that gets executed after the main route has completed. For conditional execution, use the onWhen sub-clause. Can also be defined on its own line (not in a route).

onException
    Defines a sub-route that gets executed whenever the specified exception occurs. Usually not defined as part of a route.

pipeline
    Pipes and Filters EIP: Sends the exchange to a series of endpoints, where the output of one endpoint becomes the input of the next endpoint. See also Pipeline Processing.

policy
    Apply a policy to the current route (currently only used for transactional policies).

pollEnrich
    Content Enricher EIP: Combines the current exchange with data polled from a specified consumer endpoint URI.

process
    Execute a custom processor on the current exchange. See Custom processor.

recipientList
    Recipient List EIP: Sends the exchange to a list of recipients that is calculated at runtime (for example, based on the contents of a header).

removeHeader
    Removes the specified header from the exchange's In message.

removeHeaders
    Removes the headers matching the specified pattern from the exchange's In message. The pattern can have the form, prefix*—in which case it matches every name starting with prefix—otherwise, it is interpreted as a regular expression.

removeProperty
    Removes the specified exchange property from the exchange.

resequence
    Resequencer EIP: Re-orders incoming exchanges on the basis of a specified comparator operation. Supports a batch mode and a stream mode.

rollback
    (Transactions) Marks the current transaction for rollback only (also raising an exception, by default).

routingSlip
    Routing Slip EIP: Routes the exchange through a pipeline that is constructed dynamically, based on the list of endpoint URIs extracted from a slip header.

sample
    Creates a sampling throttler, allowing you to extract a sample of exchanges from the traffic on a route.

setBody
    Sets the message body of the exchange's In message.

setExchangePattern
    Sets the current exchange's MEP to the specified value. See Message exchange patterns.

setHeader
    Sets the specified header in the exchange's In message.

setOutHeader
    Sets the specified header in the exchange's Out message.

setProperty
    Sets the specified exchange property.

sort
    Sorts the contents of the In message body (where a custom comparator can optionally be specified).

split
    Splitter EIP: Splits the current exchange into a sequence of exchanges, where each split exchange contains a fragment of the original message body.

stop
    Stops routing the current exchange and marks it as completed.

threads
    Creates a thread pool for concurrent processing of the latter part of the route.

throttle
    Throttler EIP: Limits the flow rate to the specified level (exchanges per second).

throwException
    Throws the specified Java exception.

to
    Sends the exchange to one or more endpoints. See Pipeline Processing.

transacted
    Creates a Spring transaction scope that encloses the latter part of the route.

transform
    Message Translator EIP: Copies the In message headers to the Out message headers and sets the Out message body to the specified value.

unmarshal
    Transforms the In message body from a low-level or binary format to a high-level format, using the specified data format. See Marshalling and unmarshalling.

validate
    Takes a predicate expression to test whether the current message is valid. If the predicate returns false, throws a PredicateValidationException exception.

wireTap
    Wire Tap EIP: Sends a copy of the current exchange to the specified wire tap URI, using the ExchangePattern.InOnly MEP.
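For example, the choice processor from the table can be used to select a sub-route based on message content, roughly as follows (the endpoint URIs and the predicate are illustrative):

```xml
<route>
    <from uri="seda:orders"/>
    <choice>
        <when>
            <!-- illustrative predicate -->
            <xpath>/order[@type='widget']</xpath>
            <to uri="seda:widgetOrders"/>
        </when>
        <otherwise>
            <to uri="seda:otherOrders"/>
        </otherwise>
    </choice>
</route>
```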

Table 1.2 lists the elements that you can insert whenever the context demands an expression or a predicate. The content of the element must be a script written in the relevant language. At runtime, the return value of the script is read by the parent element.

Table 1.2. Elements for Expression and Predicate Languages

Each entry names the XML DSL element, the language it selects, and a description.

simple (Simple)
    A simple expression language, native to Fuse Mediation Router.

xpath (XPath)
    The XPath language, which is used to select element, attribute, and text nodes from XML documents. The XPath expression is applied to the current message.

xquery (XQuery)
    The XQuery language, which is an extension of XPath. The XQuery expression is applied to the current message.

sql (JoSQL)
    The JoSQL language, which is a language for extracting and manipulating data from collections of Java objects, using a SQL-like syntax.

ognl (OGNL)
    The OGNL (Object Graph Navigation Language) expression language.

el (EL)
    The Unified Expression Language (EL), originally developed as part of the JSP standard.

groovy (Groovy)
    The Groovy scripting language.

javaScript (JavaScript)
    The JavaScript scripting language, also known as ECMAScript.

php (PHP)
    The PHP scripting language.

python (Python)
    The Python scripting language.

ruby (Ruby)
    The Ruby scripting language.

mvel (MVEL)
    The MVEL expression language.

<... language="beanshell"> (BeanShell)
    The BeanShell scripting language.

bean (Bean)
    Not really a language. The bean element is actually a mechanism for integrating with Java beans. You use the bean element to obtain an expression or predicate by invoking a method on a Java bean.
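For example, the simple element can supply the expression demanded by a setBody processor, along the following lines (the endpoint URIs and expression are illustrative):

```xml
<route>
    <from uri="direct:greet"/>
    <setBody>
        <!-- the simple element supplies the expression; its return value becomes the new body -->
        <simple>Hello ${body}</simple>
    </setBody>
    <to uri="mock:greeting"/>
</route>
```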

Figure 2.3 shows an example of a processor pipeline for InOnly exchanges. Processor A acts by modifying the In message, while processors B and C create an Out message. The route builder links the processors together as shown.

In particular, processors B and C are linked together in the form of a pipeline. Processor B's Out message is moved to the In message before feeding the exchange into processor C, and processor C's Out message is moved to the In message before feeding the exchange into the producer endpoint. Thus the processors' outputs and inputs are joined into a continuous pipeline, as shown in Figure 2.3.

Fuse Mediation Router employs the pipeline pattern by default, so you do not need to use any special syntax to create a pipeline in your routes. For example, Example 2.3 pulls messages from a userdataQueue queue, pipes the message through a Velocity template (to produce a customer address in text format), and then sends the resulting text address to the queue, envelopeAddressQueue.
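Based on the endpoints described here, such a route might be sketched as follows (this is a sketch, not the verbatim example listing):

```xml
<route>
    <from uri="activemq:userdataQueue"/>
    <!-- the Velocity endpoint expects a reply, so switch to InOut first -->
    <setExchangePattern pattern="InOut"/>
    <to uri="velocity:file:AdressTemplate.vm"/>
    <!-- restore the original one-way pattern before forwarding -->
    <setExchangePattern pattern="InOnly"/>
    <to uri="activemq:envelopeAddressQueue"/>
</route>
```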

Where the Velocity endpoint, velocity:file:AdressTemplate.vm, specifies the location of a Velocity template file, file:AdressTemplate.vm, in the file system. The setExchangePattern command changes the exchange pattern to InOut before sending the exchange to the Velocity endpoint and then changes it back to InOnly afterwards. For more details of the Velocity endpoint, see Velocity in Component Reference.

Figure 2.4 shows an example of a processor pipeline for InOut exchanges, which you typically use to support remote procedure call (RPC) semantics. Processors A, B, and C are linked together in the form of a pipeline, with the output of each processor being fed into the input of the next. The final Out message produced by the producer endpoint is sent all the way back to the consumer endpoint, where it provides the reply to the original request.


In order to support the InOut exchange pattern, it is essential that the last node in the route creates an Out message. Otherwise, any client that connects to the consumer endpoint would hang and wait indefinitely for a reply message.

Example 2.4 shows a route that processes payment requests that are received via HTTP.

The incoming payment request is processed by passing it through a pipeline of Web services, cxf:bean:addAccountDetails, cxf:bean:getCreditRating, and cxf:bean:processTransaction. The final Web service, processTransaction, generates a response (Out message) that is sent back through the Jetty endpoint.

When the pipeline consists of just a sequence of endpoints, it is also possible to use the alternative syntax shown in Example 2.5.
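In the XML DSL, this alternative wraps the sequence of endpoints in a pipeline element. A sketch, reusing the Web service endpoints above (the Jetty consumer URI is illustrative):

```xml
<route>
    <from uri="jetty:http://localhost:8080/payments"/>
    <pipeline>
        <to uri="cxf:bean:addAccountDetails"/>
        <to uri="cxf:bean:getCreditRating"/>
        <to uri="cxf:bean:processTransaction"/>
    </pipeline>
</route>
```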

The pipeline for InOptionalOut exchanges is essentially the same as the pipeline in Figure 2.4. The difference between InOut and InOptionalOut is that an exchange with the InOptionalOut exchange pattern is allowed to have a null Out message as a reply. That is, in the case of an InOptionalOut exchange, a null Out message is copied to the In message of the next node in the pipeline. By contrast, in the case of an InOut exchange, a null Out message is discarded and the original In message from the current node is copied to the In message of the next node instead.

A standard route takes its input from just a single endpoint. But what if you need to define multiple inputs for your route? Apache Camel provides several alternatives for specifying multiple inputs to a route:

The approach you take depends on whether you want the exchanges to be processed independently of each other before being sent to a common destination. If you want the exchanges from different inputs to be combined in some way, you should use the content enrichment pattern instead.

The direct component provides the simplest mechanism for linking together routes. The event model for the direct component is synchronous, so that subsequent segments of the route run in the same thread as the first segment. The general format of a direct URL is direct:EndpointID, where the endpoint ID, EndpointID, is simply a unique alphanumeric string that identifies the endpoint instance.

For example, if you want to take the input from two message queues, activemq:Nyse and activemq:Nasdaq, and merge them into a single message queue, activemq:USTxn, you can do this by defining a set of routes similar to Example 3.3.
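A sketch of how such a set of routes might look (endpoint names taken from the description above):

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="activemq:Nyse"/>
        <to uri="direct:mergeTxns"/>
    </route>
    <route>
        <from uri="activemq:Nasdaq"/>
        <to uri="direct:mergeTxns"/>
    </route>
    <!-- the final segment receives exchanges from both feeder routes -->
    <route>
        <from uri="direct:mergeTxns"/>
        <to uri="activemq:USTxn"/>
    </route>
</camelContext>
```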

The first two routes take their input from the message queues, Nyse and Nasdaq, and send it to the endpoint, direct:mergeTxns. The last route combines the inputs from the previous two queues and sends the combined message stream to the activemq:USTxn queue.

The implementation of the direct endpoint behaves as follows: whenever an exchange arrives at a producer endpoint (for example, <to uri="direct:mergeTxns" />), the direct endpoint passes the exchange directly to all of the consumer endpoints that have the same endpoint ID (for example, <from uri="direct:mergeTxns" />). Direct endpoints can only be used to communicate between routes that belong to the same camelContext element.

The SEDA component provides an alternative mechanism for linking together routes. You can use it in a similar way to the direct component, but it has a different underlying event and threading model: a SEDA endpoint buffers incoming exchanges in a queue, and the consumer end of the endpoint processes them asynchronously, using its own pool of threads.

One of the main advantages of using a SEDA endpoint is that the routes can be more responsive, owing to the built-in consumer thread pool. The stock transactions example can be re-written to use SEDA endpoints instead of direct endpoints, as shown in Example 3.4.

The main difference between this example and the direct example is that when using SEDA, the second route segment (from seda:mergeTxns to activemq:USTxn) is processed by a pool of five threads.
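A sketch of the SEDA variant, where the concurrentConsumers option sets the size of the consumer thread pool:

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="activemq:Nyse"/>
        <to uri="seda:mergeTxns"/>
    </route>
    <route>
        <from uri="activemq:Nasdaq"/>
        <to uri="seda:mergeTxns"/>
    </route>
    <!-- the second segment is serviced by a pool of five threads -->
    <route>
        <from uri="seda:mergeTxns?concurrentConsumers=5"/>
        <to uri="activemq:USTxn"/>
    </route>
</camelContext>
```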


There is more to SEDA than simply pasting together route segments. The staged event-driven architecture (SEDA) encompasses a design philosophy for building more manageable multi-threaded applications. The purpose of the SEDA component in Fuse Mediation Router is simply to enable you to apply this design philosophy to your applications.

For more details about exception handling, see Dead Letter Channel.

You can define multiple onException clauses to trap exceptions in a camelContext scope. This enables you to take different actions in response to different exceptions. For example, the series of onException clauses shown in Example 4.3 defines a different deadletter destination for each trapped exception type, such as ValidationException and the generic Exception.

You can also group multiple exceptions together to be trapped by the same onException clause. You can group multiple exceptions as shown in Example 4.4.
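Grouping works by listing several exception elements inside a single onException clause, roughly as follows (the exception class names and the deadletter endpoint are illustrative):

```xml
<onException>
    <exception>com.example.ValidationException</exception>
    <exception>java.io.IOException</exception>
    <!-- both exception types are routed to the same deadletter queue -->
    <to uri="activemq:deadLetterQueue"/>
</onException>
```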

When trapping multiple exceptions, the order of the onException clauses is significant. Fuse Mediation Router initially attempts to match the thrown exception against the first clause. If the first clause fails to match, the next onException clause is tried, and so on until a match is found. Each matching attempt is governed by the following algorithm:

  1. If the thrown exception is a chained exception (that is, where an exception has been caught and rethrown as a different exception), the most nested exception type serves initially as the basis for matching. This exception is tested as follows:

    1. If the exception-to-test has exactly the type specified in the onException clause (tested using instanceof), a match is triggered.

    2. If the exception-to-test is a sub-type of the type specified in the onException clause, a match is triggered.

  2. If the most nested exception fails to yield a match, the next exception in the chain (the wrapping exception) is tested instead. The testing continues up the chain until either a match is triggered or the chain is exhausted.

Instead of interrupting the processing of a message and giving up as soon as an exception is raised, Fuse Mediation Router gives you the option of attempting to redeliver the message at the point where the exception occurred. In networked systems, where timeouts can occur and temporary faults arise, it is often possible for failed messages to be processed successfully, if they are redelivered shortly after the original exception was raised.

Fuse Mediation Router supports various strategies for redelivering messages after an exception occurs. Some of the most important options for configuring redelivery are as follows:

maximumRedeliveries
    Specifies the maximum number of times redelivery can be attempted (default is 0). A negative value means redelivery is always attempted (equivalent to an infinite value).

redeliveryDelay
    Specifies, in milliseconds, the delay between redelivery attempts.
These properties, along with the full range of redelivery properties, can be set using the Property Editor. The editor pane for the On Exception clause includes a section labeled Redelivery Policy that has fields for configuring the redelivery policy. See On Exception in Enterprise Integration Pattern Reference.

For example, you can specify a maximum of six redeliveries, after which the exchange is sent to the validationFailed deadletter queue, as shown in Example 4.6.
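In the XML DSL, this can be expressed by setting maximumRedeliveries on a redeliveryPolicy element (the exception class name is illustrative; the deadletter queue name follows the text):

```xml
<onException>
    <exception>com.example.ValidationException</exception>
    <!-- attempt redelivery up to six times before giving up -->
    <redeliveryPolicy maximumRedeliveries="6"/>
    <to uri="activemq:validationFailed"/>
</onException>
```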

The latter part of the route—after the redelivery options are set—is not processed until after the last redelivery attempt has failed.

Optionally, you can define a Spring bean using the class org.apache.camel.processor.RedeliveryPolicy and reference it using the redeliveryPolicyRef attribute, as shown in Example 4.7.


Defining the redelivery policy using a bean is useful if you want to re-use the same redelivery policy in multiple onException clauses.

By default, when an exception is raised in the middle of a route, processing of the current exchange is interrupted and the thrown exception is propagated back to the consumer endpoint at the start of the route. When an onException clause is triggered, the behavior is essentially the same, except that the onException clause performs some processing before the thrown exception is propagated back.

But this default behavior is not the only way to handle an exception. The onException clause provides various options to modify the exception handling behavior, as follows:

  • Suppressing exception rethrow—you have the option of suppressing the rethrown exception after the onException clause has completed. In other words, in this case the exception does not propagate back to the consumer endpoint at the start of the route.

  • Continuing processing—you have the option of resuming normal processing of the exchange from the point where the exception originally occurred. Implicitly, this approach also suppresses the rethrown exception.

  • Sending a response—in the special case where the consumer endpoint at the start of the route expects a reply (that is, having an InOut MEP), you might prefer to construct a custom fault reply message, rather than propagating the exception back to the consumer endpoint.
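The first two options map to the handled and continued sub-elements of onException. For example, suppressing the rethrow might be sketched as follows (the exception class and endpoint are illustrative):

```xml
<onException>
    <exception>com.example.ValidationException</exception>
    <!-- mark the exception as handled: it is not propagated back to the consumer -->
    <handled>
        <constant>true</constant>
    </handled>
    <to uri="activemq:validationFailed"/>
</onException>
```

To resume normal processing from the point of failure instead, you would use a continued sub-element in the same position.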

Bean integration provides a general purpose mechanism for processing messages using arbitrary Java objects. By inserting a bean reference into a route, you can call an arbitrary method on a Java object, which can then access and modify the incoming exchange. The mechanism that maps an exchange's contents to the parameters and return values of a bean method is known as bean binding.

The bean binding can use either or both of the following approaches to initialize a method's parameters:

  • Conventional method signatures — If the method signature conforms to certain conventions, the bean binding can use Java reflection to determine what parameters to pass.

  • Annotations and dependency injection — For a more flexible binding mechanism, employ Java annotations to specify what to inject into the method's arguments. This dependency injection mechanism relies on Spring component scanning. Normally, if you are deploying your Fuse Mediation Router application into a Spring container, the dependency injection mechanism will work automatically.
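For example, a bean registered in Spring can be invoked from a route using the bean element (the class and method names here are illustrative):

```xml
<bean id="myTransformer" class="com.example.MyTransformer"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:in"/>
        <!-- bean binding maps the exchange contents to the method's parameters -->
        <bean ref="myTransformer" method="transform"/>
        <to uri="mock:out"/>
    </route>
</camelContext>
```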

The basic bean bindings described in Initializing parameters using method signatures might not always be convenient to use. For example, if you have a legacy Java class that performs some data manipulation, you might want to extract data from an inbound exchange and map it to the arguments of an existing method signature. For this kind of bean binding, Fuse Mediation Router provides the following kinds of Java annotation:

The expression language annotations provide a powerful mechanism for injecting message data into a bean method's arguments. Using these annotations, you can invoke an arbitrary script, written in the scripting language of your choice, to extract data from an inbound exchange and inject the data into a method argument.

Table 5.2 shows the annotations from the org.apache.camel.language package (and sub-packages, for the non-core annotations) that you can use to inject message data into the arguments of a bean method.

For example, Example 5.5 shows you how to use the @XPath annotation to extract a username and a password from the body of an incoming XML message.


Some of the language annotations are available in the core component (@Bean, @Constant, @Simple, and @XPath). For non-core components, however, you will have to make sure that you load the relevant component. For example, to use the OGNL script, you must load the camel-ognl component.

Fuse Mediation Router supports a variety of approaches to transforming message content. In addition to a native API for modifying message content, Fuse Mediation Router supports integration with several different third-party libraries and transformation standards. The following kinds of transformations are discussed in this section:

Fuse Mediation Router supports marshalling and unmarshalling of a number of data formats, including Java serialization, JAXB, and XMLBeans.
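For example, JAXB data could be unmarshalled in a route roughly as follows (the context path and endpoint URIs are illustrative):

```xml
<route>
    <from uri="activemq:xmlOrders"/>
    <!-- convert the XML payload into JAXB-generated Java objects -->
    <unmarshal>
        <jaxb contextPath="com.example.orders"/>
    </unmarshal>
    <to uri="bean:orderService"/>
</route>
```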

Wherever an endpoint URI string appears in a route, the first step in parsing the endpoint URI is to apply the property placeholder parser. The placeholder parser automatically substitutes any property names appearing between double braces, {{Key}}. For example, given the property settings shown in Example 8.1, you could define a route as follows:
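For instance, assuming the property file defines keys such as cool.start and cool.end (the key names are illustrative), a route could be defined as:

```xml
<route>
    <!-- both URIs are resolved by the property placeholder parser -->
    <from uri="{{cool.start}}"/>
    <to uri="{{cool.end}}"/>
</route>
```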


By default, the placeholder parser looks up the properties bean ID in the registry to find the property component. If you prefer, you can explicitly specify the scheme in the endpoint URIs. For example, by prefixing properties: to each of the endpoint URIs, you can define the following equivalent route:


When specifying the scheme explicitly, you also have the option of specifying options to the properties component. For example, to override the property file location, you could set the location option as follows:
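A sketch of overriding the property file location with the locations option (the classpath location is illustrative):

```xml
<route>
    <from uri="direct:start"/>
    <!-- resolve cool.end against the explicitly specified property file -->
    <to uri="properties:{{cool.end}}?locations=com/example/myprop.properties"/>
</route>
```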


If you define a camelContext element inside an OSGi blueprint file, the Fuse Mediation Router property placeholder mechanism automatically integrates with the blueprint property placeholder mechanism. That is, placeholders obeying the Fuse Mediation Router syntax (for example, {{cool.end}}) that appear within the scope of camelContext are implicitly resolved by looking up the blueprint property placeholder mechanism.

For example, consider the following route defined in an OSGi blueprint file, where the last endpoint in the route is defined by the property placeholder, {{result}}:

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0">

    <!-- OSGI blueprint property placeholder -->
    <cm:property-placeholder id="myblueprint.placeholder" persistent-id="camel.blueprint">
        <!-- list some properties for this test -->
        <cm:default-properties>
            <cm:property name="result" value="mock:result"/>
        </cm:default-properties>
    </cm:property-placeholder>

    <camelContext xmlns="http://camel.apache.org/schema/blueprint">
        <!-- in the route we can use {{ }} placeholders which will look up in blueprint,
             as Camel will auto detect the OSGi blueprint property placeholder and use it -->
        <route>
            <from uri="direct:start"/>
            <to uri="mock:foo"/>
            <to uri="{{result}}"/>
        </route>
    </camelContext>

</blueprint>


The blueprint property placeholder mechanism is initialized by creating a cm:property-placeholder bean. In the preceding example, the cm:property-placeholder bean is associated with the camel.blueprint persistent ID, where a persistent ID is the standard way of referencing a group of related properties from the OSGi Configuration Admin service. In other words, the cm:property-placeholder bean provides access to all of the properties defined under the camel.blueprint persistent ID. It is also possible to specify default values for some of the properties (using the nested cm:property elements).

In the context of blueprint, the Fuse Mediation Router placeholder mechanism searches for an instance of cm:property-placeholder in the bean registry. If it finds such an instance, it automatically integrates with it, so that placeholders such as {{result}} are resolved by looking up the key through the blueprint property placeholder mechanism (in this example, through the myblueprint.placeholder bean).


The default blueprint placeholder syntax (accessing the blueprint properties directly) is ${Key}. Hence, outside the scope of a camelContext element, you must use the ${Key} placeholder syntax, whereas inside the scope of a camelContext element, you must use the {{Key}} placeholder syntax.

If you want to have more control over where the Fuse Mediation Router property placeholder mechanism finds its properties, you can define a propertyPlaceholder element and specify the resolver locations explicitly.

For example, consider the following blueprint configuration, which differs from the previous example in that it creates an explicit propertyPlaceholder instance:

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0">

    <!-- OSGI blueprint property placeholder -->
    <cm:property-placeholder id="myblueprint.placeholder" persistent-id="camel.blueprint">
        <!-- list some properties for this test -->
        <cm:default-properties>
            <cm:property name="result" value="mock:result"/>
        </cm:default-properties>
    </cm:property-placeholder>

    <camelContext xmlns="http://camel.apache.org/schema/blueprint">

        <!-- using Camel properties component and refer to the blueprint property placeholder by its id -->
        <propertyPlaceholder id="properties" location="blueprint:myblueprint.placeholder"/>

        <!-- in the route we can use {{ }} placeholders which will lookup in blueprint -->
        <route>
            <from uri="direct:start"/>
            <to uri="mock:foo"/>
            <to uri="{{result}}"/>
        </route>

    </camelContext>

</blueprint>


In the preceding example, the propertyPlaceholder element specifies explicitly which cm:property-placeholder bean to use by setting the location to blueprint:myblueprint.placeholder. That is, the blueprint: resolver explicitly references the ID, myblueprint.placeholder, of the cm:property-placeholder bean.

This style of configuration is useful if there is more than one cm:property-placeholder bean defined in the blueprint file and you need to specify which one to use. It also makes it possible to source properties from multiple locations, by specifying a comma-separated list of locations. For example, if you wanted to look up properties both from the cm:property-placeholder bean and from a properties file on the classpath, you could define the propertyPlaceholder element as follows:

<propertyPlaceholder id="properties"
                     location="blueprint:myblueprint.placeholder,classpath:myprop.properties"/>
<!-- myprop.properties is an illustrative file name; substitute your own properties file -->

The Fuse Mediation Router threading model is based on the powerful Java concurrency API, java.util.concurrent, that first became available in Sun's JDK 1.5. The key interface in this API is the ExecutorService interface, which represents a thread pool. Using the concurrency API, you can create many different kinds of thread pool, covering a wide range of scenarios.
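For example, the following standalone snippet (plain java.util.concurrent, with no Camel dependencies) shows the kind of fixed-size thread pool that the ExecutorService interface represents:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolDemo {
    // Submit a task to a fixed-size pool of 5 threads and return its result
    static int runTask() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            return pool.submit(new Callable<Integer>() {
                public Integer call() {
                    return 21 * 2;
                }
            }).get();
        } finally {
            pool.shutdown(); // always shut the pool down when finished
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTask()); // prints 42
    }
}
```

Camel's thread-pool options ultimately configure pools of exactly this type.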

A custom thread pool can be any thread pool of java.util.concurrent.ExecutorService type. The following approaches to creating a thread pool instance are recommended in Fuse Mediation Router:

  • Use the org.apache.camel.builder.ThreadPoolBuilder utility to build the thread pool class.

  • Use the org.apache.camel.spi.ExecutorServiceStrategy instance from the current CamelContext to create the thread pool class.

Ultimately, there is not much difference between the two approaches, because the ThreadPoolBuilder is actually defined using the ExecutorServiceStrategy instance. Normally, the ThreadPoolBuilder is preferred, because it offers a simpler approach. But there is at least one kind of thread pool (the ScheduledExecutorService) that can only be created by accessing the ExecutorServiceStrategy instance directly.

Table 9.3 shows the options supported by the ThreadPoolBuilder class, which you can set when defining a new custom thread pool.

Table 9.3. Thread Pool Builder Options

maxQueueSize()
    Sets the maximum number of pending tasks that this thread pool can store in its incoming task queue. A value of -1 specifies an unbounded queue. The default value is taken from the default thread pool profile.

poolSize()
    Sets the minimum number of threads in the pool (this is also the initial pool size). The default value is taken from the default thread pool profile.

maxPoolSize()
    Sets the maximum number of threads that can be in the pool. The default value is taken from the default thread pool profile.

keepAliveTime()
    If any threads are idle for longer than this period of time (specified in seconds), they are terminated. This allows the thread pool to shrink when the load is light. The default value is taken from the default thread pool profile.

rejectedPolicy()
    Specifies what course of action to take if the incoming task queue is full. You can specify four possible values:

    CallerRuns
        (Default value) Gets the caller thread to run the latest incoming task. As a side effect, this option prevents the caller thread from receiving any more tasks until it has finished processing the latest incoming task.

    Abort
        Aborts the latest incoming task by throwing an exception.

    Discard
        Quietly discards the latest incoming task.

    DiscardOldest
        Discards the oldest unhandled task and then attempts to enqueue the latest incoming task in the task queue.

build()
    Finishes building the custom thread pool and registers the new thread pool under the ID specified as the argument to build().

In Java DSL, you can define a custom thread pool using the ThreadPoolBuilder, as follows:

// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5)
                                        .maxPoolSize(5)
                                        .maxQueueSize(100)
                                        .build("customPool");


Instead of passing the object reference, customPool, directly to the executorService() option, you can look up the thread pool in the registry, by passing its bean ID to the executorServiceRef() option, as follows:

// Java
from("direct:start")
    .multicast().executorServiceRef("customPool")
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");

In XML DSL, you access the ThreadPoolBuilder using the threadPool element. You can then reference the custom thread pool using the executorServiceRef attribute to look up the thread pool by ID in the Spring registry, as follows:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPool id="customPool"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100"/>

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customPool">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>
If you have many custom thread pool instances to create, you might find it more convenient to define a custom thread pool profile, which acts as a factory for thread pools. Whenever you reference a thread pool profile from a threading-aware processor, the processor automatically uses the profile to create a new thread pool instance. You can define a custom thread pool profile either in Java DSL or in XML DSL.

For example, in Java DSL you can create a custom thread pool profile with the bean ID, customProfile, and reference it from within a route, as follows:

// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
// Register the profile with the CamelContext (sketch; Camel 2.x API)
context.getExecutorServiceStrategy().registerThreadPoolProfile(customProfile);
// Reference the custom thread pool profile in a route
from("direct:start")
    .multicast().executorServiceRef("customProfile")
        .to("mock:first")
        .to("mock:second")
        .to("mock:third");

In XML DSL, use the threadPoolProfile element to create a custom pool profile (where you let the defaultProfile option default to false, because this is not a default thread pool profile). You can create a custom thread pool profile with the bean ID, customProfile, and reference it from within a route, as follows:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile id="customProfile"
                       poolSize="5"
                       maxPoolSize="5"
                       maxQueueSize="100"/>

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customProfile">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

By default, routes are automatically started when your Fuse Mediation Router application starts up and routes are automatically shut down when your Fuse Mediation Router application shuts down. For non-critical deployments, the details of the shutdown sequence are usually not important. But in a production environment, it is often crucial that existing tasks should run to completion during shutdown, in order to avoid data loss. You typically also want to control the order in which routes shut down, so that dependencies are not violated.

For this reason, Fuse Mediation Router provides a set of features to support graceful shutdown of applications. Graceful shutdown gives you full control over the stopping and starting of routes, enabling you to control the shutdown order of routes and enabling current tasks to run to completion.

Routes are shut down in the reverse of the start-up order. When a start-up order is defined using the startupOrder attribute, the first route to shut down is the route with the highest integer value assigned by the start-up order and the last route to shut down is the route with the lowest integer value assigned by the start-up order.

For example, in Example 10.2, the first route segment to be shut down is the route with the ID, first, and the second route segment to be shut down is the route with the ID, second. This example illustrates a general rule, which you should observe when shutting down routes: the routes that expose externally-accessible consumer endpoints should be shut down first, because this helps to throttle the flow of messages through the rest of the route graph.
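A sketch of what such a pair of routes might look like in the XML DSL (the endpoint URIs and start-up order values are illustrative; the route with the externally-accessible consumer endpoint is given the higher startupOrder, so it shuts down first):

```
<route id="first" startupOrder="2">
    <from uri="jetty:http://0.0.0.0:8080/orders"/>
    <to uri="seda:buffer"/>
</route>
<route id="second" startupOrder="1">
    <from uri="seda:buffer"/>
    <to uri="mock:result"/>
</route>
```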

When you configure a simple scheduled route policy to stop a route, the route stopping algorithm is automatically integrated with the graceful shutdown procedure (see Controlling Start-Up and Shutdown of Routes). This means that the task waits until the current exchange has finished processing before shutting down the route. You can set a timeout, however, that forces the route to stop after the specified time, irrespective of whether or not the route has finished processing the exchange.

The cron expression syntax has its origins in the UNIX cron utility, which schedules jobs to run in the background on a UNIX system. A cron expression is effectively a syntax for wildcarding dates and times that enables you to specify either a single event or multiple events that recur periodically.

A cron expression consists of 6 or 7 fields in the following order:

Seconds Minutes Hours DayOfMonth Month DayOfWeek [Year]

The Year field is optional and usually omitted, unless you want to define an event that occurs once and once only. Each field consists of a mixture of literals and special characters. For example, the following cron expression specifies an event that fires once every day at midnight:

0 0 0 * * ?

The * character is a wildcard that matches every value of a field. Hence, the preceding expression matches every day of every month. The ? character is a dummy placeholder that means ignore this field. It always appears either in the DayOfMonth field or in the DayOfWeek field, because it is not logically consistent to specify both of these fields at the same time. For example, if you want to schedule an event that fires once a day, but only from Monday to Friday, use the following cron expression:

0 0 0 ? * MON-FRI

Where the hyphen character specifies a range, MON-FRI. You can also use the forward slash character, /, to specify increments. For example, to specify that an event fires every 5 minutes, use the following cron expression:

0 0/5 * * * ?
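A few more illustrative cron expressions (these examples are not from the original text, but follow the same field rules):

```
0 0 12 * * ?              fires at noon, every day
0 15 10 ? * MON-FRI       fires at 10:15, Monday to Friday
0 0/30 8-17 ? * MON-FRI   fires every half hour between 08:00 and 17:30, Monday to Friday
```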

For a full explanation of the cron expression syntax, see the Wikipedia article on CRON expressions.

Fuse Mediation Router supports most of the patterns from the book, Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf. The patterns described by these authors provide an excellent toolbox for developing enterprise integration projects. In addition to providing a common language for discussing integration architectures, many of the patterns can be implemented directly using Fuse Mediation Router's programming interfaces and XML configuration.

The message routing patterns, shown in Table 12.4, describe various ways of linking message channels together, including various algorithms that can be applied to the message stream (without modifying the body of the message).

Table 12.4. Message Routing

Content-Based Router: How do we handle a situation where the implementation of a single logical function (e.g., inventory check) is spread across multiple physical systems?
Message Filter: How does a component avoid receiving uninteresting messages?
Recipient List: How do we route a message to a list of dynamically specified recipients?
Splitter: How can we process a message if it contains multiple elements, each of which might have to be processed in a different way?
Aggregator: How do we combine the results of individual, but related, messages so that they can be processed as a whole?
Resequencer: How can we get a stream of related, but out-of-sequence, messages back into the correct order?
Composed Message Processor: How can you maintain the overall message flow when processing a message consisting of multiple elements, each of which may require different processing?
Scatter-Gather: How do you maintain the overall message flow when a message needs to be sent to multiple recipients, each of which may send a reply?
Routing Slip: How do we route a message consecutively through a series of processing steps when the sequence of steps is not known at design-time, and might vary for each message?
Throttler: How can I throttle messages to ensure that a specific endpoint does not get overloaded, or that we don't exceed an agreed SLA with some external service?
Delayer: How can I delay the sending of a message?
Load Balancer: How can I balance load across a number of endpoints?
Multicast: How can I route a message to a number of endpoints at the same time?
Loop: How can I repeat processing a message in a loop?
Sampling: How can I sample one message out of many in a given period, so that a downstream route does not get overloaded?
Dynamic Router: How can you avoid the dependency of the router on all possible destinations while maintaining its efficiency?

By default, Fuse Mediation Router applies the following structure to all message types:

It is important to remember that this division into headers, body, and attachments is an abstract model of the message. Fuse Mediation Router supports many different components that generate a wide variety of message formats. Ultimately, it is the underlying component implementation that decides what gets placed into the headers and body of a message.

The Microsoft Message Queuing (MSMQ) technology is a queuing system that runs on Windows Server machines (see Microsoft Message Queuing). In MSMQ, you can access queues using an endpoint URI with the following format:

msmq:MSMQueueName
Where the MSMQueueName is a queue reference, defined according to the rules of MSMQ. You can reference a queue using any of the approaches described in Referencing a Queue.

See MSMQ in Component Reference for more details.

A message endpoint is the interface between an application and a messaging system. As shown in Figure 13.3, you can have a sender endpoint, sometimes called a proxy or a service consumer, which is responsible for sending In messages, and a receiver endpoint, sometimes called an endpoint or a service, which is responsible for receiving In messages.

In Fuse Mediation Router, an endpoint is represented by an endpoint URI, which typically encapsulates the following kinds of data:

An endpoint URI in Fuse Mediation Router has the following general form:

ComponentPrefix:ComponentSpecificURI
Where ComponentPrefix is a URI prefix that identifies a particular Fuse Mediation Router component (see Component Reference for details of all the supported components). The remaining part of the URI, ComponentSpecificURI, has a syntax defined by the particular component. For example, to connect to the JMS queue, Foo.Bar, you can define an endpoint URI like the following:

jms:Foo.Bar
To define a route that connects the consumer endpoint, file://local/router/messages/foo, directly to the producer endpoint, jms:Foo.Bar, you can use the following Java DSL fragment:

from("file://local/router/messages/foo").to("jms:Foo.Bar");
Alternatively, you can define the same route in XML, as follows:

<camelContext id="CamelContextID" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="file://local/router/messages/foo"/>
        <to uri="jms:Foo.Bar"/>
    </route>
</camelContext>
The pipes and filters pattern, shown in Figure 13.4, describes a way of constructing a route by creating a chain of filters, where the output of one filter is fed into the input of the next filter in the pipeline (analogous to the UNIX pipe command). The advantage of the pipeline approach is that it enables you to compose services (some of which can be external to the Fuse Mediation Router application) to create more complex forms of message processing.

A message router, shown in Figure 13.7, is a type of filter that consumes messages from a single consumer endpoint and redirects them to the appropriate target endpoint, based on a particular decision criterion. A message router is concerned only with redirecting messages; it does not modify the message content.

A message router can easily be implemented in Fuse Mediation Router using the choice() processor, where each of the alternative target endpoints can be selected using a when() subclause (for details of the choice processor, see Processors).

The message translator pattern, shown in Figure 13.8, describes a component that modifies the contents of a message, translating it to a different format. You can use Fuse Mediation Router's bean integration feature to perform the message translation.

You can transform a message using bean integration, which enables you to call a method on any registered bean. For example, to call the method, myMethodName(), on the bean with ID, myTransformerBean:

  .beanRef("myTransformerBean", "myMethodName")

Where the myTransformerBean bean is defined in either a Spring XML file or in JNDI. If you omit the method name parameter from beanRef(), the bean integration tries to deduce the method name to invoke by examining the message exchange.

You can also add your own explicit Processor instance to perform the transformation, as follows:

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

Or, you can use the DSL to explicitly configure the transformation, as follows:

from("direct:start").setBody(body().append(" World!")).to("mock:result");

You can also use templating to consume a message from one destination, transform it with something like Velocity in Component Reference or XQuery, and then send it on to another destination. For example, using the InOnly exchange pattern (one-way messaging):
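A sketch of such a one-way route in the XML DSL (the queue names and the Velocity template path are illustrative):

```
<route>
    <from uri="activemq:My.Queue"/>
    <to uri="velocity:com/acme/MyResponse.vm"/>
    <to uri="activemq:Another.Queue"/>
</route>
```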


If you want to use InOut (request-reply) semantics to process requests on the My.Queue queue on ActiveMQ in Component Reference with a template generated response, then you could use a route like the following to send responses back to the JMSReplyTo destination:
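A sketch of such a route (the template path is illustrative); because the exchange is InOut, the transformed message is returned to the JMSReplyTo destination rather than forwarded to another endpoint:

```
<route>
    <from uri="activemq:My.Queue"/>
    <to uri="velocity:com/acme/MyResponse.vm"/>
</route>
```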


A point-to-point channel, shown in Figure 14.1 is a message channel that guarantees that only one receiver consumes any given message. This is in contrast with a publish-subscribe channel, which allows multiple receivers to consume the same message. In particular, with a point-to-point channel, it is possible for multiple receivers to subscribe to the same channel. If more than one receiver competes to consume a message, it is up to the message channel to ensure that only one receiver actually consumes the message.

A publish-subscribe channel, shown in Figure 14.2, is a message channel that enables multiple subscribers to consume any given message. This is in contrast with a point-to-point channel. Publish-subscribe channels are frequently used as a means of broadcasting events or notifications to multiple subscribers.

The dead letter channel pattern, shown in Figure 14.3, describes the actions to take when the messaging system fails to deliver a message to the intended recipient. This includes such features as retrying delivery and, if delivery ultimately fails, sending the message to a dead letter channel, which archives the undelivered messages.

Normally, you do not send a message straight to the dead letter channel if a delivery attempt fails. Instead, you re-attempt delivery up to some maximum limit and, after all redelivery attempts fail, you send the message to the dead letter channel. To customize message redelivery, you can configure the dead letter channel to have a redelivery policy. For example, to specify a maximum of two redelivery attempts, and to apply an exponential backoff algorithm to the time delay between delivery attempts, you can configure the dead letter channel as follows:

// Java ("jms:queue:dead" is an illustrative dead letter endpoint)
errorHandler(deadLetterChannel("jms:queue:dead")
    .maximumRedeliveries(2)
    .useExponentialBackOff());
Where you set the redelivery options on the dead letter channel by invoking the relevant methods in a chain (each method in the chain returns a reference to the current RedeliveryPolicy object). Table 14.1 summarizes the methods that you can use to set redelivery policies.

Table 14.1. Redelivery Policy Settings

backOffMultiplier(double multiplier) [default: 2]
    If exponential backoff is enabled, let m be the backoff multiplier and let d be the initial delay. The sequence of redelivery attempts is then timed as follows: d, m*d, m*m*d, m*m*m*d, ...

collisionAvoidancePercent(double collisionAvoidancePercent) [default: 15]
    If collision avoidance is enabled, let p be the collision avoidance percent. The collision avoidance policy then tweaks the next delay by a random amount, up to plus or minus p% of its current value.

delayPattern(String delayPattern) [default: none]
    Fuse Mediation Router 2.0: Specifies the redelivery delay as a function of the redelivery count (for example, the pattern 5:1000;10:5000 applies a 1000 ms delay from the fifth redelivery attempt and a 5000 ms delay from the tenth attempt onwards).

disableRedelivery() [default: true]
    Fuse Mediation Router 2.0: Disables the redelivery feature. To enable redelivery, set maximumRedeliveries() to a positive integer value.

handled(boolean handled) [default: true]
    Fuse Mediation Router 2.0: If true, the current exception is cleared when the message is moved to the dead letter channel; if false, the exception is propagated back to the client.

initialRedeliveryDelay(long initialRedeliveryDelay) [default: 1000]
    Specifies the delay (in milliseconds) before attempting the first redelivery.

logStackTrace(boolean logStackTrace) [default: false]
    Fuse Mediation Router 2.0: If true, the JVM stack trace is included in the error logs.

maximumRedeliveries(int maximumRedeliveries) [default: 0]
    Fuse Mediation Router 2.0: Maximum number of redelivery attempts. (In earlier releases, the default was 6.)

maximumRedeliveryDelay(long maxDelay) [default: 60000]
    Fuse Mediation Router 2.0: When using an exponential backoff strategy (see useExponentialBackOff()), it is theoretically possible for the redelivery delay to increase without limit. This property imposes an upper limit on the redelivery delay (in milliseconds).

onRedelivery(Processor processor) [default: none]
    Fuse Mediation Router 2.0: Configures a processor that gets called before every redelivery attempt.

redeliveryDelay(long delay) [default: 0]
    Fuse Mediation Router 2.0: Specifies the delay (in milliseconds) between redelivery attempts.

retriesExhaustedLogLevel(LoggingLevel logLevel) [default: LoggingLevel.ERROR]
    Fuse Mediation Router 2.0: Specifies the logging level at which to log delivery failure (specified as an org.apache.camel.LoggingLevel constant).

retryAttemptedLogLevel(LoggingLevel logLevel) [default: LoggingLevel.DEBUG]
    Fuse Mediation Router 2.0: Specifies the logging level at which to log redelivery attempts (specified as an org.apache.camel.LoggingLevel constant).

useCollisionAvoidance() [default: false]
    Enables collision avoidance, which adds some randomization to the backoff timings to reduce contention probability.

useOriginalMessage() [default: false]
    Fuse Mediation Router 2.0: If this feature is enabled, the message sent to the dead letter channel is a copy of the original message exchange, as it existed at the beginning of the route (in the from() node).

useExponentialBackOff() [default: false]
    Enables exponential backoff.
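To make the interaction between these settings concrete, the following standalone snippet (plain Java, not part of the Camel API) computes the delay sequence implied by initialRedeliveryDelay, backOffMultiplier, and maximumRedeliveryDelay:

```java
public class BackoffSchedule {
    // Compute the first `attempts` redelivery delays for exponential backoff:
    // d, m*d, m*m*d, ... with each delay capped at maxDelay milliseconds.
    static long[] delays(long initialDelay, double multiplier, int attempts, long maxDelay) {
        long[] result = new long[attempts];
        double delay = initialDelay;
        for (int i = 0; i < attempts; i++) {
            result[i] = Math.min((long) delay, maxDelay);
            delay *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Using the documented defaults: d = 1000 ms, m = 2, cap = 60000 ms
        for (long d : delays(1000L, 2.0, 8, 60000L)) {
            System.out.println(d);
        }
    }
}
```

With these settings, the delays grow as 1000, 2000, 4000, ... until the seventh and later attempts are capped at the 60000 ms maximum redelivery delay.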

When Fuse Mediation Router routes messages, it updates an Exchange property that contains the last endpoint the Exchange was sent to. Hence, you can obtain the URI for the current exchange's most recent destination using the following code:

// Java
String lastEndpointUri = exchange.getProperty(Exchange.TO_ENDPOINT, String.class);

Where Exchange.TO_ENDPOINT is a string constant equal to CamelToEndpoint. This property is updated whenever Camel sends a message to any endpoint.

If an error occurs during routing and the exchange is moved into the dead letter queue, Fuse Mediation Router will additionally set a property named CamelFailureEndpoint, which identifies the last destination the exchange was sent to before the error occurred. Hence, you can access the failure endpoint from within a dead letter queue using the following code:

// Java
String failedEndpointUri = exchange.getProperty(Exchange.FAILURE_ENDPOINT, String.class);

Where Exchange.FAILURE_ENDPOINT is a string constant equal to CamelFailureEndpoint.


These properties remain set in the current exchange, even if the failure occurs after the given destination endpoint has finished processing. For example, consider the following route:
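A sketch of such a route in the XML DSL (the consumer endpoint is illustrative):

```
<route>
    <from uri="direct:start"/>
    <to uri="http://someserver/somepath"/>
    <bean ref="foo"/>
</route>
```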


Now suppose that a failure happens in the foo bean. In this case the Exchange.TO_ENDPOINT property and the Exchange.FAILURE_ENDPOINT property still contain the value, http://someserver/somepath.

When a dead letter channel is performing redeliveries, it is possible to configure a Processor that is executed just before every redelivery attempt. This can be used for situations where you need to alter the message before it is redelivered.

For example, the following dead letter channel is configured to call the MyRedeliverProcessor before redelivering exchanges:

// we configure our Dead Letter Channel to invoke
// MyRedeliverProcessor before a redelivery is
// attempted. This allows us to alter the message before redelivery.
// ("mock:error" is an illustrative dead letter endpoint.)
errorHandler(deadLetterChannel("mock:error")
        .maximumRedeliveries(5)
        .onRedelivery(new MyRedeliverProcessor())
        // setting delay to zero is just to make unit testing faster
        .redeliveryDelay(0L));

Where the MyRedeliverProcessor processor is implemented as follows:

// This is our processor that is executed before every redelivery attempt
// here we can do what we want in the java code, such as altering the message
public class MyRedeliverProcessor implements Processor {

    public void process(Exchange exchange) throws Exception {
        // the message is being redelivered so we can alter it

        // we just append the redelivery counter to the body
        // you can of course do all kind of stuff instead
        String body = exchange.getIn().getBody(String.class);
        int count = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);

        exchange.getIn().setBody(body + count);

        // the maximum redelivery was set to 5
        int max = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
        assertEquals(5, max);
    }
}

Instead of using the errorHandler() interceptor in your route builder, you can define a series of onException() clauses that define different redelivery policies and different dead letter channels for various exception types. For example, to define distinct behavior for each of the NullPointerException, IOException, and Exception types, you can define the following rules in your route builder using Java DSL:

onException(NullPointerException.class)
    .maximumRedeliveries(1)               // illustrative redelivery options
    .setHeader("messageInfo", "Oh dear! An NPE.")
    .to("mock:npe_error");                // illustrative dead letter endpoint

onException(IOException.class)
    .maximumRedeliveries(3)
    .useExponentialBackOff()
    .setHeader("messageInfo", "Oh dear! Some kind of I/O exception.")
    .to("mock:io_error");

onException(Exception.class)
    .maximumRedeliveries(2)
    .setHeader("messageInfo", "Oh dear! An exception.")
    .to("mock:error");


Where the redelivery options are specified by chaining the redelivery policy methods (as listed in Table 14.1), and you specify the dead letter channel's endpoint using the to() DSL command. You can also call other Java DSL commands in the onException() clauses. For example, the preceding example calls setHeader() to record some error details in a message header named, messageInfo.

In this example, the NullPointerException and the IOException exception types are configured specially. All other exception types are handled by the generic Exception exception interceptor. By default, Fuse Mediation Router applies the exception interceptor that most closely matches the thrown exception. If it fails to find an exact match, it tries to match the closest base type, and so on. Finally, if no other interceptor matches, the interceptor for the Exception type matches all remaining exceptions.

In ActiveMQ, message persistence is enabled by default. From version 5 onwards, ActiveMQ uses the AMQ message store as the default persistence mechanism. There are several different approaches you can use to enable message persistence in ActiveMQ.

The simplest option (different from Figure 14.4) is to enable persistence in a central broker and then connect to that broker using a reliable protocol. After a message has been sent to the central broker, delivery to consumers is guaranteed. For example, in the Fuse Mediation Router configuration file, META-INF/spring/camel-context.xml, you can configure the ActiveMQ component to connect to the central broker using the OpenWire/TCP protocol as follows:

<beans ... >
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>
</beans>

If you prefer to implement an architecture where messages are stored locally before being sent to a remote endpoint (similar to Figure 14.4), you can do this by instantiating an embedded broker in your Fuse Mediation Router application. A simple way to achieve this is to use the ActiveMQ Peer-to-Peer protocol, which implicitly creates an embedded broker to communicate with other peer endpoints. For example, in the camel-context.xml configuration file, you can configure the ActiveMQ component to connect to all of the peers in group, GroupA, as follows:

<beans ... >
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="peer://GroupA/broker1"/>
  </bean>
</beans>

Where broker1 is the broker name of the embedded broker (other peers in the group should use different broker names). One limiting feature of the Peer-to-Peer protocol is that it relies on IP multicast to locate the other peers in its group. This makes it unsuitable for use in wide area networks (and in some local area networks that do not have IP multicast enabled).

A more flexible way to create an embedded broker in the ActiveMQ component is to exploit ActiveMQ's VM protocol, which connects to an embedded broker instance. If a broker of the required name does not already exist, the VM protocol automatically creates one. You can use this mechanism to create an embedded broker with custom configuration. For example:

<beans ... >
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="vm://broker1?brokerConfig=xbean:activemq.xml"/>
  </bean>
</beans>

Where activemq.xml is an ActiveMQ configuration file for the embedded broker instance. Within the ActiveMQ configuration file, you can enable your preferred persistence mechanism.

See ActiveMQ in Component Reference for more details.

The correlation identifier pattern, shown in Figure 15.1, describes how to match reply messages with request messages, given that an asynchronous messaging system is used to implement a request-reply protocol. The essence of this idea is that request messages should be generated with a unique token, the request ID, that identifies the request message, and reply messages should include a token, the correlation ID, that contains the matching request ID.

Fuse Mediation Router supports the Correlation Identifier from the EIP patterns by getting or setting a header on a Message.

When working with the ActiveMQ in Component Reference or JMS in Component Reference components, the correlation identifier header is called JMSCorrelationID. You can add your own correlation identifier to any message exchange to help correlate messages together in a single conversation (or business process). A correlation identifier is usually stored in a Fuse Mediation Router message header.
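The request-reply matching described above can be sketched in plain Java. The following is an illustrative sketch only (the CorrelationDemo class and matchReply() method are hypothetical, not Camel API): the requestor records each outgoing request under its request ID, and the correlation ID carried by the reply is used to locate the original request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch (not Camel API): matching an asynchronous reply to its
// request by carrying the request ID back as the correlation ID.
public class CorrelationDemo {
    public static String matchReply() {
        Map<String, String> pendingRequests = new HashMap<>();

        // Requestor: tag the outgoing request with a unique request ID.
        String requestId = UUID.randomUUID().toString();
        pendingRequests.put(requestId, "stock quote request");

        // Replier: copy the request ID into the reply's correlation ID
        // (for JMS endpoints this would be the JMSCorrelationID header).
        String correlationId = requestId;

        // Requestor: use the correlation ID to locate the original request.
        return pendingRequests.remove(correlationId);
    }

    public static void main(String[] args) {
        System.out.println(matchReply()); // stock quote request
    }
}
```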

A message filter is a processor that eliminates undesired messages based on specific criteria. In Fuse Mediation Router, the message filter pattern, shown in Figure 16.2, is implemented by the filter() Java DSL command. The filter() command takes a single predicate argument, which controls the filter. When the predicate is true, the incoming message is allowed to proceed, and when the predicate is false, the incoming message is blocked.
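The predicate semantics can be illustrated in plain Java. The following sketch (the FilterDemo class is hypothetical, not the Camel implementation) shows the essential behavior: a predicate is evaluated for each message, a true result lets the message proceed, and a false result blocks it.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative sketch of the message filter pattern in plain Java
// (not Camel code): only messages matching the predicate proceed.
public class FilterDemo {
    public static List<String> filter(List<String> incoming, Predicate<String> predicate) {
        return incoming.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Predicate<String> fooIsBar = msg -> msg.contains("foo=bar");
        List<String> passed = filter(List.of("foo=bar&x=1", "foo=baz&x=2"), fooIsBar);
        System.out.println(passed); // [foo=bar&x=1]
    }
}
```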

The following example shows how to configure a filter route with an XPath predicate in XML (see Languages for expressions and predicates):

<camelContext id="simpleFilterRoute" xmlns="">
  <route>
    <from uri="seda:a"/>
    <filter>
      <xpath>$foo = 'bar'</xpath>
      <to uri="seda:b"/>
    </filter>
  </route>
</camelContext>

Stop is a special type of filter that filters out all messages. Stop is convenient to use in a Content Based Router when you need to stop further processing in one of the predicates.

In the following example, we do not want messages with the word Bye in the message body to propagate any further in the route. We prevent this in the when() predicate using .stop().


Knowing if Exchange was filtered or not

Available as of Camel 2.5

The Message Filter EIP adds a property to the Exchange that states whether it was filtered or not.

The property key is Exchange.FILTER_MATCHED, which has the String value CamelFilterMatched. Its value is a boolean: if the value is true, the Exchange was routed through the filter block.

A recipient list, shown in Figure 16.3, is a type of router that sends each incoming message to multiple different destinations. In addition, a recipient list typically requires that the list of recipients be calculated at run time.

The simplest kind of recipient list is where the list of destinations is fixed and known in advance, and the exchange pattern is InOnly. In this case, you can hardwire the list of destinations into the to() Java DSL command.


The examples given here, for the recipient list with fixed destinations, work only with the InOnly exchange pattern (similar to a pipeline). If you want to create a recipient list for exchange patterns with Out messages, use the multicast pattern instead.

The Recipient List supports parallelProcessing, which is similar to the corresponding feature in Splitter. Use the parallel processing feature to send the exchange to multiple recipients concurrently—for example:


In XML, the parallel processing feature is implemented as an attribute on the recipientList tag—for example:

<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>

Available as of Camel 2.3

The Recipient List supports the ignoreInvalidEndpoints option, which enables the recipient list to skip invalid endpoints (Routing Slip also supports this option). For example:


And in XML, you can enable this option by setting the ignoreInvalidEndpoints attribute on the recipientList tag, as follows:

<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>

Consider the case where myHeader contains the two endpoints, direct:foo,xxx:bar. The first endpoint is valid and works. The second is invalid and, therefore, ignored. Fuse Mediation Router logs at INFO level whenever an invalid endpoint is encountered.

You can use a custom AggregationStrategy with the Recipient List, which is useful for aggregating replies from the recipients in the list. By default, Fuse Mediation Router uses the UseLatestAggregationStrategy aggregation strategy, which keeps just the last received reply. For a more sophisticated aggregation strategy, you can define your own implementation of the AggregationStrategy interface—see Aggregator EIP for details. For example, to apply the custom aggregation strategy, MyOwnAggregationStrategy, to the reply messages, you can define a Java DSL route as follows:

from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");

In XML, you can specify the custom aggregation strategy as an attribute on the recipientList tag, as follows:

<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>

<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>

You can use a Bean in Component User's Guide to provide the recipients, for example:

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");      

Where the MessageRouter bean is defined as follows:

public class MessageRouter {

    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}

Available as of Camel 2.5

If you use parallelProcessing, you can configure a total timeout value in milliseconds. Camel will then process the messages in parallel until the timeout is hit. This allows you to continue processing if one message is slow.

For example, in the unit test below, you can see we multicast the message to 3 destinations. We have a timeout of 2 seconds, which means only the last two messages can be completed within the timeframe. This means we will only aggregate the last two, which yields an aggregated result of "BC".

from("direct:start")
    .multicast(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }
                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c")
    // use end to indicate end of multicast route
    .end()
    .to("mock:result");



[Tip]Timeout in other EIPs

This timeout feature is also supported by the Splitter, as well as by both multicast and recipientList.

By default, if a timeout occurs, the AggregationStrategy is not invoked. However, you can implement a specialized version of the aggregation strategy that is notified when a timeout occurs:

// Java
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {

    /**
     * A timeout occurred.
     *
     * @param oldExchange  the oldest exchange (is null on first aggregation as we only have the new exchange)
     * @param index        the index
     * @param total        the total
     * @param timeout      the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);
}

This allows you to deal with the timeout in the AggregationStrategy if you really need to.

[Important]Timeout is total

The timeout is total, which means that after X time, Camel will aggregate the messages which have completed within the timeframe. The remainders will be cancelled. Camel will also only invoke the timeout method in the TimeoutAwareAggregationStrategy once, for the first index which caused the timeout.

If you want to execute the resulting pieces of the message in parallel, you can enable the parallel processing option, which instantiates a thread pool to process the message pieces. For example:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");

from("activemq:my.queue")
    .split(xPathBuilder)
    .parallelProcessing()
    .to("activemq:my.parts");

You can customize the underlying ThreadPoolExecutor used in the parallel splitter. For example, you can specify a custom executor in the Java DSL as follows:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());

from("activemq:my.queue")
    .split(xPathBuilder)
    .parallelProcessing()
    .executorService(threadPoolExecutor)
    .to("activemq:my.parts");

You can specify a custom executor in the XML DSL as follows:

<camelContext xmlns="">
  <route>
    <from uri="direct:parallel-custom-pool"/>
    <split executorServiceRef="threadPoolExecutor">
      <xpath>//foo/bar</xpath>
      <to uri="mock:result"/>
    </split>
  </route>
</camelContext>

<bean id="threadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
  <constructor-arg index="0" value="8"/>
  <constructor-arg index="1" value="16"/>
  <constructor-arg index="2" value="0"/>
  <constructor-arg index="3" value="MILLISECONDS"/>
  <constructor-arg index="4"><bean class="java.util.concurrent.LinkedBlockingQueue"/></constructor-arg>
</bean>

As the splitter can use any expression to do the splitting, we can use a bean to perform splitting, by invoking the method() expression. The bean should return an iterable value such as java.util.Collection, java.util.Iterator, or an array.

The following route defines a method() expression that calls a method on the mySplitterBean bean instance:

// here we use a POJO bean mySplitterBean to do the split of the payload
from("direct:body")
    .split().method("mySplitterBean", "splitBody")
    .to("mock:result");

// here we use a POJO bean mySplitterBean to do the split of the message
// with a certain header value
from("direct:message")
    .split().method("mySplitterBean", "splitMessage")
    .to("mock:result");

Where mySplitterBean is an instance of the MySplitterBean class, which is defined as follows:

public class MySplitterBean {

    /**
     * The split body method returns something that is iterable such as a java.util.List.
     *
     * @param body the payload of the incoming message
     * @return a list containing each part split
     */
    public List<String> splitBody(String body) {
        // since this is based on a unit test you can of course
        // use different logic for splitting, as Fuse Mediation Router has out
        // of the box support for splitting a String based on comma
        // but this is for show and tell, since this is java code
        // you have the full power how you like to split your messages
        List<String> answer = new ArrayList<String>();
        String[] parts = body.split(",");
        for (String part : parts) {
            answer.add(part);
        }
        return answer;
    }

    /**
     * The split message method returns something that is iterable such as a java.util.List.
     *
     * @param header the header of the incoming message with the name user
     * @param body the payload of the incoming message
     * @return a list containing each part split
     */
    public List<Message> splitMessage(@Header(value = "user") String header, @Body String body) {
        // we can leverage the Parameter Binding Annotations
        // to access the message header and body at the same time,
        // then create the message that we want; the splitter will
        // take care of the rest.
        // *NOTE* this feature requires Fuse Mediation Router version >= 1.6.1
        List<Message> answer = new ArrayList<Message>();
        String[] parts = header.split(",");
        for (String part : parts) {
            DefaultMessage message = new DefaultMessage();
            message.setBody(body);
            message.setHeader("user", part);
            answer.add(message);
        }
        return answer;
    }
}

The following properties are set on each split exchange:

Header Type Description
org.apache.camel.splitCounter int Fuse Mediation Router 1.6.2: A split counter that increases for each Exchange being split. The counter starts from 0.
org.apache.camel.splitSize int Fuse Mediation Router 1.6.2: The total number of Exchanges that were split. This header is not applied for stream based splitting.
CamelSplitIndex int Fuse Mediation Router 2.0: A split counter that increases for each Exchange being split. The counter starts from 0.
CamelSplitSize int Fuse Mediation Router 2.0: The total number of Exchanges that were split. This header is not applied for stream based splitting.
CamelSplitComplete boolean Fuse Mediation Router 2.4: Whether or not this Exchange is the last.

Figure 16.6 shows an overview of how the aggregator works, assuming it is fed with a stream of exchanges that have correlation keys such as A, B, C, or D.

The incoming stream of exchanges shown in Figure 16.6 is processed as follows:

  1. The correlator is responsible for sorting exchanges based on the correlation key. For each incoming exchange, the correlation expression is evaluated, yielding the correlation key. For example, for the exchange shown in Figure 16.6, the correlation key evaluates to A.

  2. The aggregation strategy is responsible for merging exchanges with the same correlation key. When a new exchange, A, comes in, the aggregator looks up the corresponding aggregate exchange, A', in the aggregation repository and combines it with the new exchange.

    Until a particular aggregation cycle is completed, incoming exchanges are continuously aggregated with the corresponding aggregate exchange. An aggregation cycle lasts until terminated by one of the completion mechanisms.

  3. If a completion predicate is specified on the aggregator, the aggregate exchange is tested to determine whether it is ready to be sent to the next processor in the route. Processing continues as follows:

    • If complete, the aggregate exchange is processed by the latter part of the route. There are two alternative models for this: synchronous (the default), which causes the calling thread to block, or asynchronous (if parallel processing is enabled), where the aggregate exchange is submitted to an executor thread pool (as shown in Figure 16.6).

    • If not complete, the aggregate exchange is saved back to the aggregation repository.

  4. In parallel with the synchronous completion tests, it is possible to enable an asynchronous completion test by enabling either the completionTimeout option or the completionInterval option. These completion tests run in a separate thread and, whenever the completion test is satisfied, the corresponding exchange is marked as complete and starts to be processed by the latter part of the route (either synchronously or asynchronously, depending on whether parallel processing is enabled or not).

  5. If parallel processing is enabled, a thread pool is responsible for processing exchanges in the latter part of the route. By default, this thread pool contains ten threads, but you have the option of customizing the pool (Threading options).
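The steps above can be sketched in plain Java. The following is an illustrative sketch only (the AggregatorSketch class is hypothetical, not the Camel implementation): each message's correlation key is evaluated, the message is merged with any stored aggregate using a strategy, and the completion test decides whether the aggregate is sent onward or saved back to the repository.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java sketch of the aggregator steps above (not Camel code):
// correlate, merge with a strategy, then test the completion predicate.
public class AggregatorSketch {
    public static List<String> run(List<String> exchanges) {
        Function<String, String> correlationKey = msg -> msg.split(":")[0];
        BinaryOperator<String> strategy = (oldEx, newEx) -> oldEx + "+" + newEx;
        Predicate<String> complete = agg -> agg.split("\\+").length == 2;

        Map<String, String> repository = new HashMap<>(); // aggregation repository
        List<String> output = new ArrayList<>();
        for (String msg : exchanges) {
            String key = correlationKey.apply(msg);
            String aggregate = repository.containsKey(key)
                    ? strategy.apply(repository.get(key), msg)   // merge with stored aggregate
                    : msg;                                       // first exchange for this key
            if (complete.test(aggregate)) {
                repository.remove(key);
                output.add(aggregate); // send to the latter part of the route
            } else {
                repository.put(key, aggregate); // save back to the repository
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("A:1", "B:1", "A:2", "B:2"))); // [A:1+A:2, B:1+B:2]
    }
}
```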

In Java DSL, you can either pass the aggregation strategy as the second argument to the aggregate() DSL command or specify it using the aggregationStrategy() clause. For example, you can use the aggregationStrategy() clause as follows:

from("direct:start")
    .aggregate(header("id"))
    .aggregationStrategy(new UseLatestAggregationStrategy())
    .completionTimeout(3000)
    .to("mock:aggregated");

Fuse Mediation Router provides some basic aggregation strategies in the org.apache.camel.processor.aggregate Java package, such as UseLatestAggregationStrategy, which keeps only the most recently received exchange.

If you want to apply a different aggregation strategy, you can implement a custom version of the org.apache.camel.processor.aggregate.AggregationStrategy interface. For example, the following code implements a custom aggregation strategy, MyAggregationStrategy, that concatenates all of the batch messages into a single, large message:

// Java
package com.my_package_name;

import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Exchange;

public class MyAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // first time through, the oldExchange is null,
            // so just return newExchange as there is nothing to merge
            return newExchange;
        }
        // Concatenate old and new message bodies.
        // Ignore the message headers!
        // (in a real application, you would probably do
        //  something more sophisticated here).
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody.concat(newBody));
        return oldExchange;
    }
}

In Fuse Mediation Router 2.0, the AggregationStrategy.aggregate() callback method has been changed so that it is also invoked when the very first exchange arrives. On the first invocation of the aggregate method, the oldExchange parameter is null and the newExchange parameter contains the first incoming exchange.

To aggregate messages using the custom strategy class, MyAggregationStrategy, define a route like the following:

from("direct:start")
    .aggregate(header("StockSymbol"), new MyAggregationStrategy())
    .completionTimeout(3000)
    .to("mock:result");

You can also configure a route with a custom aggregation strategy in XML, as follows:

<camelContext xmlns="">
  <route>
    <from uri="direct:start"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000">
      <correlationExpression>
        <simple>header.StockSymbol</simple>
      </correlationExpression>
      <to uri="mock:aggregated"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorStrategy" class="com.my_package_name.MyAggregationStrategy"/>

The following properties are set on each exchange that is aggregated:

Table 16.1. Aggregated Exchange Properties
Header Type Description
Exchange.AGGREGATED_CORRELATION_KEY String The value of the correlation key for this exchange.
Exchange.AGGREGATED_SIZE int The total number of exchanges aggregated into this exchange.
Exchange.AGGREGATED_COMPLETED_BY String Indicates the mechanism responsible for completing the aggregate exchange. Possible values are: predicate, size, timeout, interval, or consumer.

The following properties are set on exchanges redelivered by the HawtDB aggregation repository (see Persistent aggregation repository):

Table 16.2. Redelivered Exchange Properties
Header Type Description
Exchange.REDELIVERED boolean If true, indicates that the exchange is being redelivered.
Exchange.REDELIVERY_COUNTER int Sequence number of the current redelivery attempt (starting at 1).

It is mandatory to specify at least one completion condition, which determines when an aggregate exchange leaves the aggregator and proceeds to the next node on the route. The following completion conditions can be specified:


completionPredicate

Evaluates a predicate after each exchange is aggregated in order to determine completeness. A value of true indicates that the aggregate exchange is complete.


completionSize

Completes the aggregate exchange after the specified number of incoming exchanges are aggregated.


completionTimeout

(Incompatible with completionInterval) Completes the aggregate exchange if no incoming exchanges are aggregated within the specified timeout.

In other words, the timeout mechanism keeps track of a timeout for each correlation key value. The clock starts ticking after the latest exchange with a particular key value is received. If another exchange with the same key value is not received within the specified timeout, the corresponding aggregate exchange is marked complete and sent to the next node on the route.
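The per-key bookkeeping described above can be sketched in plain Java. The following is an illustrative sketch only (the TimeoutTracker class is hypothetical, not Camel code): the timeout clock restarts whenever an exchange arrives for a key, and a background check asks whether the key has been quiet for long enough.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not Camel code) of the per-key timeout bookkeeping
// described above: the clock restarts whenever an exchange arrives for a key.
public class TimeoutTracker {
    private final long timeoutMillis;
    private final Map<String, Long> lastArrival = new HashMap<>();

    public TimeoutTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record the arrival of an exchange with the given correlation key.
    public void onExchange(String key, long nowMillis) {
        lastArrival.put(key, nowMillis);
    }

    // Called by a background task: has this key been quiet long enough?
    public boolean isComplete(String key, long nowMillis) {
        Long last = lastArrival.get(key);
        return last != null && nowMillis - last >= timeoutMillis;
    }

    public static void main(String[] args) {
        TimeoutTracker tracker = new TimeoutTracker(1000);
        tracker.onExchange("A", 0);
        tracker.onExchange("A", 500); // clock restarts for key A
        System.out.println(tracker.isComplete("A", 1200)); // false: only 700 ms quiet
        System.out.println(tracker.isComplete("A", 1600)); // true: 1100 ms quiet
    }
}
```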


completionInterval

(Incompatible with completionTimeout) Completes all outstanding aggregate exchanges after each time interval (of the specified length) has elapsed.

The time interval is not tailored to each aggregate exchange. This mechanism forces simultaneous completion of all outstanding aggregate exchanges. Hence, in some cases, this mechanism could complete an aggregate exchange immediately after it started aggregating.


completionFromBatchConsumer

When used in combination with a consumer endpoint that supports the batch consumer mechanism, this completion option automatically figures out when the current batch of exchanges is complete, based on information it receives from the consumer endpoint. See Batch consumer.

The preceding completion conditions can be combined arbitrarily, except for the completionTimeout and completionInterval conditions, which cannot be simultaneously enabled. When conditions are used in combination, the general rule is that the first completion condition to trigger is the effective completion condition.

In some aggregation scenarios, you might want to enforce the condition that the correlation key is unique for each batch of exchanges. In other words, when the aggregate exchange for a particular correlation key completes, you want to make sure that no further aggregate exchanges with that correlation key are allowed to proceed. For example, you might want to enforce this condition, if the latter part of the route expects to process exchanges with unique correlation key values.

Depending on how the completion conditions are configured, there might be a risk of more than one aggregate exchange being generated with a particular correlation key. For example, although you might define a completion predicate that is designed to wait until all the exchanges with a particular correlation key are received, you might also define a completion timeout, which could fire before all of the exchanges with that key have arrived. In this case, the late-arriving exchanges could give rise to a second aggregate exchange with the same correlation key value.

For such scenarios, you can configure the aggregator to suppress aggregate exchanges that duplicate previous correlation key values, by setting the closeCorrelationKeyOnCompletion option. In order to suppress duplicate correlation key values, it is necessary for the aggregator to record previous correlation key values in a cache. The size of this cache (the number of cached correlation keys) is specified as an argument to the closeCorrelationKeyOnCompletion() DSL command. To specify a cache of unlimited size, you can pass a value of zero or a negative integer. For example, to specify a cache size of 10000 key values:

from("direct:start")
    .aggregate(header("UniqueBatchID"), new MyConcatenateStrategy())
    .completionSize(header("mySize"))
    .closeCorrelationKeyOnCompletion(10000)
    .to("mock:aggregated");

If an aggregate exchange completes with a duplicate correlation key value, the aggregator throws a ClosedCorrelationKeyException exception.
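The bounded cache of closed correlation keys can be sketched with a standard LRU map. The following is an illustrative sketch only (the ClosedKeyCache class is hypothetical; Camel uses its own LRUCache implementation): once the cache is full, the least recently used closed key is evicted, so a very old key could be aggregated again.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch (not Camel code) of a bounded LRU cache of closed
// correlation keys, as used by closeCorrelationKeyOnCompletion.
public class ClosedKeyCache extends LinkedHashMap<String, Boolean> {
    private final int maxSize;

    public ClosedKeyCache(int maxSize) {
        super(16, 0.75f, true); // access-order gives LRU eviction
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
        // maxSize <= 0 models the unbounded cache described above
        return maxSize > 0 && size() > maxSize;
    }

    public static void main(String[] args) {
        ClosedKeyCache closed = new ClosedKeyCache(2);
        closed.put("batch-1", true);
        closed.put("batch-2", true);
        closed.put("batch-3", true); // evicts the least recently used key
        System.out.println(closed.containsKey("batch-1")); // false: evicted
        System.out.println(closed.containsKey("batch-3")); // true
    }
}
```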

If you want pending aggregated exchanges to be stored persistently, you can use either the HawtDB in Component User's Guide component or the JDBC Aggregation Repository in Component User's Guide as a persistent aggregation repository. If you use HawtDB, this requires you to include a dependency on the camel-hawtdb component in your Maven POM. You can then configure a route to use the HawtDB aggregation repository as follows:

public void configure() throws Exception {
    HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");

    from("direct:start")
        .aggregate(header("id"), new UseLatestAggregationStrategy())
        .completionSize(10)
        .aggregationRepository(repo)
        .to("mock:aggregated");
}

The HawtDB aggregation repository has a feature that enables it to recover and retry any failed exchanges (that is, any exchange that raised an exception while it was being processed by the latter part of the route). Figure 16.7 shows an overview of the recovery mechanism.

The recovery mechanism works as follows:

  1. The aggregator creates a dedicated recovery thread, which runs in the background, scanning the aggregation repository to find any failed exchanges.

  2. Each failed exchange is checked to see whether its current redelivery count exceeds the maximum redelivery limit. If it is under the limit, the recovery task resubmits the exchange for processing in the latter part of the route.

  3. If the current redelivery count is over the limit, the failed exchange is passed to the dead letter queue.

For more details about the HawtDB component, see HawtDB in Component User's Guide.

As shown in Figure 16.6, the aggregator is decoupled from the latter part of the route, where the exchanges sent to the latter part of the route are processed by a dedicated thread pool. By default, this pool contains just a single thread. If you want to specify a pool with multiple threads, enable the parallelProcessing option, as follows:

from("direct:start")
    .aggregate(header("id"), new UseLatestAggregationStrategy())
    .completionSize(10)
    .parallelProcessing()
    .to("mock:aggregated");

By default, this creates a pool with 10 worker threads.

If you want to exercise more control over the created thread pool, specify a custom java.util.concurrent.ExecutorService instance using the executorService option (in which case it is unnecessary to enable the parallelProcessing option).

The aggregator supports the following options:

Table 16.3. Aggregator Options

Option Default Description
correlationExpression   Mandatory. An Expression that evaluates the correlation key to use for aggregation. Exchanges with the same correlation key are aggregated together. If the correlation key cannot be evaluated, an Exception is thrown. You can disable this behavior by using the ignoreInvalidCorrelationKeys option.
aggregationStrategy   Mandatory. The AggregationStrategy used to merge the incoming Exchange with the existing, already merged exchanges. On the first invocation the oldExchange parameter is null. On subsequent invocations, oldExchange contains the merged exchanges and newExchange is the new incoming Exchange.
strategyRef   A reference used to look up the AggregationStrategy in the Registry.
completionSize   Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or as an Expression that evaluates the size dynamically (the result is converted to Integer). If both are set, Camel falls back to the fixed value if the Expression result is null or 0.
completionTimeout   Time in millis that an aggregated exchange should be inactive before it is completed. Camel runs a background task once a minute to check for inactive aggregated exchanges. This option can be set as either a fixed value or as an Expression that evaluates the timeout dynamically (the result is converted to Long). If both are set, Camel falls back to the fixed value if the Expression result is null or 0. You cannot use this option together with completionInterval; only one of them can be used.
completionInterval   A repeating period in millis at which the aggregator completes all current aggregated exchanges. Camel has a background task that is triggered every period. You cannot use this option together with completionTimeout; only one of them can be used.
completionPredicate   A Predicate that indicates when an aggregated exchange is complete.
completionFromBatchConsumer false This option applies when the exchanges come from a Batch Consumer. When enabled, the Aggregator uses the batch size determined by the Batch Consumer, given in the message header CamelBatchSize. See Batch Consumer for more details. This can be used to aggregate all files consumed from a File in Component User's Guide endpoint in a given poll.
eagerCheckCompletion false Whether to check for completion eagerly, when a new incoming Exchange is received. This option influences the behavior of the completionPredicate option, because the Exchange passed to the Predicate changes accordingly. When false, the Exchange passed to the Predicate is the aggregated Exchange, which means any information you store on the aggregated Exchange from the AggregationStrategy is available to the Predicate. When true, the Exchange passed to the Predicate is the incoming Exchange, which means you can access data from the incoming Exchange.
groupExchanges false If enabled, Camel groups all aggregated Exchanges into a single combined org.apache.camel.impl.GroupedExchange holder class that holds all the aggregated Exchanges. As a result, only one Exchange is sent out of the aggregator. This can be used to combine many incoming Exchanges into a single output Exchange without coding a custom AggregationStrategy yourself.
ignoreInvalidCorrelationKeys false Whether to ignore correlation keys that cannot be evaluated to a value. By default, Camel throws an Exception, but you can enable this option to ignore the situation instead.
closeCorrelationKeyOnCompletion   Whether late-arriving Exchanges should be accepted. You can enable this option to indicate that, if a correlation key has already been completed, any new exchanges with the same correlation key are denied. Camel then throws a ClosedCorrelationKeyException exception. When using this option, you pass in an integer that sizes an LRUCache, which keeps the last X number of closed correlation keys. You can pass in 0 or a negative value to indicate an unbounded cache. By passing in a positive number, you ensure that the cache does not grow too big if you use a lot of different correlation keys.
discardOnCompletionTimeout false Camel 2.5: Whether exchanges that complete due to a timeout should be discarded. If enabled, when a timeout occurs the aggregated message is not sent out but is dropped (discarded).
aggregationRepository   Allows you to plug in your own implementation of org.apache.camel.spi.AggregationRepository, which keeps track of the current in-flight aggregated exchanges. By default, Camel uses a memory-based implementation.
aggregationRepositoryRef   Reference used to look up an aggregationRepository in the Registry.
parallelProcessing false When aggregation completes, the aggregated exchanges are sent out of the aggregator. This option indicates whether Camel should use a thread pool with multiple threads for concurrency. If no custom thread pool has been specified, Camel creates a default pool with 10 concurrent threads.
executorService   If using parallelProcessing, you can specify a custom thread pool to be used. This custom thread pool is also used to send out aggregated exchanges when you are not using parallelProcessing.
executorServiceRef   Reference used to look up an executorService in the Registry.

The batch resequencing algorithm is enabled by default. For example, to resequence a batch of incoming messages based on the value of a timestamp contained in the TimeStamp header, you can define the following route in Java DSL:


By default, the batch is obtained by collecting all of the incoming messages that arrive in a time interval of 1000 milliseconds (default batch timeout), up to a maximum of 100 messages (default batch size). You can customize the values of the batch timeout and the batch size by appending a batch() DSL command, which takes a BatchResequencerConfig instance as its sole argument. For example, to modify the preceding route so that the batch consists of messages collected in a 4000 millisecond time window, up to a maximum of 300 messages, you can define the Java DSL route as follows:

import org.apache.camel.model.config.BatchResequencerConfig;

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("direct:start").resequence(header("TimeStamp")).batch(new BatchResequencerConfig(300, 4000L)).to("mock:result");
    }
};

You can also use multiple expressions to sort messages in a batch. For example, if you want to sort incoming messages, first, according to their JMS priority (as recorded in the JMSPriority header) and second, according to the value of their time stamp (as recorded in the TimeStamp header), you can define a route like the following:

from("direct:start").resequence(header("JMSPriority"), header("TimeStamp")).to("mock:result");

In this case, messages with the highest priority (that is, low JMS priority number) are moved to the front of the batch. If more than one message has the highest priority, the highest priority messages are put in order according to the value of the TimeStamp header.
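The two-key ordering described above can be illustrated in plain Java. The following sketch (the TwoKeySort class and Msg type are hypothetical, not Camel code) sorts a batch on the primary key first and uses the secondary key only to break ties, matching the resequencer's behavior with two expressions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration (not Camel code) of sorting a batch on two keys:
// ascending JMSPriority number first, then ascending TimeStamp for ties.
public class TwoKeySort {
    static final class Msg {
        final String name;
        final int jmsPriority;
        final long timeStamp;
        Msg(String name, int jmsPriority, long timeStamp) {
            this.name = name;
            this.jmsPriority = jmsPriority;
            this.timeStamp = timeStamp;
        }
    }

    public static List<String> sortedNames(List<Msg> batch) {
        List<Msg> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator
                .comparingInt((Msg m) -> m.jmsPriority)  // primary key: JMSPriority
                .thenComparingLong(m -> m.timeStamp));   // secondary key: TimeStamp
        List<String> names = new ArrayList<>();
        for (Msg m : sorted) {
            names.add(m.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Msg> batch = List.of(
                new Msg("m1", 4, 200), new Msg("m2", 1, 300), new Msg("m3", 1, 100));
        System.out.println(sortedNames(batch)); // [m3, m2, m1]
    }
}
```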

You can also specify a batch resequencer pattern using XML configuration. The following example defines a batch resequencer with a batch size of 300 and a batch timeout of 4000 milliseconds:

<camelContext id="resequencerBatch" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <!-- batch-config can be omitted for default (batch) resequencer settings -->
      <batch-config batchSize="300" batchTimeout="4000"/>
      <simple>header.TimeStamp</simple>
      <to uri="mock:result"/>
    </resequence>
  </route>
</camelContext>

To enable the stream resequencing algorithm, you must append stream() to the resequence() DSL command. For example, to resequence incoming messages based on the value of a sequence number in the seqnum header, you define a DSL route as follows:

from("direct:start").resequence(header("seqnum")).stream().to("mock:result");
The stream-processing resequencer algorithm is based on the detection of gaps in a message stream, rather than on a fixed batch size. Gap detection, in combination with timeouts, removes the constraint of needing to know the number of messages of a sequence (that is, the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor is known. For example a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2,3,5 has a gap because the successor of 3 is missing. The resequencer therefore must retain message 5 until message 4 arrives (or a timeout occurs).
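The gap-detection rule can be illustrated outside Camel with a small Java sketch (the method name and the use of a TreeSet are illustrative, not Camel's implementation): buffered messages are delivered only while each successor is present, and everything after a gap is retained:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class GapDetectionSketch {
    /**
     * Deliver buffered sequence numbers in order, starting after
     * lastDelivered, stopping at the first gap. Delivered numbers are
     * removed from the buffer; the rest are retained.
     */
    static List<Long> deliverable(TreeSet<Long> buffer, long lastDelivered) {
        List<Long> out = new ArrayList<>();
        long expected = lastDelivered + 1;
        while (!buffer.isEmpty() && buffer.first() == expected) {
            out.add(buffer.pollFirst());
            expected++;
        }
        return out;
    }

    public static void main(String[] args) {
        // The sequence 2,3,5 from the text: 4 is missing, so 5 must be retained.
        TreeSet<Long> buffer = new TreeSet<>(List.of(2L, 3L, 5L));
        System.out.println("deliver=" + deliverable(buffer, 1L));
        System.out.println("retain=" + buffer);
    }
}
```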

By default, the stream resequencer is configured with a timeout of 1000 milliseconds, and a maximum message capacity of 100. To customize the stream's timeout and message capacity, you can pass a StreamResequencerConfig object as an argument to stream(). For example, to configure a stream resequencer with a message capacity of 5000 and a timeout of 4000 milliseconds, you define a route as follows:

// Java
import org.apache.camel.model.config.StreamResequencerConfig;

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("direct:start").resequence(header("seqnum")).
            stream(new StreamResequencerConfig(5000, 4000L)).
            to("mock:result");
    }
};

If the maximum time delay between successive messages (that is, messages with adjacent sequence numbers) in a message stream is known, the resequencer's timeout parameter should be set to this value. In this case, you can guarantee that all messages in the stream are delivered in the correct order to the next processor. The smaller the timeout value is relative to the time delay between out-of-sequence messages, the more likely it is that the resequencer will deliver messages out of sequence. Large timeout values should be paired with sufficiently large capacity values, because the capacity parameter prevents the resequencer from running out of memory.

If you want to use sequence numbers of a type other than long, you must define a custom comparator, as follows:

// Java
ExpressionResultComparator<Exchange> comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(5000, 4000L, comparator);

from("direct:start").resequence(header("seqnum")).stream(config).to("mock:result");

You can also specify a stream resequencer pattern using XML configuration. The following example defines a stream resequencer with a message capacity of 5000 and a timeout of 4000 milliseconds:

<camelContext id="resequencerStream" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <stream-config capacity="5000" timeout="4000"/>
      <simple>header.seqnum</simple>
      <to uri="mock:result"/>
    </resequence>
  </route>
</camelContext>

The routing slip pattern, shown in Figure 16.9, enables you to route a message consecutively through a series of processing steps, where the sequence of steps is not known at design time and can vary for each message. The list of endpoints through which the message should pass is stored in a header field (the slip), which Fuse Mediation Router reads at run time to construct a pipeline on the fly.

The routing slip now supports the ignoreInvalidEndpoints option, which the Recipient List pattern also supports. You can use it to skip endpoints that are invalid. For example:

from("direct:a").routingSlip(header("myHeader")).ignoreInvalidEndpoints();
In XML, this feature is enabled by setting the ignoreInvalidEndpoints attribute on the <routingSlip> tag:

<route>
    <from uri="direct:a"/>
    <routingSlip ignoreInvalidEndpoints="true">
        <header>myHeader</header>
    </routingSlip>
</route>

Consider the case where myHeader contains the two endpoints direct:foo,xxx:bar. The first endpoint is valid and works. The second is invalid and is therefore ignored. Fuse Mediation Router logs a message at INFO level whenever an invalid endpoint is encountered.

The failover load balancer is capable of trying the next processor in case an exchange failed with an exception during processing. You can configure the failover with a list of specific exceptions that trigger failover. If you do not specify any exceptions, failover is triggered by any exception. The failover load balancer uses the same strategy for matching exceptions as the onException exception clause.

[Important]Enable stream caching if using streams

If you use streaming, you should enable Stream Caching when using the failover load balancer. This is needed so the stream can be re-read when failing over.

The failover load balancer supports the options described in Table 16.5.

The following example is configured to fail over only if an IOException exception is thrown:

<route errorHandlerRef="myErrorHandler">
    <from uri="direct:foo"/>
    <loadBalance>
        <failover>
            <exception>java.io.IOException</exception>
        </failover>
        <to uri="direct:a"/>
        <to uri="direct:b"/>
    </loadBalance>
</route>

The following example shows how to fail over in round robin mode:

<route>
    <from uri="direct:start"/>
    <loadBalance>
        <!-- failover using stateful round robin,
             which will keep retrying the 4 endpoints indefinitely.
             You can set maximumFailoverAttempts to break out after X attempts -->
        <failover roundRobin="true"/>
        <to uri="direct:bad"/>
        <to uri="direct:bad2"/>
        <to uri="direct:good"/>
        <to uri="direct:good2"/>
    </loadBalance>
</route>

In many enterprise environments, where server nodes of unequal processing power are hosting services, it is usually preferable to distribute the load in accordance with the individual server processing capacities. A weighted round robin algorithm or a weighted random algorithm can be used to address this problem.

The weighted load balancing policy allows you to specify a processing load distribution ratio for each server with respect to the others. You can specify this value as a positive processing weight for each server. A larger number indicates that the server can handle a larger load. The processing weight is used to determine the payload distribution ratio of each processing endpoint with respect to the others.
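As an illustration of how a distribution ratio translates into message counts, the following plain-Java sketch (not Camel's actual load balancer, which interleaves its picks) expands a ratio such as 4:2:1 into a cyclic pick list:

```java
import java.util.ArrayList;
import java.util.List;

public class WeightedRoundRobinSketch {
    /** Expand a distribution ratio such as "4:2:1" into a cyclic pick list. */
    static List<Integer> buildCycle(String distributionRatio, String delimiter) {
        List<Integer> cycle = new ArrayList<>();
        String[] weights = distributionRatio.split(delimiter);
        for (int server = 0; server < weights.length; server++) {
            int weight = Integer.parseInt(weights[server]);
            for (int i = 0; i < weight; i++) {
                cycle.add(server);  // server index repeated "weight" times
            }
        }
        return cycle;
    }

    public static void main(String[] args) {
        // With ratio 4:2:1, every 7 messages are split 4/2/1 across servers 0..2.
        System.out.println(buildCycle("4:2:1", ":"));
    }
}
```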

The parameters that can be used are as follows:

The following examples show how to define a weighted round-robin route and a weighted random route:

<!-- round-robin -->
<route>
    <from uri="direct:start"/>
    <loadBalance>
        <weighted roundRobin="true" distributionRatio="4:2:1" distributionRatioDelimiter=":"/>
        <to uri="mock:x"/>
        <to uri="mock:y"/>
        <to uri="mock:z"/>
    </loadBalance>
</route>

The multicast pattern, shown in Figure 16.10, is a variation of the recipient list pattern that has a fixed list of destinations and is compatible with the InOut message exchange pattern. This is in contrast to the recipient list pattern, which is only compatible with the InOnly exchange pattern.

Whereas the multicast processor receives multiple Out messages in response to the original request (one from each of the recipients), the original caller is only expecting to receive a single reply. Thus, there is an inherent mismatch on the reply leg of the message exchange, and to overcome this mismatch, you must provide a custom aggregation strategy to the multicast processor. The aggregation strategy class is responsible for aggregating all of the Out messages into a single reply message.

Consider the example of an electronic auction service, where a seller offers an item for sale to a list of buyers. The buyers each put in a bid for the item, and the seller automatically selects the bid with the highest price. You can implement the logic for distributing an offer to a fixed list of buyers using the multicast() DSL command, as follows:

from("cxf:bean:offer").multicast(new HighestBidAggregationStrategy()).
    to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

Where the seller is represented by the endpoint, cxf:bean:offer, and the buyers are represented by the endpoints, cxf:bean:Buyer1, cxf:bean:Buyer2, cxf:bean:Buyer3. To consolidate the bids received from the various buyers, the multicast processor uses the aggregation strategy, HighestBidAggregationStrategy. You can implement the HighestBidAggregationStrategy in Java, as follows:

// Java
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Exchange;

public class HighestBidAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        float oldBid = oldExchange.getOut().getHeader("Bid", Float.class);
        float newBid = newExchange.getOut().getHeader("Bid", Float.class);
        // keep the exchange with the higher bid
        return (newBid > oldBid) ? newExchange : oldExchange;
    }
}

Where it is assumed that the buyers insert the bid price into a header named Bid. For more details about custom aggregation strategies, see Aggregator.
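The pairwise reduction performed by HighestBidAggregationStrategy can be checked in isolation with ordinary Java; the bid values here are hypothetical:

```java
import java.util.List;

public class HighestBidSketch {
    /** Pairwise fold mirroring HighestBidAggregationStrategy.aggregate(). */
    static float aggregate(float oldBid, float newBid) {
        return (newBid > oldBid) ? newBid : oldBid;
    }

    public static void main(String[] args) {
        // Hypothetical Bid header values returned by the three buyers.
        List<Float> bids = List.of(1.25f, 3.50f, 2.00f);
        float best = bids.get(0);
        for (int i = 1; i < bids.size(); i++) {
            best = aggregate(best, bids.get(i));  // fold replies pairwise
        }
        System.out.println("winning bid = " + best);
    }
}
```

Applying the strategy pairwise over all replies leaves exactly one winning exchange, which is what the multicast processor returns to the caller.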

By default, the multicast processor invokes each of the recipient endpoints one after another (in the order listed in the to() command). In some cases, this might cause unacceptably long latency. To avoid these long latency times, you have the option of enabling parallel processing by adding the parallelProcessing() clause. For example, to enable parallel processing in the electronic auction example, define the route as follows:

from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
        .parallelProcessing()
        .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

Where the multicast processor now invokes the buyer endpoints, using a thread pool that has one thread for each of the endpoints.

If you want to customize the size of the thread pool that invokes the buyer endpoints, you can invoke the executorService() method to specify your own custom executor service. For example:

from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
        .executorService(MyExecutor)
        .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

Where MyExecutor is an instance of type java.util.concurrent.ExecutorService.

When the exchange has an InOut pattern, an aggregation strategy is used to aggregate reply messages. The default aggregation strategy takes the latest reply message and discards earlier replies. For example, in the following route, the custom strategy, MyAggregationStrategy, is used to aggregate the replies from the endpoints, direct:a, direct:b, and direct:c:

from("direct:start")
    .multicast(new MyAggregationStrategy())
        .to("direct:a", "direct:b", "direct:c");

The composed message processor pattern, as shown in Figure 16.11, allows you to process a composite message by splitting it up, routing the sub-messages to appropriate destinations, and then re-aggregating the responses back into a single message.

The scatter-gather pattern, as shown in Figure 16.12, enables you to route messages to a number of dynamically specified recipients and re-aggregate the responses back into a single message.

The following example outlines an application that gets the best quote for beer from several different vendors. The example uses a dynamic Recipient List to request a quote from all vendors and an Aggregator to pick the best quote out of all the responses. The routes for this application are defined as follows:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <recipientList>
      <header>listOfVendors</header>
    </recipientList>
  </route>

  <route>
    <from uri="seda:quoteAggregator"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000">
      <correlationExpression>
        <header>quoteRequestId</header>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

In the first route, the Recipient List looks at the listOfVendors header to obtain the list of recipients. Hence, the client that sends messages to this application needs to add a listOfVendors header to the message. Example 16.1 shows some sample code from a messaging client that adds the relevant header data to outgoing messages.

The message would be distributed to the following endpoints: bean:vendor1, bean:vendor2, and bean:vendor3. These beans are all implemented by the following class:

public class MyVendor {
    private int beerPrice;

    @Produce(uri = "seda:quoteAggregator")
    private ProducerTemplate quoteAggregator;

    public MyVendor(int beerPrice) {
        this.beerPrice = beerPrice;
    }

    public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception {
        if ("beer".equals(item)) {
            // set the price of beer on the exchange and forward it to the aggregator
            exchange.getIn().setBody(beerPrice);
            quoteAggregator.send(exchange);
        } else {
            throw new Exception("No quote available for " + item);
        }
    }
}

The bean instances, vendor1, vendor2, and vendor3, are instantiated using Spring XML syntax, as follows:

<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/>

<bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor">
    <constructor-arg>
        <value>1</value>
    </constructor-arg>
</bean>

<bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor">
    <constructor-arg>
        <value>2</value>
    </constructor-arg>
</bean>

<bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor">
    <constructor-arg>
        <value>3</value>
    </constructor-arg>
</bean>

Each bean is initialized with a different price for beer (passed to the constructor argument). When a message is sent to each bean endpoint, it arrives at the MyVendor.getQuote method. This method does a simple check to see whether this quote request is for beer and then sets the price of beer on the exchange for retrieval at a later step. The message is forwarded to the next step using POJO Producing (see the @Produce annotation).

At the next step, we want to take the beer quotes from all vendors and find out which one was the best (that is, the lowest). For this, we use an Aggregator with a custom aggregation strategy. The Aggregator needs to identify which messages are relevant to the current quote, which is done by correlating messages based on the value of the quoteRequestId header (passed to the correlationExpression). As shown in Example 16.1, the correlation ID is set to quoteRequest-1 (the correlation ID should be unique). To pick the lowest quote out of the set, you can use a custom aggregation strategy like the following:

public class LowestQuoteAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // the first time we only have the new exchange
        if (oldExchange == null) {
            return newExchange;
        }

        if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
            return oldExchange;
        } else {
            return newExchange;
        }
    }
}

You can specify the recipients explicitly in the scatter-gather application by employing a static Recipient List. The following example shows the routes you would use to implement a static scatter-gather scenario:

from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3");


from("seda:vendor1", "seda:vendor2", "seda:vendor3")
    .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy())
    .to("mock:result");

The Dynamic Router pattern, as shown in Figure 16.13, enables you to route a message consecutively through a series of processing steps, where the sequence of steps is not known at design time. The list of endpoints through which the message should pass is calculated dynamically at run time. Each time the message returns from an endpoint, the dynamic router calls back on a bean to discover the next endpoint in the route.


You must ensure that the expression used for the dynamicRouter (such as a method call on a bean) returns null to indicate the end. Otherwise, the dynamicRouter continues in an endless loop.

In Java DSL you can use the dynamicRouter as follows:

from("direct:start")
    // use a bean as the dynamic router
    .dynamicRouter(bean(DynamicRouterTest.class, "slip"));

This leverages the Bean component (see Bean in the Component Reference) to compute the slip on the fly. The bean could be implemented as follows:

// Java
/**
 * Use this method to compute dynamically where we should route next.
 *
 * @param body the message body
 * @return endpoints to go, or <tt>null</tt> to indicate the end
 */
public String slip(String body) {
    invoked++;  // invoked is an instance field counting the calls so far

    if (invoked == 1) {
        return "mock:a";
    } else if (invoked == 2) {
        return "mock:b,mock:c";
    } else if (invoked == 3) {
        return "direct:foo";
    } else if (invoked == 4) {
        return "mock:result";
    }

    // no more so return null
    return null;
}

This example shows a Message Normalizer that converts two types of XML messages into a common format. Messages in this common format are then filtered.

Using the Fluent Builders

// we need to normalize two types of incoming messages
from("direct:start")
    .choice()
        .when().xpath("/employee").to("bean:normalizer")
        .when().xpath("/customer").to("bean:normalizer")
    .end()
    .to("mock:result");

In this case, we are using a Java bean as the normalizer. The class looks like this:

// Java
public class MyNormalizer {
    public void employeeToPerson(Exchange exchange, @XPath("/employee/name/text()") String name) {
        exchange.getOut().setBody(createPerson(name));
    }

    public void customerToPerson(Exchange exchange, @XPath("/customer/@name") String name) {
        exchange.getOut().setBody(createPerson(name));
    }

    private String createPerson(String name) {
        return "<person name=\"" + name + "\"/>";
    }
}

The claim check pattern, shown in Figure 17.4, allows you to replace message content with a claim check (a unique key), which can be used to retrieve the message content at a later time. The message content is stored temporarily in a persistent store like a database or file system. This pattern is very useful when message content is very large (thus it would be expensive to send around) and not all components require all information.

It can also be useful when you do not want to trust an outside party with sensitive information; in this case, you can use the claim check to hide the sensitive portions of the data.

The following example shows how to replace a message body with a claim check and restore the body at a later step.

from("direct:start").to("bean:checkLuggage", "mock:testCheckpoint", "bean:dataEnricher", "mock:result");

The example route is just a Pipeline. In a real application, you would substitute some other steps for the mock:testCheckpoint endpoint.

The message is first sent to the checkLuggage bean which is implemented as follows:

public static final class CheckLuggageBean {
    public void checkLuggage(Exchange exchange, @Body String body, @XPath("/order/@custId") String custId) {
        // store the message body into the data store, using the custId as the claim check
        dataStore.put(custId, body);
        // add the claim check as a header
        exchange.getIn().setHeader("claimCheck", custId);
        // remove the body from the message
        exchange.getIn().setBody(null);
    }
}
This bean stores the message body into the data store, using the custId as the claim check. In this example, we are using a HashMap to store the message body; in a real application you would use a database or the file system. The claim check is added as a message header for later use and, finally, we remove the body from the message and pass it down the pipeline.
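The store-and-retrieve logic can be sketched without Camel using an ordinary HashMap; the method and key names below mirror the example but are otherwise illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class ClaimCheckSketch {
    // In-memory stand-in for the persistent store used by the real pattern.
    private static final Map<String, String> dataStore = new HashMap<>();

    /** Store the body under the claim check key and return the lightweight key. */
    static String checkLuggage(String custId, String body) {
        dataStore.put(custId, body);
        return custId;  // the claim check travels on as a header
    }

    /** Retrieve the body with the claim check and clean up the store. */
    static String addDataBackIn(String claimCheck) {
        return dataStore.remove(claimCheck);
    }

    public static void main(String[] args) {
        String claimCheck = checkLuggage("cust123", "<order custId=\"cust123\">...</order>");
        // ... the lightweight message travels through intermediate steps ...
        String restored = addDataBackIn(claimCheck);
        System.out.println(restored);
        System.out.println("store empty = " + dataStore.isEmpty());
    }
}
```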

The next step in the pipeline is the mock:testCheckpoint endpoint, which checks that the message body has been removed, the claim check added, and so on.

To add the message body back into the message, we use the dataEnricher bean, which is implemented as follows:

public static final class DataEnricherBean {
    public void addDataBackIn(Exchange exchange, @Header("claimCheck") String claimCheck) {
        // query the data store using the claim check as the key and add the data
        // back into the message body
        exchange.getIn().setBody(dataStore.get(claimCheck));
        // remove the message data from the data store
        dataStore.remove(claimCheck);
        // remove the claim check header
        exchange.getIn().removeHeader("claimCheck");
    }
}
This bean queries the data store, using the claim check as the key, and then adds the recovered data back into the message body. The bean then deletes the message data from the data store and removes the claimCheck header from the message.

To use validate in the XML DSL, the recommended approach is to use the simple expression language:

<route>
  <from uri="jms:queue:incoming"/>
  <validate>
    <simple>${body} regex ^\w{10}\,\d{2}\,\w{24}$</simple>
  </validate>
  <beanRef ref="myServiceBean" method="processLine"/>
</route>

<bean id="myServiceBean" class="com.mycompany.MyServiceBean"/>

You can also validate a message header—for example:

<route>
  <from uri="jms:queue:incoming"/>
  <validate>
    <simple>${header.bar} == 100</simple>
  </validate>
  <beanRef ref="myServiceBean" method="processLine"/>
</route>

<bean id="myServiceBean" class="com.mycompany.MyServiceBean"/>

The messaging mapper pattern describes how to map domain objects to and from a canonical message format, where the message format is chosen to be as platform neutral as possible. The chosen message format should be suitable for transmission through a message bus, where the message bus is the backbone for integrating a variety of different systems, some of which might not be object-oriented.

Many different approaches are possible, but not all of them fulfill the requirements of a messaging mapper. For example, an obvious way to transmit an object is to use object serialization, which enables you to write an object to a data stream using an unambiguous encoding (supported natively in Java). However, this is not a suitable approach for the messaging mapper pattern, because the serialization format is understood only by Java applications. Java object serialization creates an impedance mismatch between the original application and the other applications in the messaging system.

The requirements for a messaging mapper can be summarized as follows:

  • The canonical message format used to transmit domain objects should be suitable for consumption by non-object oriented applications.

  • The mapper code should be implemented separately from both the domain object code and the messaging infrastructure. Fuse Mediation Router helps fulfill this requirement by providing hooks that can be used to insert mapper code into a route.

  • The mapper might need to find an effective way of dealing with certain object-oriented concepts such as inheritance, object references, and object trees. The complexity of these issues varies from application to application, but the aim of the mapper implementation must always be to create messages that can be processed effectively by non-object-oriented applications.

You can use one of the following mechanisms to find the objects to map:

The polling consumer pattern, shown in Figure 18.2, is a pattern for implementing the consumer endpoint in a Fuse Mediation Router component, so it is only relevant to programmers who need to develop a custom component in Fuse Mediation Router. Existing components already have a consumer implementation pattern hard-wired into them.

Consumers that conform to this pattern expose the polling methods receive(), receive(long timeout), and receiveNoWait(), which return a new exchange object, if one is available from the monitored resource. A polling consumer implementation must provide its own thread pool to perform the polling.

The competing consumers pattern, shown in Figure 18.3, enables multiple consumers to pull messages from the same queue, with the guarantee that each message is consumed once only. This pattern can be used to replace serial message processing with concurrent message processing (bringing a corresponding reduction in response latency).

The following components demonstrate the competing consumers pattern:

The purpose of the SEDA component is to simplify concurrent processing by breaking the computation into stages. A SEDA endpoint essentially encapsulates an in-memory blocking queue (implemented by java.util.concurrent.BlockingQueue). Therefore, you can use a SEDA endpoint to break a route into stages, where each stage might use multiple threads. For example, you can define a SEDA route consisting of two stages, as follows:

// Stage 1: Read messages from file system.
from("file://var/messages").to("seda:fanout");

// Stage 2: Perform concurrent processing (3 threads).
from("seda:fanout").to("cxf:bean:replica01");
from("seda:fanout").to("cxf:bean:replica02");
from("seda:fanout").to("cxf:bean:replica03");
Where the first stage contains a single thread that consumes messages from a file endpoint, file://var/messages, and routes them to a SEDA endpoint, seda:fanout. The second stage contains three threads: a thread that routes exchanges to cxf:bean:replica01, a thread that routes exchanges to cxf:bean:replica02, and a thread that routes exchanges to cxf:bean:replica03. These three threads compete to take exchange instances from the SEDA endpoint, which is implemented using a blocking queue. Because the blocking queue uses locking to prevent more than one thread from accessing the queue at a time, you are guaranteed that each exchange instance can only be consumed once.
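The once-only consumption guarantee of a blocking queue can be demonstrated with a small stand-alone Java sketch (plain threads instead of Camel routes; all names are illustrative):

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class CompetingConsumersSketch {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the seda:fanout endpoint's in-memory blocking queue.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        for (int i = 0; i < 9; i++) {
            queue.put("message-" + i);
        }

        // Each consumed message lands in this set exactly once.
        Set<String> consumed = ConcurrentHashMap.newKeySet();

        // Three competing consumers, like the three replica threads above.
        Thread[] consumers = new Thread[3];
        for (int t = 0; t < 3; t++) {
            consumers[t] = new Thread(() -> {
                try {
                    String msg;
                    while ((msg = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
                        consumed.add(msg);  // locking in the queue guarantees once-only
                    }
                } catch (InterruptedException ignored) {
                }
            });
            consumers[t].start();
        }
        for (Thread t : consumers) {
            t.join();
        }
        System.out.println("consumed " + consumed.size() + " distinct messages");
    }
}
```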

For a discussion of the differences between a SEDA endpoint and a thread pool created by thread(), see Seda in Component Reference.

The message dispatcher pattern, shown in Figure 18.4, is used to consume messages from a channel and then distribute them locally to performers, which are responsible for processing the messages. In a Fuse Mediation Router application, performers are usually represented by in-process endpoints, which are used to transfer messages to another section of the route.

You can implement the message dispatcher pattern in Fuse Mediation Router using one of the following approaches:

If your application consumes messages from a JMS queue, you can implement the message dispatcher pattern using JMS selectors. A JMS selector is a predicate expression involving JMS headers and JMS properties. If the selector evaluates to true, the JMS message is allowed to reach the consumer, and if the selector evaluates to false, the JMS message is blocked. In many respects, a JMS selector is like a filter processor, but it has the additional advantage that the filtering is implemented inside the JMS provider. This means that a JMS selector can block messages before they are transmitted to the Fuse Mediation Router application. This provides a significant efficiency advantage.

In Fuse Mediation Router, you can define a JMS selector on a consumer endpoint by setting the selector query option on a JMS endpoint URI. For example:

from("jms:dispatcher?selector=CountryCode='US'").to("cxf:bean:replica01");
from("jms:dispatcher?selector=CountryCode='UK'").to("cxf:bean:replica02");
from("jms:dispatcher?selector=CountryCode='DE'").to("cxf:bean:replica03");
Where the predicates that appear in a selector string are based on a subset of the SQL92 conditional expression syntax (for full details, see the JMS specification). The identifiers appearing in a selector string can refer either to JMS headers or to JMS properties. For example, in the preceding routes, the sender sets a JMS property called CountryCode.

If you want to add a JMS property to a message from within your Fuse Mediation Router application, you can do so by setting a message header (either on In message or on Out messages). When reading or writing to JMS endpoints, Fuse Mediation Router maps JMS headers and JMS properties to, and from, its native message headers.

Technically, the selector strings must be URL encoded according to the application/x-www-form-urlencoded MIME format (see the HTML specification). In practice, the & (ampersand) character might cause difficulties because it is used to delimit each query option in the URI. For more complex selector strings that might need to embed the & character, you can encode the strings using the java.net.URLEncoder utility class. For example:

from("jms:dispatcher?selector=" + java.net.URLEncoder.encode("CountryCode='US'", "UTF-8")).

Where the UTF-8 encoding must be used.
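A quick stand-alone check of the encoding (using the standard java.net.URLEncoder class) shows how the = and quote characters are escaped so they cannot clash with URI syntax:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class SelectorEncodingSketch {
    public static void main(String[] args) throws UnsupportedEncodingException {
        // Encode the selector so '=' and quotes cannot clash with URI syntax.
        String encoded = URLEncoder.encode("CountryCode='US'", "UTF-8");
        System.out.println(encoded);
        System.out.println("jms:dispatcher?selector=" + encoded);
    }
}
```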

A durable subscriber, as shown in Figure 18.6, is a consumer that wants to receive all of the messages sent over a particular publish-subscribe channel, including messages sent while the consumer is disconnected from the messaging system. This requires the messaging system to store messages for later replay to the disconnected consumer. There also has to be a mechanism for a consumer to indicate that it wants to establish a durable subscription. Generally, a publish-subscribe channel (or topic) can have both durable and non-durable subscribers, which behave as follows:

  • non-durable subscriber—Can have two states: connected and disconnected. While a non-durable subscriber is connected to a topic, it receives all of the topic's messages in real time. However, a non-durable subscriber never receives messages sent to the topic while the subscriber is disconnected.

  • durable subscriber—Can have two states: connected and inactive. The inactive state means that the durable subscriber is disconnected from the topic, but wants to receive the messages that arrive in the interim. When the durable subscriber reconnects to the topic, it receives a replay of all the messages sent while it was inactive.

In Fuse Mediation Router, the idempotent consumer pattern is implemented by the idempotentConsumer() processor, which takes two arguments:

As each message comes in, the idempotent consumer processor looks up the current message ID in the repository to see if this message has been seen before. If yes, the message is discarded; if no, the message is allowed to pass and its ID is added to the repository.

The code shown in Example 18.1 uses the TransactionID header to filter out duplicates.

Where the call to memoryMessageIdRepository(200) creates an in-memory cache that can hold up to 200 message IDs.
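The repository lookup logic can be sketched in plain Java; the bounded LinkedHashMap cache below is an illustrative stand-in for memoryMessageIdRepository(200), not Camel's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IdempotentRepositorySketch {
    /** Bounded in-memory ID cache in the spirit of memoryMessageIdRepository(200). */
    static final class MemoryIdRepository {
        private final Map<String, Boolean> seen;

        MemoryIdRepository(final int maxSize) {
            // LinkedHashMap in access order evicts the eldest ID beyond maxSize.
            this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > maxSize;
                }
            };
        }

        /** Returns true if the ID was new (message passes), false if duplicate. */
        boolean add(String messageId) {
            return seen.put(messageId, Boolean.TRUE) == null;
        }
    }

    public static void main(String[] args) {
        MemoryIdRepository repo = new MemoryIdRepository(200);
        System.out.println(repo.add("TX-001"));  // first time: passes
        System.out.println(repo.add("TX-002"));  // first time: passes
        System.out.println(repo.add("TX-001"));  // duplicate: filtered
    }
}
```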

You can also define an idempotent consumer using XML configuration. For example, you can define the preceding route in XML, as follows:

<camelContext id="buildIdempotentConsumer" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <idempotentConsumer messageIdRepositoryRef="MsgIDRepos">
      <simple>header.TransactionID</simple>
      <to uri="seda:b"/>
    </idempotentConsumer>
  </route>
</camelContext>

<bean id="MsgIDRepos" class="org.apache.camel.processor.idempotent.MemoryMessageIdRepository">
    <!-- Specify the in-memory cache size. -->
    <constructor-arg type="int" value="200"/>
</bean>

A JDBC repository is also supported for storing message IDs in the idempotent consumer pattern. The implementation of the JDBC repository is provided by the SQL component, so if you are using the Maven build system, add a dependency on the camel-sql artifact.

You can use the SingleConnectionDataSource JDBC wrapper class from the Spring persistence API in order to instantiate the connection to a SQL database. For example, to instantiate a JDBC connection to a HyperSQL database instance, you could define the following JDBC data source:

<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
    <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/>
    <property name="username" value="sa"/>
    <property name="password" value=""/>
</bean>

The preceding JDBC data source uses the HyperSQL mem protocol, which creates a memory-only database instance. This is convenient for demonstrations and testing, but a memory-only database is not actually persistent.

Using the preceding data source, you can define an idempotent consumer pattern that uses the JDBC message ID repository, as follows:

<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
    <constructor-arg ref="dataSource"/>
    <constructor-arg value="myProcessorName"/>
</bean>

<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
    <camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false"/>
</camel:errorHandler>

<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
    <camel:from uri="direct:start"/>
    <camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
        <camel:header>messageId</camel:header>
        <camel:to uri="mock:result"/>
    </camel:idempotentConsumer>
</camel:route>

The messaging gateway pattern, shown in Figure 18.8, describes an approach to integrating with a messaging system, where the messaging system's API remains hidden from the programmer at the application level. One of the more common examples is translating synchronous method calls into request/reply message exchanges, without the programmer being aware of this.

The following Fuse Mediation Router components provide this kind of integration with the messaging system:

The service activator pattern, shown in Figure 18.9, describes the scenario where a service's operations are invoked in response to an incoming request message. The service activator identifies which operation to call and extracts the data to use as the operation's parameters. Finally, the service activator invokes an operation using the data extracted from the message. The operation invocation can be either oneway (request only) or two-way (request/reply).

In many respects, a service activator resembles a conventional remote procedure call (RPC), where operation invocations are encoded as messages. The main difference is that a service activator needs to be more flexible. An RPC framework standardizes the request and reply message encodings (for example, Web service operations are encoded as SOAP messages), whereas a service activator typically needs to improvise the mapping between the messaging system and the service's operations.

The main mechanism that Fuse Mediation Router provides to support the service activator pattern is bean integration. Bean integration provides a general framework for mapping incoming messages to method invocations on Java objects. For example, the Java fluent DSL provides the processors bean() and beanRef() that you can insert into a route to invoke methods on a registered Java bean. The detailed mapping of message data to Java method parameters is determined by the bean binding, which can be implemented by adding annotations to the bean class.

For example, consider the following route which calls the Java method, BankBean.getUserAccBalance(), to service requests incoming on a JMS/ActiveMQ queue:

from("activemq:BalanceQueries")
  .setProperty("userid", xpath("/Account/BalanceQuery/UserID").stringResult())
  .beanRef("bankBean", "getUserAccBalance")
  .to("velocity:file:src/scripts/acc_balance.vm");

The messages pulled from the ActiveMQ endpoint, activemq:BalanceQueries, have a simple XML format that provides the user ID of a bank account. For example:

<?xml version='1.0' encoding='UTF-8'?>
<Account>
  <BalanceQuery>
    <UserID>James.Strachan</UserID>
  </BalanceQuery>
</Account>

The first processor in the route, setProperty(), extracts the user ID from the In message and stores it in the userid exchange property. This is preferable to storing it in a header, because the In headers are not available after invoking the bean.

The service activation step is performed by the beanRef() processor, which binds the incoming message to the getUserAccBalance() method on the Java object identified by the bankBean bean ID. The following code shows a sample implementation of the BankBean class:

package tutorial;

import org.apache.camel.language.XPath;

public class BankBean {
    public int getUserAccBalance(@XPath("/Account/BalanceQuery/UserID") String user) {
        if (user.equals("James.Strachan")) {
            return 1200;
        } else {
            return 0;
        }
    }
}
The binding of message data to method parameters is enabled by the @XPath annotation, which injects the content of the UserID XML element into the user method parameter. On completion of the call, the return value is inserted into the body of the Out message, which is then copied into the In message for the next step in the route. In order for the bean to be accessible to the beanRef() processor, you must instantiate an instance in Spring XML. For example, you can add the following lines to the META-INF/spring/camel-context.xml configuration file to instantiate the bean:

<?xml version="1.0" encoding="UTF-8"?>
<beans ... >
  <bean id="bankBean" class="tutorial.BankBean"/>
</beans>

The bean ID, bankBean, identifies this bean instance in the registry.

The output of the bean invocation is injected into a Velocity template to produce a properly formatted result message. The Velocity endpoint, velocity:file:src/scripts/acc_balance.vm, specifies the location of a Velocity script with the following contents:

<?xml version='1.0' encoding='UTF-8'?>
<Account>
  <BalanceResult>
    <UserID>${exchange.getProperty("userid")}</UserID>
    <Balance>${body}</Balance>
  </BalanceResult>
</Account>

The exchange instance is available as the Velocity variable, exchange, which enables you to retrieve the userid exchange property, using ${exchange.getProperty("userid")}. The body of the current In message, ${body}, contains the result of the getUserAccBalance() method invocation.
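Conceptually, the template engine replaces each placeholder with a value drawn from the exchange. The following toy Java sketch (not the Velocity engine; the class and method names are illustrative) shows the kind of substitution being performed:

```java
import java.util.Map;

// Toy placeholder substitution (illustrative only; Velocity is far more
// capable): each ${key} in the template is replaced by the matching value.
public class TemplateSketch {
    public static String render(String template, Map<String, String> values) {
        String result = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            result = result.replace("${" + entry.getKey() + "}", entry.getValue());
        }
        return result;
    }
}
```

For example, render("<Balance>${body}</Balance>", Map.of("body", "1200")) yields <Balance>1200</Balance>.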

The wire tap pattern, as shown in Figure 19.1, enables you to route a copy of the message to a separate tap location, while the original message is forwarded to the ultimate destination.

You can define a wire tap with a new exchange instance by setting the copy flag to false (the default is true). In this case, an initially empty exchange is created for the wire tap.

For example, to create a new exchange instance using the processor approach:

from("direct:start")
    .wireTap("direct:foo", false, new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody("Bye World");
            exchange.getIn().setHeader("foo", "bar");
        }
    })
    .to("mock:result");

The second wireTap argument sets the copy flag to false, indicating that the original exchange is not copied and an empty exchange is created instead.

To create a new exchange instance using the expression approach:

from("direct:start")
    .wireTap("direct:foo", false, constant("Bye World"))
    .to("mock:result");


Using the Spring XML extensions, you can indicate that a new exchange is to be created by setting the wireTap element's copy attribute to false.

To create a new exchange instance using the processor approach, define the route as follows, where the processorRef attribute references a Spring bean with the myProcessor ID:

<route>
    <from uri="direct:start2"/>
    <wireTap uri="direct:foo" processorRef="myProcessor" copy="false"/>
    <to uri="mock:result"/>
</route>

And to create a new exchange instance using the expression approach:

<route>
    <from uri="direct:start"/>
    <wireTap uri="direct:foo" copy="false">
        <body><constant>Bye World</constant></body>
    </wireTap>
    <to uri="mock:result"/>
</route>

The following example shows how to route a request from an input endpoint, seda:a, to the wire tap location, seda:tap, before it is received by seda:b:

Using the Fluent Builders

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a").multicast().to("seda:tap", "seda:b");
    }
};

Using the Spring XML Extensions

<camelContext errorHandlerRef="errorHandler" streamCache="false" id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="seda:a"/>
        <multicast>
            <to uri="seda:tap"/>
            <to uri="seda:b"/>
        </multicast>
    </route>
</camelContext>

Fuse Mediation Router provides several ways to perform logging in a route:

  • Using the log DSL command.

  • Using the Log component (see the Component Reference), which can log the message content.

  • Using the Tracer, which traces message flow.

  • Using a Processor or a Bean endpoint (see the Component Reference) to perform logging in Java.

[Important]Difference between the log DSL command and the log component

The log DSL is much lighter weight and is meant for logging human-readable messages, such as Starting to do .... It can only log a message evaluated with the Simple language. In contrast, the Log component is a fully featured logging component: it is capable of logging the message itself, and it provides many URI options to control the logging.
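The two styles can be contrasted in Spring XML as follows. This is a sketch: the log category, route endpoints, and option values are illustrative.

```xml
<route>
    <from uri="direct:start"/>
    <!-- log DSL: a lightweight, human-readable message in the Simple language -->
    <log message="Starting to process ${body}"/>
    <!-- Log component: a full endpoint with URI options controlling the output -->
    <to uri="log:com.example.MyRoute?level=INFO&amp;showBody=true&amp;showHeaders=true"/>
</route>
```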

The following table lists the parameters for scheduling one or more route starts.

Parameter | Type | Default | Description
routeStartDate | java.util.Date | None | Specifies the date and time when the route is started for the first time.
routeStartRepeatCount | int | 0 | When set to a non-zero value, specifies how many times the route should be started.
routeStartRepeatInterval | long | 0 | Specifies the time interval between starts, in units of milliseconds.

The following table lists the parameters for scheduling one or more route stops.

Parameter | Type | Default | Description
routeStopDate | java.util.Date | None | Specifies the date and time when the route is stopped for the first time.
routeStopRepeatCount | int | 0 | When set to a non-zero value, specifies how many times the route should be stopped.
routeStopRepeatInterval | long | 0 | Specifies the time interval between stops, in units of milliseconds.
routeStopGracePeriod | int | 10000 | Specifies how long to wait for the current exchange to finish processing (grace period) before forcibly stopping the route. Set to 0 for an infinite grace period.
routeStopTimeUnit | long | TimeUnit.MILLISECONDS | Specifies the time unit of the grace period.

The following table lists the parameters for scheduling the suspension of a route one or more times.

Parameter | Type | Default | Description
routeSuspendDate | java.util.Date | None | Specifies the date and time when the route is suspended for the first time.
routeSuspendRepeatCount | int | 0 | When set to a non-zero value, specifies how many times the route should be suspended.
routeSuspendRepeatInterval | long | 0 | Specifies the time interval between suspends, in units of milliseconds.

The following table lists the parameters for scheduling the resumption of a route one or more times.

Parameter | Type | Default | Description
routeResumeDate | java.util.Date | None | Specifies the date and time when the route is resumed for the first time.
routeResumeRepeatCount | int | 0 | When set to a non-zero value, specifies how many times the route should be resumed.
routeResumeRepeatInterval | long | 0 | Specifies the time interval between resumes, in units of milliseconds.
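The parameters in the tables above map onto bean properties of the route policy. For example, in Spring XML you might configure a policy that starts a route and later stops it. This is a sketch: the bean IDs and property values are illustrative, and startDate and stopDate are assumed to reference java.util.Date beans defined elsewhere.

```xml
<bean id="startStopPolicy"
      class="org.apache.camel.routepolicy.quartz.SimpleScheduledRoutePolicy">
    <!-- start the route once, at the given date and time -->
    <property name="routeStartDate" ref="startDate"/>
    <property name="routeStartRepeatCount" value="1"/>
    <property name="routeStartRepeatInterval" value="3000"/>
    <!-- stop it later, waiting up to 10 seconds for in-flight exchanges -->
    <property name="routeStopDate" ref="stopDate"/>
    <property name="routeStopGracePeriod" value="10000"/>
</bean>
```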

The following table lists the parameters for scheduling one or more route starts.

Parameter | Type | Default | Description
routeStartTime | String | None | Specifies a cron expression that triggers one or more route start events.

The following table lists the parameters for scheduling one or more route stops.

Parameter | Type | Default | Description
routeStopTime | String | None | Specifies a cron expression that triggers one or more route stop events.
routeStopGracePeriod | int | 10000 | Specifies how long to wait for the current exchange to finish processing (grace period) before forcibly stopping the route. Set to 0 for an infinite grace period.
routeStopTimeUnit | long | TimeUnit.MILLISECONDS | Specifies the time unit of the grace period.

The following table lists the parameters for scheduling the suspension of a route one or more times.

Parameter | Type | Default | Description
routeSuspendTime | String | None | Specifies a cron expression that triggers one or more route suspend events.

The following table lists the parameters for scheduling the resumption of a route one or more times.

Parameter | Type | Default | Description
routeResumeTime | String | None | Specifies a cron expression that triggers one or more route resume events.
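The cron-based parameters above can likewise be set as bean properties. For example, the following sketch configures a policy that starts a route each weekday morning and stops it each evening; the bean ID and the cron expressions (Quartz syntax) are illustrative.

```xml
<bean id="cronPolicy"
      class="org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy">
    <!-- start the route every weekday at 08:00 -->
    <property name="routeStartTime" value="0 0 8 ? * MON-FRI"/>
    <!-- stop it every weekday at 18:00, with a 10 second grace period -->
    <property name="routeStopTime" value="0 0 18 ? * MON-FRI"/>
    <property name="routeStopGracePeriod" value="10000"/>
</bean>
```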


choice, Choice
components, Apache Camel components
consumer, Consumer endpoints
continued, Continuing processing
CronScheduledRoutePolicy, Cron Scheduled Route Policy


namespace, Namespace
syntax, Route syntax


endpoints (see components)
consumer, Consumer endpoints
producer, Producer endpoints
uri, Endpoint URIs
deadletter channel, Deadletter channel
redelivery policy, Redelivery policy
trapping, onException Clause
Exchange, Exchange objects


MEP, Message exchange patterns
message exchange, Message exchanges
message exchange pattern, Message exchange patterns


onException, onException Clause
scopes, Scopes
syntax, Syntax


performer, Overview
choice, Choice
filter, Filter
throttler, Throttler
producer, Producer endpoints


Quartz component, Quartz component


redelivery policy, Redelivery policy
retryWhile, Redelivery policy
autoStartup, Disabling automatic start-up of routes
shutdownRunningTask, Shutting down running tasks in a route
shutdownTimeout, Shutdown timeout
startupOrder, Startup order of routes
syntax, Route syntax


shutdownRunningTask, Shutting down running tasks in a route
shutdownTimeout, Shutdown timeout
SimpleScheduledRoutePolicy, Creating a simple scheduling policy
startupOrder, Startup order of routes


useOriginalMessage, Use original message