Multicast – routing the same message to many endpoints

When you want to route the same message to a number of endpoints and have them process the message in different ways, the Multicast EIP is a good choice.

This recipe will show you the default, sequential way to use Camel's Multicast EIP. Chapter 6, Parallel Processing, contains a recipe for using Multicast with concurrency (threads).

Getting ready

The Java code for this recipe is located in the org.camelcookbook.routing.multicast package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with multicast.

How to do it...

Use the multicast DSL statement, and list the endpoints and processing steps within it.

In the XML DSL, this routing logic is written as:

<route>
  <from uri="direct:start"/>
  <multicast>
    <to uri="mock:first"/>
    <to uri="mock:second"/>
    <to uri="mock:third"/>
  </multicast>
</route>

In the Java DSL, the same thing is written as follows:

from("direct:start")
  .multicast()
    .to("mock:first")
    .to("mock:second")
    .to("mock:third")
  .end();

How it works...

In this example, each of the specified steps is called in sequence on the same thread. We will talk about how to do this with multiple threads in Chapter 6, Parallel Processing.

When a message hits the Multicast EIP, a shallow copy of the message (see the Wire Tap – sending a copy of the message elsewhere recipe for an explanation) is made for each step specified in the EIP definition. The thread currently processing the message triggers each of the specified steps one by one with a unique copy of the message. Any changes made to these copied messages will not be visible in the original message that continues flowing down the main route once the Multicast is complete.

It is, however, possible for mutable state to leak information between messages when the body of a message is modified (see the Wire Tap – sending a copy of the message elsewhere recipe for an explanation).
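The leak described above can be illustrated with a short sketch in plain Java. It does not use Camel's API: SketchMessage and ShallowCopyDemo are hypothetical stand-ins, and the copy logic is an assumption about what "shallow" means here (fresh headers, shared body reference), not Camel's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message; NOT Camel's Message class.
final class SketchMessage {
  final Map<String, Object> headers = new HashMap<>();
  List<String> body;

  SketchMessage(List<String> body) {
    this.body = body;
  }

  // Shallow copy: the header map is new, but the body reference is shared.
  SketchMessage shallowCopy() {
    SketchMessage copy = new SketchMessage(body);
    copy.headers.putAll(headers);
    return copy;
  }
}

final class ShallowCopyDemo {
  static SketchMessage run() {
    List<String> items = new ArrayList<>();
    items.add("original");
    SketchMessage original = new SketchMessage(items);

    SketchMessage first = original.shallowCopy();
    SketchMessage second = original.shallowCopy();

    // A header set on a copy stays private to that copy.
    first.headers.put("seenBy", "first");

    // Mutating the shared body object in place leaks to every holder.
    first.body.add("added by first");

    // Replacing the body reference on a copy does not affect the original.
    second.body = new ArrayList<>();
    second.body.add("replaced by second");

    return original;
  }
}
```

After run(), the original message has no headers, but its body list contains the element added through the first copy. Under these assumptions, a step that needs to change a mutable body is safer replacing it with a new object than modifying it in place.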

A Multicast is quite different from merely invoking a number of steps in sequence:

from("direct:start")
  .to("direct:first")
  .to("direct:second")
  .to("direct:third");

In this route, each of the endpoints operates on the same message object; in a Multicast, each receives a copy of the original. In the preceding example, any changes made to the exchange within the route referenced by direct:first such as the setting of header values will be visible in the exchange when processing direct:second and direct:third.

In a Multicast, the MEP (Message Exchange Pattern) of the copied messages will be set to InOnly by default, regardless of the MEP of the original message. This means that the original route will not receive a response. In this respect, the default behavior of a Multicast is similar to a Wire Tap with multiple steps, except that the thread executing the main route is itself responsible for processing the message copies through the steps. You can change this behavior, as described in the next section.

There's more...

Sometimes you may want to receive responses from the processors that you invoke in a Multicast, and modify the original message with those responses. In order to do this, you need to provide an implementation of the AggregationStrategy interface to the Multicast EIP. This interface defines a single method:

public Exchange aggregate(Exchange oldExchange, 
                          Exchange newExchange)

The newExchange parameter is the current Multicast response being processed, and the oldExchange parameter is the merged result so far. Note that the first time the aggregate() method is called, oldExchange will be null (see the Aggregator EIP for more details).

The following is a simple AggregationStrategy to concatenate String responses:

public class ConcatenatingAggregationStrategy 
    implements AggregationStrategy {
  @Override
  public Exchange aggregate(Exchange oldExchange, 
                            Exchange newExchange) {
    if (oldExchange == null) {
      return newExchange;
    } else {
      String oldBody = 
          oldExchange.getIn().getBody(String.class);
      String newBody =
          newExchange.getIn().getBody(String.class);
      String merged = (oldBody == null) ? newBody 
          : oldBody + "," + newBody;
      oldExchange.getIn().setBody(merged);
      return oldExchange;
    }
  }
}
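To trace the order in which such a strategy is called, the following plain Java sketch mimics the fold using String bodies instead of Exchange objects. AggregationFoldSketch and its methods are hypothetical names for illustration only; the loop is an assumption about the sequential call pattern described above, not Camel's implementation.

```java
import java.util.List;

final class AggregationFoldSketch {
  // Same shape as aggregate(oldExchange, newExchange), but on String bodies.
  static String aggregate(String oldBody, String newBody) {
    return (oldBody == null) ? newBody : oldBody + "," + newBody;
  }

  // Feeds each response to aggregate() in turn, carrying the merged result.
  static String mergeAll(List<String> responses) {
    String merged = null; // nothing has been merged before the first response
    for (String response : responses) {
      merged = aggregate(merged, response);
    }
    return merged;
  }
}
```

Calling mergeAll with the responses "first", "second", and "third" yields "first,second,third". The first call receives null as the old value and simply returns the first response.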

Defining an aggregation strategy on the EIP will cause the MEP of the messages sent to the Multicast steps to be set to InOut.

To enable this as part of the Multicast:

from("direct:start")
  .multicast().aggregationStrategy(
      new ConcatenatingAggregationStrategy())
    // list one or more endpoints here
  .end();

In the XML DSL, we need to define an instance of the implementation as a bean in the surrounding context:

<bean id="concatenatingStrategy" 
      class="ConcatenatingAggregationStrategy"/>

We then refer to the bean instance from within the route definition:

<route>
  <from uri="direct:start"/>
  <multicast aggregationStrategyRef="concatenatingStrategy">
    <!-- list one or more endpoints here -->
  </multicast>
</route>

The aggregation strategy defined here will only aggregate the responses with each other, not with the original message. If you want to merge the responses with the request, you should use Multicast in combination with the Content Enricher EIP, using the same AggregationStrategy instance for both EIPs:

AggregationStrategy concatenationStrategy = 
    new ConcatenatingAggregationStrategy();

from("direct:start")
  .enrich("direct:performMulticast", concatenationStrategy)
  // copy the In message to the Out message;
  // this will become the route response
  .transform(body());

from("direct:performMulticast")
  .multicast().aggregationStrategy(concatenationStrategy)
    .to("direct:first")
    .to("direct:second")
  .end();

There are two ways to deal with an exception in a Multicast:

  • Use the multicast().stopOnException() flag. This will stop any further endpoints from being called and immediately terminate the processing of the message through the current route.
  • Handle the exception within an AggregationStrategy implementation. This can be performed by checking the value of newExchange.isFailed() and taking appropriate action.

Either of the following will result in the Camel runtime throwing an org.apache.camel.CamelExecutionException in a Multicast. This exception cannot be caught:

  • Failure to handle the exception in the sub-route, when not using an AggregationStrategy implementation.
  • Throwing exceptions from an AggregationStrategy implementation. To gracefully handle an exception within AggregationStrategy, check the value of newExchange.isFailed().
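The idea of checking for failure inside the aggregation step, rather than throwing from it, can be sketched in plain Java. SketchResponse and FailureAwareMerge are hypothetical stand-ins, not Camel classes; only the isFailed() check mirrors the method named above, and recording a marker for the failed step is just one possible "appropriate action".

```java
import java.util.List;

// Hypothetical stand-in for a Multicast response; NOT Camel's Exchange.
final class SketchResponse {
  final String body;
  final Exception failure; // null when the step succeeded

  SketchResponse(String body, Exception failure) {
    this.body = body;
    this.failure = failure;
  }

  boolean isFailed() {
    return failure != null;
  }
}

final class FailureAwareMerge {
  // Mirrors aggregate(oldExchange, newExchange); merged is null on the first call.
  static String aggregate(String merged, SketchResponse response) {
    // Never throw from here: turn a failed response into a marker instead.
    String part = response.isFailed()
        ? "failed(" + response.failure.getMessage() + ")"
        : response.body;
    return (merged == null) ? part : merged + "," + part;
  }

  static String mergeAll(List<SketchResponse> responses) {
    String merged = null;
    for (SketchResponse response : responses) {
      merged = aggregate(merged, response);
    }
    return merged;
  }
}
```

With three responses where the second failed with the message "boom", mergeAll returns "a,failed(boom),c". A real strategy might instead skip the failed response or record the failure in a header; that choice depends on what the caller needs.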

It is possible to invoke each step in the Multicast in parallel rather than sequentially. To enable this, the parallelProcessing statement should be used within the EIP:

from("direct:start")
    .multicast().parallelProcessing()
        .to("mock:first")
        .to("mock:second")
    .end();

As with the Wire Tap EIP, the default thread pool behavior may be modified by referring to a custom thread pool.

An Exchange instance maintains a UnitOfWork object. This encapsulates the details of any transaction that the message is participating in, and provides the Camel routing engine with hooks to commit or rollback any transactional resources that have been used within a route.

Each of the steps within the Multicast receives a copy of the original message, which, by default, does not include the original UnitOfWork. In a Multicast, each step will see a different UnitOfWork object. Setting the shareUnitOfWork attribute of the EIP to true can modify this behavior so that all routing is performed in the context of the same transaction.

Note

Sharing the UnitOfWork object is usually a bad idea when used in conjunction with parallelProcessing. Transactional resources such as JMS Session objects and JDBC Connection objects are intended for use by a single thread at a time.
