The aggregator pattern, shown in Figure 7.5, enables you to combine a batch of related messages into a single message.
To control the aggregator's behavior, Apache Camel allows you to specify the properties described in Enterprise Integration Patterns, as follows:
Correlation expression — Determines which messages should be aggregated together. The correlation expression is evaluated on each incoming message to produce a correlation key. Incoming messages with the same correlation key are then grouped into the same batch. For example, if you want to aggregate all incoming messages into a single message, you can use a constant expression.
Completeness condition — Determines when a batch of messages is complete. You can specify this either as a simple size limit or, more generally, you can specify a predicate condition that flags when the batch is complete.
Aggregation algorithm — Combines the message exchanges for a single correlation key into a single message exchange.
For example, consider a stock market data system that receives 30,000 messages per second. You might want to throttle down the message flow if your GUI tool cannot cope with such a massive update rate. The incoming stock quotes can be aggregated together simply by choosing the latest quote and discarding the older prices. (You can apply a delta processing algorithm, if you prefer to capture some of the history.)
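The throttling idea above can be illustrated outside Apache Camel with a short plain-Java sketch. It is not Camel code: the `LatestQuoteSketch` and `Quote` classes and the `latestBySymbol` method are hypothetical names introduced here for illustration. The sketch groups a burst of quotes by symbol (playing the role of the correlation key) and keeps only the most recent price for each symbol.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; Quote and latestBySymbol are hypothetical names.
class LatestQuoteSketch {

    static final class Quote {
        final String symbol;
        final double price;

        Quote(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    // Keeps the most recently seen price per symbol, discarding older prices.
    static Map<String, Double> latestBySymbol(List<Quote> quotes) {
        Map<String, Double> latest = new LinkedHashMap<>();
        for (Quote q : quotes) {
            latest.put(q.symbol, q.price); // a later quote overwrites an earlier one
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Quote> burst = Arrays.asList(
                new Quote("IBM", 101.0),
                new Quote("ORCL", 55.5),
                new Quote("IBM", 102.5));
        // Three incoming quotes collapse into one entry per symbol.
        System.out.println(latestBySymbol(burst));
    }
}
```

A delta-processing variant would store a small history per symbol instead of overwriting the previous value.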
Figure 7.6 shows an overview of how the aggregator works, assuming it is fed with a stream of exchanges that have correlation keys such as A, B, C, or D.
The incoming stream of exchanges shown in Figure 7.6 is processed as follows:
The correlator is responsible for sorting exchanges based on the correlation key. For each incoming exchange, the correlation expression is evaluated, yielding the correlation key. For example, for the exchange shown in Figure 7.6, the correlation key evaluates to A.
The aggregation strategy is responsible for merging exchanges with the same correlation key. When a new exchange, A, comes in, the aggregator looks up the corresponding aggregate exchange, A', in the aggregation repository and combines it with the new exchange.
Until a particular aggregation cycle is completed, incoming exchanges are continuously aggregated with the corresponding aggregate exchange. An aggregation cycle lasts until terminated by one of the completion mechanisms.
If a completion predicate is specified on the aggregator, the aggregate exchange is tested to determine whether it is ready to be sent to the next processor in the route. If complete, the aggregate exchange is submitted to the executor thread pool, which processes the exchange in the latter part of the route. If not complete, the aggregate exchange is saved back to the aggregation repository.
In parallel with the synchronous completion tests, it is possible to enable an
asynchronous completion test by enabling either the
completionTimeout option or the completionInterval option.
These completion tests run in a separate thread and, whenever the completion test is
satisfied, the corresponding exchange is marked as complete and submitted to the
executor thread pool.
The thread pool is responsible for processing exchanges in the latter part of the route. By default, this thread pool contains just a single thread, but you have the option of customizing the pool (see Threading options).
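The processing steps above can be modeled in a few lines of plain Java. The sketch below is a simplified, single-threaded model and not Camel's implementation: `MiniAggregator` is a hypothetical class, a `HashMap` stands in for the aggregation repository, and a plain list stands in for the hand-off to the executor thread pool. Asynchronous completion (timeout or interval) is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;

// Simplified, single-threaded model of the flow; not Camel's implementation.
class MiniAggregator<T> {
    private final Function<T, String> correlationExpression;
    private final BinaryOperator<T> aggregationStrategy;
    private final Predicate<T> completionPredicate;
    // In-memory stand-in for the aggregation repository.
    private final Map<String, T> repository = new HashMap<>();
    // Stand-in for the hand-off to the executor thread pool.
    private final List<T> completed = new ArrayList<>();

    MiniAggregator(Function<T, String> correlationExpression,
                   BinaryOperator<T> aggregationStrategy,
                   Predicate<T> completionPredicate) {
        this.correlationExpression = correlationExpression;
        this.aggregationStrategy = aggregationStrategy;
        this.completionPredicate = completionPredicate;
    }

    void onExchange(T incoming) {
        // Correlator: evaluate the correlation expression to get the key.
        String key = correlationExpression.apply(incoming);
        // Aggregation strategy: merge with the stored aggregate, if any.
        T stored = repository.get(key);
        T aggregate = (stored == null) ? incoming : aggregationStrategy.apply(stored, incoming);
        // Completion test: either hand off or save back to the repository.
        if (completionPredicate.test(aggregate)) {
            repository.remove(key);
            completed.add(aggregate);
        } else {
            repository.put(key, aggregate);
        }
    }

    List<T> completed() {
        return completed;
    }

    public static void main(String[] args) {
        MiniAggregator<String> agg = new MiniAggregator<String>(
                s -> s.substring(0, 1),   // correlation key: first character
                (a, b) -> a + b,          // strategy: concatenate bodies
                s -> s.length() >= 4);    // complete once two messages are merged
        for (String s : new String[] {"A1", "B1", "A2"}) {
            agg.onExchange(s);
        }
        System.out.println(agg.completed());
    }
}
```

In this model the exchange for key B stays in the repository, because no second exchange with that key arrives to satisfy the completion test.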
The following example aggregates exchanges with the same StockSymbol header
value, using the UseLatestAggregationStrategy aggregation strategy. For a given
StockSymbol value, if more than three seconds elapse since the last exchange
with that correlation key was received, the aggregated exchange is deemed to be complete and
is sent to the mock endpoint.
from("direct:start")
.aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
.completionTimeout(3000)
.to("mock:aggregated");
The following example shows how to configure the same route in XML:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<aggregate strategyRef="aggregatorStrategy"
completionTimeout="3000">
<correlationExpression>
<simple>header.StockSymbol</simple>
</correlationExpression>
<to uri="mock:aggregated"/>
</aggregate>
</route>
</camelContext>
<bean id="aggregatorStrategy"
class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
In the Java DSL, the correlation expression is always passed as the first argument to
the aggregate() DSL command. You are not limited to using the Simple expression
language here. You can specify a correlation expression using any of the expression
languages or scripting languages, such as XPath, XQuery, SQL, and so on.
For example, to correlate exchanges using an XPath expression, you could use the following Java DSL route:
from("direct:start")
.aggregate(xpath("/stockQuote/@symbol"), new UseLatestAggregationStrategy())
.completionTimeout(3000)
.to("mock:aggregated");
If the correlation expression cannot be evaluated on a particular incoming exchange, the
aggregator throws a CamelExchangeException by default. You can suppress this
exception by setting the ignoreInvalidCorrelationKeys option. For example, in
the Java DSL:
from(...).aggregate(...).ignoreInvalidCorrelationKeys()
In the Spring DSL, the ignoreInvalidCorrelationKeys option is
set as an attribute, as follows:
<aggregate strategyRef="aggregatorStrategy"
ignoreInvalidCorrelationKeys="true"
...>
...
</aggregate>
In Java DSL, you can either pass the aggregation strategy as the second argument to the
aggregate() DSL command or specify it using the
aggregationStrategy() clause. For example, you can use the
aggregationStrategy() clause as follows:
from("direct:start")
.aggregate(header("id"))
.aggregationStrategy(new UseLatestAggregationStrategy())
.completionTimeout(3000)
.to("mock:aggregated");
Apache Camel provides the following basic aggregation strategies (where the classes belong
to the org.apache.camel.processor.aggregate Java package):
UseLatestAggregationStrategy: Returns the last exchange for a given correlation key, discarding all earlier exchanges with this key. For example, this strategy could be useful for throttling the feed from a stock exchange, where you just want to know the latest price of a particular stock symbol.
UseOriginalAggregationStrategy: Returns the first exchange for a given correlation key, discarding all later exchanges with this key. You must set the first exchange by calling UseOriginalAggregationStrategy.setOriginal() before you can use this strategy.
GroupedExchangeAggregationStrategy: Concatenates all of the exchanges for a given correlation key into a list, which is stored in the Exchange.GROUPED_EXCHANGE exchange property. See Grouped exchanges.
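As a rough analogy, the three behaviors listed above can be written as folds over a batch in plain Java. These are not the Camel classes: `BasicStrategySketch` and its methods are hypothetical names, and each strategy is reduced to a function that receives the old and new values and returns the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java analogy of the three behaviors; these are not the Camel classes.
class BasicStrategySketch {

    // Folds a non-empty batch with an "old, new -> result" function.
    static <T> T fold(List<T> batch, BinaryOperator<T> strategy) {
        T result = batch.get(0);
        for (T next : batch.subList(1, batch.size())) {
            result = strategy.apply(result, next);
        }
        return result;
    }

    // Analogous to keeping the last exchange.
    static <T> T useLatest(List<T> batch) {
        return fold(batch, (oldest, newest) -> newest);
    }

    // Analogous to keeping the first exchange.
    static <T> T useOriginal(List<T> batch) {
        return fold(batch, (oldest, newest) -> oldest);
    }

    // Analogous to collecting every exchange into a list.
    static <T> List<T> grouped(List<T> batch) {
        return new ArrayList<>(batch);
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("first", "second", "third");
        System.out.println(useLatest(batch));
        System.out.println(useOriginal(batch));
        System.out.println(grouped(batch));
    }
}
```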
If you want to apply a different aggregation strategy, you can implement a custom
version of the org.apache.camel.processor.aggregate.AggregationStrategy
interface. For example, the following code implements a custom aggregation strategy,
MyAggregationStrategy, that concatenates all of the batch messages into a
single, large message:
// Java
package com.my_package_name;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Exchange;
public class MyAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // The first exchange for a correlation key has no previous
        // aggregate, so oldExchange is null on that invocation.
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody =
            oldExchange.getIn().getBody(String.class);
        String newBody =
            newExchange.getIn().getBody(String.class);
        // Concatenate old and new.
        String concatBody = oldBody.concat(newBody);
        oldExchange.getIn().setBody(concatBody);
        // Ignore the message headers!
        // (in a real application, you would probably do
        // something more sophisticated here).
        return oldExchange;
    }
}
| Note |
|---|
| In Apache Camel 2.0, the |
To aggregate messages using the custom strategy class,
MyAggregationStrategy, define a route like the following:
from("direct:start")
.aggregate(header("StockSymbol"), new MyAggregationStrategy())
.completionTimeout(3000)
.to("mock:result");
You can also configure a route with a custom aggregation strategy in XML, as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<aggregate strategyRef="aggregatorStrategy"
completionTimeout="3000">
<correlationExpression>
<simple>header.StockSymbol</simple>
</correlationExpression>
<to uri="mock:aggregated"/>
</aggregate>
</route>
</camelContext>
<bean id="aggregatorStrategy" class="com.my_package_name.MyAggregationStrategy"/>
The following properties are set on each exchange that is aggregated:
| Header | Type | Description |
|---|---|---|
| Exchange.AGGREGATED_CORRELATION_KEY | String | The value of the correlation key for this exchange. |
| Exchange.AGGREGATED_SIZE | int | The total number of exchanges aggregated into this exchange. |
| Exchange.AGGREGATED_COMPLETED_BY | String | Indicates the mechanism responsible for completing the aggregate exchange. Possible values are: predicate, size, timeout, interval, or consumer. |
The following properties are set on exchanges redelivered by the HawtDB aggregation repository (see Persistent aggregation repository):
| Header | Type | Description |
|---|---|---|
| Exchange.REDELIVERED | boolean | If true, indicates that the exchange is being redelivered. |
| Exchange.REDELIVERY_COUNTER | int | Sequence number of the current redelivery attempt (starting at 1). |
It is mandatory to specify at least one completion condition, which determines when an aggregate exchange leaves the aggregator and proceeds to the next node on the route. The following completion conditions can be specified:
completionPredicate: Evaluates a predicate after each exchange is aggregated in order to determine completeness. A value of true indicates that the aggregate exchange is complete.
completionSize: Completes the aggregate exchange after the specified number of incoming exchanges are aggregated.
completionTimeout: (Incompatible with completionInterval) Completes the aggregate exchange if no incoming exchanges are aggregated within the specified timeout.
In other words, the timeout mechanism keeps track of a timeout for each correlation key value. The clock starts ticking after the latest exchange with a particular key value is received. If another exchange with the same key value is not received within the specified timeout, the corresponding aggregate exchange is marked complete and sent to the next node on the route.
completionInterval: (Incompatible with completionTimeout) Completes all outstanding aggregate exchanges after each time interval (of specified length) has elapsed.
The time interval is not tailored to each aggregate exchange. This mechanism forces simultaneous completion of all outstanding aggregate exchanges. Hence, in some cases, this mechanism could complete an aggregate exchange immediately after it started aggregating.
completionFromBatchConsumer: When used in combination with a consumer endpoint that supports the batch consumer mechanism, this completion option automatically figures out when the current batch of exchanges is complete, based on information it receives from the consumer endpoint. See Batch consumer.
The preceding completion conditions can be combined arbitrarily, except for the
completionTimeout and completionInterval conditions, which
cannot be simultaneously enabled. When conditions are used in combination, the general rule
is that the first completion condition to trigger is the effective completion
condition.
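The difference between the timeout and interval conditions is easier to see on a timeline. The following plain-Java sketch models both for a single correlation key, given arrival times in milliseconds. It is an illustration under stated assumptions, not Camel's scheduler: `CompletionTimingSketch` is a hypothetical class, arrival times are assumed to be sorted in ascending order, and the interval clock is assumed to start at time 0 and tick at multiples of the interval.

```java
import java.util.ArrayList;
import java.util.List;

// Timeline model for one correlation key; not Camel's scheduler.
class CompletionTimingSketch {

    // Timeout model: a batch closes when the gap between consecutive
    // arrivals is longer than timeoutMs. Returns the batch sizes.
    static List<Integer> batchesByTimeout(long[] arrivalsMs, long timeoutMs) {
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        for (int i = 0; i < arrivalsMs.length; i++) {
            if (count > 0 && arrivalsMs[i] - arrivalsMs[i - 1] > timeoutMs) {
                sizes.add(count);
                count = 0;
            }
            count++;
        }
        if (count > 0) {
            sizes.add(count); // the last batch closes once no more arrivals come
        }
        return sizes;
    }

    // Interval model: whatever is outstanding is flushed at every tick,
    // regardless of when the batch started. Returns the batch sizes.
    static List<Integer> batchesByInterval(long[] arrivalsMs, long intervalMs) {
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        long nextTick = intervalMs;
        for (long t : arrivalsMs) {
            while (t >= nextTick) {
                if (count > 0) {
                    sizes.add(count);
                    count = 0;
                }
                nextTick += intervalMs;
            }
            count++;
        }
        if (count > 0) {
            sizes.add(count); // flushed by the next tick after the last arrival
        }
        return sizes;
    }
}
```

For arrivals at 0, 2500, 3500, and 4000 ms with a setting of 3000 ms, the timeout model produces one batch of four exchanges, whereas the interval model produces two batches of two, because the tick at 3000 ms cuts the batch regardless of recent activity.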
You can specify an arbitrary predicate expression that determines when an aggregated exchange is complete. There are two possible ways of evaluating the predicate expression:
On the latest aggregate exchange—this is the default behavior.
On the latest incoming exchange—this behavior is selected
when you enable the eagerCheckCompletion option.
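The two evaluation modes only differ when the aggregation strategy does not carry the tested value over into the aggregate. The plain-Java sketch below shows this with a hypothetical `EagerCheckSketch` class (not part of Camel), where each message is reduced to its type string and the `eager` flag selects whether the predicate sees the incoming value or the merged aggregate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

// Illustration of the two predicate evaluation modes; not Camel code.
class EagerCheckSketch {

    // Returns the index of the message that completes the batch, or -1.
    static int completesAt(List<String> messages,
                           BinaryOperator<String> strategy,
                           Predicate<String> isComplete,
                           boolean eager) {
        String aggregate = null;
        for (int i = 0; i < messages.size(); i++) {
            String incoming = messages.get(i);
            aggregate = (aggregate == null) ? incoming : strategy.apply(aggregate, incoming);
            // Eager: test the incoming message. Default: test the aggregate.
            String tested = eager ? incoming : aggregate;
            if (isComplete.test(tested)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> types = Arrays.asList("QUOTE", "ALERT", "QUOTE");
        BinaryOperator<String> keepFirst = (oldest, newest) -> oldest;
        Predicate<String> isAlert = "ALERT"::equals;
        System.out.println(completesAt(types, keepFirst, isAlert, true));
        System.out.println(completesAt(types, keepFirst, isAlert, false));
    }
}
```

With a strategy that keeps the first message, the aggregate never looks like an ALERT, so only the eager mode detects it. With a strategy that keeps the latest message, both modes would give the same answer.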
For example, if you want to terminate a stream of stock quotes every time you receive an
ALERT message (as indicated by the value of a MsgType header in
the latest incoming exchange), you can define a route like the following:
from("direct:start")
.aggregate(
header("id"),
new UseLatestAggregationStrategy()
)
.completionPredicate(
header("MsgType").isEqualTo("ALERT")
)
.eagerCheckCompletion()
.to("mock:result");
The following example shows how to configure the same route using XML:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<aggregate strategyRef="aggregatorStrategy"
eagerCheckCompletion="true">
<correlationExpression>
<simple>header.StockSymbol</simple>
</correlationExpression>
<completionPredicate>
<simple>${header.MsgType} == 'ALERT'</simple>
</completionPredicate>
<to uri="mock:result"/>
</aggregate>
</route>
</camelContext>
<bean id="aggregatorStrategy"
class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
It is possible to specify a dynamic completion timeout, where the
timeout value is recalculated for every incoming exchange. For example, to set the timeout
value from the timeout header in each incoming exchange, you could define a
route as follows:
from("direct:start")
.aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
.completionTimeout(header("timeout"))
.to("mock:aggregated");
You can configure the same route in the Spring DSL, as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<aggregate strategyRef="aggregatorStrategy">
<correlationExpression>
<simple>header.StockSymbol</simple>
</correlationExpression>
<completionTimeout>
<header>timeout</header>
</completionTimeout>
<to uri="mock:aggregated"/>
</aggregate>
</route>
</camelContext>
<bean id="aggregatorStrategy"
class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
| Note |
|---|
| You can also add a fixed timeout value and Apache Camel will fall back to use this value, if the dynamic value is |
It is possible to specify a dynamic completion size, where the
completion size is recalculated for every incoming exchange. For example, to set the
completion size from the mySize header in each incoming exchange, you could
define a route as follows:
from("direct:start")
.aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
.completionSize(header("mySize"))
.to("mock:aggregated");
And the same example using Spring XML:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<aggregate strategyRef="aggregatorStrategy">
<correlationExpression>
<simple>header.StockSymbol</simple>
</correlationExpression>
<completionSize>
<header>mySize</header>
</completionSize>
<to uri="mock:aggregated"/>
</aggregate>
</route>
</camelContext>
<bean id="aggregatorStrategy"
class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
| Note |
|---|
| You can also add a fixed size value and Apache Camel will fall back to use this value, if the dynamic value is |
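To make "recalculated for every incoming exchange" concrete, the following plain-Java sketch models a dynamic completion size. It is an assumption-laden illustration rather than Camel's logic: `DynamicSizeSketch` is a hypothetical class, each array element stands for the size value carried by one incoming exchange (for example, in the mySize header), and the batch is assumed to complete as soon as the number of aggregated exchanges reaches the size requested by the latest exchange. The fixed fallback value is not modeled.

```java
// Model of a completion size that is re-read from every incoming exchange.
// Not Camel code; the "count >= requested size" rule is an assumption.
class DynamicSizeSketch {

    // requestedSizes[i] is the size value carried by the i-th incoming exchange.
    // Returns how many exchanges were aggregated when the batch completed,
    // or -1 if the batch never completed.
    static int completedAfter(int[] requestedSizes) {
        int aggregated = 0;
        for (int requested : requestedSizes) {
            aggregated++;
            if (aggregated >= requested) {
                return aggregated;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The third exchange lowers the requested size to 3, completing the batch.
        System.out.println(completedAfter(new int[] {5, 5, 3}));
    }
}
```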
In some aggregation scenarios, you might want to enforce the condition that the correlation key is unique for each batch of exchanges. In other words, when the aggregate exchange for a particular correlation key completes, you want to make sure that no further aggregate exchanges with that correlation key are allowed to proceed. For example, you might want to enforce this condition, if the latter part of the route expects to process exchanges with unique correlation key values.
Depending on how the completion conditions are configured, there might be a risk of more than one aggregate exchange being generated with a particular correlation key. For example, although you might define a completion predicate that is designed to wait until all the exchanges with a particular correlation key are received, you might also define a completion timeout, which could fire before all of the exchanges with that key have arrived. In this case, the late-arriving exchanges could give rise to a second aggregate exchange with the same correlation key value.
For such scenarios, you can configure the aggregator to suppress aggregate exchanges
that duplicate previous correlation key values, by setting the
closeCorrelationKeyOnCompletion option. In order to suppress duplicate
correlation key values, it is necessary for the aggregator to record previous correlation
key values in a cache. The size of this cache (the number of cached correlation keys) is
specified as an argument to the closeCorrelationKeyOnCompletion() DSL command.
To specify a cache of unlimited size, you can pass a value of zero or a negative integer.
For example, to specify a cache size of 10000 key values:
from("direct:start")
.aggregate(header("UniqueBatchID"), new MyConcatenateStrategy())
.completionSize(header("mySize"))
.closeCorrelationKeyOnCompletion(10000)
.to("mock:aggregated");
If an aggregate exchange completes with a duplicate correlation key value, the
aggregator throws a ClosedCorrelationKeyException.
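The role of the cache size can be sketched in plain Java. The class below is hypothetical and does not show how Camel stores closed keys: it remembers closed correlation keys in a bounded set, and it assumes that the eldest key is evicted once the capacity is exceeded. A capacity of zero or less is treated as unbounded, mirroring the option described above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustration of a closed-key cache; class and method names are hypothetical.
class ClosedKeyCacheSketch {
    private final Set<String> closedKeys;

    ClosedKeyCacheSketch(final int capacity) {
        if (capacity > 0) {
            // Bounded: drop the eldest key once the capacity is exceeded
            // (the eviction order is an assumption made for this sketch).
            this.closedKeys = Collections.newSetFromMap(
                    new LinkedHashMap<String, Boolean>() {
                        @Override
                        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                            return size() > capacity;
                        }
                    });
        } else {
            // Zero or negative: remember every closed key.
            this.closedKeys = new HashSet<>();
        }
    }

    // Called when an aggregate exchange for this key completes.
    void close(String key) {
        closedKeys.add(key);
    }

    // True if a later exchange with this key should be rejected.
    boolean isClosed(String key) {
        return closedKeys.contains(key);
    }
}
```

The sketch also shows the trade-off behind the size argument: with a bounded cache, a key that has been evicted is no longer recognized as closed, so a very late exchange with that key would be accepted again.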
You can combine all of the aggregated exchanges in an outgoing batch into a single
org.apache.camel.impl.GroupedExchange holder class. To enable grouped
exchanges, specify the groupExchanges() option, as shown in the following Java
DSL route:
from("direct:start")
.aggregate(header("StockSymbol"))
.completionTimeout(3000)
.groupExchanges()
.to("mock:result");
The grouped exchange that is sent to mock:result contains the list of
aggregated exchanges stored in the exchange property,
Exchange.GROUPED_EXCHANGE. The following line of code shows how a subsequent
processor can access the contents of the grouped exchange in the form of a list:
// Java
List<Exchange> grouped = ex.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
| Note |
|---|
| When you enable the grouped exchanges feature, you must not configure an aggregation strategy (the grouped exchanges feature is itself an aggregation strategy). |
The aggregator can work together with the batch consumer pattern
to aggregate the total number of messages reported by the batch consumer (a batch consumer
endpoint sets the CamelBatchSize, CamelBatchIndex, and
CamelBatchComplete properties on the incoming exchange). For example, to
aggregate all of the files found by a File consumer endpoint, you could use a route like the
following:
from("file://inbox")
.aggregate(xpath("//order/@customerId"), new AggregateCustomerOrderStrategy())
.completionFromBatchConsumer()
.to("bean:processOrder");
Currently, the following endpoints support the batch consumer mechanism: File, FTP, Mail, iBatis, and JPA.
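The cooperation between a batch consumer and the aggregator can be pictured with a small plain-Java model. Nothing here is Camel API: `BatchConsumerSketch` and `Polled` are hypothetical names, the three fields merely mirror the batch size, batch index, and batch complete properties mentioned above, and the zero-based index is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Model of batch metadata driving completion; not Camel code.
class BatchConsumerSketch {

    static final class Polled {
        final String body;
        final int batchIndex;        // position within the poll (zero-based here)
        final int batchSize;         // total number of items in the poll
        final boolean batchComplete; // true only for the last item

        Polled(String body, int batchIndex, int batchSize, boolean batchComplete) {
            this.body = body;
            this.batchIndex = batchIndex;
            this.batchSize = batchSize;
            this.batchComplete = batchComplete;
        }
    }

    // Consumer side: tag every item found in one poll with batch metadata.
    static List<Polled> poll(List<String> bodies) {
        List<Polled> out = new ArrayList<>();
        for (int i = 0; i < bodies.size(); i++) {
            out.add(new Polled(bodies.get(i), i, bodies.size(), i == bodies.size() - 1));
        }
        return out;
    }

    // Aggregator side: collect until the item flagged as batch-complete arrives.
    static List<String> aggregateBatch(List<Polled> incoming) {
        List<String> batch = new ArrayList<>();
        for (Polled p : incoming) {
            batch.add(p.body);
            if (p.batchComplete) {
                break;
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Polled> polled = poll(Arrays.asList("a.xml", "b.xml", "c.xml"));
        System.out.println(aggregateBatch(polled));
    }
}
```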
If you want pending aggregated exchanges to be stored persistently, you can use the
HawtDB component as a persistent aggregation repository. This requires you to include a
dependency on the camel-hawtdb component in your Maven POM. You can then
configure a route to use the HawtDB aggregation repository as follows:
public void configure() throws Exception {
HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
from("direct:start")
.aggregate(header("id"), new UseLatestAggregationStrategy())
.completionTimeout(3000)
.aggregationRepository(repo)
.to("mock:aggregated");
}
The HawtDB aggregation repository has a feature that enables it to recover and retry any failed exchanges (that is, any exchange that raised an exception while it was being processed by the latter part of the route). Figure 7.7 shows an overview of the recovery mechanism.
The recovery mechanism works as follows:
The aggregator creates a dedicated recovery thread, which runs in the background, scanning the aggregation repository to find any failed exchanges.
Each failed exchange is checked to see whether its current redelivery count exceeds the maximum redelivery limit. If it is under the limit, the recovery task resubmits the exchange for processing in the latter part of the route.
If the current redelivery count is over the limit, the failed exchange is passed to the dead letter queue.
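The recovery steps above amount to a simple decision per failed exchange, which the following plain-Java sketch models. It is not the HawtDB repository's code: `RecoveryScanSketch` and `FailedExchange` are hypothetical names, the lists stand in for resubmission and the dead letter queue, and the sketch assumes that an exchange whose counter has reached the limit counts as exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Model of one recovery scan; not the HawtDB repository's implementation.
class RecoveryScanSketch {

    static final class FailedExchange {
        final String id;
        int redeliveryCounter; // number of redelivery attempts made so far

        FailedExchange(String id, int redeliveryCounter) {
            this.id = id;
            this.redeliveryCounter = redeliveryCounter;
        }
    }

    final List<String> resubmitted = new ArrayList<>();
    final List<String> deadLetterQueue = new ArrayList<>();

    // One pass of the background recovery task over the failed exchanges.
    void scan(List<FailedExchange> failed, int maximumRedeliveries) {
        for (FailedExchange ex : failed) {
            if (ex.redeliveryCounter < maximumRedeliveries) {
                ex.redeliveryCounter++;     // first redelivery attempt is number 1
                resubmitted.add(ex.id);     // retry in the latter part of the route
            } else {
                deadLetterQueue.add(ex.id); // limit reached: give up on this exchange
            }
        }
    }

    public static void main(String[] args) {
        RecoveryScanSketch recovery = new RecoveryScanSketch();
        recovery.scan(Arrays.asList(
                new FailedExchange("order-1", 0),
                new FailedExchange("order-2", 3)), 3);
        System.out.println("resubmitted=" + recovery.resubmitted
                + " deadLetterQueue=" + recovery.deadLetterQueue);
    }
}
```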
For more details about the HawtDB component, see HawtDB in
As shown in Figure 7.6, the aggregator is decoupled
from the latter part of the route, where the exchanges sent to the latter part of the route
are processed by a dedicated thread pool. By default, this pool contains just a single
thread. If you want to specify a pool with multiple threads, enable the
parallelProcessing option, as follows:
from("direct:start")
.aggregate(header("id"), new UseLatestAggregationStrategy())
.completionTimeout(3000)
.parallelProcessing()
.to("mock:aggregated");
By default, this creates a pool with 10 worker threads.
If you want to exercise more control over the created thread pool, specify a custom
java.util.concurrent.ExecutorService instance using the
executorService option (in which case it is unnecessary to enable the
parallelProcessing option).
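The decoupling described above can be illustrated with the standard java.util.concurrent API alone. The sketch below does not use Camel: `ExecutorHandOffSketch` and `processAll` are hypothetical names, strings stand in for completed aggregate exchanges, and the pool size of 4 is an arbitrary choice. It shows the kind of ExecutorService instance you might build yourself and how completed aggregates are processed on pool threads rather than on the thread that aggregated them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hand-off of completed aggregates to a thread pool; not Camel code.
class ExecutorHandOffSketch {

    // Submits each completed aggregate to the pool and collects the results
    // in submission order.
    static List<String> processAll(List<String> completedAggregates, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String aggregate : completedAggregates) {
            // Stand-in for "the latter part of the route".
            futures.add(CompletableFuture.supplyAsync(() -> "processed:" + aggregate, pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        // A custom pool; the size of 4 is arbitrary for this example.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(processAll(Arrays.asList("A'", "B'", "C'"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```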