Routing a request asynchronously

When you send a message using the ProducerTemplate.sendBody() method into a Camel endpoint, you may find that even though it contains asynchronous processing, such as through the threads DSL (see the Spreading the load within a route using a set of threads recipe), your original thread is blocked until the request is fully processed. This is by design, as Camel will not return unless the message was fully processed—keeping track of whether an exchange was fully processed is one of the roles of Camel's Asynchronous Routing Engine.

This recipe will explain how to interact with Camel asynchronously through a ProducerTemplate instance, so that you can continue doing other work while a message submitted to Camel is being processed.

Getting ready

Java code for this recipe is located in the org.camelcookbook.parallelprocessing.asyncrequest package.

This recipe assumes that you have an existing route that will process a message synchronously, and may take a significant amount of time to complete. The following is an example of such a route:

from("direct:processInOut")
  .log("Received ${body}")
  .delay(1000) // simulate slow processing
  .log("Processing ${body}")
  .transform(simple("Processed ${body}"));

We know upfront that processing a message through this route will take some time, so we would like to send a message into it asynchronously, and get on with doing other work until a response is returned to us.

How to do it...

Use the ProducerTemplate interface's asyncSend() or asyncRequest() variant methods to allow you to send a message asynchronously to a Camel endpoint, and still have the ability to know when the message processing is complete.

  1. Send a message into the Camel route using one of the ProducerTemplate interface's asyncSend() or asyncRequest() methods depending on whether you require the exchange to be passed as InOnly or InOut respectively.

    Here we use asyncRequestBody() to initiate an InOut conversation from which we expect a response:

    Future<Object> future = producerTemplate.asyncRequestBody(
        "direct:processInOut", "SomePayload");

    The async...() methods return a java.util.concurrent.Future that we can use to check whether the processing of the exchange was completed, that is, whether the request was processed:

    while(!future.isDone()) {
      log.info("Doing something else while processing...");
      Thread.sleep(200);
    }
  2. Use the future.get() method to obtain the response from the route if successful. Calling this method will block the current thread until a response is received.
    String response = (String) future.get();
    log.info("Received a response");
    assertEquals("Processed SomePayload", response);

    If an exception was thrown while processing the exchange, future.get() will throw a java.util.concurrent.ExecutionException interface that wraps the exception raised from within Camel.

    Running this code shows the following output:

    Doing something else while processing...
    Received SomePayload
    Doing something else while processing...
    Doing something else while processing...
    Doing something else while processing...
    Doing something else while processing...
    Doing something else while processing...
    Processing SomePayload
    Received a response

How it works...

Internally, a thread pool based upon the default settings (10 threads) is used to process any exchanges sent using the ProducerTemplate interface's async...() methods. The calling thread submits the message into an in-memory queue for processing by one of the threads in this pool, and continues processing as usual.

There's more...

It is also possible to use a callback, and deal with the return status of the exchange asynchronously by providing org.apache.camel.spi.Synchronization implementation to one of the ProducerTemplate interface's asyncCallback...() methods:

Future<Object> future = template.asyncCallbackRequestBody(
    "direct:processInOut", 
    "AnotherPayload", 
    new Synchronization() {
      @Override
      public void onComplete(Exchange exchange) {
        assertEquals("Processed AnotherPayload",
                     exchange.getOut().getBody());
      }

      @Override
      public void onFailure(Exchange exchange) {
        fail();
      }
    });
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.226.98.208