Wire Tap – sending a copy of the message elsewhere

When you want to process the current message in the background, concurrently with the main route and without requiring a response, the Wire Tap EIP can help. A typical use case is logging the message to a backend system. The main thread of execution continues to process the message through the current route as usual, while Wire Tap allows additional message processing to occur outside the main route.

This recipe will show you how to send a copy of the message to a different endpoint.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.routing.wiretap package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with wireTap.

How to do it...

Use the wireTap statement and specify the endpoint URI of where to send a copy of the message.

In the XML DSL, this is written as follows:

<route>
  <from uri="direct:start"/>
  <wireTap uri="mock:tapped"/>
  <to uri="mock:out"/>
</route>

In the Java DSL, the same route is expressed as:

from("direct:start")
  .wireTap("mock:tapped")
  .to("mock:out");

How it works...

The Wire Tap processor, by default, makes a shallow copy of the Camel Exchange instance, and then processes it on a thread from a pool managed by the Camel routing engine. The copy of the exchange is sent to the endpoint specified in the wireTap DSL statement.

In the preceding example, this thread simply dispatches it to a mock: endpoint.
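
The hand-off described above can be pictured with plain JDK classes. The following is only a minimal sketch of the idea, not Camel's implementation: WireTapSketch, wireTap, and demo are hypothetical names, and a fixed pool of two threads stands in for the pool that Camel manages. The caller gives the message to a pool thread and carries on without waiting for a reply:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for the wire tap idea; this is not Camel code.
class WireTapSketch {

    /** Hands the message to the tap on a pool thread and returns immediately. */
    static <T> void wireTap(ExecutorService pool, T message, Consumer<T> tap) {
        pool.execute(() -> tap.accept(message));
    }

    /** Returns {name of the calling thread, name of the thread that ran the tap}. */
    static String[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicReference<String> tapThread = new AtomicReference<>();
        CountDownLatch tapped = new CountDownLatch(1);
        try {
            wireTap(pool, "hello", msg -> {
                tapThread.set(Thread.currentThread().getName());
                tapped.countDown();
            });
            // The "main route" carries on here without waiting for a reply.
            String mainThread = Thread.currentThread().getName();
            // We wait only so that the demo can report which thread ran the tap.
            if (!tapped.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tap did not run in time");
            }
            return new String[] {mainThread, tapThread.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = demo();
        System.out.println("main route ran on " + names[0] + ", tap ran on " + names[1]);
    }
}
```

The two thread names differ, which is the point of the pattern: the tapped work never runs on the thread that drives the main route.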

A typical use case for the Wire Tap is to log the current message to a backend system. An implementation of this would pass the current message to another route via a direct: endpoint. The second route would be responsible for formatting a message based on the payload before invoking the backend system.

When the message is copied, the headers and a direct reference to the original message body are copied into a new Exchange object. This allows the route that processes the wire-tapped message to change headers, or to replace the body with a different object, without affecting the original message flowing through the main route.

The Message Exchange Pattern (MEP) on the wire-tapped message is set to InOnly, indicating that the main route expects no response from the side route.

The body of the wire-tapped message is the same object as that in the original message, as Camel performs a shallow copy of the exchange. It is therefore possible for the two routes that operate on this shared body object to change its internal state, thereby leaking changes into each other. See the Deep copying of the exchange section that follows for details on doing a deep copy of the exchange.

We are going to pass an instance of a Cheese class into a route:

public class Cheese {
  private int age; // getters and setters omitted
}

Within the route, we want the message to be processed concurrently by the following code:

public class CheeseRipener {
  public static void ripen(Cheese cheese) {
    cheese.setAge(cheese.getAge() + 1);
  }
}

Here is the route that we will call, starting with a call to the direct:start endpoint, logging the current age of Cheese, asynchronously passing the message to another route direct:processInBackground, delaying by 1,000 ms, and then passing the message to an endpoint mock:out:

from("direct:start")
  .log("Cheese is ${body.age} months old")
  .wireTap("direct:processInBackground")
  .delay(constant(1000))
  .to("mock:out");

from("direct:processInBackground")
  .bean(CheeseRipener.class, "ripen")
  .to("mock:tapped");

The result of passing a Cheese with an age of 1 into direct:start is that mock:out sees the age changed to 2, even though the modification to the Cheese state happened in the wire-tapped route.
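
The same leak can be reproduced without Camel. The sketch below is an illustration under stated assumptions: MiniExchange is a hypothetical stand-in for an exchange, and its shallowCopy method copies the header map but shares the body reference, which is how the shallow copy is described in this recipe. Changes to headers stay on the copy, while changes to the state of the body show up on both sides:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a shallow exchange copy; these are not Camel classes.
class ShallowCopySketch {

    static class Cheese {
        private int age;
        int getAge() { return age; }
        void setAge(int age) { this.age = age; }
    }

    static class MiniExchange {
        final Map<String, Object> headers;
        final Object body;

        MiniExchange(Map<String, Object> headers, Object body) {
            this.headers = headers;
            this.body = body;
        }

        /** New header map, same body reference. */
        MiniExchange shallowCopy() {
            return new MiniExchange(new HashMap<>(headers), body);
        }
    }

    /** Returns {age seen by the main route, number of headers on the original}. */
    static int[] demo() {
        Cheese cheese = new Cheese();
        cheese.setAge(1);
        MiniExchange original = new MiniExchange(new HashMap<>(), cheese);
        MiniExchange copy = original.shallowCopy();

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            copy.headers.put("ripened", Boolean.TRUE); // stays on the copy
            Cheese tapped = (Cheese) copy.body;        // same object as in the original
            tapped.setAge(tapped.getAge() + 1);
        });
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tap did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {((Cheese) original.body).getAge(), original.headers.size()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("age in main route: " + result[0]
            + ", headers on original: " + result[1]);
    }
}
```

Here the original ends up with an age of 2 and no headers: the header added by the tap is isolated, but the ripening is not.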

Deep copying of the exchange

Where possible, avoid this sort of state leakage by making message bodies immutable, setting their state through constructors only. When this is not possible, and the tapping route modifies the state, the Wire Tap EIP provides us with a mechanism to perform a "deep" copy of the message.

To exercise this functionality, we add a deep cloning method to our model Cheese:

public Cheese clone() {
  Cheese cheese = new Cheese();
  cheese.setAge(this.getAge());
  return cheese;
}

Implement a Processor class to perform the cloning step:

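import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;

public class CheeseCloningProcessor implements Processor {
  @Override
  public void process(Exchange exchange) throws Exception {
    // Replace the body of the tapped copy with a clone, so that the
    // side route cannot alter the object seen by the main route.
    Message in = exchange.getIn();
    Cheese cheese = in.getBody(Cheese.class);
    if (cheese != null) {
      in.setBody(cheese.clone());
    }
  }
}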

Finally, modify the original route by passing our custom processor to onPrepare:

.wireTap("direct:processInBackground")
  .onPrepare(new CheeseCloningProcessor())

In the XML DSL, we need to define an instance of the implementation as a bean in the surrounding context:

<bean id="cheeseCloningProcessor"
      class="org.camelcookbook.routing.wiretap.CheeseCloningProcessor"/>

We then refer to the bean instance from within the route definition:

<route>
  <from uri="direct:start"/>
  <wireTap uri="direct:processInBackground"
           onPrepareRef="cheeseCloningProcessor"/>
  <to uri="mock:out"/>
</route>
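
The effect of a prepare step can also be sketched with plain JDK classes. This is a hypothetical model, not Camel's API: OnPrepareSketch, its wireTap method, and Cheese.copy are names invented for the illustration, and the sketch assumes that the prepare step runs before the message is handed to the pool thread. Because the tap receives a clone, the object in the main flow keeps its original age:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical model of a wire tap with a prepare hook; not Camel's API.
class OnPrepareSketch {

    static class Cheese {
        private int age;
        int getAge() { return age; }
        void setAge(int age) { this.age = age; }

        /** Deep copy; trivial here because age is the only state. */
        Cheese copy() {
            Cheese c = new Cheese();
            c.setAge(age);
            return c;
        }
    }

    /** Applies onPrepare on the calling thread, then taps the result on the pool. */
    static <T> Future<?> wireTap(ExecutorService pool, T body,
                                 UnaryOperator<T> onPrepare, Consumer<T> tap) {
        T prepared = onPrepare.apply(body);
        return pool.submit(() -> tap.accept(prepared));
    }

    /** Returns {age seen by the main flow, age seen by the tap}. */
    static int[] demo() {
        Cheese cheese = new Cheese();
        cheese.setAge(1);
        AtomicInteger tappedAge = new AtomicInteger();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            wireTap(pool, cheese, Cheese::copy, c -> {
                c.setAge(c.getAge() + 1); // ripens the clone only
                tappedAge.set(c.getAge());
            }).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] {cheese.getAge(), tappedAge.get()};
    }

    public static void main(String[] args) {
        int[] ages = demo();
        System.out.println("main flow sees " + ages[0] + ", tap sees " + ages[1]);
    }
}
```

The main flow still sees an age of 1 while the tap sees 2, which is the behavior we want from the deep copy.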

There's more...

Camel sets up a default thread pool, which is usually adequate, so you rarely need to customize it.

Note

The default thread pool starts with 10 threads, and grows to 20 when necessary, shutting down threads after 60 seconds of inactivity (see org.apache.camel.impl.DefaultExecutorServiceManager).
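
For readers who think in terms of java.util.concurrent, those numbers correspond roughly to the following ThreadPoolExecutor. This is a sketch rather than the code Camel runs: DefaultPoolSketch is a hypothetical name, and the queue capacity of 1000 is an assumption that the note does not state. One detail worth knowing is that a ThreadPoolExecutor only starts threads beyond its core size once its work queue is full, so "grows when necessary" depends on the queue as well as on the load:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK approximation of the pool described in the note.
class DefaultPoolSketch {

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
            10,                               // core size: threads the pool starts with
            20,                               // maximum size the pool may grow to
            60L, TimeUnit.SECONDS,            // idle time before surplus threads are retired
            new LinkedBlockingQueue<>(1000)); // assumed capacity, not taken from the note
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " to " + pool.getMaximumPoolSize()
            + " threads, keep-alive " + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s");
        pool.shutdown();
    }
}
```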

It is possible to set up a custom thread pool that is more suited to your particular use case, and to refer to it directly from within the EIP. The following code sets up a pool with a single thread that will process the wire-tapped messages in sequence, and record the name of the thread in the exchange:

<threadPool id="oneThreadOnly" 
            threadName="JustMeDoingTheTapping"
            poolSize="1"
            maxPoolSize="1"
            maxQueueSize="100"/>

<route>
  <from uri="direct:start"/>
  <wireTap uri="direct:tapped"
           executorServiceRef="oneThreadOnly"/>
  <to uri="mock:out"/>
</route>

<route>
  <from uri="direct:tapped"/>
  <setHeader headerName="threadName">
    <simple>${threadName}</simple>
  </setHeader>
  <to uri="mock:tapped"/>
</route>

An abridged Java equivalent would be:

ThreadPoolBuilder builder = new ThreadPoolBuilder(getContext());
ExecutorService oneThreadOnly = 
    builder.poolSize(1).maxPoolSize(1)
      .maxQueueSize(100).build("JustMeDoingTheTapping");

from("direct:start")
  .wireTap("direct:tapped").executorService(oneThreadOnly)
  .to("mock:out");

from("direct:tapped")
  .setHeader("threadName").simple("${threadName}")
  .to("mock:tapped");
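
To see why a pool of one thread processes the tapped messages in sequence, it can help to look at the plain JDK equivalent. The sketch below does not use Camel: OneThreadOnlySketch is a hypothetical name, and the thread is given the bare name JustMeDoingTheTapping, whereas Camel may decorate thread names with its own pattern. A single worker drains a bounded FIFO queue, so the tasks run one at a time and in submission order:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK counterpart of the oneThreadOnly pool; not Camel code.
class OneThreadOnlySketch {

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
            1, 1,                                         // poolSize and maxPoolSize
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(100),               // maxQueueSize
            r -> new Thread(r, "JustMeDoingTheTapping")); // names the single worker
    }

    /** Submits three tasks and returns what each one recorded, in completion order. */
    static List<String> demo() {
        ThreadPoolExecutor pool = newPool();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            pool.execute(() -> seen.add(n + "@" + Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off of such a pool is throughput: once the single thread is busy and the queue of 100 is full, further work is rejected or pushed back to the caller, depending on the rejection policy in use.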

See also
