20. Getting Started with Storm

In this chapter we’ll build your first Storm app in Clojure to get you running.[1]

[1] I am indebted to Paul Gearon for his contributions to this chapter.

Assumptions

In this chapter we assume you have Leiningen set up and have a projects directory in which to create your project.

Benefits

Whether Storm is the right tool depends on context. The questions to work through are as follows:

• Why would you choose Storm to solve your business problem? Storm is designed to make stream processing robust. You can retry any step until the whole pipeline is done, and you can structure your transformations around immutable data so that a repeated message is harmless.

• Storm is well suited to real-time information, generally information coming in chunks small enough to fit into a Java String object. It was originally developed for reporting on Twitter trends but could equally be applied to feeds such as stock prices.

• If you need to do a big reduce job that runs perhaps once a day, consolidating all the data output from your operations into a single list, then a map-reduce framework like Cascalog is better suited. Storm is focused on stream processing rather than batch processing.

The Recipe—Code

This recipe will demonstrate Storm using a data feed of various flavors of cookies. We’ll create the Storm spout and bolt and watch the data as it flows through the system.

1. Create a new Leiningen project storm-demo in your projects directory, and change to that directory:

lein new storm-demo
cd storm-demo

2. Modify the project.clj to look like the following:

(defproject storm-demo "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
  ;https://github.com/rbrush/clara-storm/issues/1#issuecomment-50899227
                 [org.apache.storm/storm-core "0.9.5"]]
  :aot [storm-demo.CookieSpoutClj storm-demo.PrinterBolt]
  :main storm-demo.core)

3. Create the file src/storm_demo/CookieSpoutClj.clj with the following contents:

(ns storm-demo.CookieSpoutClj
   (:gen-class
     :extends backtype.storm.topology.base.BaseRichSpout
     :implements [java.io.Serializable]
     :init init
     :state state)
   (:import [backtype.storm.utils Utils]
            [backtype.storm.spout SpoutOutputCollector]
            [backtype.storm.topology OutputFieldsDeclarer]
            [backtype.storm.topology.base BaseRichSpout]
            [backtype.storm.tuple Fields Values]
            [java.io Serializable]
            [java.util.concurrent LinkedBlockingQueue]))

 (def cookie-list
   ["Almond" "Amaretto" "Biscotti" "Half-Moon" "Bourbon cream"
    "Butter pecan" "Caramel shortbread" "Chocolate" "Chocolate chip"
    "Coconut macaroon" "Custard cream" "Florentine" "Fortune"
    "Gingerbread" "Ladyfinger" "Lincoln" "Macaroon" "Oreo"
    "Peanut butter" "Shortbread" "Wafer"])

 (defn- populated-queue []
   (let [^LinkedBlockingQueue queue (LinkedBlockingQueue. 100)]
     (dotimes [_ 100] (.offer queue (rand-nth cookie-list)))
     queue))

 (defn -init []
   [[] {:queue (populated-queue)
        :collector (make-array SpoutOutputCollector 1)}])

 (defn -open [this conf context ^SpoutOutputCollector socollector]
   (-> (.state this)
       :collector
       (aset 0 socollector)))

 (defn -nextTuple [this]
   (let [state (.state this)
         collector (-> state :collector (aget 0))]
       (if-let [cookie (-> state :queue (.poll)) ]
         (.emit collector (Values. (into-array [cookie])))
         (Utils/sleep 50))))

 (defn -declareOutputFields [this ^OutputFieldsDeclarer declarer]
   (.declare declarer (Fields. ["current-cookie-type"])))

4. Create the file src/storm_demo/PrinterBolt.clj with the following contents:

(ns storm-demo.PrinterBolt
   (:gen-class
     :extends backtype.storm.topology.base.BaseBasicBolt)
   (:import [backtype.storm.topology BasicOutputCollector OutputFieldsDeclarer]
            [backtype.storm.topology.base BaseBasicBolt]
            [backtype.storm.tuple Tuple]
            [java.io Serializable]))

(defn -execute [this ^Tuple tuple ^BasicOutputCollector collector]
  (println tuple))

(defn -declareOutputFields [this ^OutputFieldsDeclarer ofd])

5. Modify the file src/storm_demo/core.clj to look like the following:

(ns storm-demo.core
  (:gen-class)
  (:import
   (backtype.storm.topology TopologyBuilder)
   (storm_demo CookieSpoutClj PrinterBolt)
   (backtype.storm Config)
   (backtype.storm LocalCluster)
   (backtype.storm.utils Utils)))

(defn -main [& args]
  (let [builder (TopologyBuilder.)
        cookieSpout (CookieSpoutClj.)
        MY_SPOUT "MySpout"
        conf (Config.)
        cluster (LocalCluster.)]
    (.setSpout builder MY_SPOUT cookieSpout)
    (.shuffleGrouping (.setBolt builder "print" (PrinterBolt.)) MY_SPOUT)
    (.submitTopology cluster "storm-demo" conf (.createTopology builder))
    (Utils/sleep 10000)
    (.killTopology cluster "storm-demo")
    (.shutdown cluster)))

Testing the Recipe

Now let’s run it.

lein run

You should see the following output (in addition to other Storm information):

{current-cookie-type Shortbread}
{current-cookie-type Amaretto}
{current-cookie-type Gingerbread}
{current-cookie-type Custard cream}
{current-cookie-type Fortune}
{current-cookie-type Chocolate chip}
{current-cookie-type Butter pecan}
{current-cookie-type Oreo}
{current-cookie-type Custard cream}
{current-cookie-type Macaroon}
{current-cookie-type Gingerbread}

You can see our cookie information is flowing through the system.

Notes on the Recipe

Storm was developed by Nathan Marz at his start-up company BackType, which was subsequently bought by Twitter. The big idea is highly scalable processing of message feeds such as stock prices or tweet streams.

To get started, there are four components you need to understand in the Storm ecosystem: tuples, spouts, bolts, and topologies.

Tuples are the data items that flow through the system. These could be individual tweets or prices or aggregate information, such as total tweets with this hashtag so far or stock price movement today.

Spouts are data sources. The data could come from an HTTP feed, a database, a file, or a JMS queue. Spouts produce the tuples that flow through the system.

Bolts are tuple receivers. The message flow can end at a bolt or be passed on to a subsequent bolt. Bolts transform tuples, and they can write them to a target such as a file or database. Bolts can also keep some state that they then pass on in output tuples.

Topologies chain all these components together. You instantiate a spout and decide how many instances will run in parallel. You instantiate a bolt and connect it to the spout. You may add subsequent bolts, chaining further spouts and bolts together as needed. You then start and stop the topology.

Let’s illustrate these ideas with a simple example to get you started. We’re going to imagine a feed of information about cookie consumption is flowing through our system.

Note in the project.clj the use of Clojure 1.5.1 instead of 1.7. It appears Storm was never ported forward to work with newer versions of Clojure.

Looking at CookieSpoutClj.clj, at a high level what we’ve done here is extend the Storm class BaseRichSpout. We’ve added a queue of Strings and filled it with 100 randomly chosen cookies.
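The way the spout drains its queue depends on two LinkedBlockingQueue behaviours that can be checked at a REPL without Storm. The following is a minimal sketch, with a capacity of 2 chosen purely for illustration: offer returns false rather than blocking once the queue is full, and poll returns nil (Java null) once it is empty, which is what lets -nextTuple fall through to its sleep branch.

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

;; Illustrative capacity of 2; the recipe uses 100.
(def small-queue (LinkedBlockingQueue. 2))

;; offer never blocks: it reports whether the item was accepted.
(def accepted
  [(.offer small-queue "Oreo")
   (.offer small-queue "Wafer")
   (.offer small-queue "Fortune")])   ; third offer is rejected, queue is full

;; poll never blocks either: it returns nil once the queue is empty.
(def drained
  [(.poll small-queue) (.poll small-queue) (.poll small-queue)])

(println accepted)
(println drained)
```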

This is using Clojure code to create a class definition for use by Storm. The first step to do this is the gen-class declaration in the ns declaration.

By default, the class name will mirror the namespace, which is why this namespace is using CamelCase (as this is the standard for Java class names). Alternatively, a Clojure standard name could have been used, and a :name option provided to :gen-class.
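As a sketch of that alternative, the namespace declaration below keeps a conventional lower-case Clojure name and uses :name to set the generated class name. The namespace name storm-demo.cookie-spout and the file location src/storm_demo/cookie_spout.clj are hypothetical and not part of the recipe. With this layout the :aot entry in project.clj would presumably list the namespace, while core.clj would keep importing the class given in :name.

```clojure
;; Hypothetical alternative to the recipe's CamelCase namespace.
;; Assumed to live in src/storm_demo/cookie_spout.clj.
(ns storm-demo.cookie-spout
  (:gen-class
    :name storm_demo.CookieSpoutClj   ; Java class name, independent of the ns name
    :extends backtype.storm.topology.base.BaseRichSpout
    :init init
    :state state))
```

Outside of AOT compilation the :gen-class clause is ignored, so the class itself only exists after the namespace has been compiled.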

As in the Java code, the class needs to extend BaseRichSpout, and this is declared on line 3 (the :extends option). That class already implements Serializable, so the :implements option is not actually required here, but it was included to make the serialization requirement more explicit, as this has implications for the implementation.

Instances of the CookieSpoutClj class need to hold data, such as the data queue and the open collector. Classes generated by gen-class keep their data in a single final field, whose name is declared with the :state option, and the value of this field is set by an initialization function named by the :init option.

Note that all of the functions that are publicly available to Java have names prefixed with the "-" character. This prefix can be changed, but the hyphen is the default.

The -init function is called when the class is instantiated, and it must then return a 2-element vector containing arguments for parent class constructors (if required) and a value to save in the internal state for the instance (the state value declared in the namespace). In this case, the state contains a standard map containing the initialized queue and a value for the currently open collector. The collector is being stored in a 1-element Java array, which is a very unusual thing to do in Clojure, though it serves an important purpose here.
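The shape of the value that -init returns can be seen in plain Clojure, without compiling a class. This is a sketch only: example-init is a hypothetical stand-in for -init, and Object replaces SpoutOutputCollector so that it runs without Storm on the classpath.

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

;; Hypothetical stand-in for -init: returns [superclass-ctor-args state].
(defn example-init []
  [[]                                      ; no arguments for the parent constructor
   {:queue     (LinkedBlockingQueue. 100)
    :collector (make-array Object 1)}])    ; Object stands in for SpoutOutputCollector

;; Destructure the pair into its two roles.
(let [[super-args state] (example-init)]
  (println "constructor args:" super-args)
  (println "state keys:" (sort (keys state))))
```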

The -open function is called by Storm to pass along the Collector that this spout is supposed to talk to. The semantics of this function require that the CookieSpoutClj class update its internal state to refer to this collector instance. Most data in Clojure is immutable, though it is possible to use mutable objects such as atoms and references to refer to changeable data in cases like this. Unfortunately, neither atoms nor references will work with Storm here.

Internally, Storm instantiates classes to do work and then serializes those instances to send to the nodes that will perform the analysis. Even though this example is entirely on a single node, the same process is followed regardless of the number of nodes. As a consequence, the entire class instance must be Serializable (this was called out explicitly in the namespace). However, references and atoms are both general reference types and can therefore refer to objects that may not serialize. This means that neither atoms nor references can be serialized, and if they are used in CookieSpoutClj, then Storm will complain about not being able to serialize them.

The solution here is to use a single element array. These do not have the same thread safety that atoms and references provide, but that is not needed here. Because Java arrays are typically only used for Java interoperability, the syntax is a little more verbose than the simple access that atoms and references provide.
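The difference can be checked directly at a REPL. The sketch below uses a string as a stand-in for the collector (an assumption, so that it runs without Storm) and a hypothetical helper, serializable?, that simply attempts Java serialization. It shows the one-element array being updated in place, and that the array serializes while an atom or ref does not.

```clojure
(import '[java.io ByteArrayOutputStream ObjectOutputStream NotSerializableException])

(defn serializable?
  "Hypothetical helper: attempts Java serialization and reports success."
  [x]
  (try
    (with-open [out (ObjectOutputStream. (ByteArrayOutputStream.))]
      (.writeObject out x))
    true
    (catch NotSerializableException _ false)))

(def slot (make-array Object 1))            ; empty slot, as created in -init
(def slot-ok? (serializable? slot))

(aset slot 0 "collector-stand-in")          ; what -open does with the real collector
(def current (aget slot 0))

(def atom-ok? (serializable? (atom nil)))   ; atoms are not Serializable
(def ref-ok?  (serializable? (ref nil)))    ; neither are refs

(println slot-ok? current atom-ok? ref-ok?)
```

Note that the array serializes cleanly at submission time because its only slot is still empty; the collector is stored only once Storm calls open on the deserialized instance.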

The -open function uses the threading macro, first to get the entire map value from the state field, then to get the single-element collector array, before using aset to update this array to hold socollector. Similarly, the -nextTuple function uses the threading macro to get out the collector array and call aget on its single element to get the current collector.
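For readers new to the -> macro, the threaded form and the nested form below are equivalent. This is a small sketch in which a plain map and a keyword stand in for the spout's state and collector (both are illustrative assumptions).

```clojure
;; A map shaped like the spout's state, holding an empty one-element array.
(def state {:collector (make-array Object 1)})

;; What -open does: thread the state through :collector, then aset.
(-> state :collector (aset 0 :stand-in-collector))

;; What -nextTuple does, written threaded and then nested.
(def threaded (-> state :collector (aget 0)))
(def nested   (aget (:collector state) 0))

(println threaded nested)
```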

One other peculiarity is how the constructors for Values and Fields get called. The Values constructor is declared as

Values(Object... vals)

In the Java code this allows the constructor to be called as

new Values(cookie)

but this is a syntactic convenience. When this is compiled, the constructor declaration tells the Java compiler to wrap the cookie value in an array before passing it to the constructor. So the constructor only takes an array, which means that this is what must be passed from Clojure. The easiest way to do that is to wrap the cookie in a vector and convert that to an array with into-array.
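The same rule applies to any Java varargs method, so it can be tried with JDK classes alone. The sketch below uses String.format(String, Object...) and Arrays.asList(T...) as stand-ins for the Values constructor, which needs Storm on the classpath.

```clojure
(def cookie "Oreo")

;; Java: String.format("cookie=%s", cookie)
;; Clojure must build the Object[] that the Java compiler would create.
(def formatted
  (String/format "cookie=%s" (into-array Object [cookie])))

;; Java: Arrays.asList(cookie, "Wafer")
(def as-list
  (java.util.Arrays/asList (into-array Object [cookie "Wafer"])))

(println formatted)
(println (vec as-list))
```

Passing the element type explicitly, as in (into-array Object ...), is an optional safeguard. The recipe's (into-array [cookie]) infers a String array from the first element, which Java also accepts where an Object array is expected.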

Fields also has a constructor like this:

Fields(String... fields)

However, in this case it also has a second constructor of

Fields(List<String> fields)

Since Clojure’s vector implements List, we can just use a simple vector and not wrap it with into-array.
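That a Clojure vector can be handed to a Java API expecting a java.util.List is easy to confirm without Storm. In this sketch, the copy constructor of java.util.ArrayList stands in for Fields(List<String>).

```clojure
(def field-names ["current-cookie-type"])

;; Clojure vectors implement java.util.List (as a read-only list).
(def vector-is-list? (instance? java.util.List field-names))

;; A constructor taking a Collection or List accepts the vector directly.
(def copied (java.util.ArrayList. field-names))
(def first-field (.get copied 0))

(println vector-is-list? first-field)
```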

The file PrinterBolt.clj comes from the storm-starter kit repository. It simply dumps any tuple given to it to stdout (in our case, the command prompt window). The class extends BaseBasicBolt and implements two methods, execute(...) and declareOutputFields(...).

The execute function is the primary worker. It takes an input tuple and an output collector. In this case the collector is not used. In this implementation the tuple is sent straight to stdout via the println function.

The declareOutputFields function is similar to what we saw in the CookieSpoutClj class implementation. In this case, because we’re not outputting any fields, this method has no body.

Looking at the core.clj file, we’ve imported our CookieSpoutClj and PrinterBolt classes. We’ve also imported the Storm classes needed to configure the topology.

Inside the function -main we create a TopologyBuilder, a CookieSpoutClj instance, a Config, and a LocalCluster. We set cookieSpout as the spout for the TopologyBuilder under the name MySpout, then add a PrinterBolt and connect it to that spout with a shuffle grouping. We then submit the topology to the local cluster, let it run for 10 seconds, kill it, and shut the cluster down.

Conclusion

We’ve outlined the basic Storm concepts, including tuples, spouts, bolts, and topologies. We’ve written a custom Spout that produced a series of tuples about different cookies being consumed. We’ve written a basic topology in Clojure that linked this to an output bolt. We’ve run this and seen the cookie information flow through the system.
