The default behavior of an Aggregator is to process the aggregated exchange through the steps defined within the aggregate
block using a single thread. This is either the thread that pushes the last message into the block that triggers the completion condition, or the timer thread described in the Aggregating with timeouts recipe. This recipe will describe how to modify the Aggregator so that the aggregated messages can be processed in parallel.
The Java code for this recipe is located in the org.camelcookbook.splitjoin.aggregateparallel
package. The Spring XML example is located in src/main/resources/META-INF/spring/aggregateParallelProcessing-context.xml
.
Inside your aggregate
statement, define a parallelProcessing
attribute set to true
. The attribute can be used alongside any aggregation condition.
In the XML DSL, this is written as:
<from uri="direct:in"/>
<aggregate strategyRef="setAggregationStrategy"
completionSize="10"
completionTimeout="400"
parallelProcessing="true">
<correlationExpression>
<simple>${headers.group}</simple>
</correlationExpression>
<delay>
<constant>500</constant>
</delay>
<log message="${threadName} - processing output"/>
<to uri="mock:out"/>
</aggregate>
In the Java DSL, use the parallelProcessing()
builder method with the aggregate
statement:
from("direct:in")
.aggregate(header("group"), new SetAggregationStrategy())
.completionSize(10).completionTimeout(400)
.parallelProcessing()
.log("${threadName} - procesessing output")
.delay(500)
.to("mock:out")
.end();
The Aggregator initializes a thread pool that is responsible for processing the aggregated messages. When processing messages through the routes defined, you will see the logging output confirming that different threads are executing the work:
[#1 - Aggregator] INFO route1 - Camel (camel-1) thread #1 - Aggregator - processing output [#2 - Aggregator] INFO route1 - Camel (camel-1) thread #2 - Aggregator - processing output
This is a useful strategy for Aggregators that work under a heavy load of incoming messages, and where the processing of the aggregated messages is time consuming.
The default strategy for pooling in Camel is to instantiate a pool of 10 threads to service the aggregated messages. You can customize this by referring to a threadPool
instance defined within the Camel context by its id
.
In the XML DSL, this is written as:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <threadPool id="customPool" poolSize="20" threadName="pool"/> <route> <from uri="direct:in"/> <aggregate strategyRef="setAggregationStrategy" completionSize="10" completionTimeout="400" executorServiceRef="customPool"> <!-- ... --> </aggregate> </route> </camelContext>
The Java DSL contains an equivalent option where you can refer to a thread pool in Camel's registry:
.aggregate(header("group"), new SetAggregationStrategy())
.completionSize(10).completionTimeout(400)
.executorServiceRef("customPool")
It also allows you to define an executorService
attribute inline:
.aggregate(header("group"), new SetAggregationStrategy())
.completionSize(10).completionTimeout(400)
.executorService(Executors.newFixedThreadPool(20))
3.145.175.253