Aggregating with intervals

Aggregation according to size or predicate as described in the Aggregating related messages recipe is an excellent foundation for batching messages together. This recipe will describe how to use intervals to release an aggregated message per specified period.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.splitjoin.aggregateinterval package. The Spring XML example is located in src/main/resources/META-INF/spring/aggregateCompletionInterval-context.xml.

How to do it...

Inside your Aggregator definition, define a completionInterval attribute containing a period in milliseconds. The attribute can be used as the sole aggregation condition, or in combination with others.

In the XML DSL, this is written as:

<from uri="direct:in"/>
<aggregate strategyRef="setAggregationStrategy"
           completionSize="10"
           completionInterval="400">
  <correlationExpression>
      <simple>${headers.group}</simple>
  </correlationExpression>
  <to uri="mock:out"/>
</aggregate>

In the Java DSL, the same route is expressed as:

from("direct:in")
  .aggregate(header("group"), new SetAggregationStrategy())
      .completionSize(10).completionInterval(400)
    .to("mock:out")
  .end();

How it works...

The Aggregator initializes a thread pool that is responsible for keeping track of how long it has been since the first exchange matching the aggregation expression was received for aggregation. Once the specified period passes, a thread processes it through the steps defined within the aggregate code block.

While the aggregation thread is busy processing the aggregated message, other threads are busy keeping track of the intervals and servicing of the other aggregated messages.

The completionInterval attribute works in conjunction with any completion conditions defined on the Aggregator other than a completionTimeout attribute (see the Aggregating with timeouts recipe). In the preceding example, an aggregated message will be processed once for every 10 Exchange objects that have been aggregated, or if fewer have been aggregated in the 400 milliseconds since the first message corresponding to the completion was received.

See also

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.118.20.90