Chapter 5. ActiveMQ message storage

 

This chapter covers

  • How messages are stored in ActiveMQ for both queues and topics
  • The four styles of message stores provided with ActiveMQ
  • How ActiveMQ caches messages for consumers
  • How to control message caching using subscription recovery policies

 

The JMS specification supports two types of message delivery: persistent and nonpersistent. A message delivered with the persistent delivery property must be logged to stable storage. For nonpersistent messages, a JMS provider must make best efforts to deliver the message, but it won’t be logged to stable storage.

ActiveMQ supports both of these types of message delivery and can also be configured to support message recovery, an in-between state where messages are cached in memory. ActiveMQ supports a pluggable strategy for message storage and provides storage options for in-memory, file-based, and relational databases.

Persistent messages are used if you want messages to always be available to a message consumer after they’ve been delivered to the broker, even if that consumer isn’t running when the message was sent. Once a message has been consumed and acknowledged by a message consumer, it’s typically deleted from the broker’s message store.

Nonpersistent messages are typically used for sending notifications or real-time data. You should use nonpersistent messages when performance is critical and guaranteed delivery of the message isn’t required.

This chapter will first examine why messages are stored differently for queues and topics. We’ll then look at all four different message stores available to ActiveMQ, and why and when to use them for your application. Finally we’ll look at how ActiveMQ can be configured to temporarily cache messages for retrieval by message consumers at a later point in time. The flexibility offered by ActiveMQ for caching messages is unique, allowing fine control of message retrieval for your application.

This chapter will provide a detailed guide to message persistence. In order to lay the groundwork for this, first we’ll examine the storage of messages for JMS destinations.

5.1. How are messages stored by ActiveMQ?

It’s important to gain some basic knowledge of the storage mechanisms for messages in an ActiveMQ message store. This will aid in configuration and provide an awareness of what takes place in the ActiveMQ broker during the delivery of persistent messages. Messages sent to queues and topics are stored differently, because there are some storage optimizations that can be made with topics that don’t make sense with queues, as we’ll explain.

Storage for queues is straightforward—messages are basically stored in first in, first out order (FIFO). See figure 5.1 for a depiction of this. One message is dispatched to a single consumer at a time. Only when that message has been consumed and acknowledged can it be deleted from the broker’s message store.

Figure 5.1. First in, first out message storage for queues

For durable subscribers to a topic, each consumer gets a copy of the message. In order to save storage space, only one copy of a message is stored by the broker. A durable subscriber object in the store maintains a pointer to its next stored message and dispatches a copy of it to its consumer as shown in figure 5.2. The message store is implemented in this manner because each durable subscriber could be consuming messages at different rates or they may not all be running at the same time. Also, because every message can potentially have many consumers, a message can’t be deleted from the store until it’s been successfully delivered to every interested durable subscriber.

Figure 5.2. Messages stored for durable subscribers to topics use message pointers.

Every message store implementation for ActiveMQ supports storing messages for both queues and topics, though obviously the implementation differs between storage types. For example, the memory store holds all messages in memory.

Throughout the rest of this chapter, more details about configuring the different ActiveMQ message stores and their advantages and disadvantages will be explained.

5.2. The KahaDB message store

The recommended message store for general-purpose messages since ActiveMQ version 5.3 is KahaDB. This is a file-based message store that combines a transactional journal, for reliable message storage and recovery, with good performance and scalability.

The KahaDB store is a file-based, transactional store that’s been tuned and designed for the fast storage of messages. The aim of the KahaDB store is to be easy to use and as fast as possible. Its use of a file-based message database means there’s no prerequisite for a third-party database. This message store enables ActiveMQ to be downloaded and running in literally minutes. In addition, the structure of the KahaDB store has been streamlined especially for the requirements of a message broker.

