Simple Synchronous Receiver Example

The code for a simple synchronous receiver is very similar to that of the sender method already presented. There are two important differences.

The first one is obvious; a MessageConsumer is created instead of a MessageProducer. The Session.createConsumer() method throws a JMSException if the session fails to create a consumer, and an InvalidDestinationException if an invalid destination is specified.

The second difference is the call to the connection's start() method, which starts (or restarts) delivery of incoming messages on that connection. Calling start() on a connection that is already started has no effect, and start() does not affect the connection's ability to send messages. The start() method may throw a JMSException if an internal error occurs. A consumer can use Connection.stop() to temporarily suspend delivery of messages.

The lines that differ from the sender code are:

consumer = session.createConsumer(destination);
connection.start();

The message is obtained using the synchronous receive() method, as shown next. This may throw a JMSException:

Message msgBody = consumer.receive();
if (msgBody instanceof TextMessage) {
    String text = ((TextMessage) msgBody).getText();
}

If there is no message in the queue, the receive() method blocks until a message is available. There are two alternative versions of the receive() method:

  • receiveNoWait() Retrieves the next message available, or returns null if one is not immediately available

  • receive(long timeout) Retrieves the next message or returns null if no message is received within the timeout period (milliseconds)
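
The two non-blocking variants can be tried without a JMS provider. The sketch below is an analogy, not JMS code: it uses java.util.concurrent.BlockingQueue as a stand-in for the queue, where poll() behaves like receiveNoWait() and poll(timeout, unit) behaves like receive(long timeout). The class name ReceiveModes and its method names are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for a JMS queue: this is java.util.concurrent, not the JMS API.
final class ReceiveModes {

    // Like receiveNoWait(): returns null immediately when nothing is queued.
    static String noWait(BlockingQueue<String> queue) {
        return queue.poll();
    }

    // Like receive(long timeout): waits up to timeoutMillis, then returns null.
    static String withTimeout(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(noWait(queue));           // nothing queued yet
        System.out.println(withTimeout(queue, 100)); // gives up after 100 ms
        queue.add("Hello");
        System.out.println(withTimeout(queue, 100)); // a message is available
    }
}
```

One difference to keep in mind: in JMS, a timeout of zero passed to receive(long timeout) never expires and the call blocks indefinitely, whereas BlockingQueue.poll() with a zero timeout returns at once.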

Receive JMS Text Message Example

The code for the entire point-to-point receiver example is shown in Listing 9.2. You will find this code in the PTPReceiver.java file in the examples directory for Day 9 on the Web site.

Listing 9.2. Complete Code for PTPReceiver.java
import javax.naming.*;
import javax.jms.*;

public class PTPReceiver {

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public static void main(String[] args) {
        PTPReceiver receiver = null;
        try {
            receiver = new PTPReceiver();
            String textMsg;
            textMsg = receiver.consumeMessage();
            System.out.println ("Received: " + textMsg);
        }
        catch(Exception ex) {
             System.err.println("Exception in PTPReceiver: " + ex);
        }
        finally {
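            // receiver is still null if the constructor failed
            if (receiver != null) {
                try { receiver.close(); } catch (Exception ex) { /* ignore errors on close */ }
            }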
        }
    }

    public PTPReceiver() throws JMSException, NamingException {
        Context context = new InitialContext();
        ConnectionFactory connectionFactory =
            (ConnectionFactory)context.lookup("jms/QueueConnectionFactory");
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = (Destination)context.lookup("jms/firstQueue");
        consumer = session.createConsumer(destination);
        connection.start();
    }

    public String consumeMessage () throws JMSException {
        String text = null;
        Message msgBody = consumer.receive();
        if (msgBody instanceof TextMessage) {
            text = ((TextMessage) msgBody).getText();
        }
        else {
          text = msgBody.toString();
        }
        return text;
    }

    public void close() throws JMSException {
        connection.close();
    }
}

Run this program using the following asant command:

asant PTPReceiver

The receiver should print out the single message you sent using the PTPSender example. If you run the receiver again it will block until another message is sent. Try this by running PTPReceiver in one window and then running PTPSender from another window.

Asynchronous Messaging

For many applications, the synchronous mechanism, shown previously, is not suitable and an asynchronous technique is required. To implement this in JMS, you need to register an object that implements the MessageListener interface. The JMS provider invokes this object's onMessage() each time a message is available at the destination.

The consumer example will now be extended to support asynchronous messaging by implementing the MessageListener interface as follows:

public class PTPListener implements MessageListener {

The message listener is registered with a specific MessageConsumer by using the setMessageListener() method before calling the connection's start() method as follows:

consumer.setMessageListener(this);

Messages might be missed if you call start() before you register the message listener.

The MessageListener interface defines a single onMessage() method. The JMS provider calls your implementation of this method when it has a message to deliver. The following is an example onMessage() method:

public void onMessage(Message message) {
    try {
        if (message instanceof TextMessage) {
             String text = ((TextMessage) message).getText();
             System.out.println("Received: " + text);
        }
    }
    catch(JMSException ex) {
        System.err.println("Exception in OnMessage: " + ex);
    }
}

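The onMessage() method must handle all checked exceptions itself. The MessageListener interface declares onMessage() without a throws clause, so an implementation that declares a checked exception no longer matches the interface and will not compile.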
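
Handling every failure inside the callback can be sketched without a JMS provider. The example below uses a hypothetical TextListener interface as a stand-in for MessageListener and a hypothetical parseOrder() step that throws a checked exception; neither name comes from JMS or from the listings in this chapter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for javax.jms.MessageListener: like the real
// interface, its callback declares no checked exceptions.
interface TextListener {
    void onMessage(String text);
}

final class SafeListener implements TextListener {
    final List<Integer> processed = new ArrayList<>();
    final List<String> rejected = new ArrayList<>();

    // Hypothetical processing step that can fail with a checked exception.
    private static int parseOrder(String text) throws Exception {
        if (text == null || !text.matches("\\d+")) {
            throw new Exception("Not an order number: " + text);
        }
        return Integer.parseInt(text);
    }

    @Override
    public void onMessage(String text) {
        try {
            processed.add(parseOrder(text));
        } catch (Exception ex) {
            // Deal with the failure here; nothing may escape to the caller.
            rejected.add(text);
            System.err.println("Exception in onMessage: " + ex.getMessage());
        }
    }
}
```

Adding a throws Exception clause to this onMessage() would be rejected by the compiler, because the interface method does not declare it.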

An asynchronous receiver must be closed down cleanly. One common technique is to send a special closedown message and this approach is shown in the example in Listing 9.3.

Listing 9.3. Complete code for PTPListener.java
import javax.naming.*;
import javax.jms.*;

public class PTPListener implements MessageListener {

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public static void main(String[] args) {
        System.out.println ("Listener running");
        try {
            PTPListener receiver = new PTPListener();
        }
        catch(Exception ex) {
             System.err.println("Exception in PTPListener: " + ex);
        }
    }

    public PTPListener() throws JMSException, NamingException {
        try {
            Context context = new InitialContext();
            ConnectionFactory connectionFactory =
                (ConnectionFactory)context.lookup("jms/QueueConnectionFactory");
            Destination destination =
                (Destination)context.lookup("jms/firstQueue");
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(this);
            connection.start();
        }
        catch (JMSException ex) {
            try {connection.close();} catch(Exception e){}
            throw ex;
        }
    }

    public void onMessage(Message message) {
        try {
            String text;
            if (message instanceof TextMessage) {
                text = ((TextMessage) message).getText();
            }
            else {
                text = message.toString();
            }
            System.out.println("Received: " + text);
            message.acknowledge();
            if (text.equals("Quit")) {
                System.out.println("PTPListener closing down");
                new Thread(new Runnable () {
                    public void run() {
                        try {
                            PTPListener.this.close();
                        }
                        catch(JMSException ex) {
                            System.err.println("Exception in Closer: " + ex);
                        }
                    }
                }).start();
            }
        }
        catch(JMSException ex) {
            System.err.println("Exception in OnMessage: " + ex);
        }
    }

    public void close() throws JMSException {
        connection.close();
    }
}

There are a number of points to examine in this code:

  • If a JMS error occurs, the constructor closes the connection and then rethrows the exception to the caller for processing.

  • In the constructor, to illustrate client acknowledgement of messages, the session is created with CLIENT_ACKNOWLEDGE (rather than AUTO_ACKNOWLEDGE). In the onMessage() method a call to Message.acknowledge() is now needed to acknowledge receipt of each message.

  • In onMessage() a new thread of execution is created to handle the connection closedown. If Connection.close() were called directly from onMessage(), the listener would deadlock: close() cannot complete until onMessage() returns, and onMessage() cannot return until close() completes.
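
The self-wait described in the last point can be reproduced without a JMS provider. In the sketch below, a single-thread ExecutorService stands in for the provider's delivery thread and a close() helper stands in for Connection.close(); both are assumptions made for illustration, as are the class and method names. A timeout replaces the indefinite wait so that the failing case reports false instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CloseFromListener {

    // Stand-in for connection.close(): refuse new work, then wait for the
    // running callback to finish. Returns true if the wait succeeded.
    static boolean close(ExecutorService delivery, long timeoutMillis) {
        delivery.shutdown();
        try {
            return delivery.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Calls close() on the delivery thread itself, from inside the callback.
    static boolean closeInsideCallback() {
        ExecutorService delivery = Executors.newSingleThreadExecutor();
        AtomicBoolean closed = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);
        delivery.execute(() -> {
            // The callback waits for itself to finish, so this times out.
            closed.set(close(delivery, 200));
            done.countDown();
        });
        await(done);
        return closed.get();
    }

    // Hands close() to a separate thread, as Listing 9.3 does.
    static boolean closeFromSeparateThread() {
        ExecutorService delivery = Executors.newSingleThreadExecutor();
        AtomicBoolean closed = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);
        delivery.execute(() -> {
            new Thread(() -> {
                closed.set(close(delivery, 2000));
                done.countDown();
            }).start();
            // The callback returns here, which lets close() complete.
        });
        await(done);
        return closed.get();
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("inside callback: " + closeInsideCallback());
        System.out.println("separate thread: " + closeFromSeparateThread());
    }
}
```

With a real connection there is no timeout to fall back on, which is why the listing starts a new thread for the close() call.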

You can run this example using the command

asant PTPListener

Use the PTPSender example to send messages and then run the command

asant PTPQuit

to close down the listener. The following code fragment shows the shutdown code for the PTPQuit application:

sender = new PTPSender();
sender.sendMessage("Quit");
