So, with more than enough (rather boring) theory behind us, we are ready for some excitement. In this section, we will do a word count of a book, which should be more interesting than the grep example.
In this example we load Lewis Carroll's novel Alice in Wonderland (http://en.wikipedia.org/wiki/Alice%27s_Adventures_in_Wonderland) into Cassandra. To prepare this data, we read the text file line by line and store 500 lines per row. The row keys are formatted as row_1, row_2, and so on, and the columns in each row are named col_1, col_2, and so on. Each row has at most 500 columns, and each column holds one line from the file.
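The loader itself is not shown in this chapter, but a minimal sketch of its preparation logic (the punctuation stripping and the 500-lines-per-row grouping) could look like the following. The class and method names here are hypothetical, and the actual Cassandra insert is left as a comment:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AliceLoader {
    // Strip punctuation so that "Alice," and "Alice" count as the same word
    static String stripPunctuation(String line) {
        return line.replaceAll("\\p{Punct}", "");
    }

    // Group cleaned lines into chunks of at most linesPerRow; line i goes
    // into row "row_<i/linesPerRow + 1>" under column "col_<i%linesPerRow + 1>"
    static Map<String, Map<String, String>> toRows(List<String> lines, int linesPerRow) {
        Map<String, Map<String, String>> rows = new LinkedHashMap<>();
        for (int i = 0; i < lines.size(); i++) {
            String rowKey = "row_" + (i / linesPerRow + 1);
            String colName = "col_" + (i % linesPerRow + 1);
            rows.computeIfAbsent(rowKey, k -> new LinkedHashMap<>())
                .put(colName, stripPunctuation(lines.get(i)));
        }
        // In the real loader, each row map would be written to Cassandra here
        return rows;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Alice was beginning!", "to get very tired,");
        System.out.println(toRows(lines, 1));
    }
}
```

In the real loader, linesPerRow would be 500; a small value is used above only to make the grouping visible.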
To avoid noise, we removed punctuation from the lines during the load. We could certainly do this noise reduction in the MapReduce code, but we wanted to keep it simple. What follows is the code and its explanation. It is suggested that you download the code, either from the author's GitHub account or from the publisher's website, and keep it handy while reading this chapter. The code is eventually compiled and submitted to Hadoop MapReduce, which executes the compiled jar file. We use the Maven command mvn clean install to compile and create the jar file. If you are unfamiliar with Maven or are new to Java, you can compile the files by putting the appropriate dependencies or jar files in the classpath. Refer to the pom.xml file in the project to see which jar files you need to compile the example in Java.
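For orientation, the two key dependencies in a project like this would look roughly like the following; the version numbers are illustrative only, so check the pom.xml shipped with the code for the exact ones:

```xml
<dependencies>
  <!-- Hadoop MapReduce APIs: Job, Mapper, Reducer, ToolRunner -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.0.4</version>
  </dependency>
  <!-- Cassandra classes: ConfigHelper, ColumnFamilyInputFormat, ByteBufferUtil -->
  <dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>1.1.6</version>
  </dependency>
</dependencies>
```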
Assuming that we have the data ready in Cassandra to run MapReduce on, we will write a Mapper, a Reducer, and a main method. Here is the Mapper:
public static class WordMapper
    extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private Text word = new Text();

  @Override
  protected void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> cols, Context context)
      throws IOException, InterruptedException {
    // Iterate through the column values
    for (IColumn col : cols.values()) {
      String val = ByteBufferUtil.string(col.value());
      StringTokenizer tokenizer = new StringTokenizer(val);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, ONE);
      }
    }
  }
}
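If you want to see in isolation what this Mapper emits, the tokenize-and-count core can be exercised in plain Java with no Hadoop at all. The following is just an illustration, not part of the job; the map here simulates what the shuffle-and-reduce phases do with the (word, 1) pairs the Mapper writes:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        String line = "the Hatter said to the Cat";
        // Same splitting the Mapper uses: whitespace-delimited tokens
        StringTokenizer tokenizer = new StringTokenizer(line);
        Map<String, Integer> counts = new LinkedHashMap<>();
        while (tokenizer.hasMoreTokens()) {
            // The Mapper emits (word, 1); grouping and summing those pairs
            // is what the Reducer does, simulated here with a map
            counts.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        System.out.println(counts);  // {the=2, Hatter=1, said=1, to=1, Cat=1}
    }
}
```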
This is what our Mapper looks like. To anyone with some experience in writing MapReduce programs, it does not deviate much from a regular Mapper. A couple of things to note:

- The input key to the Mapper is a ByteBuffer, and it is the row key.
- Use org.apache.cassandra.utils.ByteBufferUtil to convert a ByteBuffer to meaningful types.

Here is the Reducer:

public static class WordReducer
    extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum = sum + value.get();
    }
    Column col = new Column();
    col.setName(ByteBufferUtil.bytes("count"));
    col.setValue(ByteBufferUtil.bytes(sum));
    col.setTimestamp(System.currentTimeMillis());
    Mutation mutation = new Mutation();
    mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
    mutation.getColumn_or_supercolumn().setColumn(col);
    context.write(
        ByteBufferUtil.bytes(key.toString()),
        Collections.singletonList(mutation));
  }
}
The Reducer is a little more interesting than the Mapper, because we are doing two things here. First, we count the number of grouped elements that come to the Reducer. From our Mapper, we know that they are grouped by word, so at the end of the loop over the values we have the number of occurrences of that word. Second, we store this value in Cassandra. Instead of writing the result to HDFS, we store it in Cassandra with the word as the row key, adding a column named count that holds the value we just computed. You can see that no environment-specific configuration is done here; we just describe what to store in Cassandra and how, and we are done. So, the question arises: where do we set all the environment-specific and Cassandra-specific settings? The answer is in the main method. Here is what the main method for this particular example looks like; in any Cassandra-based Hadoop project, it will not vary much.
public class CassandraWordCount extends Configured implements Tool {
  [-- snip --]

  public int run(String[] args) throws Exception {
    Job job = new Job(getConf(), "cassandrawordcount");
    job.setJarByClass(getClass());
    // Anything you set in conf will be available to Mapper and Reducer
    Configuration conf = job.getConfiguration();

    // set mapper and reducer
    job.setMapperClass(WordMapper.class);
    job.setReducerClass(WordReducer.class);

    // Cassandra-specific settings for the input CF
    ConfigHelper.setInputInitialAddress(conf, Setup.CASSANDRA_HOST_ADDR);
    ConfigHelper.setInputRpcPort(conf, String.valueOf(Setup.CASSANDRA_RPC_PORT));
    ConfigHelper.setInputPartitioner(conf, RandomPartitioner.class.getName());
    ConfigHelper.setInputColumnFamily(conf, Setup.KEYSPACE, Setup.INPUT_CF);
    SliceRange sliceRange = new SliceRange(
        ByteBufferUtil.bytes(""), ByteBufferUtil.bytes(""), false, Integer.MAX_VALUE);
    SlicePredicate predicate = new SlicePredicate().setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(conf, predicate);
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    // Cassandra-specific output settings
    ConfigHelper.setOutputInitialAddress(conf, Setup.CASSANDRA_HOST_ADDR);
    ConfigHelper.setOutputRpcPort(conf, String.valueOf(Setup.CASSANDRA_RPC_PORT));
    ConfigHelper.setOutputPartitioner(conf, RandomPartitioner.class.getName());
    ConfigHelper.setOutputColumnFamily(conf, Setup.KEYSPACE, Setup.OUTPUT_CF);

    // set output class types
    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);
    job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // verbose
    job.waitForCompletion(true);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new CassandraWordCount(), args);
    System.exit(0);
  }
}
All right, lots of things, but nothing that we do not know about. Starting from the main method, we hand ToolRunner an instance of our main class along with any parameters passed from the command-line interface, and kick it off. ToolRunner executes the run method, where all the environment and Cassandra settings live. We also tell the job where our Mapper and Reducer are.
We tell Hadoop how to pull data from Cassandra by providing a SlicePredicate, in which we pull a complete row by leaving the start and end column names empty and setting the count to Integer.MAX_VALUE (about two billion). Alternatively, one may set the wide-row flag to true and achieve the same without worrying about SlicePredicate.
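The wide-row alternative just mentioned is a small change to the input configuration in run. As a sketch, assuming the same Setup constants, it relies on the ConfigHelper.setInputColumnFamily overload that takes a widerows flag; note that wide-row mode changes how columns are delivered to the Mapper, so check the ColumnFamilyInputFormat documentation for your Cassandra version before using it:

```java
// Instead of building a SlicePredicate covering the whole row:
ConfigHelper.setInputColumnFamily(conf, Setup.KEYSPACE, Setup.INPUT_CF,
    true /* widerows: page through columns instead of loading the whole row */);
```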
To execute this example, compile the code and create a jar file. Add any required external libraries or dependencies to the classpath, or edit conf/hadoop-env.sh and add the location of the jars there. One hard requirement for getting all this Cassandra-related machinery running is to have Cassandra's library directory in Hadoop's classpath. To do that, edit $HADOOP_HOME/conf/hadoop-env.sh and update the classpath like this:
# conf/hadoop-env.sh

# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=/home/nishant/apps/cassandra11/lib/*:$HADOOP_CLASSPATH
Make sure Hadoop and Cassandra are both completely up and running. To execute your MapReduce program, submit the jar file to Hadoop with the appropriate classpath, if needed, and the fully qualified class name of the main class.
$ bin/hadoop jar materingcassadra-0.0.1-SNAPSHOT.jar in.naishe.mc.CassandraWordCount
Hadoop provides a fairly simple web-based GUI to monitor jobs and to view debug logs, standard output, and standard error messages. One can monitor the status of a running job as well as the details of failed and previously run jobs. By default, this portal is available at http://JOBTRACKER_ADDRESS:50030. So, if you are running everything locally, the address will be http://localhost:50030. The preceding figure shows a screenshot of the page. You can click on a job link to view its details; on the job detail page, you can see the logs for the Mapper and Reducer processes.
As per our Reducer configuration, the results can be accessed from the appropriate column family in Cassandra, where you can observe them. As expected, you will find "the" in highest use, and there are a decent number of references to Alice, Hatter, and Cat.
cqlsh:testks> select * from resultCF where key = 'the';
 KEY | count
-----+-------
 the |  1664

cqlsh:testks> select * from resultCF where key = 'Alice';
 KEY   | count
-------+-------
 Alice |   377

cqlsh:testks> select * from resultCF where key = 'Hatter';
 KEY    | count
--------+-------
 Hatter |    54

cqlsh:testks> select * from resultCF where key = 'Cat';
 KEY | count
-----+-------
 Cat |    23