The KahaDB message store uses a transactional log for its indexes and only uses one index file for all its destinations. It’s been used in production environments with 10,000 active connections, each connection having a separate queue. The configurability of the KahaDB store means that it can be tuned for most usage scenarios, from high throughput applications (for example, trading platforms), to storing large amounts of messages (for example, GPS tracking).

To enable the KahaDB store for ActiveMQ, you need to configure the <persistenceAdapter> element in the activemq.xml configuration file. Here’s a minimal configuration for the KahaDB message store:

<broker brokerName="broker" persistent="true" useShutdownHook="false">
...
    <persistenceAdapter>
    <kahaDB directory="activemq-data" journalMaxFileLength="16mb"/>
    </persistenceAdapter>
...
</broker>

If you want to embed an ActiveMQ broker inside an application, the message store can also be configured programmatically. Here’s an example of a programmatic configuration for KahaDB:

Although the example seems small, it’s enough to create an ActiveMQ broker using the KahaDB message store and listen for ActiveMQ clients connecting over TCP. For more information about embedding ActiveMQ, see chapter 8.

In order to better understand its use and configuration, it’s important to examine the internals of the KahaDB message store.

5.2.1. The KahaDB message store internals

The KahaDB message store is the fastest of all the provided message store implementations. Its speed is the result of the combination of a fast transactional journal comprised of data log files, the highly optimized indexing of message IDs, and in-memory message caching. Figure 5.3 provides a high-level diagram of the KahaDB message store.

Figure 5.3. In KahaDB messages are stored in indexed log files and cached for performance.

The diagram provides a view of the three distinct parts of the KahaDB message store including the following:

  • The data logs act as a message journal, which consists of a rolling log of messages and commands (such as transactional boundaries and message deletions) stored in data files of a certain length. When the maximum length of the currently used data file has been reached, a new data file is created. All the messages in a data file are reference counted, so that once every message in that data file is no longer required, the data file can be removed or archived. In the data logs, messages are only appended to the end of the current data file, so storage is fast.
  • The cache holds messages temporarily if there are active consumer(s) for the messages. If there are active consumers, messages are dispatched at the same time they’re scheduled to be stored. If messages are acknowledged in time, they don’t need to be written to disk.
  • The BTree indexes hold references to the messages in the data logs that are indexed by their message ID. The indexes maintain the FIFO data structure for queues and the durable subscriber pointers to their topic messages. The redo log is used only if the ActiveMQ broker hasn’t shut down cleanly, and are used to insure the integrity of the BTree index is maintained.

The KahaDB uses different files on disk for its data logs and indexes, so in the next section we’ll show a typical KahaDB directory structure.

5.2.2. The KahaDB message store directory structure

When you start an ActiveMQ broker configured to use a KahaDB store, a directory will automatically be created in which the persistent messages are stored. This directory structure is shown in figure 5.4.

Figure 5.4. The KahaDB message store directory structure

Inside of the KahaDB directory, the following directory and file structures can be found:

  • db log files— KahaDB stores messages into data log files named db-<Number>.log of a predefined size. When a data log is full, a new one will be created, and the log number incremented. When there are no more references to any of the messages in the data log file, it’ll be deleted or archived.
  • archive directory— This exists only if archiving is enabled. The archive is used to store data logs that are no longer needed by KahaDB, making it possible to replay messages from the archived data logs at a later point. If archiving isn’t enabled (the default), data logs that are no longer in use are deleted from the file system.
  • db.data— This file contains the persistent BTree indexes to the messages held in the message data logs.
  • db.redo— This is the redo file, used for recovering the BTree indexes if the KahaDB message store starts after a hard stop.

Now that we’ve covered the basics of the KahaDB store, the next step is to review its configuration.

5.2.3. Configuring the KahaDB message store

The KahaDB message store can be configured in the activemq.xml file. Its configuration options control the different tuning parameters, as described in table 5.1.

Table 5.1. Configuration options available for the KahaDB message store

Property name

Default value

Description

