
A recipient list, shown in Figure 16.3, is a type of router that sends each incoming message to multiple different destinations. In addition, a recipient list typically requires that the list of recipients be calculated at run time.


The simplest kind of recipient list is where the list of destinations is fixed and known in advance, and the exchange pattern is InOnly. In this case, you can hardwire the list of destinations into the to() Java DSL command.
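A minimal sketch of such a fixed recipient list is shown here. The class name and the seda endpoint URIs are placeholders chosen for illustration, and the sketch assumes the standard RouteBuilder base class from Camel 2.x.

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: the class name and endpoint URIs are hypothetical.
public class FixedRecipientsRoute extends RouteBuilder {
    @Override
    public void configure() {
        // The destinations are hardwired; each InOnly message taken from
        // seda:a is delivered to seda:b, seda:c, and seda:d in turn.
        from("seda:a").to("seda:b", "seda:c", "seda:d");
    }
}
```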

Note

The examples given here, for the recipient list with fixed destinations, work only with the InOnly exchange pattern (similar to a pipeline). If you want to create a recipient list for exchange patterns with Out messages, use the multicast pattern instead.

The Recipient List supports parallelProcessing, which is similar to the corresponding feature in Splitter. Use the parallel processing feature to send the exchange to multiple recipients concurrently—for example:

from("direct:a").recipientList(header("myHeader")).parallelProcessing();

In XML, the parallel processing feature is implemented as an attribute on the recipientList tag—for example:

<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>

Available as of Camel 2.3

The Recipient List supports the ignoreInvalidEndpoints option, which enables the recipient list to skip invalid endpoints (Routing Slip also supports this option). For example:

from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();

In XML, you can enable this option by setting the ignoreInvalidEndpoints attribute on the recipientList tag, as follows:

<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>      

Consider the case where myHeader contains the two endpoints, direct:foo,xxx:bar. The first endpoint is valid and works. The second is invalid and, therefore, ignored. Fuse Mediation Router logs at INFO level whenever an invalid endpoint is encountered.

You can use a custom AggregationStrategy with the Recipient List, which is useful for aggregating replies from the recipients in the list. By default, Fuse Mediation Router uses the UseLatestAggregationStrategy aggregation strategy, which keeps just the last received reply. For a more sophisticated aggregation strategy, you can define your own implementation of the AggregationStrategy interface—see Aggregator EIP for details. For example, to apply the custom aggregation strategy, MyOwnAggregationStrategy, to the reply messages, you can define a Java DSL route as follows:

from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");

In XML, you can specify the custom aggregation strategy as an attribute on the recipientList tag, as follows:

<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>
        
<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
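The MyOwnAggregationStrategy class itself is not shown in this section. One possible shape for it, offered only as a sketch, is a strategy that concatenates the String bodies of the replies. It assumes the Camel 2.x AggregationStrategy interface in the org.apache.camel.processor.aggregate package; the concatenation logic is an illustrative choice, not a required behavior.

```java
package com.mycompany;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Sketch only: combines replies by appending their String bodies.
public class MyOwnAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // On the first reply there is nothing to merge with yet.
        if (oldExchange == null) {
            return newExchange;
        }
        // Append the new reply to what has been aggregated so far.
        String soFar = oldExchange.getIn().getBody(String.class);
        String reply = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(soFar + reply);
        return oldExchange;
    }
}
```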

You can use a bean (see Bean in the Component User's Guide) to provide the recipients, for example:

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");      

Where the MessageRouter bean is defined as follows:

public class MessageRouter {

    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}      
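A bean can also compute more than one recipient at run time. The following sketch is a hypothetical variant of MessageRouter (the class name, the "urgent" rule, and the second queue name are all invented for illustration). It assumes that Camel binds the message body to the single String parameter and that the recipient list splits a comma-separated String into individual endpoints, as in the direct:foo,xxx:bar header example earlier in this section.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a hypothetical bean that picks recipients from the message body.
public class PriorityMessageRouter {

    // Returns a comma-separated list of endpoint URIs for the given body.
    public String routeTo(String body) {
        List<String> recipients = new ArrayList<String>();
        // Every message goes to the default queue.
        recipients.add("activemq:queue:test2");
        // Messages mentioning "urgent" are also copied to a second queue.
        if (body != null && body.contains("urgent")) {
            recipients.add("activemq:queue:urgent");
        }
        return String.join(",", recipients);
    }
}
```

Such a bean would be wired in the same way as above, for example with recipientList().method(PriorityMessageRouter.class, "routeTo").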

Available as of Camel 2.5

If you use parallelProcessing, you can configure a total timeout value in milliseconds. Camel will then process the messages in parallel until the timeout is hit. This allows you to continue processing if one message is slow.

For example, the unit test below multicasts the message to three destinations with a timeout of 250 milliseconds. Because the direct:a route delays for 500 milliseconds, only the last two destinations complete within the time frame. Only their replies are aggregated, so the aggregated result is "BC".

from("direct:start")
    .multicast(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }

                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c")
    // use end to indicate end of multicast route
    .end()
    .to("mock:result");

from("direct:a").delay(500).to("mock:A").setBody(constant("A"));

from("direct:b").to("mock:B").setBody(constant("B"));

from("direct:c").to("mock:C").setBody(constant("C"));

Tip: Timeout in other EIPs

The timeout feature is supported by Splitter, multicast, and recipientList.
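Since the unit test above uses multicast, the same option on a recipient list might look like the following sketch. It assumes Camel 2.5 or later, where timeout() is available on the recipientList DSL; the class name and the direct:start and mock:result endpoints are placeholders.

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: the class name and endpoint URIs are hypothetical.
public class RecipientListTimeoutRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:start")
            // Recipients are read from the myHeader header at run time.
            .recipientList(header("myHeader"))
            // Send to all recipients concurrently, waiting at most
            // 250 milliseconds in total for their replies.
            .parallelProcessing().timeout(250)
            // Mark the end of the recipient list before continuing the route.
            .end()
            .to("mock:result");
    }
}
```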

By default, if a timeout occurs, the AggregationStrategy is not invoked. However, you can implement a specialized version, TimeoutAwareAggregationStrategy:

// Java
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {

    /**
     * A timeout occurred
     *
     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param index        the index
     * @param total        the total
     * @param timeout      the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);
}

This allows you to deal with the timeout in the AggregationStrategy if you really need to.
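As a sketch of how such a strategy might be written, the following class concatenates reply bodies and records the timeout in a header. The class name and the timedOutAt header are invented for illustration, and the import assumes the interface lives in the org.apache.camel.processor.aggregate package, as in Camel 2.x.

```java
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;

// Sketch only: the class name and the timedOutAt header are hypothetical.
public class ConcatWithTimeoutStrategy implements TimeoutAwareAggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // On the first reply there is nothing to merge with yet.
        if (oldExchange == null) {
            return newExchange;
        }
        String soFar = oldExchange.getIn().getBody(String.class);
        String reply = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(soFar + reply);
        return oldExchange;
    }

    public void timeout(Exchange oldExchange, int index, int total, long timeout) {
        // oldExchange is null when no reply was aggregated before the timeout.
        if (oldExchange != null) {
            // Record which recipient (by index) triggered the timeout.
            oldExchange.getIn().setHeader("timedOutAt", index + "/" + total);
        }
    }
}
```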

Important: Timeout is total

The timeout is total, which means that after the timeout period has elapsed, Camel aggregates the messages that have completed within the time frame. The remaining messages are cancelled. Camel also invokes the timeout method in the TimeoutAwareAggregationStrategy only once, for the first index that caused the timeout.
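The idea of a total timeout can be illustrated outside Camel with the standard java.util.concurrent API. The sketch below is an analogy only and is not how Camel implements the feature. It uses ExecutorService.invokeAll, which applies one deadline to a whole batch of tasks and cancels any task that has not finished by then. The class, method names, and delays are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch only: a plain-Java analogy for a total timeout, not Camel code.
public class TotalTimeoutSketch {

    // Hypothetical stand-in for a recipient: waits delayMillis, then replies.
    static Callable<String> recipient(final String reply, final long delayMillis) {
        return new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(delayMillis);
                return reply;
            }
        };
    }

    // Runs all recipients in parallel under one shared deadline and
    // concatenates only the replies that arrived in time.
    static String aggregateWithinTimeout(List<Callable<String>> recipients,
                                         long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(recipients.size());
        try {
            // One deadline covers the whole batch; unfinished tasks are cancelled.
            List<Future<String>> futures =
                    pool.invokeAll(recipients, timeoutMillis, TimeUnit.MILLISECONDS);
            StringBuilder aggregated = new StringBuilder();
            for (Future<String> future : futures) {
                if (!future.isCancelled()) {
                    aggregated.append(future.get());
                }
            }
            return aggregated.toString();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // A is far slower than the 500 ms deadline; B and C reply at once.
        List<Callable<String>> recipients = Arrays.asList(
                recipient("A", 2000), recipient("B", 0), recipient("C", 0));
        System.out.println(aggregateWithinTimeout(recipients, 500));
    }
}
```

With these delays, the slow recipient is cancelled and only the replies from the two fast recipients are combined, mirroring the "BC" outcome of the unit test above.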
