Splitting messages and re-aggregating them using different criteria

The process described in the Splitting a message, and processing and gathering responses recipe broke up a message into fragments and aggregated those same processed fragments back into a result message that flowed out of a split block.

This recipe, which is a variant of the Composed Message Processor EIP, will describe how a message can be split up and aggregated, using criteria other than the original message ID. The aggregation phase may yield one or more aggregated messages.

In this example, we are going to accept, as input, XML messages that list books with titles and categories:

<books>
  <book category="Tech" 
        title="Apache Camel Developer's Cookbook"/>
  <book category="Cooking"
        title="Camel Cookbook"/>
  <book category="Cooking" 
        title="Double decadence with extra cream"/>
  <book category="Cooking"
        title="Cooking with Butter"/>
</books>

We are going to extract the book titles, and aggregate them into sets, by book category, before sending them to some destination endpoint. Based on the preceding input message, we want the target endpoint to receive two messages: one containing a set of Tech titles, and another containing Cooking titles.
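To make the expected result concrete, here is a minimal, Camel-free sketch that computes the same grouping using only the JDK's XPath support. The class name ExpectedGrouping and its method are hypothetical and not part of the recipe's code; the sketch only illustrates which titles should end up in which set.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper (not from the recipe): mimics the outcome of
// "split by book, group titles by category" without using Camel.
final class ExpectedGrouping {

    static Map<String, Set<String>> titlesByCategory(String xml) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        NodeList books;
        try {
            // "Split": select every <book> element in the payload.
            books = (NodeList) xpath.evaluate("/books/book",
                    new InputSource(new StringReader(xml)),
                    XPathConstants.NODESET);
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Could not parse books XML", e);
        }

        // "Aggregate": one set of titles per category value.
        Map<String, Set<String>> groups = new LinkedHashMap<>();
        for (int i = 0; i < books.getLength(); i++) {
            Element book = (Element) books.item(i);
            groups.computeIfAbsent(book.getAttribute("category"),
                    key -> new LinkedHashSet<>())
                  .add(book.getAttribute("title"));
        }
        return groups;
    }

    public static void main(String[] args) {
        String xml = "<books>"
                + "<book category=\"Tech\" title=\"Apache Camel Developer's Cookbook\"/>"
                + "<book category=\"Cooking\" title=\"Camel Cookbook\"/>"
                + "<book category=\"Cooking\" title=\"Double decadence with extra cream\"/>"
                + "<book category=\"Cooking\" title=\"Cooking with Butter\"/>"
                + "</books>";
        // Prints one entry per category, each holding that category's titles.
        System.out.println(titlesByCategory(xml));
    }
}
```

Each map entry corresponds to one message that the target endpoint should receive.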

Getting ready

The Java code for this recipe is located in the org.camelcookbook.splitjoin.splitreaggregate package. The Spring XML example is located in src/main/resources/META-INF/spring/splitReaggregate-context.xml.

How to do it...

To split a message up and re-aggregate its fragments using a criterion other than the original message ID, follow these two steps:

  1. Define a route that will split the message. Make sure the last step of the split block calls the route containing the aggregation that you will define in step 2:

    In the XML DSL, this is written as:

    <from uri="direct:in"/>
    <split>
      <xpath>/books/book</xpath>
      <setHeader headerName="category">
        <xpath resultType="String">/book/@category</xpath>
      </setHeader>
      <transform>
        <xpath resultType="String">/book/@title</xpath>
      </transform>
      <to uri="direct:groupByCategory"/>
    </split>

    In the Java DSL, the same route is expressed as:

    from("direct:in")
      .split(xpath("/books/book"))
        .setHeader("category", 
            xpath("/book/@category").stringResult())
        .transform(xpath("/book/@title").stringResult())
        .to("direct:groupByCategory")
      .end();

  2. Define another route that will accept the split fragments and aggregate them:

    In the XML DSL, this is written as:

    <from uri="direct:groupByCategory"/>
    <aggregate strategyRef="setAggregationStrategy"
               completionTimeout="500">
      <correlationExpression>
        <simple>${header[category]}</simple>
      </correlationExpression>
      <to uri="mock:out"/>
    </aggregate>

    The setAggregationStrategy is the same strategy that we first defined in the Aggregating related messages recipe, and is defined in the Spring context as a bean.

    In the Java DSL, this is written as:

    from("direct:groupByCategory")
      .aggregate(header("category"), 
          new SetAggregationStrategy()).completionTimeout(500)
        .to("mock:out")
      .end();
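The SetAggregationStrategy itself is defined in another recipe and is not shown here. As a rough, Camel-free model of what a set-collecting merge step does, the following sketch may help. The class SetMergeSketch and its merge method are hypothetical names; a real strategy would perform the equivalent work on exchange bodies inside an AggregationStrategy implementation, where the first call for a group receives a null old exchange.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, Camel-free model of a set-collecting merge step.
// It is NOT the recipe's SetAggregationStrategy, only an illustration.
final class SetMergeSketch {

    // 'previous' is null for the first fragment of a group, mirroring the
    // null old exchange an aggregation strategy sees on its first call.
    static Set<String> merge(Set<String> previous, String fragment) {
        Set<String> merged = (previous == null)
                ? new LinkedHashSet<>()
                : new LinkedHashSet<>(previous);
        merged.add(fragment); // a set silently drops duplicate titles
        return merged;
    }

    public static void main(String[] args) {
        Set<String> cooking = merge(null, "Camel Cookbook");
        cooking = merge(cooking, "Cooking with Butter");
        cooking = merge(cooking, "Camel Cookbook"); // duplicate, ignored
        System.out.println(cooking);
    }
}
```

Because the result is a set, a title that arrives twice for the same category appears only once in the aggregated message.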

How it works...

This example combines the independent Splitter and Aggregator mechanisms that we have already seen in this chapter.

Within the example, the payload is first split by an XPath Expression that yields XML fragments. The category of each book is extracted through another expression, and set on the category message header. The message body is then replaced with the book's title, and the exchange is passed to another route containing the aggregation phase.

The Aggregator joins together fragments, grouping them according to the value of their category headers. In the preceding example, each aggregated exchange is released for further processing once no new message for its category has arrived for 500 milliseconds.
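The grouping and timeout behavior can be modeled in plain Java as a rough sketch. Everything here is an assumption made for illustration: the class TimeoutGroupingSketch, its method names, and the use of an explicit clock value passed in by the caller. Camel's own Aggregator uses a background timeout checker rather than explicit polling, so this only approximates the idea of releasing a group after a period of inactivity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of "group by key, release after inactivity".
final class TimeoutGroupingSketch {

    private static final class Group {
        final Set<String> titles = new LinkedHashSet<>();
        long lastArrivalMillis;
    }

    private final long timeoutMillis;
    private final Map<String, Group> open = new LinkedHashMap<>();

    TimeoutGroupingSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a fragment to its category's group and restarts that group's timer.
    void onFragment(String category, String title, long nowMillis) {
        Group group = open.computeIfAbsent(category, key -> new Group());
        group.titles.add(title);
        group.lastArrivalMillis = nowMillis;
    }

    // Releases every group that has been idle for at least the timeout.
    List<Set<String>> releaseIdle(long nowMillis) {
        List<Set<String>> released = new ArrayList<>();
        Iterator<Group> it = open.values().iterator();
        while (it.hasNext()) {
            Group group = it.next();
            if (nowMillis - group.lastArrivalMillis >= timeoutMillis) {
                released.add(group.titles);
                it.remove();
            }
        }
        return released;
    }

    public static void main(String[] args) {
        TimeoutGroupingSketch aggregator = new TimeoutGroupingSketch(500);
        aggregator.onFragment("Tech", "Apache Camel Developer's Cookbook", 0);
        aggregator.onFragment("Cooking", "Camel Cookbook", 100);
        aggregator.onFragment("Cooking", "Cooking with Butter", 400);
        // At t=500 only the Tech group has been idle for 500 ms.
        System.out.println(aggregator.releaseIdle(500));
        // At t=900 the Cooking group has been idle since t=400.
        System.out.println(aggregator.releaseIdle(900));
    }
}
```

Note that each arrival restarts the timer only for its own category, which is why the two groups can be released at different times.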

There's more...

It is not strictly necessary to divide the split and aggregate phases into different routes like this, but doing so makes it much easier to read, and reason about, the routing logic's correctness. This is especially true when using the Java DSL, where it's easy to flatten out a route definition by omitting end() and endParent() statements. For comparison, this is the Java equivalent with everything in a single route:

from("direct:combined")
  .split(xpath("/books/book"))
    .setHeader("category", 
        xpath("/book/@category").stringResult())
    .transform(xpath("/book/@title").stringResult())
    .aggregate(header("category"), new SetAggregationStrategy())
        .completionTimeout(500)
      .to("mock:out")
    .endParent()
  .end();

Although the Aggregator is nested inside the split block, it still merges fragments from across multiple incoming messages. This is not immediately obvious from the route definition, and will likely lead to headaches for anyone who has to maintain the code.

See also