directory activemq-data Directory path used by KahaDB
indexWriteBatchSize 1000 Number of index pages to write in a batch to disk
indexCacheSize 10000 Number of index pages cached in memory
enableIndexWriteAsync false If set, will asynchronously write indexes
journalMaxFileLength 32mb A hint to set the maximum size of each of the message data logs
enableJournalDiskSyncs true Ensures every nontransactional journal write is followed by a disk sync (JMS durability requirement)
cleanupInterval 30000 Time (ms) before checking for and discarding/moving message data logs that are no longer used
checkpointInterval 5000 Time (ms) before checkpointing the journal
ignoreMissingJournalfiles false If enabled, will ignore a missing message log file
checkForCorruptJournalFiles false If enabled, on startup will validate that the message data logs haven’t been corrupted.
checksumJournalFiles false If enabled, will provide a checksum for each message data log
archiveDataLogs false If enabled, will move a message data log to the archive directory instead of deleting it
directoryArchive null Defines the directory to move data logs to when all the messages they contain have been consumed
databaseLockedWaitDelay 10000 Time (ms) before trying to acquire the database lock (used by shared master/slave)
maxAsyncJobs 10000 Maximum number of asynchronous messages that will be queued awaiting storage (should be the same as the number of concurrent MessageProducers)
concurrentStoreAndDispatchTransactions true Enables the dispatching of messages to interested clients to happen concurrently with transaction storage
concurrentStoreAndDispatchTopics true Enables the dispatching of topic messages to interested clients to happen concurrently with message storage
concurrentStoreAndDispatchQueues true Enables the dispatching of queue messages to interested clients to happen concurrently with message storage

ActiveMQ provides a pluggable API for message stores, and there are three additional implementations to KahaDB that are shipped with ActiveMQ:

  • The AMQ message store— A file-based message store designed for performance
  • The JDBC message store— A message store based on JDBC
  • The Memory message store— A memory-based message store

We’ll look at the use cases and configuration for these additional message stores in the next three sections. We’ll start with the AMQ message store, which like the KahaDB message store is a file-based implementation. It predates KahaDB, but because of its performance characteristics, it can make sense to use the AMQ store instead of KahaDB, provided the number of persistent destinations is relatively low.

5.3. The AMQ message store

The AMQ message store, like KahaDB, is a combination of a transactional journal for reliable persistence (to survive system crashes) and high-performance indexes, which makes this store the best option when message throughput is the main requirement for an application. But because it uses two separate files for every index, and there’s an index per destination, the AMQ message store shouldn’t be used if you intend to use thousands of queues per broker. Also, recovery can be slow if the ActiveMQ broker isn’t shut down cleanly. This is because all the indexes need to be rebuilt, which requires the broker to traverse all its data logs to accurately build the indexes again.

In the next section, we’ll briefly examine the internals of the AMQ message store, which are similar to the components of KahaDB.

5.3.1. The AMQ message store internals

The main components of the AMQ message store are similar to that of the KahaDB message store, in that there’s a cache, message data logs, and a reference store for accessing the data logs in order. Figure 5.5 provides a high-level diagram of the AMQ message store.

Figure 5.5. In the AMQ store messages are stored in referenced log files and cached for performance.

The diagram provides a view of the three distinct parts of the AMQ message store:

  • The data logs— These act as a message journal.
  • The cache— This holds messages for fast retrieval in memory after they’ve been written to the data logs.
  • The reference store— This holds references to the messages in the journal that are indexed by their message ID.

It’s important to understand the file-based directory structure used by the ActiveMQ message store. This will help with the configuration and also with problem identification when using ActiveMQ.

5.3.2. The AMQ message store directory structure

When you start ActiveMQ with the AMQ message store configured, a directory will automatically be created in which the persistent messages are held. The AMQ message store directory contains subdirectories for all the brokers that are running on the machine. For this reason it’s strongly recommended that each broker use a unique name. In the default configuration for ActiveMQ, the broker name is localhost, which needs to changed to something unique. This directory structure is represented in figure 5.6—the AMQ store directory structure.

