Using transactions with messaging

This recipe will show you how to set up transaction management over JMS for use within a Camel route, both for consuming and sending messages. This recipe will also detail the corner-case behaviors that you can expect from Camel when dealing with JMS transactions.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.transactions.jmstransaction package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with jmsTransaction.

As discussed in the Introduction section, Camel's transaction handling relies on the Spring's PlatformTransactionManager abstraction. Therefore, it is necessary to include the appropriate dependency in your application regardless of whether you are basing it on Spring or not—this includes regular Java as well as OSGi Blueprint applications.

To use Spring-managed JMS transactions you will require the following Maven dependencies:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-spring</artifactId>
  <version>${camel-version}</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${spring-version}</version>
</dependency>

How to do it...

There are two ways to do transactions with the JMS component: set the transacted option on the JMS Component, or configure the connection factory of the JMS client library to be transactional and use Camel's transacted DSL statement.

When we consume from JMS, we add the transacted=true attribute to the endpoint. Both the consumption of the message, and any subsequent sends using that component instance for the remainder of this route, will be transacted:

<from uri="jms:inbound?transacted=true"/>
<to uri="jms:outbound"/> 
<to uri="mock:out"/>

A JMS message will not be sent to the outbound queue until after mock:out endpoint has been invoked and the exchange's processing completed.

The same route using the Java DSL is expressed as:

from("jms:inbound?transacted=true")
  .to("jms:outbound")
  .to("mock:out");

This is a quick, but limited mechanism, in that it does not allow us to vary transactional behavior within the route as discussed in the Limiting the scope of a transaction recipe.

To get more flexibility around transactional behavior, we need to wire up our classes as shown in the following figure:

How to do it...

The javax.jms.ConnectionFactory class is the main point of entry to communicate with the message broker. Your broker vendor provides the actual class that you should use here.

Note

The ActiveMQ Component is used to access an embedded message broker in these examples. It is a specialized type of JMS Component that is optimized for dealing specifically with ActiveMQ. It should be used in place of the JMS Component when dealing with an ActiveMQ broker.

The plain JMS Component is set up in the same way, except that the class used is org.apache.camel.component.jms.JmsComponent.

The steps to be performed for using transactions with JMS messaging are as follows:

  1. Wire up your beans, in Spring, as follows:
    <bean id="connectionFactory" class="...">
      <!-- specific to your broker -->
    </bean>
    
    <bean id="jmsTransactionManager"
          class="org.springframework.jms.connection.JmsTransactionManager">
      <property name="connectionFactory"
                ref="connectionFactory"/>
    </bean>
    
    <bean id="jms"
          class="org.apache.activemq.camel.component.ActiveMQComponent">
      <property name="connectionFactory"
                ref="connectionFactory"/>
      <property name="transactionManager" 
                ref="jmsTransactionManager"/>
    </bean>
    
    <bean id="PROPAGATION_REQUIRED"
          class="org.apache.camel.spring.spi.SpringTransactionPolicy">
      <property name="transactionManager" 
                ref="jmsTransactionManager"/>
      <property name="propagationBehaviorName" 
                value="PROPAGATION_REQUIRED"/>
    </bean>

    The SpringTransactionPolicy instance contains on it a transaction propagation behavior, which is interpreted by the Spring framework. The PROPAGATION_REQUIRED value indicates that a transaction should be started if one is not already in progress, otherwise the transaction in progress should be used.

    Note

    By convention, transaction policies are named after their propagation behavior. This convention is used by Camel to provide a sensible default configuration for transactions.

    In Java, the same objects can be wired into a standalone Camel context as follows:

    SimpleRegistry registry = new SimpleRegistry();
    ConnectionFactory connectionFactory = ...;
    registry.put("connectionFactory", connectionFactory);
    
    JmsTransactionManager jmsTransactionManager = 
        new JmsTransactionManager();
    jmsTransactionManager.setConnectionFactory(
        connectionFactory);
    registry.put("jmsTransactionManager",
                 jmsTransactionManager);
    
    SpringTransactionPolicy policy = new SpringTransactionPolicy();
    policy.setTransactionManager(jmsTransactionManager);
    policy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
    registry.put("PROPAGATION_REQUIRED", policy);
    
    CamelContext camelContext = new DefaultCamelContext(registry);
    ActiveMQComponent activeMQComponent =
        new ActiveMQComponent();
    activeMQComponent.setConnectionFactory(connectionFactory);
    activeMQComponent.setTransactionManager(
        jmsTransactionManager);
    camelContext.addComponent("jms", activeMQComponent);
  2. To engage the transaction manager that we have just wired up, instead of the transacted=true URI attribute, use the transacted DSL statement in your route.

    In the XML DSL, this is defined as follows:

    <from uri="jms:inbound"/>
    <transacted/>
    <to uri="jms:outbound"/>
    <to uri="mock:out"/>

    In the Java DSL, the same routing logic is expressed as follows:

    from("jms:inbound")
      .transacted()
      .to("jms:outbound")
      .to("mock:out");

