Very occasionally you will want to interact with a third-party API that is asynchronous in nature (that is, one that uses callbacks to return a response) within a Camel Processor.
The most straightforward way to do this is to register a listener with that API, place a request, and block the current thread until a response is received using a java.util.concurrent.CountDownLatch
instance or similar. This has the downside of using one more thread than you strictly need to, and potentially slows down the rate of consumption from the route's consumer endpoint.
This recipe provides you with an alternative that allows you to truly interact asynchronously with this type of API through an Asynchronous Processor . Using this mechanism, the original thread is released to take on more work once the request has been placed, and the response is routed using the thread that triggers the callback.
The Java code for this recipe is located in the org.camelcookbook.parallelprocessing.asyncprocessor
package. The Spring XML files are located under src/main/resources/META-INF/spring
and prefixed with asyncProcessor
.
Implement the org.apache.camel.AsyncProcessor
interface. This allows you to create code with fine-grained control over whether messages are processed synchronously or asynchronously.
AsyncProcessor
interface:public class BackgroundThreadAsyncProcessor implements AsyncProcessor { //... }
The AsyncProcessor
interface defines a single method in addition to that of its parent interface, Processor
:
public interface AsyncProcessor extends org.apache.camel.Processor { boolean process(org.apache.camel.Exchange exchange, org.apache.camel.AsyncCallback asyncCallback); }
This is the only method that we will be interested in implementing. The regular process(Exchange)
method can be left empty, or an exception thrown to indicate to other developers that it will never be used:
@Override public void process(Exchange exchange) throws Exception { throw new IllegalStateException( "this should never be called"); }
process(Exchange, AsyncCallback)
method.Rather than introducing a third-party API, we are going to demonstrate asynchronous behavior by submitting a Runnable
implementation to a java.util.concurrent.ExecutorService
instance.
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
The AsyncCallback
is an object passed in from the Camel runtime that we will invoke in our simulated response handler (the Runnable
) to invoke to indicate that this processor is done.
The method returns a boolean
value to indicate whether it is completing synchronously (true
), or asynchronously (false
). The AsyncCallback.done(boolean)
method must always be invoked with the same value that is returned.
@Override public boolean process(Exchange exchange, final AsyncCallback asyncCallback) { final Message in = exchange.getIn(); final boolean completesSynchronously = false; backgroundExecutor.submit(new Runnable() { @Override public void run() { in.setBody("Handled async: " + in.getBody(String.class)); // the current thread will continue to process // the exchange through the // remainder of the route asyncCallback.done(completesSynchronously); } }); return completesSynchronously; }
In the XML DSL, first define the processor as a bean:
<bean id="slowOperationProcessor" class="org.camelcookbook.parallelprocessing.asyncprocessor.SlowOperationProcessor"/>
Then use the bean reference in a process
statement:
<from Uri="direct:in"/> <process ref="slowOperationProcessor"/> <to Uri="mock:out"/>
In the Java DSL, that same thing is expressed as:
from("direct:in") .process(new SlowOperationProcessor()) .to("mock:out");
In the preceding example, background processing by a thread pool is used to demonstrate an asynchronous interaction. Asynchronous libraries work by triggering your code when an interesting event occurs by invoking an event listener. The listener in this instance is Runnable
.
The process(Exchange, AsyncCallback)
method is responsible for interacting with the asynchronous API, and returning whether the request was processed synchronously. If it returns true
, the current thread proceeds to process the exchange as usual. If the method returns false
, then the current thread will take no further part in processing the exchange, and will be freed up by Camel to process another request from the route's consumer endpoint.
The call to asyncCallback.done()
indicates to the routing engine whether the exchange was processed synchronously. If true
, then it gives the engine the opportunity to clean up any reserved resources. If false
, then the invoking thread will be used to process the exchange through the remainder of the route.
The value returned from process()
and the argument to asyncCallback.done()
must be the same.
The technique demonstrated here might also be useful if you know that a certain processing step is always going to take a long time. Handing the processing over to an ExecutorService
through Runnable
frees up your consuming thread to handle the next request.
It is generally recommended that you implement your processing using the standard Processor
interface, unless you identify a good reason why AsyncProcessor
should be used.
An AsyncProcessor
interface does not need to process every exchange asynchronously, which is why the process(Exchange, AsyncCallback)
method returns whether or not the invocation was completed synchronously.
Here is an example of conditionally processing a message either synchronously or asynchronously depending on its contents, in this case by inspecting a header that we set:
@Override public boolean process(final Exchange exchange, final AsyncCallback asyncCallback) { final Message in = exchange.getIn(); if (in.getHeader("processAsync", Boolean.class)) { // process asynchronously executorService.submit(new Runnable() { @Override public void run() { in.setBody("Processed async: " + in.getBody(String.class)); asyncCallback.done(false); } }); return false; } else { // process synchronously in.setBody("Processed sync: " + in.getBody(String.class)); asyncCallback.done(true); return true; } }
Note that when returning true
, indicating synchronous processing, the asyncCallback.done(true)
method must be invoked beforehand to give the routing engine an opportunity to clean up any reserved resources.
AsyncCallback
interface: http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/AsyncCallback.htmlAsyncProcessor
interface: http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/AsyncProcessor.html 18.118.27.119