Setting up XA transactions over multiple transactional resources

Using the recipes discussed previously in this chapter, it is possible to set up multiple transactions in one route with many transacted/policy blocks each referring to a different transaction manager, each managing a different transactional resource. When an exchange's processing is completed, the various transaction managers are instructed to commit one-by-one. If one of them fails to commit, perhaps due to an inconveniently-timed outage, that failure may leave the other backends in an inconsistent state overall, with some already having been committed, and now unable to roll back.
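The failure mode described above can be illustrated with a minimal, standard-library-only sketch. The Resource class and commitOneByOne method are hypothetical names invented for this illustration; they are not Camel, Spring, or JTA APIs. They stand in for independent transaction managers that are told to commit one after another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hypothetical types, not Camel, Spring, or JTA APIs. */
public class SequentialCommitSketch {

    /** Stand-in for one transactional resource driven by its own transaction manager. */
    static class Resource {
        final String name;
        final boolean failOnCommit;
        String state = "PENDING";

        Resource(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }

        void commit() {
            if (failOnCommit) {
                throw new IllegalStateException(name + " unavailable at commit time");
            }
            // Once a local transaction has committed, it can no longer be rolled back.
            state = "COMMITTED";
        }
    }

    /** Commits each resource in turn, as uncoordinated transaction managers would. */
    static List<String> commitOneByOne(List<Resource> resources) {
        try {
            for (Resource r : resources) {
                r.commit();
            }
        } catch (IllegalStateException e) {
            // Nothing here can undo the commits that already succeeded.
        }
        List<String> outcome = new ArrayList<>();
        for (Resource r : resources) {
            outcome.add(r.name + "=" + r.state);
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<Resource> resources = Arrays.asList(
                new Resource("database", false),
                new Resource("broker", true)); // simulated outage at commit time
        System.out.println(commitOneByOne(resources));
        // prints [database=COMMITTED, broker=PENDING]
    }
}
```

The database change is permanent while the broker's work never completed, which is the inconsistency that a coordinating transaction manager is meant to prevent.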

To get around this, you can employ a single over-arching transaction manager to coordinate multiple backends through a standards-based scheme known as XA transactions. eXtended Architecture (XA) transactions are supported in Java through the Java Transaction API (JTA). To enable this, the resources being managed, whether databases or message brokers, must be able to have their interactions managed through XA's two-phase commit (2PC) protocol.

This recipe will show you how to set up an XA transaction manager to manage a single transaction across two resources—a database and a JMS message broker. The concepts discussed here can be built on to enable coordinated transactions between whatever combination of brokers and databases you require.

Getting ready

The Java code for this recipe is located in the org.camelcookbook.transactions.xatransaction package. The Spring XML files are located under src/main/resources/META-INF/spring and prefixed with xaTransaction.

The example shown here is based on Spring and the Camel XML DSL. A Java-only version that replicates the same steps described here is provided in the sample code in the XATransactionTest class.

Note

This example uses the Atomikos TransactionsEssentials® open source XA transaction manager due to its popularity, and abundance of documentation. The specifics of setting up other XA transaction managers, such as JOTM, will be quite different, although the end behavior in your Camel route should be the same.

Servers based on the Java EE specification provide JTA/XA out of the box.

To use the Atomikos transaction manager, with support for JMS and JDBC, you will need the following Maven dependencies:

<dependency>
  <groupId>com.atomikos</groupId>
  <artifactId>transactions</artifactId>
  <version>${atomikos-version}</version>
</dependency>
<dependency>
  <groupId>com.atomikos</groupId>
  <artifactId>transactions-jdbc</artifactId>
  <version>${atomikos-version}</version>
</dependency>
<dependency>
  <groupId>com.atomikos</groupId>
  <artifactId>transactions-jms</artifactId>
  <version>${atomikos-version}</version>
</dependency>

The ${atomikos-version} used here is 3.9.1.

To use the Atomikos classes from your Camel application, you will additionally require the following Maven dependencies:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-spring</artifactId>
  <version>${camel-version}</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-tx</artifactId>
  <version>${spring-version}</version>
</dependency>

How to do it...

To get Atomikos-managed XA transactions working across a database and a JMS provider, we need to perform the following high-level steps:

  1. Set up an XA-capable DataSource, and wrap it with an Atomikos bean that ties it into the XA transaction.
  2. Set up an XA-capable ConnectionFactory, and wrap it with an Atomikos bean that ties it into the XA transaction.
  3. Instantiate the Atomikos transaction classes, and wrap them with a Spring PlatformTransactionManager that will be used from Camel through a transaction policy.
  4. Use the transaction policy from within a Camel route.

    Note

    Due to the complexity of XA setups, it is strongly recommended that you unit test your configuration.

In order to use XA transactions that span a database and a message broker, we need to wire together a number of classes.

In detail, the wiring is performed as follows:

  1. A javax.sql.DataSource class is the main entry point to your database. Your database vendor provides the actual class used here. In order for a database to participate in an XA transaction, the DataSource implementation needs to also implement the javax.sql.XADataSource interface.

    Here we use the H2 in-memory database:

    <bean id="xa.h2DataSource"
          class="org.h2.jdbcx.JdbcDataSource">
      <property name="URL"
                value="jdbc:h2:mem:db1;DB_CLOSE_DELAY=-1"/>
      <property name="user"
                value="sa"/>
      <property name="password"
                value=""/>
    </bean>

    Note

    If you are using a database connection pool, make sure that the pool is at least as large as the number of concurrent XA transactions that you expect to support. The reason for this is that each active database transaction makes use of a separate connection.

  2. Each XADataSource resource that you want to engage in an XA transaction needs to be wrapped by an AtomikosDataSourceBean object. This bean delegates to the underlying XADataSource (xa.h2DataSource in this example), and will automatically register itself with the XA transaction manager:
    <bean id="atomikos.dataSource" 
          class="com.atomikos.jdbc.AtomikosDataSourceBean">
      <property name="uniqueResourceName"
                value="xa.h2"/>
      <property name="xaDataSource"
                ref="xa.h2DataSource"/>
    </bean>

    Tip

    If you would like to use multiple databases from within your route in an XA transaction, you would repeat this configuration step for each database's DataSource, assigning unique ids to each database's DataSource and to each wrapping AtomikosDataSourceBean.

    The atomikos.dataSource bean is the DataSource instance that you use from your SQL Component:

    <bean id="sql" 
          class="org.apache.camel.component.sql.SqlComponent">
      <property name="dataSource"
                ref="atomikos.dataSource"/>
    </bean>
  3. A javax.jms.ConnectionFactory class is the main entry point to using a JMS-based message broker. In order for a broker to be able to take part in an XA transaction, its ConnectionFactory implementation class needs to also implement the javax.jms.XAConnectionFactory interface.

    Here we use an embedded ActiveMQ broker:

    Note

    For XA transactions with ActiveMQ, we need to use a different ConnectionFactory implementation, ActiveMQXAConnectionFactory, instead of the usual ActiveMQConnectionFactory.

    <bean id="xa.amqConnectionFactory"
        class="org.apache.activemq.ActiveMQXAConnectionFactory">
      <property name="brokerURL"
                value="vm://myEmbeddedBroker"/>
    </bean>
  4. Wrap the ActiveMQXAConnectionFactory object with an Atomikos-specific bean that also supports the ConnectionFactory interface. This step is required to make the factory visible to the transaction manager:
    <bean id="atomikos.connectionFactory" 
          class="com.atomikos.jms.AtomikosConnectionFactoryBean"
          init-method="init" 
          destroy-method="close">
      <property name="uniqueResourceName" 
                value="xa.activemq"/>
      <property name="xaConnectionFactory"
                ref="xa.amqConnectionFactory"/>
      <property name="maxPoolSize" 
                value="10"/>
      <property name="ignoreSessionTransactedFlag" 
                value="false"/>
    </bean>

    Tip

    If you would like to use multiple message brokers from within your route in an XA transaction, you would repeat this configuration step for each broker's ConnectionFactory, assigning unique IDs to each ActiveMQXAConnectionFactory, and to each wrapping AtomikosConnectionFactoryBean.

    The atomikos.connectionFactory bean is the ConnectionFactory instance that should be used from your JMS/ActiveMQ Component:

    <bean id="jms" 
          class="org.apache.activemq.camel.component.ActiveMQComponent">
      <property name="connectionFactory" 
                ref="atomikos.connectionFactory"/>
      <property name="transactionManager"
                ref="spring.transactionManager"/>
    </bean>

    The spring.transactionManager bean is defined in step 6.

  5. Set up the Atomikos JTA transaction classes as follows:
    <!-- javax.transaction.UserTransaction -->
    <bean id="atomikos.userTransaction"
          class="com.atomikos.icatch.jta.UserTransactionImp">
      <property name="transactionTimeout"
                value="300" />
    </bean>
    
    <!-- javax.transaction.TransactionManager -->
    <bean id="atomikos.transactionManager" 
          class="com.atomikos.icatch.jta.UserTransactionManager"
          init-method="init" 
          destroy-method="close"
          depends-on="atomikos.connectionFactory,atomikos.dataSource">
      <property name="forceShutdown"
                value="false"/>
    </bean>

    Note

    While the transactional resources atomikos.dataSource and atomikos.connectionFactory are not explicitly registered with the atomikos.transactionManager bean, they do need to be added to the depends-on attribute of the transaction manager's Spring bean tag.

    This ensures that any XA transactions that are in-flight are allowed to complete before Spring attempts to clean up the underlying resources on ApplicationContext shutdown.

  6. Wrap these in a Spring JTA-aware PlatformTransactionManager. Camel uses the Spring transaction management abstractions as discussed in the Introduction section:
    <bean id="spring.transactionManager" 
          class="org.springframework.transaction.jta.JtaTransactionManager">
      <property name="transactionManager" 
                ref="atomikos.transactionManager"/>
      <property name="userTransaction"
                ref="atomikos.userTransaction"/>
    </bean>
  7. Wrap the transaction manager in a SpringTransactionPolicy bean, for use within the route:
    <bean id="PROPAGATION_REQUIRED"
          class="org.apache.camel.spring.spi.SpringTransactionPolicy">
      <property name="transactionManager" 
                ref="spring.transactionManager"/>
      <property name="propagationBehaviorName"
                value="PROPAGATION_REQUIRED"/>
    </bean>
  8. Use this policy (the PROPAGATION_REQUIRED bean id from step 7) in a route that consumes from one JMS queue (inbound), inserts into a database, and sends to another JMS queue (outbound). Note that for the consumption to be transacted, we need to set the transacted=true option in the from endpoint's URI. Atomikos will very helpfully complain if you fail to do so.
    <from uri="jms:inbound?transacted=true"/>
    <transacted ref="PROPAGATION_REQUIRED"/>
    <log message="Processing message: ${body}"/>
    <setHeader headerName="message">
      <simple>${body}</simple>
    </setHeader>
    <to uri="sql:insert into audit_log (message) values(:#message)"/>
    <to uri="jms:outbound"/>
    <to uri="mock:out"/>

    If an exception is thrown from the mock:out endpoint, the send to the outbound queue, and the insert into the database, will be rolled back. The message consumed from the inbound queue will be returned to ActiveMQ, which will attempt to redeliver it up to six times before sending it to a dead-letter queue, by default.

How it works...

The Atomikos XA transaction manager keeps a local log (journal) of transaction operations on disk for rollback purposes (this is not a strict requirement of all XA transaction managers). The transaction manager instructs any XA-aware resources (the database and the message broker) that they need to execute their operations within an externally-coordinated transaction. Both the broker and the database keep their own view of the transaction written to disk.

The database insert and the message send are performed but not yet committed.

Once all of the processing steps are complete, the transaction manager asks each resource whether it is capable of proceeding, that is, committing the transaction successfully. This is known as the prepare phase. During this phase, any responses received from the resources are written to disk by the Atomikos transaction manager.

If all the resources agree they are ready to commit, the transaction manager issues this instruction during the commit phase, persisting the fact that it did so, as well as any subsequent success or failure responses that it receives.

If the transaction fails at any time during this process, for example due to a resource outage, the transaction manager will tell all the resources to roll back to their previous state. If the transaction manager's process goes down, there is a recovery process defined by the XA standard that will be followed by the Atomikos transaction manager, based on the state that it has written to its journal on disk. This recovery process will again attempt to complete the transaction, or tell the resources to roll back.

The XA standard tries to provide a predictable set of outcomes for these various failure scenarios.
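The prepare and commit phases described in this section can be sketched as a toy coordinator. This is a simplified illustration under stated assumptions, not how Atomikos is implemented: the Participant class, the complete method, and the in-memory journal list are hypothetical stand-ins for XA-aware resources, the transaction manager's completion logic, and its on-disk journal. Recovery after a crash is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: a toy coordinator, not the Atomikos or JTA API. */
public class TwoPhaseCommitSketch {

    /** Hypothetical stand-in for an XA-aware resource such as a database or broker. */
    static class Participant {
        final String name;
        final boolean votesYes;
        String state = "ACTIVE";

        Participant(String name, boolean votesYes) {
            this.name = name;
            this.votesYes = votesYes;
        }

        /** Prepare phase: report whether this resource is able to commit. */
        boolean prepare() {
            if (votesYes) {
                state = "PREPARED";
            }
            return votesYes;
        }

        void commit() { state = "COMMITTED"; }

        void rollback() { state = "ROLLED_BACK"; }
    }

    /** Runs both phases; the journal list stands in for the on-disk transaction log. */
    static boolean complete(List<Participant> participants, List<String> journal) {
        // Phase 1 (prepare): ask each resource whether it can commit, recording each answer.
        boolean allPrepared = true;
        for (Participant p : participants) {
            boolean vote = p.prepare();
            journal.add("prepare " + p.name + " -> " + (vote ? "yes" : "no"));
            if (!vote) {
                allPrepared = false;
                break;
            }
        }
        // Phase 2: record the decision, then commit everywhere or roll back everywhere.
        journal.add(allPrepared ? "decision commit" : "decision rollback");
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        List<Participant> resources = Arrays.asList(
                new Participant("database", true),
                new Participant("broker", false)); // the broker cannot commit
        List<String> journal = new ArrayList<>();
        boolean committed = complete(resources, journal);
        System.out.println("committed=" + committed); // prints "committed=false"
        for (Participant p : resources) {
            System.out.println(p.name + " " + p.state); // both end as ROLLED_BACK
        }
        System.out.println(journal);
    }
}
```

Because no resource commits until every resource has voted yes, a single refusal leads to a rollback everywhere rather than a partial commit.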

The communication involved in coordinating the various resources, and the logging of each step to a journal (disk forcing), take additional time over that of the actual routing logic. In the majority of cases, this is an acceptable trade-off to guarantee consistency. Information on tuning options can be found in the Atomikos documentation.

There's more...

An in-depth discussion of JTA/XA transactions, and the vast number of corner cases and failure scenarios, is beyond the scope of this book. If you would like to find out more, refer to the XA Exposed article link given in the See also section for an excellent primer.

See also
