Chapter 10. Java Message Service

Java Message Service is defined as JSR 914, and the complete specification can be downloaded from http://jcp.org/aboutJava/communityprocess/final/jsr914/index.html.

Message-oriented middleware (MOM) allows sending and receiving messages between distributed systems. Java Message Service (JMS) is a MOM API that provides a way for Java programs to create, send, receive, and read an enterprise messaging system’s messages.

JMS defines the following concepts:

JMS Provider

An implementation of the JMS interfaces, included in a Java EE implementation.

JMS Client

An application or process that produces and/or receives messages. Any Java EE application component can act as a JMS client.

JMS Message

An object that contains the data transferred between JMS clients. A JMS producer/publisher creates and sends messages. A JMS consumer/subscriber receives and consumes messages.

Administered Objects

Objects created and preconfigured by an administrator. These are typically JMS Destinations and Connection Factories, each identified by a JNDI name.

JMS supports two messaging models: Point-to-Point and Publish-Subscribe.

In the Point-to-Point model, a producer sends a message to a specific destination, called a queue, from which a consumer receives it. Multiple producers can send messages to the queue, but each message is delivered to and consumed by only one consumer. Queues retain all messages sent to them until the messages are consumed or expire.

In the Publish-Subscribe model, a publisher publishes a message to a particular destination, called a topic, and a subscriber registers interest by subscribing to that topic. Multiple publishers can publish messages to the topic, and multiple subscribers can subscribe to the topic. By default, a subscriber receives messages only while it is active. However, a subscriber may establish a durable subscription, so that any messages published while the subscriber is not active are delivered when it reconnects.

The publisher and subscriber are loosely coupled; in fact, they have no knowledge of each other’s existence. They only need to know the destination and the message format.
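The difference between the two models can be sketched with a toy in-memory model. This is plain Java and deliberately not the JMS API: ToyQueue, ToyTopic, and MessagingModelsDemo are hypothetical names introduced only for illustration, and the sketch ignores expiry, durability, and threading.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of a Point-to-Point destination (hypothetical, not javax.jms.Queue).
class ToyQueue {
    private final Queue<String> pending = new ArrayDeque<>();

    // Any number of producers may send to the same queue.
    void send(String message) {
        pending.add(message);
    }

    // Each message is handed to exactly one caller and then removed.
    // Returns null when no message is waiting.
    String receive() {
        return pending.poll();
    }
}

// Toy model of a Publish-Subscribe destination (hypothetical, not javax.jms.Topic).
class ToyTopic {
    private final List<List<String>> inboxes = new ArrayList<>();

    // A subscriber registers interest and gets its own inbox.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        inboxes.add(inbox);
        return inbox;
    }

    // Every subscriber registered at publish time receives its own copy.
    void publish(String message) {
        for (List<String> inbox : inboxes) {
            inbox.add(message);
        }
    }
}

class MessagingModelsDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("order-1");
        System.out.println(queue.receive()); // the first consumer gets the message
        System.out.println(queue.receive()); // a second consumer finds nothing

        ToyTopic topic = new ToyTopic();
        List<String> early = topic.subscribe();
        topic.publish("price-update");
        List<String> late = topic.subscribe(); // subscribed after the publish
        System.out.println(early);
        System.out.println(late);
    }
}
```

In this sketch the late subscriber misses the message, which mirrors the default (non-durable) behavior described above.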

Different levels of quality of service, such as tolerating missed or duplicate messages or requiring once-only delivery, can be configured. The messages may be received synchronously or asynchronously.

A JMS message is composed of three parts:

Header

A required part of the message, used to identify and route messages. All messages have the same set of header fields. Some fields are initialized by the JMS provider and others are initialized by the client on a per-message basis.

The standard header fields are defined in Table 10-1.

Table 10-1. JMS header fields

Message header field  Description
JMSDestination        Destination to which the message is sent.
JMSDeliveryMode       Delivery mode: PERSISTENT or NON_PERSISTENT.
JMSMessageID          String value with the prefix “ID:” that uniquely identifies each message sent by a provider.
JMSTimestamp          Time the message was handed off to a provider to be sent. This value may be different from the time the message was actually transmitted.
JMSCorrelationID      Used to link one message to another (e.g., a response message with its request message).
JMSReplyTo            Destination supplied by a client where a reply message should be sent.
JMSRedelivered        Set by the provider if the message was delivered but not acknowledged in the past.
JMSType               Message type identifier; may refer to a message definition in the provider’s repository.
JMSExpiration         Expiration time of the message.
JMSPriority           Priority of the message.

Properties

Optional header fields added by the client. Just like standard header fields, these are name/value pairs. The value can be a boolean, byte, short, int, long, float, double, or String. A producer/publisher can set these values, and a consumer/subscriber can use them as selection criteria to narrow down the messages to be processed.

Properties may be application-specific, standard (defined by JMS), or provider-specific. JMS-defined properties are prefixed with JMSX, and provider-specific properties are prefixed with JMS_<vendor_name>.
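The naming convention and the idea of selecting messages by property can be sketched in plain Java. This is only an illustration: PropertySketch, classify, and select are hypothetical names, the properties are modeled as a simple Map, and a Predicate stands in for the selection criteria that a real JMS consumer would hand to the provider.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class PropertySketch {
    // Classifies a property name using the prefixes described above.
    static String classify(String name) {
        if (name.startsWith("JMSX")) {
            return "JMS-defined";
        }
        if (name.startsWith("JMS_")) {
            return "provider-specific";
        }
        return "application-specific";
    }

    // Keeps only the messages whose properties satisfy the criterion.
    // Each message is modeled here as nothing more than its property map.
    static List<Map<String, Object>> select(
            List<Map<String, Object>> messages,
            Predicate<Map<String, Object>> criterion) {
        List<Map<String, Object>> selected = new ArrayList<>();
        for (Map<String, Object> properties : messages) {
            if (criterion.test(properties)) {
                selected.add(properties);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, Object> eu = new HashMap<>();
        eu.put("region", "EU"); // "region" is an assumed application-specific property
        Map<String, Object> us = new HashMap<>();
        us.put("region", "US");

        List<Map<String, Object>> messages = new ArrayList<>();
        messages.add(eu);
        messages.add(us);

        System.out.println(classify("region"));
        System.out.println(select(messages, p -> "EU".equals(p.get("region"))).size());
    }
}
```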

Body

The actual payload of the message, containing the application data.

The different message body types are shown in Table 10-2.

Table 10-2. JMS message types

Message type   Description
StreamMessage  Payload is a stream of Java primitive types, written and read sequentially.
MapMessage     Payload is a set of name/value pairs; the order of the entries is undefined, and they can be accessed randomly or sequentially.
TextMessage    Payload is a String.
ObjectMessage  Payload is a serializable Java object.
BytesMessage   Payload is a stream of uninterpreted bytes.

Sending a Message

A JMS message can be sent from a stateless session bean:

@Resource(lookup = "myConnection")
ConnectionFactory connectionFactory;
    
@Resource(lookup = "myQueue")
Destination inboundQueue;

public void sendMessage(String text) {
    Connection connection = null;
    try {
        connection = connectionFactory.createConnection();
        Session session =
            connection.createSession(false,
                                     Session.AUTO_ACKNOWLEDGE);
        MessageProducer messageProducer =
            session.createProducer(inboundQueue);
        TextMessage textMessage =
            session.createTextMessage(text);
        messageProducer.send(textMessage);
    } catch (JMSException ex) {
        //. . .
    } finally {
        // The connection must be closed explicitly.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ex) {
                //. . .
            }
        }
    }
}

In this code:

  • ConnectionFactory is a JMS-administered object and is used to create a connection with a JMS provider. QueueConnectionFactory or TopicConnectionFactory may be injected instead to perform Queue- or Topic-specific operations, respectively. Destination is also an administered object and encapsulates a provider-specific address. A Queue or Topic may be injected here instead. Both of these objects are injected using @Resource and specifying the JNDI name of the resource.

  • A Connection is created that represents an active connection to the provider. The connection must be closed explicitly.

  • A Session object is created from the connection. It provides a transactional context in which producers and consumers send and receive messages as an atomic unit of work. The first argument to createSession indicates whether the session is transacted; the second argument indicates whether the session or the client acknowledges the messages it receives, and is ignored if the session is transacted.

    If the session is transacted, as indicated by a true value in the first parameter, then an explicit call to Session.commit is required in order for the produced messages to be sent and for the consumed messages to be acknowledged. A transaction rollback, initiated by Session.rollback, means that all produced messages are destroyed, and consumed messages are recovered and redelivered unless they have expired.

    The second argument indicates the acknowledgment mode of the received message. The permitted values are defined in Table 10-3.

    Table 10-3. JMS message acknowledgment mode

    Acknowledgment mode          Description
    Session.AUTO_ACKNOWLEDGE     Session automatically acknowledges a client’s receipt of a message either when the session has successfully returned from a call to receive or when the MessageListener that the session called to process the message returns successfully.
    Session.CLIENT_ACKNOWLEDGE   Client explicitly calls the Message.acknowledge method to acknowledge all consumed messages for a session.
    Session.DUPS_OK_ACKNOWLEDGE  Instructs the session to lazily acknowledge the delivery of messages. This will likely result in the delivery of some duplicate messages (with the JMSRedelivered message header set to true). However, it can reduce the session overhead by minimizing the work the session does to prevent duplicates.

    The session must be explicitly closed.

  • Use the session and the injected Destination object, inboundQueue in this case, to create a MessageProducer to send messages to the specified destination. A Topic or Queue may be used as the parameter to this method, as both inherit from Destination.

  • Use one of the Session.createXXXMessage methods to create an appropriate message.

  • Send the message using messageProducer.send(...).

This code can be used to send messages using both messaging models.
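The commit and rollback behavior of a transacted session, as described in the list above, can be modeled with a small plain-Java sketch. It is a toy and not the JMS API: ToyTransactedSender is a hypothetical class, the destination is just a List, and only the producing side of a transaction is modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the sending half of a transacted session (hypothetical class).
class ToyTransactedSender {
    private final List<String> buffered = new ArrayList<>();
    private final List<String> destination;

    ToyTransactedSender(List<String> destination) {
        this.destination = destination;
    }

    // Produced messages are held back until the transaction commits.
    void send(String message) {
        buffered.add(message);
    }

    // Commit releases all buffered messages to the destination as one unit.
    void commit() {
        destination.addAll(buffered);
        buffered.clear();
    }

    // Rollback destroys the messages produced since the last commit.
    void rollback() {
        buffered.clear();
    }

    public static void main(String[] args) {
        List<String> queue = new ArrayList<>();
        ToyTransactedSender sender = new ToyTransactedSender(queue);

        sender.send("lost");
        sender.rollback(); // "lost" never reaches the queue

        sender.send("kept-1");
        sender.send("kept-2");
        sender.commit();   // both messages arrive together

        System.out.println(queue);
    }
}
```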

Quality of Service

By default, a JMS provider ensures that a message is not lost in transit in case of a provider failure. This is the PERSISTENT delivery mode: messages are logged to stable storage so that they can be recovered after a provider failure. However, this has a performance overhead and requires additional storage for persisting the messages. If a receiver can afford to miss messages, the NON_PERSISTENT delivery mode may be specified instead. It does not require the JMS provider to store the message or otherwise guarantee that it is not lost if the provider fails.

This delivery mode can be specified:

messageProducer.setDeliveryMode(
    DeliveryMode.NON_PERSISTENT);

All messages sent by this messageProducer follow the semantics defined by NON_PERSISTENT delivery mode.

Delivery mode may alternatively be specified for each message:

messageProducer.send(textMessage, 
    DeliveryMode.NON_PERSISTENT, 6, 5000);

In this code, textMessage is sent with the NON_PERSISTENT delivery mode. The third argument defines the priority of the message, and the last argument defines its time-to-live in milliseconds.

JMS defines the priority of a message on a scale of 0 (lowest) to 9 (highest). By default, the priority of a message is 4 (Message.DEFAULT_PRIORITY). The priority for all messages sent by a producer may be changed by invoking the MessageProducer.setPriority method.

By default, a message never expires, as defined by Message.DEFAULT_TIME_TO_LIVE (a value of 0). This can be changed for all messages sent by a producer by calling the MessageProducer.setTimeToLive method.
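How a time-to-live turns into an expiration time can be worked through in a few lines of plain Java. This is a sketch under stated assumptions rather than provider code: DeliverySketch and its methods are hypothetical, the constants only mirror the values of Message.DEFAULT_PRIORITY and Message.DEFAULT_TIME_TO_LIVE, and treating a message as expired exactly at its expiration instant is an assumption made here for the example.

```java
class DeliverySketch {
    static final int DEFAULT_PRIORITY = 4;        // mirrors Message.DEFAULT_PRIORITY
    static final long DEFAULT_TIME_TO_LIVE = 0L;  // mirrors Message.DEFAULT_TIME_TO_LIVE

    // Expiration is the send time plus the time-to-live.
    // A time-to-live of 0 yields 0, which means "never expires".
    static long expiration(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // Assumption for this sketch: a message counts as expired once the
    // current time reaches its expiration time.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    // Priorities outside the 0 (lowest) to 9 (highest) scale are rejected.
    static int checkPriority(int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("priority must be 0..9: " + priority);
        }
        return priority;
    }

    public static void main(String[] args) {
        long sentAt = 1_000L;
        long expiresAt = expiration(sentAt, 5_000L); // same 5000 ms as the send call above
        System.out.println(expiresAt);
        System.out.println(isExpired(expiresAt, sentAt + 4_999L));
        System.out.println(isExpired(expiration(sentAt, DEFAULT_TIME_TO_LIVE), Long.MAX_VALUE));
    }
}
```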

Receiving a Message Synchronously

A JMS message can be received synchronously:

@Resource(lookup = "myConnection")
ConnectionFactory connectionFactory;
    
@Resource(lookup = "myQueue")
Destination inboundQueue;

public void receiveMessage() {
    try {
        Connection connection = 
            connectionFactory.createConnection();
        Session session = 
            connection.createSession(false, 
                                Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = 
            session.createConsumer(inboundQueue);
        connection.start();
        while (true) {
             Message m = consumer.receive();
             // process the message
        }
    } catch (JMSException ex) {
        //. . .
    }
}

In this code:

  • ConnectionFactory and Destination are administered objects and are injected by the container by using the specified JNDI name. This is similar to what is done during message sending.

  • As done during message sending, a Connection object and a Session object are created. Instead of a MessageProducer, a MessageConsumer is created from the session and is used for receiving messages.

  • In an infinite loop, consumer.receive waits for a synchronous receipt of the message.

A topic can have multiple publishers and subscribers. By default, subscribers receive messages only while they are active. However, a durable subscriber may be created that also receives messages published while it was not active:

@Resource(lookup = "myTopicConnection")
TopicConnectionFactory topicConnectionFactory;

@Resource(lookup = "myTopic")
Topic myTopic;

public void receiveMessage() throws JMSException {
    TopicConnection connection =
        topicConnectionFactory.createTopicConnection();
    TopicSession session =
        connection.createTopicSession(false,
                                 Session.AUTO_ACKNOWLEDGE);
    TopicSubscriber subscriber =
        session.createDurableSubscriber(myTopic, "myID");
    //. . .
}

In this code, TopicConnectionFactory and Topic are injected using @Resource. A TopicConnection is created from the factory and is then used to create a TopicSession. TopicSession.createDurableSubscriber creates a durable subscriber. This method takes two arguments: the first is the Topic to subscribe to, and the second is the name used to uniquely identify this subscription. A durable subscription can have only one active subscriber at a time. The JMS provider retains all the messages until they are received by the subscriber or expire.
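The retention behavior of a durable subscription can be illustrated with a toy model in plain Java. It is not the JMS API: ToyDurableTopic and its methods are hypothetical, and message expiry and the one-active-subscriber rule are left out to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a topic with named durable subscriptions (hypothetical class).
class ToyDurableTopic {
    // One backlog per subscription name; it persists while the subscriber is inactive.
    private final Map<String, List<String>> backlogs = new LinkedHashMap<>();

    // Registers the named subscription if it does not exist yet.
    void createDurableSubscription(String name) {
        backlogs.computeIfAbsent(name, n -> new ArrayList<>());
    }

    // Published messages are retained for every durable subscription.
    void publish(String message) {
        for (List<String> backlog : backlogs.values()) {
            backlog.add(message);
        }
    }

    // Called when the subscriber (re)connects: hands over and clears the backlog.
    List<String> drain(String name) {
        List<String> backlog = backlogs.get(name);
        if (backlog == null) {
            throw new IllegalArgumentException("unknown subscription: " + name);
        }
        List<String> delivered = new ArrayList<>(backlog);
        backlog.clear();
        return delivered;
    }

    public static void main(String[] args) {
        ToyDurableTopic topic = new ToyDurableTopic();
        topic.createDurableSubscription("myID");

        // The subscriber is inactive while these are published.
        topic.publish("update-1");
        topic.publish("update-2");

        // On reconnect, the retained messages are delivered.
        System.out.println(topic.drain("myID"));
    }
}
```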

A client may use QueueBrowser to look at messages on a queue without removing them:

QueueBrowser browser = session.createBrowser(inboundQueue);
Enumeration messageEnum = browser.getEnumeration();
while (messageEnum.hasMoreElements()) {
    Message message = (Message)messageEnum.nextElement();
    //. . .
}

Receiving a Message Asynchronously

A JMS message can be received asynchronously using a message-driven bean:

@MessageDriven(mappedName = "myDestination")
public class MyMessageBean implements MessageListener {
    
    @Override
    public void onMessage(Message message) {
        try {
            // process the message
        } catch (JMSException ex) {
            //. . .
        }
    }
}

In this code:

  • @MessageDriven defines the bean to be a message-driven bean.

  • The mappedName attribute specifies the JNDI name of the JMS destination from which the bean will consume the message. This is the same destination to which the message was targeted from the producer.

  • The bean must implement the MessageListener interface, which provides only one method, onMessage. This method is called by the container whenever a message is received by the message-driven bean and contains the application-specific business logic.

If the message received by the onMessage method is a text message, its body can be retrieved and displayed as follows:

public void onMessage(Message message) {
    try {
        TextMessage tm = (TextMessage)message;
        System.out.println(tm.getText());
    } catch (JMSException ex) {
        //. . .
    }
}

Even though a message-driven bean cannot be invoked directly by a session bean, it can still invoke other session beans. A message-driven bean can also send JMS messages.

Temporary Destinations

Typically, JMS Destination objects (i.e., Queue and Topic) are administered objects and identified by a JNDI name. These objects can also be created dynamically, where their scope is bound to the Connection from which they are created:

TopicConnection connection = 
    topicConnectionFactory.createTopicConnection();
TopicSession session = 
    connection.createTopicSession(false, 
                                  Session.AUTO_ACKNOWLEDGE);
TemporaryTopic tempTopic = 
    session.createTemporaryTopic();

Similarly, a TemporaryQueue can be created:

QueueConnection connection = 
    queueConnectionFactory.createQueueConnection();
QueueSession session = 
    connection.createQueueSession(false, 
                                  Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = 
    session.createTemporaryQueue();

These temporary destinations are automatically closed and deleted, and their contents are lost, when the connection is closed. They can also be explicitly deleted by calling the TemporaryQueue.delete or TemporaryTopic.delete method.

Temporary destinations can be used to implement a request-reply design pattern by using the JMSReplyTo and JMSCorrelationID header fields.
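The flow of a request-reply exchange can be sketched in plain Java. This is a toy model, not the JMS API: ToyMessage and RequestReplySketch are hypothetical, in-memory queues stand in for the request destination and the temporary reply destination, and copying the request's message ID into the reply's correlation ID is one common convention assumed here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy message carrying only the fields relevant to request-reply (hypothetical class).
class ToyMessage {
    final String messageId;     // plays the role of JMSMessageID
    final String body;
    String correlationId;       // plays the role of JMSCorrelationID
    Queue<ToyMessage> replyTo;  // plays the role of JMSReplyTo

    ToyMessage(String messageId, String body) {
        this.messageId = messageId;
        this.body = body;
    }
}

class RequestReplySketch {
    // Responder: takes one request and answers on the destination named in replyTo.
    static void serveOne(Queue<ToyMessage> requests) {
        ToyMessage request = requests.poll();
        if (request == null) {
            return;
        }
        ToyMessage reply =
            new ToyMessage(request.messageId + "-reply", "echo: " + request.body);
        reply.correlationId = request.messageId; // links the reply to its request
        request.replyTo.add(reply);
    }

    // Requester: sends a request and waits for the matching reply.
    static String call(Queue<ToyMessage> requests, String messageId, String body) {
        Queue<ToyMessage> temporaryReplies = new ArrayDeque<>(); // stands in for a TemporaryQueue
        ToyMessage request = new ToyMessage(messageId, body);
        request.replyTo = temporaryReplies;
        requests.add(request);

        serveOne(requests); // in a real system the responder is a separate client

        ToyMessage reply = temporaryReplies.poll();
        if (reply == null || !messageId.equals(reply.correlationId)) {
            throw new IllegalStateException("no matching reply for " + messageId);
        }
        return reply.body;
    }

    public static void main(String[] args) {
        Queue<ToyMessage> requests = new ArrayDeque<>();
        System.out.println(call(requests, "ID:1", "ping"));
    }
}
```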
