Cassandra and Hadoop in action

So, with more than enough (rather boring) theory behind us, we are ready for some excitement. In this section, we will do a word count of a book, which is a bit more interesting than the grep example.

In this example we load Lewis Carroll's novel Alice in Wonderland (http://en.wikipedia.org/wiki/Alice%27s_Adventures_in_Wonderland) into Cassandra. To prepare the data, we read the text file line by line and store 500 lines per row. The rows are named row_1, row_2, and so on, and the columns within each row are named col_1, col_2, and so on. Each row has at most 500 columns, and each column holds one line from the file.

To avoid noise, we strip punctuation from the lines during the load. We could certainly do this cleanup in the MapReduce code, but we wanted to keep it simple. What follows is the code and its explanation. We suggest downloading the code, either from the author's GitHub account or from the publisher's website, and keeping it handy while reading this chapter. The code is eventually compiled into a jar file and submitted to Hadoop MapReduce for execution. We use the Maven command mvn clean install to compile and create the jar file. If you are not familiar with Maven or are new to Java, you can compile the files by putting the appropriate dependencies or jar files on the classpath. Refer to the pom.xml file in the project to see which jar files you need to compile the example in Java.
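To give a concrete idea of the load step, here is a minimal sketch of how the chunking might be done. The AliceLoader class and the insertColumn helper are hypothetical stand-ins for whatever Cassandra client code you use (Hector, Astyanax, or raw Thrift); only the 500-lines-per-row layout and the punctuation stripping mirror what was described above.

// A minimal sketch of the load step. The insertColumn helper below is a
// hypothetical placeholder for your Cassandra client of choice; it is not
// part of the book's code.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class AliceLoader {
  private static final int LINES_PER_ROW = 500;

  public static void load(String file) throws IOException {
    BufferedReader reader = new BufferedReader(new FileReader(file));
    try {
      String line;
      int lineNo = 0;
      while ((line = reader.readLine()) != null) {
        // Strip punctuation so that "Alice," and "Alice" count as the same word
        String cleaned = line.replaceAll("\\p{Punct}", "");
        String rowKey = "row_" + (lineNo / LINES_PER_ROW + 1);   // row_1, row_2, ...
        String colName = "col_" + (lineNo % LINES_PER_ROW + 1);  // col_1 ... col_500
        insertColumn(rowKey, colName, cleaned);
        lineNo++;
      }
    } finally {
      reader.close();
    }
  }

  // Placeholder: wire this up to Hector, Astyanax, or the Thrift API
  private static void insertColumn(String rowKey, String colName, String value) {
    // client.insert(rowKey, colName, value);
  }
}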

Assuming that the data is ready in Cassandra, we will write a Mapper, a Reducer, and a main method. Here is the Mapper:

public static class WordMapper 
    extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable> { 

  private static final IntWritable ONE = new IntWritable(1); 
  private Text word = new Text(); 

  @Override 
  protected void map(ByteBuffer key, 
                     SortedMap<ByteBuffer, IColumn> cols, 
                     Context context) 
      throws IOException, InterruptedException { 

    // Iterate through the column values 
    for (IColumn col : cols.values()) { 
      String val = ByteBufferUtil.string(col.value()); 
      StringTokenizer tokenizer = new StringTokenizer(val); 

      while (tokenizer.hasMoreTokens()) { 
        word.set(tokenizer.nextToken()); 
        context.write(word, ONE); 
      } 
    } 
  } 
}

This is what our Mapper looks like. To anyone with some experience writing MapReduce programs, it does not deviate much from a regular Mapper. A few things to note:

  1. Cassandra feeds a sorted map to the Mapper. The map is sorted by column name and is essentially a set of column-name, column-value pairs.
  2. The key is of type ByteBuffer and it is the row key.
  3. Use org.apache.cassandra.utils.ByteBufferUtil to convert ByteBuffer to meaningful types.
  4. If you want to process the data column by column, loop through the sorted map of columns.
  5. Write out the output that you want this Mapper to forward to the Reducer. The values that you write to the context are sorted and grouped by the framework and forwarded to the Reducer.
  6. Now that we have done the basic task of splitting the text in each column and forwarding each token with the word as the key and one as the value, we need to bring all the pairs emitted by the Mapper for a given word to one place. There we can simply iterate over the grouped key-value pairs of word and 1, updating a counter until all the occurrences of that word are exhausted. Here is what our Reducer looks like:
    public static class WordReducer 
        extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> { 

      @Override 
      protected void reduce(Text key, 
                            Iterable<IntWritable> values, 
                            Context context) 
          throws IOException, InterruptedException { 
        // Add up the ones emitted by the Mapper for this word 
        int sum = 0; 
        for (IntWritable value : values) { 
          sum = sum + value.get(); 
        } 

        // Build a "count" column holding the total 
        Column col = new Column(); 
        col.setName(ByteBufferUtil.bytes("count")); 
        col.setValue(ByteBufferUtil.bytes(sum)); 
        col.setTimestamp(System.currentTimeMillis()); 

        // Wrap the column in a Mutation; the word itself becomes the row key 
        Mutation mutation = new Mutation(); 
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn()); 
        mutation.getColumn_or_supercolumn().setColumn(col); 
        context.write( 
            ByteBufferUtil.bytes(key.toString()), 
            Collections.singletonList(mutation)); 
      } 
    }

The Reducer is a little more interesting than the Mapper because we are doing two things. First, we count the number of grouped elements that come to the Reducer; from our Mapper, we know that they are grouped by word, so at the end of looping through the values we have the number of occurrences of that word. Second, we store this value in Cassandra. Instead of writing the result to HDFS, we store it in Cassandra with the word as the row key and a column named count that holds the value we just computed. You can see that there is no environment-specific configuration here; we only describe what to store in Cassandra and how, and we are done. So, where do we set all the environment-specific and Cassandra-specific settings? The answer is in the main method. Here is what the main method for this particular example looks like; in any Cassandra-based Hadoop project, it will not vary much.

public class CassandraWordCount extends Configured implements Tool { 

  [-- snip --] 
  
  public int run(String[] args) throws Exception { 
    Job job = new Job(getConf(), "cassandrawordcount"); 
    job.setJarByClass(getClass()); 
    
//  Anything you set in conf will be available to Mapper and Reducer 
    Configuration conf = job.getConfiguration(); 
    
//  set mapper and reducer 
    job.setMapperClass(WordMapper.class); 
    job.setReducerClass(WordReducer.class); 
    
//  Cassandra-specific settings for reading the input CF 
    ConfigHelper.setInputInitialAddress(conf, Setup.CASSANDRA_HOST_ADDR); 
    ConfigHelper.setInputRpcPort(conf, String.valueOf(Setup.CASSANDRA_RPC_PORT)); 
    ConfigHelper.setInputPartitioner(conf, RandomPartitioner.class.getName()); 
    ConfigHelper.setInputColumnFamily(conf, Setup.KEYSPACE, Setup.INPUT_CF); 
    
    SliceRange sliceRange = new SliceRange( 
                    ByteBufferUtil.bytes(""), 
                    ByteBufferUtil.bytes(""), 
                    false, 
                    Integer.MAX_VALUE); 
    SlicePredicate predicate = new SlicePredicate() 
                  .setSlice_range(sliceRange); 
    ConfigHelper.setInputSlicePredicate(conf, predicate); 
    
    job.setInputFormatClass(ColumnFamilyInputFormat.class); 
    
    
//  Cassandra-specific output settings 
    ConfigHelper.setOutputInitialAddress(conf, Setup.CASSANDRA_HOST_ADDR); 
    ConfigHelper.setOutputRpcPort(conf, String.valueOf(Setup.CASSANDRA_RPC_PORT)); 
    ConfigHelper.setOutputPartitioner(conf, RandomPartitioner.class.getName()); 
    ConfigHelper.setOutputColumnFamily(conf, Setup.KEYSPACE, Setup.OUTPUT_CF); 
    
//  set output class types 
    job.setOutputKeyClass(ByteBuffer.class); 
    job.setOutputValueClass(List.class); 
    job.setOutputFormatClass(ColumnFamilyOutputFormat.class); 
    
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(IntWritable.class); 
    
//  wait for the job to finish; "true" makes the output verbose 
    return job.waitForCompletion(true) ? 0 : 1; 
  } 
  
  public static void main(String[] args) throws Exception{ 
    // Propagate the job's exit status back to the shell 
    System.exit(ToolRunner.run(new Configuration(), new CassandraWordCount(), args)); 
  } 

}

All right, that is a lot of code, but nothing we have not seen before. Starting from the main method, we hand ToolRunner an instance of our main class along with any parameters passed from the command-line interface, and kick it off. ToolRunner executes the run method, which holds all the environment and Cassandra settings. We also tell Hadoop which Mapper and Reducer to use for this job.

We tell Hadoop how to pull data from Cassandra by providing a SlicePredicate that fetches complete rows: we leave the start and end column names empty and set the count to Integer.MAX_VALUE (roughly two billion). Alternatively, you can set the wide-row flag to true when configuring the input column family and achieve the same thing without worrying about a SlicePredicate.
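As a rough sketch of that alternative (assuming the four-argument overload of ConfigHelper.setInputColumnFamily that takes a wide-row flag is available in your Cassandra version), the input setup would change to something like this:

//  Sketch: read the input CF in wide-row mode instead of using a SlicePredicate.
//  Assumes the overload setInputColumnFamily(conf, keyspace, cf, widerows) exists
//  in your Cassandra version; with the flag set to true the input format pages
//  through the columns itself, so no slice predicate is needed.
//  Note that a single wide row may then be split across several map() calls.
ConfigHelper.setInputColumnFamily(conf, Setup.KEYSPACE, Setup.INPUT_CF, true);
job.setInputFormatClass(ColumnFamilyInputFormat.class);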

Executing, debugging, monitoring, and looking at results

To execute this example, compile the code and create a jar file. Add any required external libraries or dependencies to the classpath, or edit conf/hadoop-env.sh and add the location of the jars. One hard requirement to get all this Cassandra-related machinery running is to have Cassandra's library directory in Hadoop's classpath. To do that, edit $HADOOP_HOME/conf/hadoop-env.sh and update the classpath like this:

# conf/hadoop-env.sh
# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=/home/nishant/apps/cassandra11/lib/*:$HADOOP_CLASSPATH

Make sure Hadoop and Cassandra are completely up and running. To execute your MapReduce program, submit the jar file to Hadoop with the appropriate classpath, if needed, and the fully qualified class name of the main class.

$ bin/hadoop jar materingcassadra-0.0.1-SNAPSHOT.jar in.naishe.mc.CassandraWordCount

Figure 8.4: The Hadoop JobTracker GUI. It lets users watch running and completed jobs; most importantly, you can see logs, standard output, and standard error streams by drilling into job IDs

Hadoop provides a simple web-based GUI to monitor jobs and view debug logs, standard output messages, and standard error messages. One can check the details of running, failed, or previously run jobs. By default, this portal is available at http://JOBTRACKER_ADDRESS:50030, so if you are running everything locally, the address is http://localhost:50030. The preceding figure shows a screenshot of the page. You can click on a job link to view its details; on the job detail page, you can see the logs for the Mapper and Reducer processes.

As per our Reducer configuration, the results are stored in Cassandra in the output column family, where you can look them up. As expected, the word "the" tops the list, and there are a decent number of references to Alice, Hatter, and Cat:

cqlsh:testks> select * from resultCF where key = 'the';
 KEY   | count
-------+-------
 the   |  1664

cqlsh:testks> select * from resultCF where key = 'Alice';
 KEY   | count
-------+-------
 Alice |   377

cqlsh:testks> select * from resultCF where key = 'Hatter';
 KEY    | count
--------+-------
 Hatter |    54

cqlsh:testks> select * from resultCF where key = 'Cat';
 KEY | count
-----+-------
 Cat |    23

Figure 8.5: Typical setup for analytics with Cassandra and Hadoop
