A recipient list, shown in Figure 16.3, is a type of router that sends each incoming message to multiple different destinations. In addition, a recipient list typically requires that the list of recipients be calculated at run time.
The simplest kind of recipient list is where the list of destinations is fixed and known
in advance, and the exchange pattern is InOnly. In this case, you can
hardwire the list of destinations into the to() Java DSL command.
The following example shows how to route an InOnly exchange from a
consumer endpoint, queue:a, to a fixed list of destinations:
from("seda:a").to("seda:b", "seda:c", "seda:d");The following example shows how to configure the same route in XML:
<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="seda:a"/>
<to uri="seda:b"/>
<to uri="seda:c"/>
<to uri="seda:d"/>
</route>
</camelContext>In most cases, when you use the recipient list pattern, the list of recipients should be
calculated at runtime. To do this use the recipientList() processor, which
takes a list of destinations as its sole argument. Because Fuse Mediation Router applies a type converter
to the list argument, it should be possible to use most standard Java list types (for
example, a collection, a list, or an array).
The recipients receive a copy of the same exchange instance and Fuse Mediation Router executes them sequentially.
The following example shows how to extract the list of destinations from a message
header called recipientListHeader, where the header value is a comma-separated
list of endpoint URIs:
from("direct:a").recipientList(header("recipientListHeader").tokenize(","));In some cases, if the header value is a list type, you might be able to use it directly
as the argument to recipientList(). For example:
from("seda:a").recipientList(header("recipientListHeader"));However, this example is entirely dependent on how the underlying component parses this particular header. If the component parses the header as a simple string, this example will not work. The header must be parsed into some type of Java list.
The following example shows how to configure the preceding route in XML, where the header value is a comma-separated list of endpoint URIs:
<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="seda:a"/>
<recipientList delimiter=",">
<header>recipientListHeader</header>
</recipientList>
</route>
</camelContext>
The Recipient List supports
parallelProcessing, which is similar to the corresponding feature in
Splitter. Use the parallel processing feature to
send the exchange to multiple recipients concurrently—for example:
from("direct:a").recipientList(header("myHeader")).parallelProcessing();In XML, the parallel processing feature is implemented as an attribute on the
recipientList tag—for example:
<route>
<from uri="direct:a"/>
<recipientList parallelProcessing="true">
<header>myHeader</header>
</recipientList>
</route>The Recipient List supports the
stopOnException feature, which you can use to stop sending to any
further recipients, if any recipient fails.
from("direct:a").recipientList(header("myHeader")).stopOnException();And in XML its an attribute on the recipient list tag.
In XML, the stop on exception feature is implemented as an attribute on the
recipientList tag—for example:
<route>
<from uri="direct:a"/>
<recipientList stopOnException="true">
<header>myHeader</header>
</recipientList>
</route>![]() | Note |
|---|---|
You can combine |
Available as of Camel 2.3
The Recipient List supports the
ignoreInvalidEndpoints option, which enables the recipient list to skip
invalid endpoints (Routing Slip also supports
this option). For example:
from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();And in XML, you can enable this option by setting the
ignoreInvalidEndpoints attribute on the recipientList tag, as
follows
<route>
<from uri="direct:a"/>
<recipientList ignoreInvalidEndpoints="true">
<header>myHeader</header>
</recipientList>
</route> Consider the case where myHeader contains the two endpoints,
direct:foo,xxx:bar. The first endpoint is valid and works. The second
is invalid and, therefore, ignored. Fuse Mediation Router logs at INFO level whenever an
invalid endpoint is encountered.
You can use a custom AggregationStrategy with the Recipient List, which is useful for aggregating
replies from the recipients in the list. By default, Fuse Mediation Router uses the
UseLatestAggregationStrategy aggregation strategy, which keeps just the
last received reply. For a more sophisticated aggregation strategy, you can define your own
implementation of the AggregationStrategy interface—see Aggregator EIP for details. For example, to apply the
custom aggregation strategy, MyOwnAggregationStrategy, to the reply messages,
you can define a Java DSL route as follows:
from("direct:a")
.recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
.to("direct:b");In XML, you can specify the custom aggregation strategy as an attribute on the
recipientList tag, as follows:
<route>
<from uri="direct:a"/>
<recipientList strategyRef="myStrategy">
<header>myHeader</header>
</recipientList>
<to uri="direct:b"/>
</route>
<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>Available as of Camel 2.2
This is only needed when you use parallelProcessing. By default Camel
uses a thread pool with 10 threads. Notice this is subject to change when we overhaul thread
pool management and configuration later (hopefully in Camel 2.2).
You configure this just as you would with the custom aggregation strategy.
You can use a Bean in Component User's Guide to provide the recipients, for example:
from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo"); Where the MessageRouter bean is defined as follows:
public class MessageRouter {
public String routeTo() {
String queueName = "activemq:queue:test2";
return queueName;
}
} You can make a bean behave as a recipient list by adding the
@RecipientList annotation to a methods that returns a list of
recipients. For example:
public class MessageRouter {
@RecipientList
public String routeTo() {
String queueList = "activemq:queue:test1,activemq:queue:test2";
return queueList;
}
}In this case, do not include the recipientList DSL
command in the route. Define the route as follows:
from("activemq:queue:test").bean(MessageRouter.class, "routeTo");Available as of Camel 2.5
If you use parallelProcessing, you can configure a total
timeout value in milliseconds. Camel will then process the messages in
parallel until the timeout is hit. This allows you to continue processing if one message is
slow.
For example, in the unit test below, you can see we multicast the message to 3
destinations. We have a timeout of 2 seconds, which means only the last two messages can be
completed within the timeframe. This means we will only aggregate the last two, which yields
a result aggregation which outputs "BC".
from("direct:start")
.multicast(new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String body = oldExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
return oldExchange;
}
})
.parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c")
// use end to indicate end of multicast route
.end()
.to("mock:result");
from("direct:a").delay(500).to("mock:A").setBody(constant("A"));
from("direct:b").to("mock:B").setBody(constant("B"));
from("direct:c").to("mock:C").setBody(constant("C"));![]() | Timeout in other EIPs |
|---|---|
This |
By default if a timeout occurs the AggregationStrategy is not
invoked. However you can implement a specialized version
// Java
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {
/**
* A timeout occurred
*
* @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
* @param index the index
* @param total the total
* @param timeout the timeout value in millis
*/
void timeout(Exchange oldExchange, int index, int total, long timeout);
This allows you to deal with the timeout in the AggregationStrategy
if you really need to.
![]() | Timeout is total |
|---|---|
The timeout is total, which means that after X time, Camel will aggregate the messages
which has completed within the timeframe. The remainders will be cancelled. Camel will
also only invoke the |







![[Note]](imagesdb/note.gif)
![[Tip]](imagesdb/tip.gif)
![[Important]](imagesdb/important.gif)


