Integrating with messaging queues

As soon as your system and application landscape widens, you try to create components which are independent of each other. One of the first steps is to create an asynchronous messaging architecture. This allows decoupling of components, but more importantly, you can also be implementation-independent by simply using standards of data exchange. There exist messaging standards such as AMQP or STOMP, which can be read by any application. The most well known, defined API for messaging in the Java world is JMS, the Java Message Service.

In this example Apache ActiveMQ will be used to let an arbitrary amount of Play instances communicate with each other. The communication will be done via a publish-subscribe mechanism which is called a topic in JMS terms. In order to get a short overview, there are two mechanisms, which could be chosen for this task. First a producer-consumer pattern, second a publish-subscribe pattern. The producer-consumer pattern however requires not more than one listener which is an often needed use-case, but not feasible in this example. Think a little bit, as to which of these patterns you really need for your application.

In case you are asking why another technology is needed for this, it is because a distributed cache can possibly be accessed already by all Play nodes. This is right, but you would have to implement a reliable publish-subscribe infrastructure yourself using memcached. This is quite a lot of work and actually the wrong tool for this task.

The source code of the example is available at examples/chapter6/messaging.

Getting ready

In order to understand what happens in this example you should have taken a look at the chat example application in the samples-and-tests directory of your Play installation. This example will change the functionality of the websocket implementation of the chat application. Websockets are a new standard defined by W3C in order to support long running connections and to be able to communicate in both directions, from client to server and from server to client, which is not possible with pure HTTP. This is one of the biggest drawbacks in regular HTTP communication. Basically, the chat application takes a message from one user and directs it to all others currently connected.

This works as long as only one application is running. As soon as more than one Play node is running, you need communication between these nodes. Every node has to:

  • Publish a message, whenever an action at this node happens
  • Receive messages at any time and feed them to their connected users

This sounds pretty simple with a publish-subscribe architecture. Every node subscribes and publishes to the same queue and whenever a received message on this queue has not been sent by the node itself, it has to be sent to all connected users on the node.

You need a running installation of Apache ActiveMQ. The assumption in this example is always a running ActiveMQ system on localhost. Installation of ActiveMQ is straightforward; you just grab a copy at http://activemq.apache.org/ and run bin/activemq start. Either an error message is written out on the console including the command you should execute next, or a process ID along with the message that ActiveMQ was started. You can check at http://localhost:8161 whether your ActiveMQ instance is running.

The next step is to write a unit test, which emulates the behavior of a second node. The unit test checks whether the plugin sends messages to ActiveMQ and also whether it correctly receives messages, when other nodes are sending messages:

public class ActiveMqTest extends UnitTest {

   private TopicConnection receiveConnection;
   private TopicSession receiveSession;
   private Topic receiveTopic;
   private TopicSubscriber receiveSubscriber;
   private TopicConnection sendingConnection;
   private TopicSession sendingSession;
   private Topic sendingTopic;
   private TopicPublisher sendingPublisher;

   @Before
   public void initialize() throws Exception {
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD,
            ActiveMQConnection.DEFAULT_BROKER_URL);

      // removed code to create a connection for the subscriber and the publisher
      ...
      receiveSubscriber = receiveSession.createSubscriber(receiveTopic);
      sendingPublisher = sendingSession.createPublisher(sendingTopic);

      ChatRoom.clean();
   }
   
   @After
   public void shutdown() throws Exception {
      // removed: closing all connections      
      ...
   }

   @Test
   public void assertThatPluginSendsMessages() throws Exception {
      assertEquals(0, ChatRoom.get().archive().size());
      Event event = new Join("user");
      ChatRoom.get().publish(event);

    
  // Check chatroom
      int currentEventCount = ChatRoom.get().archive().size();
      assertEquals(1, currentEventCount);
      
      // Check for messages
      Message msg = receiveSubscriber.receive(2000);
      Event evt = (ChatRoom.Event) ((ObjectMessage) msg).getObject();
      assertEquals("join", evt.type);
   }
   
   @Test
   public void assertThatPluginReceivesMessages() throws Exception {
      assertEquals(0, ChatRoom.get().archive().size());
      
      // Send event via JMS
      Event event = new ChatRoom.Message("alex", "cool here");
      ObjectMessage objMsg = sendingSession.createObjectMessage(event);
      sendingPublisher.publish(objMsg);
      
      Thread.sleep(1000); // short sleep to make sure the content arrives
      assertEquals(1, ChatRoom.get().archive().size());
   }
   
}

