Hadoop and Cassandra

In the age of big data analytics, there is hardly a data-rich company that does not want its data extracted, evaluated, and interpreted to provide more business insight. In the past, analyzing large datasets (structured or unstructured) that span terabytes or petabytes was an expensive and technically challenging task for a team; distributed computing was hard to manage, and the hardware to support this kind of infrastructure was not financially feasible for everyone.

Note

This chapter does not cover Cassandra integration with Hive and Oozie. To learn about Cassandra integration with Oozie, visit http://wiki.apache.org/cassandra/HadoopSupport#Oozie.

There are ongoing efforts to make Hive integration a native part of Cassandra. If you are planning to use Cassandra with Hive, visit https://issues.apache.org/jira/browse/CASSANDRA-4131.

DataStax Enterprise editions have built-in Cassandra-enabled Hive MapReduce clients. Check them out at http://www.datastax.com/documentation/datastax_enterprise/4.6/datastax_enterprise/ana/anaHiv.html.

A couple of things changed the landscape completely in favor of medium and small companies. Hardware prices dropped. At the same time, the memory and processing power of computing units increased dramatically. Hardware on demand came into the picture: you can spend about 20 dollars to rent about 100 virtual machines with quad-core (virtual) processors, 7.5 GB RAM, and 840 GB of ephemeral storage (you can also plug in gigantic network-attached storage that is permanent) from AWS for an hour. There are multiple vendors that provide this sort of cloud infrastructure. However, the biggest leap in making big data analysis commonplace is the availability of extremely high-quality, free, and open source solutions that shield developers from having to manage the distributed system themselves. This software makes it possible to plug in various algorithms and use the system as a black box that takes care of fetching the data, applying the routines, and returning the results. Hadoop is the most prominent name in this field. Currently, it is the de facto standard for big data processing.

Note

At the time of writing this book, this was the specification of an AWS m3.large machine. The pricing estimate is based on the hourly price of on-demand instances at USD 0.14 per hour.

Hadoop deserves a book of its own. If you want to learn more about Hadoop, you may want to refer to Yahoo!'s excellent tutorial on the subject (http://developer.yahoo.com/hadoop/tutorial/index.html). This section gives a simplified introduction to Hadoop that is by no means complete. If you are already familiar with Hadoop, you may skip this section.

Introduction to Hadoop

Apache Hadoop is an open source implementation of two famous white papers from Google: Google File System (GFS) (http://research.google.com/archive/gfs.html) and Google MapReduce (http://research.google.com/archive/mapreduce.html). Vanilla Hadoop consists of two modules, Hadoop Distributed File System (HDFS) and MapReduce. HDFS and MR are implementations of GFS and Google MapReduce, respectively. One may consider HDFS as a storage module and MapReduce as a processing module.

HDFS

Let's start with an example. Assume you have 1 TB of data to read from a single machine with a single hard disk. Assuming a disk read rate of 100 MBps, it will take about 2 hours and 45 minutes to read the data. If you could split this data over 10 hard disks and read them all in parallel, the read time would drop by a factor of 10, more or less. From a layman's perspective, this is what HDFS does: it breaks the data into fixed-size blocks (64 MB by default) and distributes them over a number of slave machines.
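
If you want to check this arithmetic yourself, here is a tiny back-of-envelope sketch in plain Java (nothing Hadoop-specific); the 1 TB, 100 MBps, and 64 MB figures are simply the assumptions from the paragraph above:

public class ReadTimeEstimate {
    public static void main(String[] args) {
        long dataMb = 1_000_000L;                     // 1 TB of data, expressed in MB
        long diskRateMbps = 100L;                     // sequential read rate of a single disk
        long serialSeconds = dataMb / diskRateMbps;   // 10,000 s, roughly 2.8 hours
        long parallelSeconds = serialSeconds / 10;    // the same data spread over 10 disks
        long blocks = dataMb / 64;                    // number of 64 MB HDFS blocks
        System.out.printf("serial: %d s (%.1f h), over 10 disks: %d s, 64 MB blocks: %d%n",
                serialSeconds, serialSeconds / 3600.0, parallelSeconds, blocks);
    }
}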

HDFS is a filesystem that runs on top of a regular filesystem. Production installations generally have ext3 filesystems running beneath HDFS. By distributing data across several nodes, the storage layer can grow into a very large virtual store that scales linearly. To store data reliably, the data is kept with redundancy: each block is replicated three times by default. HDFS is architected in such a way that each data block gets replicated to different servers and, if possible, to different racks. This protects the data from disk, server, or complete rack failure. In the event of a disk or server failure, the data is replicated to a new location to meet the replication factor. If this reminds you of Cassandra, or any other distributed system, you are on the right track. As we will see very soon, unlike Cassandra, HDFS has a single point of failure due to its master-slave design.

Despite all these good features, HDFS has a few shortcomings too. They are as follows:

  • HDFS is optimized for streaming. This means that there is no random access to a file, and access patterns other than streaming may not utilize the maximum data transfer rate.
  • NameNode (discussed later) is a single point of failure for HDFS.
  • HDFS is better suited to large files.
  • Appending to a file is not supported by default. However, one can change the configuration to allow appends.

Data management

HDFS uses a master-slave mechanism to distribute data across multiple servers. The master node is usually backed by a powerful machine so that it does not fail. The slave machines are the data nodes; these are commodity hardware. The reason behind having a powerful master node is that we do not want it to go down, as it is a single point of failure. If the master node (that is, NameNode) goes down, the storage is down, unlike the Cassandra model. To load data into HDFS, the client connects to the master node and sends an upload request. The master node tells the client to send parts of the data to various data nodes. Note that the data does not stream through the master node; it just directs the client to the appropriate data nodes and maintains metadata about the location of the various parts of a file. The following diagram shows how the client makes a request to NameNode to write a block. NameNode returns the nodes where the block is to be written. The client picks one DataNode from the list returned in the previous step and sends the block to it, and that node forwards the data to the other nodes:


There are two processes one needs to know about to understand how the data is distributed and managed by HDFS.

NameNode

The NameNode process runs on the master server. Its job is to keep metadata about the files whose blocks are stored on the data nodes. If NameNode is down, the slaves have no idea how to make sense of the blocks they store. Therefore, it is crucial to run NameNode on reliable, redundant hardware. In general, there is just one master NameNode in a Hadoop cluster.
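
To make the metadata role concrete, the following is a minimal sketch that uses the standard HDFS Java client (the org.apache.hadoop.fs.FileSystem API) to ask where the blocks of a file live. The fs.defaultFS value and the file path are assumptions matching the local setup described later in this chapter; everything printed here is answered from NameNode's metadata, without reading a single block from a DataNode:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationLookup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");   // assumed pseudo-cluster address
        FileSystem fs = FileSystem.get(conf);

        Path file = new Path("/user/naishe/input/core-site.xml");  // any file that exists in HDFS
        FileStatus status = fs.getFileStatus(file);

        // Block offsets, lengths, and hosting DataNodes come from NameNode metadata only
        for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
            System.out.println("offset=" + block.getOffset()
                    + " length=" + block.getLength()
                    + " hosts=" + String.join(",", block.getHosts()));
        }
        fs.close();
    }
}

If NameNode is unreachable, this lookup fails immediately, which is exactly why it is the one component you cannot afford to lose.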

DataNodes

DataNodes are the slaves. They are the machines that actually contain the data. The DataNode process manages the data blocks on the local machine and keeps in touch with the master node using a heartbeat mechanism. This enables the master node to re-replicate the data if one of the slaves dies.

Data never goes via NameNode; DataNodes are the ones responsible for streaming the data out. NameNode and the DataNodes work in harmony to provide a scalable, giant virtual filesystem that is oblivious to the underlying hardware and operating system. The way a data read or write takes place is as follows (a short client-side sketch in Java follows below):

  • The client makes a write request for a block of a file to the master, the NameNode server.
  • NameNode returns a list of the servers that the block is to be copied to (a block is copied to as many places as the configured replication factor).
  • The client makes a ready request to the first of these DataNodes. This node forwards the request to the next node, which forwards it to the next, until all the nodes that will hold the block acknowledge the client with an OK message.
  • On receipt of the OK message, the client starts streaming the data to the first data node, which internally streams it to the next replica node, and so on.
  • Once the block is written successfully, the slaves notify the master, and the slave connected to the client returns a success message.

The preceding figure shows the data flow when a Hadoop client (CLI or Java) makes a request to write a block to HDFS.
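
All of this choreography is hidden behind a very small client API. Here is a minimal write-path sketch, assuming the pseudo-cluster address configured later in this chapter (hdfs://localhost:9000) and an arbitrary target path; fs.create() obtains the target DataNodes from NameNode, and the bytes written to the returned stream go straight to those DataNodes, never through NameNode:

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");  // assumed local pseudo-cluster

        FileSystem fs = FileSystem.get(conf);
        Path target = new Path("/tmp/hello.txt");           // arbitrary example path

        // create() asks NameNode where to put the block; the write streams to DataNodes
        try (FSDataOutputStream out = fs.create(target, true)) {
            out.write("hello hdfs\n".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Wrote " + fs.getFileStatus(target).getLen() + " bytes");
        fs.close();
    }
}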

Hadoop MapReduce

MapReduce (MR) is a very simple concept once you know it. It is Algorithms 101: divide and conquer. A job is broken into small, independent tasks and distributed across multiple machines; the results are sorted and merged together to generate the final result. The ability to spread a large computational burden over multiple servers as many small computational loads gives a Hadoop programmer effectively limitless computation capability. MR is the processing part of Hadoop; it virtualizes the CPU. The following figure depicts this process.

As an end user, you need to write a Mapper and a Reducer for the task you want to get done. The Hadoop framework performs the heavy lifting of fetching data from a source and splitting it into key-value pairs based on what the data source is: a line from a text file, a row from a relational database, or a key-value pair from a Cassandra column family. These key-value pairs (indicated as input key-val pairs in the following figure) are forwarded to the Mapper that you have provided to Hadoop. The Mapper performs a unit of work per key-value pair; for example, for a word count task, you may remove punctuation, split the text on whitespace, iterate over the resulting array of words, and emit each word as a key with the value set to one. These emissions make up the intermediate key-value pairs, as indicated in the following figure:

Hadoop MapReduce framework in action (simplified)

These results are sorted by key and forwarded to the Reducer implementation that you provided. All the tuples arriving at a single reduce call have the same key, and reducers can use this property in their logic. Understanding this is important for a beginner: it means you can simply iterate over the incoming values and do things such as group or count, basically reducing or folding the map by key.

The reduced values are then stored in a place of your choice, which can be HDFS, RDBMS, Cassandra, or one of the many other storage options.
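
To tie the pieces together, here is a minimal sketch of the word count Mapper and Reducer described above, written against the org.apache.hadoop.mapreduce API that ships with Hadoop 2.x. The punctuation handling is a simplistic assumption rather than the canonical example bundled with Hadoop:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // Mapper: one input line in, one (word, 1) pair out per word
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Strip punctuation and split on whitespace, as described above
            String cleaned = value.toString().replaceAll("[^a-zA-Z0-9\\s]", " ");
            StringTokenizer tokens = new StringTokenizer(cleaned);
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken().toLowerCase());
                context.write(word, ONE);               // intermediate key-value pair
            }
        }
    }

    // Reducer: all values for one key arrive together; just sum them
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}

Notice that the reducer only has to sum the values it is handed; the framework has already grouped and sorted the intermediate pairs by key.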

There are two main processes that you should know about in the context of Hadoop MapReduce, and we will talk about them in a bit.

JobTracker

Similar to NameNode, JobTracker is the master process that governs the execution of the worker processes, the TaskTrackers. As in any master-slave architecture, JobTracker is a single point of failure. Therefore, it is advisable to have robust hardware and redundancy built into the machine that runs JobTracker.

JobTracker's responsibilities include determining the number of Mapper tasks from the input splits, for example, file splits obtained from HDFS via InputFormat. The number of Reducer tasks is taken from the configured value. A client application can use JobClient to submit jobs to JobTracker and inquire about the status of a job.
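
The text above mentions the older JobClient API; with the Hadoop 2.x mapreduce API, the same submission is typically done through the Job class. Here is a minimal driver sketch for the word count classes shown earlier; the input and output paths are assumed to be passed on the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Mapper count is derived from the input splits; the reducer count is configured
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submits the job and polls its status until completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The waitForCompletion(true) call submits the job and then polls its progress until it finishes.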

TaskTracker

Like DataNode in the case of HDFS, TaskTracker is the actual execution unit of Hadoop MapReduce. It creates a child JVM for each Mapper and Reducer task. The maximum numbers of Mapper and Reducer tasks can be set independently. TaskTracker may reuse the child JVMs to improve efficiency.

Reliability of data and processes in Hadoop

Hadoop is a very robust and reliable architecture. It is meant to run on commodity hardware and hence takes care of failures automatically: it detects the failure of a task and retries it. It is fault tolerant. Data on a DataNode is replicated (redundant), so the system heals itself if a DataNode becomes unavailable.

Hadoop allows servers to join or leave the cluster without any repercussions. Rack-aware placement of data protects the cluster against disk failures, machine or rack power failures, and even a complete rack going down.

The following figure shows the famous schema of a reliable Hadoop infrastructure using commodity hardware for the slaves and heavy-duty servers (at the top of the rack) for the masters. Please note that these are physical servers, as they stand in the data center. Later, when we discuss using Cassandra as a data store for Hadoop, we will use a ring representation. Even in that case, the physical configuration may be the same as the one represented in the following figure, but the logical configuration, as we have seen throughout this book, will be a ring-like structure to emphasize the token distribution.

Setting up local Hadoop

This section discusses how to set up Hadoop 2.6.0 (the steps should be valid for all 2.x versions) on your local machine. At the time of writing this, Hadoop has transitioned from version 1 to version 2. Version 2.x is mature now and brings disruptive changes; it is presumably better than version 1. HDFS is federated in the new version; the Apache documentation says that this version scales the NameNode service horizontally using multiple independent NameNodes, which should ideally avoid the single point of failure that NameNode posed in the previous version. The other major improvement is the new MapReduce framework, known as MRNextGen or MRv2, which runs on Yet Another Resource Negotiator (YARN). More about the new version can be learned on the Apache website (http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html). Here are the steps to get Hadoop 2.x to work on a Linux machine. To keep things generic, I have used the downloaded tarball to install Hadoop; one can use a binary package for a specific platform without much change to the instructions. The following figure shows the Hadoop infrastructure. Heavy-duty master nodes sit at the top of their racks. Each rack has a rack switch. Slaves run the DataNode and TaskTracker services. Racks are connected through 10GE switches. Note that not all racks will have a master server:


Make sure you can SSH to your localhost using a key-based, password-less login. If you can't do this, generate a key pair and set it up as described in the following commands:

# Generate key pair
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
Generating public/private dsa key pair.
Your identification has been saved in /home/ec2-user/.ssh/id_dsa.
Your public key has been saved in /home/ec2-user/.ssh/id_dsa.pub.
The key fingerprint is:
6a:dc:53:04:8a:52:50:9f:dd:2a:2e:99:04:86:41:f4 ec2-user@ip-10-147-171-159
The key's randomart image is:
+--[ DSA 1024]----+
|+o.oo   .        |
| o.. o + o       |
|. +E. + .o       |
| . o     o       |
|    . . S .      |
|   . = + .       |
|    + = o        |
|     o   .       |
|                 |
+-----------------+
# Add public key to authorized_keys
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

At this point, you should be able to SSH to your machine by issuing ssh localhost.

Note

You may need to install the SSH server, if you do not have it installed already. In Ubuntu, execute the following command:

sudo apt-get install openssh-server

In RHEL variants, execute the following command:

yum install openssh-server

The first step is to download and extract the Hadoop 2.x tarball to a desired location:

# Download Hadoop
$ wget http://www.us.apache.org/dist/hadoop/common/hadoop-2.6.0/hadoop-2.6.0.tar.gz

# Extract
$ tar xvzf hadoop-2.6.0.tar.gz

# Create a soft link for easy access and updates without disrupting PATH
$ ln -s hadoop-2.6.0 hadoop
$ cd hadoop

Let's assume that the directory where the Hadoop tarball is extracted is $HADOOP_HOME. You need to configure Hadoop to get it working. We will perform the minimal configuration that gets Hadoop working in pseudo-cluster mode, where your single machine acts both as the master node (with JobTracker and NameNode) and as a slave node (with TaskTracker and DataNode). Remember, this is not a production-ready configuration.

Edit $HADOOP_HOME/etc/hadoop/core-site.xml and add the following lines:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

Edit $HADOOP_HOME/etc/hadoop/hdfs-site.xml and add the replication parameter for the data blocks:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

Now, it is almost done. However, you need to tell Hadoop where Java lives. Edit $HADOOP_HOME/etc/hadoop/hadoop-env.sh and add the export statement for JAVA_HOME like this:

# The only required environment variable is JAVA_HOME.  All others are
# optional.  When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.

# The java implementation to use.
export JAVA_HOME="/usr/lib/jvm/java-7-oracle"

We are done. Next comes testing the installation.

Testing the installation

Before we start testing the newly installed Hadoop, we need to format the NameNode to prepare HDFS. We haven't provided any directory to HDFS, so it will default to /tmp, which may get wiped on a reboot. Here is the list of commands to run to test the installation:

$ bin/hdfs namenode -format
15/01/20 23:03:13 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = blackwidow/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 2.6.0
STARTUP_MSG:   classpath = /home/naishe/apps/hadoop-2.6.0/etc/hadoop:/home/naishe/apps/hadoop-2.6.0/share/hadoop/common/lib/hadoop-auth-2.6.0.jar:/home/naishe/apps/hadoop-2.6.0/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:
[ -- SNIP -- ]
:/contrib/capacity-scheduler/*.jar
STARTUP_MSG:   build = https://git-wip-us.apache.org/repos/asf/hadoop.git -r e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1; compiled by 'jenkins' on 2014-11-13T21:10Z
STARTUP_MSG:   java = 1.7.0_72
************************************************************/
15/01/20 23:03:13 INFO namenode.NameNode: registered UNIX signal handlers for [TERM, HUP, INT]
15/01/20 23:03:13 INFO namenode.NameNode: createNameNode [-format]
Formatting using clusterid: CID-3d2c36d0-de9f-4032-914e-3abe1b40b0c3
[ -- snip -- ]
15/01/20 23:03:16 INFO namenode.FSImage: Allocated new BlockPoolId: BP-324301922-127.0.1.1-1421775196180
15/01/20 23:03:16 INFO common.Storage: Storage directory /tmp/hadoop-naishe/dfs/name has been successfully formatted.
15/01/20 23:03:16 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
15/01/20 23:03:16 INFO util.ExitUtil: Exiting with status 0
15/01/20 23:03:16 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at blackwidow/127.0.1.1
************************************************************/

You may observe that the storage directory is set to /tmp/hadoop-<username>/dfs/name by default. You can change it to a more sensible location by setting the dfs.namenode.name.dir property in hdfs-site.xml. For the purpose of demonstrating capability, though, the default will do.

Let's start everything up and see whether we are OK:

# Start Hadoop services locally
$ sbin/start-dfs.sh
Starting namenodes on [localhost]
localhost: Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
localhost: starting namenode, logging to /home/naishe/apps/hadoop-2.6.0/logs/hadoop-naishe-namenode-blackwidow.out
localhost: Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
localhost: starting datanode, logging to /home/naishe/apps/hadoop-2.6.0/logs/hadoop-naishe-datanode-blackwidow.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: Warning: Permanently added '0.0.0.0' (ECDSA) to the list of known hosts.
0.0.0.0: starting secondarynamenode, logging to /home/naishe/apps/hadoop-2.6.0/logs/hadoop-naishe-secondarynamenode-blackwidow.out

# Test if all services are running
$ jps
22194 NameNode
22565 SecondaryNameNode
22314 DataNode
22754 Jps

jps is a built-in tool provided by the Oracle JDK. It lists all the Java processes running on the machine. The previous snippet shows that all the Hadoop processes are up. Let's execute an example and see whether things are actually working:

# Upload everything under etc/hadoop to the "input" directory in HDFS
$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -mkdir /user/naishe
$ bin/hdfs dfs -put etc/hadoop input
$ bin/hdfs dfs -ls input
Found 29 items
-rw-r--r--   1 naishe supergroup       4436 2015-01-20 23:20 input/capacity-scheduler.xml
-rw-r--r--   1 naishe supergroup       1335 2015-01-20 23:20 input/configuration.xsl
-rw-r--r--   1 naishe supergroup        318 2015-01-20 23:20 input/container-executor.cfg
[ -- snip -- ]
-rw-r--r--   1 naishe supergroup        690 2015-01-20 23:20 input/yarn-site.xml

All set; it is time to execute an example. We will run an example that grabs all the words matching the dfs[a-z.]+ regular expression across all the files under the input folder and returns the counts in a folder called output.

# Execute the grep example
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep input output 'dfs[a-z.]+'
15/01/20 23:27:10 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/01/20 23:27:10 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/01/20 23:27:10 WARN mapreduce.JobSubmitter: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
15/01/20 23:27:11 INFO input.FileInputFormat: Total input paths to process : 29
15/01/20 23:27:11 INFO mapreduce.JobSubmitter: number of splits:29
15/01/20 23:27:11 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1885642397_0001
15/01/20 23:27:11 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/01/20 23:27:11 INFO mapreduce.Job: Running job: job_local1885642397_0001
[-- snip --]
15/01/20 23:27:15 INFO mapred.LocalJobRunner: reduce task executor complete.
15/01/20 23:27:15 INFO mapreduce.Job:  map 100% reduce 100%
15/01/20 23:27:15 INFO mapreduce.Job: Job job_local1885642397_0001 completed successfully
15/01/20 23:27:15 INFO mapreduce.Job: Counters: 38
  File System Counters
    FILE: Number of bytes read=1243026
    FILE: Number of bytes written=7750172
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1764343
    HDFS: Number of bytes written=437
    HDFS: Number of read operations=1051
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=32
  Map-Reduce Framework
[-- snip stats --]
  File Input Format Counters
    Bytes Read=437
  File Output Format Counters
    Bytes Written=197

# Result of the MapReduce execution
$ bin/hdfs dfs -cat output/*
6  dfs.audit.logger
4  dfs.class
3  dfs.server.namenode.
2  dfs.period
2  dfs.audit.log.maxfilesize
2  dfs.audit.log.maxbackupindex
1  dfsmetrics.log
1  dfsadmin
1  dfs.servers
1  dfs.replication
1  dfs.file

Congratulations, you have just executed a job using MapReduce. It was a bit of a boring task; you could have run the grep command on your Linux machine and it would have finished much faster than this. However, it gives you a couple of important insights: one, the configuration works, and two, it is not always best to do everything with Hadoop; for some tasks, it is better to use the tools best suited to them. We will see more about this when we discuss the cons of Hadoop later in this chapter.
