Available as of Camel 2.1
ToAsync is a new feature built into the core of Camel 2.1. It lays the foundation for non-blocking asynchronous Request Reply messaging with Camel. By foundation we mean that it provides the moving pieces that let Camel components leverage this when they natively support non-blocking request/reply. At the time of writing, only the Jetty component offers this. Over time we will incorporate it into other components. However, don't despair: Camel has fallback support in its core for simulating asynchronous request/reply when the component in use does not support it natively.
How does it work?
Camel now has a new DSL method named toAsync in the Java DSL, and in Spring XML there is a new async boolean attribute on the <to> XML tag. In a Camel route you can leverage this simply by using it when sending to an endpoint as part of Request Reply messaging.
The route below is from a unit test in Camel. Notice the new toAsync in action. The message is sent asynchronously to the direct:bar endpoint. When the reply comes back, routing of the message continues on the other side, where it is routed to the mock:result endpoint. The number 5 is the thread pool size, so we have 5 concurrent consumers to route the replies when they come back.
from("direct:start").to("mock:a").toAsync("direct:bar", 5).to("mock:result");
from("direct:bar").to("mock:b").transform(constant("Bye World"));
This unit test uses the direct endpoint when sending with toAsync, and the direct component does not natively support non-blocking request/reply messaging. Camel therefore falls back to simulating it by transferring the Exchange to an internal thread pool, which handles the task of sending the Exchange. This relieves the original thread of sending the request, so its work is offloaded and it can continue. In other words, the original thread is not blocked. In the meantime the internal thread pool sends the request and hands the Exchange over to the completed task queue when the reply comes back, after which the Exchange continues along the route path. In this example it is simply routed to the mock:result endpoint.
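The fallback described above can be illustrated without Camel at all. The following is a minimal sketch in plain Java, assuming a hypothetical sendAsync helper and a plain function standing in for the blocking endpoint; none of these names are Camel APIs, and the real implementation in the Camel core may well differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration only; these names are not part of Camel.
final class FallbackSketch {

    /** Hands the blocking request to the pool and returns immediately. */
    static void sendAsync(ExecutorService pool,
                          Function<String, String> blockingEndpoint,
                          String request,
                          Consumer<String> continuation) {
        pool.execute(() -> {
            // A pool thread, not the caller, waits for the reply.
            String reply = blockingEndpoint.apply(request);
            // The rest of the "route" continues on the pool thread.
            continuation.accept(reply);
        });
    }

    static String demo() {
        // Pool of 5 threads, matching the pool size in the route above.
        ExecutorService pool = Executors.newFixedThreadPool(5);
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        try {
            sendAsync(pool, body -> "Bye World", "Hello World", reply -> {
                result.set(reply);
                done.countDown();
            });
            // The calling thread is free at this point; it waits here only
            // so that the demo can return the reply.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no reply within 5 seconds");
            }
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the request still blocks a thread in this scheme; the blocking is only moved from the caller to the pool, which is why the pool size bounds how many requests can be in flight at once.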
In this example we use the following route:
<route>
    <!-- we route from a direct endpoint -->
    <from uri="direct:start"/>
    <!-- log the request -->
    <to uri="log:+++ request +++"/>
    <!-- then doing a non blocking async request/reply to the http server with a pool of 10 threads -->
    <to uri="jetty://http://0.0.0.0:9123/myapp" async="true" poolSize="10"/>
    <!-- the reply from the server is logged -->
    <to uri="log:+++ reply +++"/>
    <!-- and to our mock so we can assert we got all responses -->
    <to ref="result"/>
</route>
Notice how we have specified async="true" and poolSize="10" in the <to> tag to leverage this.
To learn more see and try this example yourself.
How this works inside Camel
Camel uses the org.apache.camel.AsyncProcessor interface, which is responsible for processing the message in a non-blocking way. It has the following method:
void process(Exchange exchange, AsyncCallback callback) throws Exception;
The idea is that you invoke the callback when the reply is ready.
The org.apache.camel.AsyncCallback has the following method:
void onTaskCompleted(Exchange exchange);
which is the method to invoke when the reply is ready.
All the other moving parts are built directly into the core of Camel, so you do not have to worry about them.
If you want to take a look, it is implemented in the org.apache.camel.processor.SendAsyncProcessor class.
This class is currently the only implementation of this feature.
To support this natively in a component, you implement the AsyncProcessor interface:
public class JettyHttpProducer extends DefaultProducer implements AsyncProcessor
And then invoke the callback when the reply is ready and populated on the Exchange.
See the JettyContentExchange class for how this is implemented in Jetty.
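To make the callback contract concrete, here is a small self-contained sketch in plain Java. The Message, Callback and NonBlockingProcessor types are hypothetical stand-ins that only mirror the shape of Exchange, AsyncCallback and AsyncProcessor shown above, and the scheduled executor merely simulates a transport that delivers the reply later. It is not how the Jetty producer is written.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; these types are stand-ins, not Camel classes.
final class CallbackSketch {

    /** Stand-in for an Exchange: a request body and a reply body. */
    static final class Message {
        final String in;
        volatile String out;
        Message(String in) { this.in = in; }
    }

    /** Stand-in mirroring the shape of AsyncCallback. */
    interface Callback {
        void onTaskCompleted(Message message);
    }

    /** Stand-in mirroring the shape of AsyncProcessor. */
    interface NonBlockingProcessor {
        void process(Message message, Callback callback) throws Exception;
    }

    /** Pretends to be a producer whose transport delivers the reply later. */
    static final class DelayedReplyProducer implements NonBlockingProcessor {
        private final ScheduledExecutorService transport;

        DelayedReplyProducer(ScheduledExecutorService transport) {
            this.transport = transport;
        }

        @Override
        public void process(Message message, Callback callback) {
            // Register interest in the reply and return at once; no thread waits.
            transport.schedule(() -> {
                message.out = "Reply to " + message.in; // populate the reply first
                callback.onTaskCompleted(message);       // then signal completion
            }, 50, TimeUnit.MILLISECONDS);
        }
    }

    static String demo() {
        ScheduledExecutorService transport = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch replied = new CountDownLatch(1);
        Message message = new Message("Hello World");
        try {
            new DelayedReplyProducer(transport).process(message, m -> replied.countDown());
            // process() has already returned; we wait only to read the reply.
            if (!replied.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no reply within 5 seconds");
            }
            return message.out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            transport.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The ordering inside the scheduled task is the point of the sketch: the reply is set on the message before the callback fires, so whoever receives the callback can continue routing with a complete message.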
What if a Component does not support non blocking?
Many components do not naturally support non-blocking Request Reply. The ones that do need to be improved in Camel to support the AsyncProcessor so that they integrate with ToAsync. If a component does not support non-blocking (i.e. does not use AsyncProcessor), Camel automatically falls back and simulates it by using an internal thread pool that sends the request and blocks until the reply is ready, just as it did in the very first example on this page using the direct endpoint.
Configuring thread pooling
You configure the thread pool on toAsync by supplying a pool size as the second parameter, as we saw in the first example.
But you can also refer to a java.util.concurrent.ExecutorService as follows:
In Spring XML there is an attribute to refer to the thread pool:
<to uri="jetty://http://0.0.0.0:9123/myapp" async="true" executorServiceRef="mySharedPool"/>
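The pool being referred to is an ordinary java.util.concurrent.ExecutorService. As a rough sketch, a shared pool could be built in plain Java as shown here; the newSharedPool helper and the thread naming are assumptions made for illustration, and how the instance is registered under the name mySharedPool (for example as a bean in your Spring context) is left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; the helper name is not part of Camel.
final class SharedPoolSketch {

    /** Builds a fixed-size pool whose threads carry a recognisable name. */
    static ExecutorService newSharedPool(int size, String name) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = task -> {
            Thread thread = new Thread(task, name + "-" + counter.incrementAndGet());
            thread.setDaemon(true); // do not keep the JVM alive for idle workers
            return thread;
        };
        return Executors.newFixedThreadPool(size, factory);
    }

    static String demo() {
        ExecutorService pool = newSharedPool(10, "mySharedPool");
        try {
            // Ask a worker for its own name to show the task runs on the shared pool.
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Sharing one pool across several endpoints puts a single upper bound on the number of threads used for replies, where a separate poolSize on each endpoint would give each its own pool.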
Async for synchronous support in Camel routes and as Camel client API