Figure 5.6. The AMQ message store directory structure

The following directories and files can be found inside the data directory of an ActiveMQ broker:

  • A lock file— Ensures that only one broker can access this data at any given time. The lock is commonly used for hot standby purposes where more than one broker with the same name will exist on the same system.
  • A temp-storage directory— Used for storing nonpersistent messages that can no longer be stored in broker memory. These messages are typically awaiting delivery to a slow consumer.
  • The kr-store— The directory structure used by the reference (index) part of theAMQ message store. It uses the Kaha reference store by default (Kaha is part of the ActiveMQ core library) to index and store references to messages in the data logs. There are two distinct parts to the kr-store:

    • The data directory— Contains the indexes and collections used to reference the messages held in the data logs. This data directory is deleted and rebuilt as part of recovery, if the broker hasn’t shut down cleanly. You can force recovery by manually deleting this directory before starting the broker.
    • The state directory— Holds information about durable topic consumers. The journal itself doesn’t hold information about consumers, so when it’s recovered it has to retrieve information about the durable subscribers first to accurately rebuild its database.
  • The journal directory— Contains the data files for the data logs, and a data-control file that holds some meta information. The data files are reference counted, so when all the contained messages are delivered, a data file can be deleted or archived.
  • The archive directory— Exists only if archiving is enabled. Its default location can be found next to the journal. It makes sense to use a separate partition or disk. The archive is used to store data logs from the journal directory, which are moved here instead of being deleted. This makes it possible to replay messages from the archive at a later point. To replay messages, move the archived data logs (or a subset) to a new journal directory and start a new broker pointed to the location of this directory. It’ll automatically replay the data logs in the journal.

Now that the basics of the AMQ message store have been covered, the next step is to review its configuration.

5.3.3. Configuring the AMQ message store

The AMQ store configuration allows the user to change its basic behaviour around indexing, checkpoint intervals, and the size of the journal data files. These items and many more can be customized through the use of properties. The key properties for the AMQ store are shown in table 5.2.

Table 5.2. Configuration properties for the AMQ message store

Property name

Default value

Description

directory activemq-data The directory path used by the AMQ message store.
useNIO true NIO provides faster write-through to the systems disks.
syncOnWrite false Syncs every write to disk.
syncOnTransaction true Syncs every transaction to disk.
maxFileLength 32mb The maximum size of the message journal data files before a new one is used.
persistentIndex true Persistent indexes are used. If false, an in-memory HashMap is used.
maxCheckpointMessageAddSize 4kb The maximum memory used for a transaction before writing to disk.
cleanupInterval 3000(ms) Time before checking which journal data files are still required.
checkpointInterval 20000(ms) Time before moving cached message IDs to the reference store indexes.
indexBinSize 1024 The initial number of hash bins to use for indexes.
indexMaxBinSize 16384 The maximum number of hash bins to use.
directoryArchive archive The directory path used by the AMQ message store to place archived journal files.
archiveDataLogs false If true, journal files are moved to the archive instead of being deleted.
recoverReferenceStore true Recovers the reference store if the broker isn’t shut down cleanly; this errs on the side of extreme caution.
forceRecoverReferenceStore false Forces a recovery of the reference store.

Here’s an example of using the properties from table 5.2 in an ActiveMQ XML configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker xmlns="http://activemq.apache.org/schema/core">
   <persistenceAdapter>
       <amqPersistenceAdapter
         directory="target/Broker2-data/activemq-data"
         syncOnWrite="true"
         indexPageSize="16kb"
         indexMaxBinSize="100"
         maxFileLength="10mb" />
    </persistenceAdapter>
</broker>
</beans>

This is but a small example of a customized configuration for the AMQ store using the available properties.

The AMQ store, like the KahaDB store, enables users to get up and running quickly, as there are no external dependencies on other databases. But when you want to run an ActiveMQ broker and use an already established relational database, you need to use a JDBC message store.

