Cassandra with Hadoop MapReduce

Cassandra provides built-in support for Hadoop. If you have ever written a MapReduce program, you will find that writing a MapReduce task against Cassandra is quite similar to writing one for data stored in HDFS. Cassandra supports input to Hadoop with the ColumnFamilyInputFormat class and output with the ColumnFamilyOutputFormat class. Apart from these, you will need to apply Cassandra-specific settings for Hadoop via ConfigHelper. These three classes are enough to get you started. One other class that may be worth looking at is BulkOutputFormat. All these classes live under the org.apache.cassandra.hadoop package.

Note

To be able to compile MapReduce code that uses Cassandra as a data source or data sink, you must have cassandra-all.jar in your classpath. You will also need to make the JARs in the Cassandra library directory visible to Hadoop. We will discuss this later in this chapter.

Let's look at the classes that we will use to get Cassandra working with our MapReduce job.

ColumnFamilyInputFormat

ColumnFamilyInputFormat is an implementation of org.apache.hadoop.mapred.InputFormat (or org.apache.hadoop.mapreduce.InputFormat in the newer API), so its behavior is dictated by the InputFormat contract. Hadoop uses this class to fetch data for the map tasks; it describes how to read data from column families into the Mapper instances.

The other job of ColumnFamilyInputFormat (or any implementation of InputFormat) is to fragment the input data into small chunks that are fed to the map tasks; Cassandra uses ColumnFamilySplit for this purpose. One can configure the number of rows per input split via ConfigHelper.setInputSplitSize, but there is a caveat: multiple get_range_slices queries are issued for each input split, so, as the Cassandra documentation notes, too small a value builds up call overhead, while too large a value may cause out-of-memory issues. Larger values are generally better for performance, so if you plan to tune this parameter, do some calculation based on the median column size to avoid memory overflow; trial and error can also be handy. The default split size is 64 x 1024 rows.
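
For instance, the split size can be tuned through ConfigHelper before the job is submitted; the value below is purely illustrative and should be derived from your own row and column sizes:

// A minimal sketch: shrink the split size from the default 64 x 1024 rows
// to 8 x 1024 rows (an illustrative figure) when columns are large
ConfigHelper.setInputSplitSize(conf, 8 * 1024);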

ColumnFamilyOutputFormat

OutputFormat is the mechanism for writing the result of a MapReduce job to (usually) permanent storage. Cassandra's implementation of Hadoop's OutputFormat is ColumnFamilyOutputFormat. It enables Hadoop to write the output of the reduce tasks as column family rows. It is implemented such that the results are written to the column family in batches; this performance optimization is called lazy write-back caching.
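
As a rough sketch of what this looks like from the reducer's side (modeled on the word_count example that ships with Cassandra; the class name, column name, and counting logic here are illustrative), the reduce task emits a ByteBuffer row key and a list of Thrift Mutation objects, which ColumnFamilyOutputFormat then batches to the column family:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer
    extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }

    // Wrap a single column ("count" -> sum) in a Thrift Mutation
    Column column = new Column();
    column.setName(ByteBufferUtil.bytes("count"));
    column.setValue(ByteBufferUtil.bytes(sum));
    column.setTimestamp(System.currentTimeMillis());

    Mutation mutation = new Mutation();
    mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn().setColumn(column));

    // The reduce key becomes the row key; the mutation list becomes its columns
    context.write(ByteBufferUtil.bytes(word.toString()), Collections.singletonList(mutation));
  }
}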

ConfigHelper

ConfigHelper is the gateway for configuring Cassandra-specific settings for Hadoop. It is a fairly plain utility class that validates the settings passed to it and sets them on Hadoop's org.apache.hadoop.conf.Configuration instance for the job. This configuration is made available to the Mapper and the Reducer.

ConfigHelper saves developers from mistyping a property name, because all the properties are set through methods; any typo shows up at compile time. It may be worth looking at the Javadoc for ConfigHelper. Here are some of the commonly used methods (a configuration sketch follows the list):

  • setInputInitialAddress: It sets the hostname or private IP of one of the Cassandra nodes to read from.
  • setInputRpcPort: It sets the RPC port if it has been altered from the default; if not set, the default Thrift port 9160 is used.
  • setInputPartitioner: It sets the partitioner to match the underlying Cassandra storage setting.
  • setInputColumnFamily: It sets the keyspace and column family to pull data from.
  • setInputSlicePredicate: It sets the SlicePredicate that decides which columns are pulled from the column family for the Mapper to work on.
  • setOutputInitialAddress: It sets the address of the Cassandra cluster (one of its nodes) where the result is published; this is usually the same as the input initial address.
  • setOutputRpcPort: It sets the RPC port of the cluster where the result is stored.
  • setOutputPartitioner: It sets the partitioner used by the output cluster.
  • setOutputColumnFamily: It sets the keyspace and column family to store the results in.
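
Put together, a typical job configuration might look like the following sketch. The host addresses, keyspace, and column family names are placeholders, and the SlicePredicate simply asks for all columns up to a limit; adapt these to your own cluster:

// A minimal sketch of wiring a MapReduce job to Cassandra via ConfigHelper
Configuration conf = new Configuration();

// Input side: where and what to read
ConfigHelper.setInputInitialAddress(conf, "10.0.0.1");
ConfigHelper.setInputRpcPort(conf, "9160");
ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
ConfigHelper.setInputColumnFamily(conf, "myKeyspace", "myInputCF");

// Pull every column (up to 1,000 per row) into the Mapper
SlicePredicate predicate = new SlicePredicate().setSlice_range(
    new SliceRange(ByteBufferUtil.bytes(""), ByteBufferUtil.bytes(""), false, 1000));
ConfigHelper.setInputSlicePredicate(conf, predicate);

// Output side: where to write the reduce results
ConfigHelper.setOutputInitialAddress(conf, "10.0.0.1");
ConfigHelper.setOutputRpcPort(conf, "9160");
ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
ConfigHelper.setOutputColumnFamily(conf, "myKeyspace", "myOutputCF");

Job job = new Job(conf, "cassandra-mapreduce-sketch");
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);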

Version 1.1 added support for wide-row column families, bulk loading, and secondary indexes.

Wide-row support

Earlier, rows with millions of columns were a problem for the Cassandra-Hadoop integration, because a whole row was pulled per call, limited only by the SlicePredicate. From version 1.1 onward, you can pass the wide-row Boolean parameter as true, as shown in the following snippet:

ConfigHelper.setInputColumnFamily(
    conf,
    keyspace,
    inCF,
    true  // set widerow = true
);

When wide row is set to true, the row is fed to the Mapper one column at a time; that is, you iterate column by column.
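
A mapper for such input might look roughly like the following sketch; the key and value types follow the word_count example bundled with Cassandra, while the class name and output types are illustrative. Iterating over the map keeps the code correct whether each call carries a single column (wide-row mode) or a full slice:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.SortedMap;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColumnMapper
    extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable> {

  @Override
  protected void map(ByteBuffer rowKey, SortedMap<ByteBuffer, IColumn> columns, Context context)
      throws IOException, InterruptedException {
    for (IColumn column : columns.values()) {
      // Emit one record per column; with wide rows, map() is invoked column by column
      String name = ByteBufferUtil.string(column.name());
      context.write(new Text(name), new IntWritable(1));
    }
  }
}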

Bulk loading

BulkOutputFormat is another utility that Cassandra provides to improve the write performance of jobs that produce large results. It streams the data in binary format, which is much quicker than inserting it one row at a time, and it uses SSTableLoader to do this. Refer to SSTableLoader in the Using Cassandra bulk loader to restore the data section in Chapter 6, Managing a Cluster – Scaling, Node Repair, and Backup.

Job job = new Job(conf);
job.setOutputFormatClass(BulkOutputFormat.class);
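
As with ColumnFamilyOutputFormat, the output-side ConfigHelper settings described earlier (initial address, RPC port, partitioner, and output column family) are still expected to tell the job where to stream the generated data.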

Secondary index support

One can use a secondary index when pulling data from Cassandra to pass it on to the job. This is another improvement: it makes Cassandra sift the data and pass only the relevant rows to Hadoop, instead of Hadoop burning CPU cycles to weed out data that will not be used in the computation. This lowers the overhead of passing extra data to Hadoop. Here is an example.

IndexExpression electronicItems =
    new IndexExpression(
        ByteBufferUtil.bytes("item_category"),
        IndexOperator.EQ,
        ByteBufferUtil.bytes("electronics")
    );

IndexExpression soldAfter2012 =
    new IndexExpression(
        ByteBufferUtil.bytes("sell_year"),
        IndexOperator.GT,
        ByteBufferUtil.bytes(2012)
    );

ConfigHelper.setInputRange(conf, Arrays.asList(electronicItems, soldAfter2012));

The preceding code restricts the input to rows that belong to the electronics category and were sold after the year 2012.
