Spreading the load within a route using a set of threads

Not all Camel components support specifying multiple consumers (threads) processing messages concurrently (see the Increased message consumption through multiple endpoint consumers recipe). Routes that use consuming endpoints such as direct: or file: default to using a single thread to process messages. This recipe shows you how to work around this constraint by using the threads DSL to pass messages that were originally consumed sequentially, for example using direct:, to a thread pool so that they can be processed in parallel.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.parallelprocessing.threadsdsl package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with threadsDsl.

This recipe assumes you are starting with a route that runs on a single thread, such as one consuming from the Direct Component, and you want to use a pool of threads for some of the processing steps. The following route is such an example:

<from uri="direct:in"/>
<log message="Received ${body}:${threadName}"/>
<delay>
  <constant>200</constant> <!-- simulate slow routing step -->
</delay>
<to uri="mock:out"/>

How to do it...

Using the threads DSL allows you to define a portion of your route where messages will be processed by a set of threads distinct from the original consuming thread.

In the XML DSL, this logic is written as:

<from uri="direct:in"/>
<log message="Received ${body}:${threadName}"/>
<threads>
  <log message="Processing ${body}:${threadName}"/>
  <delay>
    <constant>200</constant> <!-- simulate slow routing step -->
  </delay>
  <to uri="mock:out"/>
</threads>

In the Java DSL, the same route is expressed as:

from("direct:in")
  .log("Received ${body}:${threadName}")
  .threads()
  .log("Processing ${body}:${threadName}")
  .delay(200)
  .to("mock:out");

How it works...

Within the threads block, Camel creates a thread pool that consumes exchanges from an in-memory queue. When the original consuming thread has processed a message up to the threads block, it places the exchange onto that queue, where it is picked up by one of the threads from the pool for further processing.

Using this approach you can consume messages from a single-threaded endpoint, and have multiple threads process the message later in the route, leaving the consuming thread to fetch the next message.
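This hand-off can be sketched with plain java.util.concurrent primitives. The following is a simplified illustration of the mechanism, not Camel's actual implementation; the class and method names are made up for this example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadsHandOffSketch {

    // Hand messages off to a worker pool through an in-memory queue,
    // which is conceptually what the threads block does with exchanges.
    static List<String> processAll(List<String> messages) throws Exception {
        // The LinkedBlockingQueue plays the role of the in-memory queue;
        // the pool's threads take messages from it as they become free.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        List<Future<String>> futures = new ArrayList<>();
        for (String message : messages) {
            // The "consuming" thread returns immediately after the hand-off,
            // leaving it free to fetch the next message.
            futures.add(pool.submit(() ->
                    "Processed " + message + ":" + Thread.currentThread().getName()));
        }

        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            results.add(future.get()); // wait for the pool to finish each message
        }
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        processAll(Arrays.asList("Message[0]", "Message[1]", "Message[2]"))
                .forEach(System.out::println);
    }
}
```

As in the Camel route, the submitting thread never performs the slow processing itself; it only enqueues the work and moves on.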

In the preceding example, when we send messages through the route, we can see that different threads process the messages before and after the threads block; the Received logging statements are also interleaved with the Processing statements:

Received Message[40]:Camel (camel-1) thread #3 - ProducerTemplate
Processing Message[31]:Camel (camel-1) thread #12 - Threads
Received Message[41]:Camel (camel-1) thread #5 - ProducerTemplate
Processing Message[33]:Camel (camel-1) thread #14 - Threads
Received Message[42]:Camel (camel-1) thread #9 - ProducerTemplate
Processing Message[34]:Camel (camel-1) thread #16 - Threads
...
Processing Message[40]:Camel (camel-1) thread #15 - Threads
Processing Message[41]:Camel (camel-1) thread #10 - Threads
Processing Message[42]:Camel (camel-1) thread #12 - Threads

Note

You may have noticed that there are multiple threads logging Received statements. This is because the test requests have been fired into the route asynchronously. The reasons behind this are explained in the Routing a request asynchronously recipe.

This technique is very similar to using a SEDA endpoint as described in the Asynchronously connecting routes recipe in Chapter 1, Structuring Routes. Both techniques use an in-memory queue to pass messages to a consuming thread pool, and can be used with InOut as well as InOnly exchanges if the consuming endpoint supports asynchronous routing. An example of this is Jetty, which supports accepting an HTTP request with one thread and responding with another through its continuation support.
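For comparison, the equivalent hand-off using a SEDA endpoint splits the route in two (the seda: endpoint name and consumer count here are illustrative):

```xml
<route>
  <from uri="direct:in"/>
  <log message="Received ${body}:${threadName}"/>
  <to uri="seda:process"/>
</route>
<route>
  <from uri="seda:process?concurrentConsumers=10"/>
  <log message="Processing ${body}:${threadName}"/>
  <to uri="mock:out"/>
</route>
```

The threads DSL achieves the same effect within a single route definition.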

Note

This technique should be used carefully if you intend to use it alongside transactions. The underlying transactional resources may not support being accessed concurrently from different threads. If you do combine the two, make sure that you test the behavior thoroughly.

There's more...

It is possible to configure a number of options on the threads DSL through its attributes. These are the same as those of threadPool, and are described in the Using custom thread pools recipe.
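For example, the pool sizing and thread naming can be set directly on the threads element; the attribute values below are illustrative:

```xml
<threads poolSize="5" maxPoolSize="10" threadName="MyThreads">
  <log message="Processing ${body}:${threadName}"/>
  <to uri="mock:out"/>
</threads>
```

In the Java DSL, the equivalent pool sizing can be passed as arguments, for example threads(5, 10).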

As an alternative to configuring the thread pool's behavior directly within the DSL, a threads block may reference an existing java.util.concurrent.ExecutorService in order to re-use it. See the Using custom thread pools recipe for further details.
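A sketch of referencing a shared pool from the XML DSL follows; the threadPool id and sizing values are made up for this example:

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <threadPool id="customPool" poolSize="5" maxPoolSize="10"
              threadName="CustomThreads"/>
  <route>
    <from uri="direct:in"/>
    <threads executorServiceRef="customPool">
      <to uri="mock:out"/>
    </threads>
  </route>
</camelContext>
```

Defining the pool once and referencing it by id allows several routes to share the same set of threads.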

See also
