Hadoop and Cassandra

In the age of Big Data analytics, there are hardly any data-rich companies that do not want their data to be extracted, evaluated, and analyzed to provide more business insight. In the past, analyzing large data sets (structured or unstructured) spanning terabytes or petabytes was an expensive and technically challenging task for a team; distributed computing was hard to manage, and the hardware needed to support this kind of infrastructure was not financially feasible for everyone.

A couple of things changed this landscape completely in favor of medium and small companies. Hardware prices dropped down to earth, while the memory and processing power of computing units increased dramatically. Hardware on demand came into the picture. You can spend about 20 dollars to rent about 100 virtual machines with quad-core (virtual) processors, 7.5 GB RAM, and 840 GB of ephemeral storage (you can plug in gigantic network-attached storage volumes that are permanent) from Amazon Web Services for one hour. There are multiple vendors that provide this sort of cloud infrastructure. However, the biggest leap in making Big Data analysis commonplace is the availability of extremely high-quality, free, and open source solutions that abstract developers from managing distributed systems. This software made it possible to plug in various algorithms and use the system as a black box that takes care of getting the data, applying the routines, and returning the results. Hadoop is the most prominent name in this field. Currently, it is the de facto standard for Big Data processing.

Note

At the time of writing this book, this is the specification of an AWS M1.large machine. The pricing estimate is based on the hourly price of on-demand instances at USD 0.24 per hour.

Hadoop deserves a book of its own. If you want to learn more about Hadoop, you may want to refer to Yahoo!'s excellent tutorial on the subject (http://developer.yahoo.com/hadoop/tutorial/index.html). This section gives a simplistic introduction to Hadoop and is by no means complete. If you are already familiar with Hadoop, you may skip this section.

Introduction to Hadoop

Apache Hadoop is an open source implementation of two famous white papers from Google: Google File System (GFS) (http://research.google.com/archive/gfs.html) and Google MapReduce (http://research.google.com/archive/mapreduce.html). Vanilla Hadoop consists of two modules: Hadoop Distributed File System (HDFS) and MapReduce (MR). HDFS and MR are implementations of GFS and Google MapReduce, respectively. One may consider HDFS as a storage module and MapReduce as a processing module.

HDFS – Hadoop Distributed File System

Let's start with an example. Assume you have 1 TB of data to read from a single machine with a single hard disk. Assuming the disk read rate is 100 MBps, it will take about 2 hours and 45 minutes to read the file (1 TB is roughly 1,000,000 MB; at 100 MBps that is about 10,000 seconds). If you could split this data over 10 hard disks and read them all in parallel, it would decrease the read time by a factor of 10, more or less. From a layman's perspective, this is what HDFS does; it breaks the data into fixed-sized blocks (the default is 64 MB) and distributes them over a number of slave machines.

HDFS is a filesystem that runs on top of a regular filesystem. Production installations generally have ext3 filesystems running beneath HDFS. By distributing data across several nodes, the storage layer becomes a very large virtual store that scales linearly. To store data reliably, the data is kept with redundancy. Each block is replicated three times by default. HDFS is architected in such a way that each data block gets replicated to different servers and, if possible, to different racks. This protects the data from disk, server, or complete rack failure. In the event of a disk or a server failure, data is replicated to a new location to meet the replication factor. If this reminds you of Cassandra, or any other distributed system, you are on the right track. But as we will see very soon, unlike Cassandra, HDFS has a single point of failure due to its master-slave design.
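
To make the block placement concrete, here is a minimal sketch using Hadoop's Java FileSystem API that prints where each block of a file (and its replicas) is stored. The file path is hypothetical; point it at something that already exists in your HDFS.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocations {
  public static void main(String[] args) throws Exception {
    // Reads fs.default.name from the Hadoop configuration files on the classpath
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Hypothetical file; replace with a file that exists in your HDFS
    Path file = new Path("/user/nishant/bigfile.dat");
    FileStatus status = fs.getFileStatus(file);

    // One BlockLocation per block; each lists the DataNodes holding a replica
    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation block : blocks) {
      System.out.println("offset=" + block.getOffset()
          + " length=" + block.getLength()
          + " replicas=" + Arrays.toString(block.getHosts()));
    }
    fs.close();
  }
}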

Despite all these good features, HDFS has a few shortcomings too:

  1. HDFS is optimized for streaming reads. This means that there is no random access to a file, and non-streaming access patterns may not utilize the maximum data transfer rate.
  2. The NameNode (discussed later) is a single point of unavailability for HDFS.
  3. HDFS is better suited to large files than to a large number of small ones.
  4. Appending to files is not supported by default. However, one can change the configuration to allow appends.

Data management

HDFS uses the master-slave mechanism to distribute data across multiple servers. The master node is usually backed by a powerful machine so that it is unlikely to fail. The slave machines are data nodes; these are commodity hardware. The reason for the master node being a beefy machine is that it is a single point of failure. If the master node (that is, the NameNode) goes down, the storage is down, unlike the Cassandra model. To load data into HDFS, the client connects to the master node and sends an upload request. The master node tells the client to send parts of the data to various data nodes. Note that data does not stream through the master node. It just directs the client to the appropriate data nodes and maintains the metadata about the location of the various parts of a file.


Figure 8.1: The client makes a request to the NameNode to write a block. The NameNode returns the nodes where the block is to be written. The client picks one DataNode from the returned list and sends the block to it; that node forwards the data to the other nodes

There are two processes one needs to know about to understand how the data is distributed and managed by HDFS.

NameNode

The NameNode process is the one that runs on the master server. Its job is to keep metadata about the files that are stored on the data nodes. If the NameNode is down, the slaves have no way to make sense of the blocks they store. So, it is crucial to have the NameNode on redundant hardware. In general, a Hadoop cluster has just one master NameNode.

DataNodes

DataNodes are the slaves. They are the machines that actually contain the data. The DataNode process manages the data blocks on the local machine. DataNodes keep checking in with the master node via a sort of heartbeat. This enables the master to replicate the data if one of the slaves dies.

Data never flows through the NameNode; the DataNodes are the ones responsible for streaming the data out.

The NameNode and DataNodes work in harmony to provide a scalable, giant virtual filesystem that is oblivious to the underlying hardware and operating system. A data write takes place as follows:

  1. The client makes a write request for a block of a file to the master, that is, the NameNode server.
  2. The NameNode returns a list of servers that the block is to be copied to (a block is copied to as many places as the configured replication factor dictates).
  3. The client makes an is-ready request to one of the target DataNodes. This node forwards the request to the next node, which forwards it to the next, until all the nodes that will hold the data acknowledge OK.
  4. On receipt of the OK message, the client starts to stream the data to one of the DataNodes, which internally streams the data to the next replica node, and so on.
  5. Once the block gets written successfully, the slaves notify the master, and the slave connected to the client returns success.

Figure 8.1 shows this data flow when a Hadoop client (CLI or Java) makes a request to write a block to HDFS; a client-side sketch of such a write follows.
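
The following is a minimal sketch of that client-side call using the Java FileSystem API. The target path is hypothetical, and the NameNode address is assumed to come from core-site.xml; the create() call talks to the NameNode for block placement, while the bytes themselves stream straight to the DataNode pipeline.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWrite {
  public static void main(String[] args) throws Exception {
    // Assumes fs.default.name (for example, hdfs://localhost:9000) is configured on the classpath
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Hypothetical target path
    Path target = new Path("/user/nishant/hello.txt");

    // create() asks the NameNode where the blocks should go;
    // the written bytes then stream directly to the chosen DataNodes
    FSDataOutputStream out = fs.create(target);
    out.writeUTF("Hello HDFS");
    out.close();   // the block is finalized and the DataNodes report back to the NameNode
    fs.close();
  }
}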

Hadoop MapReduce

MapReduce, or MR, is a very simple concept once you know it. It is algorithms 101: divide and conquer. The job is broken into small independent tasks and distributed across multiple machines; the results get sorted and merged together to generate the final result. The ability to break a large computational burden into small computational loads spread over multiple servers gives a Hadoop programmer effectively limitless CPU capacity for data analysis. MR is the processing part of Hadoop; it virtualizes the CPU. Figure 8.2 depicts this process.

As an end user, you need to write a Mapper and a Reducer for the task you need to get done. The Hadoop framework performs the heavy lifting of getting data from a source and splitting it into key-value pairs based on what the data source is. It may be a line from a text file, a row from a relational database, or a key-value pair from a Cassandra column family. These key-value pairs (indicated as Input key-val pairs in the next figure) are forwarded to the Mapper that you have provided to Hadoop. The Mapper performs a unit task on each key-value pair; for example, for a word-count task, you may want to remove punctuation, split the line into words on whitespace, iterate over this array of words, and emit each individual word as a key with a value of one. These make up the intermediate key-value pairs, as indicated in the next figure.

These results are sorted by key and forwarded to the Reducer that you provided. Reducers can use the property that incoming tuples arrive grouped by key. Understanding the last sentence is important for a beginner. What it means is that you can simply iterate over the incoming iterator and do things such as group or count; basically, you reduce or fold the map by key.
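
As an illustration, here is a minimal word-count Mapper and Reducer sketched against the classic org.apache.hadoop.mapred API of Hadoop 1.x. The class names are just examples, and punctuation stripping is omitted for brevity.

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Mapper: one input line in, one (word, 1) pair out per word
public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable offset, Text line,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    StringTokenizer tokens = new StringTokenizer(line.toString());
    while (tokens.hasMoreTokens()) {
      word.set(tokens.nextToken());
      output.collect(word, ONE);       // intermediate key-value pair
    }
  }
}

// Reducer: all counts for the same word arrive together,
// so summing the iterator gives the total for that word
class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text word, Iterator<IntWritable> counts,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (counts.hasNext()) {
      sum += counts.next().get();
    }
    output.collect(word, new IntWritable(sum));
  }
}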


Figure 8.2: Hadoop MapReduce framework in action (simplified)

The reduced values are then stored in a data store of your choice, which can be HDFS, an RDBMS, Cassandra, or one of many other storage options.

There are two main processes that you should know about in the context of Hadoop MapReduce.

JobTracker

Similar to the NameNode, the JobTracker is a master process that governs the execution of worker processes such as TaskTrackers. Like any master-slave architecture, the JobTracker is a single point of failure. So, it is advisable to have robust hardware and redundancy built into the machine that runs the JobTracker.

The JobTracker's responsibilities include estimating the number of Mapper tasks from the input splits, for example, file splits obtained from HDFS via an InputFormat. The number of Reducer tasks, on the other hand, is taken from the configured value. A client application can use JobClient to submit jobs to the JobTracker and inquire about their status.
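
A hypothetical driver for the word-count classes sketched earlier might look like the following; the in and out paths are illustrative, and JobClient.runJob() is what hands the job to the JobTracker and polls it for progress.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);
    conf.setJobName("wordcount");

    conf.setMapperClass(WordCountMapper.class);    // classes from the earlier sketch
    conf.setReducerClass(WordCountReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setNumReduceTasks(2);                     // Reducer count is configured, not derived from the splits

    FileInputFormat.setInputPaths(conf, new Path("in"));   // the input splits drive the Mapper count
    FileOutputFormat.setOutputPath(conf, new Path("out"));

    // JobClient submits the job to the JobTracker and blocks until completion,
    // printing progress along the way
    RunningJob job = JobClient.runJob(conf);
    System.out.println("Job successful: " + job.isSuccessful());
  }
}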

TaskTracker

Like the DataNode in the case of HDFS, the TaskTracker is the actual execution unit of Hadoop. It creates a child JVM for Mapper and Reducer tasks. The maximum numbers of Mapper and Reducer tasks (slots) on a node can be set independently. The TaskTracker may reuse the child JVMs to improve efficiency.
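
As a small illustration of those knobs, JVM reuse is a per-job property that can be set through JobConf, whereas the per-node slot counts are daemon-level settings that live in mapred-site.xml on each TaskTracker. This is a sketch under those assumptions, not a tuning recommendation.

import org.apache.hadoop.mapred.JobConf;

public class TaskTrackerTuning {
  public static void main(String[] args) {
    JobConf conf = new JobConf();

    // Per-job setting: allow a child JVM to run any number of this job's tasks
    // in sequence instead of forking a fresh JVM for every task.
    // 1 (the default) disables reuse; -1 removes the limit.
    conf.setNumTasksToExecutePerJvm(-1);
    System.out.println(conf.get("mapred.job.reuse.jvm.num.tasks"));   // prints -1

    // The per-node slot counts (mapred.tasktracker.map.tasks.maximum and
    // mapred.tasktracker.reduce.tasks.maximum) are read by the TaskTracker
    // daemon at startup, so they belong in mapred-site.xml, not in the job.
  }
}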

Reliability of data and process in Hadoop

Hadoop is a very robust and reliable architecture. It is meant to run on commodity hardware and hence takes care of failure automatically. It is fault tolerant: it detects the failure of a task and retries failed tasks. Data on a DataNode is replicated (redundant), so the system heals itself in case a DataNode becomes unavailable.

Hadoop allows servers to join or leave the cluster without any repercussions. Rack-aware storage of data protects the cluster against disk failures, machine or rack power failures, and even a complete rack going down.

Figure 8.3 shows the well-known schema of a reliable Hadoop infrastructure using commodity hardware for the slaves and heavy-duty servers (at the top of the rack) for the masters. Please note that these are physical servers as they sit in the data center. Later, when discussing Cassandra as a data store, we will use a ring representation. Even in that case, the physical configuration may be the same as the one represented in Figure 8.3, but the logical configuration, as we have seen throughout this book, will be a ring-like structure to emphasize the token distribution.

Setting up local Hadoop

This section will discuss how to set up Hadoop 1.x on your local machine. At the time of this writing, Hadoop is transitioning from Version 1 to Version 2. Version 2 has disruptive changes and is presumably better than Version 1. HDFS is federated in the new version; the Apache documentation says this version scales the NameNode service horizontally using multiple independent NameNodes. This should ideally avoid the single point of failure that the NameNode posed in the previous version. The other major improvement is the new MapReduce framework, variously referred to as MRNextGen, MRv2, or Yet Another Resource Negotiator (YARN). More about the new version can be learned from the Apache website (What is YARN? http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html). Here are the steps to get Hadoop Version 1 working on a Linux machine. To keep things generic, I have used a zipped download to install Hadoop. One may use a binary package for a specific platform without much change to the instructions.


Figure 8.3: Hadoop infrastructure: heavy-duty master nodes at the top of the rack. Each rack has a rack switch. Slaves run the DataNode and TaskTracker services. Racks are connected through 10GE switches. Note that not all racks will have a master server

Make sure you can SSH (secure shell) to your localhost using a key-based, password-less login. If you can't, generate and set up a key pair as described in the following command line:

# Generate key pair
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa 
Generating public/private dsa key pair. 
Your identification has been saved in /home/ec2-user/.ssh/id_dsa. 
Your public key has been saved in /home/ec2-user/.ssh/id_dsa.pub. 
The key fingerprint is: 
6a:dc:53:04:8a:52:50:9f:dd:2a:2e:99:04:86:41:f4 ec2-user@ip-10-147-171-159 
The key's randomart image is: 
+--[ DSA 1024]----+ 
|+o.oo   .        | 
| o.. o + o       | 
|. +E. + . o      | 
| . o     o       | 
|    . . S .      | 
|   . = + .       | 
|    + = o        | 
|     o   .       | 
|                 | 
+-----------------+ 

# Add public key to authorized_keys
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

At this point, you should be able to SSH to your machine by issuing ssh localhost.

Note

You may need to install the SSH server if you do not have it installed already. In Ubuntu, execute this:

sudo apt-get install openssh-server

In RHEL variants, do this:

yum install openssh-server

The first step is to download and extract Hadoop 1.x to a desired location:

# Download Hadoop
$ wget http://mirrors.gigenet.com/apache/hadoop/common/hadoop-1.1.2/hadoop-1.1.2-bin.tar.gz

# Extract
$ tar xvzf hadoop-1.1.2-bin.tar.gz 

# Create a soft link for easy access and updates without disrupting PATH
$ ln -s hadoop-1.1.2 hadoop
$ cd hadoop

Let's assume the directory where the Hadoop tarball is extracted is $HADOOP_HOME. You need to configure Hadoop to get it working. We will perform the minimal configuration that gets Hadoop working in pseudo-cluster mode, where your single machine works as the master node, running the JobTracker and NameNode, and as a slave node, running the TaskTracker and DataNode. Remember, this is not a production-ready configuration.

Edit $HADOOP_HOME/conf/core-site.xml, and add the following:

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

Edit $HADOOP_HOME/conf/hdfs-site.xml, and add the replication parameter for the data blocks:

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

Edit $HADOOP_HOME/conf/mapred-site.xml, and set:

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>

Now, it is almost done, except that you need to tell Hadoop where Java lives. Edit $HADOOP_HOME/conf/hadoop-env.sh and add an export statement for JAVA_HOME like this:

# The java implementation to use.  Required.
# export JAVA_HOME=/usr/lib/j2sdk1.5-sun
export JAVA_HOME=/opt/jdk

We are done. Next is testing the installation.

Testing the installation

Before we start testing the newly installed Hadoop, we need to format the NameNode to prepare the HDFS. We haven't provided any directory to HDFS, so it will default to /tmp, which may not survive a machine reboot.

$ bin/hadoop namenode -format
13/07/20 22:36:39 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = marla/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 1.1.2
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1440782; compiled by 'hortonfo' on Thu Jan 31 02:03:24 UTC 2013
************************************************************/
13/07/20 22:36:39 INFO util.GSet: VM type       = 64-bit
13/07/20 22:36:39 INFO util.GSet: 2% max memory = 17.77875 MB
13/07/20 22:36:39 INFO util.GSet: capacity      = 2^21 = 2097152 entries
13/07/20 22:36:39 INFO util.GSet: recommended=2097152, actual=2097152
13/07/20 22:36:40 INFO namenode.FSNamesystem: fsOwner=nishant
[-- snip --]
13/07/20 22:36:41 INFO common.Storage: Image file of size 113 saved in 0 seconds.
13/07/20 22:36:41 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/tmp/hadoop-nishant/dfs/name/current/edits
13/07/20 22:36:41 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/tmp/hadoop-nishant/dfs/name/current/edits
13/07/20 22:36:41 INFO common.Storage: Storage directory /tmp/hadoop-nishant/dfs/name has been successfully formatted.
13/07/20 22:36:41 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at marla/127.0.1.1
************************************************************/

You may observe that the storage directory is set to /tmp/hadoop-nishant/dfs/name by default. You may change it to a more sensible location by editing the configuration XML, but the default will serve our purpose of demonstrating the capability.

Let's start everything up and see if we are OK.

# Start all Hadoop services locally
$ bin/start-all.sh
starting namenode, logging to /home/nishant/apps/hadoop-1.1.2/libexec/../logs/hadoop-nishant-namenode-marla.out
localhost: starting datanode, logging to /home/nishant/apps/hadoop-1.1.2/libexec/../logs/hadoop-nishant-datanode-marla.out
localhost: starting secondarynamenode, logging to /home/nishant/apps/hadoop-1.1.2/libexec/../logs/hadoop-nishant-secondarynamenode-marla.out
starting jobtracker, logging to /home/nishant/apps/hadoop-1.1.2/libexec/../logs/hadoop-nishant-jobtracker-marla.out
localhost: starting tasktracker, logging to /home/nishant/apps/hadoop-1.1.2/libexec/../logs/hadoop-nishant-tasktracker-marla.out

# Test if all services are running
$ jps
11816 DataNode
12158 JobTracker
11506 NameNode
12486 Jps
12408 TaskTracker
12064 SecondaryNameNode

jps is a built-in tool provided by Oracle JDK. It lists all the Java processes running on the machine. The previous snippet shows that all the Hadoop processes are up. Let's execute an example and see if things are actually working.

# Upload everything under conf directory to "in" directory in HDFS
$ bin/hadoop fs -put conf in

# View the contents (some columns and rows are omitted for brevity)
$ bin/hadoop fs -ls in
Found 16 items
-rw-r--r-- nishant supergroup  7457  /user/nishant/in/capacity-scheduler.xml
-rw-r--r-- nishant supergroup  535  /user/nishant/in/configuration.xsl
[-- snip --]
-rw-r--r-- nishant supergroup  243  /user/nishant/in/ssl-client.xml.example
-rw-r--r-- nishant supergroup  195  /user/nishant/in/ssl-server.xml.example
-rw-r--r-- nishant supergroup  382  /user/nishant/in/taskcontroller.cfg

All set, so it's time to execute an example. We will run an example that greps all the words matching the dfs[a-z.]+ regular expression across all the files under the in folder and returns the counts in a folder called out.

# Execute grep example
$ bin/hadoop jar hadoop-examples-*.jar grep in out 'dfs[a-z.]+'
13/07/20 23:36:01 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/07/20 23:36:01 WARN snappy.LoadSnappy: Snappy native library not loaded
13/07/20 23:36:01 INFO mapred.FileInputFormat: Total input paths to process : 16
13/07/20 23:36:02 INFO mapred.JobClient: Running job: job_201307202304_0001
13/07/20 23:36:03 INFO mapred.JobClient:  map 0% reduce 0%
13/07/20 23:36:12 INFO mapred.JobClient:  map 12% reduce 0%
[-- snip --]
13/07/20 23:36:41 INFO mapred.JobClient:  map 87% reduce 20%
13/07/20 23:36:45 INFO mapred.JobClient:  map 100% reduce 29%
13/07/20 23:36:52 INFO mapred.JobClient:  map 100% reduce 100%
13/07/20 23:36:53 INFO mapred.JobClient: Job complete: job_201307202304_0001
13/07/20 23:36:53 INFO mapred.JobClient: Counters: 30
[-- snip stats --]
13/07/20 23:37:12 INFO mapred.JobClient:     Reduce output records=3
13/07/20 23:37:12 INFO mapred.JobClient:     Map output records=3

# Result of the MapReduce execution
$ bin/hadoop fs -cat out/*
1  dfs.replication
1  dfs.server.namenode.
1  dfsadmin

Congratulations, you have just executed a job using MapReduce. It was a bit of a boring task; you could have executed the grep command on your Linux machine, and it would run much faster than this. But it gives a couple of important insights: one, that the configuration works; and two, that it is not always best to do everything using Hadoop; for some tasks, it is better to use the tools best suited to them. We will see more on this when we discuss the cons of Hadoop later in this chapter.
