Chapter 12. Advanced client options

 

This chapter covers

  • How to use exclusive consumers
  • The power of message groups
  • Understanding support for streams and blobs
  • The failover transport
  • Scheduling message delivery

 

In the last chapter we covered advanced ActiveMQ broker features. In this chapter we’re going to look at some advanced features on the client side of ActiveMQ. We’ll look at how to ensure that only one message consumer will receive messages from a queue, regardless of how many message consumers have subscribed to it. This feature is called exclusive consumer, and can be used for ensuring that messages are always consumed in order, or as a distributed locking mechanism, for which we have an example. We’ll look at message groups, where messages can be grouped together to be consumed by the same message consumer. ActiveMQ supports two different ways to send large payloads, ActiveMQ streams and blob messages, and we’ll look at both methods. As the client-side failover transport protocol is important for applications to survive network outages and broker failure, we’ll look at its nuances in more detail. And, finally, we’ll look at scheduling messages to be delivered after a delay or at regular intervals.

One feature that you might be expecting in this chapter is different modes for client-side acknowledgement of messages. As we’ll find out in the next chapter on ActiveMQ performance tuning, choosing the right mode for acknowledgement of messages is critical for good performance, so we’ll cover acknowledgement modes and their consequences there.

12.1. Exclusive consumers

When messages are dispatched from an ActiveMQ broker, they’ll always be in first in, first out order. But if you have more than one message consumer for a queue in your application(s), you can’t guarantee that the order in which the messages were dispatched will be the same order in which your application will consume them. This is because you never have control over the scheduling of threads used to deliver the messages on the client side—even if all your message consumers are using the same connection. Ideally you’d only have one message consumer to ensure ordering of messages. But you may also need to support failover, to have another instance of your queue message consumer take over the processing of the queue messages if the first consumer fails. ActiveMQ can support having multiple message consumers on a queue, but having only one of them receive the messages from that queue. We’ll discuss this concept in the following subsection.

12.1.1. Selecting an exclusive message consumer

For applications where message order is important, or you need to ensure that there will be only one message consumer for a queue, ActiveMQ offers a client-side option to have only one active message consumer process messages. The ActiveMQ message broker will select one consumer on the queue to process messages. The benefit of allowing the broker to make the choice is that if the consumer stops or fails, then another message consumer can be selected to be active, as depicted in figure 12.1.

Figure 12.1. How exclusive consumers behave on failure

If you mix standard consumers and exclusive consumers on the same queue, the ActiveMQ message broker will still only deliver messages to one of the exclusive consumers. If all the exclusive consumers become inactive, and there’s still a standard message consumer, then consumption of queue messages will return to the normal mode of delivery—the messages will be delivered in a round-robin fashion between all the remaining active standard message consumers.
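The selection rules described above can be captured in a few lines of plain Java. This is only a toy model of the behavior, not ActiveMQ’s dispatch code: the class ExclusiveDispatchSketch and its methods are hypothetical, and we assume that the broker picks the exclusive consumer that registered first.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of exclusive-consumer selection; not ActiveMQ's implementation.
final class ExclusiveDispatchSketch {
    private final List<String> exclusive = new ArrayList<>();
    private final List<String> standard = new ArrayList<>();
    private int cursor = 0;

    void add(String consumerId, boolean isExclusive) {
        (isExclusive ? exclusive : standard).add(consumerId);
    }

    // Called when a consumer closes or fails.
    void remove(String consumerId) {
        exclusive.remove(consumerId);
        standard.remove(consumerId);
    }

    // Returns the consumer that receives the next message.
    String nextConsumer() {
        if (!exclusive.isEmpty()) {
            // Assumption: the longest-registered exclusive consumer wins.
            return exclusive.get(0);
        }
        if (standard.isEmpty()) {
            throw new IllegalStateException("no consumers on the queue");
        }
        // No exclusive consumers left: round-robin over the standard ones.
        String chosen = standard.get(cursor % standard.size());
        cursor++;
        return chosen;
    }

    public static void main(String[] args) {
        ExclusiveDispatchSketch queue = new ExclusiveDispatchSketch();
        queue.add("std-1", false);
        queue.add("excl-1", true);
        queue.add("excl-2", true);
        System.out.println(queue.nextConsumer());
        queue.remove("excl-1"); // the active exclusive consumer fails
        System.out.println(queue.nextConsumer());
        queue.remove("excl-2"); // no exclusive consumers remain
        System.out.println(queue.nextConsumer());
    }
}
```

The model keeps the two kinds of consumer in separate lists, so that falling back to normal round-robin delivery is simply the case where the exclusive list is empty.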

You can create an exclusive consumer using a destination option on the client, like in the following code extract:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

The ability to select a message consumer to be exclusive can be used for more than guaranteeing that messages are consumed by only one active message consumer. You can use the exclusive consumer pattern to create a distributed lock, as we’ll demonstrate in the next section.

12.1.2. Using exclusive consumers to provide a distributed lock

Often you use messaging to broadcast data from an external resource, be that changes to records in a database, or comma-separated values appended to a file, or a raw real-time data feed. You might wish to build in redundancy, so if an instance of the application reading and broadcasting the changing data fails, another can take over. Often you can rely on locking a resource (row lock or file lock) to ensure that only one process will be accessing the data and broadcasting over a topic at a time. But when you don’t want the overhead of using a database, or want to run processes across more than one machine (and can’t use distributed locks), then you can use the exclusive consumer functionality to create a distributed lock. In figure 12.2 we show an application where we want failover for a client reading data from a real-time feed. We only want one client to connect to the feed and distribute the events, but if it fails, we need another client available to take over.

Figure 12.2. Using exclusive consumers as a distributed lock to create an exclusive producer application

In order to use exclusive consumers to create a distributed lock, we need our message producer to subscribe exclusively to a well-known queue. If the message producer receives a message from the queue, it becomes activated, and can then subscribe to the real-time feed and transform the real-time data into JMS messages. Here’s a code snippet for the message producer to initiate a distributed lock:

public void start() throws JMSException {
    this.connection = this.factory.createConnection();
    this.connection.start();
    this.session =
      this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Destination destination = this.session.createQueue(this.queueName
      + "?consumer.exclusive=true");
    Message message = this.session.createMessage();
    MessageProducer producer = this.session.createProducer(destination);
    producer.send(message);
    MessageConsumer consumer = this.session.createConsumer(destination);
    consumer.setMessageListener(this);
}

In this example, we always send a message to the well-known queue, to start off consumption—this step could always be done externally by a management process. Note that we use Session.CLIENT_ACKNOWLEDGE mode to consume the message. Although we want to be notified that we’re an exclusive consumer—and hence have the lock—we don’t want to remove the message from the well-known queue. In this way, if we fail, another exclusive producer will be activated.

For this example, we’d implement the MessageListener to look like the following code snippet. If we’re not already activated, we call a fictional method, startProducing(). If this were a real application, this method would start subscribing to a real-time feed and convert real-time data into JMS messages:

public void onMessage(Message message) {
       if (message != null && this.active==false) {
           this.active=true;
           startProducing();
       }
}
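The listener above reads and writes the active flag without any synchronization. That’s fine as long as only the session’s delivery thread touches it, but if other threads also inspect or reset the flag, an atomic check-and-set is safer. Here’s a minimal sketch, assuming a hypothetical helper class ExclusiveActivation whose Runnable stands in for the fictional startProducing() method:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: runs a start action at most once, no matter how
// many times (or from how many threads) activate() is called.
final class ExclusiveActivation {
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final Runnable startAction;

    ExclusiveActivation(Runnable startAction) {
        this.startAction = startAction;
    }

    // Returns true only for the call that actually performed the activation.
    boolean activate() {
        if (active.compareAndSet(false, true)) {
            startAction.run();
            return true;
        }
        return false;
    }

    boolean isActive() {
        return active.get();
    }

    public static void main(String[] args) {
        ExclusiveActivation lock =
            new ExclusiveActivation(() -> System.out.println("producing started"));
        lock.activate(); // first message from the well-known queue
        lock.activate(); // a later delivery: ignored, already active
    }
}
```

An onMessage() implementation could then delegate to activate() for every non-null message it receives.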

We’ve shown that using an exclusive consumer allows us to ensure that only one message consumer will be active at a time. In the next section, we’ll look at message groups, where the ActiveMQ broker can selectively choose a message consumer for all messages that have the same JMSXGroupID message header property set.

12.2. Message groups

We can refine the exclusive consumer concept further with message groups. Instead of all messages going to a single message consumer, messages can be grouped together for a single consumer, and a message producer can designate a group for a message by setting the message header JMSXGroupID. The ActiveMQ broker will ensure that all messages with the same JMSXGroupID are sent to the same consumer, as shown in figure 12.3.

Figure 12.3. Message groups: messages with the same JMSXGroupID will be sent to the same consumer

If the consumer designated by the ActiveMQ broker to receive messages for a given JMSXGroupID should close or become unavailable (a network outage, for example), then the ActiveMQ broker will select a different message consumer to dispatch the grouped messages to.

Using message groups is straightforward. The definition of a group is left up to a user and is done on the message producer—it just has to be unique for a particular queue. All the routing is done in the ActiveMQ message broker.

To create a group, you need to set a JMSXGroupID string property on the messages being sent by the message producer, as shown:

Session session =
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("<foo>test</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
producer.send(message);

The previous example shows a message producer being created, and then setting a TextMessage to belong to the message group TEST_GROUP_A.

Message groups use normal message consumers, so no additional work is required to consume messages from a group. All the work is done by the message producer in defining the group messages belong to, and by the ActiveMQ broker in selecting a message consumer to send all the grouped messages to.
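To make the routing behavior concrete, here’s a toy model of how a broker could pin groups to consumers and reassign them when a consumer goes away. It’s a sketch only: the class GroupRouterSketch is hypothetical, new groups are assumed to be handed out round-robin, and none of this is ActiveMQ’s actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of sticky group routing; not ActiveMQ's actual implementation.
final class GroupRouterSketch {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, String> owners = new HashMap<>();
    private int next = 0;

    void addConsumer(String consumerId) {
        consumers.add(consumerId);
    }

    // Removing a consumer forgets every group it owned, so the next
    // message for such a group is assigned to a surviving consumer.
    void removeConsumer(String consumerId) {
        consumers.remove(consumerId);
        owners.values().removeIf(owner -> owner.equals(consumerId));
    }

    // Returns the consumer that should receive a message for groupId.
    String route(String groupId) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers available");
        }
        return owners.computeIfAbsent(groupId, id -> {
            // Assumption: unseen groups are assigned round-robin.
            String chosen = consumers.get(next % consumers.size());
            next++;
            return chosen;
        });
    }

    public static void main(String[] args) {
        GroupRouterSketch router = new GroupRouterSketch();
        router.addConsumer("consumer-1");
        router.addConsumer("consumer-2");
        System.out.println(router.route("TEST_GROUP_A"));
        System.out.println(router.route("TEST_GROUP_B"));
        System.out.println(router.route("TEST_GROUP_A")); // same as the first
        router.removeConsumer("consumer-1");
        System.out.println(router.route("TEST_GROUP_A")); // reassigned
    }
}
```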

The ActiveMQ broker will add a sequence number to each message in a group, using the standard JMSXGroupSeq message header property. The sequence number will start from 1 for a new message group.

But from the perspective of the message consumer, you can’t assume that the first message you receive for a new group will have the JMSXGroupSeq set to 1. If an existing message group consumer closes or dies, any messages being routed to its group will be assigned a new consumer. To help identify that a message consumer is receiving messages to a new group, or a group that it hasn’t seen before, a Boolean property called JMSXGroupFirstForConsumer is set for the first message sent to the new message consumer. You can check whether a message is being sent to your consumer for the first time by seeing if this property has been set, as shown:

MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();
String groupId = message.getStringProperty("JMSXGroupID");
if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
  // do processing for new group
}

It’s often the case that you start a number of message consumers to process messages at the same time. The ActiveMQ message broker will allocate all message groups evenly across all consumers, but if there are already messages waiting to be dispatched, the message groups will typically be allocated to the first consumer. To ensure an evenly distributed load, it’s possible to give the message broker a hint to wait for more message consumers to start. To do this, you have to set up a destination policy in the ActiveMQ broker’s XML configuration. Set the consumersBeforeDispatchStarts property with the number of message consumers you expect your application to use, as the following example demonstrates:

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">"
        consumersBeforeDispatchStarts="2"
        timeBeforeDispatchStarts="5000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

The sample configuration tells the ActiveMQ broker that any queue (the name of the queue is >, which is a wildcard for any match) should wait for two consumers before dispatching. Additionally we’ve also set the timeBeforeDispatchStarts property to 5000ms to inform the ActiveMQ broker that if two message consumers aren’t available within 5 seconds of getting the first message on the queue, it should use the first that becomes available.

Using message groups does add some minimal overhead to the ActiveMQ broker, in terms of storing routing information for each message group. It’s possible to explicitly close a message group by sending a message to the ActiveMQ broker with the JMSXGroupID set to the group you want to close and the JMSXGroupSeq property set to -1, like in the following example:

Session session =
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("group.queue");
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("<foo>close</foo>");
message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
message.setIntProperty("JMSXGroupSeq", -1);
producer.send(message);

You can re-create a message group that has been closed by sending a new message to the group. But the group may be assigned to a different message consumer by the ActiveMQ broker.

Conceptually, message groups are like using message selectors. The difference is that message groups automatically handle the selection of message consumers, and they also handle the failover of message groups when a message consumer fails.

Having looked at exclusive consumers and message groups, in the next sections we’re going to look at how to transport large messages using advanced client-side options with ActiveMQ, using either JMS streams or blob messages.

12.3. ActiveMQ streams

ActiveMQ streams are an advanced feature that allows you to use an ActiveMQ client as a Java IOStream. ActiveMQ will break an OutputStream into distinct chunks of data and send each chunk through ActiveMQ as a JMS message. A corresponding ActiveMQ JMS InputStream should be used on the consumer side to reassemble the data chunks.

If you use a queue as the destination for the stream, using more than one consumer on a queue (or an exclusive consumer) is fine because this feature uses message groups. This causes messages with the same group ID to be pinned to a single consumer. Using more than one producer in this scenario could cause problems with the message order.

The benefit of using JMS streams is that ActiveMQ will break a stream into manageable chunks and reassemble them for you at the consumer. So it’s possible to transfer very large files using this functionality, as depicted in figure 12.4.

Figure 12.4. Using IOStreams to transfer a large file through ActiveMQ

To demonstrate using streams, here’s an example of reading a large file and writing it out over ActiveMQ:

//source of our large data
FileInputStream in = new FileInputStream("largetextfile.txt");

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
ActiveMQConnection connection = (ActiveMQConnection)
  connectionFactory.createConnection();
connection.start();
Session session =
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);

OutputStream out = connection.createOutputStream(destination);

//now write the file on to ActiveMQ
byte[] buffer = new byte[1024];
while(true){
   int bytesRead = in.read(buffer);
   if (bytesRead==-1){
       break;
   }
   out.write(buffer,0,bytesRead);
}
//close the stream so the receiving side knows the stream is finished
out.close();
in.close();

In the example, we create an ActiveMQConnection and create an OutputStream using a queue as the destination. We read the file using a FileInputStream, then write the FileInputStream onto the ActiveMQ OutputStream.

Note that we close the ActiveMQ OutputStream when we’ve completed reading the file. This is important so that the receiving side can determine whether the stream is finished. It’s recommended that you use a new OutputStream for each file you send.

For completeness, here’s the receiving end of an ActiveMQ stream:

//destination of our large data
FileOutputStream out = new FileOutputStream("copied.txt");

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
ActiveMQConnection connection = (ActiveMQConnection)
  connectionFactory.createConnection();
connection.start();
Session session =
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//we want to be an exclusive consumer
String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true";
Queue destination = session.createQueue(exclusiveQueueName);

InputStream in = connection.createInputStream(destination);

//now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while(true){
   int bytesRead = in.read(buffer);
   if (bytesRead==-1){
        break;
   }
   out.write(buffer,0,bytesRead);
}
out.close();
in.close();

In the example, we create an ActiveMQConnection and from that create an InputStream using a queue as a consumer. Note that we use an exclusive consumer by appending "?consumer.exclusive=true" to the name of the queue. We do this to ensure that only one consumer will be reading the stream at a time. We read the ActiveMQ InputStream and then write it to a FileOutputStream to reassemble the file on disk. Note that we expect the end of the file to be denoted by the end of the stream (or -1).

You can use streams with topics too—though if a consumer for a topic starts partway through the delivery of a stream, it won’t receive any data that was sent before it was started.

ActiveMQ breaks the stream into manageable chunks of data and sends each chunk of data as a separate message. This means that you have to be careful when using them, because if the message consumer should fail partway through reading the InputStream, there’s currently no way to replay the messages already consumed by the failed message consumer.
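The chunking idea, and why a consumer failure is awkward, can be illustrated without a broker. The following sketch is a plain-Java model, not ActiveMQ’s stream code: the class ChunkingSketch is hypothetical, and each byte array stands in for the body of one message.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of stream chunking; each byte[] stands in for one message body.
final class ChunkingSketch {
    // Cuts the payload into chunks of at most chunkSize bytes.
    static List<byte[]> split(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += chunkSize) {
            int end = Math.min(offset + chunkSize, data.length);
            chunks.add(Arrays.copyOfRange(data, offset, end));
        }
        return chunks;
    }

    // Concatenates the chunks in the order given.
    static byte[] reassemble(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[2500];
        Arrays.fill(payload, (byte) 7);
        List<byte[]> chunks = split(payload, 1024);
        System.out.println("chunks: " + chunks.size());
        System.out.println("intact: "
            + Arrays.equals(payload, reassemble(chunks)));
        // If the first chunk was consumed by a consumer that then failed,
        // a replacement consumer can only rebuild the tail of the payload.
        byte[] tail = reassemble(chunks.subList(1, chunks.size()));
        System.out.println("bytes recovered after failure: " + tail.length);
    }
}
```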

ActiveMQ streams are useful for transferring large payloads, though you’ll need to think about how an application using ActiveMQ streams should handle failure scenarios. There’s an alternative and more robust method of sending large payloads: using blob messages, which we cover in the next section.

12.4. Blob messages

ActiveMQ introduced the concept of blob messages so that users can take advantage of ActiveMQ message delivery semantics (transactions, load balancing, and smart routing) in conjunction with very large messages. A blob message doesn’t contain the data being sent, but is a notification that a blob (binary large object) is available. The blob itself is transferred out of band, by either FTP or HTTP. In fact, an ActiveMQ BlobMessage only contains the URL to the data itself, with a helper method to grab an InputStream to the real data. Let’s work through an example.

First we look at how to create a blob message. In this example we’ll assume that a large file already exists on a shared website, so we have to create a blob message to notify any interested consumers that it exists, as shown:

import org.apache.activemq.BlobMessage;
...

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
ActiveMQSession session = (ActiveMQSession)
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
BlobMessage message =
  session.createBlobMessage(new URL("http://example.com/bigfile.dat"));
producer.send(message);

In the example, we create a JMS connection, and from that an ActiveMQ session which has methods to support blob messages. We create a blob message from the URL of the file on our shared site (http://example.com) and send the blob message on a well-known queue (QUEUE_NAME).

Here’s the corresponding message consumer for blob messages:

import org.apache.activemq.BlobMessage;
...

// destination of our Blob data
FileOutputStream out = new FileOutputStream("blob.dat");

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
Connection connection = (ActiveMQConnection)
  connectionFactory.createConnection();
connection.start();
Session session =
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);

MessageConsumer consumer = session.createConsumer(destination);
BlobMessage blobMessage = (BlobMessage) consumer.receive();

InputStream in = blobMessage.getInputStream();
// now write the file from ActiveMQ
byte[] buffer = new byte[1024];
while (true) {
    int bytesRead = in.read(buffer);
    if (bytesRead == -1) {
        break;
    }
    out.write(buffer, 0, bytesRead);
}
out.close();

In the example we create a message consumer on our well-known queue (QUEUE_NAME). We assume that all messages sent to this queue are of type org.apache.activemq.BlobMessage. A blob message has a helper method to get an InputStream to the remote URL that the message producer created the blob message with. We grab the InputStream and use it to read the remote file and write it to a local disk, called blob.dat.

Using blob messages is more robust than stream messages, as each one is an atomic unit of work. But they do rely on an external server being available for storage of the actual data—in this example a file.

12.5. Surviving network or broker failure with the failover protocol

We introduced the failover protocol in chapter 4, where we explained the basics behind allowing a client to fail over to another ActiveMQ broker in the case of failure. The failover protocol is the default protocol used by the Java client, so it’s worth looking in more detail at some of its optional features and capabilities.

By default, you specify in the client URI one or more ActiveMQ brokers that could be used for the connection:

failover:(tcp://host1:61616,tcp://host2:61616,ssl://host2:61616)

By the way, the failover transport URI can also be written without the enclosing parentheses, although that form can get a bit messy if there are any embedded query parameters.

ActiveMQ will connect to one of the brokers defined in the list, selecting one at random. With the failover protocol, the ActiveMQ client will instantiate a periodic keepalive protocol, so that it can detect whether the broker is no longer reachable (connection or broker lost). When it detects that the broker is no longer available, it will randomly select another broker from the list provided at startup. If only one broker URI is provided, the client will periodically check to see if the broker is available again. It’s possible to listen for transport interrupts by setting a TransportListener on the ActiveMQ connection:

import java.io.IOException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.DefaultTransportListener;

...

ActiveMQConnection connection = (ActiveMQConnection)
  connectionFactory.createConnection();
connection.addTransportListener(new DefaultTransportListener() {
    @Override
    public void onException(IOException error) {
        System.err.println("This is bad");
    }

    // The method name is spelled "transportInterupted" in ActiveMQ's
    // TransportListener interface.
    @Override
    public void transportInterupted() {
        System.out.println("Transport interrupted");
    }

    @Override
    public void transportResumed() {
        System.out.println("Transport resumed");
    }
});

connection.start();

When you supply the failover protocol with more than one transport URI to use, by default it will select one at random to use, to ensure load balancing of clients across brokers. When you want to have a guaranteed order of brokers that the client will connect to, you need to disable the random selection of brokers by disabling the randomize failover property:

failover:(tcp://master:61616,tcp://slave:61616)?randomize=false

If none of the ActiveMQ brokers specified by the failover URI are available, then by default the failover transport will wait before trying again. The failover protocol will wait an increasing amount of time between each successive failure to connect to an ActiveMQ broker; this is called an exponential back-off. The failover protocol by default has useExponentialBackOff enabled. The wait before the first reconnect attempt is set by initialReconnectDelay (default is 10ms), and the multiplier applied to the wait after each failed attempt is called the backOffMultiplier (default value is 2.0). You can also cap the wait between attempts by using maxReconnectDelay (default is 30000ms). An example configuration is shown next:

failover:(tcp://master:61616,tcp://slave:61616)?backOffMultiplier=1.5&initialReconnectDelay=1000
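To see what these settings mean in practice, the sequence of waits can be worked out by hand or with a few lines of Java. The sketch below is a simple model of an exponential back-off with a cap, using the parameter names from the text; the class BackOffSketch is hypothetical and isn’t taken from the failover transport’s source.

```java
import java.util.ArrayList;
import java.util.List;

// Simple model of exponential back-off with an upper bound.
final class BackOffSketch {
    // Returns the wait (in ms) before each of the first 'attempts' reconnects.
    static List<Long> delays(long initialReconnectDelay, double backOffMultiplier,
                             long maxReconnectDelay, int attempts) {
        List<Long> result = new ArrayList<>();
        long delay = initialReconnectDelay;
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            // Grow the delay, but never beyond the configured maximum.
            delay = Math.min((long) (delay * backOffMultiplier), maxReconnectDelay);
        }
        return result;
    }

    public static void main(String[] args) {
        // Default values quoted in the text: 10ms, 2.0, 30000ms.
        System.out.println(delays(10, 2.0, 30000, 14));
        // Values from the example URI: 1000ms initial delay, multiplier 1.5.
        System.out.println(delays(1000, 1.5, 30000, 6));
    }
}
```

With the defaults, the wait doubles from 10ms until it would exceed 30000ms, which happens on the thirteenth attempt; from then on every wait is 30 seconds.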

One potential problem that you may run into using any transport protocol based on TCP is the ability to know when a peer (for ActiveMQ, this will be the broker) has died. This can happen for several reasons, like the failure of the ActiveMQ broker or loss of network. Also, if there’s a firewall between the ActiveMQ client and broker, it may drop the connection if it’s inactive for some time. It’s possible to configure keepalive on the TCP connection, but this is operating system–specific and can require changes to kernel parameters—and doesn’t work well in heterogeneous environments. For this reason, ActiveMQ uses a keepalive protocol on top of its transports, to keep firewalls open and also detect whether the broker is no longer reachable. The keepalive protocol periodically sends a lightweight command message to the broker, and expects a response. If it doesn’t receive one within a given time period, ActiveMQ will assume that the transport is no longer valid. The failover transport listens for failed transports and will select another transport to use on such a failure. The parameter used by the keepalive protocol in ActiveMQ is maxInactivityDuration, which is an OpenWire property; the default is 30000 (milliseconds). You can specify a different timeout to be used with the failover transport, as shown:

failover:(tcp://host1:61616?wireFormat.maxInactivityDuration=1000,
tcp://host2:61616?wireFormat.maxInactivityDuration=1000)

Note that you have to set this parameter (and any other OpenWire properties) on the transports used by the failover protocol, not the failover protocol itself.
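Because options can appear at two levels, it’s easy to attach one to the wrong place when assembling a URI by hand. Here’s a minimal sketch of a helper that keeps the two levels apart; the class FailoverUriSketch is hypothetical (it isn’t part of ActiveMQ), and it assumes that options at the same level are joined with & as in an ordinary URI query string.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that assembles a failover URI string.
final class FailoverUriSketch {
    private static String query(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            joiner.add(option.getKey() + "=" + option.getValue());
        }
        return joiner.toString();
    }

    // transportOptions are appended to every broker URI inside the
    // parentheses; failoverOptions go after the closing parenthesis.
    static String build(List<String> brokers,
                        Map<String, String> transportOptions,
                        Map<String, String> failoverOptions) {
        StringJoiner inner = new StringJoiner(",", "failover:(", ")");
        for (String broker : brokers) {
            inner.add(transportOptions.isEmpty()
                ? broker
                : broker + "?" + query(transportOptions));
        }
        String uri = inner.toString();
        return failoverOptions.isEmpty() ? uri : uri + "?" + query(failoverOptions);
    }

    public static void main(String[] args) {
        Map<String, String> transport = new LinkedHashMap<>();
        transport.put("wireFormat.maxInactivityDuration", "1000");
        Map<String, String> failover = new LinkedHashMap<>();
        failover.put("randomize", "false");
        System.out.println(build(
            Arrays.asList("tcp://host1:61616", "tcp://host2:61616"),
            transport, failover));
    }
}
```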

By default, the delivery mode for sending messages from an ActiveMQ client is persistent (this is so ActiveMQ is compliant with the JMS specification). A message sent with a persistent delivery mode will be synchronous—send() will block until it gets a receipt from the broker that it has successfully received and stored the message. For applications where performance is important, using nonpersistent delivery can dramatically improve results (see chapter 13). When nonpersistent delivery is used, messages are sent asynchronously, which has the downside that you can potentially lose messages in flight if a transport fails. You can configure the failover transport to prevent this by enabling message caching with the trackMessages failover transport property. You can also control the maximum size of this message cache by use of the maxCacheSize failover property—the default is 128 KB (the memory allocation size allowed for the message cache). Here’s an example configuration for enabling caching and setting the maximum cache size:

failover:(tcp://host1:61616,tcp://host2:61616)?trackMessages=true&maxCacheSize=256000

For high-performance applications, fast failover is important too. It takes a considerable amount of time to build up a new transport connection (in the order of tens to hundreds of milliseconds), so to enable fast failover, ActiveMQ can optionally allow the failover protocol to build a backup connection ready to go if the primary transport fails. The failover property to set to allow a backup connection is unsurprisingly called backup. You can have more than one backup enabled (the default is 1) by setting the failover property backupPoolSize. An example failover URI using backup is shown next:

failover:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)?backup=true&backupPoolSize=2

So far we’ve looked at configuring the failover transport with a static list of URIs to the broker, but an ActiveMQ broker does know what brokers it’s connected to, so it can optionally dynamically update clients with changes to the cluster as brokers come and go. To enable dynamic updates of brokers to an ActiveMQ client, we need to enable the property updateClusterClients on the TransportConnector used in the ActiveMQ broker configuration. Properties on the TransportConnector are used to control the updates; these are as shown in table 12.1.

Table 12.1. TransportConnector properties for updating clients of cluster changes

Property | Default value | Description
updateClusterClients | false | If true, pass information to connected clients about changes in the topology of the broker cluster.
rebalanceClusterClients | false | If true, connected clients will be asked to rebalance across a cluster of brokers when a new broker joins the network of brokers.
updateClusterClientsOnRemove | false | If true, will update clients when a broker is removed from the network. Having this as a separate option enables clients to be updated when new brokers join, but not when brokers leave.
updateClusterFilter | null | Comma-separated list of regular expression filters used to match broker names of brokers to designate as being part of the failover cluster for the clients.

An interesting property is rebalanceClusterClients which, if enabled, ensures that the ActiveMQ clients will evenly distribute themselves across the cluster when a new broker joins.

An example configuration for an ActiveMQ broker on a machine named tokyo using these properties is shown next:

<broker>
  ...
  <transportConnectors>
    <transportConnector name="clustered"
      uri="tcp://0.0.0.0:61616"
      updateClusterClients="true"
      updateClusterFilter="*newyork*,*london*" />
  </transportConnectors>
  ...
</broker>

This configuration will update any clients that are using the failover transport protocol with the locations of any brokers joining that have newyork or london in their broker names. With updateClusterClients enabled, you only need to configure the failover protocol with one broker in the cluster, for example:

failover:(tcp://tokyo:61616)

As the client will be updated automatically as new brokers join and leave the cluster, if the machine tokyo should fail, the client would automatically fail over to either newyork or london.

You may wish for your clients to automatically be distributed around all the machines in a cluster, so all the machines share the load of your messaging application. By enabling the property rebalanceClusterClients on the TransportConnector, as ActiveMQ brokers join and leave the cluster, this will automatically happen.

In this section we’ve taken a deeper look at some of the functionality that can be used with the failover transport protocol. You should now have a better understanding of how to configure an ActiveMQ client to detect and survive a network outage or broker failure.

In the next section we’re going to look at scheduling a message to be delivered by the ActiveMQ broker at some time in the future.

12.6. Scheduling messages to be delivered by ActiveMQ in the future

The ability to schedule a message to be delivered after a delay, or at regular intervals, is an extremely useful feature provided by ActiveMQ. One unique benefit is that messages that are scheduled to be delivered in the future are stored persistently, so that they can survive a hard failure of an ActiveMQ broker and be delivered on restart.

You specify that you want a message to be delivered at a later time by setting well-defined properties on the message. For convenience, the well-known property names are defined in the org.apache.activemq.ScheduledMessage interface. These properties are shown in table 12.2.

Table 12.2. Message properties for scheduling message delivery

Property | Type | Description
AMQ_SCHEDULED_DELAY | long | The time in milliseconds that a message will wait before being scheduled to be delivered by the broker
AMQ_SCHEDULED_PERIOD | long | The time in milliseconds after the start time to wait before scheduling the message again
AMQ_SCHEDULED_REPEAT | int | The number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_CRON | String | Use a cron entry to set the schedule

To have a message wait for a period of time before it’s delivered, you only need to set the AMQ_SCHEDULED_DELAY property. Suppose you want to publish a message from your client, but have it actually delivered in 5 minutes. You’d need to do something like the following in your client code:

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delayTime = 5 * 60 * 1000;

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
producer.send(message);

ActiveMQ will store the message persistently in the broker, and when the scheduled time arrives, it will deliver the message to its destination. This is important, because although you’ve specified that you want the message to be delivered in 5 minutes, if the destination is a queue, the message will be posted to the end of the queue. So the actual delivery time will depend on how many messages are already on the queue awaiting delivery.
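A small practical point about the delay value: the expression 5 * 60 * 1000 is evaluated with int arithmetic before it’s widened to long. That’s harmless for 5 minutes, but the same style of expression for a delay of several weeks can silently overflow. The following sketch shows one way to avoid this with java.util.concurrent.TimeUnit. The class DelaySketch and its method names are our own, purely for illustration; they aren’t part of ActiveMQ.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the ActiveMQ API.
final class DelaySketch {

    // Converts minutes to milliseconds using long arithmetic.
    static long minutes(long minutes) {
        return TimeUnit.MINUTES.toMillis(minutes);
    }

    // Converts days to milliseconds using long arithmetic.
    static long days(long days) {
        return TimeUnit.DAYS.toMillis(days);
    }

    // The error-prone version: every operand is an int, so the
    // multiplication wraps around before the result is used.
    static int naiveDaysToMillis(int days) {
        return days * 24 * 60 * 60 * 1000;
    }

    public static void main(String[] args) {
        System.out.println("5 minutes     = " + minutes(5) + " ms");
        System.out.println("30 days       = " + days(30) + " ms");
        System.out.println("30 days (int) = " + naiveDaysToMillis(30) + " ms (overflowed)");
    }
}
```

With a helper like this, the earlier listing could pass DelaySketch.minutes(5) as the value of the AMQ_SCHEDULED_DELAY property instead of computing the delay inline.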

You can also use the AMQ_SCHEDULED_PERIOD and AMQ_SCHEDULED_REPEAT properties to have messages delivered at a fixed rate. The following example will send a message 100 times, every 30 seconds:

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 30 * 1000;
int repeat = 99;

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,
  repeat);
producer.send(message);

Note that we specified the repeat as 99, because the first delivery plus 99 repeats makes 100 deliveries in total. If you schedule a message to be sent once, the message ID will be the same as the one you published. If you schedule a repeat, or use the AMQ_SCHEDULED_CRON property to schedule your message, then ActiveMQ will create a unique message ID for the delivered message.
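To make the arithmetic concrete, here’s a minimal sketch that models how delay, period, and repeat combine into a list of delivery times. It assumes the first delivery happens after the delay and each repeat follows one period after the previous delivery; that’s our reading of table 12.2, not a statement about the broker’s internals. The class ScheduleSketch and its method are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; not part of the ActiveMQ API.
final class ScheduleSketch {

    // Returns the assumed delivery times, in milliseconds after the send,
    // for a message scheduled with the given delay, period, and repeat.
    static List<Long> deliveryOffsets(long delayMillis, long periodMillis, int repeat) {
        if (delayMillis < 0 || periodMillis < 0 || repeat < 0) {
            throw new IllegalArgumentException("values must not be negative");
        }
        List<Long> offsets = new ArrayList<>(repeat + 1);
        // One initial delivery plus 'repeat' further deliveries.
        for (int i = 0; i <= repeat; i++) {
            offsets.add(delayMillis + (long) i * periodMillis);
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Long> offsets = deliveryOffsets(30_000L, 30_000L, 99);
        System.out.println("deliveries: " + offsets.size());
        System.out.println("first at:   " + offsets.get(0) + " ms");
        System.out.println("last at:    " + offsets.get(offsets.size() - 1) + " ms");
    }
}
```

Under these assumptions, a repeat of 99 yields 100 deliveries, with the last one scheduled 50 minutes (3,000,000 ms) after the message was sent.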

Cron is a well-known job scheduler on Unix systems, and it uses an expression string to denote when a job should be scheduled. ActiveMQ uses the same syntax, as described next:

.---------------- minute (0 - 59)
|  .------------- hour (0 - 23)
|  |  .---------- day of month (1 - 31)
|  |  |  .------- month (1 - 12), 1 = January
|  |  |  |  .---- day of week (0 - 7) (Sunday = 0 or 7)
|  |  |  |  |
*  *  *  *  *

For example, if you want to schedule a message to be delivered at 2 a.m. on the twelfth day of every month, you’d need to do the following:

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,
  "0 2 12 * *");
producer.send(message);
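If you want to check how the five fields of an entry such as "0 2 12 * *" line up against a particular date and time, the following sketch may help. It’s a deliberately simplified matcher that understands only * and plain numbers (no ranges, lists, or step values), and it is not the parser ActiveMQ uses. The class CronSketch and its methods are hypothetical names for this illustration.

```java
import java.time.LocalDateTime;

// Hypothetical, simplified matcher for illustration only.
// Supports "*" and single numbers in each of the five fields.
final class CronSketch {

    // Returns true if every field of the cron entry matches the given time.
    static boolean matches(String cron, LocalDateTime time) {
        String[] fields = cron.trim().split("\\s+");
        if (fields.length != 5) {
            throw new IllegalArgumentException("expected 5 fields: " + cron);
        }
        // java.time numbers Monday..Sunday as 1..7; map Sunday to 0.
        int dayOfWeek = time.getDayOfWeek().getValue() % 7;
        return fieldMatches(fields[0], time.getMinute())
            && fieldMatches(fields[1], time.getHour())
            && fieldMatches(fields[2], time.getDayOfMonth())
            && fieldMatches(fields[3], time.getMonthValue())
            && dayOfWeekMatches(fields[4], dayOfWeek);
    }

    private static boolean fieldMatches(String field, int value) {
        return field.equals("*") || Integer.parseInt(field) == value;
    }

    // Cron allows both 0 and 7 for Sunday, so compare modulo 7.
    private static boolean dayOfWeekMatches(String field, int dayOfWeek) {
        return field.equals("*") || Integer.parseInt(field) % 7 == dayOfWeek;
    }

    public static void main(String[] args) {
        LocalDateTime twelfthAtTwo = LocalDateTime.of(2024, 3, 12, 2, 0);
        System.out.println(matches("0 2 12 * *", twelfthAtTwo));
        System.out.println(matches("0 2 12 * *", twelfthAtTwo.plusDays(1)));
    }
}
```

Reading the entry left to right: minute 0, hour 2, day of month 12, any month, any day of the week.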

You can combine scheduling with cron and a simple delay and repeat, but the cron entry will always take precedence: it decides when the schedule starts, and the delay, period, and repeat are applied from that point. For example, instead of sending one message at 2 a.m. on the twelfth day of every month, you may want to schedule 10 messages to be delivered 30 seconds apart each time the cron entry fires:

long delay = 30 * 1000;
long period = 30 * 1000;
int repeat = 9;

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");

message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,
  "0 2 12 * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,
  repeat);

producer.send(message);

In this section we’ve looked at how to schedule messages for delivery at some time in the future using ActiveMQ. You should now be able to send messages after a delay, send multiple instances of the same message at regular intervals, and use a cron entry to schedule message delivery.

12.7. Summary

In this chapter we learned about some of the advanced features that an ActiveMQ client can use above and beyond the JMS specification.

We learned about exclusive consumers, and walked through an example of using them as a distributed locking mechanism to ensure (paradoxically) that only one producer will be running for a distributed application. We’ve seen the power of using message groups to group messages together so that they’re consumed by the same message consumer.

We also looked at two different ways of transporting large payloads with ActiveMQ: ActiveMQ streams and blob messages. You should also have a much better understanding of the options available when using the failover transport protocol, and how to schedule delivery of messages in the future with ActiveMQ.

In the next chapter we’ll look at performance tuning with ActiveMQ, and some of the trade-offs between reliability and performance.