5.4. The JDBC message store

The flexibility of the ActiveMQ pluggable message store API allows for many different implementation choices. The oldest and more common store implementation uses JDBC for messaging persistence.

The most common reason why so many organizations choose the JDBC message store is because they already have expertise administering relational databases. JDBC persistence is definitely not superior in performance to the aforementioned message store implementations. The fact of the matter is that many businesses have invested in the use of relational databases so they prefer to make full use of them.

But the use of a shared database is particularly useful for making a redundant master/slave topology out of multiple brokers. When a group of ActiveMQ brokers is configured to use a shared database, they’ll all try to connect and grab a lock in the lock table, but only one will succeed and become the master. The remaining brokers will be slaves, and will be in a wait state, not accepting client connections until the master fails. This is a common deployment scenario for ActiveMQ, which will be covered in more detail in chapter 10.

When using the JDBC message store, the default JDBC driver used in ActiveMQ is Apache Derby. But many other relational databases are supported.

 

Using Apache Derby ActiveMQ

As mentioned, Apache Derby is the default database used with the JDBC store. Not only is it written in 100% Java, but it’s also designed to be embed-dable. Derby offers a full feature set, performs well, and provides a small footprint. But there’s one caveat with the use of Derby that should be passed along to ActiveMQ users. Derby can be tough on the garbage collector in the JVM. Because so much churn takes place with the storing and deleting of messages in the database, experience has proven that putting Derby in its own JVM instance allows ActiveMQ to perform much better. The reason for this comes down to the fact that ActiveMQ and Derby will no longer be competing for the same JVM resources.

 

5.4.1. Databases supported by the JDBC message store

Just about any database with a JDBC driver can be used. Though this isn’t an exhaustive list, the JDBC store has been shown to operate with the following relational databases:

  • Apache Derby
  • MySQL
  • PostgreSQL
  • Oracle
  • SQL Server
  • Sybase
  • Informix
  • MaxDB

Some users prefer to use a relational database for message persistence simply because of the ability to query the database to examine messages. The following sections will discuss this topic.

5.4.2. The JDBC message store schema

The JDBC message store uses a schema consisting of three tables. Two of the tables are used to hold messages, and the third is used as a lock table to ensure that only one ActiveMQ broker can access the database at one time. Here’s a detailed breakdown of these tables.

The message table, shown in table 5.3, is by default named ACTIVEMQ_MSGS and is defined as follows.

Table 5.3. The columns of the ACTIVEMQ_MSGS SQL table

Column name

Default type

Description

ID INTEGER The sequence ID used to retrieve the message.
CONTAINER VARCHAR(250) The destination of the message.
MSGID_PROD VARCHAR(250) The ID of the message producer.
MSGID_SEQ INTEGER The producer sequence number for the message. This together with the MSGID_PROD is equivalent to the JMS-MessageID.
EXPIRATION BIGINT The time in milliseconds when the message will expire.
MSG BLOB The serialized message itself.

Messages are broken down and stored into the ACTIVEMQ_MSGS table for both queues and topics.

There’s a separate table for holding durable subscriber information and an ID to the last message the durable subscriber received. This information is held in the ACTIVEMQ_ACKS table, which is shown in table 5.4.

Table 5.4. The columns of the ACTIVEMQ_ACKS SQL table

Column name

Default type

Description

CONTAINER VARCHAR(250) The destination of the message
SUB_DEST VARCHAR(250) The destination of the durable subscriber (can be different from the container if using wildcards)
CLIENT_ID VARCHAR(250) The client ID of the durable subscriber
SUB_NAME VARCHAR(250) The subscriber name of the durable subscriber
SELECTOR VARCHAR(250) The selector of the durable subscriber
LAST_ACKED_ID Integer The sequence ID of last message received by this subscriber

