Aggregation by size or predicate, as described in the Aggregating related messages recipe, is an excellent foundation for batching messages together. This recipe describes how to use timeouts to release an aggregated message when no additional exchanges have been received for aggregation within a given period.
The Java code for this recipe is located in the org.camelcookbook.splitjoin.aggregatetimeouts package. The Spring XML example is located in src/main/resources/META-INF/spring/aggregateCompletionTimeout-context.xml.
Inside your Aggregator definition, add a completionTimeout attribute containing a timeout in milliseconds. The attribute can be used as the sole aggregation condition, or in combination with others.
In the XML DSL, this is written as:
<from uri="direct:in"/>
<aggregate strategyRef="setAggregationStrategy"
           completionSize="10"
           completionTimeout="1000">
  <correlationExpression>
    <simple>${headers.group}</simple>
  </correlationExpression>
  <to uri="mock:out"/>
</aggregate>
In the Java DSL, the same route is expressed as:
from("direct:in")
  .aggregate(header("group"), new SetAggregationStrategy())
    .completionSize(10).completionTimeout(1000)
    .to("mock:out")
  .end();
The Aggregator initializes a background thread that is responsible for keeping track of how long it has been since the last exchange was received for aggregation. When the Aggregator times out, the timeout thread processes the aggregated exchanges through the steps defined within the aggregate block.
While the timeout thread is busy processing an aggregated message, no other aggregations are timed out; any that become due are processed once the thread has finished with its current exchange. If timing is critical, a thread pool can be used for processing messages (see the Processing aggregated messages in parallel recipe).
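The inactivity timer described above can be modelled with nothing more than the JDK. The following is a minimal sketch, not Camel's actual implementation: the class InactivityTimeoutSketch, its methods, and the 300 ms timeout are all hypothetical names and values chosen for illustration. Each incoming message restarts the countdown, and a single scheduler thread both detects the timeout and runs the downstream step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified, hypothetical model of an inactivity timeout; NOT Camel's implementation.
class InactivityTimeoutSketch {
    private final long timeoutMillis;
    private final Consumer<List<String>> downstream;
    // One thread plays the role of the timeout thread: it detects the
    // timeout and also runs the downstream steps.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private List<String> batch = new ArrayList<>();
    private long generation; // incremented on every arrival

    InactivityTimeoutSketch(long timeoutMillis, Consumer<List<String>> downstream) {
        this.timeoutMillis = timeoutMillis;
        this.downstream = downstream;
    }

    // Adds a message and restarts the countdown by scheduling a new check.
    synchronized void add(String message) {
        batch.add(message);
        final long scheduledFor = ++generation;
        timer.schedule(() -> release(scheduledFor), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Runs on the timer thread. Checks scheduled by earlier arrivals are stale
    // (a newer message arrived in the meantime) and do nothing.
    private void release(long scheduledFor) {
        List<String> completed;
        synchronized (this) {
            if (scheduledFor != generation) {
                return;
            }
            completed = batch;
            batch = new ArrayList<>();
        }
        // Processing happens outside the lock but still on the single timer thread.
        downstream.accept(completed);
    }

    void shutdown() {
        timer.shutdownNow();
    }

    // Sends three messages in quick succession and waits for the timed-out batch.
    static List<String> demo() {
        CountDownLatch released = new CountDownLatch(1);
        List<String> result = new ArrayList<>();
        InactivityTimeoutSketch aggregator = new InactivityTimeoutSketch(300, completed -> {
            result.addAll(completed);
            released.countDown();
        });
        try {
            aggregator.add("a");
            aggregator.add("b");
            aggregator.add("c");
            released.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            aggregator.shutdown();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the sketch uses a single scheduler thread, a slow downstream consumer would delay every other pending timeout check, which mirrors the behaviour described for the timeout thread. Backing the downstream call with a thread pool is the analogous remedy in this model.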
The completionTimeout attribute works in conjunction with any completion conditions defined on the Aggregator other than a completionInterval attribute (see the Aggregating with intervals recipe). In the preceding example, an aggregated message is released either when 10 Exchange objects have been aggregated, or when fewer have been aggregated and no further matching messages arrive within 1000 milliseconds.
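To make the interplay of the two conditions concrete, here is a small deterministic sketch for a single correlation group. It is an illustrative model only, not Camel code: the class SizeOrTimeoutSketch, its add and checkTimeout methods, and the explicit nowMillis timestamps are assumptions introduced so that the behaviour can be traced without real clocks or threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-group model of completionSize combined with
// completionTimeout. Time is passed in explicitly; this is not Camel code.
class SizeOrTimeoutSketch {
    private final int completionSize;
    private final long completionTimeoutMillis;
    private final List<String> batch = new ArrayList<>();
    private long lastArrivalMillis;

    SizeOrTimeoutSketch(int completionSize, long completionTimeoutMillis) {
        this.completionSize = completionSize;
        this.completionTimeoutMillis = completionTimeoutMillis;
    }

    // Returns the completed batch if this message reached completionSize, else null.
    List<String> add(String message, long nowMillis) {
        batch.add(message);
        lastArrivalMillis = nowMillis; // every arrival restarts the inactivity countdown
        return batch.size() >= completionSize ? drain() : null;
    }

    // Returns the pending batch if it has been idle for the full timeout, else null.
    List<String> checkTimeout(long nowMillis) {
        boolean idle = nowMillis - lastArrivalMillis >= completionTimeoutMillis;
        return (!batch.isEmpty() && idle) ? drain() : null;
    }

    private List<String> drain() {
        List<String> completed = new ArrayList<>(batch);
        batch.clear();
        return completed;
    }

    public static void main(String[] args) {
        SizeOrTimeoutSketch aggregator = new SizeOrTimeoutSketch(10, 1000);

        // Ten messages in quick succession: the tenth completes the batch by size.
        List<String> bySize = null;
        for (int i = 1; i <= 10; i++) {
            bySize = aggregator.add("m" + i, i * 10L);
        }
        System.out.println(bySize.size()); // 10

        // Two further messages, then silence: released only after 1000 ms of inactivity.
        aggregator.add("m11", 200);
        aggregator.add("m12", 700);
        System.out.println(aggregator.checkTimeout(1500)); // null (only 800 ms idle)
        System.out.println(aggregator.checkTimeout(1700)); // [m11, m12]
    }
}
```

Note that the countdown in this model is measured from the most recent arrival, not from the first message in the batch, which matches the "no further matching messages" wording above.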