Configuring Hadoop with Cassandra is itself quite a bit of work, and writing long, verbose Java code to do something as trivial as a word count is a turn-off for a high-level user such as a data analyst. Wouldn't it be nice to have a SQL-like interpreter that converts commands into MapReduce programs for us? Pig is exactly that tool.
Pig provides a SQL-like language called Pig Latin, in which you can write complex MapReduce programs. You can create intermediate variables, each holding the result of an operation, and use them in subsequent operations, much as you would in an RDBMS stored procedure. Finally, the output of an operation can be displayed on the screen or written to permanent storage such as HDFS or Cassandra.
Pig installation is very simple; what is hard is getting it to work nicely with Hadoop and Cassandra. To install Pig, just download the latest version and untar it:
$ wget http://www.eng.lsu.edu/mirrors/apache/pig/pig-0.11.1/pig-0.11.1.tar.gz
$ tar xvzf pig-0.11.1.tar.gz
$ ln -s pig-0.11.1 pig
Let's call this directory $PIG_HOME. Ideally, you should just execute $PIG_HOME/bin/pig and the Pig console should start to work, given that your Cassandra and Hadoop are up and running. Unfortunately, it does not. The documentation, at the time of writing, is not adequate to configure Pig. To get Pig started, you need to do the following:
1. Set the HADOOP_PREFIX variable to point to your Hadoop installation.
2. Add the .jar files in the Cassandra lib directory to PIG_CLASSPATH.
3. Add udf.import.list to the Pig options variable, PIG_OPTS, in this way:
   export PIG_OPTS="$PIG_OPTS -Dudf.import.list=org.apache.cassandra.hadoop.pig"
4. Set the Cassandra server address, the RPC port, and the partitioner in the PIG_INITIAL_ADDRESS, PIG_RPC_PORT, and PIG_PARTITIONER variables, respectively.

You may write a simple shell script that does this for you. Here is a shell script that accommodates the four steps (assuming $CASSANDRA_HOME points to the Cassandra installation directory):
export CASSANDRA_HOME=/home/nishant/apps/cassandra11
for cassandra_jar in $CASSANDRA_HOME/lib/*.jar; do
  CLASSPATH=$CLASSPATH:$cassandra_jar
done
export PIG_CLASSPATH=$PIG_CLASSPATH:$CLASSPATH
export PIG_OPTS="$PIG_OPTS -Dudf.import.list=org.apache.cassandra.hadoop.pig"
export HADOOP_PREFIX=/home/nishant/apps/hadoop
export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
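Before launching Pig, it can save a debugging round trip to verify that the variables the script exports are actually set in your shell. The sketch below is our own sanity check, not part of Pig; the paths and values simply mirror the script above and would need adjusting for your machine.

```shell
# Sanity-check sketch: set the same variables the script above exports
# (paths and values here are illustrative), then refuse to continue if
# any required variable is empty.
export HADOOP_PREFIX=/home/nishant/apps/hadoop
export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner

missing=0
for var in HADOOP_PREFIX PIG_INITIAL_ADDRESS PIG_RPC_PORT PIG_PARTITIONER; do
  eval "val=\$$var"
  if [ -z "$val" ]; then
    echo "missing: $var"
    missing=1
  fi
done
if [ "$missing" -eq 0 ]; then
  echo "environment OK - safe to run bin/pig"
fi
```

A check like this catches the common failure mode where Pig starts but cannot reach Cassandra because one export was forgotten.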
If everything goes OK and Cassandra and Hadoop are up, you can access the Pig console to execute queries in interactive mode.
pig$ bin/pig
2013-07-22 13:32:22,709 [main] INFO org.apache.pig.Main - Apache Pig version 0.11.1 (r1459641) compiled Mar 22 2013, 02:13:53
2013-07-22 13:32:22,710 [main] INFO org.apache.pig.Main - Logging error messages to: /home/nishant/apps/pig-0.11.1/pig_1374480142703.log
2013-07-22 13:32:22,757 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/nishant/.pigbootup not found
2013-07-22 13:32:23,080 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000
2013-07-22 13:32:24,133 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001
grunt>
Let's copy some Hadoop XML files into HDFS and run a word count on it.
# Load all the files in $HADOOP_HOME/conf to pigdata in HDFS
$ bin/hadoop fs -put conf pigdata

# --- in the Pig console ---
# load all the files from HDFS
grunt> A = load './pigdata';
# loop line by line over all the input files in A and split each line into words
grunt> B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
# group the tokenized words into variable C, grouping by the attribute "word"
grunt> C = group B by word;
# generate a map of the number of terms in each group and the group name
grunt> D = foreach C generate COUNT(B), group;
# print this map to the console
grunt> dump D;
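To see what this script computes without involving the cluster, the same word count can be sketched with standard Unix tools: split each line into words, sort so identical words become adjacent, and count each run. This is only a local analogue of the dataflow, using made-up input; Pig actually compiles the script above into MapReduce jobs.

```shell
# A local analogue of the Pig word count: tokenize on whitespace,
# group identical words by sorting, and count each group.
printf 'the cat sat\nthe cat ran\n' > /tmp/wordcount_demo.txt
tr -s ' ' '\n' < /tmp/wordcount_demo.txt | sort | uniq -c | awk '{print $2, $1}'
# cat 2
# ran 1
# sat 1
# the 2
```

The `tr` step plays the role of TOKENIZE plus flatten, `sort | uniq -c` plays the role of the group-and-COUNT pair.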
If it works, you will see output something like this:
...
(31,for)
(2,get)
(4,jks)
(12,job)
(1,log)
(1,map)
(2,max)
(1,pid)
(8,set)
(1,sig)
(1,ssh)
(83,the)
(1,two)
(6,use)
(3,via)
(3,who)
...
With Hadoop working alongside Cassandra, we are almost done and ready to use the Pig console to pull data from Cassandra and store results back into it. The one thing you need to know is which storage method is used to store and retrieve data from Cassandra: it is CassandraStorage() that you will use in your Pig Latin to transfer data to and from Cassandra. Its usage is exactly the same as that of PigStorage().
In Pig, the data structure that is used to store/get data to/from Cassandra is a tuple of row key and a bag of tuples, where each tuple is a column name and column value pair, such as this:
(ROW_KEY, { (COL1, VAL1), (COL2, VAL2), (COL3, VAL3), ...})
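This shape can be made concrete with a tiny sketch. It is purely illustrative: the row key and the column names and values below are made up, and the shell loop just prints them in the tuple-and-bag notation that Pig uses for a Cassandra row.

```shell
# Build the (row_key, { (col, val) ... }) shape from made-up data.
row_key="row1"
cols="line1:Alice line2:was line3:beginning"   # col:val pairs, illustrative
out="($row_key, {"
for cv in $cols; do
  # ${cv%%:*} is the column name, ${cv#*:} is the column value
  out="$out (${cv%%:*}, ${cv#*:})"
done
out="$out })"
echo "$out"
# prints: (row1, { (line1, Alice) (line2, was) (line3, beginning) })
```

In the Pig session below, `flatten(columns)` unnests exactly this bag, turning each (column, value) tuple into its own record.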
Here is an example of a word count over a Cassandra table. It uses the same data (the "Alice in Wonderland" book) as the earlier MapReduce-with-Cassandra example: the book is split into lines, each row holds 500 lines in 500 columns, and there are six rows in total.
# Pull Data from dataCF column family under testks Keyspace
grunt> rows = LOAD 'cassandra://testks/dataCF' USING CassandraStorage();
grunt> cols = FOREACH rows GENERATE flatten(columns);
grunt> vals = FOREACH cols GENERATE flatten(TOKENIZE((chararray)$1)) as word;
grunt> grps = group vals by word;
grunt> cnt = foreach grps generate group, COUNT(vals), 'count' as ccnt;
grunt> grp_by_word = group cnt by $0;
grunt> cagg = foreach grp_by_word generate group, cnt.(ccnt, $1);
# Put Data into result1CF column family under testks Keyspace
grunt> STORE cagg into 'cassandra://testks/result1CF' USING CassandraStorage();
2013-07-22 14:12:45,144 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
[-- snip --]
2013-07-22 14:12:50,464 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases cnt,cols,grps,rows,vals
2013-07-22 14:12:50,464 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: rows[6,7],cols[7,7],vals[8,7],cnt[10,6],grps[9,7] C: cnt[10,6],grps[9,7] R: cnt[10,6]
[-- snip --]
2013-07-22 14:13:45,626 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 97% complete
2013-07-22 14:13:49,669 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
[-- snip --]
Input(s):
Successfully read 6 records (360 bytes) from: "cassandra://testks/dataCF"
Output(s):
Successfully stored 4440 records in: "cassandra://testks/result1CF"
[-- snip --]
2013-07-22 14:13:49,693 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
Let's look at the result that is stored in result1CF and compare it with the previous result.
cqlsh> use testks;
cqlsh:testks> select * from result1CF where key = 'the';
 KEY | count
-----+-------
 the |  1666

cqlsh:testks> select * from resultCF where key = 'Alice';
 KEY   | count
-------+-------
 Alice |   377

cqlsh:testks> select * from resultCF where key = 'Hatter';
 KEY    | count
--------+-------
 Hatter |    54

cqlsh:testks> select * from resultCF where key = 'Cat';
 KEY | count
-----+-------
 Cat |    23
There is a small difference in the count for the word "the", but that is likely due to the split that I used versus the split function that Pig uses.
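Such a discrepancy is easy to reproduce locally: how you tokenize changes the counts. The sketch below uses ordinary Unix tools rather than Pig's TOKENIZE, so it only illustrates the effect; on the same made-up line, a whitespace-only split and a split on every non-letter character count "the" differently.

```shell
# Two tokenizations of the same line give different counts for "the".
line='The theatre, the whole theatre-the stage!'
# Split on whitespace only: "theatre-the" stays a single token.
printf '%s\n' "$line" | tr ' ' '\n' | grep -ci '^the$'
# prints 2
# Split on every non-letter character: "theatre-the" yields an extra "the".
printf '%s\n' "$line" | tr -cs '[:alpha:]' '\n' | grep -ci '^the$'
# prints 3
```

The word-splitting rule in the earlier MapReduce example and Pig's TOKENIZE need not agree on punctuation, which is enough to shift the totals slightly.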
Please note that the Pig Latin used here may be quite inefficient; the purpose of this example is to show Cassandra and Pig integration. To learn more about Pig Latin, look into the Pig documentation. Apache Pig's official tutorial (http://pig.apache.org/docs/r0.11.1/start.html#tutorial) is recommended reading.