Working with asynchronous APIs

Very occasionally you will want to interact with a third-party API that is asynchronous in nature (that is, one that uses callbacks to return a response) within a Camel Processor.

The most straightforward way to do this is to register a listener with that API, place a request, and block the current thread until a response is received using a java.util.concurrent.CountDownLatch instance or similar. This has the downside of using one more thread than you strictly need, and it can slow the rate of consumption from the route's consumer endpoint.
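
As a point of comparison, the blocking approach can be sketched with the JDK alone. The FakeAsyncClient and ResponseListener types below are hypothetical stand-ins for a third-party asynchronous API; they are not part of Camel or of any real library. Only the CountDownLatch usage is the point of the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class BlockingCallbackExample {

  /** Hypothetical listener type standing in for a third-party callback. */
  public interface ResponseListener {
    void onResponse(String response);
  }

  /** Hypothetical asynchronous client that replies on its own thread. */
  public static class FakeAsyncClient {
    private final ExecutorService worker =
        Executors.newSingleThreadExecutor();

    public void send(final String request,
        final ResponseListener listener) {
      worker.submit(new Runnable() {
        @Override
        public void run() {
          listener.onResponse("echo: " + request);
        }
      });
    }

    public void shutdown() {
      worker.shutdown();
    }
  }

  /** Places a request, then parks the caller until the callback fires. */
  public static String callAndWait(String request, long timeoutSeconds) {
    final FakeAsyncClient client = new FakeAsyncClient();
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<String> result = new AtomicReference<String>();
    try {
      client.send(request, new ResponseListener() {
        @Override
        public void onResponse(String response) {
          result.set(response);
          latch.countDown(); // wake the waiting thread
        }
      });
      // the calling thread does no useful work while it waits here
      if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
        throw new IllegalStateException("no response within timeout");
      }
      return result.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      client.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(callAndWait("ping", 5));
  }
}
```

Inside a Processor, the body of callAndWait() would sit in process(Exchange), which is why the consuming thread stays occupied for the full duration of the remote call.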

This recipe provides you with an alternative that allows you to truly interact asynchronously with this type of API through an Asynchronous Processor. Using this mechanism, the original thread is released to take on more work once the request has been placed, and the response is routed using the thread that triggers the callback.

Note

This is an advanced recipe that should only be used by developers leveraging a native third-party asynchronous API.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.parallelprocessing.asyncprocessor package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with asyncProcessor.

How to do it...

Implement the org.apache.camel.AsyncProcessor interface. This allows you to create code with fine-grained control over whether messages are processed synchronously or asynchronously.

  1. Define a class that implements the AsyncProcessor interface:
    public class BackgroundThreadAsyncProcessor 
        implements AsyncProcessor {
      //...
    }

    The AsyncProcessor interface defines a single method in addition to that of its parent interface, Processor:

    public interface AsyncProcessor 
        extends org.apache.camel.Processor {
      boolean process(org.apache.camel.Exchange exchange,
          org.apache.camel.AsyncCallback asyncCallback);
    }

    This is the only method that we will be interested in implementing. The regular process(Exchange) method can be left empty, or an exception thrown to indicate to other developers that it will never be used:

    @Override
    public void process(Exchange exchange) throws Exception {
      throw new IllegalStateException(
          "this should never be called");
    }
  2. Implement the process(Exchange, AsyncCallback) method.

    Rather than introducing a third-party API, we are going to demonstrate asynchronous behavior by submitting a Runnable implementation to a java.util.concurrent.ExecutorService instance.

    private final ExecutorService executorService =
        Executors.newSingleThreadExecutor();

    The AsyncCallback is an object passed in from the Camel runtime. Our simulated response handler (the Runnable) invokes it to indicate that this processor is done.

    The method returns a boolean value to indicate whether it is completing synchronously (true), or asynchronously (false). The AsyncCallback.done(boolean) method must always be invoked with the same value that is returned.

    @Override
    public boolean process(Exchange exchange, 
        final AsyncCallback asyncCallback) {
      final Message in = exchange.getIn();
      final boolean completesSynchronously = false;
      executorService.submit(new Runnable() {
        @Override
        public void run() {
          in.setBody("Handled async: "
                     + in.getBody(String.class));
          // the current thread will continue to process
          // the exchange through the
          // remainder of the route
          asyncCallback.done(completesSynchronously);
        }
      });
      return completesSynchronously;
    }
  3. Include the processor as part of the route.

    In the XML DSL, first define the processor as a bean:

    <bean id="slowOperationProcessor" 
          class="org.camelcookbook.parallelprocessing.asyncprocessor.SlowOperationProcessor"/>

    Then use the bean reference in a process statement:

    <from uri="direct:in"/>
    <process ref="slowOperationProcessor"/>
    <to uri="mock:out"/>

    In the Java DSL, the same thing is expressed as:

    from("direct:in")
      .process(new SlowOperationProcessor())
      .to("mock:out");

How it works...

In the preceding example, background processing by a thread pool is used to demonstrate an asynchronous interaction. Asynchronous libraries work by invoking an event listener that you supply when an interesting event occurs. In this instance, the Runnable plays the role of the listener.

The process(Exchange, AsyncCallback) method is responsible for interacting with the asynchronous API, and returning whether the request was processed synchronously. If it returns true, the current thread proceeds to process the exchange as usual. If the method returns false, then the current thread will take no further part in processing the exchange, and will be freed up by Camel to process another request from the route's consumer endpoint.

The call to asyncCallback.done() indicates to the routing engine whether the exchange was processed synchronously. If true, then it gives the engine the opportunity to clean up any reserved resources. If false, then the invoking thread will be used to process the exchange through the remainder of the route.

The value returned from process() and the argument to asyncCallback.done() must be the same.
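
The contract between the return value and done() can be illustrated without Camel on the classpath. In the following sketch, DoneCallback is a hypothetical stand-in for AsyncCallback, and run() only loosely plays the role of the routing engine: it records which thread called process() and which thread invoked done(). None of these names come from Camel itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DoneContractSketch {

  /** Hypothetical stand-in for Camel's AsyncCallback. */
  public interface DoneCallback {
    void done(boolean doneSync);
  }

  /** What a single run observed. */
  public static class Outcome {
    public volatile boolean returned;   // value returned by process()
    public volatile boolean reported;   // value passed to done()
    public volatile Thread callerThread;
    public volatile Thread continuationThread;
  }

  /** Mirrors the shape of process(Exchange, AsyncCallback). */
  static boolean process(ExecutorService executor,
      final DoneCallback callback) {
    final boolean completesSynchronously = false;
    executor.submit(new Runnable() {
      @Override
      public void run() {
        // same value as the one returned below
        callback.done(completesSynchronously);
      }
    });
    return completesSynchronously;
  }

  public static Outcome run() {
    final Outcome outcome = new Outcome();
    final CountDownLatch finished = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      outcome.callerThread = Thread.currentThread();
      outcome.returned = process(executor, new DoneCallback() {
        @Override
        public void done(boolean doneSync) {
          outcome.reported = doneSync;
          outcome.continuationThread = Thread.currentThread();
          finished.countDown();
        }
      });
      // the caller only waits here so that the demo can inspect the result
      if (!finished.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("callback was never invoked");
      }
      return outcome;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    Outcome o = run();
    System.out.println("returned=" + o.returned
        + ", reported=" + o.reported
        + ", same thread=" + (o.callerThread == o.continuationThread));
  }
}
```

The two boolean values agree, and done() is invoked on the executor's thread rather than on the thread that called process(). In a real route, that second thread is the one that would carry the exchange onward.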

Tip

The technique demonstrated here might also be useful if you know that a certain processing step is always going to take a long time. Handing the processing over to an ExecutorService through Runnable frees up your consuming thread to handle the next request.

It is generally recommended that you implement your processing using the standard Processor interface, unless you identify a good reason why AsyncProcessor should be used.

There's more...

An AsyncProcessor implementation does not need to process every exchange asynchronously, which is why the process(Exchange, AsyncCallback) method returns whether or not the invocation was completed synchronously.

Here is an example of conditionally processing a message either synchronously or asynchronously depending on its contents, in this case by inspecting a header that we set:

@Override
public boolean process(final Exchange exchange,
    final AsyncCallback asyncCallback) {
  final Message in = exchange.getIn();
  if (Boolean.TRUE.equals(in.getHeader("processAsync", Boolean.class))) {
    // process asynchronously
    executorService.submit(new Runnable() {
      @Override
      public void run() {
        in.setBody("Processed async: "
                 + in.getBody(String.class));
        asyncCallback.done(false);
      }
    });
    return false;
  } else {
    // process synchronously
    in.setBody("Processed sync: " + in.getBody(String.class));
    asyncCallback.done(true);
    return true;
  }
}

Note that when returning true, indicating synchronous processing, the asyncCallback.done(true) method must be invoked beforehand to give the routing engine an opportunity to clean up any reserved resources.

Note

This technique should be used carefully if you intend on using it along with transactions. The underlying transactional resources may not support being accessed concurrently from different threads. If you intend on combining these two features, make sure that you test the behavior thoroughly.
