Let's say that we want to inform a bunch of applications when a new course is added. Such use cases can be best implemented by a JMS topic. A topic can have many subscribers. When a message is added to the topic, all subscribers are sent the same message. This is unlike a queue, where only one queue listener gets a message.
Steps to publish messages to a topic and subscribe for messages are very similar to those for a queue, except for the different classes, and in some cases, different method names.
Let's implement a topic publisher, which we will use when the message for adding a course is successfully handled in the onMessage method of the listener class implemented in CourseQueueReceiver.
Create CourseTopicPublisher in the packt.jee.eclipse.jms package with the following content:
package packt.jee.eclipse.jms; //skipped imports public class CourseTopicPublisher { private TopicConnection connection; private TopicSession session; private Topic topic; public CourseTopicPublisher() throws Exception { InitialContext initCtx = new InitialContext(); TopicConnectionFactory connectionFactory =
(TopicConnectionFactory)initCtx. lookup("jms/CourseManagemenCF"); connection = connectionFactory.createTopicConnection(); connection.start(); session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE); topic = (Topic)initCtx.lookup("jms/courseManagementTopic"); } public void close() { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace();. } } } public void publishAddCourseMessage (CourseDTO course) throws
Exception { TopicPublisher sender = session.createPublisher(topic); ObjectMessage objMessage =
session.createObjectMessage(course); sender.send(objMessage); } }
The code is quite simple and self-explanatory. Let's now modify the queue receiver class that we implemented, CourseQueueReceiver, to publish a message to the topic from the onMessage method, after the message from the queue is handled successfully:
public class CourseQueueReceiver { private CourseTopicPublisher topicPublisher; public CourseQueueReceiver(String name) throws Exception{ //code to lookup connection factory, create session, //and look up queue remains unchanged. Skipping this code //create topic publisher topicPublisher = new CourseTopicPublisher(); QueueReceiver receiver = session.createReceiver(queue); //register message listener receiver.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //we expect ObjectMessage here; of type CourseDTO //Skipping validation try { //code to process message is unchanged. Skipping it //publish message to topic if (topicPublisher != null) topicPublisher.publishAddCourseMessage(course); } catch (Exception e) { e.printStackTrace(); //TODO: handle and log exception } } }); } //remaining code is unchanged. Skipping it }