Increasing message consumption through multiple endpoint consumers

One of the simplest ways of increasing throughput is to raise the number of threads that are consuming from an endpoint.

This recipe assumes that you are starting with a route that is asynchronous, for example using seda: as the consumer endpoint (using the from DSL statement). SEDA is a mechanism included in Camel Core for connecting routes to each other asynchronously by passing Exchange objects over an in-memory queue—see the Asynchronously connecting routes recipe in Chapter 1, Structuring Routes.

This is important as an endpoint such as direct: always uses the calling thread for executing the processing steps, so you need to use different techniques to switch processing to a different thread, or thread pool. This is discussed in the Spreading the load within a route using a set of threads recipe.

This recipe will show you how to increase the number of consumer threads for asynchronous consuming endpoints such as seda:.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.parallelprocessing.endpointconsumers package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with endpointConsumers.

This recipe assumes you have an existing asynchronous route, such as the following:

<from uri="seda:in"/>
<delay>
  <constant>200</constant> <!-- slow running service -->
</delay>
<log message="Processing ${body}:${threadName}"/>
<to uri="mock:out"/>

This route introduces a 200 ms delay in order to simulate a more complex, and therefore time-consuming processing step.

How to do it...

Use the concurrentConsumers option to increase the number of threads available to consume messages:

In the XML DSL, this is written as:

<from uri="seda:in?concurrentConsumers=10"/>
<delay>
  <constant>200</constant> <!-- slow running service -->
</delay>
<log message="Processing ${body}:${threadName}"/>
<to Uri="mock:out"/>

In the Java DSL, the same route is expressed as:

from("seda:in?concurrentConsumers=10")
  .delay(200)
  .log("Processing ${body}:${threadName}")
  .to("mock:out");

How it works...

The SEDA Component's default behavior is to use only a single thread for consuming incoming messages. This means that when our delay of 200 ms per message is taken into account, at most five messages per second will be processed. To increase this, we can allocate more consumers (threads) to this endpoint by using the concurrentConsumers option on the seda: endpoint.

When we run a set of messages through the route, we can see from our logging that different threads pick up each message:

Processing Message[8]:Camel (camel-1) thread #8 - seda://in
Processing Message[0]:Camel (camel-1) thread #0 - seda://in
Processing Message[4]:Camel (camel-1) thread #3 - seda://in
Processing Message[2]:Camel (camel-1) thread #2 - seda://in

With 10 concurrent consumers, we should now expect about 50 exchanges to be processed per second versus the original single threaded version, which only processes five per second.

The threads compete with each other when polling the SEDA Component's in-memory queue. This means that there is no guarantee that they will process the messages in the same order that they are sent to the SEDA endpoint. That is why Message[8] is processed before Message[0] in the preceding output.

The support for concurrent consumption from an endpoint depends on the component. The concurrentConsumers property is part of the org.apache.camel.component.seda.SedaEndpoint class (see the Using Camel components recipe in Chapter 1, Structuring Routes, for a discussion of endpoint properties)—it is not a standard attribute that applies to all components.

There's more...

Not all components allow for concurrent consumption, as the underlying technology for some may be fundamentally sequential. For example, the JPA Component selects rows from a database table when consuming and potentially processing batches of selected rows at a time. If multiple threads were executing the same select statement at the same time, the same rows would be processed multiple times.

The following components allow you to have more than one thread consuming from them:

Component

Endpoint attributes

SEDA, VM

concurrentConsumers defines the size of a fixed thread pool

JMS, ActiveMQ

concurrentConsumers defines the minimum size of a thread pool—more threads will be created if required; maxConcurrentConsumers defines the upper bound of that pool

Jetty

minThreads defines a minimum number of threads servicing HTTP requests; maxThreads defines an upper bound

To check whether concurrent consumption is configurable, you should refer back to the Camel documentation for the component that you are using.

See also

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.133.134.17