21. Getting Started with JMS in Clojure

Messaging suits data sources such as a news ticker, and high-availability architectures use it to distribute work across many nodes. It is useful whenever control flow is asynchronous. The standard Java API for messaging is JMS (the Java Message Service). We’re going to use it in Clojure to build a simple message flow of time-based information.

In this recipe we’ll create a new Clojure project and create a JMS queue using the HornetQ library. Then we’ll put a message onto the queue and see it appear in our output.

Assumptions

In this chapter we assume that either you’re familiar with the concept of a JMS message or you are at least willing to look it up.

Benefits

The benefit of this chapter is being able to integrate with existing JMS messaging infrastructure as a producer or a consumer. It is also relevant to real-time flows of information in Storm.

The Recipe—Code

Follow these steps:

1. Create a new Leiningen project news-jms in your projects directory, and change to that directory.

lein new news-jms
cd news-jms

2. Next, modify the project.clj so it looks like this:

(defproject news-jms "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.7.0-beta2"]
                 [org.hornetq/hornetq-core "2.2.2.Final"]
                 [org.hornetq/hornetq-jms "2.2.2.Final"]
                 [org.jboss.netty/netty "3.2.0.BETA1"]
                 [org.jboss.javaee/jboss-jms-api "1.1.0.GA"]]
  :main news-jms.core)

3. Next, modify the file src/news_jms/core.clj to include the following:

(ns news-jms.core
  (:gen-class)
  (:require [news-jms.send-messages :as sender]
            [news-jms.receive-messages :as receiver])
  (:import (org.hornetq.core.config.impl ConfigurationImpl)
           (org.hornetq.api.core TransportConfiguration)
           (org.hornetq.core.remoting.impl.netty NettyAcceptorFactory
                                                 NettyConnectorFactory)
           (org.hornetq.jms.server.config.impl JMSConfigurationImpl
                                               ConnectionFactoryConfigurationImpl
                                               JMSQueueConfigurationImpl)
           (org.hornetq.jms.server.embedded EmbeddedJMS)
           (java.util ArrayList)))

(defn get-server
  "Instantiate and programmatically configure a HornetQ JMS server."
  []
  (let [configuration (ConfigurationImpl.)
        connectorConfig (TransportConfiguration.
                          (.getName NettyConnectorFactory))
        jmsConfig (JMSConfigurationImpl.)
        connectorNames (ArrayList.)
        ;; Arguments: name, selector, durable?, JNDI bindings
        queueConfig (JMSQueueConfigurationImpl.
                      "queue1" nil false (into-array '("/queue/queue1")))
        jmsServer (EmbeddedJMS.)]
    ;; Disable persistence and security for this embedded demo server.
    (doto configuration
      (.setPersistenceEnabled false)
      (.setSecurityEnabled false))
    ;; Accept connections over Netty and register a matching connector.
    (.add (.getAcceptorConfigurations configuration)
          (TransportConfiguration. (.getName NettyAcceptorFactory)))
    (.put (.getConnectorConfigurations configuration)
          "connector" connectorConfig)
    (.add connectorNames "connector")
    ;; Arguments: name, HA?, connector names, JNDI bindings
    (let [cfConfig (ConnectionFactoryConfigurationImpl.
                     "cf" false connectorNames (into-array '("/cf")))]
      (.add (.getConnectionFactoryConfigurations jmsConfig) cfConfig))
    (.add (.getQueueConfigurations jmsConfig) queueConfig)
    (doto jmsServer
      (.setConfiguration configuration)
      (.setJmsConfiguration jmsConfig)
      (.start))
    jmsServer))

(defn stop-server "stop the server"  [server]
  (.stop server))

(defn -main [& args]
  (println "main called")
  (let [server (get-server)]
    (sender/send-messages server)
    (receiver/receive-messages server)
    (stop-server server))
  (println "done"))

4. Now create the file src/news_jms/send_messages.clj with the following contents:

(ns news-jms.send-messages
  (:import (javax.jms Session MessageProducer TextMessage)
           (java.util Date)))

(defn send-messages
  "Add a message to the queue on the server."
  [jmsServer]
  (let [cf (.lookup jmsServer "/cf")
        queue (.lookup jmsServer "/queue/queue1")
        connection (.createConnection cf)
        session (.createSession connection false Session/AUTO_ACKNOWLEDGE)
        producer (.createProducer session queue)
        message (.createTextMessage session
                                    (str "Hello sent at " (Date.)))]
    (.send producer message)
    (.close connection))
  (println "sent messages"))

5. Now create the file src/news_jms/receive_messages.clj with the following contents:

(ns news-jms.receive-messages
  (:import (javax.jms MessageConsumer Session)))

(defn receive-messages
  "Receive a message from the queue."
  [jmsServer]
  (let [cf (.lookup jmsServer "/cf")
        queue (.lookup jmsServer "/queue/queue1")
        connection (.createConnection cf)
        session (.createSession connection false Session/AUTO_ACKNOWLEDGE)
        messageConsumer (.createConsumer session queue)]
    ;; Delivery to consumers only begins once the connection is started.
    (.start connection)
    ;; .receive returns nil if nothing arrives within 1000 ms.
    (if-let [messageReceived (.receive messageConsumer 1000)]
      (println (str "Received message:" (.getText messageReceived)))
      (println "No message received within 1000 ms"))
    (.close connection))
  (println "finished receiving messages"))

Testing the Recipe

Now we’ll test it. Run the project with Leiningen:

lein run

You should see output similar to the following (along with other noise):

sent messages
Received message:Hello sent at Tue Jun 16 21:49:18 EST 2015
finished receiving messages

Notes on the Recipe

Looking at project.clj, we’ve added the HornetQ core and HornetQ JMS libraries, Netty (the network transport HornetQ uses here), and the JMS API. The :main entry tells Leiningen to run the -main function in news-jms.core when we execute lein run.

The namespace declaration of core.clj requires the project’s two other namespaces, send-messages and receive-messages. It also imports the HornetQ classes for setting up a JMS server, along with the Java class ArrayList to help us with configuration.

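The get-server function in core.clj instantiates, configures, and starts an embedded JMS server implemented in HornetQ. It turns off persistence and security, registers a Netty acceptor and connector, and binds a connection factory at /cf and a queue named queue1 at /queue/queue1.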
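The into-array calls in get-server are there because the HornetQ configuration constructors take their JNDI bindings as a Java String array, and a Clojure list is not one. As a rough illustration using only core Clojure, this is what the conversion produces. The names jndi-bindings and empty-bindings are ours, not part of the recipe.

```clojure
;; Illustrative only: build the kind of array that get-server passes
;; as the JNDI bindings argument.
(def jndi-bindings (into-array String ["/queue/queue1"]))

;; The result is a Java array of String, not a Clojure collection.
(println (.getName (class jndi-bindings)))
(println (vec jndi-bindings))

;; Naming the element type also keeps an empty collection typed as String[];
;; without it, into-array on an empty collection yields an Object array.
(def empty-bindings (into-array String []))
(println (.getName (class empty-bindings)))
```

The recipe omits the explicit String argument, which works because into-array infers the element type from the first item.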

The stop-server function in core.clj does precisely what its name suggests. The -main function is what Leiningen calls when lein run executes, because project.clj names news-jms.core under :main. It creates the server, calls the functions that send and receive a message, and then stops the server.
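As written, -main only stops the server if sending and receiving both succeed. One possible variation, sketched here under our own assumptions, wraps the work in try/finally so the stop function always runs. The helper call-with-server and its argument names are hypothetical. The stand-in functions let the sketch run without HornetQ on the classpath.

```clojure
(defn call-with-server
  "Starts a server with start-fn, passes it to body-fn, and always calls
  stop-fn afterwards, even if body-fn throws. In this recipe the arguments
  could be get-server, stop-server, and a function that sends and receives."
  [start-fn stop-fn body-fn]
  (let [server (start-fn)]
    (try
      (body-fn server)
      (finally
        (stop-fn server)))))

;; Stand-ins that record the order of calls instead of touching JMS.
(def events (atom []))

(call-with-server
  (fn [] (swap! events conj :started) :fake-server)
  (fn [_] (swap! events conj :stopped))
  (fn [_] (swap! events conj :working)))

(println @events)
```

In the recipe itself, the same shape would be a call such as (call-with-server get-server stop-server (fn [server] ...)) inside -main.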

The send_messages.clj file contains a single function that takes a reference to our JMS server. It looks up the connection factory and the queue, opens a connection and a session, creates a producer, and puts one text message onto the queue.

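The receive_messages.clj file contains a single function that takes a reference to the JMS server. It looks up the connection factory and the queue, opens a connection and a session, starts the connection so that delivery can begin, and waits up to one second to receive a message from the queue.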
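The receive function reads exactly one message. If the queue held several, one way to collect them all is to keep receiving until the timed receive returns nil, which is what the JMS receive call with a timeout does when nothing arrives in time. The sketch below shows only that loop. The names drain, receive-fn, pending, and fake-receive are hypothetical, and the stand-in consumer is an atom rather than a real JMS consumer.

```clojure
(defn drain
  "Calls receive-fn until it returns nil and returns everything it produced
  as a vector. With JMS, receive-fn could be #(.receive messageConsumer 1000)."
  [receive-fn]
  (loop [received []]
    (if-some [message (receive-fn)]
      (recur (conj received message))
      received)))

;; Stand-in for a consumer: hands out queued strings, then nil.
(def pending (atom ["first" "second"]))

(defn fake-receive []
  (let [message (first @pending)]
    (swap! pending rest)
    message))

(println (drain fake-receive))
```

With a real consumer, each element of the returned vector would be a JMS message, so the text would still need to be read from each one with .getText.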

Conclusion

We’ve gotten started with JMS in Clojure. We’ve started up an embedded JMS server, sent a message, and received that message. We’ve got the building blocks to put together a high availability solution.
