
Processors

To enable the router to do something more interesting than simply connecting a consumer endpoint to a producer endpoint, you can add processors to your route. A processor is a command that you insert into a routing rule to perform arbitrary processing of the messages that flow through the rule. Apache Camel provides a wide variety of processors, as shown in Table 1.1.

Table 1.1. Apache Camel Processors

Java DSL | Spring DSL | Description
aggregate() | aggregate | Aggregator EIP: Creates an aggregator, which combines multiple incoming exchanges into a single exchange.
aop() | aop | Uses Aspect Oriented Programming (AOP) to do work before and after a specified sub-route. See Aspect Oriented Programming.
bean(), beanRef() | bean | Processes the current exchange by invoking a method on a Java object (or bean). See Bean Integration.
choice() | choice | Content Based Router EIP: Selects a particular sub-route based on the exchange content, using when and otherwise clauses.
convertBodyTo() | convertBodyTo | Converts the In message body to the specified type.
delay() | delay | Delayer EIP: Delays the propagation of the exchange to the latter part of the route.
doTry() | doTry | Creates a try/catch block for handling exceptions, using doCatch, doFinally, and end clauses.
end() | N/A | Ends the current command block.
enrich(), enrichRef() | enrich | Content Enricher EIP: Combines the current exchange with data requested from a specified producer endpoint URI.
filter() | filter | Message Filter EIP: Uses a predicate expression to filter incoming exchanges.
idempotentConsumer() | idempotentConsumer | Idempotent Consumer EIP: Implements a strategy to suppress duplicate messages.
inheritErrorHandler() | @inheritErrorHandler | Boolean option that can be used to disable the inherited error handler on a particular route node (defined as a sub-clause in the Java DSL and as an attribute in the Spring DSL).
inOnly() | inOnly | Either sets the current exchange's MEP to InOnly (if no arguments) or sends the exchange as an InOnly to the specified endpoint(s).
inOut() | inOut | Either sets the current exchange's MEP to InOut (if no arguments) or sends the exchange as an InOut to the specified endpoint(s).
loadBalance() | loadBalance | Load Balancer EIP: Implements load balancing over a collection of endpoints.
log() | log | Logs a message to the console.
loop() | loop | Loop EIP: Repeatedly resends each exchange to the latter part of the route.
markRollbackOnly() | @markRollbackOnly | (Transactions) Marks the current transaction for rollback only (no exception is raised). In the Spring DSL, this option is set as a boolean attribute on the rollback element. See EIP Transactions Guide.
markRollbackOnlyLast() | @markRollbackOnlyLast | (Transactions) If one or more transactions have previously been associated with this thread and then suspended, this command marks the latest transaction for rollback only (no exception is raised). In the Spring DSL, this option is set as a boolean attribute on the rollback element. See EIP Transactions Guide.
marshal() | marshal | Transforms into a low-level or binary format using the specified data format, in preparation for sending over a particular transport protocol. See Marshalling and unmarshalling.
multicast() | multicast | Multicast EIP: Multicasts the current exchange to multiple destinations, where each destination gets its own copy of the exchange.
onCompletion() | onCompletion | Defines a sub-route (terminated by end() in the Java DSL) that gets executed after the main route has completed. For conditional execution, use the onWhen sub-clause. Can also be defined on its own line (not in a route).
onException() | onException | Defines a sub-route (terminated by end() in the Java DSL) that gets executed whenever the specified exception occurs. Usually defined on its own line (not in a route).
pipeline() | pipeline | Pipes and Filters EIP: Sends the exchange to a series of endpoints, where the output of one endpoint becomes the input of the next endpoint. See also Pipeline Processing.
policy() | policy | Applies a policy to the current route (currently only used for transactional policies; see EIP Transactions Guide).
pollEnrich(), pollEnrichRef() | pollEnrich | Content Enricher EIP: Combines the current exchange with data polled from a specified consumer endpoint URI.
process(), processRef() | process | Executes a custom processor on the current exchange. See Custom processor and Programming EIP Components.
recipientList() | recipientList | Recipient List EIP: Sends the exchange to a list of recipients that is calculated at runtime (for example, based on the contents of a header).
removeHeader() | removeHeader | Removes the specified header from the exchange's In message.
removeHeaders() | removeHeaders | Removes the headers matching the specified pattern from the exchange's In message. The pattern can have the form prefix*, in which case it matches every name starting with prefix; otherwise, it is interpreted as a regular expression.
removeProperty() | removeProperty | Removes the specified exchange property from the exchange.
resequence() | resequence | Resequencer EIP: Re-orders incoming exchanges on the basis of a specified comparator operation. Supports a batch mode and a stream mode.
rollback() | rollback | (Transactions) Marks the current transaction for rollback only (also raising an exception, by default). See EIP Transactions Guide.
routingSlip() | routingSlip | Routing Slip EIP: Routes the exchange through a pipeline that is constructed dynamically, based on the list of endpoint URIs extracted from a slip header.
sample() | sample | Creates a sampling throttler, allowing you to extract a sample of exchanges from the traffic on a route.
setBody() | setBody | Sets the message body of the exchange's In message.
setExchangePattern() | setExchangePattern | Sets the current exchange's MEP to the specified value. See Message exchange patterns.
setHeader() | setHeader | Sets the specified header in the exchange's In message.
setOutHeader() | setOutHeader | Sets the specified header in the exchange's Out message.
setProperty() | setProperty | Sets the specified exchange property.
sort(), sortBody() | sort | Sorts the contents of the In message body (where a custom comparator can optionally be specified).
split() | split | Splitter EIP: Splits the current exchange into a sequence of exchanges, where each split exchange contains a fragment of the original message body.
stop() | stop | Stops routing the current exchange and marks it as completed.
threads() | threads | Creates a thread pool for concurrent processing of the latter part of the route.
throttle() | throttle | Throttler EIP: Limits the flow rate to the specified level (exchanges per second).
throwException() | throwException | Throws the specified Java exception.
to() | to | Sends the exchange to one or more endpoints. See Pipeline Processing.
toF() | N/A | Sends the exchange to an endpoint, using string formatting. That is, the endpoint URI string can embed substitutions in the style of the C printf() function.
transacted() | transacted | Creates a Spring transaction scope that encloses the latter part of the route. See EIP Transactions Guide.
transform() | transform | Message Translator EIP: Copies the In message headers to the Out message headers and sets the Out message body to the specified value.
unmarshal() | unmarshal | Transforms the In message body from a low-level or binary format to a high-level format, using the specified data format. See Marshalling and unmarshalling.
validate() | validate | Takes a predicate expression to test whether the current message is valid. If the predicate returns false, throws a PredicateValidationException exception.
wireTap() | wireTap | Wire Tap EIP: Sends a copy of the current exchange to the specified wire tap URI, using the ExchangePattern.InOnly MEP.


To get some idea of how to use processors in a route, see the following examples:

The choice() processor is a conditional statement that is used to route incoming messages to alternative producer endpoints. Each alternative producer endpoint is preceded by a when() method, which takes a predicate argument. If the predicate is true, the following target is selected; otherwise, processing proceeds to the next when() method in the rule. If no predicate matches, the target that follows otherwise() is selected. For example, the following choice() processor directs incoming messages to either Target1, Target2, or Target3, depending on the values of Predicate1 and Predicate2:

from("SourceURL").choice().when(Predicate1).to("Target1")
                    .when(Predicate2).to("Target2")
                    .otherwise().to("Target3");

Or equivalently in Spring XML:

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>${header.foo} == 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>${header.foo} == 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

The filter() processor can be used to prevent uninteresting messages from reaching the producer endpoint. It takes a single predicate argument: if the predicate is true, the message exchange is allowed through to the producer; if the predicate is false, the message exchange is blocked. For example, the following filter blocks a message exchange, unless the incoming message contains a header, foo, with value equal to bar:

from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");

Or equivalently in Spring XML:

<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>${header.foo} == 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>

The throttle() processor ensures that a producer endpoint does not get overloaded. The throttler works by limiting the number of messages that can pass through per second. If the incoming messages exceed the specified rate, the throttler accumulates excess messages in a buffer and transmits them more slowly to the producer endpoint. For example, to limit the rate of throughput to 100 messages per second, you can define the following rule:

from("SourceURL").throttle(100).to("TargetURL");

Or equivalently in Spring XML:

<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttle>
  </route>
</camelContext>

If none of the standard processors described here provides the functionality you need, you can define your own custom processor. To create a custom processor, define a class that implements the org.apache.camel.Processor interface and its process() method. The following custom processor, MyProcessor, removes the header named foo from incoming messages:
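A minimal sketch of such a processor is given here. It assumes only the org.apache.camel.Processor and org.apache.camel.Exchange interfaces from the Camel core library; the class body is an illustration of the description above, not a listing taken from the product.

```java
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

// Illustrative custom processor: strips the "foo" header from each message.
public class MyProcessor implements Processor {

    // Called by the route once for every exchange that reaches this processor.
    @Override
    public void process(Exchange exchange) throws Exception {
        // Remove the header named "foo" from the In message, if it is present.
        exchange.getIn().removeHeader("foo");
    }
}
```

Because the class has an implicit no-argument constructor, it can be instantiated directly with new MyProcessor() before being passed to a route.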


To insert the custom processor into a router rule, invoke the process() method, which provides a generic mechanism for inserting processors into rules. For example, the following rule invokes the processor defined in Example 1.3:

org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");