Defining completion actions dynamically

We have seen in the previous recipe how confirming and cancelling notifications can be defined at a route or global level, so that they can be triggered when an exchange completes processing. This is useful when that logic is tied to a single route, but how do you handle code called from multiple routes that need to do some sort of cleanup upon route completion?

This recipe will show you how to dynamically apply completion steps to an exchange's processing through the use of a Synchronization class.

This mechanism is useful when you manipulate a class of resources that need some form of post-processing in the same way. For example, Camel uses this mechanism as a part of its file consumption to move or delete files after it has finished processing their contents. The fact that post-processing happens is transparent to the developer who uses that resource.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.error.synchronizations package.

How to do it...

In order to dynamically define completion actions, perform the following steps:

  1. Within a Processor implementation, implement an org.apache.camel.spi.Synchronization interface. This class will be used as a callback by the Camel runtime when an exchange's processing completes successfully or fails. The interface defines two methods:
    public interface Synchronization {
      void onComplete(Exchange exchange);
      void onFailure(Exchange exchange);
    }

    You can either implement this interface, or extend the org.apache.camel.support.SynchronizationAdapter class, which allows you to override one or the other of these methods as well as allowing you to handle an exchange regardless of the completion status via onDone(Exchange exchange).

  2. Once instantiated, bind the Synchronization class to the exchange currently being processed through Exchange.addOnComplete(Synchronization s).

    The following Processor implementation starts an operation requiring confirmation or cancellation, and then triggers the appropriate operation to finalize the transaction:

    public class ConfirmCancelProcessor implements Processor {
      @Override
      public void process(Exchange exchange) throws Exception {
        final ProducerTemplate producerTemplate =
            exchange.getContext().createProducerTemplate();
        producerTemplate.send("mock:start", exchange);
    
        exchange.addOnCompletion(
            new Synchronization() {
              @Override
              public void onComplete(Exchange exchange) {
                producerTemplate.send("mock:confirm",
                                      exchange);
              }
    
              @Override
              public void onFailure(Exchange exchange) {;
                producerTemplate.send("mock:cancel", exchange);
              }
            }
        );
      }
    }

Tip

This example uses calls to the mock: endpoints to simulate completion tasks. Routing logic wrapped in callbacks like this should usually be avoided as it can almost always be expressed better as part of a route. Triggering endpoints like this obscures backend interactions from anyone looking at the route. It also makes the route itself difficult to test, especially when your endpoints refer to a heavyweight component that is not easily stubbed.

No additional work is required to enable this functionality other than using the Processor in a route as normal:

from("direct:in")
  .process(new ConfirmCancelProcessor())
  .choice()
    .when(simple("${body} contains 'explode'"))
      .throwException(
        new IllegalArgumentException(
          "Exchange caused explosion"))
  .endChoice()
  .log("Processed message");

How it works...

Each Exchange contains a UnitOfWork property that is a conceptual holder for the transactional interactions of the Exchange instance. It holds on to a number of Synchronization instances that will be triggered as appropriate given the final state of the exchange's processing—completed or failed.

The thread that processes the exchange through the final step of the route will trigger the Synchronization object.

Unlike the onCompletion block discussed in the Defining completion actions recipe, which was limited to one callback per route, you can bind as many Synchronization instances as you like to an exchange. This allows you to have separate instances to handle success and failure, and it allows you to dynamically add cleanup logic if you have resources that are conditionally used based on message content or conditions.

Tip

You should clearly document any code that makes use of a Sychronization and explain its intent so that processing steps do not appear to happen "by magic".

There's more...

The Synchronization instances are carried with an exchange across thread boundaries, such as when entering a threads block or crossing a seda: endpoint. This may lead to unexpected behaviors when dealing with resources that are bound to threads using ThreadLocals. When the Synchronization instance attempts to access the resource once the exchange is completed, you will find that the ThreadLocal resource is no longer there!

To get around this issue, you should implement the SynchronizationVetoable interface, which extends Synchronization. This interface defines one additional method:

boolean allowHandover();

By implementing this method, and returning true, you can instruct the Synchronization object to not be copied between exchanges. This will execute the completion logic when the first exchange completes processing.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.226.82.253