When you send a message using the ProducerTemplate.sendBody()
method into a Camel endpoint, you may find that even though it contains asynchronous processing, such as through the threads
DSL (see the Spreading the load within a route using a set of threads recipe), your original thread is blocked until the request is fully processed. This is by design, as Camel will not return unless the message was fully processed—keeping track of whether an exchange was fully processed is one of the roles of Camel's Asynchronous Routing Engine.
This recipe will explain how to interact with Camel asynchronously through a ProducerTemplate
instance, so that you can continue doing other work while a message submitted to Camel is being processed.
Java code for this recipe is located in the org.camelcookbook.parallelprocessing.asyncrequest
package.
This recipe assumes that you have an existing route that will process a message synchronously, and may take a significant amount of time to complete. The following is an example of such a route:
from("direct:processInOut") .log("Received ${body}") .delay(1000) // simulate slow processing .log("Processing ${body}") .transform(simple("Processed ${body}"));
We know upfront that processing a message through this route will take some time, so we would like to send a message into it asynchronously, and get on with doing other work until a response is returned to us.
Use the ProducerTemplate
interface's asyncSend()
or asyncRequest()
variant methods to allow you to send a message asynchronously to a Camel endpoint, and still have the ability to know when the message processing is complete.
ProducerTemplate
interface's asyncSend()
or asyncRequest()
methods depending on whether you require the exchange to be passed as InOnly
or InOut
respectively. Here we use asyncRequestBody()
to initiate an InOut
conversation from which we expect a response:
Future<Object> future = producerTemplate.asyncRequestBody(
"direct:processInOut", "SomePayload");
The async...()
methods return a java.util.concurrent.Future
that we can use to check whether the processing of the exchange was completed, that is, whether the request was processed:
while(!future.isDone()) {
log.info("Doing something else while processing...");
Thread.sleep(200);
}
future.get()
method to obtain the response from the route if successful. Calling this method will block the current thread until a response is received.String response = (String) future.get();
log.info("Received a response");
assertEquals("Processed SomePayload", response);
If an exception was thrown while processing the exchange, future.get()
will throw a java.util.concurrent.ExecutionException
interface that wraps the exception raised from within Camel.
Running this code shows the following output:
Doing something else while processing... Received SomePayload Doing something else while processing... Doing something else while processing... Doing something else while processing... Doing something else while processing... Doing something else while processing... Processing SomePayload Received a response
Internally, a thread pool based upon the default settings (10 threads) is used to process any exchanges sent using the ProducerTemplate
interface's async...()
methods. The calling thread submits the message into an in-memory queue for processing by one of the threads in this pool, and continues processing as usual.
It is also possible to use a callback, and deal with the return status of the exchange asynchronously by providing org.apache.camel.spi.Synchronization
implementation to one of the ProducerTemplate
interface's asyncCallback...()
methods:
Future<Object> future = template.asyncCallbackRequestBody( "direct:processInOut", "AnotherPayload", new Synchronization() { @Override public void onComplete(Exchange exchange) { assertEquals("Processed AnotherPayload", exchange.getOut().getBody()); } @Override public void onFailure(Exchange exchange) { fail(); } });
ProducerTemplate
interface: http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/ProducerTemplate.htmlSynchronization
interface: http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/spi/Synchronization.html18.191.176.194