One of the simplest ways of increasing throughput is to raise the number of threads that are consuming from an endpoint.
This recipe assumes that you are starting with an asynchronous route, for example one that uses seda: as the consumer endpoint (in the from DSL statement). SEDA is a mechanism included in Camel Core for connecting routes to each other asynchronously by passing Exchange objects over an in-memory queue; see the Asynchronously connecting routes recipe in Chapter 1, Structuring Routes.
This is important because an endpoint such as direct: always uses the calling thread to execute the processing steps, so you need to use different techniques to switch processing to a different thread or thread pool. This is discussed in the Spreading the load within a route using a set of threads recipe.
This recipe will show you how to increase the number of consumer threads for asynchronous consuming endpoints such as seda:.
The Java code for this recipe is located in the org.camelcookbook.parallelprocessing.endpointconsumers package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with endpointConsumers.
This recipe assumes you have an existing asynchronous route, such as the following:
<from uri="seda:in"/>
<delay>
<constant>200</constant> <!-- slow running service -->
</delay>
<log message="Processing ${body}:${threadName}"/>
<to uri="mock:out"/>
This route introduces a 200 ms delay to simulate a more complex, and therefore more time-consuming, processing step.
Use the concurrentConsumers option to increase the number of threads available to consume messages.
In the XML DSL, this is written as:
<from uri="seda:in?concurrentConsumers=10"/>
<delay>
<constant>200</constant> <!-- slow running service -->
</delay>
<log message="Processing ${body}:${threadName}"/>
<to uri="mock:out"/>
In the Java DSL, the same route is expressed as:
from("seda:in?concurrentConsumers=10")
.delay(200)
.log("Processing ${body}:${threadName}")
.to("mock:out");
The SEDA Component's default behavior is to use only a single thread for consuming incoming messages. This means that when our delay of 200 ms per message is taken into account, at most five messages per second will be processed. To increase this, we can allocate more consumers (threads) to this endpoint by using the concurrentConsumers option on the seda: endpoint.
When we run a set of messages through the route, we can see from our logging that different threads pick up each message:
Processing Message[8]:Camel (camel-1) thread #8 - seda://in
Processing Message[0]:Camel (camel-1) thread #0 - seda://in
Processing Message[4]:Camel (camel-1) thread #3 - seda://in
Processing Message[2]:Camel (camel-1) thread #2 - seda://in
With 10 concurrent consumers, we should now expect about 50 exchanges to be processed per second (10 threads at five messages per second each), versus the original single-threaded version, which processes only five per second.
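The arithmetic behind these figures can be sketched in plain Java. This is only an upper-bound estimate: it assumes that each consumer thread is fully occupied by the delay and ignores queueing and scheduling overhead. The class and method names are hypothetical and are not part of Camel.

```java
// Hypothetical helper, not part of Camel: estimates the upper bound on
// throughput when every consumer thread spends delayMillis on each message.
final class SedaThroughputEstimate {

    // One consumer handles at most 1000 / delayMillis messages per second.
    // Consumers work in parallel, so the per-thread rates add up.
    static double maxMessagesPerSecond(int concurrentConsumers, long delayMillis) {
        if (concurrentConsumers < 1 || delayMillis < 1) {
            throw new IllegalArgumentException("consumers and delay must be positive");
        }
        return concurrentConsumers * (1000.0 / delayMillis);
    }

    public static void main(String[] args) {
        // Default single consumer with the 200 ms delay from the route
        System.out.println(maxMessagesPerSecond(1, 200));
        // The same route with concurrentConsumers=10
        System.out.println(maxMessagesPerSecond(10, 200));
    }
}
```

In practice the measured rate will sit somewhat below this bound, since the estimate counts only the delay step.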
The threads compete with each other when polling the SEDA Component's in-memory queue. This means that there is no guarantee that they will process the messages in the same order that they are sent to the SEDA endpoint. That is why Message[8] is processed before Message[0] in the preceding output.
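The competing-consumers behavior can be illustrated with a plain-Java analogy that uses only java.util.concurrent. This is a minimal sketch and not Camel's own code: the class name, the 5 ms sleep, and the thread pool setup are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy, not Camel code: several threads poll one in-memory queue.
final class CompetingConsumersSketch {

    // Drains messageCount messages using consumerCount threads and returns
    // the message numbers in the order in which their processing completed.
    static List<Integer> run(int consumerCount, int messageCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i); // enqueued in order 0..messageCount-1
        }
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int c = 0; c < consumerCount; c++) {
            consumers.submit(() -> {
                Integer message;
                // poll() returns null once the queue is empty, ending this consumer
                while ((message = queue.poll()) != null) {
                    try {
                        Thread.sleep(5); // stand-in for the slow processing step
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.add(message);
                }
            });
        }
        consumers.shutdown();
        try {
            if (!consumers.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed;
    }

    public static void main(String[] args) {
        // Every message appears exactly once, but the order may differ between runs.
        System.out.println(run(4, 12));
    }
}
```

Each message is handled exactly once, yet the completion order depends on thread scheduling, which mirrors the interleaved log output shown earlier.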
The support for concurrent consumption from an endpoint depends on the component. The concurrentConsumers property is part of the org.apache.camel.component.seda.SedaEndpoint class (see the Using Camel components recipe in Chapter 1, Structuring Routes, for a discussion of endpoint properties); it is not a standard attribute that applies to all components.
Not all components allow for concurrent consumption, as the underlying technology for some may be fundamentally sequential. For example, the JPA Component selects rows from a database table when consuming, and potentially processes batches of selected rows at a time. If multiple threads were executing the same select statement at the same time, the same rows would be processed multiple times.
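The duplicate-processing problem described above can be simulated in plain Java. This is a hypothetical sketch and not Camel or JPA code: an in-memory set stands in for the database table, and a barrier forces the worst case in which every poller finishes its select before any row is marked as processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation, not Camel or JPA code.
final class DuplicateSelectSketch {

    // Returns, per row id, how many times the row was processed when
    // pollerCount threads all "select" before any of them marks a row as done.
    static Map<Integer, Integer> run(int pollerCount, int rowCount) {
        Set<Integer> unprocessed = ConcurrentHashMap.newKeySet();
        Map<Integer, AtomicInteger> timesProcessed = new ConcurrentHashMap<>();
        for (int id = 0; id < rowCount; id++) {
            unprocessed.add(id);
            timesProcessed.put(id, new AtomicInteger());
        }
        // All pollers must reach the barrier before any continues past it.
        CyclicBarrier allSelected = new CyclicBarrier(pollerCount);
        List<Thread> pollers = new ArrayList<>();
        for (int p = 0; p < pollerCount; p++) {
            Thread poller = new Thread(() -> {
                // The "select": a snapshot of all rows not yet processed
                List<Integer> selected = new ArrayList<>(unprocessed);
                try {
                    allSelected.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
                for (Integer id : selected) {
                    timesProcessed.get(id).incrementAndGet(); // process the row
                    unprocessed.remove(id);                   // then mark it as done
                }
            });
            pollers.add(poller);
            poller.start();
        }
        for (Thread poller : pollers) {
            try {
                poller.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        Map<Integer, Integer> result = new ConcurrentHashMap<>();
        timesProcessed.forEach((id, count) -> result.put(id, count.get()));
        return result;
    }
}
```

Calling DuplicateSelectSketch.run(2, 5) reports a count of 2 for every row, because both pollers selected the same five rows before either marked one as done.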
The following components allow you to have more than one thread consuming from them:
Component | Endpoint attributes |
---|---|
SEDA, VM | |
JMS, ActiveMQ | |
Jetty | |
To check whether concurrent consumption is configurable, you should refer back to the Camel documentation for the component that you are using.