Chapter 11. ActiveMQ broker features in action

 

This chapter covers

  • Using wildcards and composite destinations
  • Utilizing advisory messages
  • Understanding virtual topics and retroactive consumers
  • Using ActiveMQ plug-ins
  • An introduction to Apache Camel

 

In the previous chapter we looked at deploying ActiveMQ in enterprise environments: how to deploy ActiveMQ for high availability and for passing messages across geographically dispersed locations. In this chapter we’ll look at some of the more advanced configuration options for the ActiveMQ message broker. We’ll look at receiving messages from multiple destinations using wildcards, and sending messages to multiple destinations at the same time using composite destinations. We’ll show how to actively listen for changes in the state of the ActiveMQ broker and for clients leaving and joining by using advisory messages. Other advanced features of the broker we’ll look at include virtual topics, which let you broadcast messages over a topic, but have load balanced queues dispatch the messages. We’ll also look at message redelivery and dead-letter queues, and how to extend the functionality of the ActiveMQ broker with interceptor plug-ins. Finally, we’ll introduce Apache Camel, the popular integration framework, which can be embedded into an ActiveMQ broker to create a powerful integration engine and extend the flexibility and routing of ActiveMQ.

In the first section, we’ll look at how to send and receive messages from more than one destination at a time, using composite destinations and wildcards.

11.1. Wildcards and composite destinations

In this section we’ll look at two useful features of ActiveMQ: subscribing to multiple destinations using wildcards, and publishing to multiple destinations using composite destinations. ActiveMQ uses a special notation to denote a wildcard subscription; we’ll describe that in the next section.

11.1.1. Consume from multiple destinations using wildcards

ActiveMQ supports the concept of destination hierarchies, where the name of a destination is used to organize messages into hierarchies. Each element in the name is delimited by a dot (.). Destination hierarchies apply to both topics and queues.

For example, if you had an application that subscribed to the latest results for sports on a Saturday afternoon, you could use the following naming convention for your topics:

<Sport>.<League>.<Team>

For example, to subscribe to the latest results for a team called Leeds in English football, you’d subscribe to the topic football.division1.leeds. Now suppose Leeds plays both football and rugby and, for convenience, you want to see all results for Leeds in both sports with the same MessageConsumer. This is where wildcards are useful.

Three special characters are reserved for destination names:

  • . A dot, used to separate elements in the destination name
  • * Used to match one element
  • > Matches one or all trailing elements

So to subscribe to the latest scores for every sport and league that a Leeds team plays in, you can subscribe to the topic named *.*.leeds, as shown:

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Destination names are case sensitive, so the wildcard uses the same
// lowercase team name as football.division1.leeds
Topic allLeeds = session.createTopic("*.*.leeds");

MessageConsumer consumer = session.createConsumer(allLeeds);
// Blocks until a result arrives on any topic matching the wildcard
Message result = consumer.receive();

If you wanted to find out the results of all the football games in Division 1, you’d subscribe to football.division1.*, and if you wanted to find out the latest scores for all rugby games, you could subscribe to rugby.>.
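
The matching rules above can be sketched in a few lines of plain Java. This is only an illustration of the three reserved characters, not the broker’s own matching code. The class and method names are hypothetical, and the sketch assumes that > must be the last element of the pattern and needs at least one trailing element to match.

```java
final class WildcardSketch {

    // Returns true when a dotted destination name matches a wildcard pattern.
    static boolean matches(String pattern, String name) {
        String[] p = pattern.split("\\.");
        String[] n = name.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // '>' swallows everything that is left, but only as the last
                // pattern element and only if something is left to swallow
                return i == p.length - 1 && n.length > i;
            }
            if (i >= n.length) {
                return false; // the pattern has more elements than the name
            }
            if (!p[i].equals("*") && !p[i].equals(n[i])) {
                return false; // a literal element differs
            }
        }
        // Without '>', pattern and name must have the same number of elements
        return p.length == n.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.*.leeds", "football.division1.leeds"));
        System.out.println(matches("football.division1.*", "football.division2.leeds"));
        System.out.println(matches("rugby.>", "rugby.league.leeds"));
    }
}
```

Splitting both names on the dot keeps the comparison element by element, which is why * can never span more than one element in this sketch.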

Wildcards and destination hierarchies are useful for adding flexibility to your applications, allowing for a message consumer to subscribe to more than one destination at a time. The ActiveMQ broker will scan any destination name for a match using wildcards, so generally the shorter the destination name, the better the performance.

But wildcards only work for consumers. If you publish a message to a topic named rugby.>, the message will only be sent to the topic named rugby.>, and not all topics that start with the name “rugby.” There is a way for a message producer to send a message to multiple destinations: by using composite destinations, which we look at next.

11.1.2. Sending a message to multiple destinations

It can be useful to send the same message to different destinations at once, for example when you need real-time analytics about your enterprise. An application used by a retail store might send a message to a queue at the store’s warehouse to request more inventory, but it may also want to broadcast that order to an in-store activity monitoring system. Usually you’d have to do this by sending the message twice, using two message producers: one for the queue and one for the topic. ActiveMQ supports a feature called composite destinations that allows you to send the same message to multiple destinations at once.

A composite destination uses a comma-separated name as the destination name. For example, if you created a queue with the name store.order.backoffice, store.order.warehouse, then the messages sent to that composite destination would actually be sent to the two queues from the same send operation, one queue named store.order.backoffice and one queue named store.order.warehouse.

Composite destinations can support a mixture of queues and topics at the same time. To mix the two, you have to prefix each destination name whose type differs from that of the composite destination with either queue:// or topic://. So for the store application scenario where you want to send an order message to both the order queue and also a topic, you’d set up your message producer as follows:

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// createQueue() implies the queue type; the topic needs its topic:// prefix
Queue ordersDestination =
  session.createQueue("store.orders,topic://store.orders");
MessageProducer producer = session.createProducer(ordersDestination);
Message order = session.createObjectMessage();
producer.send(order);
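
To make the naming rule concrete, here is a small sketch of how a comma-separated composite name breaks down into typed destinations. It only illustrates the convention described above and is not how the broker is implemented. The class and method names are hypothetical, and trimming whitespace around each name is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class CompositeNameSketch {

    // Expands a composite name into fully qualified destination names.
    // defaultType is "queue" or "topic", depending on how the composite was created.
    static List<String> expand(String compositeName, String defaultType) {
        List<String> result = new ArrayList<>();
        for (String part : compositeName.split(",")) {
            String name = part.trim();
            if (name.isEmpty()) {
                continue; // ignore stray commas
            }
            if (name.startsWith("queue://") || name.startsWith("topic://")) {
                result.add(name); // an explicit prefix wins
            } else {
                result.add(defaultType + "://" + name); // inherit the composite's type
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A composite created as a queue that also targets a topic
        System.out.println(expand("store.orders,topic://store.orders", "queue"));
    }
}
```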

Wildcards and composite destinations are powerful tools for building simpler, more flexible applications with ActiveMQ.

Next we’ll look at the management advisory messages that the ActiveMQ broker produces, and how you can subscribe to them to gain useful information on changes to your ActiveMQ system.

11.2. Advisory messages

Advisory messages are notification messages generated by the ActiveMQ broker as a result of changes to the broker. Typically, an advisory message will be generated every time a new administered object (connection, destination, consumer, producer) joins or leaves the broker, but advisory messages can be generated to warn about the ActiveMQ broker reaching system limits, too. Advisory messages are regular JMS messages that are generated on system-defined topics, which enables ActiveMQ applications to be notified asynchronously over JMS of changes in the ActiveMQ broker’s state. They can be a good alternative to using JMX to find out about the running state of an ActiveMQ broker.

ActiveMQ uses advisory messages internally too, to notify connections about the availability of temporary destinations and notify networks about the availability of consumers, so you should take care if you want to disable them.

Every advisory message generated has a JMSType of Advisory and predefined JMS String properties, identifying the broker where the advisory was generated:

  • originBrokerId—The ID of the broker that generated the advisory
  • originBrokerName—The name of the broker that generated the advisory
  • originBrokerURL—The first transport connector URL of the broker that generated the advisory

Advisory messages for changes in state to the administered objects usually use ActiveMQ-specific internal commands as the payload, but they do carry useful information. Let’s look at how to listen for connections starting and stopping with ActiveMQ:

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();

Session session =
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic connectionAdvisory = AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
MessageConsumer consumer = session.createConsumer(connectionAdvisory);

ActiveMQMessage message = (ActiveMQMessage) consumer.receive();

DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
      ConnectionInfo connectionInfo = (ConnectionInfo) data;
      System.out.println("Connection started: " + connectionInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
     RemoveInfo removeInfo = (RemoveInfo) data;
     System.out.println("Connection stopped: " + removeInfo.getObjectId());
} else {
    System.err.println("Unknown message " + data);
}

You can see from the example that we use regular JMS constructs to start listening to advisory topics. It’s worth noting the use of the AdvisorySupport class, which contains the definitions of all the advisory topics. Things get harder when we start using ActiveMQ-specific command objects: a ConnectionInfo is sent when a connection starts, but a RemoveInfo is sent when a connection stops. The RemoveInfo does carry the connectionId (set as the RemoveInfo’s objectId), so it’s possible to correlate which connection has stopped.

Most advisory messages are specific to destinations. But the AdvisorySupport class does have some helper methods to determine which advisory topic to listen to. You can also use wildcards—so, for example, if you created an advisory topic for the queue named >, you’d get information for all queues.
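
Destination-specific advisory topic names follow a predictable pattern, which is what makes wildcard subscriptions possible. The sketch below builds such names by hand purely for illustration; in real code you would normally call the AdvisorySupport helper methods instead. The class and method names are hypothetical, and the pattern is inferred from the advisory topic names listed in this chapter.

```java
final class AdvisoryNameSketch {

    // Builds an advisory topic name such as ActiveMQ.Advisory.Consumer.Queue.test.Queue.
    // event is for example "Consumer" or "Producer"; destinationType is "Queue" or "Topic".
    static String advisoryTopic(String event, String destinationType, String destinationName) {
        return "ActiveMQ.Advisory." + event + "." + destinationType + "." + destinationName;
    }

    public static void main(String[] args) {
        // Advisories for one queue, then for all queues using the '>' wildcard
        System.out.println(advisoryTopic("Consumer", "Queue", "test.Queue"));
        System.out.println(advisoryTopic("Consumer", "Queue", ">"));
    }
}
```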

Let’s look at an example of listening for consumers coming and going for a queue named test.Queue:

Listing 11.1. Subscribing for consumer advisories
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();

// First create a consumer on the queue, so there is something to listen for
Session session =
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test.Queue");

MessageConsumer testConsumer = session.createConsumer(queue);

// Now listen for consumers starting and stopping on that queue
Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(queue);
MessageConsumer consumer = session.createConsumer(advisoryTopic);
consumer.setMessageListener(new MessageListener(){

public void onMessage(Message m) {
   try {
       System.out.println("Consumer Count = "
       + m.getStringProperty("consumerCount"));
       // The payload is an ActiveMQ command, so cast to reach it
       DataStructure data = ((ActiveMQMessage) m).getDataStructure();
       if (data.getDataStructureType() ==
          ConsumerInfo.DATA_STRUCTURE_TYPE) {
        ConsumerInfo consumerInfo = (ConsumerInfo) data;
        System.out.println("Consumer started: " + consumerInfo);
       } else if (data.getDataStructureType() ==
                 RemoveInfo.DATA_STRUCTURE_TYPE) {
        RemoveInfo removeInfo = (RemoveInfo) data;
        System.out.println("Consumer stopped: "
          + removeInfo.getObjectId());
       } else {
        System.err.println("Unknown message " + data);
       }
    } catch (JMSException e) {
     e.printStackTrace();
   }
}

});

testConsumer.close();

You’ll notice in the example that we create a test consumer on the queue test.Queue before we create the listener for consumer advisories on test.Queue. This is to demonstrate that the ActiveMQ broker will also send advisory messages for consumers that already exist when you start to listen for them.

There are some advisories on destinations that aren’t enabled by default; these are advisories on message delivery, slow consumers, fast producers, and so forth. To enable these advisories, you have to configure them on a destination policy in the ActiveMQ broker configuration file. For example, to configure an advisory message for slow consumers on all queues, you need to add the following to your configuration:

<destinationPolicy>
   <policyMap><policyEntries>
      <policyEntry queue=">" advisoryForSlowConsumers="true" />
    </policyEntries></policyMap>
</destinationPolicy>

You can use advisory messages to supplement your application behavior (for example, you could slow message production from your producers if you have slow consumers) or to supplement JMX monitoring of the ActiveMQ broker. Advisory messages are useful for getting dynamic information on changes to your ActiveMQ system.

Many different advisories are generated by the ActiveMQ broker to provide information about the running system. A dozen of the more useful advisory topics appear in the numbered list below, and their properties (matched by number) appear in table 11.1.

Table 11.1. Properties from the list of 12 ActiveMQ advisory topics

| # | Description | Properties | Data structure | Generated by default | Policy entry property |
| 1 | Connection start/stop | null | null | true | none |
| 2 | Producer start/stop messages on a queue | String 'producerCount': number of producers | ProducerInfo | true | none |
| 3 | Consumer start/stop messages on a queue | String 'consumerCount': number of consumers | ConsumerInfo | true | none |
| 4 | Queue created/destroyed | null | null | true | none |
| 5 | Expired messages on a queue | String 'orignalMessageId': expired ID | Message | true | none |
| 6 | Slow queue consumer | String 'consumerId': consumer ID | ConsumerInfo | false | advisoryForSlowConsumers |
| 7 | Fast queue producer | String 'producerId': producer ID | ProducerInfo | false | advisdoryForFastProducers |
| 8 | Message delivered to the broker | String 'orignalMessageId': delivered ID | Message | false | advisoryForDelivery |
| 9 | Message consumed by a client | String 'orignalMessageId': delivered ID | Message | false | advisoryForConsumed |
| 10 | A usage resource is at its limit | String 'usageName': name of usage resource | null | false | advisoryWhenFull |
| 11 | A broker is now the master in a master/slave configuration | null | null | true | none |
| 12 | Message sent to a dead-letter queue | String 'orignalMessageId': delivered ID | Message | true | none |

  1. ActiveMQ.Advisory.Connection
  2. ActiveMQ.Advisory.Producer.Queue
  3. ActiveMQ.Advisory.Consumer.Queue
  4. ActiveMQ.Advisory.Queue
  5. ActiveMQ.Advisory.Expired.Queue
  6. ActiveMQ.Advisory.SlowConsumer.Queue
  7. ActiveMQ.Advisory.FastProducer.Queue
  8. ActiveMQ.Advisory.MessageDelivered.Queue
  9. ActiveMQ.Advisory.MessageConsumed.Queue
  10. ActiveMQ.Advisory.FULL
  11. ActiveMQ.Advisory.MasterBroker
  12. ActiveMQ.Advisory.MessageDLQd.Queue

In the next section, we’re going to change tack and look at an advanced feature called virtual topics, which can be used to supplement the way you consume messages, combining the features of both topics and queues.

11.3. Supercharge JMS topics by going virtual

If you want to broadcast a message to multiple consumers, then you use a JMS topic. If you want a pool of consumers to receive messages from a destination, then you use a JMS queue. But there’s no satisfactory way to send a message to a topic and then have a pool of consumers share those messages the way they can with a queue.

The JMS spec requires that a durable subscriber to a topic use a unique JMS client ID and subscriber name. Also, only one thread (a single consumer) can be active at any time with that unique JMS client ID and subscriber name. This means that if that subscriber dies for some reason, there will be no failover to another consumer and there’s no ability to load balance messages across competing consumers. But using JMS queue semantics allows the ability to fail over consumers, to load balance messages among competing consumers, and to use ActiveMQ message groups (see chapter 12), which allows sticky load balancing of messages to maintain message order. Furthermore, JMS queue depths can be monitored via JMX (see chapter 14). Using virtual topics works around these disadvantages while still retaining the benefits of JMS topics.

Virtual topics allow a publisher to send messages to a normal JMS topic while consumers receive messages from a normal JMS queue. So consumers subscribe to a queue to receive messages that were published to a topic. Figure 11.1 shows a diagram of how virtual topics are structured in ActiveMQ.

Figure 11.1. ActiveMQ virtual topics feed queues from topic messages

Some naming conventions are required to allow virtual topics to operate correctly. First, to identify that a topic is to be treated as a virtual topic, the topic name should always follow the pattern of VirtualTopic.<topic name>. So if you want to create a virtual topic for a topic whose name is orders, you need to create a destination with the name VirtualTopic.orders. This means that a publisher sends messages to a topic named VirtualTopic.orders. In order to consume from the queue that’s backed by the virtual topic, consumers must subscribe to a queue whose name follows the pattern Consumer.<consumer name>.VirtualTopic.<virtual topic name>.

Suppose you want consumers to compete for messages on a queue, but you want that queue to be backed by a topic. You’d create two queue receivers, each consuming from a queue named Consumer.Foo.VirtualTopic.orders. An example of this is shown next.

Listing 11.2. Using virtual topics
...
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;

ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();

String queueName = "Consumer.Foo.VirtualTopic.orders";

//Create the first consumer for Consumer.Foo.VirtualTopic.orders
Session sessionA =
  consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue fooQueueA = sessionA.createQueue(queueName);
MessageConsumer consumerA = sessionA.createConsumer(fooQueueA);
consumerA.setMessageListener(getMessageListener());

// Create the second consumer for Consumer.Foo.VirtualTopic.orders
Session sessionB =
  consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue fooQueueB = sessionB.createQueue(queueName);
MessageConsumer consumerB = sessionB.createConsumer(fooQueueB);
consumerB.setMessageListener(getMessageListener());

// Create the sender
String topicName = "VirtualTopic.orders";
Connection senderConnection = connectionFactory.createConnection();
senderConnection.start();
Session senderSession =
  senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic ordersDestination = senderSession.createTopic(topicName);
MessageProducer producer =
  senderSession.createProducer(ordersDestination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

// Send 2000 messages
for (int i = 0; i < 2000; ++i) {
  TextMessage message = createMessage(i);
  producer.send(message);
}
...

In listing 11.2, note that two consumers are subscribed to the same queue whose name follows the virtual topic naming pattern for the queue side. Also note that the producer is sending to a topic whose name follows the virtual topic naming pattern for the topic side. When the 2,000 messages are sent to the topic, each consumer will receive 1,000 messages from the queue.

Virtual topics are a convenient mechanism to combine the load balancing and failover aspects of queues, with the durability of topics. Not only does the consumer not need to worry about creating a unique JMS client ID and subscriber name, but the consumers are competing for messages in a load balanced manner using JMS queue semantics. If one of the consumers dies, the other consumer will continue to receive all the messages on the queue.
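
The fan-out behavior can be modeled without a broker. The sketch below is a hypothetical in-memory stand-in, not ActiveMQ code: every logical consumer name gets its own queue that receives a copy of each published message, and consumers sharing a queue take turns. Strict round-robin dispatch is an assumption made to keep the model simple; a real broker’s distribution also depends on settings such as prefetch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class VirtualTopicSketch {
    private final Map<String, Set<String>> queuesByTopic = new HashMap<>();
    private final Map<String, List<List<String>>> consumersByQueue = new HashMap<>();
    private final Map<String, Integer> dispatched = new HashMap<>();

    // Queue-side name for a consumer of a virtual topic
    static String queueNameFor(String consumerName, String virtualTopic) {
        return "Consumer." + consumerName + "." + virtualTopic;
    }

    // Registers a competing consumer and returns the list it receives messages into
    List<String> subscribe(String consumerName, String virtualTopic) {
        String queue = queueNameFor(consumerName, virtualTopic);
        queuesByTopic.computeIfAbsent(virtualTopic, t -> new LinkedHashSet<>()).add(queue);
        List<String> inbox = new ArrayList<>();
        consumersByQueue.computeIfAbsent(queue, q -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Copies the message to every backing queue, then hands it to one consumer per queue
    void publish(String virtualTopic, String message) {
        for (String queue : queuesByTopic.getOrDefault(virtualTopic, new LinkedHashSet<>())) {
            List<List<String>> consumers = consumersByQueue.get(queue);
            int turn = dispatched.merge(queue, 1, Integer::sum) - 1;
            consumers.get(turn % consumers.size()).add(message);
        }
    }

    public static void main(String[] args) {
        VirtualTopicSketch broker = new VirtualTopicSketch();
        List<String> fooA = broker.subscribe("Foo", "VirtualTopic.orders");
        List<String> fooB = broker.subscribe("Foo", "VirtualTopic.orders");
        List<String> bar = broker.subscribe("Bar", "VirtualTopic.orders");
        for (int i = 0; i < 2000; i++) {
            broker.publish("VirtualTopic.orders", "order-" + i);
        }
        System.out.println(fooA.size() + " " + fooB.size() + " " + bar.size());
    }
}
```

The two Foo consumers split the messages between them, while the single Bar consumer sees every message, which is the combination of queue and topic semantics that virtual topics provide.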

In the next section we’ll look at using ActiveMQ to combine the longevity of durable subscribers, with the performance of normal topic subscribers.

11.4. Retroactive consumers

For applications that require messages to be sent and consumed as quickly as possible (for example, a real-time data feed), it’s recommended that you send messages with persistence turned off.

There’s a downside to consuming nonpersistent messages, in that you’ll only be able to consume messages from the point when your message consumer starts. You can miss messages if your message consumer starts after the message producer, or if there’s a network glitch and your message consumer needs to reconnect to the broker (or to another one in a network).

In order to provide a limited method of retroactive consumption of messages without requiring message persistence, ActiveMQ has the ability to cache a configurable size or number of messages sent on a topic. There are two parts to this: your message consumers need to inform the ActiveMQ broker that they’re interested in retroactive messages, and you need to configure the destination in the broker to say how many messages should be cached for consumption at a later point.

To mark a consumer as being retroactive, you need to set the retroactive flag for the message consumer. The easiest way to do that is to set the property on the topic name you use:

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();

Session session =
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic =
  session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(topic);
Message result = consumer.receive();

On the broker side, you can configure a number of recovery policies on a topic-by-topic basis. The default is called the FixedSizedSubscriptionRecoveryPolicy, which holds a number of messages in a topic, based on the calculated size the messages will take from the broker memory. The default size is 64 KB.
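
How a size-bounded recovery cache behaves can be sketched without a broker. The class below is a hypothetical stand-in, not ActiveMQ’s FixedSizedSubscriptionRecoveryPolicy. It assumes that a message’s size is simply the UTF-8 length of a string and that the oldest messages are dropped first once the limit is exceeded.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class RecoveryCacheSketch {
    private final int maximumSize; // limit in bytes
    private final Deque<String> cache = new ArrayDeque<>();
    private int currentSize;

    RecoveryCacheSketch(int maximumSize) {
        this.maximumSize = maximumSize;
    }

    // Remembers a message sent on the topic, evicting the oldest ones if needed
    void add(String message) {
        cache.addLast(message);
        currentSize += sizeOf(message);
        while (currentSize > maximumSize && !cache.isEmpty()) {
            currentSize -= sizeOf(cache.removeFirst());
        }
    }

    // Messages a newly started retroactive consumer would be sent first
    List<String> recover() {
        return new ArrayList<>(cache);
    }

    private static int sizeOf(String message) {
        return message.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        RecoveryCacheSketch cache = new RecoveryCacheSketch(10);
        cache.add("aaaa");
        cache.add("bbbb");
        cache.add("cccc"); // 12 bytes in total, so the oldest message is dropped
        System.out.println(cache.recover());
    }
}
```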

You can configure the subscription recovery policy on a named topic, or use wildcards to apply it to hierarchies. Here’s a sample configuration snippet showing how to change the default cache size for the FixedSizedSubscriptionRecoveryPolicy for all topics created in the ActiveMQ broker:

 <destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic=">">
        <subscriptionRecoveryPolicy>
          <fixedSizedSubscriptionRecoveryPolicy maximumSize="8mb"/>
        </subscriptionRecoveryPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
 </destinationPolicy>

Retroactive message consumers are a convenient mechanism to improve the reliability of your applications without incurring the overhead of message persistence. We’ve seen how to enable retroactive consumers and how to configure their broker-side counterpart, the SubscriptionRecoveryPolicy.

In the next section we’re going to look at how ActiveMQ stores messages that can’t be delivered to message consumers—dead-letter queues.

11.5. Message redelivery and dead-letter queues

When messages expire on the ActiveMQ broker (they exceed their time-to-live, if set) or can’t be redelivered, they’re moved to a dead-letter queue, so they can be consumed or browsed by an administrator at a later point.

Messages are normally redelivered to a client in the following scenarios:

  • A client is using transactions and calls rollback() on the session.
  • A client is using transactions and closes before calling commit.
  • A client is using CLIENT_ACKNOWLEDGE on a session and calls recover() on that session.

A client application usually has a good reason to roll back a transacted session or call recover()—it may not be able to complete the processing of the message(s) because of its inability to negotiate with a third-party resource, for example. But sometimes an application may decide to not accept delivery of a message because the message is poorly formatted. For such a scenario, it doesn’t make sense for the ActiveMQ broker to attempt redelivery forever.

A configurable POJO, the RedeliveryPolicy, is associated with the ActiveMQ connection, and you can tune it to set different policies. You can configure how long to wait before a message is redelivered, whether that time should increase after every failed attempt (using an exponential back-off and a back-off multiplier), and the maximum number of redelivery attempts before the message is moved to a dead-letter queue.

Here’s an example of how to configure a client’s redelivery policy:

// getRedeliveryPolicy() is ActiveMQ specific, so cast the JMS Connection
RedeliveryPolicy policy =
  ((ActiveMQConnection) connection).getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);
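
To see what these settings mean in practice, the following sketch computes the waits between redelivery attempts. It’s a simplified model with a hypothetical class name. It assumes that the first redelivery waits for the initial delay and that each later one multiplies the previous wait; the client library’s real calculation may differ in detail, for example by capping the delay.

```java
final class BackOffSketch {

    // Returns the wait in milliseconds before each redelivery attempt
    static long[] delays(long initialDelay, double multiplier,
                         boolean exponential, int maximumRedeliveries) {
        long[] result = new long[maximumRedeliveries];
        long delay = initialDelay;
        for (int i = 0; i < maximumRedeliveries; i++) {
            result[i] = delay;
            if (exponential && multiplier > 1) {
                delay = (long) (delay * multiplier); // grow the wait for the next attempt
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as the policy configured above
        for (long delay : delays(500, 2, true, 2)) {
            System.out.println(delay + " ms");
        }
    }
}
```

Once the last delay in the array has been used and the message is rejected again, the message is a candidate for the dead-letter queue.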

By default, there’s one dead-letter queue for all messages, called ActiveMQ.DLQ, which receives expired messages and messages that have exceeded their redelivery attempts. You can configure a dead-letter queue for a hierarchy or for an individual destination in the ActiveMQ broker configuration, as in the following example, where we set an IndividualDeadLetterStrategy:

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">">
        <deadLetterStrategy>
          <individualDeadLetterStrategy
            queuePrefix="DLQ."
            useQueueForQueueMessages="true"
            processExpired="false"
            processNonPersistent="false"/>
        </deadLetterStrategy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

Note that we configure this dead-letter strategy to ignore nonpersistent and expired messages, which can prevent overwhelming the ActiveMQ broker with messages, if you’re using time-to-live on nonpersistent messages.
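
The effect of this configuration can be summarized as two small decisions: which dead-letter queue a message would go to, and whether it goes there at all. The sketch below models both with hypothetical names. It is an illustration of the attributes shown above, not the broker’s implementation.

```java
final class DeadLetterSketch {

    // With an individual strategy, each destination gets its own dead-letter queue
    static String deadLetterQueueFor(String queuePrefix, String destinationName) {
        return queuePrefix + destinationName;
    }

    // Decides whether an undeliverable message is kept in a dead-letter queue
    static boolean shouldDeadLetter(boolean expired, boolean persistent,
                                    boolean processExpired, boolean processNonPersistent) {
        if (expired && !processExpired) {
            return false; // expired messages are discarded
        }
        if (!persistent && !processNonPersistent) {
            return false; // nonpersistent messages are discarded
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(deadLetterQueueFor("DLQ.", "store.orders"));
        // A persistent message that ran out of redelivery attempts
        System.out.println(shouldDeadLetter(false, true, false, false));
    }
}
```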

When a message is sent to a dead-letter queue, an advisory message is generated for it. You can listen for dead-letter queue advisory messages for all destinations on the wildcard topic ActiveMQ.Advisory.MessageDLQd.>.

In the next section, we’ll look at some of the interceptor plug-ins that are available to extend the behavior of the ActiveMQ broker.

11.6. Extending functionality with interceptor plug-ins

ActiveMQ provides the ability to supply custom code to supplement broker functionality. To do so requires a good understanding of the ActiveMQ broker internals, which is unfortunately outside the scope of this book. But some ActiveMQ broker interceptor plug-ins are provided with the ActiveMQ distribution; some we’ve already covered, such as the authentication plug-in, in chapter 6. There are some additional miscellaneous plug-ins, and for completeness, it’s worth looking at those now.

We’ll start with visualization, which uses two different plug-ins that generate graphical representations of connections and destinations.

11.6.1. Visualization

Visualization can be useful for identifying usage patterns for an ActiveMQ broker. For example, being able to see a diagram of all the connections and the destinations that they’re consuming messages from can help identify rogue clients in production environments that have been erroneously consuming messages from the wrong queue. You can use two visualization plug-ins that generate a graph visualization file, which contains structural information for viewing structured data. You can use several tools to visualize the generated files, such as Graphviz (http://www.graphviz.org).

Two types of visualization plug-ins are available, one for connections (and associated consumers and producers) called connectionDotFilePlugin, and one for generating a destination hierarchy of all the queues and topics used in the ActiveMQ Broker, called destinationDotFilePlugin. When these plug-ins are enabled, they generate a DOT file on disk. A DOT file contains the structural information in a directed graph notation, suitable to be read by a graph visualization tool.

The connectionDotFilePlugin writes information about the client connections attached to the ActiveMQ broker to a DOT file on disk. By default the file is written into the current directory the ActiveMQ broker was started from and is called ActiveMQConnections.dot. Information about each client connection, including which destinations the client connection is sending messages to or receiving messages from, is also written to the DOT file. Every time there’s a change to a client connection (for example, it starts or stops a message consumer), a new client connection starts, or an old one stops, the DOT file will be overwritten.

The connectionDotFilePlugin has only one property: the name of the file to write the state information into, as shown in table 11.2.

Table 11.2. Properties for the connection DOT plug-in

| Property name | Default value | Description |
| file | ActiveMQConnections.dot | The path name to write state information in DOT format for connections to the ActiveMQ broker |

The destinationDotFilePlugin is similar to the connectionDotFilePlugin. When this plug-in is enabled, it will write state information about the current destinations in use by the ActiveMQ broker to a DOT file that’s by default called ActiveMQDestinations.dot. Whenever the state information of destinations changes within the broker, this DOT file will be updated. An example of the rendered DOT file for the destinationDotFilePlugin is shown in figure 11.2.

Figure 11.2. Active destinations for an ActiveMQ broker graphically generated by the destinationDotFilePlugin

The destinationDotFilePlugin has only one property: the name of the file to write the state information into, as shown in table 11.3.

Table 11.3. Properties for the destination DOT plug-in

| Property name | Default value | Description |
| file | ActiveMQDestinations.dot | The path name to write state information in DOT format for destinations to the ActiveMQ broker |

All ActiveMQ broker plug-ins can be enabled within the configuration file for the broker. Here’s an example of an ActiveMQ broker configuration file which has the connectionDotFilePlugin and destinationDotFilePlugin enabled.

Listing 11.3. Configuring visualization plug-ins for the ActiveMQ broker
<broker useJmx="false" persistent="true">
  <plugins>
    <connectionDotFilePlugin file="ActiveMQConnections.dot"/>
    <destinationDotFilePlugin file="ActiveMQDestinations.dot"/>
  </plugins>
</broker>

The next plug-in we’ll look at was developed by one of ActiveMQ’s many users to enhance the logging available for the ActiveMQ broker.

11.6.2. Enhanced logging

The logging interceptor (configured with the loggingBrokerPlugin element), if configured, allows you to log messages that are sent or acknowledged on an ActiveMQ broker, in addition to the normal logging done by ActiveMQ. This plug-in can be useful for tracing problems or auditing messages. A few properties for the logging interceptor can be configured, as shown in table 11.4.

Table 11.4. Properties for the logging interceptor

| Property name | Default value | Description |
| logAll | false | Log all the events |
| logMessageEvents | false | Log events associated with messages |
| logConnectionEvents | true | Log events associated with connections |
| logConsumerEvents | false | Log events associated with consumers |
| logProducerEvents | false | Log events associated with producers |
| logInternalEvents | false | Log detailed information about the internal workings of the broker |

11.6.3. Central timestamp messages with the timestamp interceptor plug-in

The timestamp plug-in (timeStampingBrokerPlugin), if configured, updates the timestamp on messages as they arrive at the broker. This can be useful when there’s a difference (however small) between the system clocks on a computer sending messages to an ActiveMQ broker and the computer the broker resides on. When messages are sent with the timeToLive property set, it’s important that the system clocks of the sending machine and the broker are in sync; otherwise messages may get expired erroneously. A few properties for the timestamp plug-in can be configured, as shown in table 11.5.

Table 11.5. Properties for the message timestamp plug-in

Property name | Default value | Description
zeroExpirationOverride | 0 | When not zero, the time to live (in milliseconds) applied to messages that currently don’t have an expiration set
ttlCeiling | 0 | When not zero, the upper limit (in milliseconds) on a message’s time to live
futureOnly | false | If true, won’t update the timestamp of messages to past values

It’s recommended that the timeStampingBrokerPlugin be enabled on the ActiveMQ broker if you’re using the timeToLive property on messages.
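
As a rough illustration of how the properties in table 11.5 combine, the following sketch enables only the timestamp plug-in. The values are arbitrary examples, not recommendations, and the comments describe the behavior as we understand it from the property descriptions above.

```xml
<broker useJmx="false" persistent="false">
  <plugins>
    <!-- Example values only; all times are in milliseconds.
         zeroExpirationOverride: messages that arrive with no expiration
           are given a time to live of 5 minutes.
         ttlCeiling: messages that arrive with a time to live longer
           than 1 hour have it capped at 1 hour.
         futureOnly: the broker won't move timestamps into the past. -->
    <timeStampingBrokerPlugin
      zeroExpirationOverride="300000"
      ttlCeiling="3600000"
      futureOnly="true"/>
  </plugins>
</broker>
```

Leaving zeroExpirationOverride and ttlCeiling at their default of 0 disables both adjustments, so the plug-in only restamps messages with the broker’s clock.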

The next interceptor plug-in is used for generating messages about management statistics for the ActiveMQ broker.

11.6.4. Statistics

The statisticsBrokerPlugin sends MapMessages containing statistics about the running ActiveMQ broker. There are two types of message: one for individual destinations and one that gives an overview of the broker itself.

To retrieve the statistics of the running broker with the statisticsBrokerPlugin enabled, send an empty message to the destination called ActiveMQ.Statistics.Broker (it can be a queue or a topic; it doesn’t matter which). Set the JMSReplyTo header on that message to the destination where you want the statistics message delivered.

Similarly, to retrieve information about a destination, send an empty message to the name of the destination, prepended with ActiveMQ.Statistics.Destination. For example, to retrieve statistics for the destination Topic.Foo, send a message to the destination ActiveMQ.Statistics.DestinationTopic.Foo.

You enable an ActiveMQ broker plug-in by including it in the broker configuration file, as shown:

Listing 11.4. Configuring plug-ins for the broker
<broker useJmx="false" persistent="false">
  <plugins>
    <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>
    <timeStampingBrokerPlugin
      zeroExpirationOverride="1000"
      ttlCeiling="60000" futureOnly="true"/>
    <statisticsBrokerPlugin/>
  </plugins>
</broker>

Broker interceptors are a useful addition for extending the functionality of ActiveMQ. But you can provide more features and flexibility by embedding Apache Camel, the powerful integration framework, in the ActiveMQ broker. We’ll look at Apache Camel integration next.

11.7. Routing engine with Apache Camel framework

Apache Camel is a simple-to-use integration framework that’s easily embedded in containers and applications.

At the core of the Camel framework is a routing engine builder. It allows you to define your own routing rules, the sources from which to accept messages, and how to process and send them to other destinations. Camel defines an integration language that allows you to define routing rules, akin to business processes.

Although Apache Camel supports a large number of integration components, we’ll demonstrate some relatively simple ways to extend the power of ActiveMQ by using Apache Camel for some simple routing.

Camel can use a Java-based domain-specific language (DSL), a Scala DSL, or an XML-based DSL to define routes. We’ll concentrate on the XML-based DSL, so we can extend an ActiveMQ broker’s functionality directly from an XML configuration file. Apache Camel uses simple English prepositions, such as from and to, to denote a route. It’s easy to explain with an example. First we’ll define a simple broker configuration file that includes a Camel XML file with routing rules:

<beans>
  <broker brokerName="testBroker">
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61616"/>
    </transportConnectors>
  </broker>
  <import resource="camel.xml"/>
</beans>

Note that we use the import element after the broker definition to include a Camel XML configuration file. Apache Camel comes with both a generic JMS component and a more specific, optimized ActiveMQ component, and we’ll use the latter. The ActiveMQ component needs to be configured to communicate with the broker, and we’ll use the vm:// transport to do this. We named the ActiveMQ broker testBroker, so this is the name we must use when we set up the vm:// transport in the Camel XML configuration file:

<bean id="activemq"
  class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL"
        value="vm://testBroker?create=false&amp;waitForStart=1000"/>
      <property name="userName" value="DEFAULT_VALUE"/>
      <property name="password" value="DEFAULT_VALUE"/>
    </bean>
  </property>
</bean>

We can now define a route. A useful enhancement is to tap into messages broadcast on a topic, and place them in a queue for processing later:

<route>
  <from uri="activemq:topic:Test.Topic"/>
  <to uri="activemq:queue:Test.Queue"/>
</route>

This route will consume messages on the topic Test.Topic and route them to the queue Test.Queue. Simple, but useful stuff.
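
The fragments above are shown in isolation. Putting them together, a complete camel.xml might look like the following sketch. The Spring and Camel namespace declarations and the camelContext id (camel) are our assumptions about a typical setup rather than something taken from the listings above, and the userName and password properties are omitted for brevity.

```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

  <!-- Routes are declared inside a camelContext element.
       The id "camel" is an arbitrary name chosen for this sketch. -->
  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <!-- Copy every message published on the topic into a queue -->
      <from uri="activemq:topic:Test.Topic"/>
      <to uri="activemq:queue:Test.Queue"/>
    </route>
  </camelContext>

  <!-- The bean id "activemq" is what the activemq: URI prefix refers to -->
  <bean id="activemq"
    class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- The ampersand must be escaped as &amp; inside XML attributes -->
        <property name="brokerURL"
          value="vm://testBroker?create=false&amp;waitForStart=1000"/>
      </bean>
    </property>
  </bean>
</beans>
```

The scheme used in the route URIs (activemq:) matches the id of the component bean, which is how Camel resolves which component handles each endpoint.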

Let’s demonstrate something more complex. The statistics broker plug-in (statisticsBrokerPlugin) will only publish a statistic message when requested. So it’d be useful to broadcast a message with statistical information periodically, and we can use Apache Camel to do that.

First, we need to ensure that the statisticsBrokerPlugin is enabled, as in the following example configuration:

<beans>
  <broker useJmx="false" persistent="false">
    <plugins>
      <statisticsBrokerPlugin/>
    </plugins>
  </broker>
  <import resource="camel.xml"/>
</beans>

Then, with Apache Camel, we’ll do the following:

  • Use the timer component to trigger the route at a fixed interval.
  • Communicate with the statistics plug-in using a request/reply pattern. In Apache Camel, a request/reply exchange is called InOut. We’ll request statistics for the queue named Test.Queue.
  • Broadcast the result on a topic called Statistics.Topic.

The complete Apache Camel route is only three lines of code, as shown:

<route>
   <from uri="timer://foo?fixedRate=true&amp;period=1000"/>
   <inOut uri="activemq:queue:ActiveMQ.Statistics.DestinationTest.Queue"/>
   <to uri="activemq:topic:Statistics.Topic"/>
</route>
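
The same pattern should work for the broker-wide statistics described in section 11.6.4. The following variant is a sketch under that assumption: it requests the broker overview from ActiveMQ.Statistics.Broker once a minute. The timer name (brokerStats), the period, and the topic name Statistics.Broker.Topic are hypothetical choices for illustration.

```xml
<!-- Place inside the camelContext element of the Camel XML file -->
<route>
   <!-- Fire once every 60 seconds; "brokerStats" is just a timer name -->
   <from uri="timer://brokerStats?fixedRate=true&amp;period=60000"/>
   <!-- Request/reply against the broker-wide statistics destination -->
   <inOut uri="activemq:queue:ActiveMQ.Statistics.Broker"/>
   <!-- Broadcast the reply; this topic name is a hypothetical example -->
   <to uri="activemq:topic:Statistics.Broker.Topic"/>
</route>
```

A longer period than the one-second interval used above keeps the polling overhead low, which may matter on a busy broker.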

Apache Camel is an extremely flexible and feature-rich framework. We’ve only touched the surface of what you can achieve with it in conjunction with ActiveMQ. For more information, we encourage you to read Camel in Action (Claus Ibsen and Jonathan Anstey), available from Manning Publications.

11.8. Summary

In this chapter you’ve learned how to use wildcards and composite destinations to make your ActiveMQ applications more flexible when receiving from and sending to multiple destinations. You now have an understanding of advisory messages generated by the ActiveMQ broker.

We’ve also covered the benefits of using virtual topics and retroactive consumers, and when to use them, and we’ve explained when dead-letter queues are used and how to configure them. Finally, we covered how to use Apache Camel routes in ActiveMQ to extend the flexibility and functionality of the message broker.

In the next chapter, we’ll examine advanced messaging features that can be used from the client side of ActiveMQ.