For durable subscribers, the LAST_ACKED_ID sequence is used as a simple pointer into the ACTIVEMQ_MSGS and enables messages for a particular durable subscriber to be easily selected from the ACTIVEMQ_MSGS table.

The lock table, called ACTIVEMQ_LOCK, is used to ensure that only one ActiveMQ broker instance can access the database at one time. If an ActiveMQ broker can’t grab the database lock, that broker won’t initialize fully, and will wait until the lock becomes free, or it’s shut down. The table structure of the lock table is defined in table 5.5.

Table 5.5. The columns of the ACTIVEMQ_LOCK SQL table

Column name

Default type

Description

ID INTEGER A unique ID for the lock
Broker Name VARCHAR(250) The name of the ActiveMQ broker that has the lock

Now that we’ve explained the structure of the database tables used by the JDBC store, we can walk through some examples of configuring JDBC message stores, which we look at in the next section.

5.4.3. Configuring the JDBC message store

Configuring the default JDBC message store is straightforward. As stated previously, the default JDBC store uses Apache Derby in the broker configuration as shown:

<beans>
  <broker brokerName="test-broker"
    persistent="true"
    xmlns="http://activemq.apache.org/schema/core">

    <persistenceAdapter>
      <jdbcPersistenceAdapter dataDirectory="activemq-data"/>
    </persistenceAdapter>

  </broker>
</beans>

The preceding configuration sets the persistence adaptor for the ActiveMQ broker to be the JDBC message store (which uses Apache Derby by default) and sets the data directory to be used by the embedded Apache Derby instance.

One of the key properties on the JDBC persistence adapter (the interface onto the JDBC message store) is the dataSource property. This property defines a factory from which connections to a relational database are created. Configuring the dataSource object enables the JDBC persistence adaptor to use physical databases other than the default. Here’s an example of an ActiveMQ configuration for the JDBC message store using the MySQL database:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker brokerName="test-broker"
    persistent="true"
    xmlns="http://activemq.apache.org/schema/core">

  <persistenceAdapter>
     <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
  </persistenceAdapter>
</broker>

<bean id="mysql-ds"
  class="org.apache.commons.dbcp.BasicDataSource"
  destroy-method="close">
 <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
 <property name="url"
   value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
 <property name="username" value="activemq"/>
 <property name="password" value="activemq"/>
 <property name="maxActive" value="200"/>
 <property name="poolPreparedStatements" value="true"/>
</bean>

</beans>

The preceding example uses the Apache Commons DBCP BasicDataSource to wrap the MySQL JDBC driver for connection pooling. In the example, the driverClassName is the name of the JDBC driver to use. Some properties that you can configure are passed directly to the database driver itself. For example, maxActive is a property for the MySQL database connector, which tells the database how many active connections to hold open at one time.

Just as a point of comparison, here’s an example of a configuration to use the Oracle database:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker brokerName="test-broker"
    persistent=true
    xmlns="http://activemq.apache.org/schema/core">

    <persistenceAdapter>
      <jdbcPersistenceAdapter dataSource="#oracle-ds"/>
    </persistenceAdapter>
  </broker>

 <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource"
   destroy-method="close">
    <property name="driverClassName"
      value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
    <property name="username" value="scott"/>
    <property name="password" value="tiger"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
 </bean>

</beans>

This example uses the Apache Commons DBCP BasicDataSource to wrap the Oracle JDBC driver for connection pooling.

Now that some example configurations for the JDBC message store have been shown, you might ask, when is it best to use this type of persistence?

5.4.4. Using the JDBC message store with the ActiveMQ journal

Though the performance of the JDBC message store isn’t wonderful, it can be improved through the use of the ActiveMQ journal. The journal ensures the consistency of JMS transactions. Because it incorporates fast message writes with caching technology, it can significantly improve the performance of the ActiveMQ broker.

