Messaging is used for data sources such as a news ticker. Messaging is also used in high availability architectures to distribute work across many nodes. It is useful when there is an asynchronous control flow. The Oracle standard for messaging is the JMS interface. We’re going to use it in Clojure to build a simple message flow of time-based information.
In this recipe we’ll create a new Clojure project and create a JMS queue using a Hornet library. Then we’ll pump messages onto the queue and see them appear in our output.
In this chapter we assume that either you’re familiar with the concept of a JMS message or you are at least willing to look it up.
The benefit of this chapter is being able to integrate with existing JMS messaging infrastructure as a producer or a consumer. It is also relevant to real-time flows of information in Storm.
Follow these steps:
1. Create a new Leiningen project news-jms
in your projects directory, and change to that directory.
lein new news-jms
cd news-jms
2. Next, modify the project.clj
so it looks like this:
(defproject news-jms "0.1.0-SNAPSHOT"
:dependencies [[org.clojure/clojure "1.7.0-beta2"]
[org.hornetq/hornetq-core "2.2.2.Final"]
[org.hornetq/hornetq-jms "2.2.2.Final"]
[org.jboss.netty/netty "3.2.0.BETA1"]
[org.jboss.javaee/jboss-jms-api "1.1.0.GA"]]
:main news-jms.core)
3. Next, modify the file src/news_jms/core.clj
to include the following:
(ns news-jms.core
(:gen-class)
(:require [news-jms.send-messages :as sender]
[news-jms.receive-messages :as receiver])
(:import (org.hornetq.core.config.impl ConfigurationImpl)
(org.hornetq.api.core TransportConfiguration)
(org.hornetq.core.remoting.impl.netty NettyAcceptorFactory
NettyConnectorFactory)
(org.hornetq.jms.server.config.impl JMSConfigurationImpl
ConnectionFactoryConfigurationImpl JMSQueueConfigurationImpl)
(org.hornetq.jms.server.embedded EmbeddedJMS)
(java.util ArrayList)))
(defn get-server
"Instantiate and programmatically configure a Hornet JMS Server." []
(let [configuration (ConfigurationImpl.)
connectorConfig (TransportConfiguration. (.getName
NettyConnectorFactory))
jmsConfig (JMSConfigurationImpl.)
connectorNames (ArrayList.)
queueConfig (JMSQueueConfigurationImpl. "queue1" nil false
(into-array '("/queue/queue1")))
jmsServer (EmbeddedJMS.)]
(doto configuration
(.setPersistenceEnabled false)
(.setSecurityEnabled false))
(.add (.getAcceptorConfigurations configuration)
(TransportConfiguration. (.getName NettyAcceptorFactory)))
(.put (.getConnectorConfigurations configuration) "connector"
connectorConfig)
(.add connectorNames "connector")
(let [cfConfig (ConnectionFactoryConfigurationImpl. "cf"
false
connectorNames
(into-array '("/
cf")))]
(.add (.getConnectionFactoryConfigurations jmsConfig)
^ConnectionFactoryConfiguration cfConfig))
(.add (.getQueueConfigurations jmsConfig) queueConfig)
(doto jmsServer
(.setConfiguration configuration)
(.setJmsConfiguration jmsConfig)
(.start))
jmsServer))
(defn stop-server "stop the server" [server]
(.stop server))
(defn -main [& args]
(println "main called")
(let [server (get-server)]
(sender/send-messages server)
(receiver/receive-messages server)
(stop-server server))
(println "done"))
4. Now create the file src/news_jms/send_messages.clj
with the following contents:
(ns news-jms.send-messages
(:import (javax.jms Session MessageProducer TextMessage)
(java.util Date)))
(defn send-messages
"Add a message to the queue on the server."
[jmsServer]
(let [cf (.lookup jmsServer "/cf")
queue (.lookup jmsServer "/queue/queue1")
connection (.createConnection cf)
session (.createSession connection false Session/AUTO_ACKNOWLEDGE)
producer (.createProducer session queue)
message (.createTextMessage session (str "Hello sent at "
(Date.)))]
(.send producer message)
(.close connection))
(println "sent messages"))
5. Now create the file src/news_jms/receive_messages.clj
with the following contents:
(ns news-jms.receive-messages
(:import (javax.jms MessageConsumer)
(javax.jms Session)))
(defn receive-messages
"receive a message from the queue"
[jmsServer]
(let [cf (.lookup jmsServer "/cf")
queue (.lookup jmsServer "/queue/queue1")
connection (.createConnection cf)
session (.createSession connection false Session/AUTO_ACKNOWLEDGE)
messageConsumer (.createConsumer session queue)]
(.start connection)
(let [messageReceived (.receive messageConsumer 1000)]
(println (str "Received message:" (.getText messageReceived))))
(.close connection))
(println "finished receiving messages"))
Now we’ll test it. Run the project with Leiningen:
lein run
You should see output similar to the following (along with other noise):
sent messages
Received message:Hello sent at Tue Jun 16 21:49:18 EST 2015
finished receiving messages
Looking at project.clj
, we’ve added libraries for Hornet messaging and Hornet JMS implementations. We’ve added the Netty queue implementation and the JMS API. We’ve also told it to run our core.clj
file when we execute lein run
.
Looking at the namespace of core.clj
we’ve loaded up two project files we’re about to create: send-messages
and receive-messages
. We’ve also imported Hornet libraries for setting up a JMS server. We’ve also brought in the Java class ArrayList
to help us with configuration.
Looking at the get-server
function in core.clj
we see this instantiates, configures, and starts an embedded JMS server implemented in HornetQ.
Looking at the function, stop-server
in core.clj
, this does precisely what its name suggests. In addition, the main
function is called from the project.clj
when lein run
executes. The main
function creates the server, calls functions to send and receive messages, and then stops the server.
The send_messages.clj
file contains a single function that gets a reference to our JMS server. It gets a queue, a connection, and a session, and pumps a message onto the queue.
The receive_messages.clj
file contains a single function that takes a reference to the JMS server. It gets a queue, a connection, and a session, and receives a message from the queue.
18.225.55.193