How it works...

The transacted statement uses conventions to provide sensible transaction behavior. Its processor will fetch a SpringTransactionPolicy named PROPAGATION_REQUIRED from Camel's registry. If none is defined, it will fetch a PlatformTransactionManager object and implicitly use PROGAGATION_REQUIRED behavior.

In the preceding code, the transacted processor will start a transaction using the JmsTransactionManager. Under the covers, it will fetch a JMS Session from the ConnectionFactory and set its JMS acknowledgement mode to SESSION_TRANSACTED. Any interaction with that endpoint for the remainder of the route, by this thread, will be through this transaction.

The processor will then set up Camel's TransactionErrorHandler to roll back transactions when an exception is thrown and not handled by another error handler.

If the exchange's processing is completed without any problems, the transaction will commit at the end of the route.

If an exception is thrown at any time in the route, the JMS consumer endpoint involved in a transaction will send a negative acknowledgement, or NACK, back to the message broker. This will either place the message into a dead-letter queue (DLQ), or mark the message for redelivery to this or another message consumer, depending on your broker's configuration.

Note

By default, the ActiveMQ broker, used in the JMS code examples, will send the message to a DLQ after attempting to redeliver the message 6 times.

Sending a message to a JMS producer endpoint engaged in a transaction will open a JMS Session in transactional mode, and issue a send operation via a JMS MessageProducer interface. The Session will not be committed, and therefore the message will not actually be sent, until the exchange has completed processing.

It is also possible to define a transaction as only spanning a portion of a route, or to change the transaction policy being used through the policy DSL statement. See the Limiting the scope of a transaction recipe for more details.

By and large, it is almost too easy for a JMS endpoint to become engaged in a transaction. This is due to the number of possible combinations in which transactional behavior can be switched on. The following section outlines the combinations that result in transactional behavior.

Consumption from a JMS endpoint will be transacted if:

  • The endpoint contains the transacted=true attribute, OR
  • The route defines a policy DSL block that refers to a SpringTransactionPolicy instance, which uses a JmsTransactionManager object that wraps the ConnectionFactory instance used by the endpoint, OR
  • The route uses the transacted DSL statement, AND
    • No transaction manager is defined in the Camel context, OR
    • A JmsTransactionManager is defined in the Camel context and it wraps the ConnectionFactory being used by the endpoint

A send through a JMS producer endpoint will be transacted if:

  • The route consumes messages using the same JMS endpoint and the endpoint uses the transacted=true attribute, OR
  • A JmsTransactionManager is defined that wraps the ConnectionFactory being used by its endpoint, AND:
    • It is the only transaction manager defined in the Camel context and the route contains a transacted statement, OR
    • A SpringTransactionPolicy is defined that wraps the transaction manager, and it is referenced by a policy block that wraps the send, or a transacted DSL statement that precedes the send.

Tip

To be truly satisfied that your transactional behavior works as expected, you should unit test it thoroughly.

There's more...

Sending an exchange using the InOut MEP to a JMS endpoint triggers request-reply over messaging. This is a mechanism that implements a remote service invocation using JMS as a transport. The mechanism is as follows:

  1. A temporary queue is created by the JMS endpoint, on which the current thread will wait for a reply. The message broker automatically assigns the name of this queue. The endpoint may cache these queues between requests, which gives a considerable performance boost (the caching level is set via the replyToCacheLevelName endpoint attribute).
  2. The contents of the exchange are then sent as the body of a JMS Message on the queue named in the endpoint (outbound). The name of the temporary queue (assigned by the message broker) is set in the JMSReplyTo header.
  3. An external service picks up messages from the request queue (outbound) and processes them. If it has been specifically written to deal with request-response, it will send a new message to the temporary queue named in the request's JMSReplyTo header.
  4. The message listener receives the response from the temporary queue and places its contents onto exchange, at which point it continues to be processed by the original thread.

Note

Any messages sent using the InOut MEP over a JMS endpoint do not participate in any transactions running with that component instance.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.118.142.56