Here’s an example configuration using the journal with JDBC (aka journaled JDBC). In this case, Apache Derby is being used.

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker brokerName="test-broker"
    xmlns="http://activemq.apache.org/schema/core">

    <persistenceFactory>
      <journalPersistenceAdapterFactory
       journalLogFiles="4"
       journalLogFileSize="32768"
       useJournal="true"
       useQuickJournal="true"
       dataSource="#derby-ds"
       dataDirectory="activemq-data" />
    </persistenceFactory>

  </broker>

  <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
    <property name="databaseName" value="derbydb"/>
    <property name="createDatabase" value="create"/>
  </bean>

</beans>

The journal can be used with any JDBC datasource, but it’s important to know when it should and shouldn’t be used.

The journal offers considerable performance advantages over the use of a standard JDBC message store, especially when the JDBC database is co-located on the same machine as the ActiveMQ broker. The only time when it’s not possible to use the journal is in a shared database master/slave configuration. Because messages from the master may be stored locally in the journal before they’ve been committed to the database, using the journal in this configuration could lead to lost messages if the master failed because the journal isn’t replicated.

We’ve covered message storage in relational databases with some example configurations. In the next section we’ll look at the memory store, which doesn’t persist messages.

5.5. The memory message store

The memory message store holds all persistent messages in memory. No active caching is involved, so you have to be careful that both the JVM and the memory limits you set for the broker are large enough to accommodate all the messages that may exist in this message store at one time.

The memory message store can be useful if you know that the broker will only store a finite amount of messages, which will typically be consumed quickly. But it really comes into its own for small test cases, where you want to prove interaction with a JMS broker, but don’t want to incur the cost of a message store start time, or the hassle of cleaning up the message store after the test has finished.

5.5.1. Configuring the memory store

Configuring the memory store is simple. The memory store is the implementation used when the broker property named persistent is set to false (the default is true). Here’s an example of configuration which enables use of the ActiveMQ message store:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
   <broker brokerName="test-broker"
           persistent="false"
           xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
          <transportConnector uri="tcp://localhost:61635"/>
        </transportConnectors>
    </broker>

</beans>

By setting the persistent attribute on the broker element to false, this effectively tells the broker not to persist messages to long-term storage. Instead, the ActiveMQ broker will hold messages in memory until the messages are either consumed or the ActiveMQ broker is shut down.

Embedding an ActiveMQ broker with the memory store is easy. The following example starts a broker with the memory store:

import org.apache.activemq.broker.BrokerService;

public void createEmbeddedBroker() throws Exception {

    BrokerService broker = new BrokerService();
    //configure the broker to use the Memory Store
    broker.setPersistent(false);

    //Add a transport connector
    broker.addConnector("tcp://localhost:61616");

    //now start the broker
    broker.start();
}

Note the bold text that sets persistence to false on the broker object. This is equivalent to the previous XML configuration example.

There are currently no utilities to change from one type of ActiveMQ message store to another. If you want to change message stores for an application, it’s recommended that you only do so on a new ActiveMQ broker, or wait until your application has consumed all the messages sent, then close down the ActiveMQ broker, reconfigure it for a new message store, and restart it.

This concludes the discussion of the various message store implementations for message persistent in ActiveMQ. Another topic that bears some discussion regarding message persistence is a more specialized case for caching messages in the ActiveMQ broker for nondurable topic subscribers.

5.6. Caching messages in the broker for consumers

Although one of the most important aspects of message persistence is that the messages will survive in long-term storage, there are a number of cases where messages are required to be available for consumers that were disconnected from the broker, but persisting the messages in a database is too slow. Real-time data delivery of pricing information for a trading platform is a good example. But typically real-time data applications use messages that are only valid for a finite amount of time, often less than a minute. So it’s pointless to persist them to survive a system outage because new messages will arrive soon.

ActiveMQ supports the caching of messages for these types of systems using message caching in the broker by using something called a subscription recovery policy. This configurable policy is used for deciding which types of messages should be cached, how many, and for how long. In the rest of this section we’ll explain how message caching works in ActiveMQ and how to configure the different types of subscription recovery policies that are available.

