Aggregating with timeouts

Aggregation according to size or predicate as described in the Aggregating related messages recipe is an excellent foundation for batching messages together. This recipe will describe how to use timeouts to release an aggregated message if no additional exchanges have been received for aggregation in a while.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.splitjoin.aggregatetimeouts package. The Spring XML example is located in src/main/resources/META-INF/spring/aggregateCompletionTimeout-context.xml.

How to do it...

Inside your Aggregator definition, add a completionTimeout attribute containing a timeout in milliseconds. The attribute can be used as the sole aggregation condition, or in combination with others.

In the XML DSL, this is written as:

<from uri="direct:in"/>
<aggregate strategyRef="setAggregationStrategy"
           completionSize="10"
           completionTimeout="1000">
  <correlationExpression>
      <simple>${headers.group}</simple>
  </correlationExpression>
  <to uri="mock:out"/>
</aggregate>

In the Java DSL, the same route is expressed as:

from("direct:in")
  .aggregate(header("group"), new SetAggregationStrategy())
      .completionSize(10).completionTimeout(1000)
    .to("mock:out")
  .end();

How it works...

The Aggregator initializes a background thread that is responsible for keeping track of how long it has been since the last exchange was received for aggregation. When the Aggregator times out, the timeout thread processes the aggregated exchanges through the steps defined within the aggregate code block.

While the timeout thread is busy processing the aggregated message, no other messages are timed out. These will be processed when the thread has completed processing its exchange. If timing is critical, a thread pool can be used for processing messages—see the Processing aggregated messages in parallel recipe.

The completionTimeout attribute works in conjunction with any completion conditions defined on the Aggregator other than a completionInterval attribute (see the Aggregating with intervals recipe). In the preceding example, an aggregated message will be processed once for every 10 Exchange objects that have been aggregated, or if fewer have been aggregated and no further matching messages have been received within 1000 milliseconds.

See also

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.219.239.118