13. Introduction to Cascalog

In this chapter we’ll cover the background to Cascalog and get you started with Cascalog 2.0.

Assumptions

In this chapter we assume the following:

- You have Leiningen set up.

- You have a projects directory where you create new Leiningen projects.

Benefits

The benefit of Cascalog is the ability to efficiently write complex Hadoop queries with a minimal number of lines of code.

The Recipe—Code

Now that you can see the landscape, let’s get started. We’ll run some basic Cascalog queries.

1. Create a new Leiningen project cascalog-intro in your projects directory, and change to that directory:

lein new app cascalog-intro
cd cascalog-intro

2. Modify the project.clj file so it looks like this:

(defproject cascalog-intro "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.7.0-RC1"]
                 [cascalog "2.1.1"]]
  :main ^:skip-aot cascalog-intro.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}
             :dev {:dependencies [[org.apache.hadoop/hadoop-core "1.2.1"]]}})

3. Modify the file test/cascalog_intro/core_test.clj so it looks like this:

(ns cascalog-intro.core-test
  (:require [clojure.test :refer :all]
            [cascalog-intro.core :refer :all]
            [cascalog.api :refer :all]
            [cascalog.logic [ops :as c]]))

(def prices
  [;; [stock-symbol price]
   ["APPL" 527.00]
   ["MSFT" 26.74]
   ["YHOO" 19.86]
   ["FB" 28.76]
   ["AMZN" 259.15]])

(deftest find-matching-stock-symbol
  (testing "Given a price, find the corresponding stock symbol."
    (is (=
      (??<- [?stock-symbol]
        (prices ?stock-symbol 28.76))
      '(["FB"])))))

(deftest find-matching-stock-symbol-and-show-symbol-and-price
  (testing "Given a price, find the corresponding stock symbol and display the price."
    (is (=
      (??<- [?stock-symbol ?price]
        (prices ?stock-symbol ?price) (= ?price 28.76))
      '(["FB" 28.76])))))

(deftest find-average-price
  (testing "Given a list of prices, find the average price."
    (let [price-list (<- [?price]
                       (prices ?stock-symbol ?price))]
      (is (=
        (??<- [?avg]
          (price-list ?prices)
          (c/avg ?prices :> ?avg))
        '([172.302]))))))

4. Before we start, we’ll make things easier for ourselves by limiting the output through log4j settings. Change into the directory resources:

cd resources

and create a file called log4j.properties with the following contents:

log4j.rootLogger=WARN, A1
log4j.logger.user=DEBUG
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d %-5p %c: %m%n

This step matters: without it, verbose log output will obscure the results you’re looking for.

Testing the Solution

To test the solution, follow these steps:

1. At a command prompt in the top level of the cascalog-intro project, run the following command:

lein test

2. You should see the following results:

Ran 3 tests containing 3 assertions.
0 failures, 0 errors.

The rest of this recipe will be done at the REPL.

1. Start a Leiningen REPL from the top level of the project by issuing the following command:

lein repl

2. We’ll load the Cascalog API:

(use 'cascalog.api)

3. And define some data:

(def prices
  [;; [stock-symbol price]
   ["APPL" 527.00]
   ["MSFT" 26.74]
   ["YHOO" 19.86]
   ["FB" 28.76]
   ["AMZN" 259.15]])

4. Now we’re ready for some Cascalog basics. This is a basic query:

(?<- (stdout) [?stock-symbol]
  (prices ?stock-symbol 28.76))

The ?<- operator is a macro that both defines a query and executes it. The ? prefix marks a Cascalog variable as non-nullable: any tuple in which such a variable would be bound to nil is filtered out of the results.1

1. If a variable should be allowed to be null, declare it with an exclamation mark instead, for example, !stock-symbol.

Note that this differs slightly from the query in our test, which used the ??<- operator (two question marks). That form returns the results as a sequence of tuples, such as '(["FB"]), rather than writing them to a sink such as (stdout).

The equivalent of this in SQL would be:

SELECT stock_symbol FROM prices
WHERE price = 28.76

which means ‘find me all the stock symbols where the price is 28.76.’

But in Datalog we’re matching on tuple positions rather than on column names, so column order is important. The equivalent SQL is really like the following (even though ANSI SQL doesn’t have a column index operator):

SELECT [first column] FROM prices
WHERE [second column] = 28.76

This means ‘find me every value in the first column where the second column is 28.76.’
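
For comparison, the positional match can be sketched in plain Clojure, without Cascalog. This is only an illustration of the idea of matching on tuple position; the name matching-symbols is made up for this sketch, and it says nothing about how Cascalog plans or executes the query.

```clojure
;; The same data as in the recipe: each tuple is [stock-symbol price].
(def prices
  [["APPL" 527.00]
   ["MSFT" 26.74]
   ["YHOO" 19.86]
   ["FB" 28.76]
   ["AMZN" 259.15]])

;; Destructure each tuple by position: the first element is bound to
;; stock-symbol and the second to price, whatever the columns are "called".
(def matching-symbols
  (for [[stock-symbol price] prices
        :when (= price 28.76)]
    [stock-symbol]))
```

Swapping the two names in the destructuring vector would silently match on the wrong column, which is why column order matters.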

5. Back to our query. Let’s run this on our REPL:

(?<- (stdout) [?stock-symbol]
  (prices ?stock-symbol 28.76))

This gives the following result (the output will be more verbose if you skipped the log4j.properties step above):

RESULTS
-----------------------
FB
-----------------------

6. So we have our result. Now let’s display both the stock symbol and the price. In the following command we have added ?price to the output variables on the first line, bound ?price to the second position of each prices tuple, and added a predicate that keeps only the tuples where ?price equals 28.76. Run this on the REPL:

(?<- (stdout) [?stock-symbol ?price]
  (prices ?stock-symbol ?price) (= ?price 28.76))

This results in:

RESULTS
-----------------------
FB        28.76
-----------------------
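
Filter predicates are not limited to equality. The following is a minimal sketch, assuming Cascalog is on the classpath as configured in project.clj; it uses the ??<- form from the tests so that the tuples come back as ordinary Clojure data. The name expensive and the threshold of 100 are chosen only for illustration.

```clojure
(require '[cascalog.api :refer :all])

;; The same data as in the recipe: each tuple is [stock-symbol price].
(def prices
  [["APPL" 527.00]
   ["MSFT" 26.74]
   ["YHOO" 19.86]
   ["FB" 28.76]
   ["AMZN" 259.15]])

;; ??<- executes the query and returns the result tuples as a sequence.
;; The (> ?price 100) predicate keeps only tuples priced above 100.
(def expensive
  (??<- [?stock-symbol ?price]
        (prices ?stock-symbol ?price)
        (> ?price 100)))

;; The order of the result tuples should not be relied on,
;; so convert to a set before comparing or looking things up.
(def expensive-set (set expensive))
```

Because the result is plain data, it can be passed straight to functions such as set, count, or sort-by.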

7. Now let’s get the average of all the stock prices. First we’ll load a library:

(require '[cascalog.logic [ops :as c]])

8. Then we’ll run our average query:

(let [price-list (<- [?price]
                     (prices ?stock-symbol ?price))]
  (?<- (stdout)
       [?avg]
       (price-list ?prices)
       (c/avg ?prices :> ?avg)))

The significant step was defining the subquery price-list in the let clause. The subquery produces the complete set of prices, so the Cascalog query engine applies the c/avg aggregator to that set as a whole and not to each individual price. (If we had applied it to each individual price we’d get the same result, but it would be less efficient.)

This gives us:

RESULTS
-----------------------
172.302
-----------------------
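
As a sanity check on that figure, the same average can be computed in plain Clojure. This sketch does not use Cascalog at all; the helper name average is an assumption made for this example.

```clojure
;; The same data as in the recipe: each tuple is [stock-symbol price].
(def prices
  [["APPL" 527.00]
   ["MSFT" 26.74]
   ["YHOO" 19.86]
   ["FB" 28.76]
   ["AMZN" 259.15]])

;; Arithmetic mean: the sum of the values divided by how many there are.
(defn average [xs]
  (/ (reduce + xs) (count xs)))

;; Take the second element (the price) of every tuple and average them.
;; The result is approximately 172.302, subject to floating-point rounding.
(def avg-price (average (map second prices)))
```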

Notes on the Recipe

The two emerging frontiers in software development are mobile and big data. In the case of big data, many of the paradigms that apply to this space are lifted from the functional programming world. This alone is a great reason to learn Clojure.

Technological opinions were split in 2004 when Google published one of the secrets behind their massive scaling capability: a technology called MapReduce. For those with a software development background limited to OO, this seemed like a magical new technology. However, for those with a functional programming background, this seemed barely worth a yawn. The interesting part was how it was all coordinated.

Consider a simple map and reduce in Clojure:

=> (map count '("The" "quick" "brown" "fox" "jumped" "over" "the" "lazy" "dog"))

(3 5 5 3 6 4 3 4 3)

=> (reduce max (map count '("The" "quick" "brown" "fox" "jumped" "over" "the"
"lazy" "dog")))

6

We used the map function to apply count to each string in the list, returning a list of string lengths that is the same size as the input. We then used the reduce function with max to find the largest value in that list.

What’s even more powerful is the reducers library introduced in Clojure 1.5:

(require '[clojure.core.reducers :as r])

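;; r/fold can only parallelize foldable collections such as vectors (and only once
;; they exceed the partition size, 512 by default); a list is reduced sequentially.
(r/fold (r/monoid max (constantly Double/NEGATIVE_INFINITY))
        (r/map count ["The" "quick" "brown" "fox" "jumped" "over" "the" "lazy" "dog"]))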

The big idea is that we can split the processing of this across multiple threads. But why stop there?

What about a list whose size is 5 terabytes? It starts to get too big for one physical machine to process. We’ve already demonstrated that this kind of problem can be broken up into pieces for different threads to process. What if we could distribute those threads across different machines?
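
The idea can be sketched on a single machine. In the following illustration each chunk stands in for the slice of data held by one machine; the names local-max, chunks, partial-maxima, and overall-max are made up for this sketch, and nothing here is distributed.

```clojure
(def words ["The" "quick" "brown" "fox" "jumped" "over" "the" "lazy" "dog"])

;; The work one "machine" does on its own slice: map count, then reduce max.
(defn local-max [chunk]
  (reduce max (map count chunk)))

;; Split the input into slices of three words, one slice per pretend machine.
(def chunks (partition-all 3 words))

;; Each slice is processed independently of the others.
(def partial-maxima (map local-max chunks))   ; => (5 6 4)

;; A final reduce combines the partial answers into the overall answer.
(def overall-max (reduce max partial-maxima)) ; => 6
```

This works because max is associative: combining the per-slice maxima gives the same answer as scanning the whole list in one pass.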

Enter Hadoop. Hadoop is a collection of libraries and frameworks designed to enable processing of terabyte scale data across thousands of machines. It’s not a stretch to say this was Yahoo’s implementation of the MapReduce concept from Google. (See Figure 13.1.)

Figure 13.1 Hadoop is built upon the MapReduce concept.

To use it, you needed to write some Java classes that implemented the Map and Reduce pattern, compile them, deploy them, and run them.
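
To give a feel for the shape of such a job, here is a word count written as separate map, shuffle, and reduce phases. It is a plain Clojure sketch running in one process, not the Hadoop Java API; the function names map-phase, shuffle-phase, and reduce-phase and the sample lines are assumptions made for this example.

```clojure
(require '[clojure.string :as str])

(def lines ["the quick brown fox" "the lazy dog" "the fox"])

;; Map phase: turn one input line into a sequence of [word 1] pairs.
(defn map-phase [line]
  (for [word (str/split line #"\s+")]
    [word 1]))

;; Shuffle phase: group the pairs by key so that each word's pairs end up together.
(defn shuffle-phase [pairs]
  (group-by first pairs))

;; Reduce phase: collapse one word's pairs into a single [word total] pair.
(defn reduce-phase [[word pairs]]
  [word (reduce + (map second pairs))])

(def word-counts
  (->> lines
       (mapcat map-phase)
       shuffle-phase
       (map reduce-phase)
       (into {})))
```

In a real cluster the map and reduce phases run on many machines and the framework performs the shuffle between them.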

Some (quite reasonably) thought that this cycle (code -> compile -> deploy -> test) was too slow to give useful feedback when building a larger, more complex application, and so DSLs were written on top of Hadoop. Many have been written, but the most notable is Pig (which has some similarities to Sawzall at Google), as shown in Figure 13.2. Pig enabled the writing of Hadoop queries in a language with some similarities to SQL.

Figure 13.2 Pig is built on Hadoop.

In addition, some users of Hadoop decided they wanted to make it easier to chain Hadoop operations together, so that one MapReduce operation could cascade into another. So the Cascading library was built. (See Figure 13.3.) It borrows terminology from data flow diagrams, such as Taps and Sinks: a file input would be a Tap, and a file output would be a Sink. Cascading allows you to chain the Sink of one process into the Tap of another.

Figure 13.3 Cascading is built on Hadoop.

After this, Nathan Marz of (at the time) Backtype2 decided he could save even more time by writing a Clojure DSL on top of Cascading. This library was inspired by Datalog,3 and so it was named Cascalog. (See Figure 13.4.)

2. Backtype was such a success that it was bought by Twitter.

3. Datalog is a declarative query language whose syntax is a subset of Prolog; see http://en.wikipedia.org/wiki/Datalog.

Figure 13.4 Cascalog is built on Cascading.

Conclusion

We’ve looked at the Cascalog landscape and have run some basic Cascalog queries. Next we’ll look at integrating our work in Cascalog into the Hadoop framework.
