Splitting a message, and processing and gathering responses

This recipe will show you how to split a message into individual fragments, process each fragment individually, and re-aggregate the processed exchanges back into a single exchange. In EIP terms, this is known as a Composed Message Processor, and is made up of a combination of a Splitter and an Aggregator.

Getting ready

Java code for this recipe is located in the org.camelcookbook.splitjoin.splitaggregate package. The Spring XML examples are located under src/main/resources/META-INF/spring and prefixed with splitAggregate.

How to do it...

In order to split and aggregate (join) a message, combine a split DSL statement with an associated AggregationStrategy as follows:

  1. Define an AggregationStrategy instance as described in the Aggregating related messages recipe. For this example, we will reuse the SetAggregationStrategy implementation from that recipe:
    <bean id="setAggregationStrategy"
          class="org.camelcookbook.splitjoin.aggregate.SetAggregationStrategy"/>
  2. Define a regular split block as per the Splitting a message into fragments recipe, breaking up the payload as you see fit through the use of an expression. Reference the AggregationStrategy instance through the strategyRef attribute in the split element:
    <from uri="direct:in"/>
    <split strategyRef="setAggregationStrategy">
      <simple>${body}</simple>
      <inOut uri="direct:someBackEnd"/>
    </split>
    <to uri="mock:out"/>

    In the Java DSL, refer to the AggregationStrategy instance as the second parameter to the split() statement:

    from("direct:in")
      .split(body(), new SetAggregationStrategy())
        .inOut("direct:someBackEnd")
      .end()
      .to("mock:out");

How it works...

When an exchange reaches the split statement, it is broken up into individual fragments as expected, each of which is processed through the steps defined within the block. When each split fragment's exchange reaches the end of the split block, it is passed into the AggregationStrategy for aggregation.

When all of the fragments have been processed, the final aggregated message proceeds down the route from the split block. This is different from the normal Splitter behavior, which forwards the original message to the statement that follows the split block.
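
The flow described above can be modeled in a few lines of plain Java. This is only a sketch of the idea and uses no Camel classes: the class name SplitAggregateSketch and its methods are hypothetical, the back end is simulated by a function, and the strategy is represented by a BiFunction. Like AggregationStrategy.aggregate(), the strategy receives null as the accumulated value on the first call.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of a Composed Message Processor; hypothetical names, no Camel API.
final class SplitAggregateSketch {

    // Splits the payload into fragments, sends each one through the back end,
    // and folds every response into a single aggregated result.
    static Set<String> splitAndAggregate(
            List<String> payload,
            Function<String, String> backEnd,
            BiFunction<Set<String>, String, Set<String>> strategy) {
        Set<String> aggregated = null; // nothing aggregated before the first fragment
        for (String fragment : payload) {
            String response = backEnd.apply(fragment);
            aggregated = strategy.apply(aggregated, response);
        }
        return aggregated; // this, not the original payload, continues down the route
    }

    // Same idea as a set-collecting strategy: create the Set on the first call.
    static Set<String> collectIntoSet(Set<String> soFar, String response) {
        if (soFar == null) {
            soFar = new LinkedHashSet<>();
        }
        soFar.add(response);
        return soFar;
    }

    public static void main(String[] args) {
        Set<String> out = splitAndAggregate(
            Arrays.asList("one", "two", "three"),
            fragment -> "Processed: " + fragment,
            SplitAggregateSketch::collectIntoSet);
        System.out.println(out);
    }
}
```

Note that an empty payload never invokes the strategy in this model, so the result stays null; how Camel itself treats an empty split is not covered here.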

Tip

By default, all processing is performed by a single thread. You can parallelize the processing of each message fragment by using the parallelProcessing option as described in the Processing split messages in parallel recipe.

There's more...

When you use an AggregationStrategy, exception handling differs from that of the regular Splitter. When an exception is thrown during the processing of a fragment, the exchange is immediately passed to the AggregationStrategy. It is then up to the strategy to decide what to do with this information.

The following strategy removes the exception from the exchange, effectively handling it, and modifies the body to indicate the failure:

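import java.util.HashSet;
import java.util.Set;

import org.apache.camel.Exchange;
// Camel 2.x package for AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class ExceptionHandlingSetAggregationStrategy
    implements AggregationStrategy {
  @Override
  @SuppressWarnings("unchecked")
  public Exchange aggregate(Exchange oldExchange,
                            Exchange newExchange) {
    String body = newExchange.getIn().getBody(String.class);
    Exception exception = newExchange.getException();
    if (exception != null) { // something went wrong
      newExchange.setException(null); // remove the exception
      body = "Failed: " + body;
    }
    if (oldExchange == null) {
      // first fragment: start a new Set and carry it in this exchange
      Set<String> set = new HashSet<String>();
      set.add(body);
      newExchange.getIn().setBody(set);
      return newExchange;
    } else {
      // later fragments: add to the Set carried by the aggregated exchange
      Set<String> set = oldExchange.getIn().getBody(Set.class);
      set.add(body);
      return oldExchange;
    }
  }
}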

You may choose to handle the exception, or return the exchange with the Exception object intact. In the latter case, the default exception handling behavior of the Splitter comes into play, as described in the Splitting a message into fragments recipe and, if you are using parallelProcessing, in the Processing split messages in parallel recipe.
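
The two choices can be contrasted with a small plain-Java model. This is a sketch under stated assumptions rather than Camel code: Outcome is a hypothetical stand-in for the aggregated exchange, the back end is simulated and rejects the fragment "explode", and the model simply keeps processing the remaining fragments after a failure. What the Splitter itself does with an exception that is left intact is described in the recipes referenced above and is not modeled here.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model only; Outcome is a hypothetical stand-in for a Camel Exchange.
final class FailureAggregationSketch {

    static final class Outcome {
        final Set<String> bodies = new LinkedHashSet<>();
        Exception exception; // null means nothing is left for the caller to handle
    }

    // Simulated back end (an assumption of this sketch): rejects one fragment.
    static String backEnd(String fragment) {
        if ("explode".equals(fragment)) {
            throw new IllegalArgumentException("cannot process " + fragment);
        }
        return "Processed: " + fragment;
    }

    // A failing fragment reaches the aggregation step together with its exception,
    // which is either absorbed (handled) or kept on the aggregated outcome.
    static Outcome run(List<String> fragments, boolean handleInStrategy) {
        Outcome aggregated = new Outcome();
        for (String fragment : fragments) {
            try {
                aggregated.bodies.add(backEnd(fragment));
            } catch (IllegalArgumentException e) {
                if (handleInStrategy) {
                    aggregated.bodies.add("Failed: " + fragment); // exception absorbed
                } else {
                    aggregated.exception = e; // left intact for the caller
                }
            }
        }
        return aggregated;
    }

    public static void main(String[] args) {
        List<String> fragments = Arrays.asList("one", "explode", "three");
        System.out.println(run(fragments, true).bodies);
        System.out.println(run(fragments, false).exception != null);
    }
}
```

When the strategy absorbs the failure, the aggregated body records it and nothing propagates; when it does not, the caller still sees an exception on the aggregated result.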
