This recipe will show you how to set up transaction management over JMS for use within a Camel route, both for consuming and sending messages. This recipe will also detail the corner-case behaviors that you can expect from Camel when dealing with JMS transactions.
The Java code for this recipe is located in the org.camelcookbook.transactions.jmstransaction package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with jmsTransaction.
As discussed in the Introduction section, Camel's transaction handling relies on Spring's PlatformTransactionManager abstraction. Therefore, it is necessary to include the appropriate dependency in your application regardless of whether you are basing it on Spring or not; this includes regular Java as well as OSGi Blueprint applications.
To use Spring-managed JMS transactions you will require the following Maven dependencies:
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-spring</artifactId>
  <version>${camel-version}</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${spring-version}</version>
</dependency>
There are two ways to do transactions with the JMS component: set the transacted option on the JMS Component, or configure the connection factory of the JMS client library to be transactional and use Camel's transacted DSL statement.
When we consume from JMS, we add the transacted=true attribute to the endpoint. Both the consumption of the message, and any subsequent sends using that component instance for the remainder of this route, will be transacted:
<from uri="jms:inbound?transacted=true"/>
<to uri="jms:outbound"/>
<to uri="mock:out"/>
A JMS message will not be sent to the outbound queue until after the mock:out endpoint has been invoked and the exchange's processing has completed.
The same route using the Java DSL is expressed as:
from("jms:inbound?transacted=true")
.to("jms:outbound")
.to("mock:out");
This is a quick but limited mechanism: it does not allow us to vary transactional behavior within the route, as discussed in the Limiting the scope of a transaction recipe.
To get more flexibility around transactional behavior, we need to wire up our classes as shown in the following figure:
The javax.jms.ConnectionFactory interface is the main point of entry for communicating with the message broker. Your broker vendor provides the concrete class that you should use here.
The ActiveMQ Component is used to access an embedded message broker in these examples. It is a specialized type of JMS Component that is optimized for dealing specifically with ActiveMQ. It should be used in place of the JMS Component when dealing with an ActiveMQ broker.
The plain JMS Component is set up in the same way, except that the class used is org.apache.camel.component.jms.JmsComponent.
The steps to be performed for using transactions with JMS messaging are as follows:
<bean id="connectionFactory" class="...">
  <!-- specific to your broker -->
</bean>

<bean id="jmsTransactionManager"
      class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<bean id="jms"
      class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>

<bean id="PROPAGATION_REQUIRED"
      class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="jmsTransactionManager"/>
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
</bean>
The SpringTransactionPolicy instance carries a transaction propagation behavior, which is interpreted by the Spring framework. The PROPAGATION_REQUIRED value indicates that a transaction should be started if one is not already in progress; otherwise, the transaction in progress should be used.
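The join-or-start rule just described can be sketched in a few lines of plain Java. This is only a toy model and not Spring's implementation: the RequiredPropagationModel class, its Tx type, and its method names are hypothetical and exist purely for illustration.

```java
// Toy model of the PROPAGATION_REQUIRED rule; not Spring's implementation.
class RequiredPropagationModel {

    // Hypothetical stand-in for a transaction: just a numeric id.
    static final class Tx {
        final int id;
        Tx(int id) { this.id = id; }
    }

    private Tx current;      // transaction in progress, or null if none
    private int started = 0; // number of transactions started so far

    // Use the transaction in progress, or start one if there is none.
    Tx required() {
        if (current == null) {
            current = new Tx(++started);
        }
        return current;
    }

    // End the transaction in progress, if any.
    void complete() {
        current = null;
    }

    int startedCount() {
        return started;
    }

    public static void main(String[] args) {
        RequiredPropagationModel model = new RequiredPropagationModel();
        Tx outer = model.required();
        Tx inner = model.required(); // joins the transaction already in progress
        System.out.println("same transaction: " + (outer == inner));
        model.complete();
        System.out.println("next transaction id: " + model.required().id);
    }
}
```

Two consecutive calls to required() return the same object, which mirrors how a second transactional step in a route joins the transaction already under way rather than opening another one.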
In Java, the same objects can be wired into a standalone Camel context as follows:
SimpleRegistry registry = new SimpleRegistry();
ConnectionFactory connectionFactory = ...;
registry.put("connectionFactory", connectionFactory);

JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
jmsTransactionManager.setConnectionFactory(connectionFactory);
registry.put("jmsTransactionManager", jmsTransactionManager);

SpringTransactionPolicy policy = new SpringTransactionPolicy();
policy.setTransactionManager(jmsTransactionManager);
policy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
registry.put("PROPAGATION_REQUIRED", policy);

CamelContext camelContext = new DefaultCamelContext(registry);

ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConnectionFactory(connectionFactory);
activeMQComponent.setTransactionManager(jmsTransactionManager);
camelContext.addComponent("jms", activeMQComponent);
Instead of the transacted=true URI attribute, use the transacted DSL statement in your route. In the XML DSL, this is defined as follows:
<from uri="jms:inbound"/>
<transacted/>
<to uri="jms:outbound"/>
<to uri="mock:out"/>
In the Java DSL, the same routing logic is expressed as follows:
from("jms:inbound")
.transacted()
.to("jms:outbound")
.to("mock:out");
The transacted statement uses conventions to provide sensible transaction behavior. Its processor will fetch a SpringTransactionPolicy named PROPAGATION_REQUIRED from Camel's registry. If none is defined, it will fetch a PlatformTransactionManager object and implicitly use PROPAGATION_REQUIRED behavior.
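The lookup convention described above can be modelled in plain Java as follows. This is a sketch of the convention as stated in the text, not Camel's source code: the TransactedLookupModel class and its nested TransactionManager and Policy types are hypothetical stand-ins, and both the choice of the first transaction manager found and the exception thrown when nothing is found are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the lookup convention used by the transacted statement.
class TransactedLookupModel {

    // Hypothetical stand-ins for PlatformTransactionManager and SpringTransactionPolicy.
    static final class TransactionManager { }

    static final class Policy {
        final TransactionManager manager;
        final String propagation;
        Policy(TransactionManager manager, String propagation) {
            this.manager = manager;
            this.propagation = propagation;
        }
    }

    static final String DEFAULT_NAME = "PROPAGATION_REQUIRED";

    // Prefer a policy registered under the conventional name; otherwise
    // fall back to a transaction manager with PROPAGATION_REQUIRED behavior.
    static Policy resolve(Map<String, Object> registry) {
        Object named = registry.get(DEFAULT_NAME);
        if (named instanceof Policy) {
            return (Policy) named;
        }
        for (Object bean : registry.values()) {
            if (bean instanceof TransactionManager) {
                // Assumption: the first transaction manager found is used.
                return new Policy((TransactionManager) bean, DEFAULT_NAME);
            }
        }
        // Assumption: a missing policy and manager is treated as an error.
        throw new IllegalStateException("no policy or transaction manager in registry");
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new LinkedHashMap<>();
        registry.put("jmsTransactionManager", new TransactionManager());
        Policy policy = resolve(registry);
        System.out.println("resolved propagation: " + policy.propagation);
    }
}
```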
In the preceding routes, the transacted processor will start a transaction using the JmsTransactionManager. Under the covers, it will fetch a JMS Session from the ConnectionFactory and set its JMS acknowledgement mode to SESSION_TRANSACTED. Any interaction with that endpoint for the remainder of the route, by this thread, will be through this transaction.
The processor will then set up Camel's TransactionErrorHandler to roll back transactions when an exception is thrown and not handled by another error handler.
If the exchange's processing is completed without any problems, the transaction will commit at the end of the route.
If an exception is thrown at any time in the route, the JMS consumer endpoint involved in a transaction will send a negative acknowledgement, or NACK, back to the message broker. This will either place the message into a dead-letter queue (DLQ), or mark the message for redelivery to this or another message consumer, depending on your broker's configuration.
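To make the rollback outcome concrete, here is a toy broker written in plain Java. It is a simplified model, not JMS or any real broker: the RollbackRedeliveryModel class and its maxDeliveries setting are hypothetical, and it assumes a broker configured to redeliver a rolled-back message a fixed number of times before moving it to a dead-letter queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy broker: a failed (rolled back) consumption leads to redelivery,
// and eventually to the dead-letter queue.
class RollbackRedeliveryModel {
    final Deque<String> queue = new ArrayDeque<>();
    final List<String> committed = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();
    private final int maxDeliveries; // hypothetical broker setting

    RollbackRedeliveryModel(int maxDeliveries) {
        this.maxDeliveries = maxDeliveries;
    }

    // Deliver every queued message to the route, retrying after each rollback.
    void drain(Consumer<String> route) {
        while (!queue.isEmpty()) {
            String message = queue.poll();
            boolean done = false;
            for (int attempt = 1; attempt <= maxDeliveries && !done; attempt++) {
                try {
                    route.accept(message);  // the route processes the exchange
                    committed.add(message); // no exception: the transaction commits
                    done = true;
                } catch (RuntimeException e) {
                    // Exception: the transaction rolls back and the message
                    // is offered again on the next loop iteration.
                }
            }
            if (!done) {
                deadLetters.add(message); // delivery attempts exhausted
            }
        }
    }

    public static void main(String[] args) {
        RollbackRedeliveryModel broker = new RollbackRedeliveryModel(3);
        broker.queue.add("ok");
        broker.queue.add("bad");
        broker.drain(body -> {
            if (body.equals("bad")) {
                throw new IllegalStateException("route failed");
            }
        });
        System.out.println("committed=" + broker.committed + " dlq=" + broker.deadLetters);
    }
}
```

Real brokers differ in how many redeliveries they attempt and in whether a dead-letter queue is used at all, so the retry count here is only a placeholder for your broker's configuration.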
Sending a message to a JMS producer endpoint engaged in a transaction will open a JMS Session in transactional mode, and issue a send operation via a JMS MessageProducer interface. The Session will not be committed, and therefore the message will not actually be sent, until the exchange has completed processing.
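The deferred delivery of a transacted send can be illustrated with a small plain-Java model. This is not the JMS API: the TransactedSendModel class and its methods are hypothetical, and they only mimic the visible effect of a transacted Session, namely that sent messages reach the queue on commit and are discarded on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a transacted session: sends are held back until commit.
class TransactedSendModel {
    private final List<String> pending = new ArrayList<>();   // sent, not yet committed
    private final List<String> delivered = new ArrayList<>(); // visible on the queue

    // Record a send inside the transaction; nothing reaches the queue yet.
    void send(String body) {
        pending.add(body);
    }

    // The exchange completed: release everything sent in this transaction.
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    // The exchange failed: discard everything sent in this transaction.
    void rollback() {
        pending.clear();
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        TransactedSendModel session = new TransactedSendModel();
        session.send("order-1");
        System.out.println("before commit: " + session.delivered());
        session.commit();
        System.out.println("after commit: " + session.delivered());
    }
}
```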
It is also possible to define a transaction as only spanning a portion of a route, or to change the transaction policy being used through the policy DSL statement. See the Limiting the scope of a transaction recipe for more details.
By and large, it is almost too easy for a JMS endpoint to become engaged in a transaction. This is due to the number of possible combinations in which transactional behavior can be switched on. The following section outlines the combinations that result in transactional behavior.
Consumption from a JMS endpoint will be transacted if:
- The endpoint URI contains the transacted=true attribute, OR
- It is followed by a policy DSL block that refers to a SpringTransactionPolicy instance, which uses a JmsTransactionManager object that wraps the ConnectionFactory instance used by the endpoint, OR
- It is followed by a transacted DSL statement, AND
  - A JmsTransactionManager is defined in the Camel context and it wraps the ConnectionFactory being used by the endpoint

A send through a JMS producer endpoint will be transacted if:
- The endpoint URI contains the transacted=true attribute, OR
- A JmsTransactionManager is defined that wraps the ConnectionFactory being used by its endpoint, AND:
  - The route contains a transacted statement, OR
  - A SpringTransactionPolicy is defined that wraps the transaction manager, and it is referenced by a policy block that wraps the send, or a transacted DSL statement that precedes the send.

Sending an exchange using the InOut MEP to a JMS endpoint triggers request-reply over messaging. This is a mechanism that implements a remote service invocation using JMS as a transport. The mechanism is as follows:
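1. A temporary queue for replies is set up on the message broker (see the replyToCacheLevelName endpoint attribute).
2. The request message is sent to the target queue (outbound). The name of the temporary queue (assigned by the message broker) is set in the JMSReplyTo header.
3. A consumer on the other side reads messages from that queue (outbound) and processes them. If it has been specifically written to deal with request-response, it will send a new message to the temporary queue named in the request's JMSReplyTo header.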