Processing aggregated messages in parallel

By default, an Aggregator processes the aggregated exchange through the steps defined within the aggregate block using a single thread. That thread is either the one that delivers the message triggering the completion condition, or the timer thread described in the Aggregating with timeouts recipe. This recipe describes how to configure the Aggregator so that aggregated messages are processed in parallel.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.splitjoin.aggregateparallel package. The Spring XML example is located in src/main/resources/META-INF/spring/aggregateParallelProcessing-context.xml.

How to do it...

Inside your aggregate statement, define a parallelProcessing attribute set to true. The attribute can be used alongside any aggregation condition.

In the XML DSL, this is written as:

<from uri="direct:in"/>
<aggregate strategyRef="setAggregationStrategy"
           completionSize="10"
           completionTimeout="400"
           parallelProcessing="true">
  <correlationExpression>
    <simple>${headers.group}</simple>
  </correlationExpression>
  <delay>
    <constant>500</constant>
  </delay>
  <log message="${threadName} - processing output"/>
  <to uri="mock:out"/>
</aggregate>

In the Java DSL, use the parallelProcessing() builder method with the aggregate statement:

from("direct:in")
  .aggregate(header("group"), new SetAggregationStrategy())
      .completionSize(10).completionTimeout(400)
      .parallelProcessing()
    .log("${threadName} - processing output")
    .delay(500)
    .to("mock:out")
  .end();

How it works...

The Aggregator initializes a thread pool that is responsible for processing the aggregated messages. When messages flow through the routes defined above, the log output confirms that different threads are executing the work:

[#1 - Aggregator] INFO  route1 - Camel (camel-1) thread #1 - Aggregator - processing output
[#2 - Aggregator] INFO  route1 - Camel (camel-1) thread #2 - Aggregator - processing output

This is a useful strategy for Aggregators that work under a heavy load of incoming messages, and where the processing of the aggregated messages is time consuming.
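The hand-off to a pool can be illustrated outside Camel with plain JDK executors. The following is a minimal sketch, not Camel code: the class name ParallelCompletionSketch and the method workerNames are hypothetical, and the 50 ms sleep merely stands in for slow downstream processing. It submits a number of simulated aggregated batches to a fixed-size pool and reports which threads handled them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelCompletionSketch {
    // Simulates handing `batches` completed aggregations to a pool of `threads`
    // workers and returns the distinct names of the threads that processed them.
    public static Set<String> workerNames(int threads, int batches) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < batches; i++) {
                Callable<String> task = () -> {
                    Thread.sleep(50); // stand-in for slow downstream processing
                    return Thread.currentThread().getName();
                };
                results.add(pool.submit(task));
            }
            // Collect the worker thread names once every task has finished.
            Set<String> names = new TreeSet<>();
            for (Future<String> result : results) {
                names.add(result.get());
            }
            return names;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("single worker: " + workerNames(1, 4));
        System.out.println("four workers:  " + workerNames(4, 4));
    }
}
```

With one worker, every batch is handled by the same thread, one after another. With four workers, the four batches are spread across four threads, which is the same effect the differing thread names in the log output above point to.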

There's more...

The default strategy for pooling in Camel is to instantiate a pool of 10 threads to service the aggregated messages. You can customize this by referring to a threadPool instance defined within the Camel context by its id.

In the XML DSL, this is written as:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <threadPool id="customPool" poolSize="20" threadName="pool"/>

  <route>
    <from uri="direct:in"/>
    <aggregate strategyRef="setAggregationStrategy"
               completionSize="10"
               completionTimeout="400"
               executorServiceRef="customPool">
      <!-- ... -->
    </aggregate>
  </route>
</camelContext>

Tip

Use of the executorServiceRef attribute implies parallelProcessing, so you do not need to define both.

The Java DSL contains an equivalent option where you can refer to a thread pool in Camel's registry:

.aggregate(header("group"), new SetAggregationStrategy())
    .completionSize(10).completionTimeout(400)
    .executorServiceRef("customPool")

The Java DSL also allows you to pass an ExecutorService instance inline:

.aggregate(header("group"), new SetAggregationStrategy())
    .completionSize(10).completionTimeout(400)
    .executorService(Executors.newFixedThreadPool(20))
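A pool created with Executors.newFixedThreadPool(20) names its threads generically (for example, pool-1-thread-1), which can make the ${threadName} log output harder to read. The following is a minimal sketch, using only JDK classes, of building a pool with recognizable thread names. The class NamedPools, its methods, and the aggregator-pool prefix are hypothetical and are not part of Camel.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPools {
    // Hypothetical helper: builds a fixed-size pool whose threads are named
    // "<prefix>-1", "<prefix>-2", and so on, in order of creation.
    public static ExecutorService fixedPool(int size, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true); // do not keep the JVM alive for idle workers
            return thread;
        };
        return Executors.newFixedThreadPool(size, factory);
    }

    // Runs one task on the pool and returns the name of the thread that executed it.
    public static String nameOfWorker(ExecutorService pool) throws Exception {
        Callable<String> task = () -> Thread.currentThread().getName();
        return pool.submit(task).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = fixedPool(20, "aggregator-pool");
        System.out.println(nameOfWorker(pool));
        pool.shutdown();
    }
}
```

Such a pool could then be supplied in place of the inline one, for example .executorService(NamedPools.fixedPool(20, "aggregator-pool")). When you create the pool yourself, it is safest to assume that shutting it down is also your responsibility.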

See also

  • The Aggregating related messages recipe