As you can see, there are two tests. The first one publishes a local event like a web request would do via ChatRoom.get().publish(event). It then waits for a subscriber a maximum of two seconds to check whether the message has been published via ActiveMQ.

The second test fakes the existence of another node by sending a message directly via the publisher and checks whether the ChatRoom archive field now contains any content. In order to make sure the local archive of events is empty, ChatRoom.clean() is invoked in initialize() method.

How to do it...

The ActiveMQ plugin luckily only consists of one class and one interface. The first class is the plugin itself, which starts the JMS connection on application start and frees all resources when the application is stopped:

public class ActiveMqPlugin extends PlayPlugin {

   private ActiveMQConnectionFactory connectionFactory;
   
   private TopicConnection sendingConnection;
   private TopicSession sendingSession;
   private Topic sendingTopic;
   private TopicPublisher sendingPublisher;
   
   private TopicConnection receiveConnection;
   private TopicSession receiveSession;
   private Topic receiveTopic;
   private TopicSubscriber receiveSubscriber;

   private static final String uuid = UUID.randomUUID().toString();

   public void onApplicationStart() {
      Logger.info("ActiveMQ Plugin started");
      try {
         List<Class> jobClasses = new ArrayList<Class>();
         for (ApplicationClass applicationClass : Play.classes.getAssignableClasses(ActiveMqJob.class)) {
            if (Job.class.isAssignableFrom(applicationClass.javaClass)) {
               jobClasses.add(applicationClass.javaClass);
            }
         }
         MessageListener listener = new ActiveMqConsumer(jobClasses);

         // setting up all the topic specific variables
         ... // removed to save space
      } catch (Exception e) {
         Logger.error(e, "Could not start activemq broker");
      }
   }

   public void onApplicationStop() {
      Logger.info("Stopping activemq connections");
      try {
         // closing all activemq specific connections
         ... // removed to save space
      } catch (JMSException e) {
         Logger.error(e, "Problem closing connection");
      }
   }

The onEvent() method listens for all messages beginning with chatEvent and sends them via ActiveMQ if possible. The inner ActiveMQConsumer class handles incoming messages, extracts data out of the message, and executes a job which sets the received data in the application:

   @Override
   public void onEvent(String message, Object context) {
      if ("chatEvent".equals(message) && context instanceof Serializable) {
         Serializable event = (Serializable) context;
         try {
            ObjectMessage objMsg = sendingSession.createObjectMessage(event);
            objMsg.setStringProperty("hostId", uuid);
            sendingPublisher.publish(objMsg);
            Logger.info("Sent event to queue: %s", ToStringBuilder.reflectionToString(event));
         } catch (Exception e) {
            Logger.error(e, "Could not publish message");
         }
      }
   }

   public static class ActiveMqConsumer implements MessageListener {

    
  private List<Class> jobs;

      public ActiveMqConsumer(List<Class> jobs) {
         this.jobs = jobs;
      }
      
      @Override
      public void onMessage(Message message) {
         try {
            if (message instanceof ObjectMessage) {
               ObjectMessage objectMessage = (ObjectMessage) message;
               if (uuid.equals(objectMessage.getStringProperty("hostId"))) {
                  Logger.debug("Ignoring activemq event because it came from own host");
                  return;
               }
               Serializable event = (Serializable) objectMessage.getObject();
               Logger.debug("Received activemq event to plugin: %s", ToStringBuilder.reflectionToString(event));
               for (Class jobClass : jobs) {
                  Job job = (Job) jobClass.newInstance();
                  job.getClass().getField("serializable").set(job, event);
                  job.now().get(); // run in sync
                  
               }
            }
         } catch (Exception e) {
            Logger.error(e, "Error getting message");
         }
      }
   }
}

A small interface definition is needed to mark jobs, which should be executed upon incoming messages:

