22. Integrating Storm and JMS

We’ve gone through the basic concepts of Storm with a simple application. We’ve also written a basic JMS server and client in Clojure. Now we’re going to make Storm more useful by integrating JMS as a Spout.

The big idea in this chapter is pricing information. We’re going to have a JMS feed of stock prices flowing through the system. Using the earnings-per-share (EPS) figure that is also carried in each message, Storm will first determine (using a simplistic algorithm) whether the price represents good value and then issue a recommendation.

In this recipe we’ll create a Storm spout that reads from the JMS queue. Then we’ll integrate this with a Storm topology that routes messages based on their content. Finally, we’ll see the resulting output displayed.

Assumptions

In this chapter we assume that you have worked through the JMS and Storm chapters (Chapters 21 and 22).

Benefits

The benefit of this chapter is knowing how to integrate Storm with a real-time feed from a JMS data source.

The Recipe—Code

Do the following:

1. Create a new Leiningen project storm-jms-demo in your projects directory, and change to that directory:

lein new storm-jms-demo
cd storm-jms-demo

2. Modify the project.clj to look like the following:

(defproject storm-jms-demo "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
  ;https://github.com/rbrush/clara-storm/issues/1#issuecomment-50899227
                 [org.clojure/data.json "0.2.1"]
                 [org.hornetq/hornetq-core "2.2.2.Final"]
                 [org.hornetq/hornetq-jms "2.2.2.Final"]
                 [org.jboss.netty/netty "3.2.0.BETA1"]
                 [org.jboss.javaee/jboss-jms-api "1.1.0.GA"]
                 [com.github.juliangamble/storm-jms "0.8.1"]
                 [javax.jms/jms-api "1.1-rev-1"]
                 [org.apache.storm/storm-core "0.9.5"]]
  :source-paths ["src/clj"]
  :java-source-paths ["src/java"]
  :aot [storm-jms-demo.PrinterBolt storm-jms-demo.ValueBuyBolt]
  :main storm-jms-demo.core)

3. Create the directory src/clj/storm_jms_demo/:

mkdir -p src/clj/storm_jms_demo

4. Create the file src/clj/storm_jms_demo/ValueBuyBolt.clj with the following contents:

(ns storm-jms-demo.ValueBuyBolt
  (:gen-class
     :extends backtype.storm.topology.base.BaseRichBolt
     :implements [java.io.Serializable]
     :init init
     :state state
     :methods [[initPriceEarningsRatio [java.math.BigDecimal] void]])
  (:require [storm-jms-demo.enqueue-messages :as enqueue-messages]
            [storm-jms-demo.jms-utils :as jms-utils])
   (:import [backtype.storm.task OutputCollector TopologyContext]
            [backtype.storm.topology OutputFieldsDeclarer]
            [backtype.storm.topology.base BaseRichBolt]
            [backtype.storm.tuple Fields Tuple Values]
            [org.json.simple JSONValue]
            [java.io Serializable]
            [java.math BigDecimal]
            [java.util Map]))

(defn -init []
  [[] {:pe (make-array BigDecimal 1)
       :collector (make-array OutputCollector 1)}])

(defn -initPriceEarningsRatio [this peVal]
  (-> (.state this)
      :pe
      (aset 0 peVal)))

(defn -prepare [this ^Map stormConf ^TopologyContext context
                ^OutputCollector socollector]
  (-> (.state this)
      :collector
      (aset 0 socollector)))

(defn is-value-buy [this ^BigDecimal price ^BigDecimal eps]
  (try
    (let [stockPricePE (.divide price eps BigDecimal/ROUND_HALF_UP)
          state (.state this)
          priceToEarningsRatio (-> state :pe (aget 0))
          valueBuy (< (.compareTo stockPricePE priceToEarningsRatio) 0)
          result (into-array String [(if valueBuy "BUY" "SELL")])]
      (println "result of value-buy: " (aget result 0))
      result)
    ;; Return an array here too, so the caller can always wrap the result
    ;; in Values.
    (catch ArithmeticException e
      (into-array String [(str "caught exception: " (.getMessage e))]))))

(defn -execute [this ^Tuple input]
  (let [jsonString  (.getString input 0)
        obj (JSONValue/parse jsonString)
        ^Map messageMap ^Map obj
        price  (BigDecimal. (.get messageMap "price"))
        eps (BigDecimal. (.get messageMap "eps"))
        state (.state this)
        collector (-> state :collector (aget 0))]
    (println "Executing tuple in value buy bolt: " jsonString)
    (println "message received in bolt = "
             (.get messageMap "stock-code") " "
             (.get messageMap "price") " "
             (.get messageMap "eps"))
    (.emit collector input (Values. (is-value-buy this price eps)))
    (.ack collector input)))

(defn -declareOutputFields [this ^OutputFieldsDeclarer declarer]
   (.declare declarer (Fields. ["peFilter"])))

5. Move the file src/storm_jms_demo/core.clj to the directory src/clj/storm_jms_demo/:

mv src/storm_jms_demo/core.clj src/clj/storm_jms_demo/

6. Delete the directory src/storm_jms_demo:

rmdir src/storm_jms_demo

7. Create the directory src/java/storm/contrib/jms/example:

mkdir -p src/java/storm/contrib/jms/example

8. Add the file JsonTupleProducer.java to the directory src/java/storm/contrib/jms/example from here:

https://github.com/ptgoetz/storm-jms/blob/c723b0db060a4244a0156c2d913410ee21b36ff1/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java

Alternatively, use the following shortened URL:

http://bit.ly/jsontupleproducer

9. Create the directory src/java/storm/starter/bolt:

mkdir -p src/java/storm/starter/bolt

10. Create the file src/clj/storm_jms_demo/PrinterBolt.clj with the following contents:

(ns storm-jms-demo.PrinterBolt
   (:gen-class
     :extends backtype.storm.topology.base.BaseBasicBolt)
   (:import [backtype.storm.topology BasicOutputCollector OutputFieldsDeclarer]
            [backtype.storm.topology.base BaseBasicBolt]
            [backtype.storm.tuple Tuple]
            [java.io Serializable]))

(defn -execute [this ^Tuple tuple ^BasicOutputCollector collector]
  (println tuple))

(defn -declareOutputFields [this ^OutputFieldsDeclarer ofd])

11. Create the directory src/java/recipes/jms/provider:

mkdir -p src/java/recipes/jms/provider

12. Create the file src/java/recipes/jms/provider/HornetQJmsProvider.java with the following contents:

package recipes.jms.provider;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;

import recipes.jms.provider.ProviderState;
import backtype.storm.contrib.jms.JmsProvider;

/**
 * Needs to be completely stateless so it can be serialized.
 */
public class HornetQJmsProvider implements JmsProvider {

    private static final long serialVersionUID = -1260267361423423454L;

    public ConnectionFactory connectionFactory() {
        return ProviderState.getCF();
    }

    public Destination destination() {
        return ProviderState.getDestination();
    }
}

13. Create the file src/java/recipes/jms/provider/ProviderState.java with the following contents:

package recipes.jms.provider;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;


/**
 * Class using static members for state holding to get around serialization.
 *
 */
public class ProviderState  {

    private static ConnectionFactory cf;

    private static Destination dest;

    public static void setCF(ConnectionFactory connectionFactory) {
        cf = connectionFactory;
    }

    public static void setDestination(Destination destination) {
        dest = destination;
    }

    public static ConnectionFactory getCF() {return cf;}

    public static Destination getDestination() {return dest;}
}

14. Ensure that the file src/clj/storm_jms_demo/core.clj has the following contents:

(ns storm-jms-demo.core
  (:gen-class
   :name storm-jms-demo.core)
  (:require [storm-jms-demo.enqueue-messages :as enqueue-messages]
            [storm-jms-demo.topology :as topology]
            [storm-jms-demo.jms-utils :as jms-utils]))

(defn -main  [& args]
  (let [jms-server (jms-utils/get-server)]
    (enqueue-messages/enqueue-msgs jms-server)
    (topology/run-demo jms-server)))

15. Create the file src/clj/storm_jms_demo/enqueue_messages.clj with the following contents:

(ns storm-jms-demo.enqueue-messages
  (:require [storm-jms-demo.stock-utils :as stock-utils]
            [storm-jms-demo.jms-utils :as jms-utils])
  (:import (recipes.jms.provider ProviderState)
           (org.hornetq.jms.server.embedded EmbeddedJMS)
           (javax.jms Session MessageProducer TextMessage)
           (java.util Date)))

(defn enqueue-msgs
  "Put some stock prices on the queue."
  [jms-server]
  (let [session (jms-utils/get-session jms-server)
        producer (jms-utils/get-producer jms-server)]
    (stock-utils/pump-stock-prices 10 session producer)))

16. Create the file src/clj/storm_jms_demo/topology.clj with the following contents:

(ns storm-jms-demo.topology
  (:require [storm-jms-demo.jms-utils :as jms-utils])
  (:import (backtype.storm.topology TopologyBuilder)
           (recipes.jms.provider HornetQJmsProvider)
           (backtype.storm.contrib.jms.spout JmsSpout)
           (backtype.storm.contrib.jms.example JsonTupleProducer)
           (storm_jms_demo PrinterBolt ValueBuyBolt)
           (backtype.storm Config)
           (backtype.storm LocalCluster)
           (backtype.storm.utils Utils)
           (javax.jms Session)
           (java.math BigDecimal)))

(defn run-demo
  "Configure the topology with spouts and bolts and submit it to the cluster."
  [jms-server]
  (let [builder (TopologyBuilder.)
        jmsSpout (JmsSpout.)
        jmsProvider (HornetQJmsProvider.)
        tuple-producer (JsonTupleProducer.)
        MY_SPOUT "MySpout"
        conf (Config.)
        cluster (LocalCluster.)
        valueBuyBolt (ValueBuyBolt.)]
    (doto jmsSpout
      (.setJmsProvider jmsProvider)
      (.setJmsTupleProducer tuple-producer)
      (.setJmsAcknowledgeMode Session/CLIENT_ACKNOWLEDGE)
      (.setDistributed false))
    (.setSpout builder MY_SPOUT jmsSpout)
    (.shuffleGrouping (.setBolt builder "print" (PrinterBolt.)) MY_SPOUT)
    (.initPriceEarningsRatio valueBuyBolt (BigDecimal. 10))
    (.shuffleGrouping (.setBolt builder "buy" valueBuyBolt) MY_SPOUT)
    (.submitTopology cluster "storm-demo" conf (.createTopology builder))
    (Utils/sleep 10000)
    (doto cluster
      (.killTopology "storm-demo")
      (.shutdown))))

17. Create the file src/clj/storm_jms_demo/jms_utils.clj with the following contents:

(ns storm-jms-demo.jms-utils
  ;; ConnectionFactoryConfiguration and Destination are imported because
  ;; they are used as type hints further down.
  (:import (recipes.jms.provider ProviderState)
           (org.hornetq.core.config.impl ConfigurationImpl)
           (org.hornetq.api.core TransportConfiguration)
           (org.hornetq.core.remoting.impl.netty NettyAcceptorFactory
                                                 NettyConnectorFactory)
           (org.hornetq.jms.server.config ConnectionFactoryConfiguration)
           (org.hornetq.jms.server.config.impl JMSConfigurationImpl
                                               ConnectionFactoryConfigurationImpl
                                               JMSQueueConfigurationImpl)
           (org.hornetq.jms.server.embedded EmbeddedJMS)
           (java.util ArrayList)
           (javax.jms Destination Session)))

(defn get-server
  "Return a programmatically configured HornetQ JMS Server instance."
  []
  (let [configuration (ConfigurationImpl.)
        connectorConfig (TransportConfiguration.
                         (.getName NettyConnectorFactory))
        jmsConfig (JMSConfigurationImpl.)
        connectorNames (ArrayList.)
        jmsServer (EmbeddedJMS.)]
    (doto configuration
      (.setPersistenceEnabled false)
      (.setSecurityEnabled false))
    (.add (.getAcceptorConfigurations configuration)
          (TransportConfiguration. (.getName NettyAcceptorFactory)))
    (.put (.getConnectorConfigurations configuration)
          "connector"
          connectorConfig)
    (.add connectorNames "connector")
    (let  [cfConfig  (ConnectionFactoryConfigurationImpl.
                      "cf"
                      false
                      connectorNames
                      (into-array '("/cf")))
           queueConfig (JMSQueueConfigurationImpl.
                        "queue1"
                        nil
                        false
                        (into-array '("/queue/queue1")))]
      (.add (.getConnectionFactoryConfigurations jmsConfig)
            ^ConnectionFactoryConfiguration cfConfig)
      (.add (.getQueueConfigurations jmsConfig) queueConfig))
    (doto jmsServer
      (.setConfiguration configuration)
      (.setJmsConfiguration jmsConfig)
      (.start))
  jmsServer))

(defn stop-server
  "Stop the JMS Server instance."
  [server]
  (.stop server))

(defn get-destination
  "Get the Queue (Destination) from the server."
  [jms-server]
  (let [destination  ^Destination (.lookup jms-server "/queue/queue1")]
    destination))

(defn get-session
  "Get the JMS Session."
  [jms-server]
  (let [cf (.lookup jms-server "/cf")
        destination (get-destination jms-server)]
    (println (str "destination: " destination))
    (ProviderState/setCF cf)
    (ProviderState/setDestination destination)
    (let [connection  (.createConnection cf)
          session (.createSession connection false Session/AUTO_ACKNOWLEDGE)]
      session)))

(defn get-producer
  "Get the JMS Producer."
  [jms-server]
  (let [session (get-session jms-server)
        destination (get-destination jms-server)
        producer  (.createProducer session destination)]
    producer))

18. Create the file src/clj/storm_jms_demo/stock_utils.clj with the following contents:

(ns storm-jms-demo.stock-utils
  (:require [storm-jms-demo.price-feed :as price-feed]
            [clojure.data.json :as json])
  (:import (org.hornetq.jms.server.embedded EmbeddedJMS)
           (javax.jms Session MessageProducer TextMessage)
           (java.util Date)
           (java.util.concurrent Executors)))

(defn return-stock-message
  "Put the pricing message on the queue."
  [stock-code price eps session producer]
  (let [message (.createTextMessage session (str "Price sent at " (Date.)))
        message-map {:stock-code stock-code :price price :eps eps}
        jsonText (json/write-str message-map)]
    (.setText message jsonText)
    (println (str "Sending message: " (.getText message)))
    (.send producer message)))

(defn pump-stock-prices
  "This provides random prices for the stocks."
  [num-iterations session producer]
  (dotimes [n num-iterations]
    (Thread/sleep  1000 )
    (let [stock-code  (rand-nth price-feed/stock-codes)
          price (price-feed/curr-price stock-code)
          stock-info (price-feed/stock-map (keyword stock-code))
          eps (:eps stock-info)]
      (return-stock-message stock-code price eps session producer)
      (println (str stock-code " " price " " eps " " stock-info " "
                    (java.util.Date.) " ")))))

19. Create the file src/clj/storm_jms_demo/price_feed.clj with the following contents:

(ns storm-jms-demo.price-feed)

(def stock-map
  {:YHOO {:stock-name "YHOO" :max-price 31.1 :min-price 11.51
          :wavelength 90 :starting-point 0.5 :eps 2.15}
   :AAPL {:stock-name "AAPL" :max-price 408.38 :min-price 84.11
          :wavelength 60 :starting-point 0 :eps 31.25}
   :GOOG {:stock-name "GOOG" :max-price 809.1 :min-price 292.96
          :wavelength 50 :starting-point 0.75 :eps 29.31}
   :AMZN {:stock-name "AMZN" :max-price 274.7 :min-price 42.7
          :wavelength 45 :starting-point 0.25 :eps 1.15}})

(defn time-model
  "Build a fake pricing model for stocks based on a sine wave."
  [time-secs stock-map]
  (let [max-price (:max-price stock-map)
        min-price (:min-price stock-map)
        wavelength (:wavelength stock-map)
        med-price (+ (/ (- max-price min-price) 2) min-price)
        amplitude (- max-price med-price)
        starting-point (:starting-point stock-map)]
    (+ (* (Math/sin (- (/ (* 2 Math/PI time-secs) wavelength)
                       (* 2 Math/PI starting-point wavelength)))
          amplitude)
       med-price)))

(defn curr-price
  "Given a stock code, get the price in our model for the current point in time."
  [stock-code]
  (format "%.2f"
          (time-model (/ (java.lang.System/currentTimeMillis) 60)
                      (stock-map (keyword stock-code)))))

(def stock-codes ["AAPL" "GOOG" "AMZN" "YHOO"])

20. Check the directory structure of the files you created or modified. It should look like the following:

storm-jms-demo:
  src:
    clj:
      storm_jms_demo:
        core.clj
        enqueue_messages.clj
        jms_utils.clj
        price_feed.clj
        stock_utils.clj
        topology.clj
        ValueBuyBolt.clj
        PrinterBolt.clj
    java:
      recipes:
        jms:
          provider:
            HornetQJmsProvider.java
            ProviderState.java
      storm:
        contrib:
          jms:
            example:
              JsonTupleProducer.java

Testing the Recipe

Now run it with Leiningen:

lein run

Among all the output you should see the buy recommendations flowing out:

source: ValueFilter:2, stream: default, id: {-9084197743852402921=-5631181377148841233}, [BUY, source: MySpout:4, stream: default, id: {-9084197743852402921=-6444047755504095379}, [{"stock-code":"AAPL","price":"214.20","eps":31.25}]]

Notes on the Recipe

Looking at the project.clj file, we see that we’ve brought in the HornetQ JMS libraries we saw in Chapter 21 about JMS, along with the Storm libraries we saw in Chapter 22 about Storm. We’ve also added an :aot instruction to compile our gen-class Clojure namespaces to Java classes so they can be accessed from Storm.

Looking at the ValueBuyBolt.clj file, we see that we’ve extended the Storm class BaseRichBolt. The bolt’s constructor takes no arguments; instead, the initPriceEarningsRatio method sets the price-to-earnings threshold that the bolt compares each stock against. In a very simplistic (and this is not financial advice) model of evaluating whether to buy a stock, you can rank all the stocks according to each stock’s ratio of price to earnings (P/E). The stocks with the lowest price-to-earnings ratio represent the best value and so should be considered first for a buy order. (In reality, a zillion more factors are taken into account, and you will get creamed in the market if you use this model alone. We’re just building a simple model for now.)
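The ranking idea described above can be sketched in a few lines of plain Clojure. This is only an illustration, not part of the recipe: the function names and the sample prices are made up for the example, and only the EPS figures are taken from price_feed.clj.

```clojure
;; Hypothetical price snapshot; the :eps values match those in price_feed.clj,
;; the :price values are invented for this sketch.
(def sample-quotes
  [{:stock-code "YHOO" :price 20.0  :eps 2.15}
   {:stock-code "AAPL" :price 250.0 :eps 31.25}
   {:stock-code "GOOG" :price 586.2 :eps 29.31}
   {:stock-code "AMZN" :price 115.0 :eps 1.15}])

(defn pe-ratio
  "Price divided by earnings per share."
  [{:keys [price eps]}]
  (/ price eps))

(defn rank-by-pe
  "Return the quotes ordered from lowest to highest P/E."
  [quotes]
  (sort-by pe-ratio quotes))

(map :stock-code (rank-by-pe sample-quotes))
;; => ("AAPL" "YHOO" "GOOG" "AMZN")
```

The bolt in this recipe does not rank stocks against each other; it compares each incoming P/E with a fixed threshold, which needs no state beyond the threshold itself.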

In the prepare function we get a reference to the Storm collector. This is so we can pass on messages after we’ve processed them.

In the execute function we take an input tuple that represents stock information. For this application the tuple carries its information as a JSON string, so we parse that and extract the stock code, price, and earnings-per-share values. Then we emit an output tuple, anchored to the original tuple, that says whether this stock is considered a value buy. Finally, we ack the input tuple to confirm that it has been processed, so Storm doesn’t replay it.
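The field extraction that execute performs can be tried at the REPL without Storm. The sketch below assumes the JSON has already been parsed into a java.util.Map (a Clojure map stands in for it here); parse-quote is a hypothetical helper name, not something defined in the recipe.

```clojure
(defn parse-quote
  "Pull the fields the bolt needs out of an already-parsed message map."
  [^java.util.Map message]
  {:stock-code (.get message "stock-code")
   ;; bigdec accepts both the string price and the numeric EPS.
   :price      (bigdec (.get message "price"))
   :eps        (bigdec (.get message "eps"))})

(parse-quote {"stock-code" "AAPL" "price" "214.20" "eps" 31.25})
;; => {:stock-code "AAPL", :price 214.20M, :eps 31.25M}
```

One design difference is worth noting: the recipe calls the BigDecimal constructor directly, which for a double argument reproduces the exact binary value of that double, whereas bigdec converts a double by way of its shortest decimal representation. For a value such as 31.25 the two agree.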

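In the is-value-buy function we take the price and EPS for the stock and work out the price-to-earnings ratio for this stock and price. If the ratio is below the threshold set through initPriceEarningsRatio, we determine that this is a value buy and return "BUY"; otherwise, we return "SELL". The result comes back as a one-element String array so that it can be passed directly to the Values constructor.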
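To see the threshold comparison in isolation, here is a minimal standalone sketch. It is not the bolt’s code: recommendation is a hypothetical function, and it fixes the scale at two decimal places with RoundingMode/HALF_UP, whereas the recipe keeps the scale of the price.

```clojure
(import '(java.math BigDecimal RoundingMode))

(defn recommendation
  "Return \"BUY\" when price/eps is below pe-threshold, otherwise \"SELL\"."
  [^BigDecimal price ^BigDecimal eps ^BigDecimal pe-threshold]
  (let [pe (.divide price eps 2 RoundingMode/HALF_UP)]
    (if (neg? (.compareTo pe pe-threshold))
      "BUY"
      "SELL")))

(recommendation 214.20M 31.25M 10M) ; P/E of 6.85, below 10
;; => "BUY"
(recommendation 115.00M 1.15M 10M)  ; P/E of 100.00, above 10
;; => "SELL"
```

As in the recipe, an EPS of zero makes the division throw an ArithmeticException, so a caller would need to handle that case.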

In the declareOutputFields function we tell the Storm topology that this bolt emits a single output field, named peFilter.

The JsonTupleProducer.java class takes a JMS message and turns it into a Storm tuple whose single field holds the message’s JSON text as a string. This class comes from the example code of the storm-jms library.

PrinterBolt.clj is a printing bolt. It takes whatever tuples it is given, converts them to strings, and prints them to standard output.

The HornetQJmsProvider class implements JmsProvider, the storm-jms interface that supplies a JMS ConnectionFactory and a JMS Destination. The library class JmsSpout uses this interface, so HornetQJmsProvider is the bridge between our HornetQ server and the standard JmsSpout. Its two methods return the ConnectionFactory and the Destination held by ProviderState.

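The ProviderState.java class exists because Storm assumes that spouts can be serialized, and that extends to the JmsProvider a spout holds. We wanted to store session state in our JmsProvider class, but that wasn’t going to work, so ProviderState acts as a static ‘holder’ for the connection factory and destination that HornetQJmsProvider hands out. Static fields are visible only within a single JVM, which is sufficient for the local cluster used in this recipe.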

The file core.clj is the namespace that Leiningen calls when the run command is executed. It starts an embedded HornetQ JMS server, puts the messages on the queue, and then starts the Storm topology.

In the file enqueue_messages.clj, the enqueue-msgs function is responsible for putting messages on the queue. It gets a session and producer, and then calls the pump-stock-prices function to put prices on the queue.

The file topology.clj builds the Storm topology and runs it. First it configures the JmsSpout with our provider and tuple producer. Then it attaches the PrinterBolt and the ValueBuyBolt (initialized with a P/E threshold of 10) to the spout using shuffle groupings, starts a local Storm cluster, and submits the topology for execution. After 10 seconds, it kills the topology and shuts down the cluster.

The file jms_utils.clj is our utility namespace that lets other parts of the application use HornetQ. It has functions to retrieve a JMS server, a destination, a session, and a producer. The get-server function also starts the server it returns, and stop-server stops it.

The file stock_utils.clj contains a namespace that pushes stock prices onto the queue. The function pump-stock-prices loops a given number of times, pausing for one second on each pass; each time it picks a stock code at random and looks up its current price from a model based on a sine wave. The return-stock-message function takes the stock price information, puts it in a map, serializes it into a string of JSON, and then pushes it onto the queue.

The file price_feed.clj contains a namespace for our simulated stock pricing model. We assume four stocks whose prices over time can be modeled by a sine wave. The time-model function calculates a stock’s point on the sine wave and returns a price. The curr-price function is the wrapper; given a stock code, it looks up the price model map, feeds it into the pricing function along with the current time, and returns the price for that stock at that point in time as a string formatted to two decimal places.
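To get a feel for how a sine wave maps onto a price range, here is a simplified sketch of the same idea. It is not the recipe’s time-model: sine-price is a hypothetical function, and the :phase key (a fraction of one cycle) is an assumption made for this example rather than the recipe’s :starting-point handling.

```clojure
(defn sine-price
  "Price at time t (in seconds) for a stock that swings between :min-price
  and :max-price once every :wavelength seconds. :phase shifts the wave by
  a fraction of a cycle."
  [{:keys [min-price max-price wavelength phase]} t]
  (let [mid       (/ (+ max-price min-price) 2.0)
        amplitude (/ (- max-price min-price) 2.0)
        angle     (* 2 Math/PI (+ (/ t wavelength) phase))]
    (+ mid (* amplitude (Math/sin angle)))))

(def demo-stock
  {:min-price 84.11 :max-price 408.38 :wavelength 60 :phase 0})

(sine-price demo-stock 0)  ; the midpoint of the range
(sine-price demo-stock 15) ; a quarter cycle in: close to :max-price
(sine-price demo-stock 45) ; three quarters in: close to :min-price
```

Because the price depends only on the time and the stock’s parameters, the feed is repeatable, which makes the downstream BUY and SELL decisions easy to check by hand.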

Conclusion

We’ve integrated Storm with JMS using a spout. We have tied the two together in a Storm topology.

In particular, we have instantiated a HornetQ JMS server and populated it with stock prices. We have used the standard JmsSpout library for Storm and written a custom provider for the HornetQ JMS queue. We have written a topology that reads the JMS messages from the JMS spout and determines whether they are a value buy or not. Then those price recommendations that are determined to be a value buy are passed on to the standard out printing bolt.
