Preventing duplicate invocation of routing logic

When dealing with external systems, it is often necessary to ensure that the same request is not sent multiple times—a process often called deduplication.

Camel implements the Idempotent Consumer EIP, which allows you to mark a segment of a route as callable only once for each unique message. This is useful if it is possible for your route to receive duplicate messages, either from an upstream system or from the underlying infrastructure (for example, through a message queue configured for redelivery on failure), and you want to protect routing logic that is intolerant of duplicates.

This recipe explains how to make portions of your route idempotent in Camel.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.transactions.idempotentconsumer package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with idempotentConsumer.

How to do it...

To use an Idempotent Consumer in the XML DSL, perform the following steps:

  1. Instantiate an org.apache.camel.spi.IdempotentRepository implementation class as a regular Spring/Blueprint bean. Implementations of this interface are responsible for storing keys that represent previously seen messages. Here we will use a memory-based IdempotentRepository, using the following code:
    <bean id="wsIdempotentRepository"
          class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
  2. Wrap the routing logic that you want to trigger only once in an idempotentConsumer block. An idempotentConsumer block determines a message's uniqueness by checking the result of an expression against the set of previously seen keys in the IdempotentRepository. The following XML DSL fragment shows this in action:
    <from uri="direct:in"/>
    <idempotentConsumer 
        messageIdRepositoryRef="wsIdempotentRepository">
      <header>messageId</header> <!-- unique key -->
      <to uri="mock:ws"/>
    </idempotentConsumer>
    <to uri="mock:out"/>

In the Java DSL, the complete setup above can be expressed in a single step as follows:

from("direct:in")
  .idempotentConsumer(
      header("messageId"), // unique key
      new MemoryIdempotentRepository())
    .to("mock:ws")
  .end()
  .to("mock:out");

When two messages with the same messageId header are pushed through either of these routes, the first will invoke the mock:ws and mock:out endpoints, while the second message will trigger mock:out only.

How it works...

When an exchange reaches an idempotentConsumer statement, the provided expression is evaluated to determine a unique key for that message. In the preceding example, this expression involves a lookup of a header value, but it could have been the result of any of Camel's Expression Languages, including the result of a Java method or XPath query.

The key is checked against all previously seen keys in the IdempotentRepository instance. If this value has previously been seen, the routing logic contained within the block is skipped; otherwise the key is inserted into the repository and the routing logic within the block is executed.
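
The check described above can be pictured with a minimal plain-Java sketch. It is not Camel's implementation: the class IdempotentBlockSketch and its process method are hypothetical names used only for illustration, and the set of keys stands in for an IdempotentRepository.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the check performed by an idempotent block.
// NOT Camel's implementation; class and method names are hypothetical.
class IdempotentBlockSketch {
    // Stands in for the IdempotentRepository: the keys seen so far
    private final Set<String> seenKeys = ConcurrentHashMap.newKeySet();

    // Runs the protected logic only if the key has not been seen before.
    // Returns true if the logic ran, false if the message was a duplicate.
    boolean process(String key, Runnable protectedLogic) {
        // Set.add returns false when the key is already present
        if (!seenKeys.add(key)) {
            return false;
        }
        protectedLogic.run();
        return true;
    }

    public static void main(String[] args) {
        IdempotentBlockSketch block = new IdempotentBlockSketch();
        // The first message with this key runs the logic; the second is skipped
        System.out.println(block.process("msg-1", () -> System.out.println("calling ws")));
        System.out.println(block.process("msg-1", () -> System.out.println("calling ws")));
    }
}
```

The sketch deliberately leaves out what happens when the protected logic fails; that part of the behavior depends on the completion callback discussed next.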

Note

If an exception is thrown at any point in the route after entering the idempotentConsumer block, including routing logic defined after the block, the key is removed from the repository. This will result in a replayed message once again being processed through the block.

In the preceding example, if the mock:out endpoint threw an exception, the next request with the same messageId header would execute both the mock:ws and mock:out endpoints.

This is not usually what you want, as you would expect that only an exception thrown within the idempotentConsumer block would have this effect.

The reason behind this behavior is that internally the pattern registers a Synchronization instance with the exchange, as described in the Defining completion actions dynamically recipe of Chapter 7, Error Handling and Compensation. This callback commits the change to the IdempotentRepository when the exchange completes processing, or rolls it back if processing fails.

In order to ensure that the changes to the repository are isolated, and subsequent routing logic does not remove the key when an exception is thrown, we need to use a separate Exchange instance with its own UnitOfWork for the idempotentConsumer logic. This is easier than it might seem.

To commit the update to the IdempotentRepository immediately after the idempotentConsumer block, we split out the idempotent logic into its own route, and trigger that route through the enrich statement, as seen in the Enriching your content with some help from other endpoints recipe of Chapter 4, Transformation.

In the XML DSL, the enrichment logic is expressed as follows:

<route>
  <from uri="direct:in"/>
  <enrich uri="direct:invokeWs"/>
  <to uri="mock:out"/>
</route>

<route>
  <from uri="direct:invokeWs"/>
  <idempotentConsumer
      messageIdRepositoryRef="wsIdempotentRepository">
    <header>messageId</header>
    <to uri="mock:ws"/>
  </idempotentConsumer>
</route>

In the Java DSL, this same logic is written as follows:

from("direct:in")
  .enrich("direct:invokeWs")
  .to("mock:out");

from("direct:invokeWs")
  .idempotentConsumer(
      header("messageId"), 
      new MemoryIdempotentRepository())
  .to("mock:ws");

The enricher creates a copy of the original exchange that is then passed through this second route. This second Exchange instance contains a distinct UnitOfWork that is completed when that instance has worked its way through the route. It is at that point that the movement of the message through the idempotentConsumer block is committed to the IdempotentRepository instance. On return from the enricher, the contents of the exchange are made available to the calling route.

If we merely invoked a route through a regular to statement, instead of enrich, any exceptions thrown after the idempotentConsumer block in the main route would cause the message key to be removed from the IdempotentRepository instance. This would lead to incorrect behavior, as the routing logic protected by the block would be executed again if the message were to be replayed.
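
The difference between the two approaches can be modeled in a few lines of plain Java. This is only a sketch under simplifying assumptions: the nested UnitOfWorkSketch class and the method names are hypothetical, and they only loosely mirror Camel's UnitOfWork and Synchronization concepts.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of completion callbacks; not Camel's actual classes.
class SeparateUnitOfWorkDemo {

    // Collects rollback callbacks and runs them if the owning exchange fails
    static class UnitOfWorkSketch {
        private final List<Runnable> rollbacks = new ArrayList<>();

        void registerRollback(Runnable callback) {
            rollbacks.add(callback);
        }

        // Called once, when the exchange that owns this unit of work finishes
        void done(boolean failed) {
            if (failed) {
                rollbacks.forEach(Runnable::run);
            }
            rollbacks.clear();
        }
    }

    // Entering the idempotent block: store the key and register its rollback
    static void idempotentBlock(Set<String> repo, String key, UnitOfWorkSketch uow) {
        if (repo.add(key)) {
            uow.registerRollback(() -> repo.remove(key));
        }
    }

    // Block and later failing step share one unit of work (regular "to")
    static Set<String> sharedUnitOfWork(String key) {
        Set<String> repo = new HashSet<>();
        UnitOfWorkSketch main = new UnitOfWorkSketch();
        idempotentBlock(repo, key, main);
        main.done(true);  // a later step failed, so the key is rolled back
        return repo;
    }

    // Block runs under its own unit of work, completed first ("enrich")
    static Set<String> separateUnitOfWork(String key) {
        Set<String> repo = new HashSet<>();
        UnitOfWorkSketch main = new UnitOfWorkSketch();
        UnitOfWorkSketch sub = new UnitOfWorkSketch();
        idempotentBlock(repo, key, sub);
        sub.done(false);  // sub-route completed successfully, key is kept
        main.done(true);  // the later failure has nothing left to roll back
        return repo;
    }

    public static void main(String[] args) {
        System.out.println("shared keeps key:   " + sharedUnitOfWork("msg-1").contains("msg-1"));
        System.out.println("separate keeps key: " + separateUnitOfWork("msg-1").contains("msg-1"));
    }
}
```

In this model, the key survives a downstream failure only when the block's rollback was registered with a unit of work that had already completed, which is the effect the enricher achieves in the routes above.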

There's more...

It is possible for the idempotentConsumer instance to mark exchanges as having been previously seen instead of bypassing their processing. This allows you to route previously seen messages differently from new ones.

To do this, set the skipDuplicate attribute on the idempotentConsumer block to false. When a duplicate message is seen, the CamelDuplicateMessage (Exchange.DUPLICATE_MESSAGE) property will be set on the exchange. You can then use this property to make routing decisions.

In the XML DSL, this is expressed as:

<from uri="direct:in"/>
<idempotentConsumer
    messageIdRepositoryRef="wsIdempotentRepository" 
    skipDuplicate="false">
  <header>messageId</header>
  <choice>
    <when>
      <property>CamelDuplicateMessage</property>
      <to uri="mock:duplicate"/>
    </when>
    <otherwise>
      <to uri="mock:ws"/>
    </otherwise>
  </choice>
</idempotentConsumer>

In the Java DSL, the same routing logic is written as follows:

from("direct:in")
  .idempotentConsumer(header("messageId"), 
                      new MemoryIdempotentRepository())
        .skipDuplicate(false)
    .choice()
      .when(property(Exchange.DUPLICATE_MESSAGE))
        .to("mock:duplicate")
      .otherwise()
        .to("mock:ws")
    .endChoice()
  .end();

There are a number of IdempotentRepository implementations to choose from depending on the Quality of Service (QoS) that you would like your route to provide. If none of these provide the sort of QoS that you require, it is possible to write your own.

The following table lists some of the commonly used IdempotentRepository implementations, along with tips on their usage:

MemoryIdempotentRepository

A fast, non-persistent store that uses a Map to store all keys. All data contained within will be lost when the JVM is stopped.

No support for clustering.

This class is available in Camel Core and therefore requires no additional dependencies.

FileIdempotentRepository

A basic persistent store that uses a map as a cache for looking up keys, and a file that persists the current view of that Map.

All operations that access the file are synchronized, which means that it could be a bottleneck in a high-throughput scenario. Additions append to the file; a removal causes the entire file to be rewritten. It is possible that the cache could be corrupted if a rewrite is taking place when the JVM shuts down.

As only one JVM should write to a file, it is not suitable for an Active/Active (Hot-Hot) cluster. An Active/Active deployment is one where two or more processing nodes are working on the same set of data at the same time and appear to act as a single instance to the outside world. Failure of one node means that any others that remain process the request load.

This repository is OK for use in Active/Passive (Hot-Cold) setups. These are clusters where only one node out of two or more is running at a time, and where a standby is started in the event of the running node's failure.

This class is available in Camel Core and therefore requires no additional dependencies.

JdbcMessageIdRepository

Uses any standard SQL database to persist stored messages. Each consumer requires its own repository; these repositories can all use the same underlying database table to store keys.

A high volume of interaction with the database may cause performance problems, as each check, write, or delete triggers an SQL query.

Suitable for Active/Active clustering.

HazelcastIdempotentRepository

Uses the open source Hazelcast in-memory data grid / distributed cache to share a map of keys between a number of JVM instances.

Hazelcast can be configured to persist its caches to a shared database synchronously when the key is written to the cache (write-through, slower), or asynchronously in the background (write-behind, faster, but less reliable as messages could be lost if the JVM shuts down before writing). As key lookups are served from the cache rather than from the backing store, it is much faster than the JdbcMessageIdRepository.

Suitable for Active/Active clustering.

Additional IdempotentRepository implementations are provided by the Camel HBase and Camel Redis Components.

Note

Cache eviction strategies vary depending on the IdempotentRepository implementation used. Both the MemoryIdempotentRepository and FileIdempotentRepository use a fixed-size least recently used (LRU) Map, while the JdbcMessageIdRepository class requires that you set up an external script to periodically delete unwanted records. You should refer to the appropriate API documentation for full details.
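
To illustrate what a fixed-size LRU key store implies for deduplication, here is a minimal sketch built on java.util.LinkedHashMap. The class LruKeyStoreSketch is hypothetical and is not the data structure Camel uses internally; it only shows that once a key has been evicted, a replay of that message is treated as new.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical fixed-size LRU key store, for illustration only.
class LruKeyStoreSketch {
    private final Map<String, Boolean> keys;

    LruKeyStoreSketch(int maxSize) {
        // accessOrder = true keeps the least recently used entry first
        this.keys = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // Evict the least recently used key once the limit is exceeded
                return size() > maxSize;
            }
        };
    }

    // Returns true if the key was not already present
    synchronized boolean add(String key) {
        return keys.put(key, Boolean.TRUE) == null;
    }

    synchronized boolean contains(String key) {
        // get (rather than containsKey) counts as an access and refreshes the entry
        return keys.get(key) != null;
    }

    public static void main(String[] args) {
        LruKeyStoreSketch store = new LruKeyStoreSketch(2);
        store.add("a");
        store.add("b");
        store.contains("a");  // refreshes "a", so "b" is now least recently used
        store.add("c");       // exceeds the limit and evicts "b"
        System.out.println("b still known: " + store.contains("b"));
        System.out.println("b accepted again: " + store.add("b"));
    }
}
```

The practical consequence is that the cache size should be chosen so that a key is unlikely to be evicted while duplicates of its message may still arrive.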

By default, the message key is written, but not committed, into the IdempotentRepository instance as soon as the exchange enters the idempotent block. This prevents two duplicate messages that reach the block in quick succession from triggering the same logic. By setting the eager attribute on the idempotentConsumer block to false, you can delay this write until the exchange's processing is completed.

In the XML DSL, this is written as:

<idempotentConsumer
    messageIdRepositoryRef="wsIdempotentRepository"
    eager="false">
  <!-- ... -->
</idempotentConsumer>

In the Java DSL, the same thing is expressed as:

.idempotentConsumer(header("messageId"), 
                    new MemoryIdempotentRepository())
  .eager(false)
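
The trade-off behind the eager setting can be sketched in plain Java. The following model is an assumption-laden simplification rather than Camel's code: the class EagerSketch and its enter and complete methods are hypothetical, and the two calls to enter simulate duplicates that overlap in time.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the eager flag; not Camel's actual implementation.
class EagerSketch {
    private final Set<String> repo = ConcurrentHashMap.newKeySet();
    private final boolean eager;

    EagerSketch(boolean eager) {
        this.eager = eager;
    }

    // Called when a message reaches the block; returns true if it may proceed
    boolean enter(String key) {
        // Eager: record the key right away. Non-eager: only look it up.
        return eager ? repo.add(key) : !repo.contains(key);
    }

    // Called when the message has finished processing successfully
    void complete(String key) {
        if (!eager) {
            repo.add(key);  // non-eager mode records the key only now
        }
    }

    // Counts how many of two overlapping duplicates get through the block
    static int overlappingDuplicates(boolean eager) {
        EagerSketch block = new EagerSketch(eager);
        int admitted = 0;
        if (block.enter("msg-1")) admitted++;  // first copy enters
        if (block.enter("msg-1")) admitted++;  // second arrives before the first completes
        block.complete("msg-1");
        return admitted;
    }

    public static void main(String[] args) {
        System.out.println("eager=true admits:  " + overlappingDuplicates(true));
        System.out.println("eager=false admits: " + overlappingDuplicates(false));
    }
}
```

In this model, disabling eager writes opens a window in which a second copy of a message can pass the check before the first has completed, so it is best reserved for cases where such overlap cannot occur or is acceptable.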

A removeOnFailure attribute is also available; it defaults to true and can be set to false. This is useful, for example, if you want to remove keys from the IdempotentRepository instance manually through an onException block only if a certain type of Exception is thrown.

See also
