In this chapter we’ll cover the background to Cascalog and get you started with Cascalog 2.0.
In this chapter we assume the following:
You have Leiningen set up.
You have a projects directory where you create new Leiningen projects.
The benefit of Cascalog is the ability to efficiently write complex Hadoop queries with a minimal number of lines of code.
Now that you can see the landscape, let’s get started. We’ll run some basic Cascalog queries.
1. Create a new Leiningen project cascalog-intro in your projects directory, and change to that directory:
lein new app cascalog-intro
cd cascalog-intro
2. Modify the project.clj file so it looks like this:
(defproject cascalog-intro "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.7.0-RC1"]
                 [cascalog "2.1.1"]]
  :main ^:skip-aot cascalog-intro.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}
             :dev {:dependencies [[org.apache.hadoop/hadoop-core "1.2.1"]]}})
3. Modify the file test/cascalog_intro/core_test.clj so it looks like this:
(ns cascalog-intro.core-test
  (:require [clojure.test :refer :all]
            [cascalog-intro.core :refer :all]
            [cascalog.api :refer :all]
            [cascalog.logic [ops :as c]]))

(def prices
  [;; [stock-symbol price]
   ["APPL" 527.00]
   ["MSFT" 26.74]
   ["YHOO" 19.86]
   ["FB" 28.76]
   ["AMZN" 259.15]])

(deftest find-matching-stock-symbol
  (testing "Given a price, find the corresponding stock symbol."
    (is (= (??<- [?stock-symbol]
                 (prices ?stock-symbol 28.76))
           '(["FB"])))))

(deftest find-matching-stock-symbol-and-show-symbol-and-price
  (testing "Given a price, find the corresponding stock symbol and display the price."
    (is (= (??<- [?stock-symbol ?price]
                 (prices ?stock-symbol ?price)
                 (= ?price 28.76))
           '(["FB" 28.76])))))

(deftest find-average-price
  (testing "Given a list of prices, find the average price."
    (let [price-list (<- [?price]
                         (prices ?stock-symbol ?price))]
      (is (= (??<- [?avg]
                   (price-list ?prices)
                   (c/avg ?prices :> ?avg))
             '([172.302]))))))
4. Before we start, we’ll make things easier for ourselves by limiting the output through log4j settings. Change into the resources directory:
cd resources
and create a file called log4j.properties with the following contents:
log4j.rootLogger=WARN, A1
log4j.logger.user=DEBUG
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d %-5p %c: %m%n
This is important, because otherwise your output fills up with log noise that obscures the results you’re looking for.
To test the solution, follow these steps:
1. At a command prompt in the top-level directory of the cascalog-intro project, run the following command:
lein test
2. You should see the following results:
Ran 3 tests containing 3 assertions.
0 failures, 0 errors.
The rest of this recipe will be done at the REPL.
1. From the top-level directory of the project (the one containing project.clj), start a Leiningen REPL with the following command:
lein repl
2. We’ll load the Cascalog API:
(use 'cascalog.api)
3. And define some data:
(def prices
[;; [stock-symbol price]
["APPL" 527.00]
["MSFT" 26.74]
["YHOO" 19.86]
["FB" 28.76]
["AMZN" 259.15]])
4. Now we’re ready for some Cascalog basics. This is a basic query:
(?<- (stdout) [?stock-symbol]
(prices ?stock-symbol 28.76))
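The ?<- macro defines a query and executes it immediately, sending the results to the tap given as its first argument (here, (stdout)). The ? prefix marks a Cascalog logic variable. Variables that start with ? are non-nullable: Cascalog filters out any tuple in which such a variable would be bound to null. If you want to keep those tuples, declare the variable with an exclamation mark instead, for example, !stock-symbol.
Note that this is slightly different from the query in our test, which used ??<- (two question marks). That version executes the query and returns the results to the caller as a sequence of tuples instead of printing them, which is why the test compares it against the quoted list '(["FB"]).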
The equivalent of this in SQL would be:
SELECT stock_symbol FROM prices
WHERE price = 28.76
which means ‘find me all the stock symbols where the price is 28.76.’
But in Cascalog, as in Datalog, we’re really matching not on names but on tuple positions, so field order is important. The equivalent SQL is really like the following (even though ANSI SQL doesn’t have a column index operator):
SELECT [first column] FROM prices
WHERE [second column] = 28.76
This means ‘find me the value of the first column in every row where the second column is 28.76.’
5. Back to our query. Let’s run this on our REPL:
(?<- (stdout) [?stock-symbol]
(prices ?stock-symbol 28.76))
This gives the following result (it will be more verbose if you skipped the log4j.properties step above):
RESULTS
-----------------------
FB
-----------------------
6. So we have our result. Now let’s get and display both the price and the stock symbol. In the following command, we have added ?price to the output variables on the first line, bound ?price to the second field of the prices tuples on the second line, and added the filter (= ?price 28.76). Run this on the REPL:
(?<- (stdout) [?stock-symbol ?price]
(prices ?stock-symbol ?price) (= ?price 28.76))
This results in:
RESULTS
-----------------------
FB 28.76
-----------------------
7. Now let’s get the average of all the stock prices. First we’ll load a library:
(require '[cascalog.logic [ops :as c]])
8. Then we’ll run our average query:
(let [price-list (<- [?price]
(prices ?stock-symbol ?price))]
(?<- (stdout)
[?avg]
(price-list ?prices)
(c/avg ?prices :> ?avg)))
The significant step was defining the subquery price-list in the let clause. It projects the prices tuples down to a single ?price field, and the outer query then feeds those prices to the c/avg aggregator. Because ?avg is the only output variable, there are no fields to group by, so c/avg aggregates over all of the prices at once rather than computing a separate average for each stock symbol.
This gives us:
RESULTS
-----------------------
172.302
-----------------------
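The queries above can be varied in a few ways. The following is a minimal sketch, assuming the same project dependencies as this recipe; it uses ??<- (as in the test file) so results come back as data rather than being printed. The names expensive-stocks, fb-price, overall-avg, avg-per-symbol, and prices-with-gap, and the XYZ row, are hypothetical and only for illustration.

```clojure
(use 'cascalog.api)
(require '[cascalog.logic [ops :as c]])

(def prices
  [;; [stock-symbol price]
   ["APPL" 527.00]
   ["MSFT" 26.74]
   ["YHOO" 19.86]
   ["FB" 28.76]
   ["AMZN" 259.15]])

;; Any Clojure predicate can act as a filter: only tuples for which
;; (> ?price 100) is truthy are kept.
(def expensive-stocks
  (??<- [?stock-symbol ?price]
        (prices ?stock-symbol ?price)
        (> ?price 100)))

;; Matching is positional, so a constant in the first slot looks up
;; the price for a given symbol.
(def fb-price
  (??<- [?price]
        (prices "FB" ?price)))

;; No subquery: ?avg is the only output variable, so there are no
;; grouping fields and c/avg runs over every tuple. _ ignores the symbol.
(def overall-avg
  (??<- [?avg]
        (prices _ ?price)
        (c/avg ?price :> ?avg)))

;; Adding ?stock-symbol to the outputs makes it a grouping key, so
;; c/avg runs once per symbol (each average equals that symbol's price).
(def avg-per-symbol
  (??<- [?stock-symbol ?avg]
        (prices ?stock-symbol ?price)
        (c/avg ?price :> ?avg)))

;; Hypothetical data with a missing price, to show null handling.
(def prices-with-gap
  [["FB" 28.76]
   ["XYZ" nil]])

;; ?price is non-nullable: the XYZ tuple (nil price) is filtered out.
(def non-nullable-result
  (??<- [?stock-symbol ?price]
        (prices-with-gap ?stock-symbol ?price)))

;; !price is nullable: the XYZ tuple is kept, with nil as its price.
(def nullable-result
  (??<- [?stock-symbol !price]
        (prices-with-gap ?stock-symbol !price)))
```

Cascalog doesn’t promise any particular order for multi-row results, so it’s safer to compare them as sets, for example (set expensive-stocks), than as lists.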
The two emerging frontiers in software development are mobile and big data. In the case of big data, many of the paradigms that apply to this space are lifted from the functional programming world. This alone is a great reason to learn Clojure.
Technological opinions were split in 2004 when Google published one of the secrets behind its massive scaling capability: a technology called MapReduce. For those with a software development background limited to OO, this seemed like a magical new technology. However, for those with a functional programming background, this seemed barely worth a yawn. The interesting part was how it was all coordinated.
Consider a simple map and reduce in Clojure:
=> (map count '("The" "quick" "brown" "fox" "jumped" "over" "the" "lazy" "dog"))
(3 5 5 3 6 4 3 4 3)
=> (reduce max (map count '("The" "quick" "brown" "fox" "jumped" "over" "the"
"lazy" "dog")))
6
We used the map function to apply count to each word in the list, which returns a list of the same size containing the length of each word. We then used the reduce function to apply max across that list to find the largest value.
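This map-then-reduce shape is what MapReduce distributes across machines. As a minimal sketch in plain Clojure (no Hadoop involved; the function names are hypothetical), a word count can be split into a map phase that emits [word 1] pairs, a shuffle phase that groups the pairs by key, and a reduce phase that sums each group:

```clojure
(require '[clojure.string :as str])

;; Map phase: turn one line of text into [word 1] pairs.
(defn map-phase [line]
  (for [word (str/split line #"\s+")]
    [(str/lower-case word) 1]))

;; Shuffle phase: group the pairs by their key (the word).
(defn shuffle-phase [pairs]
  (group-by first pairs))

;; Reduce phase: sum the 1s emitted for each word.
(defn reduce-phase [grouped]
  (into {}
        (for [[word pairs] grouped]
          [word (reduce + (map second pairs))])))

(defn word-count [lines]
  (reduce-phase (shuffle-phase (mapcat map-phase lines))))

;; "the" appears twice (case-insensitively); every other word appears once.
(word-count ["The quick brown fox" "jumped over the lazy dog"])
```

In Hadoop the map and reduce phases run on many machines, and the shuffle moves each key’s pairs to the machine that reduces them; here everything runs in a single process.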
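Clojure 1.5 reducers take this idea even further:
(require '[clojure.core.reducers :as r])
(r/fold (r/monoid max #(Double/NEGATIVE_INFINITY))
        (r/map count ["The" "quick" "brown" "fox" "jumped" "over" "the" "lazy" "dog"]))
The big idea is that fold can split a large vector into chunks, reduce the chunks on multiple threads, and then combine the partial results (here with max, starting from negative infinity). Note that fold only parallelizes foldable collections such as vectors; on a list it falls back to an ordinary sequential reduce, which is why the words are in a vector here. But why stop there?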
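By default, fold does not split a vector into chunks smaller than 512 elements, so nine words would be reduced in a single piece. The sketch below (longest-word-length is a hypothetical helper) passes an explicit partition size of 2 to force a split, and supplies separate combine and reduce functions so the roles are visible:

```clojure
(require '[clojure.core.reducers :as r])

(def words ["The" "quick" "brown" "fox" "jumped" "over" "the" "lazy" "dog"])

(defn longest-word-length [ws]
  (r/fold 2                                          ; n: split until chunks have at most 2 elements
          (r/monoid max (constantly Long/MIN_VALUE)) ; combinef: merges chunk results; (combinef) seeds each chunk
          max                                        ; reducef: folds the elements within one chunk
          (r/map count ws)))

(longest-word-length words)
;; => 6
```

The combine function must be associative and have an identity value (here Long/MIN_VALUE for max), because fold may combine the partial results in any grouping.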
What about a list whose size is 5 terabytes? It starts to get too big for one physical machine to process. We’ve already demonstrated that this kind of problem can be broken up into pieces for different threads to process. What if we could distribute those threads across different machines?
Enter Hadoop. Hadoop is a collection of libraries and frameworks designed to process terabyte-scale data across thousands of machines. It’s not a stretch to say this was Yahoo’s implementation of Google’s MapReduce concept. (See Figure 13.1.)
To use it, you needed to write Java classes implementing the map and reduce steps, then compile, deploy, and run them.
Some (quite reasonably) felt that this cycle (code -> compile -> deploy -> test) was too slow to give useful feedback when building larger, more complex applications, so DSLs were written on top of Hadoop. Many have been written, but the most notable is Pig (which has some similarities to Sawzall at Google), as shown in Figure 13.2. Pig enables Hadoop queries to be written in a language with some similarities to SQL.
In addition, some Hadoop users wanted to make it easier to chain Hadoop operations together, so that one MapReduce operation could cascade into another. So the Cascading library was built. (See Figure 13.3.) It uses terminology similar to that of data-flow diagrams, such as Taps and Sinks: a file input would be a Tap, and a file output would be a Sink. Cascading lets you chain the Sink of one process into the Tap of another.
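Cascalog carries this chaining idea over: a query defined with <- is not run immediately, and it can be used as the source of another query. A minimal sketch, assuming the recipe’s dependencies and price data (the names expensive and expensive-count are hypothetical):

```clojure
(use 'cascalog.api)
(require '[cascalog.logic [ops :as c]])

(def prices
  [["APPL" 527.00]
   ["MSFT" 26.74]
   ["YHOO" 19.86]
   ["FB" 28.76]
   ["AMZN" 259.15]])

;; Stage 1: define (but don't run) a query keeping prices above 100.
(def expensive
  (<- [?stock-symbol ?price]
      (prices ?stock-symbol ?price)
      (> ?price 100)))

;; Stage 2: use stage 1's output as this query's source and count its tuples.
(def expensive-count
  (<- [?n]
      (expensive _ _)
      (c/count ?n)))

;; Run the chained queries, writing the result to standard output.
(?- (stdout) expensive-count)
```

Nothing executes until ?- (or ??-, which returns the results as data) is called; at that point Cascalog plans both stages together, compiling them into Cascading flows on its default platform.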
After this, Nathan Marz, then at Backtype,2 decided he could save even more time by writing a Clojure DSL on top of Cascading. This library was inspired by Datalog,3 and so it was named Cascalog. (See Figure 13.4.)
2. Backtype was such a success that it was bought by Twitter.
3. Datalog is a declarative logic query language based on a subset of Prolog; see http://en.wikipedia.org/wiki/Datalog.