public interface ActiveMqJob {
   public void setSerializable(Serializable serializable);
   public Serializable getSerializable();
}

After adding a play.plugins file for the ActiveMQ plugin the module can now be built and added to your copy of the chat application.

The next change is regarding the ChatRoom class in the chat application. You need to add three methods:

public void publish(Serializable event) {
       PlayPlugin.postEvent("chatEvent", event);
       chatEvents.publish((Event) event);
}
    
public void publishWithoutPluginNotification(Serializable event) {
       chatEvents.publish((Event) event);
}
    
public static void clean() {
       instance = null;
}

Apart from that you need to make the Chatroom.Event class serializable so that it can be transmitted over the network. The last step is to change the three methods, which indicate a chat room join or leave or a text message sent by a user. You have to use the publish method inside each of these methods:

    public EventStream<ChatRoom.Event> join(String user) {
          publish(new Join(user));
        return chatEvents.eventStream();
    }
    
    public void leave(String user) {
        publish(new Leave(user));
    }
    
    public void say(String user, String text) {
        if(text == null || text.trim().equals("")) {
            return;
        }
        publish(new Message(user, text));
    }

The application needs to implement a special job, which implements the interface that we defined some time back:

public class UpdateChatroomJob extends Job implements ActiveMqJob {

   private Serializable serializable;

   public Serializable getSerializable() {
      return serializable;
   }
   
   public void setSerializable(Serializable serializable) {
      this.serializable = serializable;
   }
   
   public void doJob() {
      if (serializable != null) {   ChatRoom.get().publishWithoutPluginNotification(serializable);
      }
   }

}

How it works...

After making your changes you have two possibilities to test your code. First, start up your application and run the tests. Second, copy the application directory to a second directory, change the http.port variable to some arbitrary port, and remove the %test.db=mem entry in case you are running in test mode. Otherwise, the H2 databases will conflict when trying to bind to port 8082. After both applications are started, connect to both in a browser, choose websocket for chat connection, and check whether you can see the messages of the other user, even though both are connected to two different web applications.

So, after checking if it works, it is time to explain it in a little detail. The simplest part in the plugin is the interface definition. You are expected to have an arbitrary amount of jobs implementing this interface. The job implementation makes sure that the serialized object representing the content of the message received via the subscriber gets to its endpoint, which is the ChatRoom in the implementation. In order to make sure it does not create any loop or again sends this message to the ActiveMQ topic, the implementation invokes the publishWithoutPluginNotification() method. Otherwise content might be duplicated.

The plugin itself has a lot of code to create and initialize the connection to the broker. Broker is an alternative name for the messaging endpoint. The onApplicationStart() method collects all classes which implement the ActiveMqJob interface and are assigned from a job. The list of these classes is handed over to the internally defined ActiveMqConsumer class, which represents the code executed on each incoming message. This class checks if the incoming message was not sent from this host, as in this case the message has already been spread locally. Then it loops through the list of jobs and executes each of them, after setting the serializable field in order to hand over the incoming message to the job. This is not really a better solution, but it works in this case.

The remaining part is the sending of messages, so that all other nodes can process them. As the ChatRoom class has been changed to always post a plugin event upon receiving a new chatroom specific event, the plugin checks for such events and sends chatroom specific events over the wire. Before sending an event a UUID gets attached as a property, which allows checking of whether an incoming message has been sent from this host.

As you can see, integrating messaging in your application is not too complex. However, keeping it fast and closely adapted to your needs might be complex. A message based architecture helps you to scale your systems, as you can decouple presentation from backend logic and scale your systems differently.

There's more...

Messaging is quite a complex topic. There are many more implementations and protocol standards other than those provided by ActiveMQ, so it's highly possible that you are already using a similar software.

More info about AMQP

In order to be more independent of an API you should try AMQP implementing services such as RabbitMQ or ZeroMQ. They provide a standard protocol, which can be used in any programming language. If you need speed in delivery, these might be faster than old fashioned JMS services. Find more information about AMQP at http://www.amqp.org/, about RabbitMQ at http://www.rabbitmq.com/, and about ZeroMQ at http://www.zeromq.org/.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.61.170