This recipe will show you how to split a message into individual fragments, process each fragment individually, and re-aggregate the processed exchanges back into a single exchange. In EIP terms, this is known as a Composed Message Processor, and is made up of a combination of a Splitter and an Aggregator.
Java code for this recipe is located in the org.camelcookbook.splitjoin.splitaggregate package. The Spring XML examples are located under src/main/resources/META-INF/spring and prefixed with splitAggregate.
In order to split and aggregate (join) a message, combine a split DSL statement with an associated AggregationStrategy as follows:
Define an AggregationStrategy instance as described in the Aggregating related messages recipe. For this example, we will reuse the SetAggregationStrategy implementation from that recipe:
<bean id="setAggregationStrategy" class="org.camelcookbook.splitjoin.aggregate.SetAggregationStrategy"/>
Define a split block as per the Splitting a message into fragments recipe, breaking up the payload as you see fit through the use of an expression. Reference the AggregationStrategy instance through the strategyRef attribute in the split element:
<from uri="direct:in"/>
<split strategyRef="setAggregationStrategy">
<simple>${body}</simple>
<inOut uri="direct:someBackEnd"/>
</split>
<to uri="mock:out"/>
In the Java DSL, refer to the AggregationStrategy instance as the second parameter to the split() statement:
from("direct:in")
    .split(body(), new SetAggregationStrategy())
        .inOut("direct:someBackEnd")
    .end()
    .to("mock:out");
When an exchange reaches the split statement, it is broken up into individual fragments as expected, each of which is processed through the steps defined within the block. When each split fragment's exchange reaches the end of the split block, it is passed into the AggregationStrategy for aggregation.
When all of the fragments have been processed, the final aggregated message proceeds down the route from the split block. This differs from the normal Splitter behavior, which forwards the original message to the statement after the split statement.
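The flow described above can be modeled in plain Java, without any Camel classes. The following is only a minimal sketch of the semantics, not how Camel implements the Splitter: the class name SplitAggregateSketch and both method names are hypothetical, a fragment is reduced to a String body, and the backEnd parameter stands in for the steps inside the split block. Note that the first call to aggregate receives null as the previous result, just as an AggregationStrategy receives a null oldExchange for the first fragment.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.UnaryOperator;

// Plain-Java model of the split-then-aggregate flow; none of this is Camel API.
class SplitAggregateSketch {

    // Stand-in for the aggregation step: collects processed bodies into a Set.
    static Set<String> aggregate(Set<String> oldResult, String newBody) {
        // oldResult is null for the first fragment, so start a fresh set
        Set<String> set = (oldResult == null) ? new HashSet<>() : oldResult;
        set.add(newBody);
        return set;
    }

    // Stand-in for the split block: process each fragment, then aggregate it.
    static Set<String> splitAndAggregate(List<String> fragments,
                                         UnaryOperator<String> backEnd) {
        Set<String> aggregated = null;
        for (String fragment : fragments) {
            String processed = backEnd.apply(fragment);    // steps inside the block
            aggregated = aggregate(aggregated, processed); // end of the block
        }
        // the aggregated result, not the original list, continues down the route
        return aggregated;
    }
}
```

Calling splitAndAggregate with a list of bodies and a function that transforms each body returns the set of transformed bodies, which corresponds to the message that reaches mock:out in the routes above.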
Exception handling when using an AggregationStrategy differs from that of the regular Splitter. When an exception is thrown during the processing of a fragment, the exchange is immediately passed to the AggregationStrategy. It is then up to the strategy to decide what to do with this information.
The following strategy removes the exception from the message, effectively handling it, and modifies the body to indicate the failure:
import java.util.HashSet;
import java.util.Set;

import org.apache.camel.Exchange;
// Camel 2.x package; in Camel 3 and later this is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class ExceptionHandlingSetAggregationStrategy implements AggregationStrategy {
    @Override
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        String body = newExchange.getIn().getBody(String.class);
        Exception exception = newExchange.getException();
        if (exception != null) {
            // something went wrong
            newExchange.setException(null); // remove the exception
            body = "Failed: " + body;
        }
        if (oldExchange == null) {
            // first fragment: start a new set and carry it on this exchange
            Set<String> set = new HashSet<String>();
            set.add(body);
            newExchange.getIn().setBody(set);
            return newExchange;
        } else {
            // later fragments: add to the set built so far
            Set<String> set = oldExchange.getIn().getBody(Set.class);
            set.add(body);
            return oldExchange;
        }
    }
}
You may choose to handle the exception, or return the exchange with the Exception object intact. In the latter case, the default exception handling behavior of the Splitter will come into play, as defined in the Splitting a message into fragments recipe and, if you are using parallelProcessing, in the Processing split messages in parallel recipe.
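To make the handled case concrete, here is a plain-Java model of a failed fragment arriving at the aggregation step. It is a sketch under stated assumptions rather than Camel code: FailedFragmentSketch and its nested Fragment class are hypothetical stand-ins for the route and the exchange, and the backEnd parameter plays the role of the steps inside the split block. The aggregate method follows the same logic as the strategy shown earlier in this recipe.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.UnaryOperator;

// Plain-Java model of a failed fragment reaching aggregation; not Camel API.
class FailedFragmentSketch {

    // Hypothetical stand-in for an exchange: a body plus an optional exception.
    static final class Fragment {
        String body;
        Exception exception; // stays null unless processing failed

        Fragment(String body) {
            this.body = body;
        }
    }

    // Same idea as the strategy in this recipe: clear the exception, mark the body.
    static Set<String> aggregate(Set<String> oldResult, Fragment current) {
        String body = current.body;
        if (current.exception != null) {
            current.exception = null; // treat the failure as handled
            body = "Failed: " + body;
        }
        Set<String> set = (oldResult == null) ? new HashSet<>() : oldResult;
        set.add(body);
        return set;
    }

    static Set<String> splitAndAggregate(List<String> bodies,
                                         UnaryOperator<String> backEnd) {
        Set<String> aggregated = null;
        for (String body : bodies) {
            Fragment fragment = new Fragment(body);
            try {
                fragment.body = backEnd.apply(body);
            } catch (RuntimeException e) {
                // the failure travels with the fragment instead of stopping the loop
                fragment.exception = e;
            }
            aggregated = aggregate(aggregated, fragment);
        }
        return aggregated;
    }
}
```

Because the model clears the exception inside aggregate, the remaining fragments are still processed and the failure is visible only as a "Failed: " entry in the resulting set.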