Publish/Subscribe Messaging Example

Now, you will build a simple bulletin board application. For this example, the bulletin board publisher program will generate 10 simple messages. The subscriber will be a Swing application that will display the messages as they arrive.

The bulletin board uses a topic called jms/bulletinBoard created along with the other JMS resources at the start of today's examples.

Bulletin Board Publisher

The same mechanism is used to create a topic as you used to create a queue, so Listing 9.4 should appear very similar to the previous point-to-point sender example.

Listing 9.4. Bulletin Board Publisher BBPublisher.java
import javax.naming.*;
import javax.jms.*;

public class BBPublisher {

    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public static void main(String[] args) {
        BBPublisher publisher = null;
        try {
            publisher = new BBPublisher("jms/TopicConnectionFactory","jms/bulletinBoard");
            System.out.println ("Publisher is up and running");
            for (int i = 0; i < 10; i++) {
                String bulletin = "Bulletin Board Message number: " + i;
                System.out.println (bulletin);
                publisher.sendMessage(bulletin);
            }
        }
        catch(Exception ex) {
             System.err.println("Exception in BulletinBoardPublisher: " + ex);
        }
        finally {
            // publisher is still null if the constructor failed
            if (publisher != null) {
                try {publisher.close();} catch(Exception ex){}
            }
        }
    }

    public BBPublisher(String JNDIconnectionFactory, String JNDItopic)
            throws JMSException, NamingException {
        Context context = new InitialContext();
        ConnectionFactory connectionFactory =
            (ConnectionFactory)context.lookup(JNDIconnectionFactory);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination bulletinBoard = (Destination)context.lookup(JNDItopic);
        producer = session.createProducer(bulletinBoard);
    }

    public void sendMessage(String msg) throws JMSException {
        TextMessage message = session.createTextMessage();
        message.setText(msg);
        producer.send(message);
    }

    public void close() throws JMSException {
        connection.close();
    }
}

In fact, if you modify the PTPSender program to use the jms/TopicConnectionFactory and jms/bulletinBoard topic rather than the jms/QueueConnectionFactory and jms/firstQueue resources, you can send individual messages to the topic. The program BBSender.java on the accompanying Web site is a modified version of PTPSender.

Run the Bulletin Board publisher program using

asant BBPublisher

This will check that JMS resources are configured correctly and that the program runs okay, but remember that messages published to topics are not persistent. For the subscriber program (which you are about to see) to pick up the messages, you will need to run the publisher program again while the subscriber is running.
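The non-persistent behavior described above can be modeled without a JMS provider. The following is a minimal plain-Java sketch, not JMS code: BulletinListener, ToyTopic, and ToyTopicDemo are hypothetical names invented for this illustration. It shows that a bulletin published while no subscriber is attached is dropped, whereas a later bulletin reaches every attached subscriber.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener, loosely mirroring MessageListener.onMessage().
interface BulletinListener {
    void onBulletin(String text);
}

// Hypothetical in-memory stand-in for a topic; NOT part of the JMS API.
class ToyTopic {
    private final List<BulletinListener> subscribers = new ArrayList<>();

    // Attach a non-durable subscriber; it only sees later publications.
    void subscribe(BulletinListener listener) {
        subscribers.add(listener);
    }

    // Deliver to every attached subscriber and return how many received it.
    // Nothing is stored, so with no subscribers the bulletin is lost.
    int publish(String bulletin) {
        for (BulletinListener listener : subscribers) {
            listener.onBulletin(bulletin);
        }
        return subscribers.size();
    }
}

class ToyTopicDemo {
    // Publishes one bulletin before anyone subscribes and one after,
    // then returns what the first subscriber received.
    static List<String> run() {
        ToyTopic topic = new ToyTopic();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();

        topic.publish("Bulletin 0");   // no subscriber yet: dropped
        topic.subscribe(first::add);
        topic.subscribe(second::add);
        topic.publish("Bulletin 1");   // both subscribers receive a copy

        if (!first.equals(second)) {
            throw new IllegalStateException("subscribers should see the same bulletins");
        }
        return first;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model, run() returns a list containing only "Bulletin 1", which corresponds to having to run the publisher again once the subscriber is up.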

Bulletin Board Subscriber

The subscriber is a Swing application that outputs the bulletins as they arrive (see Listing 9.5).

Remember that for this program to receive the bulletins, it must be running when they are published.

Listing 9.5. Bulletin Board Subscriber BBSubscriber.java
import javax.naming.*;
import javax.jms.*;
import java.io.*;
import javax.swing.*;
import java.awt.*;
import java.awt.event.*;

public class BBSubscriber extends JFrame implements MessageListener {

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private JTextArea textArea = new JTextArea(4,32);

    public static void main(String[] args) {
        BBSubscriber subscriber = null;
        try {
            subscriber = new BBSubscriber("jms/TopicConnectionFactory","jms/bulletinBoard");
        }
        catch(Exception ex) {
            System.err.println("Exception in BulletinBoardSubscriber: " + ex);
            // The constructor failed, so subscriber is still null: there is
            // nothing to close and no window to show.
            System.exit(1);
        }
        show (subscriber);
    }

    private static void show(final BBSubscriber subscriber) {
        subscriber.addWindowListener(new WindowAdapter() {
            public void windowClosing(WindowEvent ev) {
                try {
                    subscriber.close();
                } catch(Exception ex) {
                    System.err.println("Exception in BulletinBoardSubscriber: " + ex);
                }
                subscriber.dispose();
                System.exit(0);
            }
        });
        subscriber.setSize(500,400);
        subscriber.setVisible(true);
    }

    public BBSubscriber(String JNDIconnectionFactory, String JNDItopic)
            throws JMSException, NamingException {
        super (JNDIconnectionFactory+":"+JNDItopic);
        getContentPane().add(new JScrollPane(textArea));
        Context context = new InitialContext();
        ConnectionFactory connectionFactory =
            (ConnectionFactory)context.lookup(JNDIconnectionFactory);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination bulletinBoard = (Destination)context.lookup(JNDItopic);
        consumer = session.createConsumer(bulletinBoard);
        consumer.setMessageListener(this);
        connection.start();
    }

    public void onMessage(Message message) {
        System.out.println("Message "+message);
        try {
            if (message instanceof TextMessage) {
                String bulletin = ((TextMessage) message).getText();
                String text = textArea.getText();
                textArea.setText(text+"\n"+bulletin);
            }
        } catch(JMSException ex) {
             System.err.println("Exception in BulletinBoardSubscriber:OnMessage: " + ex);
        }
    }

    public void close() throws JMSException {
        connection.close();
    }
}

When you run this program from the command line, a small window will appear. Any messages published to the bulletin board topic while the program is running will appear in this window. You can run this program using the command

asant BBSubscriber

Creating Durable Subscriptions

When you ran the bulletin board example, you saw that the publisher and subscriber must be synchronized: the subscriber misses any bulletins that are sent while it is not running. Bulletins are missed because the Session.createConsumer() method creates a non-durable subscriber. A non-durable subscriber can only receive messages that are published while it is active.

To get around this restriction, the JMS API provides a Session.createDurableSubscriber() method. With a durable subscription, the JMS provider stores the messages published to the topic, just as it would store messages sent to a queue.

Figure 9.6 shows diagrammatically how messages are consumed with non-durable and durable subscriptions when the subscriber is inactive during the period when messages are published.

Figure 9.6. Non-durable and durable subscriptions.


Only messages sent after a subscriber registers a durable subscription are stored. The mechanism for registering a durable subscriber may vary from one implementation of a Message Service to another.
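The rule that storage starts only once the subscription is registered can be illustrated with a small plain-Java model. This is a sketch under simplifying assumptions, not the JMS API or any provider's implementation: ToyDurableTopic and ToyDurableDemo are hypothetical names, and the model treats the subscriber as inactive between calls to connect().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of durable subscriptions; NOT the JMS API.
class ToyDurableTopic {
    // Bulletins stored per registered subscription name while the subscriber is away.
    private final Map<String, List<String>> backlog = new HashMap<>();

    // The first connection under a name registers the subscription.
    // Every connection returns (and clears) whatever was stored in the meantime.
    List<String> connect(String subscriptionName) {
        List<String> stored =
            backlog.computeIfAbsent(subscriptionName, name -> new ArrayList<>());
        List<String> delivered = new ArrayList<>(stored);
        stored.clear();
        return delivered;
    }

    // Store the bulletin for every subscription registered so far.
    // Subscriptions registered later never see it.
    void publish(String bulletin) {
        for (List<String> stored : backlog.values()) {
            stored.add(bulletin);
        }
    }
}

class ToyDurableDemo {
    // Returns what the subscriber receives when it reconnects.
    static List<String> run() {
        ToyDurableTopic topic = new ToyDurableTopic();
        topic.publish("early");            // before registration: not stored
        topic.connect("BBS");              // registers; nothing to deliver yet
        topic.publish("while away 1");     // stored for "BBS"
        topic.publish("while away 2");     // stored for "BBS"
        return topic.connect("BBS");       // backlog delivered on reconnect
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here run() returns the two bulletins published after registration; the bulletin published before the first connect() is never delivered.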

In the J2EE RI, durable subscribers are created by associating a TopicConnectionFactory with a client ID. The subscription is registered the first time the client connects to a Topic using the durable TopicConnectionFactory and the associated client ID. Effectively, this means that a J2EE RI durable subscriber application must be run once to register the subscriber before any messages will be stored for that client ID.

The JMS resources created at the start of these examples included a TopicConnectionFactory called jms/DurableTopicConnectionFactory associated with the client ID BBS. The example program BBDurable.java is a variation of the BBSubscriber.java program shown in Listing 9.5. The only changes are in obtaining the connection factory:

ConnectionFactory connectionFactory =
     (ConnectionFactory)context.lookup("jms/DurableTopicConnectionFactory");

and in creating (and registering) the subscription:

Topic bulletinBoard = (Topic)context.lookup("jms/bulletinBoard");
subscriber = session.createDurableSubscriber(bulletinBoard,"BBS");

To register the durable subscriber, run the program once and then close down the Swing window:

asant BBDurable

Now send some messages to the topic using the publisher program from Listing 9.4:

asant BBPublisher

Finally, run the subscriber once more to see the messages posted while the subscriber was not connected:

asant BBDurable

To stop receiving messages permanently, close the subscriber and then delete the durable subscription with the session's unsubscribe() method:

subscriber.close();
session.unsubscribe("BBS");

Messages will no longer be stored for this subscription.
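The order of the two calls matters: the JMS API documentation for Session.unsubscribe() describes deleting a durable subscription that still has an active subscriber as an error. The following plain-Java sketch models only that ordering rule. ToySubscriptions and ToyUnsubscribeDemo are hypothetical names, and the choice of IllegalStateException is an assumption of the model; a real provider reports the failure in its own way.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping model for one provider; NOT the JMS API.
class ToySubscriptions {
    // Subscription name -> true while a subscriber is active on it.
    private final Map<String, Boolean> active = new HashMap<>();

    // Register the subscription (first time) or reconnect to it.
    void open(String name) {
        active.put(name, true);
    }

    // Mark the subscriber as closed; the subscription itself remains registered.
    void close(String name) {
        if (active.containsKey(name)) {
            active.put(name, false);
        }
    }

    // Delete the subscription; refused while a subscriber is still active.
    void unsubscribe(String name) {
        Boolean isActive = active.get(name);
        if (isActive == null) {
            throw new IllegalArgumentException("unknown subscription: " + name);
        }
        if (isActive) {
            throw new IllegalStateException("close the subscriber before unsubscribing");
        }
        active.remove(name);
    }

    // True while messages would still be stored for this name.
    boolean isRegistered(String name) {
        return active.containsKey(name);
    }
}

class ToyUnsubscribeDemo {
    // Tries to unsubscribe too early, then in the correct order.
    static String run() {
        ToySubscriptions provider = new ToySubscriptions();
        provider.open("BBS");
        String early;
        try {
            provider.unsubscribe("BBS");   // subscriber still active
            early = "removed";
        } catch (IllegalStateException ex) {
            early = "refused";
        }
        provider.close("BBS");
        provider.unsubscribe("BBS");       // allowed after close()
        return early + "," + provider.isRegistered("BBS");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the early attempt is refused, and after close() followed by unsubscribe() the name "BBS" is no longer registered.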
