Publish/Subscribe Messaging Example

Now, you will build a simple bulletin board application. For this example, the bulletin board publisher program will generate 10 simple messages. The subscriber will be a Swing application that will display the messages as they arrive.

The bulletin board uses a topic called jms/bulletinBoard. This must be created using the Configure Installation screen in deploytool or using j2eeadmin as follows:

j2eeadmin -addJmsDestination jms/bulletinBoard topic

Bulletin Board Publisher

A topic is looked up and used in the same way as a queue, so Listing 9.3 should look very similar to the point-to-point example, except that all references to a queue are replaced with their topic equivalents.

Listing 9.3. Bulletin Board Publisher Program
import javax.naming.*;
import javax.jms.*;

public class BulletinBoardPublisher {
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private TopicPublisher bulletinBoardPublisher;
    private Topic bulletinBoard;

    public static void main(String[] args) {
        try {
            BulletinBoardPublisher publisher =
                new BulletinBoardPublisher("TopicConnectionFactory", "jms/bulletinBoard");

            System.out.println("Publisher is up and running");

            for (int i = 0; i < 10; i++) {
                String bulletin = "Bulletin Board Message number: " + i;
                System.out.println(bulletin);
                publisher.publishMessage(bulletin);
            }
            publisher.close();
        } catch(Exception ex) {
            System.err.println("Exception in BulletinBoardPublisher: " + ex);
        }
    }

    public BulletinBoardPublisher(String JNDIconnectionFactory, String JNDItopic)
            throws JMSException, NamingException {
        Context context = new InitialContext();
        TopicConnectionFactory topicFactory =
            (TopicConnectionFactory)context.lookup(JNDIconnectionFactory);
        topicConnection = topicFactory.createTopicConnection();
        topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        bulletinBoard = (Topic)context.lookup(JNDItopic);
        bulletinBoardPublisher = topicSession.createPublisher(bulletinBoard);
    }

    public void publishMessage(String msg) throws JMSException {
        TextMessage message = topicSession.createTextMessage();
        message.setText(msg);
        bulletinBoardPublisher.publish(message);
    }

    public void close() throws JMSException {
        bulletinBoardPublisher.close();
        topicSession.close();
        topicConnection.close();
    }
}

Run this program from the command line to check that it works. Remember, though, that a topic does not retain messages for subscribers that are not active when the messages are published. For the subscriber program to pick up the messages, you will need to run this program again while the subscriber is running.

Bulletin Board Subscriber

The subscriber is a Swing application that outputs the bulletins as they arrive (see Listing 9.4).

Remember that for this program to receive the bulletins, it must be running when they are published.

Listing 9.4. Bulletin Board Subscriber Program
import javax.naming.*;
import javax.jms.*;
import java.io.*;
import javax.swing.*;
import java.awt.*;
import java.awt.event.*;

public class BulletinBoardSubscriber extends JFrame implements MessageListener {
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private TopicSubscriber bulletinBoardSubscriber;
    private Topic bulletinBoard;
    private JTextArea textArea = new JTextArea(4,32);

    public static void main(String[] args) {
        try {
            final BulletinBoardSubscriber subscriber =
                new BulletinBoardSubscriber("jms/TopicConnectionFactory", "jms/bulletinBoard");
            // Release the JMS resources when the window is closed.
            subscriber.addWindowListener(new WindowAdapter() {
                public void windowClosing(WindowEvent ev) {
                    try {
                        subscriber.close();
                    } catch(Exception ex) {
                        System.err.println("Exception in BulletinBoardSubscriber: " + ex);
                    }
                    subscriber.dispose();
                    System.exit(0);
                }
            } );
            subscriber.setSize(500,400);
            subscriber.setVisible(true);
        } catch(Exception ex) {
            System.err.println("Exception in BulletinBoardSubscriber: " + ex);
        }
    }

    public BulletinBoardSubscriber(String JNDIconnectionFactory, String JNDItopic)
            throws JMSException, NamingException {
        super (JNDIconnectionFactory+":"+JNDItopic);
        getContentPane().add(new JScrollPane(textArea));
        Context context = new InitialContext();
        TopicConnectionFactory topicFactory =
            (TopicConnectionFactory)context.lookup(JNDIconnectionFactory);
        topicConnection = topicFactory.createTopicConnection();
        topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        bulletinBoard = (Topic)context.lookup(JNDItopic);
        bulletinBoardSubscriber = topicSession.createSubscriber(bulletinBoard);
        bulletinBoardSubscriber.setMessageListener(this);
        // Messages are not delivered until the connection is started.
        topicConnection.start();
    }

    // Called by the JMS provider each time a bulletin arrives.
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                String bulletin = ((TextMessage) message).getText();
                String text = textArea.getText();
                textArea.setText(text + "\n" + bulletin);
            }
        } catch(JMSException ex) {
            System.err.println("Exception in BulletinBoardSubscriber:OnMessage: " + ex);
        }
    }

    public void close() throws JMSException {
        bulletinBoardSubscriber.close();
        topicSession.close();
        topicConnection.close();
    }
}

When you run this program from the command line, a small window will appear. Any messages published to the bulletin board topic while the program is running will appear in this window.

Creating Durable Subscriptions

When you ran the bulletin board example, you will have seen that you need to get the timing right: the subscriber misses any bulletins that are sent while it is not running. This is because the TopicSession.createSubscriber() method creates a non-durable subscriber. A non-durable subscriber can only receive messages that are published while it is active.

To get around this restriction, the JMS API provides a TopicSession.createDurableSubscriber() method. With a durable subscription, the JMS provider stores the messages published to the topic, just as it would store messages sent to a queue.
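The difference between the two kinds of subscription can be illustrated with a toy in-memory model. The sketch below deliberately does not use the JMS API: TopicModel and its methods are hypothetical names invented for this illustration, and a real provider stores and delivers messages in its own way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of a topic; NOT the JMS API. All names here are hypothetical.
class TopicModel {
    private final Set<String> durable = new HashSet<>();   // durable subscription names
    private final Set<String> active = new HashSet<>();    // subscribers currently running
    private final Map<String, List<String>> backlog = new HashMap<>();   // stored for durable subscribers
    private final Map<String, List<String>> received = new HashMap<>();  // delivered so far

    // Activate a subscriber; a durable one first receives its stored backlog.
    void activate(String name, boolean isDurable) {
        active.add(name);
        received.putIfAbsent(name, new ArrayList<>());
        if (isDurable) {
            durable.add(name);
            List<String> stored = backlog.remove(name);
            if (stored != null) {
                received.get(name).addAll(stored);
            }
        }
    }

    // Deactivate a subscriber (comparable to closing it without unsubscribing).
    void deactivate(String name) {
        active.remove(name);
    }

    void publish(String message) {
        for (String name : received.keySet()) {
            if (active.contains(name)) {
                received.get(name).add(message);
            } else if (durable.contains(name)) {
                backlog.computeIfAbsent(name, k -> new ArrayList<>()).add(message);
            }
            // An inactive non-durable subscriber simply misses the message.
        }
    }

    List<String> receivedBy(String name) {
        return received.getOrDefault(name, new ArrayList<>());
    }

    public static void main(String[] args) {
        TopicModel topic = new TopicModel();
        topic.activate("plain", false);
        topic.activate("durable", true);
        topic.deactivate("plain");
        topic.deactivate("durable");
        topic.publish("Bulletin 0");          // published while both are inactive
        topic.activate("plain", false);
        topic.activate("durable", true);
        System.out.println("plain:   " + topic.receivedBy("plain"));
        System.out.println("durable: " + topic.receivedBy("durable"));
    }
}
```

In this model only the durable subscriber sees the bulletin that was published while both subscribers were inactive.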

Figure 9.8 shows diagrammatically how messages are consumed with non-durable and durable subscriptions when the subscriber is inactive during the period when messages are published.

Figure 9.8. Non-durable and durable subscriptions.


To create a durable subscription, you must associate a connection factory with a defined user and use this factory to create the connection. You will be shown how to create users on Day 15, “Security,” but for now, you can use the user guest that has been set up for you.

Use the following command to associate a connection factory with the user guest:

j2eeadmin -addJmsFactory jms/DurableTopic topic -props clientID=guest

Alternatively, you can add the factory using the Configure Installation screen in deploytool. Select Connection Factories in the panel on the left. Add the new factory, jms/DurableTopic, to the panel on the top right, and add clientID as the Property Name with a Value of guest in the panel at the bottom right.

After using this connection factory to create the connection and session, you call the createDurableSubscriber method with two arguments, the topic and the subscription ID string that specifies the name of the subscription:

String subID = "DurableBulletins";
TopicSubscriber topicSubscriber = topicSession.createDurableSubscriber(bulletinBoard, subID);

Messages are then read from the topic as normal. To temporarily stop receiving messages, you simply close the subscriber.

topicSubscriber.close();

Messages are now stored by the JMS provider until the subscription is reactivated with another call to createDurableSubscriber() with the same subscription ID.

A subscriber can permanently stop receiving messages by unsubscribing a durable subscription with the unsubscribe() method. You first need to close the subscriber.

topicSubscriber.close();
topicSession.unsubscribe(subID);

If you make these changes to the bulletin board subscriber program, you will not initially notice any difference in operation. The distinction becomes apparent if you close the subscriber program and run the publisher. Now when you start up the durable subscriber once more, you will receive messages sent to the bulletin board while the program was not running.

Additional JMS Features

The following sections cover some additional features available in JMS. Not all the features of JMS are covered, and you should refer to the JMS API specification for more information.

Message Selectors

So far, you have received all the messages sent. The JMS API provides support for filtering received messages. This is accomplished by using a message selector. The createReceiver method and both forms of createSubscriber (durable and non-durable) have a signature that allows a message selector to be specified.

The message selector is a string containing an SQL conditional expression. Only message header values and properties can be specified in the message selector. Sadly, it is not possible to filter messages on the basis of the contents of the message body.

String highPriority = "JMSPriority = 9 AND topic = 'Java'";
bulletinBoardSubscriber = topicSession.createSubscriber(bulletinBoard, highPriority, false);

This selector will ensure that only priority nine messages whose topic property is Java are received. Note here that topic is a property of the message that has been created and set by the sender.

Notice that for this form of the createSubscriber the parameters are as follows:

TopicSession.createSubscriber(topic, messageSelector, noLocal);

You need to set the noLocal parameter to specify whether you want to receive messages created by your own connection. Set noLocal to true to prevent the delivery of messages published by the subscriber's own connection; with false, as in the previous code, those messages are delivered as well.
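To see what a selector does and does not look at, the following toy model expresses the same kind of filter as a plain Java predicate. It is only a sketch and does not use the JMS API; BulletinModel, SelectorModel, and the sample bulletins are hypothetical, and a real provider evaluates the selector string itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy stand-in for a JMS message: a priority header, properties, and a body.
class BulletinModel {
    final int priority;                                   // models the JMSPriority header
    final Map<String, String> properties = new HashMap<>();
    final String body;

    BulletinModel(int priority, String topicProperty, String body) {
        this.priority = priority;
        this.properties.put("topic", topicProperty);      // property set by the sender
        this.body = body;
    }
}

class SelectorModel {
    // Java equivalent of a selector such as "JMSPriority = 9 AND topic = 'Java'".
    // Note that the priority is compared as a number, not as a string.
    static final Predicate<BulletinModel> HIGH_PRIORITY_JAVA =
        m -> m.priority == 9 && "Java".equals(m.properties.get("topic"));

    // Return the bodies of the messages that pass the selector.
    static List<String> deliver(List<BulletinModel> published, Predicate<BulletinModel> selector) {
        List<String> bodies = new ArrayList<>();
        for (BulletinModel m : published) {
            if (selector.test(m)) {
                bodies.add(m.body);
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<BulletinModel> published = new ArrayList<>();
        published.add(new BulletinModel(9, "Java", "JMS tutorial posted"));
        published.add(new BulletinModel(4, "Java", "Java meetup moved"));
        // "Java" appears in the body here, but the body is never inspected.
        published.add(new BulletinModel(9, "Cooking", "Java coffee recipe"));
        System.out.println(deliver(published, HIGH_PRIORITY_JAVA));
    }
}
```

Only the first bulletin passes: the second has the wrong priority, and the third mentions Java only in its body, which the filter cannot see.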

Session Acknowledgement Modes

In the examples given so far, auto acknowledgement has been used to send the acknowledgement automatically as soon as the message is received. This has the advantage of removing the burden of acknowledging messages from you, but it has the disadvantage that if your application fails before the message is processed, the message may be lost. After a message is acknowledged, the JMS provider will never redeliver it.

Deferring acknowledgement until after you have processed the message will protect against loss of data. To do this, the session must be created with client acknowledgement.

queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Now when the message is received, no acknowledgement will be sent automatically. It is up to you to ensure that the message is acknowledged at some later point.

message = (TextMessage) queueReceiver.receive();
// process the message
message.acknowledge();

If you do not acknowledge the message, it may be resent.
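The trade-off between the two modes can be sketched with a small in-memory model. This is not the JMS API: AckQueueModel and its methods are hypothetical, and consumerFailed() merely simulates a consumer that crashes before finishing its work.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of acknowledgement; NOT the JMS API. All names are hypothetical.
class AckQueueModel {
    private final Deque<String> pending = new ArrayDeque<>();  // waiting for delivery
    private final List<String> unacked = new ArrayList<>();    // delivered, not yet acknowledged
    private final boolean autoAcknowledge;

    AckQueueModel(boolean autoAcknowledge) {
        this.autoAcknowledge = autoAcknowledge;
    }

    void send(String message) {
        pending.addLast(message);
    }

    // Deliver the next message, or null if none is waiting.
    // With auto acknowledgement the message is forgotten as soon as it is delivered.
    String receive() {
        String message = pending.pollFirst();
        if (message != null && !autoAcknowledge) {
            unacked.add(message);
        }
        return message;
    }

    // Client acknowledgement: confirms everything delivered so far.
    void acknowledge() {
        unacked.clear();
    }

    // Simulate a consumer failure: unacknowledged messages are redelivered in order.
    void consumerFailed() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            pending.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    public static void main(String[] args) {
        AckQueueModel auto = new AckQueueModel(true);
        auto.send("debit 100");
        auto.receive();
        auto.consumerFailed();
        System.out.println("auto, after failure:   " + auto.receive());

        AckQueueModel client = new AckQueueModel(false);
        client.send("debit 100");
        client.receive();
        client.consumerFailed();
        System.out.println("client, after failure: " + client.receive());
    }
}
```

With auto acknowledgement the model has nothing left to redeliver after the failure, whereas with client acknowledgement the unacknowledged message is delivered again.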

A third acknowledgement mode, DUPS_OK_ACKNOWLEDGE, can be used when the delivery of duplicates can be tolerated. This is a form of AUTO_ACKNOWLEDGE that has the advantage of reducing the session overhead spent preventing the delivery of duplicate messages.

Message Persistence

The default JMS delivery mode for a message is PERSISTENT. This ensures that the message will be delivered, even if the JMS provider fails or is shut down.

A second delivery mode, NON_PERSISTENT, can be used where guaranteed delivery is not required. A NON_PERSISTENT message has the lowest overhead because the JMS provider does not need to copy the message to a stable storage medium. JMS still guarantees to deliver a NON_PERSISTENT message at most once (but maybe not at all). Non-persistent messages should be used when:

  • Performance is important and reliability is not

  • Messages can be lost with no effect on system functionality

Persistent and non-persistent messages can be delivered to the same destination.

Transactions

Often, acknowledgement of single messages is not enough to ensure the integrity of an application. Think of a banking system where two messages are sent to debit an amount from one account and credit the same amount to another. If only one of the messages is received, there will be a problem. A transaction is required where a number of operations involving many messages forms an atomic piece of work.

In JMS, you specify whether a session is transacted when the queue or topic session is created:

createQueueSession(boolean transacted, int acknowledgeMode)
createTopicSession(boolean transacted, int acknowledgeMode)

In a transacted session, several sends and receives are grouped together in a single transaction. The JMS API provides Session.commit() to acknowledge all the messages in a transaction and Session.rollback() to discard all messages. After a rollback, the messages will be redelivered unless they have expired.

To create a transacted session, set the transacted parameter to true, as shown in the following for a topic session:

topicSession = topicConnection.createTopicSession(true, 0);

For transacted sessions, the acknowledgeMode parameter is ignored. The previous code sets this parameter to 0 to make this fact explicit.

There is no explicit transaction start. The contents of a transaction are simply those messages that have been produced and consumed during the current session, either since the session was created or since the last commit(). After a commit() or rollback(), a new transaction is started.

Note

Because the commit() and rollback() methods are associated with a session, it is not possible to mix messages from queues and topics in the same transaction.


The following example shows a simple transaction involving two messages.

queueSession = queueConnection.createQueueSession(true, 0);
Queue bank1Queue = (Queue)context.lookup("queue/FirstUSA");
Queue bank2Queue = (Queue)context.lookup("queue/ArabBank");
bank1QueueSender = queueSession.createSender(bank1Queue);
bank2QueueSender = queueSession.createSender(bank2Queue);
// .. application processing to create debit and credit messages
try {
    // Each sender was created for a specific queue, so only the message is passed.
    bank1QueueSender.send(debitMsg);
    bank2QueueSender.send(creditMsg);
    queueSession.commit();
} catch(JMSException ex) {
    System.err.println("Exception in bank transaction:" + ex);
    queueSession.rollback();
}

Where a receiver handles atomic actions sent in multiple messages, it should similarly only commit when all the messages have been received and processed.
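The all-or-nothing behavior of a transacted session can be pictured with the following toy model, in which sends are held back until commit() and thrown away on rollback(). It is a sketch only and does not use the JMS API; TransactedSessionModel and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a transacted session; NOT the JMS API. All names are hypothetical.
class TransactedSessionModel {
    // Messages that have actually reached each queue.
    private final Map<String, List<String>> queues = new HashMap<>();
    // Sends made in the current transaction, as {queueName, message} pairs.
    private final List<String[]> uncommitted = new ArrayList<>();

    // A send only joins the current transaction; nothing is visible yet.
    void send(String queueName, String message) {
        uncommitted.add(new String[] {queueName, message});
    }

    // Make every send in the transaction visible, then start a new transaction.
    void commit() {
        for (String[] entry : uncommitted) {
            queues.computeIfAbsent(entry[0], k -> new ArrayList<>()).add(entry[1]);
        }
        uncommitted.clear();
    }

    // Discard every send in the transaction, then start a new transaction.
    void rollback() {
        uncommitted.clear();
    }

    List<String> contents(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        TransactedSessionModel session = new TransactedSessionModel();
        session.send("queue/FirstUSA", "debit 100");
        session.send("queue/ArabBank", "credit 100");
        session.rollback();   // neither message is delivered
        System.out.println("after rollback: " + session.contents("queue/FirstUSA"));

        session.send("queue/FirstUSA", "debit 100");
        session.send("queue/ArabBank", "credit 100");
        session.commit();     // both messages are delivered together
        System.out.println("after commit:   " + session.contents("queue/FirstUSA")
            + " " + session.contents("queue/ArabBank"));
    }
}
```

The point of the model is that the debit and the credit either both reach their queues or neither does.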

XA Support

A JMS provider may support distributed transactions using the X/Open XA resource interface. This is done through the Java Transaction API (JTA), which was covered on Day 8, “Transactions and Persistence.” XA support is optional; refer to your JMS provider documentation to see whether it is provided.

Multithreading

Not all the objects in JMS support concurrent use. The JMS API only specifies that the following objects can be shared across multiple threads:

  • Connection Factories

  • Connections

  • Destinations

Many threads in the same client may share these objects, whereas the following:

  • Sessions

  • Message Producers

  • Message Consumers

can only be accessed by one thread at a time. The restriction on single-threaded sessions reduces the complexity required by the JMS provider to support transactions.

Session concurrency can be implemented within a multithreaded client by creating multiple sessions.
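The usual pattern is one shared connection with a separate session for each thread. The sketch below shows the shape of that pattern with stand-in classes rather than the JMS API; ConnectionModel, SessionModel, and SessionPerThreadDemo are hypothetical names, and the shared queue merely plays the role of a destination.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-in for a connection: safe to share between threads.
class ConnectionModel {
    final Queue<String> destination = new ConcurrentLinkedQueue<>();

    SessionModel createSession() {
        return new SessionModel(this);
    }
}

// Stand-in for a session: keeps unsynchronized state, so it belongs to one thread.
class SessionModel {
    private final ConnectionModel connection;
    private int sentCount = 0;   // deliberately not thread-safe

    SessionModel(ConnectionModel connection) {
        this.connection = connection;
    }

    void send(String text) {
        sentCount++;
        connection.destination.add(text);
    }

    int sentCount() {
        return sentCount;
    }
}

class SessionPerThreadDemo {
    // Run the given number of workers and return how many messages arrived.
    static int run(int threads, int messagesPerThread) {
        ConnectionModel connection = new ConnectionModel();   // shared by all workers
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                // Each worker creates its own session and never shares it.
                SessionModel session = connection.createSession();
                for (int i = 0; i < messagesPerThread; i++) {
                    session.send("worker " + id + " message " + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", ex);
        }
        return connection.destination.size();
    }

    public static void main(String[] args) {
        System.out.println("messages delivered: " + run(4, 25));
    }
}
```

Because no session is touched by more than one thread, the workers need no locking of their own; only the shared connection stand-in has to be thread-safe.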
