Not all Camel components support specifying multiple consumers (threads) processing messages concurrently (see the Increased message consumption through multiple endpoint consumers recipe). Routes that use consuming endpoints such as direct: or file: default to using a single thread to process each message. This recipe will show you how to work around this constraint by using the threads DSL to pass messages that were originally consumed sequentially, for example using direct:, to a thread pool so that they can be processed in parallel.
The Java code for this recipe is located in the org.camelcookbook.parallelprocessing.threadsdsl package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with threadsDsl.
This recipe assumes you are starting with a route that starts on a single thread, such as when consuming using the Direct Component, and you want to use a pool of threads for some of the processing steps. The following route is such an example:
<from uri="direct:in"/>
<log message="Received ${body}:${threadName}"/>
<delay>
  <constant>200</constant> <!-- simulate slow routing step -->
</delay>
<to uri="mock:out"/>
Using the threads DSL allows you to define a portion of your route where messages will be processed by a set of threads distinct from the original consuming thread.
In the XML DSL, this logic is written as:
<from uri="direct:in"/>
<log message="Received ${body}:${threadName}"/>
<threads>
  <log message="Processing ${body}:${threadName}"/>
  <delay>
    <constant>200</constant> <!-- simulate slow routing step -->
  </delay>
  <to uri="mock:out"/>
</threads>
In the Java DSL, the same route is expressed as:
from("direct:in")
    .log("Received ${body}:${threadName}")
    .threads()
        .log("Processing ${body}:${threadName}")
        .delay(200) // simulate slow routing step
        .to("mock:out");
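The log output shown later in this recipe comes from sending messages into the route concurrently. A hedged sketch of such a test harness, assuming a started CamelContext named camelContext (the message count and body format here are illustrative):

```java
// Illustrative only: asyncSendBody hands each send off to the template's own
// thread pool, which is why the "Received" log lines below show
// ProducerTemplate threads rather than the caller's thread
ProducerTemplate template = camelContext.createProducerTemplate();
for (int i = 0; i < 50; i++) {
    template.asyncSendBody("direct:in", "Message[" + i + "]");
}
```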
Within the threads block, Camel creates a thread pool that is assigned to consume messages from an in-memory queue. When the original thread processes a message through the route up to the threads block, it places the exchange onto that queue, from which it is picked up by one of the pool's threads for further processing.
Using this approach you can consume messages from a single-threaded endpoint and have multiple threads process each message later in the route, freeing the consuming thread to fetch the next message.
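The handoff mechanism can be sketched in plain java.util.concurrent terms. This is an illustration of the pattern, not Camel's internal implementation; the class and method names are invented for the example:

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadsHandoff {

    // Hands off 'count' messages from the calling thread to a pool of
    // 'poolSize' workers via an in-memory queue; returns the names of the
    // worker threads that did the processing.
    static Set<String> run(int count, int poolSize) throws InterruptedException {
        // stands in for the internal queue behind the threads block
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(count);

        for (int i = 0; i < poolSize; i++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        queue.take(); // pick up the exchange for further processing
                        workerNames.add(Thread.currentThread().getName());
                        Thread.sleep(50); // simulate slow routing step
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // pool is shutting down
                }
            });
        }

        // The single consuming thread only enqueues each message,
        // leaving it free to fetch the next one immediately
        for (int i = 0; i < count; i++) {
            queue.put("Message[" + i + "]");
        }

        done.await();
        pool.shutdownNow();
        return workerNames;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Workers used: " + run(6, 3));
    }
}
```

Because the slow step runs on the pool, six messages take roughly two batches of 50 ms rather than six sequential delays.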
In the preceding example, when we send messages through the route, we can see that different threads process the messages before and after the threads block; the Received logging statements are also interleaved with the Processing statements:
Received Message[40]:Camel (camel-1) thread #3 - ProducerTemplate
Processing Message[31]:Camel (camel-1) thread #12 - Threads
Received Message[41]:Camel (camel-1) thread #5 - ProducerTemplate
Processing Message[33]:Camel (camel-1) thread #14 - Threads
Received Message[42]:Camel (camel-1) thread #9 - ProducerTemplate
Processing Message[34]:Camel (camel-1) thread #16 - Threads
...
Processing Message[40]:Camel (camel-1) thread #15 - Threads
Processing Message[41]:Camel (camel-1) thread #10 - Threads
Processing Message[42]:Camel (camel-1) thread #12 - Threads
This technique is very similar to using a SEDA endpoint, as described in the Asynchronously connecting routes recipe in Chapter 1, Structuring Routes. Both techniques use an in-memory queue to pass messages to a consuming thread pool, and both can be used with InOut as well as InOnly exchanges if the consuming endpoint supports asynchronous routing. An example of this is Jetty, which can accept an HTTP request with one thread and respond with another through its continuation support.
It is possible to configure a number of options on the threads DSL through its attributes. These are the same as those of the threadPool DSL, and are described in the Using custom thread pools recipe.
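For instance, the pool's core size, maximum size, and thread name can be set directly on the DSL. A sketch, with arbitrary example values:

```java
from("direct:in")
    .threads(5, 10, "CustomThreads") // poolSize, maxPoolSize, threadName
    .to("mock:out");
```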
As an alternative to setting up the behavior of a thread pool directly within the DSL, a threads block may reference an existing java.util.concurrent.ExecutorService for re-use. See the Using custom thread pools recipe for further details.
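A sketch of referencing a pool by its registry id (the "customThreadPool" id here is an invented example; the pool would be registered separately, as covered in that recipe):

```java
from("direct:in")
    .threads().executorServiceRef("customThreadPool")
    .to("mock:out");
```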