Integration with Pig

Configuring Hadoop with Cassandra is in itself quite some work. Writing long, verbose Java code to do something as trivial as a word count is a turn-off to a high-level user such as a data analyst. Wouldn't it be nice if we had a SQL-like interpreter that converts commands into MapReduce programs for us? Pig is exactly that tool.

Note

Hadoop is not limited to Java; MapReduce programs can also be written more concisely in languages such as Scala, Python, C++ (via Pipes), and R, as well as in many adapter languages.

Pig provides a SQL-like language called Pig Latin, in which one can write complex MapReduce programs. You can create a set of intermediate variables, each holding the result of an operation, and use them in subsequent operations, much as you would use a stored procedure in an RDBMS. Finally, the output of an operation can be displayed on the screen or stored in permanent storage such as HDFS or Cassandra.
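As a minimal sketch of this style (the file name and fields below are illustrative, not from a real dataset), a Pig Latin session chains intermediate relations like this:

# load a hypothetical tab-separated file of (name, age) records
grunt> people = LOAD 'people.txt' AS (name:chararray, age:int);

# each intermediate relation feeds the next, like steps in a stored procedure
grunt> adults = FILTER people BY age >= 18;
grunt> by_age = GROUP adults BY age;
grunt> counts = FOREACH by_age GENERATE group AS age, COUNT(adults) AS total;

# print to the screen, or persist with: STORE counts INTO 'output_dir';
grunt> DUMP counts;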

Installing Pig

Pig installation is very simple; the hard part is getting it to work nicely with Hadoop and Cassandra. To install Pig, just download the latest version and untar it:

$ wget http://www.eng.lsu.edu/mirrors/apache/pig/pig-0.11.1/pig-0.11.1.tar.gz
$ tar xvzf pig-0.11.1.tar.gz
$ ln -s pig-0.11.1 pig

Let's call this directory $PIG_HOME. Ideally, you should just be able to execute $PIG_HOME/bin/pig and have the Pig console start working, given that Cassandra and Hadoop are up and running. Unfortunately, it does not. The documentation, at the time of writing, is not adequate for configuring Pig. To get Pig started, you need to do the following:

  1. Set Hadoop's installation directory in the HADOOP_PREFIX variable.
  2. Add all the JAR files in the Cassandra lib directory to PIG_CLASSPATH.
  3. Add udf.import.list to the Pig options variable, PIG_OPTS, in this way:
    export PIG_OPTS="$PIG_OPTS -Dudf.import.list=org.apache.cassandra.hadoop.pig";
  4. Set the address of one of the Cassandra nodes, the Cassandra RPC port, and the Cassandra partitioner in PIG_INITIAL_ADDRESS, PIG_RPC_PORT, and PIG_PARTITIONER, respectively.

You may write a simple shell script that does this for you. Here is one that covers the four steps ($CASSANDRA_HOME points to the Cassandra installation directory):

export CASSANDRA_HOME=/home/nishant/apps/cassandra11

# Step 2: add every JAR in Cassandra's lib directory to the classpath
for cassandra_jar in $CASSANDRA_HOME/lib/*.jar; do
  CLASSPATH=$CLASSPATH:$cassandra_jar
done

export PIG_CLASSPATH=$PIG_CLASSPATH:$CLASSPATH

# Step 3: make the Cassandra Pig storage classes visible to Pig
export PIG_OPTS="$PIG_OPTS -Dudf.import.list=org.apache.cassandra.hadoop.pig"

# Step 1: Hadoop's installation directory
export HADOOP_PREFIX=/home/nishant/apps/hadoop

# Step 4: Cassandra node address, RPC port, and partitioner
export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
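Save this as, for example, pig_env.sh (an illustrative name) and source it in the shell from which you will launch Pig, so that the exported variables are visible to the Pig launcher:

$ source pig_env.sh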

If everything goes well and Cassandra and Hadoop are up, you can access the Pig console and execute queries in interactive mode.

pig$ bin/pig
2013-07-22 13:32:22,709 [main] INFO  org.apache.pig.Main - Apache Pig version 0.11.1 (r1459641) compiled Mar 22 2013, 02:13:53
2013-07-22 13:32:22,710 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/nishant/apps/pig-0.11.1/pig_1374480142703.log
2013-07-22 13:32:22,757 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/nishant/.pigbootup not found
2013-07-22 13:32:23,080 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000
2013-07-22 13:32:24,133 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001
grunt>

Let's copy some Hadoop XML files into HDFS and run a word count on them.

# Load all the files in $HADOOP_HOME/conf to pigdata in HDFS
$ bin/hadoop fs -put conf pigdata 

# --- in the Pig console ---
# load all the files from HDFS
grunt> A = load './pigdata';

# loop line by line through all the input files in A and split each line into words
grunt> B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;

# group the tokenized words into variable C, grouped by the attribute "word"
grunt> C = group B by word;

# generate a map of the number of terms in each group and the group name
grunt> D = foreach C generate COUNT(B), group;

# print this map to the console
grunt> dump D;

If it works, you will see output something like this:

...
(31,for)
(2,get)
(4,jks)
(12,job)
(1,log)
(1,map)
(2,max)
(1,pid)
(8,set)
(1,sig)
(1,ssh)
(83,the)
(1,two)
(6,use)
(3,via)
(3,who)
...

Integrating Pig and Cassandra

With Hadoop working against Cassandra, we are almost ready to use the Pig console to read data from Cassandra and store results back into it. The one thing you still need to know is which storage method stores and retrieves data from Cassandra: it is CassandraStorage() that you will use in your Pig Latin to transfer data to and from Cassandra, and its usage is exactly the same as that of PigStorage().

In Pig, the data structure used to store data to and retrieve data from Cassandra is a tuple of the row key and a bag of tuples, where each inner tuple is a column name and column value pair, like this:

(ROW_KEY, { (COL1, VAL1), (COL2, VAL2), (COL3, VAL3), ...})
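For instance, to unroll that structure into flat (row key, column name, column value) rows, you can project the key and flatten the bag. This is a minimal sketch assuming the same testks/dataCF column family used below, where CassandraStorage() exposes the row key as key and the column bag as columns:

grunt> rows = LOAD 'cassandra://testks/dataCF' USING CassandraStorage();

# each output tuple is (row key, column name, column value)
grunt> triples = FOREACH rows GENERATE key, flatten(columns);
grunt> dump triples;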

Here is an example of a word count over a Cassandra table. It uses the same data (the book "Alice in Wonderland") as the MapReduce-with-Cassandra example shown earlier. The book is split into lines, and each row holds 500 lines in 500 columns; there are a total of six rows.

# Pull data from the dataCF column family under the testks keyspace
grunt> rows = LOAD 'cassandra://testks/dataCF' USING CassandraStorage();

# flatten each row's bag of columns into (name, value) tuples
grunt> cols = FOREACH rows GENERATE flatten(columns);

# tokenize each column value ($1) into words and count each word
grunt> vals = FOREACH cols GENERATE flatten(TOKENIZE((chararray)$1)) as word;
grunt> grps = group vals by word;
grunt> cnt = foreach grps generate group, COUNT(vals), 'count' as ccnt;

# reshape into (word, {('count', N)}) -- the row key and column bag that CassandraStorage expects
grunt> grp_by_word = group cnt by $0;
grunt> cagg = foreach grp_by_word generate group, cnt.(ccnt, $1);

# Put the result into the result1CF column family under the testks keyspace
grunt> STORE cagg into 'cassandra://testks/result1CF' USING CassandraStorage();

2013-07-22 14:12:45,144 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY 
[-- snip --] 
2013-07-22 14:12:50,464 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases cnt,cols,grps,rows,vals 
2013-07-22 14:12:50,464 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: rows[6,7],cols[7,7],vals[8,7],cnt[10,6],grps[9,7] C: cnt[10,6],grps[9,7] R: cnt[10,6] 
[--  snip --] 
2013-07-22 14:13:45,626 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 97% complete 
2013-07-22 14:13:49,669 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete 
[-- snip --] 

Input(s): 
Successfully read 6 records (360 bytes) from: "cassandra://testks/dataCF" 

Output(s): 
Successfully stored 4440 records in: "cassandra://testks/result1CF" 
[-- snip --] 
2013-07-22 14:13:49,693 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

Let's look at the result that is stored in result1CF and compare it with the previous result.

cqlsh> use testks;
cqlsh:testks> select * from result1CF where key = 'the';
 KEY | count
-----+-------
 the |  1666

cqlsh:testks> select * from resultCF where key = 'Alice';
 KEY   | count
-------+-------
 Alice |   377

cqlsh:testks> select * from resultCF where key = 'Hatter';
 KEY    | count
--------+-------
 Hatter |    54

cqlsh:testks> select * from resultCF where key = 'Cat';
 KEY | count
-----+-------
 Cat |    23

There is a small difference in the count for the word the, but that is likely due to the difference between the split logic I used earlier and the split function that Pig uses.
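If you are curious, you can inspect Pig's tokenizer directly; TOKENIZE splits on whitespace as well as a few punctuation characters (such as double quotes, commas, and parentheses), which can make its counts differ from a plain whitespace split. A quick sketch, assuming a hypothetical one-line file sample.txt:

grunt> line = LOAD './sample.txt' AS (text:chararray);
grunt> words = FOREACH line GENERATE flatten(TOKENIZE(text)) AS word;
grunt> dump words;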

Please note that the Pig Latin we have used here may be quite inefficient; the purpose of this example is simply to show Cassandra and Pig integration. To learn Pig Latin, look into the Pig documentation; Apache Pig's official tutorial (http://pig.apache.org/docs/r0.11.1/start.html#tutorial) is recommended reading.
