Integration with Pig

Configuring Hadoop to work with Cassandra is, in itself, quite a bit of work. Writing long, verbose Java code to do something as simple as a word count is a turnoff for a high-level user such as a data analyst. Wouldn't it be nice if we had a SQL-like interpreter that converts commands into MapReduce programs for us? Pig is exactly that tool.

Hadoop is not limited to Java; MapReduce programs can also be written, often more concisely, in languages such as Scala, Python, C++ (via Pipes), and R, as well as in many adapter languages.

Pig provides a SQL-like language called Pig Latin, which lets you write complex MapReduce programs concisely. You can create intermediate variables that hold the result of one operation and use them in subsequent operations, much like a stored procedure in the RDBMS world. Finally, the output of an operation can be displayed on the screen or stored in permanent storage such as HDFS or Cassandra.
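
For instance, a minimal Pig Latin sketch along those lines (the input path, field names, and output path below are made up purely for illustration) could look like this:

-- load a hypothetical tab-separated file of (user, amount) records from HDFS
raw = LOAD '/data/payments' AS (user:chararray, amount:long);
-- each relation below is an intermediate result reused by the next operation
big = FILTER raw BY amount > 1000;
grouped = GROUP big BY user;
totals = FOREACH grouped GENERATE group AS user, SUM(big.amount) AS total;
-- print the result to the screen, or persist it to HDFS
DUMP totals;
STORE totals INTO '/data/payment_totals';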

Installing Pig

Installing Pig is very simple; what is hard is getting it to work nicely with Hadoop and Cassandra. To install Pig, just download the latest version and untar it as follows:

$ wget http://www.eng.lsu.edu/mirrors/apache/pig/pig-0.11.1/pig-0.11.1.tar.gz
$ tar xvzf pig-0.11.1.tar.gz
$ ln -s pig-0.11.1 pig

Let's call this directory $PIG_HOME. Ideally, you should just be able to execute $PIG_HOME/bin/pig and the Pig console should start, given that Cassandra and Hadoop are up and running. Unfortunately, it does not. The documentation, at the time of writing, is not adequate for configuring Pig. To get Pig started, you need to do the following:

  1. Set Hadoop's installation directory in the HADOOP_PREFIX variable.
  2. Add all the JAR files in Cassandra's lib directory to PIG_CLASSPATH.
  3. Add udf.import.list to the PIG_OPTS Pig options variable, as follows:
    export PIG_OPTS="$PIG_OPTS -Dudf.import.list=org.apache.cassandra.hadoop.pig";
    
  4. Set the address of one of the Cassandra nodes, the Cassandra RPC port, and the Cassandra partitioner in PIG_INITIAL_ADDRESS, PIG_RPC_PORT, and PIG_PARTITIONER, respectively.

You may write a simple shell script that does this for you. Here is one that covers all four steps (assuming $CASSANDRA_HOME points to the Cassandra installation directory).

Note

Pig 0.14, Cassandra 2.1.2, and Hadoop 2.6.0 have some classpath conflicts with one another. Some JARs had to be added and others removed to make the integration work. In particular, you may want to replace all Guava libraries with Guava version 16.0: Cassandra does not like the older versions, and Hadoop fails with the newer ones (17 onwards; see https://issues.apache.org/jira/browse/HADOOP-11032).

Also, keep an eye out for the bugs yet to be fixed in version 2.1.2 (https://issues.apache.org/jira/browse/CASSANDRA-8541 and https://issues.apache.org/jira/browse/CASSANDRA-8599).
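
As a rough sketch of the Guava swap mentioned in the preceding note (the paths and the download URL are assumptions; adapt them to your installation), you could locate the conflicting JARs and fetch Guava 16.0 from Maven Central:

# List the Guava JARs shipped with Hadoop, Pig, and Cassandra
$ find $HADOOP_PREFIX $PIG_HOME $CASSANDRA_HOME -name 'guava-*.jar'

# Download Guava 16.0 and use it in place of the conflicting versions
$ wget https://repo1.maven.org/maven2/com/google/guava/guava/16.0/guava-16.0.jar

With the classpath conflicts out of the way, the environment script itself looks like this: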

export PIG_HOME=/home/naishe/apps/pig-0.14.0
export HADOOP_PREFIX=/home/naishe/apps/hadoop-2.6.0
export CASSANDRA_HOME=/home/naishe/apps/apache-cassandra-2.1.2
# Add all of Cassandra's JARs to the classpath
CLASSPATH=""
for cassandra_jar in $CASSANDRA_HOME/lib/*.jar; do
  CLASSPATH=$CLASSPATH:$cassandra_jar
done

# Add Pig's own JARs as well
PIG_JAR=""
for jar in $PIG_HOME/*.jar $PIG_HOME/lib/*.jar; do
   PIG_JAR=$PIG_JAR:$jar
done

export PIG_CLASSPATH=""
export PIG_CLASSPATH=$PIG_CLASSPATH:$CLASSPATH:$PIG_JAR;
export PIG_CLASSPATH=$PIG_CLASSPATH:/home/naishe/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.2/cassandra-driver-core-2.1.2.jar
export PIG_CLASSPATH=$PIG_CLASSPATH:/home/naishe/.m2/repository/org/apache/cassandra/cassandra-all/2.1.2/cassandra-all-2.1.2.jar

# Hack to avoid ClassNotFound exception
export PIG_CLASSPATH=$PIG_CLASSPATH:/home/naishe/apps/downloads/metrics-core-3.0.2.jar

export PIG_OPTS="$PIG_OPTS -Dudf.import.list=org.apache.cassandra.hadoop.pig";
export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.Murmur3Partitioner
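
Assuming you save the preceding snippet as, say, pig-env.sh (the file name is arbitrary), remember to source it in your shell rather than execute it, so that the exported variables are visible to the Pig launcher:

$ source pig-env.sh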

If everything works OK and Cassandra and Hadoop are up, you may access the Pig console to execute queries in interactive mode as follows:

pig$ bin/pig
2013-07-22 13:32:22,709 [main] INFO  org.apache.pig.Main - Apache Pig version 0.11.1 (r1459641) compiled Mar 22 2013, 02:13:53
2013-07-22 13:32:22,710 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/nishant/apps/pig-0.11.1/pig_1374480142703.log
2013-07-22 13:32:22,757 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/nishant/.pigbootup not found
2013-07-22 13:32:23,080 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000
2013-07-22 13:32:24,133 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001
grunt>

Let's copy some Hadoop XML files into HDFS and run a word count on it as follows:

# Load all the files in $HADOOP_HOME/conf to pigdata in HDFS
$ bin/hadoop fs -put conf pigdata

# --- in pig console ---
# load all the files from HDFS
grunt> A = load './pigdata';

# loop line by line in all the input files from A split them into words
grunt> B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;

# Group the tokenized words into variable C, grouped by the attribute "word"
grunt> C = group B by word;

# Generate a map of the number of terms in each group and the group name
grunt> D = foreach C generate COUNT(B), group;

# print this map to console
grunt> dump D;

If it works, you will see this output:

...
(31,for)
(2,get)
(4,jks)
(12,job)
(1,log)
(1,map)
(2,max)
(1,pid)
(8,set)
(1,sig)
(1,ssh)
(83,the)
(1,two)
(6,use)
(3,via)
(3,who)
...

Integrating Pig and Cassandra

With Hadoop working against Cassandra, we are almost done and ready to use the Pig console to read data from Cassandra and store results back into it. The one thing you still need to know is which storage handler is used to store and retrieve data in Cassandra: it is CassandraStorage() that you will use in your Pig Latin to transfer data to and from Cassandra, and its usage is exactly the same as that of PigStorage().

In Pig, the data structure used to store data to and get data from Cassandra is a tuple of the row key and a bag of tuples, where each inner tuple is a column-name and column-value pair, like this:

(ROW_KEY, { (COL1, VAL1), (COL2, VAL2), (COL3, VAL3), ...})
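
To make this concrete, here is a short sketch (the keyspace and column family names are made up; $0 refers to the row key) that unnests the bag of columns into one triple per column:

# Each input tuple is (row key, {(name, value), ...}); $0 is the row key
grunt> rows = LOAD 'cassandra://myks/mycf' USING CassandraStorage();
grunt> cols = FOREACH rows GENERATE $0, flatten(columns);
# cols now holds one (row key, column name, column value) triple per column
grunt> dump cols;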

Here is an example of a word count over a Cassandra column family. It uses the same data (from Alice in Wonderland) as the MapReduce example with Cassandra shown earlier. The book is split into lines, and each row holds 500 lines in 500 columns, for a total of 6 rows:

# Pull data from the dataCF column family under the testks keyspace
grunt> rows = LOAD 'cassandra://testks/dataCF' USING CassandraStorage();
grunt> cols = FOREACH rows GENERATE flatten(columns);
grunt> vals = FOREACH cols GENERATE flatten(TOKENIZE((chararray)$1)) as word;
grunt> grps = group vals by word;
grunt> cnt = foreach grps generate group, COUNT(vals), 'count' as ccnt;
grunt> grp_by_word = group cnt by $0;
grunt> cagg = foreach grp_by_word generate group, cnt.(ccnt, $1);

# Put data into the result1CF column family under the testks keyspace
grunt> STORE cagg into 'cassandra://testks/result1CF' USING CassandraStorage();

2013-07-22 14:12:45,144 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
[-- snip --]
2013-07-22 14:12:50,464 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases cnt,cols,grps,rows,vals
2013-07-22 14:12:50,464 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: rows[6,7],cols[7,7],vals[8,7],cnt[10,6],grps[9,7] C: cnt[10,6],grps[9,7] R: cnt[10,6]
[--  snip --]
2013-07-22 14:13:45,626 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 97% complete
2013-07-22 14:13:49,669 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
[-- snip --]

Input(s):
Successfully read 6 records (360 bytes) from: "cassandra://testks/dataCF"

Output(s):
Successfully stored 4440 records in: "cassandra://testks/result1CF"
[-- snip --]
2013-07-22 14:13:49,693 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Let's look at the result stored in result1CF and compare it with the previous results (the resultCF column family produced by the earlier MapReduce example):

cqlsh> use testks;
cqlsh:testks> select * from result1CF where key = 'the';
 KEY | count
-----+-------
the |  1666

cqlsh:testks> select * from resultCF where key = 'Alice';
 KEY   | count
-------+-------
 Alice |   377

cqlsh:testks> select * from resultCF where key = 'Hatter';
 KEY    | count
--------+-------
 Hatter |    54

cqlsh:testks> select * from resultCF where key = 'Cat';
 KEY | count
-----+-------
 Cat |    23

There is a small difference in the word counts, but that is likely due to the difference between the splitting I used in the MapReduce example and the split function that Pig uses.

Note that the Pig Latin we have used here may be quite inefficient; the purpose of this example is only to show the Cassandra and Pig integration. To learn more about Pig Latin, look at the Pig documentation, and in particular at Apache Pig's official tutorial (http://pig.apache.org/docs/r0.11.1/start.html#tutorial).

You may also want to use CQL with Pig. For that, you will have to use CqlStorage (with some versions, CqlStorage may not work, so try CqlNativeStorage instead). A word count example looks as follows:

grunt> alice = LOAD 'cql://hadoop_test/lines' USING CqlStorage();
grunt> B = foreach alice generate flatten(TOKENIZE((chararray)$0)) as word;
grunt> C = group B by word;
grunt> D = foreach C generate COUNT(B) as word_count, group as word;
grunt> E = FOREACH D GENERATE TOTUPLE(TOTUPLE('word',word)),TOTUPLE('word_count', word_count);

grunt> STORE E INTO 'cql://hadoop_test/output?output_query=UPDATE%20hadoop_test.output%20SET%20word_count%20%3D%20%3F' USING CqlStorage();

The only noticeable change is the way you load data from and store data into Cassandra: you need to use CqlStorage (or CqlNativeStorage) and pass the CQL update statement as a parameter of the store URL. The URL format is as follows:

cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&use_secondary=true|false][&partitioner=<partitioner>]]
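
For example, a LOAD that uses a couple of these optional parameters might look like the following (the keyspace and table are the ones from the preceding example; the parameter values are arbitrary):

# Fetch 500 CQL rows per page and aim for roughly 10,000 rows per input split
grunt> lines = LOAD 'cql://hadoop_test/lines?page_size=500&split_size=10000' USING CqlStorage();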

Integration with other analytical tools

Although Hadoop and its companion projects are the most widely used tools for analyzing large datasets, the recent demand for real-time analytics and machine learning has produced other very successful tools. Some of these tools store their own data, such as MongoDB, which is basically a database but also provides decent built-in analytics tooling, and Druid (https://github.com/metamx/druid), which claims to be a column store (like Cassandra) with fast analytical tooling. Others, such as Twitter Storm (https://storm.apache.org/), which provides real-time stream analysis, and Spark or Shark (https://spark.apache.org/), do not have their own data store; instead, databases can be plugged into these frameworks. The scope of this chapter does not allow us to discuss the how-to for all of this software; however, getting it to work with Cassandra is not extremely painful.

Storm can easily be integrated with Cassandra by writing the read or write code, using the Cassandra driver, directly in its Spout and/or Bolt definitions; this is probably the easiest approach. One may also look into a somewhat older Cassandra-Storm integration project at https://github.com/ptgoetz/storm-cassandra.

DataStax provides an integration of Spark with Cassandra. If you need this integration, it may be worth having a look at the documentation of DataStax's spark-cassandra-connector project at https://github.com/datastax/spark-cassandra-connector.

One may want to look into DataStax Enterprise Edition for the built-in integration of some of the popular analytical engines with Cassandra.
