When dealing with external systems, it is often necessary to ensure that the same request is not sent multiple times—a process often called deduplication.
Camel implements the Idempotent Consumer EIP, which allows you to mark a segment of a route as callable only once for each unique message. This is useful if it is possible for your route to receive duplicate messages, either from an upstream system or from the underlying infrastructure (for example, through a message queue configured for redelivery on failure), and you want to protect routing logic that is intolerant of duplicates.
This recipe explains how to make portions of your route idempotent in Camel.
The Java code for this recipe is located in the org.camelcookbook.transactions.idempotentconsumer package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with idempotentConsumer.
In order to use an idempotent consumer in the XML DSL, perform the following steps:
1. Instantiate an org.apache.camel.spi.IdempotentRepository implementation class as a regular Spring/Blueprint bean. Implementations of this interface are responsible for storing keys that represent previously seen messages. Here we will use a memory-based IdempotentRepository, using the following code:
<bean id="wsIdempotentRepository"
      class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
2. Wrap the routing logic that you want to trigger only once in an idempotentConsumer block. An idempotentConsumer block determines a message's uniqueness by checking the result of an expression against the set of previously seen messages in the IdempotentRepository. The following XML DSL fragment shows this in action:
<from uri="direct:in"/>
<idempotentConsumer messageIdRepositoryRef="wsIdempotentRepository">
  <header>messageId</header> <!-- unique key -->
  <to uri="mock:ws"/>
</idempotentConsumer>
<to uri="mock:out"/>
In the Java DSL, the complete setup above can be expressed in a single step as follows:
from("direct:in")
    .idempotentConsumer(
        header("messageId"), // unique key
        new MemoryIdempotentRepository())
        .to("mock:ws")
    .end()
    .to("mock:out");
When two messages with the same messageId header are pushed through either of these routes, the first will invoke the mock:ws and mock:out endpoints, while the second message will trigger mock:out only.
When an exchange reaches an idempotentConsumer statement, the provided expression is evaluated to determine a unique key for that message. In the preceding example, this expression involves a lookup of a header value, but it could have been the result of any of Camel's Expression Languages, including the result of a Java method or XPath query.
The key is checked against all previously seen keys in the IdempotentRepository instance. If this value has previously been seen, the routing logic contained within the block is skipped; otherwise the key is inserted into the repository and the routing logic within the block is executed.
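The check-then-insert logic described above can be sketched in plain Java. This is a simplified, single-threaded model and not Camel's implementation; the class name IdempotentGate and its process method are hypothetical names used only for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of an idempotent block; not Camel's actual implementation.
class IdempotentGate {
    private final Set<String> seenKeys = new HashSet<>();

    /** Runs the protected logic only for unseen keys; returns true if it ran. */
    boolean process(String key, Runnable protectedLogic) {
        // Set.add returns false when the key is already present.
        if (!seenKeys.add(key)) {
            return false; // duplicate: the block is skipped
        }
        protectedLogic.run();
        return true;
    }

    public static void main(String[] args) {
        IdempotentGate gate = new IdempotentGate();
        int[] calls = {0};
        gate.process("msg-1", () -> calls[0]++);
        gate.process("msg-1", () -> calls[0]++); // duplicate, skipped
        gate.process("msg-2", () -> calls[0]++);
        System.out.println(calls[0]); // prints 2
    }
}
```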
Internally, the pattern registers a Synchronization instance with the exchange, as described in Defining completion actions dynamically of Chapter 7, Error Handling and Compensation. This callback commits the change to the IdempotentRepository when the exchange completes processing, or rolls it back, removing the key, if the exchange fails.
In order to ensure that the changes to the repository are isolated, and subsequent routing logic does not remove the key when an exception is thrown, we need to use a separate Exchange instance with its own UnitOfWork for the idempotentConsumer logic. This is easier than it might seem.
If an exception is thrown at any point in the route after entering the idempotentConsumer block, including routing logic defined after the block, the key is removed from the repository. This will result in a replayed message once again being processed through the block.
In the preceding example, if the mock:out endpoint threw an exception, the next request with the same messageId header would execute both the mock:ws and mock:out endpoints.
This is not usually what you want, as you would expect that only an exception thrown within the idempotentConsumer block would have this effect.
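The rollback behavior described above can be modeled in a few lines of plain Java. This is a sketch under simplifying assumptions (one thread, one unit of work per call), not Camel's implementation; RollbackModel, route, and wsCalls are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of a single unit of work; not Camel's actual implementation.
class RollbackModel {
    final Set<String> repository = new HashSet<>();
    int wsCalls = 0;

    /** Models direct:in -> idempotentConsumer(mock:ws) -> mock:out. */
    void route(String messageId, boolean outFails) {
        // The key is written when the exchange enters the block.
        boolean firstTime = repository.add(messageId);
        try {
            if (firstTime) {
                wsCalls++; // stands in for mock:ws
            }
            if (outFails) {
                // Failure AFTER the block, standing in for mock:out.
                throw new IllegalStateException("mock:out failed");
            }
        } catch (IllegalStateException e) {
            if (firstTime) {
                repository.remove(messageId); // rollback on a failed exchange
            }
        }
    }

    public static void main(String[] args) {
        RollbackModel model = new RollbackModel();
        model.route("42", true);  // mock:out fails, so the key is rolled back
        model.route("42", false); // the replay runs mock:ws a second time
        System.out.println(model.wsCalls); // prints 2
    }
}
```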
To commit the update to the IdempotentRepository immediately after the idempotentConsumer block, we split out the idempotent logic into its own route, and trigger that route through the enrich statement as seen in Enriching your content with some help from other endpoints of Chapter 4, Transformation.
In the XML DSL, the enrichment logic is expressed as follows:
<route>
  <from uri="direct:in"/>
  <enrich uri="direct:invokeWs"/>
  <to uri="mock:out"/>
</route>
<route>
  <from uri="direct:invokeWs"/>
  <idempotentConsumer
      messageIdRepositoryRef="wsIdempotentRepository">
    <header>messageId</header>
    <to uri="mock:ws"/>
  </idempotentConsumer>
</route>
In the Java DSL, this same logic is written as follows:
from("direct:in")
    .enrich("direct:invokeWs")
    .to("mock:out");
from("direct:invokeWs")
    .idempotentConsumer(
        header("messageId"),
        new MemoryIdempotentRepository())
        .to("mock:ws");
The enricher creates a copy of the original exchange that is then passed through this second route. This second Exchange instance contains a distinct UnitOfWork that is completed when that instance has worked its way through the route. It is at that point that the movement of the message through the idempotentConsumer block is committed to the IdempotentRepository instance. On return from the enricher, the contents of the exchange are made available to the calling route.
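The effect of giving the idempotent logic its own unit of work can be sketched in plain Java as follows. This is a simplified model, not Camel's implementation: the method invokeWs stands in for the direct:invokeWs route, and its return stands in for completion of the second exchange's UnitOfWork. All names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of two routes with separate units of work; not Camel's code.
class IsolatedRouteModel {
    final Set<String> repository = new HashSet<>();
    int wsCalls = 0;

    /** Models direct:invokeWs; its unit of work ends when this method returns. */
    void invokeWs(String messageId) {
        if (!repository.add(messageId)) {
            return; // duplicate: skip mock:ws
        }
        try {
            wsCalls++; // stands in for mock:ws
        } catch (RuntimeException e) {
            repository.remove(messageId); // only failures inside the block roll back
            throw e;
        }
    }

    /** Models direct:in -> enrich(direct:invokeWs) -> mock:out. */
    void mainRoute(String messageId, boolean outFails) {
        invokeWs(messageId); // the key is already committed on return
        if (outFails) {
            throw new IllegalStateException("mock:out failed");
        }
    }

    public static void main(String[] args) {
        IsolatedRouteModel model = new IsolatedRouteModel();
        try {
            model.mainRoute("42", true);
        } catch (IllegalStateException expected) {
            // A failure after the block no longer touches the repository.
        }
        model.mainRoute("42", false); // the replay skips mock:ws
        System.out.println(model.wsCalls); // prints 1
    }
}
```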
If we merely invoked a route through a regular to statement, instead of enrich, any exceptions thrown after the idempotentConsumer block in the main route would cause the message key to be removed from the IdempotentRepository instance. This would lead to incorrect behavior, as the routing logic protected by the block would be executed again if the message were to be replayed.
It is possible for the idempotentConsumer instance to mark exchanges as having been previously seen instead of bypassing their processing. This allows you to route previously seen messages differently from new ones.
To do this, set the skipDuplicate attribute on the idempotentConsumer block to false. When a duplicate message is seen, the CamelDuplicateMessage (Exchange.DUPLICATE_MESSAGE) property will be set on the exchange. You can then use this property to make routing decisions.
In the XML DSL, this is expressed as:
<from uri="direct:in"/>
<idempotentConsumer messageIdRepositoryRef="wsIdempotentRepository"
                    skipDuplicate="false">
  <header>messageId</header>
  <choice>
    <when>
      <property>CamelDuplicateMessage</property>
      <to uri="mock:duplicate"/>
    </when>
    <otherwise>
      <to uri="mock:ws"/>
    </otherwise>
  </choice>
</idempotentConsumer>
In the Java DSL, the same routing logic is written as follows:
from("direct:in")
    .idempotentConsumer(header("messageId"),
                        new MemoryIdempotentRepository())
        .skipDuplicate(false)
        .choice()
            .when(property(Exchange.DUPLICATE_MESSAGE))
                .to("mock:duplicate")
            .otherwise()
                .to("mock:ws")
        .endChoice()
    .end();
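The mark-instead-of-skip behavior can be modeled in plain Java as a flag that drives a routing decision. This is a simplified sketch, not Camel's implementation; DuplicateMarkingModel and route are hypothetical names, and the returned strings merely name the endpoints used in the listings above.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of skipDuplicate=false; not Camel's actual implementation.
class DuplicateMarkingModel {
    private final Set<String> seenKeys = new HashSet<>();

    /** Returns the name of the endpoint the message would be routed to. */
    String route(String messageId) {
        // The block always runs; the duplicate flag only influences the choice.
        boolean duplicate = !seenKeys.add(messageId);
        return duplicate ? "mock:duplicate" : "mock:ws";
    }

    public static void main(String[] args) {
        DuplicateMarkingModel model = new DuplicateMarkingModel();
        System.out.println(model.route("42")); // prints mock:ws
        System.out.println(model.route("42")); // prints mock:duplicate
    }
}
```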
There are a number of IdempotentRepository implementations to choose from depending on the Quality of Service (QoS) that you would like your route to provide. If none of these provide the sort of QoS that you require, it is possible to write your own.
The following table lists some of the commonly used IdempotentRepository implementations, along with tips on their usage:
Additional IdempotentRepository implementations are provided by the Camel HBase and Camel Redis Components.
Cache eviction strategies vary depending on the IdempotentRepository implementation used. Both the MemoryIdempotentRepository and FileIdempotentRepository use a fixed-size least recently used (LRU) Map, while the JdbcMessageIdRepository class requires that you set up an external script to periodically delete unwanted records. You should refer to the appropriate API documentation for full details.
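To illustrate what a fixed-size LRU key store implies for deduplication, here is a minimal sketch built on java.util.LinkedHashMap in access order. It is not the code used by Camel's repositories; the class LruKeyStore is hypothetical, and treating a contains check as a "use" is an assumption of this model.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative fixed-size LRU key store; not Camel's actual implementation.
class LruKeyStore {
    private final Map<String, Boolean> keys;

    LruKeyStore(final int capacity) {
        // accessOrder=true keeps the least recently used entry first.
        this.keys = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity; // evict once the store is over capacity
            }
        };
    }

    /** Returns true if the key was not already present. */
    boolean add(String key) {
        return keys.put(key, Boolean.TRUE) == null;
    }

    /** Uses get() so that a lookup counts as a recent use (model assumption). */
    boolean contains(String key) {
        return keys.get(key) != null;
    }

    public static void main(String[] args) {
        LruKeyStore store = new LruKeyStore(2);
        store.add("a");
        store.add("b");
        store.contains("a"); // "a" becomes the most recently used key
        store.add("c");      // evicts "b", the least recently used key
        System.out.println(store.contains("b")); // prints false
        System.out.println(store.contains("a")); // prints true
    }
}
```

Note the consequence for deduplication: once a key has been evicted, a late duplicate carrying that key is treated as a new message, so the store size should be chosen with the expected replay window in mind.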
By default, the message key is written, but not committed, into the IdempotentRepository instance as soon as the exchange enters the idempotent block. This prevents two duplicate messages that reach the block in quick succession from triggering the same logic. By setting the eager attribute on the idempotentConsumer block to false, you can delay this write until the exchange's processing is completed.
In the XML DSL, this is written as:
<idempotentConsumer
messageIdRepositoryRef="wsIdempotentRepository"
eager="false">
<!-- ... -->
</idempotentConsumer>
In the Java DSL, the same thing is expressed as:
.idempotentConsumer(header("messageId"),
new MemoryIdempotentRepository())
.eager(false)
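The difference between eager and delayed writes can be sketched with a small plain-Java model. It is not Camel's implementation; EagerModel, enter, and complete are hypothetical names, with enter standing in for an exchange reaching the block and complete for that exchange finishing successfully.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of the eager option; not Camel's actual implementation.
class EagerModel {
    private final Set<String> repository = new HashSet<>();
    private final boolean eager;

    EagerModel(boolean eager) {
        this.eager = eager;
    }

    /** Called on entry to the block; returns true if the protected logic should run. */
    boolean enter(String key) {
        if (eager) {
            return repository.add(key); // key is recorded immediately
        }
        return !repository.contains(key); // key is only checked here
    }

    /** Called when the exchange completes successfully. */
    void complete(String key) {
        repository.add(key);
    }

    public static void main(String[] args) {
        EagerModel eagerModel = new EagerModel(true);
        System.out.println(eagerModel.enter("42")); // prints true
        System.out.println(eagerModel.enter("42")); // prints false (in-flight duplicate blocked)

        EagerModel lazyModel = new EagerModel(false);
        System.out.println(lazyModel.enter("42")); // prints true
        System.out.println(lazyModel.enter("42")); // prints true (first message not yet complete)
        lazyModel.complete("42");
        System.out.println(lazyModel.enter("42")); // prints false
    }
}
```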
A removeOnFailure attribute is available that can be set to false (the default is true). This is useful, for example, if you want to remove keys from the IdempotentRepository instance manually through an onException block only if a certain type of Exception is thrown.
JdbcMessageIdRepository: http://camel.apache.org/sql-component.html
HazelcastIdempotentRepository: http://camel.apache.org/hazelcast-component.html