Chapter 30

Introducing JMS and MOM

People send messages to each other via e-mail, instant messages, Twitter, Facebook, and so on. People can also communicate using more traditional methods such as the post. You just need to drop a letter in a mailbox, and the rest will be taken care of by postal service providers and logistics companies such as USPS, FedEx, UPS, and DHL.

Applications can send messages to each other using message-oriented middleware (MOM), which plays a role similar to that of the delivery services. A program “drops the message” into a message queue (think mailbox) using the Java Message Service (JMS) API, and the message is delivered to another application that reads messages off of this queue.

Although JMS is a part of the Java EE specification, you can use it with Java SE applications without any Java application server: just make a .jar containing the JMS classes available to your program. In this lesson you’ll learn how to write simple Java clients that send and receive messages with the JMS API via a MOM provider. In Lesson 31 I’ll show you how to bring a Java EE application server to the messaging architecture, and why it’s beneficial. Finally, in Lesson 32 you’ll learn about the value that message-driven beans bring to the table.

Messaging Concepts and Terminology

You have already learned several methods of data exchange in distributed Java applications — direct socket communication, RMI, and HTTP-based interactions. But all of them were based on remote procedure calls (RPC) or the request/response model. MOM enables you to build distributed systems that communicate asynchronously.

JMS is an API for working with MOM. JMS itself isn’t the transport for messages. JMS is to MOM what JDBC is to a relational DBMS. Java applications can use the same JMS classes with any MOM provider. Here’s a list of some popular MOM software:

  • WebSphere MQ (IBM)
  • EMS (Tibco Software)
  • SonicMQ (Progress Software)
  • ActiveMQ (open source, Apache)
  • Open MQ (open source, Oracle)

If you place an order to buy some stocks by calling the method placeOrder() on a remote machine, that’s an example of a synchronous or blocking call. The calling program can’t continue until the code in the placeOrder() method is finished or has thrown an error.

With asynchronous communications it’s different — you can place an order but don’t have to wait for its execution. Similarly, when you drop a letter in a mailbox you don’t have to wait for a mail truck. The same applies to e-mail — press the Send button and continue working on other things without waiting until your message has been received. Recipients of your e-mails also don’t have to be online when you send a message; they can read it later.

The process of placing a trading order comes down to putting a Java object that describes your order into a certain message queue of your MOM provider. After placing an order, the program may continue its execution without waiting until the processing of the order is finished. Multiple users can place orders into the same queue. Another program (not necessarily Java) should be de-queueing and processing messages. Figure 30-1 shows how a trading application can place orders (and receive executions) with another application running on a stock exchange.

To increase the throughput of your messaging-based application, add multiple consumers reading messages off of the same queue. You can create a consumer Java program that starts multiple threads, each of them de-queuing messages from the same queue. In Lesson 32 you’ll see how easy it is to configure multiple consumers using message-driven beans.
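The idea of several consumers reading from one queue can be tried without a MOM server. The following is a minimal in-memory sketch, assuming a java.util.concurrent.BlockingQueue as a stand-in for a MOM queue. It is not the JMS API, and the class name CompetingConsumersSketch, the STOP marker, and the order strings are made up for illustration. The producer enqueues orders without waiting for them to be processed, and each order is taken by exactly one of the consumer threads.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a MOM queue; not the JMS API.
class CompetingConsumersSketch {

    // Marker message that tells a consumer thread to exit
    private static final String STOP = "__STOP__";

    // Puts the orders into one queue, drains it with several consumer
    // threads, and returns everything the consumers processed.
    static List<String> process(List<String> orders, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();

        Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String order = queue.take(); // blocks until a message arrives
                        if (order.equals(STOP)) {
                            return;
                        }
                        processed.add(order);        // taken by this consumer only
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        try {
            // The producer just enqueues and moves on
            for (String order : orders) {
                queue.put(order);
            }
            // One stop marker per consumer, placed after all the orders
            for (int i = 0; i < consumerCount; i++) {
                queue.put(STOP);
            }
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for consumers", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> done = process(
                List.of("IBM 200 Mkt", "AAPL 50 Mkt", "MSFT 10 Mkt"), 2);
        System.out.println("Processed " + done.size() + " orders");
    }
}
```

The order in which the consumers finish their work is not deterministic, which is also true for multiple consumers on a real queue: adding consumers raises throughput but gives up strict processing order.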

After your order has been processed, the appropriate message object will be placed into another queue and, if your application is active at that time, it will de-queue the message immediately upon its arrival. If your application is not running, but you’ve opted for guaranteed delivery (explained later in this lesson), the messages will be preserved in the queue by the MOM provider.

Even if the application in a brokerage company is down when the order execution arrives from the stock exchange, it’ll be saved in the MOM server until the receiving program is up again.

Two Modes of Message Delivery

A program can send or publish a message. If it sends a message to a particular queue, and another program receives the message from this queue, we call this point-to-point (P2P) messaging. In this mode a message is deleted from a queue as soon as it’s successfully received.

If a program publishes a message to be consumed by multiple recipients, that’s publish/subscribe (pub/sub) mode. A message is published to a particular topic and many subscribers can subscribe to receive it. The topic represents some important news for applications and/or users, for example PriceDropAlert, BreakingNews, et cetera. In pub/sub mode a message is usually removed from the topic as soon as all subscribers receive it (read about durable subscribers in the section “How to Subscribe for a Topic”).

Another good example of a pub/sub application is a chat room. A message published by one person is received by the other people present in the chat room. Developing a chat room with JMS and MOM is a pretty trivial task.
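The difference between the two modes can be modeled in a few lines of plain Java. This is a simplified in-memory sketch, not the JMS API: the classes SimpleQueue and SimpleTopic are hypothetical and exist only to show that a queue hands each message to a single receiver, while a topic gives every current subscriber its own copy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory model of the two delivery modes; not the JMS API.
class DeliveryModesSketch {

    // Point-to-point: a message leaves the queue once one receiver takes it.
    static class SimpleQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String message) {
            messages.add(message);
        }

        // Returns the oldest message, or null when the queue is empty
        String receive() {
            return messages.poll();
        }
    }

    // Publish/subscribe: every current subscriber gets its own copy.
    static class SimpleTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        // Registers a subscriber and returns the inbox that will collect its messages
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue orders = new SimpleQueue();
        orders.send("IBM 200 Mkt");
        System.out.println("First receiver got: " + orders.receive());
        System.out.println("Second receiver got: " + orders.receive());

        SimpleTopic alerts = new SimpleTopic();
        List<String> alice = alerts.subscribe();
        List<String> bob = alerts.subscribe();
        alerts.publish("PriceDropAlert");
        System.out.println("Alice: " + alice + ", Bob: " + bob);
    }
}
```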

Message delivery can be guaranteed, and MOM, just like the post office, will keep the message in a queue until the receiver gets it. In this mode messages are persistent — the MOM provider stores them in its internal storage, which can be a DBMS or a file system. In a non-guaranteed mode MOM will deliver a message only to the receiving applications that are up-and-running at the moment that the message arrives.

JMS API Overview

JMS is a pretty simple API that includes Java classes that enable you to send and receive messages. Following are the names of the major JMS interfaces and a short description of each. All of them are defined in the package javax.jms. All Java EE 6 JMS classes and interfaces are listed at http://download.oracle.com/javaee/6/api/javax/jms/package-summary.html, but I’ll just give you the summary of the main ones.

  • Queue is a place to put (or get) your messages. The messages will be retrieved using the first in first out (FIFO) rule. A message producer (sender) puts messages in a queue and a message consumer (receiver) de-queues them.
  • QueueConnection is an interface (it extends Connection) that represents a particular connection to MOM (and plays a role similar to that of the JDBC interface Connection).
  • QueueConnectionFactory is an object that creates QueueConnection objects (just as the JDBC DataSource creates JDBC connections).
  • QueueSession is an object that represents a particular session between the client and the MOM server. QueueConnection creates a session object.
  • QueueSender is an object that actually sends messages.
  • QueueReceiver receives messages.
  • TopicPublisher publishes messages (it has a similar functionality to QueueSender).
  • TopicSubscriber is an object that receives messages (it has a similar functionality to QueueReceiver).
  • Topic is an object that is used in pub/sub mode to represent some important application event. A TopicPublisher publishes messages to a topic, and TopicSubscribers subscribe to it to receive them.
  • Message is an object that serves as a wrapper to an application-specific object that can be placed in a JMS queue or published to a topic.

Types of Messages

Every message contains a header and optional body and has facilities for providing additional properties. The header contains the message identification (unique message ID, destination, type, et cetera). The optional properties can be set by a program to tag a message with application-specific data, for example UrgentOrder.

The optional body contains the data that has to be delivered. Following are the JMS message types, which differ in what their body can hold. All these interfaces extend javax.jms.Message.

  • TextMessage is an object that can contain any Java String.
  • ObjectMessage can hold any Serializable Java object.
  • BytesMessage holds an array of bytes.
  • StreamMessage has a stream of Java primitives.
  • MapMessage contains any key/value pairs, for example id=123.

How to Send a Message

Queues have to be preconfigured in MOM and their names must be known before a program can start sending messages. In the real world a MOM server administrator manages queues and other messaging artifacts. When Java developers get to know the queue parameters, they have a choice of either creating JMS objects (such as ConnectionFactory and Queue) programmatically every time they need to send or receive a message, or pre-creating objects bound to some kind of naming service (you’ll see examples in Lesson 31).

A program has to perform the following steps to send a message:

1. Create (or get from some naming server) a ConnectionFactory object.

2. Create a Connection object and call its start() method.

3. Create a Session object.

4. Create a Queue object.

5. Create a MessageProducer object.

6. Create one of the Message objects (such as TextMessage) and put some data in it.

7. Call the send() method on the MessageProducer.

8. Close the MessageProducer, Session, and Connection objects to release system resources.

Listing 30-1 shows a code fragment that sends a message to a queue called TestQueue. Note that the method createQueue() just creates an instance of the Queue object locally — the actual destination with the name TestQueue has to exist in MOM.

Listing 30-1: Sending a message

       Session session = null;
       ConnectionFactory factory;
       QueueConnection connection = null;

       try {
            // The next two statements are specific to Open MQ. Another MOM
            // provider will have its own way of creating a connection factory.
            factory = new com.sun.messaging.ConnectionFactory();
            factory.setProperty(ConnectionConfiguration.imqAddressList,
                                "mq://localhost:7677,mq://localhost:7677");

            connection = factory.createQueueConnection("admin", "admin");

            connection.start();

            // Non-transacted session with automatic acknowledgment
            session = connection.createQueueSession(
                                false, Session.AUTO_ACKNOWLEDGE);
            Queue ioQueue = session.createQueue("TestQueue");
            MessageProducer queueSender = session.createProducer(ioQueue);

            // Buy 200 shares of IBM at market price
            TextMessage outMsg = session.createTextMessage("IBM 200 Mkt");

            queueSender.send(outMsg);
            queueSender.close();

            System.out.println(
                "Successfully placed an order to purchase 200 shares of IBM");
       } catch (JMSException e) {
            System.out.println("Error: " + e.getMessage());
       } finally {
            try {
                // Either object may still be null if connecting has failed
                if (session != null) session.close();
                if (connection != null) connection.close();
            } catch (JMSException e) {
                System.out.println("Can't close JMS connection/session " +
                                   e.getMessage());
            }
       }

The code in Listing 30-1 uses an Open MQ–specific implementation of JMS’s ConnectionFactory. After you read the section “Administering Objects in Open MQ” later in this lesson, you’ll understand why these lines refer to localhost and port 7677.

How to Receive a Message

The program that receives messages is called a message consumer. It can be a standalone Java program or a special type of enterprise Java bean called a message-driven bean (MDB). You can receive messages either synchronously, using the receive() method, or asynchronously by implementing the MessageListener interface and programming a callback onMessage().

Under the hood, the receive() method uses a polling mechanism that constantly asks for a message. It blocks program execution until the message is received or the specified time has expired:

// session must be a QueueSession, as returned by createQueueSession()
QueueReceiver queueReceiver = session.createReceiver(ioQueue);
Message myMessage = queueReceiver.receive();

The next line shows how to set a timeout interval of 500 milliseconds:

Message myMessage=queueReceiver.receive(500); 
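If you don’t have a broker at hand, the blocking behavior of a timed receive can be approximated with the standard library. The sketch below assumes that a java.util.concurrent.BlockingQueue stands in for the MOM queue (it is not the JMS API, and the class name TimedReceiveSketch is hypothetical). Like receive(500), the poll() call with a timeout waits for a message and returns null if nothing arrives in time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a synchronous receive with a timeout; not the JMS API.
class TimedReceiveSketch {

    // Waits up to timeoutMillis for a message; returns null if none arrives in time.
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Nothing has been sent yet, so this call waits and then gives up
        System.out.println("Empty queue: " + receive(queue, 500));

        queue.add("IBM 200 Mkt");
        System.out.println("After a send: " + receive(queue, 500));
    }
}
```

A caller of the timed variant always has to be ready for a null result, which is the main practical difference from the receive() call without arguments.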

Using an asynchronous callback onMessage() is the best way to receive messages because the message consumer is not sending multiple requests to MOM just to see if the message is in the queue. The callback method onMessage() will be called immediately when a message is put in the queue. The listener class must perform the following steps to receive messages:

1. Create (or get from some naming server) the QueueConnectionFactory object.

2. Create a Connection object and call its start() method.

3. Create a Session object.

4. Create a Queue object.

5. Create a QueueReceiver object.

6. If your class implements MessageListener (see Listing 30-2) write implementation for the callback method onMessage(). If you want to get messages synchronously, just call the QueueReceiver.receive() method. In this case implementation of the MessageListener interface is not needed.

7. Close the Session and Connection objects to release the system resources.

The sample class MessageReceiver in Listing 30-2 shows how to consume messages from the TestQueue asynchronously. Its constructor creates JMS objects and registers itself as a message listener. The callback onMessage() has code for processing the received messages.

Listing 30-2: Receiving a message

package com.practicaljava.lesson30;

import javax.jms.*;
import com.sun.messaging.ConnectionFactory;
import com.sun.messaging.ConnectionConfiguration;

public class MessageReceiver implements MessageListener {

    Session session = null;
    ConnectionFactory factory;
    QueueConnection connection = null;
    MessageConsumer consumer = null;

    MessageReceiver() {
        try {
            factory = new com.sun.messaging.ConnectionFactory();
            factory.setProperty(ConnectionConfiguration.imqAddressList,
                                "mq://localhost:7677,mq://localhost:7677");
            connection = factory.createQueueConnection("admin", "admin");

            connection.start();

            // Assign the field; declaring a local Session here would shadow it
            session = connection.createQueueSession(
                                false, Session.AUTO_ACKNOWLEDGE);

            Queue ioQueue = session.createQueue("TestQueue");

            consumer = session.createConsumer(ioQueue);
            consumer.setMessageListener(this);

            System.out.println("Listening to the TestQueue...");

            // Don't finish - wait for messages
            Thread.sleep(100000);

        } catch (InterruptedException e) {
            System.out.println("Error: " + e.getMessage());
        } catch (JMSException e) {
            System.out.println("Error: " + e.getMessage());
        } finally {
            try {
                // Closing the connection also closes its sessions and consumers
                if (connection != null) connection.close();
            } catch (JMSException e) {
                System.out.println("Can't close JMS connection/session "
                                   + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        new MessageReceiver();
    }

    // Called by the JMS provider for each message that arrives in the queue
    public void onMessage(Message msg) {
        String msgText;
        try {
            if (msg instanceof TextMessage) {
                msgText = ((TextMessage) msg).getText();
                System.out.println("Got from the queue: " + msgText);
            } else {
                System.out.println("Got a non-text message");
            }
        } catch (JMSException e) {
            System.out.println("Error while consuming a message: " +
                               e.getMessage());
        }
    }
}

In this code sample the onMessage() method checks to ensure that the arrived message is of the TextMessage type. All other types of messages can be ignored.

The message acknowledgment mode is defined when the Session object is created. The createSession() method, like the createQueueSession() method used in the listings, has two arguments. If the first argument is true, the session is transacted, the value of the second argument is ignored, and the messages can be either committed or rolled back by the consumer.

If the commit() method has been called, the message is removed from the queue. The rollback() method leaves the message in the queue. If the session is non-transacted, as in our earlier example, the second argument defines the acknowledgement mode.

  • AUTO_ACKNOWLEDGE mode sends the acknowledgment back as soon as the method onMessage() (or a call to receive()) is successfully finished.
  • CLIENT_ACKNOWLEDGE mode requires explicit acknowledgment, such as msg.acknowledge(). (This grants permission to delete the message from the queue.)
  • DUP_OK_ACKNOWLEDGE mode allows the session to acknowledge messages lazily, so if the server fails the same message may be delivered more than once. In some use cases that’s acceptable; for example, receiving a price quote twice won’t hurt.

In CLIENT_ACKNOWLEDGE mode, if more than one message is processed by the same Session object, acknowledging one message acknowledges all messages that the session has consumed so far.
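The effect of commit() and rollback() in a transacted session can be illustrated with a small in-memory model. This is a deliberately simplified sketch, not the JMS API and not how a real provider is implemented: the class TransactedQueueSketch is hypothetical, it serves a single consumer, and it ignores details such as redelivery flags. It only shows that received messages are removed for good on commit and become available again after a rollback.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-consumer model of a transacted queue session; not the JMS API.
class TransactedQueueSketch {

    private final Deque<String> queue = new ArrayDeque<>();
    // Messages that were received but not yet committed or rolled back
    private final Deque<String> inFlight = new ArrayDeque<>();

    void send(String message) {
        queue.addLast(message);
    }

    // Hands out the oldest message and remembers it until commit or rollback
    String receive() {
        String message = queue.pollFirst();
        if (message != null) {
            inFlight.addLast(message);
        }
        return message;
    }

    // The consumer is done with the received messages; forget them for good
    void commit() {
        inFlight.clear();
    }

    // Put uncommitted messages back at the head, preserving their original order
    void rollback() {
        while (!inFlight.isEmpty()) {
            queue.addFirst(inFlight.pollLast());
        }
    }

    // Number of messages currently waiting in the queue
    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        TransactedQueueSketch orders = new TransactedQueueSketch();
        orders.send("IBM 200 Mkt");

        System.out.println("Received: " + orders.receive());
        orders.rollback();
        System.out.println("Waiting after rollback: " + orders.size());

        System.out.println("Received again: " + orders.receive());
        orders.commit();
        System.out.println("Waiting after commit: " + orders.size());
    }
}
```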

How to Publish a Message

Programs publish messages to topics, which should be created in advance by the MOM system administrator. Multiple subscribers can get messages published to the same topic (this is also known as one-to-many mode).

Message publishing is very similar to message sending, but the program should create a Topic instead of a Queue and a TopicPublisher instead of a QueueSender, and the publish() method should be called instead of send(), as in Listing 30-3:

Listing 30-3: Publishing a message to a topic

...
TopicConnection connection = connectionFactory.createTopicConnection();
 
TopicSession pubSession = connection.createTopicSession(false,  
                                         Session.AUTO_ACKNOWLEDGE);
 
Topic myTopic = pubSession.createTopic ("Price_Drop_Alerts");
 
TopicPublisher publisher= pubSession.createPublisher(myTopic);
 
connection.start();
 
TextMessage message = pubSession.createTextMessage();
message.setText("Sale at Apple starts tomorrow");
publisher.publish(message); 

How to Subscribe for a Topic

Subscribers can be durable or non-durable. Durable subscribers are guaranteed to receive their messages; they do not have to be active at the time a message arrives. Non-durable subscribers will receive only those messages that come when they are active. This is similar to the way chat rooms operate — you must be online to get messages.

The code snippet in Listing 30-4 creates a non-durable subscriber. Two modifications have to be made to this code to create a durable subscriber: a client ID has to be assigned to the connection with connection.setClientID(username), and the createDurableSubscriber(topic, subscriptionName) method, which takes the topic and a name that identifies the subscription, should be used instead of createSubscriber(topic).

Listing 30-4: Subscribing to a topic

TopicSession subSession =
    connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

// Keep the reference to the topic; the subscriber is created for it
Topic topic = subSession.createTopic("Price_Drop_Alerts");

TopicSubscriber subscriber = subSession.createSubscriber(topic);

connection.start();

subscriber.setMessageListener(this);

// The callback of the class that implements MessageListener
public void onMessage(Message msg) {

    String msgText;
    try {
        if (msg instanceof TextMessage) {
            msgText = ((TextMessage) msg).getText();
            System.out.println("Got " + msgText);
        } else {
            System.out.println("Got a non-text message");
        }
    } catch (JMSException e) {
        System.out.println("Error: " + e.getMessage());
    }
}
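The difference between durable and non-durable subscribers described in this section can be modeled without a MOM server. The following in-memory sketch is an illustration only, not the JMS API: the class DurableTopicSketch and its methods are hypothetical. Messages published while a durable subscriber is offline are kept and handed over when it reconnects, while an offline non-durable subscriber simply misses them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of durable vs. non-durable subscriptions; not the JMS API.
class DurableTopicSketch {

    private static class Subscription {
        final boolean durable;
        boolean online = true;
        final List<String> delivered = new ArrayList<>();
        final List<String> backlog = new ArrayList<>(); // stored while offline

        Subscription(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    void subscribe(String name, boolean durable) {
        subscriptions.put(name, new Subscription(durable));
    }

    void disconnect(String name) {
        subscriptions.get(name).online = false;
    }

    // On reconnect the stored messages (if any) are delivered first
    void reconnect(String name) {
        Subscription s = subscriptions.get(name);
        s.online = true;
        s.delivered.addAll(s.backlog);
        s.backlog.clear();
    }

    void publish(String message) {
        for (Subscription s : subscriptions.values()) {
            if (s.online) {
                s.delivered.add(message);
            } else if (s.durable) {
                s.backlog.add(message);  // kept for a durable subscriber
            }
            // An offline non-durable subscriber misses the message
        }
    }

    List<String> received(String name) {
        return subscriptions.get(name).delivered;
    }

    public static void main(String[] args) {
        DurableTopicSketch alerts = new DurableTopicSketch();
        alerts.subscribe("durableClient", true);
        alerts.subscribe("chatClient", false);

        alerts.disconnect("durableClient");
        alerts.disconnect("chatClient");
        alerts.publish("Price drop on laptops");

        alerts.reconnect("durableClient");
        alerts.reconnect("chatClient");
        alerts.publish("Price drop on phones");

        System.out.println("durableClient: " + alerts.received("durableClient"));
        System.out.println("chatClient: " + alerts.received("chatClient"));
    }
}
```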

Message Selectors

If you have to share a queue with some other applications or developers from your team, use message selectors (also known as filters) to avoid “stealing” somebody else’s messages. For example:

// String literals in a selector are enclosed in single quotes
String selector = "StoreName = 'Apple'";
QueueReceiver receiver = session.createReceiver(queue, selector);

In this case the queue listener will de-queue only those messages that have the String property StoreName with the value Apple. Message producers have to set this property:

TextMessage outMsg = session.createTextMessage();
outMsg.setText("Super sale starts tomorrow");  
outMsg.setStringProperty("StoreName", "Apple");

Remember that message selectors slow down the process of retrieval. The messages stay in a queue until the listener with the matching selector picks them up. Selectors really help if your team has a limited number of queues and everyone needs to receive messages without interfering with others. But if someone starts the queue listener without selectors, it just drains the queue.
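The filtering behavior described above can be imitated in plain Java. This is a minimal in-memory sketch, assuming a java.util.function.Predicate in place of a real selector expression. It is not the JMS API, and the names SelectorSketch and Msg are hypothetical. A receiver with a selector removes only the matching message and leaves the rest in the queue, while a receiver that accepts everything drains it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of selector-based receiving; not the JMS API.
class SelectorSketch {

    // A message with a text body and application-specific string properties
    static final class Msg {
        final String text;
        final Map<String, String> properties;

        Msg(String text, Map<String, String> properties) {
            this.text = text;
            this.properties = properties;
        }
    }

    private final List<Msg> queue = new ArrayList<>();

    void send(Msg message) {
        queue.add(message);
    }

    // Removes and returns the oldest message that matches the selector, or null
    Msg receive(Predicate<Msg> selector) {
        Iterator<Msg> it = queue.iterator();
        while (it.hasNext()) {
            Msg candidate = it.next();
            if (selector.test(candidate)) {
                it.remove();
                return candidate;
            }
        }
        return null; // non-matching messages stay in the queue
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        SelectorSketch shared = new SelectorSketch();
        shared.send(new Msg("TV sale starts today", Map.of("StoreName", "Sony")));
        shared.send(new Msg("Super sale starts tomorrow", Map.of("StoreName", "Apple")));

        // Plays the role of the selector StoreName = 'Apple'
        Predicate<Msg> appleOnly = m -> "Apple".equals(m.properties.get("StoreName"));

        Msg mine = shared.receive(appleOnly);
        System.out.println("Got: " + mine.text);
        System.out.println("Left in the queue: " + shared.size());
    }
}
```

The scan over non-matching messages in receive() also hints at why selectors slow down retrieval: the more unrelated messages sit in a shared queue, the more of them have to be examined.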

Administering Objects in Open MQ

To test all code samples from this lesson you need a MOM provider that will transport your messages. I’ll be using the open-source MOM provider Open MQ. It’s well-known commercial-grade software. As a bonus, it comes with GlassFish. You already have it installed under your glassfishv3/mq directory. If you decide to use Open MQ with any other application server you can download it separately from https://mq.dev.java.net/.

As I stated earlier in this lesson, I’m not going to use a Java EE server here. The goal is to test standalone Java clients communicating with Open MQ directly, without any middlemen.

First, open a command (or Terminal) window to the glassfishv3/mq/bin directory, and start the Open MQ broker. In Mac OS I enter the following command (in Windows you’ll need to run imqbrokerd.exe):

./imqbrokerd -port 7677

You’ll see a prompt informing you that the broker is ready on port 7677. Now open another command window, change to the glassfishv3/mq/bin directory again, and start the admin GUI tool imqadmin to create the required messaging destinations:

./imqadmin

The window of the Open MQ administration console will open. Add a new broker and name it StockBroker, change the port to 7677, enter the password admin, and click OK. Figure 30-2 shows a snapshot of my screen after these steps.

Now connect to StockBroker (using the right-click menu) and create a new destination named TestQueue to match the queue name in the code sample from Listing 30-1. I didn’t change any settings in the pop-up window shown in Figure 30-3. Note the radio button on top that enables you to specify whether you need a queue or a topic.

The creation of administered MOM objects (the TestQueue) is complete; now you’re ready to write and test the message sender and receiver.

Try It

The goal is to write two Java programs — one to send messages to the TestQueue and the other to receive them from the TestQueue.

Lesson Requirements

You should have Java and Open MQ installed. You’ll also need the JMS classes and their implementation by Open MQ — jms.jar and imq.jar.

You can download the code and resources for this “Try It” section from the book’s web page at www.wrox.com. You can find them in the Lesson30 folder in the download.

Hints

Open two console views by clicking the little icon with the plus sign in the lower right of Eclipse to see output of the sender and receiver in different console views. You can also use the pin feature (click the icon with the pin image) to pin each console to a specific application. You can also detach the Eclipse console to display it as a separate window.

Step-by-Step

1. In Eclipse, go to the Java perspective and select File → New → Create a New Java Project. Name it Lesson30 and click Next. Select the tab Libraries and click Add External JARs. Find jms.jar and imq.jar in glassfishv3/mq/lib and add them to the project. Click Finish. You’ve created a regular Java SE project with two extra jar files in its CLASSPATH. jms.jar contains the standard JMS classes and interfaces, and imq.jar has the Open MQ implementation of the JMS API.

2. Create a new class called MessageSender with a main() method, and add the import statements for all JMS classes and the Open MQ implementation of ConnectionFactory:

import javax.jms.*;
import com.sun.messaging.ConnectionFactory;
import com.sun.messaging.ConnectionConfiguration;

3. Enter the code from Listing 30-1 into the method main().

4. Compile and run the program. You should see the message “Successfully placed an order to purchase 200 shares of IBM.” At this point the message is located in the TestQueue configured under the Open MQ server. It’ll stay there until some program de-queues it from there.

5. Create another Java class called MessageReceiver and enter the code from Listing 30-2.

6. Run MessageReceiver. It’ll print the message “Listening to the TestQueue...” on the console and retrieve all the messages from the queue. The call to Thread.sleep() keeps this program running for 100 seconds.

7. Observe that one console displays the messages of the producer application, and that the consumer properly receives the messages and prints confirmations in another console view.

Please select Lesson 30 on the DVD with the print book, or watch online at www.wrox.com/go/fainjava to view the video that accompanies this lesson.
