Chapter 6. Analytics

It is important to understand that the analytics workload has characteristics that are different from normal reads and writes in Cassandra. In most cases, MapReduce jobs will have to read all or most of the data from Cassandra to run the analytics on it. Mixing the analytics workload with normal reads can degrade the regular read operations in the system, because the filesystem caches and other Cassandra caches will be thrashed by the analytics workload. Hence, most users separate the workloads into different physical clusters, allowing Cassandra to keep the normal reads optimized.

It is important to note that this is true not only for Cassandra, but for any other real-time data store on the market. The community supports a lot of analytics products; we will cover just a few of them in this chapter.

Hadoop integration

Out of the box, Cassandra supports Hadoop, Hive, and Pig for running analytics jobs on top of the existing data in Cassandra.

Configuring Hadoop with Cassandra

There are multiple ways to configure Hadoop to work with Cassandra. It's best to run the Hadoop tasks on the same servers as the Cassandra nodes, which follows the idea of moving the computation to the data rather than moving the data to the computation. Database vendors such as DataStax provide a Hadoop distribution that runs Hadoop on the same box as Cassandra, removing the need for HDFS storage for the jobs' intermediate data and metadata.

Alternatively, you can have a separate server for the NameNode and JobTracker and install a Hadoop TaskTracker and DataNode on each of your Cassandra nodes, allowing the JobTracker to assign tasks to the Cassandra nodes that hold the data for those tasks. You also need HDFS installed on separate drives or nodes so that Hadoop has a distributed filesystem for copying dependency JARs, static data, and intermediate results during job execution. This method lets users run the analytics near the data instead of transferring it across the network, but it is worth noting that it can be a bad idea in some cases, since the resources are shared.

Users might also want to disable the dynamic snitch when doing local reads on the analytics nodes; this forces Cassandra to read locally instead of fetching the data from its neighbors when the snitch scores fluctuate with latency. You also need to configure HADOOP_CLASSPATH in <hadoop_home>/conf/hadoop-env.sh to include the libraries in Cassandra's lib directory.

Virtual datacenter

You can configure a virtual datacenter (just like a multi-DC setup, but not physically separated) in Cassandra to separate the analytics workload from the real-time web serving (or similar) traffic. Before changing any of the settings, make sure that all the keyspaces use NetworkTopologyStrategy (NTS) and that the required replication factor is already selected. (Please see the previous chapters for choosing the replication factor and configuring the keyspaces.)

Adding nodes to a different (virtual) datacenter is as simple as changing the property files if PropertyFileSnitch or GossipingPropertyFileSnitch is used. Configure the new nodes with the appropriate tokens, or enable vnodes, and start them.

There are two ways to bootstrap the new nodes with the data they are supposed to own. The recommended and faster way is to add a replication factor for the new DC to the keyspaces that need to be replicated and then bootstrap the nodes in the new datacenter one-by-one. The other way is to bootstrap the nodes without changing the replication factor and later run repair on them one-by-one to synchronize and replicate the old data to the new DC.

PropertyFileSnitch

PropertyFileSnitch helps Cassandra learn the location of the nodes by rack and datacenter. The snitch uses a description of the network defined in the property file located at <Cassandra_Home>/conf/cassandra-topology.properties; if you use the Debian package, look in the /etc/cassandra folder (the same location as cassandra.yaml).

As a user, it is your responsibility to configure the right IP address and server location, which are used by Cassandra to understand the network layout.

You can give your datacenters any arbitrary names, but care should be taken to match the datacenter names provided here with the names provided in the keyspace's strategy options (NetworkTopologyStrategy).

Every node in the cluster should be described in the cassandra-topology.properties file, and the file should be consistent across all the servers in the cluster.

The following is a sample configuration that includes the analytics nodes:

10.0.0.10=DC1:RAC1
10.0.1.10=DC1:RAC2
10.0.2.10=DC1:RAC3
10.20.0.10=Analytics:RAC1
10.20.1.10=Analytics:RAC2
10.20.2.10=Analytics:RAC3
# default for unknown nodes
default=DC1:RAC1
# Native IPv6 is supported; however, you must escape the colons in the IPv6 address
fe80\:0\:0\:0\:202\:b3ff\:fe1e\:8329=DC1:RAC1

GossipingPropertyFileSnitch

After PropertyFileSnitch, Cassandra implemented a newer snitch, GossipingPropertyFileSnitch, which uses the gossiper to learn the location of the servers. It lets the user define only the local node's datacenter and rack, and gossip then propagates that information to the other nodes. To use it, update <cassandra_home>/conf/cassandra-rackdc.properties on each node.

For example, on a node in the DC1 datacenter, rack RAC1:

dc=DC1
rack=RAC1

For example, on a node in the Analytics datacenter, rack RAC1:

dc=Analytics
rack=RAC1

As you can see, this snitch is similar to Ec2Snitch, which uses the AWS APIs to determine the node's location. Ec2Snitch also supports a dc_suffix that is appended to the AWS-provided name, so the analytics workload can coexist in the same Amazon region. For example:

dc_suffix=-Analytics

DSE Hadoop

The advantage of using DSE Hadoop is that it is a fully integrated solution. There is no need for the NameNode and TaskTracker to be set up as separate installations. The installation is seamless and it is as simple as starting Cassandra using the following command:

# dse cassandra -t

The good part about this is that DSE implements the HDFS APIs on top of Cassandra, so there is no need for a separate filesystem for the intermediate data. This also simplifies the metadata store when using Hive, since it is an integrated solution. More detailed information can be found at http://www.datastax.com/.

DSE Hadoop (Courtesy: www.datastax.com)

Acunu Analytics

Acunu Analytics allows users to maintain roll-up cubes, just like the OLAP databases on the market, as the data is ingested, internally using Cassandra counters to hold the roll-ups. In some ways, this is similar to a facet search in Solr. Queries against these cubes see fresh data immediately and are close to instantaneous because they typically only retrieve precalculated results, but this also means additional work during updates. It is also important to note that Acunu's performance characteristics will be completely different from those discussed in this book, and most of the optimizations mentioned here do not apply to it.

Reading data directly from Cassandra

This is based on the ColumnFamilyInputFormat and ColumnFamilyRecordReader implementations, which let Hadoop MapReduce jobs retrieve data from Cassandra. Cassandra rows, or row fragments (that is, pairs of a key and a SortedMap of columns), are the input to the map tasks, with a SlicePredicate specifying which columns to fetch from each row. You can also install the TaskTracker and the DataNode on the same nodes as Cassandra and get performance similar to what DSE provides. A minimal job-setup sketch follows.
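
The following is a minimal job-setup sketch in Java, assuming a Cassandra release whose Hadoop classes (ColumnFamilyInputFormat, ConfigHelper) and Thrift classes are on the job's classpath; the contact address, port, keyspace, and column family names are placeholders, not values from this chapter:

import java.nio.ByteBuffer;

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CassandraInputJob {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "cassandra-analytics");
        job.setJarByClass(CassandraInputJob.class);
        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        Configuration conf = job.getConfiguration();
        // Where to connect and how the ring is partitioned (placeholder values)
        ConfigHelper.setInputInitialAddress(conf, "10.20.0.10");
        ConfigHelper.setInputRpcPort(conf, "9160");
        ConfigHelper.setInputPartitioner(conf,
                "org.apache.cassandra.dht.Murmur3Partitioner");
        // Which keyspace/column family to read (placeholder names)
        ConfigHelper.setInputColumnFamily(conf, "my_keyspace", "my_cf");

        // SlicePredicate describing which columns to fetch from each row;
        // an empty start/finish slice returns up to 'count' columns per row.
        SliceRange range = new SliceRange(ByteBuffer.wrap(new byte[0]),
                ByteBuffer.wrap(new byte[0]), false, 1000);
        ConfigHelper.setInputSlicePredicate(conf,
                new SlicePredicate().setSlice_range(range));

        // The mapper receives (ByteBuffer key, SortedMap of columns) pairs;
        // mapper/reducer classes and output settings are omitted in this sketch.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}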

Analytics on backups

Sometimes, it is not economical to keep multiple servers running all the time for analytics that run only once in a while, and there may be alternative, cheaper storage mechanisms available. In this case, you can use Cassandra's incremental backup support to move data off the servers and then extract it from the SSTables; if done right, this storage can also serve as your backup system.

There are multiple ways to achieve this; one well-known way is to convert the backups to JSON by using sstable2json, which converts the on-disk SSTable representation of a column family into a JSON-formatted document. Its counterpart, json2sstable, does exactly the opposite. However, you need to make sure that you resolve conflicts across the exported JSON documents (which we are not going to cover in this chapter). Another, more elegant, way is to write a generic program that can read the SSTable format.

File streaming

Writing output from the MapReduce task is a normal part of analytics; an example would be calculating an aggregate of the data for the past hour and writing it back to Cassandra to serve it. Cassandra comes with a bulk uploader utility that Hadoop MapReduce can use to upload the output of the reduce job. The advantage of the bulk uploader is that it doesn't go through Thrift to write back to Cassandra, and hence it is faster.

It is important to understand what the sstableloader tool does; given a set of SSTable data files, it streams them to a live cluster. It does not simply copy the set of SSTables to every node, but transfers only the relevant part of the data to each node, thus conforming to the replication strategy of the cluster. But this requires the user to create the SSTables that need to be streamed. One good thing about MapReduce is that you can sort the keys before writing the SSTables, and hence you may be able to create the SSTable format easily.

The Cassandra record writer uses Cassandra's bulk-loader-like streaming to upload the output data back to Cassandra. Make sure to configure the settings described in the following sections so that the MapReduce job reads from, and writes back to, the right column families. A rough output-side sketch follows.
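
As a rough sketch (not the only way to wire this up), the output side of a job that uses the bulk record writer might be configured as follows; the address, port, and the keyspace/column family names are hypothetical placeholders:

import org.apache.cassandra.hadoop.BulkOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.mapreduce.Job;

public class BulkOutputSetup {
    // Configures a job whose reducers emit ByteBuffer keys and List<Mutation>
    // values; the bulk record writer turns these into SSTables and streams them.
    public static void configure(Job job) {
        job.setOutputFormatClass(BulkOutputFormat.class);
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "10.20.0.10");
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(),
                "org.apache.cassandra.dht.Murmur3Partitioner");
        // Hypothetical keyspace and column family names
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
                "my_keyspace", "hourly_aggregates");
    }
}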

You can use the following configuration properties to tune the MapReduce jobs; most of the settings are self-explanatory, and brief notes are included for assistance.

The Thrift reads and writes made by the job use a configurable consistency level. You can choose QUORUM if you want the reads and writes to be strongly consistent (and, in most cases, repaired) before the data is used for analytics; the default consistency level is ConsistencyLevel.ONE. The following settings configure the consistency levels (a short sketch follows the list):

  • cassandra.consistencylevel.read
  • cassandra.consistencylevel.write
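
A minimal sketch of applying these two properties to the job's Hadoop Configuration; QUORUM is only an example value:

import org.apache.hadoop.conf.Configuration;

public class ConsistencySettings {
    // Sets the consistency levels used by the job's Thrift reads and writes.
    public static void apply(Configuration conf) {
        conf.set("cassandra.consistencylevel.read", "QUORUM");
        conf.set("cassandra.consistencylevel.write", "QUORUM");
    }
}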

Keyspace and column family settings

In addition to the previous settings, it might be useful to configure the following parameters for the MapReduce jobs (they are self-explanatory; see the sketch after this list):

  • cassandra.input.partitioner.class (defaults to Murmur3Partitioner)
  • cassandra.output.partitioner.class (defaults to Murmur3Partitioner)
  • cassandra.input.keyspace
  • cassandra.output.keyspace
  • cassandra.input.keyspace.username
  • cassandra.input.keyspace.passwd
  • cassandra.output.keyspace.username
  • cassandra.output.keyspace.passwd
  • cassandra.input.columnfamily
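
A minimal sketch of applying these keyspace and column family properties; the keyspace, column family, and credentials are hypothetical placeholders:

import org.apache.hadoop.conf.Configuration;

public class KeyspaceSettings {
    // Applies the keyspace/column family settings listed above.
    public static void apply(Configuration conf) {
        conf.set("cassandra.input.partitioner.class",
                "org.apache.cassandra.dht.Murmur3Partitioner");
        conf.set("cassandra.output.partitioner.class",
                "org.apache.cassandra.dht.Murmur3Partitioner");
        conf.set("cassandra.input.keyspace", "my_keyspace");
        conf.set("cassandra.output.keyspace", "my_keyspace");
        conf.set("cassandra.input.keyspace.username", "analytics_user");
        conf.set("cassandra.input.keyspace.passwd", "secret");
        conf.set("cassandra.output.keyspace.username", "analytics_user");
        conf.set("cassandra.output.keyspace.passwd", "secret");
        conf.set("cassandra.input.columnfamily", "my_cf");
    }
}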

The following are MapReduce optimization parameters for reading the data without causing memory pressure on the nodes (a combined sketch follows these read-tuning settings):

  • cassandra.input.predicate
  • cassandra.input.keyRange
  • cassandra.input.split.size (defaults to 64 * 1024)

The following Boolean setting indicates whether the input column family has wide rows:

  • cassandra.input.widerows

You can also change the Hadoop configuration to use a larger batch size; increasing the following setting reduces the number of network calls to Cassandra:

  • cassandra.range.batch.size (defaults to 4096)

The following changes the maximum key size fetched on each read, which defaults to 8192:

  • cassandra.hadoop.max_key_size
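
A combined sketch applying the read-tuning settings above; the values shown are the defaults mentioned in the text, except widerows, which is just an example:

import org.apache.hadoop.conf.Configuration;

public class ReadTuningSettings {
    // Applies the read-tuning settings described in the preceding lists.
    public static void apply(Configuration conf) {
        conf.setInt("cassandra.input.split.size", 64 * 1024);
        conf.setBoolean("cassandra.input.widerows", true);
        conf.setInt("cassandra.range.batch.size", 4096);
        conf.setInt("cassandra.hadoop.max_key_size", 8192);
        // cassandra.input.predicate and cassandra.input.keyRange are typically
        // populated through ConfigHelper (for example, setInputSlicePredicate),
        // since they are stored in a serialized form rather than plain text.
    }
}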

Communication configuration using the Thrift interface with Cassandra

To configure how the MapReduce jobs communicate with Cassandra over Thrift, you can change the following (see the sketch after this list):

  • cassandra.input.thrift.port
  • cassandra.output.thrift.port
  • cassandra.input.thrift.address
  • cassandra.output.thrift.address
  • cassandra.input.transport.factory.class
  • cassandra.output.transport.factory.class
  • cassandra.thrift.framed.size_mb
  • cassandra.thrift.message.max_size_mb
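
A minimal sketch of applying the Thrift communication settings; the addresses, ports, and sizes are illustrative values, and the framed transport factory shown is an assumption about the factory class shipped with Cassandra:

import org.apache.hadoop.conf.Configuration;

public class ThriftSettings {
    // Applies the Thrift communication settings listed above.
    public static void apply(Configuration conf) {
        conf.set("cassandra.input.thrift.address", "10.20.0.10");
        conf.set("cassandra.input.thrift.port", "9160");
        conf.set("cassandra.output.thrift.address", "10.20.0.10");
        conf.set("cassandra.output.thrift.port", "9160");
        conf.set("cassandra.input.transport.factory.class",
                "org.apache.cassandra.thrift.TFramedTransportFactory");
        conf.set("cassandra.output.transport.factory.class",
                "org.apache.cassandra.thrift.TFramedTransportFactory");
        conf.setInt("cassandra.thrift.framed.size_mb", 15);
        conf.setInt("cassandra.thrift.message.max_size_mb", 16);
    }
}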

The following set the compression used for the SSTables that are created and streamed back to Cassandra (a combined sketch follows these output and bulk-upload settings):

  • cassandra.output.compression.class
  • cassandra.output.compression.length

HDFS location of the temporary files

The location of the files can be configured using the following:

  • mapreduce.output.bulkoutputformat.localdir
  • mapreduce.output.bulkoutputformat.buffersize

The following throttles the amount of data that will be streamed to Cassandra; streaming faster means more overhead on the receiving end to compact the data and can saturate the network:

  • mapreduce.output.bulkoutputformat.streamthrottlembits

The following sets the maximum number of hosts that are allowed to be down during the BulkRecordWriter upload of SSTables:

  • mapreduce.output.bulkoutputformat.maxfailedhosts
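
A combined sketch applying the output compression and bulk-upload settings above; the compressor class, directory, and numeric values are illustrative assumptions, not recommendations from the text:

import org.apache.hadoop.conf.Configuration;

public class BulkOutputSettings {
    // Applies the SSTable output and bulk-upload settings described above.
    public static void apply(Configuration conf) {
        conf.set("cassandra.output.compression.class",
                "org.apache.cassandra.io.compress.SnappyCompressor");
        conf.set("cassandra.output.compression.length", "65536");
        conf.set("mapreduce.output.bulkoutputformat.localdir", "/tmp/bulk-sstables");
        conf.set("mapreduce.output.bulkoutputformat.buffersize", "64");
        // Throttle streaming (in megabits/sec) so the receiving nodes are not
        // overwhelmed by compaction work and the network is not saturated.
        conf.setInt("mapreduce.output.bulkoutputformat.streamthrottlembits", 400);
        // Number of hosts allowed to be down during the SSTable upload.
        conf.setInt("mapreduce.output.bulkoutputformat.maxfailedhosts", 0);
    }
}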

If you are noticing a lot of timeouts in Cassandra due to batch updates, or while reading the data out of Cassandra, it may be a good time to increase rpc_timeout_in_ms in cassandra.yaml, or you can reduce the batch sizes so that less data needs to be read or scanned per request. Since Cassandra's Thrift interface does not stream results, it is important to paginate wide rows and to be careful when reading data out of the data store. For example, if you read 2 GB of data from a single row without pagination, the node will naturally run out of memory, since it has to pull the data into memory first and send it in one shot.
