Cassandra with Hadoop MapReduce

Cassandra provides built-in support for Hadoop. If you have ever written a MapReduce program, you will find out that writing a MapReduce task with Cassandra is quite similar to how one would write a MapReduce task for the data stored in HDFS. Cassandra supports input to Hadoop with ColumnFamilyInputFormat and output with the ColumnFamilyOutputFormat classes, respectively. Apart from these, you will need to put Cassandra-specific settings for Hadoop via ConfigHelper. These three classes are enough to get you started. Another class that might be worth looking at is BulkOutputFormat. All these classes are under the org.apache.cassandra.hadoop.* package.

To be able to compile the MapReduce code that uses Cassandra as data source or data sink, you must have cassandra-all.jar in your classpath. You will also need to make Hadoop to be able to see JARs in the Cassandra library. We will discuss this later in this chapter.

Let's understand the classes that we will be using to get Cassandra working for our MapReduce problem.

Preparing Cassandra for Hadoop

The nodes that have Hadoop on them need to have virtual nodes disabled. So, if you have a Cassandra cluster, you may want to split it into two data centers. You will need to configure a proper endpoint snitch (such as PropertyFileSnitch) to separate the two data centers. The data centers need not actually be in different data centers. To disable the virtual node, change the following in cassandra.yaml:

  1. Uncomment initial_token, set to an appropriate value within the data center or leave it to a default value, say, 1.
  2. Set num_tokens to 1.

If you are trying to divide an existing cluster into two parts, one for data storage (the normal vnode-enabled data center for data storage) and another for analytics (the ones that have vnode disabled and stays along with Hadoop), make sure, in analytics data center, you decommission those nodes then clean data from data directories and restart them after making the previously mentioned changes. This is the same as adding new nodes to the cluster. Disabling vnode in an existing running cluster is not advised.

ColumnFamilyInputFormat

The ColumnFamilyInputFormat class is an implementation of org.apache.hadoop.mapred.InputFormat (or mapreduce in newer the API). So, its implementation is dictated by the InputFormat class specifications. Hadoop uses this class to get data for the MapReduce tasks. It describes how to read data from column families into the Mapper instances.

The other job of ColumnFamilyInputFormat (or any implementation of InputFormat) is to fragment input data into small chunks that get fed to map tasks. Cassandra has ColumnInputSplit for this purpose. One can configure the number of rows per InputSplit via ConfigHelper.setInputSplitSize. However, there is a caveat. It uses multiple get_slice_range queries for each InputSplit data, so, as Cassandra documentation says, a smaller value will build up call overhead; on the other hand, too large a value may cause out-of-memory issues. Larger values are better for performance, so if you are planning to play with this parameter do some calculation based on median column size to avoid memory overflow. Trial and error can be handy. The default split size is 64 x 1024 rows.

ColumnFamilyOutputFormat

The OutputFormat class is the mechanism of writing the result from MapReduce to a permanent (usually) storage. Cassandra implements Hadoop's OutputFormat, that is, ColumnFamilyOutputFormat. It enables Hadoop to write the result from the reduce task as column family rows. It is implemented such that the results are written, to the column family, in batches. This is a performance improvement, and this mechanism is called lazy write-back caching.

CqlOutputFormat and CqlInputFormat

CqlOutputFormat and CqlInputFormats are Hadoop-specific output and input formats for reducer and mapper tasks, respectively. Functioning similar to ColumnFamilyOutputFormat and ColumnFamilyInputFormat, they provide the ability to access CQL rows and variable binding.

CqlInputFormat requires the keyspace and table name to be specified. You can use ConfigHelper class to set up this and other configurations. A couple of things that you should set are input split size via ConfigHelper.setInputSplitSize, which defaults to 64,000 rows. The number of CQL rows per page via CqlConfigHelper.setInputCqlPageRowSize defaults to 1,000 rows per page. It is a good idea to have CQL rows per page as big as your machine can support without causing memory issues. This will help reducing network overhead. Initial input address and partitioner may be required to mention. To do so, use the ConfigHelper.setInputInitialAddress and ConfigHelper.setInputPartitioner methods.

The CqlOutputFormat allows the reducer task to write keys and values to the specified CQL table. You need to set the output table, output initial address, and output partitioner via ConfigHelper and the CQL that updates the output table via CqlConfigHelper.setOutputCql.

ConfigHelper

The ConfigHelper class is a gateway to configure Cassandra-specific settings for Hadoop. It is a pretty plain utility class that validates the settings passed and sets into Hadoop's org.apache.hadoop.conf.Configuration instance for the job. This configuration is made available to the Mapper and the Reducer.

The ConfigHelper class saves developers from inputing the wrong property name because all the properties are set using a method; any typo can appear at compile time. It may be worth looking at JavaDoc for ConfigHelper. Here are some of the commonly used methods:

  • setInputInitialAddress: This can be a hostname or private IP of one of the Cassandra nodes.
  • SetInputRpcPort: This will set the RPC port address if it has been altered from default. If not set, it uses the default thrift port 9160.
  • setInputPartitioner: This will set the appropriate partitioner according to the underlying Cassandra storage setting.
  • SetInputColumnFamily: This will set the column family details to be able to pull data from.
  • SetInputSlicePredicate: This will set the columns that are pulled from column family to provide Mapper to work on.
  • SetOutputInitialAddress: This will set the address of Cassandra cluster (one of the nodes) where the result is being published; it is usually similar to InputInitialAddress.
  • SetOutputRpcPort: This will set the RPC port to cluster where the result is stored.
  • SetOutputPartitioner: This is the partitioner used in the output cluster.
  • SetOutputColumnFamily: This will set the column family details to store results in.

Since version 1.1, Cassandra added support to wide row column families, bulk loading, and secondary indexes.

Wide row support

Earlier, having multimillion columns was a problem in Cassandra Hadoop integration. It was pulling a row per call limited by SlicePredicate. Version 1.1 onwards, you can pass the wide row Boolean parameter as TRUE, as shown in the following snippet:

ConfigHelper.setInputColumnFamily(
conf,
keyspace,
inCF,
true// SET WIDEROW = TRUE
);

When wide row is set to true, the rows are fed one column at a time to the Mapper.

Bulk loading

The BulkOutputFormat class is another utility that Cassandra provides to improve the write performance of jobs that result in large data. It streams the data in a binary format, which is much quicker than inserting data one by one. It uses SSTableLoader to do this. Refer to SSTableLoader in Chapter 6, Managing a Cluster – Scaling, Node Repair, and Backup. Here's how to set it up:

Job job = new Job(conf);
job.setOutputFormatClass(BulkOutputFormat.class);

Secondary index support

One can use a secondary index when pulling data from Cassandra to pass it on to the job. This is another improvement. It makes Cassandra shift the data and pass only the relevant data to Hadoop instead of Hadoop burning the CPU cycles to weed out the data that is not going to be used in the computation. It lowers the overhead of passing extra data to Hadoop. Here is an example:

IndexExpressionelectronicItems =
    newIndexExpression(
    ByteBufferUtil.bytes("item_category"),
    IndexOperator.EQ,
    ByteBufferUtil.bytes("electronics")
      );

IndexExpressionsoldAfter2012 =
    newIndexExpression(
    ByteBufferUtil.bytes("sell_year"),
            IndexOperator.GT,
    ByteBufferUtil.bytes(2012)
  );

ConfigHelper.setInputRange(conf, Arrays.asList(electronicItems, soldAfter2012));

The previous code snippet returns the rows that fall in the electronics category and were sold after the year 2012.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.176.99