5.6.1. How message caching for consumers works

The ActiveMQ message broker caches messages in memory for every topic that’s used. The only types of topics that aren’t supported are temporary topics and ActiveMQ advisory topics. Caching of messages in this way isn’t handled for queues, as the normal operation of a queue is to hold every message sent to it.

Messages that are cached by the broker are only dispatched to a topic consumer if the consumer is retroactive, and never to durable topic subscribers.

Topic consumers are marked as being retroactive by a property set on the destination when the topic consumer is created. Here’s an example:

On the broker side, the message caching is controlled by a destination policy called a subscriptionRecoveryPolicy. The default subscription recovery policy used in the broker is a FixedSizeSubscriptionRecoveryPolicy. Let’s walk through the different subscription recovery policies that are available.

5.6.2. The ActiveMQ subscription recovery policies

There are a number of different policies that allow for fine-tuning the duration and type of messages that are cached for nondurable topic consumers. Each policy type is explained here.

The Activemq Fixed Size Subscription Recovery Policy

This policy limits the number of messages cached for the topic based on the amount of memory they use. This is the default subscription recovery policy in ActiveMQ. You can choose to have the cache limit applied to all topics, or on a topic-by-topic basis. The properties available are shown in table 5.6.

Table 5.6. Configuration properties for a fixed size subscription recovery policy

Property name

Default value

Description

maximumSize 6553600 The memory size in bytes for this cache
useSharedBuffer true If true, the amount of memory allocated will be used across all topics
The Activemq Fixed Count Subscription Recovery Policy

This policy limits the number of messages cached by the topic based on a static count. Only one property is available, as listed in table 5.7.

Table 5.7. Configuration properties for a fixed count subscription recovery policy

Property name

Default value

Description

maximumSize 100 The number of messages allowed in the topics cache
The Activemq Query-Based Subscription Recovery Policy

This policy limits the number of messages cached based on a JMS property selector that’s applied to each message. Only one property is available, as shown in table 5.8.

Table 5.8. Configuration properties for a query-based subscription recovery policy

Property name

Default value

Description

query null Caches only messages that match the query
The Activemq Timed Subscription Recovery Policy

This policy limits the number of messages cached by the topic based on an expiration time that’s applied to each message. Note that the expiration time on a message is independent from the timeToLive that’s set by the MessageProducer. The configuration properties for a timed subscription policy are shown in table 5.9.

Table 5.9. Configuration properties for a timed subscription recovery policy

Property name

Default value

Description

recoverDuration 60000 The time in milliseconds to keep messages in the cache
The Activemq Last Image Subscription Recovery Policy

This policy holds only the last message sent to a topic. It can be useful for real-time pricing information—where a price per topic is used, you might only want the last price that’s sent to that topic. There are no configuration properties for this policy.

The Activemq No Subscription Recovery Policy

This policy disables message caching for topics. There are no properties to configure for this policy.

5.6.3. Configuring the subscription recovery policy

You can configure the subscriptionRecoveryPolicy for either individual topics, or you can use wildcards, in the ActiveMQ broker configuration. An example configuration is shown here:

5.7. Summary

This chapter began by discussing how messages are stored differently for queues and topics. Then the various message store implementations were explained and discussed, including their configuration and when to use each. You should have a good understanding about the two types of file-based message stores that you can use with ActiveMQ—the AMQ message store and the KahaDB message store—and their tradeoffs between performance and scalability. We also covered the JDBC message store, which is an option if you want to use an existing relational database and the ActiveMQ memory message store.

Finally we discussed the special case for caching messages in the broker for nondurable topic consumers. This section explained why caching is required, when it makes sense to use this feature, and the flexibility ActiveMQ provides in configuring the message caches.

In the next chapter, we’ll look at authentication of users of ActiveMQ and how to restrict access to destinations using authorization.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.117.234.225