Now that you have a solid knowledge base in HDFS, it is time to dive into the processing module of Hadoop, known as MapReduce. Once we have the data in the cluster, we need a programming model to perform advanced operations on it. This is done using Hadoop's MapReduce.
The MapReduce programming model has been in existence for quite some time now. It was designed to process large volumes of data in parallel. Google implemented a version of MapReduce in-house to process data stored on GFS (the Google File System). Later, Google released a paper explaining their implementation; Hadoop's MapReduce implementation is based on this paper.
MapReduce in Hadoop is a Java-based distributed programming framework that leverages HDFS to execute high-performance batch processing of the data stored there.
The processing can be divided into two major functions: map and reduce.
Since the primary focus of this book is on the administrative aspects of Hadoop, we will focus on the MapReduce architecture and how it works together with HDFS to process large volumes of data.
In a MapReduce application, all the data read in the map function is read in the form of key and value pairs. The processed output of the map function is also in the form of key and value pairs. The processing of data as key and value pairs works well in a distributed computing environment.
Let's understand how MapReduce works with the help of an example. The word count program is known as the "Hello, World" program for MapReduce. The program counts the number of occurrences of each word in an input set of text files.
For this example, let's consider a file with the following line in it:
She sells sea shells on the sea shore where she also sells cookies.
So, if the preceding text is provided as an input to the word count program, the expected output would be as follows:
she, 2 sells, 2 sea, 2 shells, 1 on, 1 the, 1 shore, 1 where, 1 also, 1 cookies, 1
The three major components of a MapReduce program are the driver, the mapper, and the reducer.
The driver component of a MapReduce program is responsible for setting up the job configurations and submitting it to the Hadoop cluster. This part of the program runs on the client computer.
The driver component of the word count program would take two parameters to submit the job to the Hadoop cluster: the location of the input file (or directory) and the location of the output directory.
Once the job is submitted to the cluster, the mapper reads every line in the file as <key, value> pairs. So, if we consider a file with the line mentioned earlier, the key will be the offset of the line and the value will be the entire sentence.
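This offset-keyed reading can be illustrated with a short Python sketch. This is not Hadoop's actual input-format code (which lives in Java's `TextInputFormat`); it is a minimal simulation of the idea, and the `offset_records` helper and `sample.txt` filename are our own for illustration:

```python
def offset_records(path):
    """Yield (byte offset, line) pairs, mimicking how Hadoop's
    TextInputFormat keys each line by its position in the file."""
    with open(path, "rb") as f:
        offset = 0
        for line in f:
            yield offset, line.decode("utf-8").rstrip("\n")
            offset += len(line)

# Write the sample line and read it back as a keyed record.
with open("sample.txt", "w") as f:
    f.write("She sells sea shells on the sea shore where she also sells cookies.\n")

for key, value in offset_records("sample.txt"):
    print(key, value)  # 0 She sells sea shells ...
```

Because the sentence starts at the beginning of the file, its key is offset 0; a second line in the file would be keyed by the byte position where it starts.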
The mapper reads the line as follows:
<0000, She sells sea shells on the sea shore where she also sells cookies>
Once read, the mapper logic would emit the <key, value> pairs for each word in the sentence as follows:
<she, 1> <sells, 1> <sea, 1> <shells, 1> <on, 1> <the, 1> <sea, 1> <shore, 1> <where, 1> <she, 1> <also, 1> <sells, 1> <cookies, 1>
The map function emits each word in the sentence as a key, with the constant number 1 as the value for each key.
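The mapper logic above can be sketched in a few lines of Python. This is a simulation of the word count map function, not Hadoop's Java mapper; the lowercasing and stripping of the trailing period are assumptions made so that the emitted pairs match the expected output shown earlier:

```python
def word_count_map(key, value):
    """Emit (word, 1) for every word in the input line.
    `key` is the line's byte offset (unused); `value` is the line text."""
    for word in value.lower().replace(".", "").split():
        yield (word, 1)

line = "She sells sea shells on the sea shore where she also sells cookies."
pairs = list(word_count_map(0, line))
print(pairs[:3])  # [('she', 1), ('sells', 1), ('sea', 1)]
```

Note that the mapper does no counting of its own; it simply emits a 1 for every word it sees, leaving the summing to the reducer.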
The reduce function reads the intermediate <key, value> pairs emitted by the mapper and produces the final result.
These results are then taken as input by the reducer, sorted by key. The reducer logic then works on each key group; in this case, it sums up the values for each key and produces the final result as follows:
she, 2 sells, 2 sea, 2 shells, 1 on, 1 the, 1 shore, 1 where, 1 also, 1 cookies, 1
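The grouping and summing steps can be sketched as follows. This is a single-process Python simulation of what the framework does between and inside the phases, not Hadoop code; the function names are our own:

```python
from collections import defaultdict

def shuffle_sort(mapped_pairs):
    """Group mapper output by key and sort the keys, as the
    framework does between the map and reduce phases."""
    groups = defaultdict(list)
    for key, value in mapped_pairs:
        groups[key].append(value)
    return sorted(groups.items())

def word_count_reduce(key, values):
    """Sum the 1s emitted for a key to get the word's total count."""
    return (key, sum(values))

mapped = [("she", 1), ("sells", 1), ("sea", 1), ("shells", 1),
          ("on", 1), ("the", 1), ("sea", 1), ("shore", 1),
          ("where", 1), ("she", 1), ("also", 1), ("sells", 1),
          ("cookies", 1)]

for key, values in shuffle_sort(mapped):
    print(word_count_reduce(key, values))
```

Each reducer call receives one key together with the full list of values for that key, which is why the reducer can compute the total with a simple sum.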
The following is a functional representation of the map and reduce functions:
| Function | Input | Output |
|---|---|---|
| map | <k1, v1> | list(k2, v2) |
| reduce | <k2, list(v2)> | list(<k3, v3>) |
The following diagram shows the flow of a MapReduce job starting from an input file right up to the generation of an output file:
In the preceding diagram, you see a very simple flow of MapReduce. However, in real production scenarios, there are multiple mappers and reducers.
When there are multiple mappers and reducers involved, there is a phase between the mapper and reducer known as the shuffle and sort phase. In this phase, all the keys are sorted and sent to the reducers. Each reducer works on the set of keys and values provided as input and generates their own output file as shown in the following diagram:
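The routing of keys to reducers can be sketched in Python. Hadoop's default partitioner hashes the key modulo the number of reducers (in Java, via the key's `hashCode()`); the sketch below uses Python's built-in `hash` instead, so it illustrates the idea rather than reproducing Hadoop's exact bucket assignment:

```python
def partition(key, num_reducers):
    """Pick the reducer for a key, in the spirit of Hadoop's
    default HashPartitioner: hash of the key modulo reducer count."""
    return hash(key) % num_reducers

def route(mapped_pairs, num_reducers):
    """Split mapper output into per-reducer buckets, each sorted by key,
    mimicking the shuffle and sort phase."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in mapped_pairs:
        buckets[partition(key, num_reducers)].append((key, value))
    return [sorted(bucket) for bucket in buckets]

pairs = [("she", 1), ("sells", 1), ("sea", 1), ("she", 1)]
buckets = route(pairs, 2)
print(buckets)
```

The important property is that every occurrence of a given key hashes to the same bucket, so a single reducer is guaranteed to see all the values for that key.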
There are several operations and services involved in the submission and execution of a MapReduce job in a Hadoop cluster.
The two main services that are responsible for job execution are the jobtracker and the tasktracker.
When a client submits a job to the cluster, the jobtracker creates a new job ID and returns it to the client. After getting the ID, the job resources, along with the information on the input splits of the data, are copied to HDFS so that all the services in the cluster can access them. The client then polls the jobtracker every second to check the job's completion status.
The jobtracker then takes over and initializes the job in the cluster by accessing the job resources in HDFS. The jobtracker retrieves the input splits information and then decides which tasks it needs to assign to tasktrackers. The jobtracker creates a map task for each of the input splits and then assigns the map tasks to the tasktrackers. The tasktrackers are also responsible for running the reduce tasks on completion of the map tasks. The jobtracker tries to assign map tasks to tasktrackers on nodes that are in close proximity to the data. This greatly improves performance by limiting the data transferred across the network (data locality).
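The locality preference can be illustrated with a toy assignment sketch. This is not the jobtracker's actual scheduling algorithm; the data structures and the fallback choice below are simplifications for illustration:

```python
def assign_map_tasks(splits, tasktrackers):
    """Toy scheduler: prefer a tasktracker on a node that holds a
    replica of the split's data (data locality); otherwise fall back
    to any available tracker."""
    assignments = {}
    for split_id, replica_nodes in splits.items():
        local = [t for t in tasktrackers if t in replica_nodes]
        assignments[split_id] = local[0] if local else tasktrackers[0]
    return assignments

# Each split lists the nodes holding a replica of its block.
splits = {"split-0": ["node1", "node2"], "split-1": ["node3"]}
trackers = ["node2", "node3", "node4"]
print(assign_map_tasks(splits, trackers))
# {'split-0': 'node2', 'split-1': 'node3'}
```

Both splits end up on nodes that already hold their data, so no map input needs to cross the network.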
The tasktracker is the actual service that runs a task. Tasktrackers are running all the time and are waiting for tasks to be assigned to them by the jobtracker. Tasktrackers are configured to run a specific number of map and reduce tasks. These are called slots.
The tasktracker sends a periodic heartbeat to the jobtracker to inform it that it is alive, along with the number of map and reduce slots it has available. The jobtracker assigns tasks as a return value for the heartbeat. Once a task is assigned, the tasktracker copies the client program (usually a compiled set of Java classes packaged as a JAR file) from HDFS to its local space. All the intermediate data generated by the map task is stored locally on the node where the tasktracker runs.
After all the map and reduce tasks are completed, the jobtracker receives a notification of completion. The jobtracker marks the job as successful. The client that polls for the status of the job prints the completion notification on the client console.
All MapReduce-related configuration is done by adding or updating the properties in the mapred-site.xml file. The following is an example of a mapred-site.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>node1.hcluster:8021</value>
  </property>
  <property>
    <name>mapred.job.tracker.http.address</name>
    <value>0.0.0.0:50030</value>
  </property>
  <property>
    <name>mapreduce.job.counters.max</name>
    <value>120</value>
  </property>
  <property>
    <name>mapred.output.compress</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.output.compression.type</name>
    <value>BLOCK</value>
  </property>
  <property>
    <name>mapred.output.compression.codec</name>
    <value>org.apache.hadoop.io.compress.DefaultCodec</value>
  </property>
  <property>
    <name>mapred.map.output.compression.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>
  <property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
  </property>
  <property>
    <name>io.sort.mb</name>
    <value>50</value>
  </property>
  <property>
    <name>io.sort.factor</name>
    <value>64</value>
  </property>
  <property>
    <name>mapred.reduce.parallel.copies</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.submit.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>4</value>
  </property>
  <property>
    <name>mapred.userlog.retain.hours</name>
    <value>24</value>
  </property>
  <property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx112889935</value>
  </property>
  <property>
    <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>1</value>
  </property>
  <property>
    <name>mapred.map.tasks.speculative.execution</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.reduce.tasks.speculative.execution</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.reduce.slowstart.completed.maps</name>
    <value>0.8</value>
  </property>
</configuration>
Let's discuss each property in detail:
- mapred.job.tracker: This property defines the host and port on which the jobtracker runs. All communication with the jobtracker is done over this host and port.
- mapred.job.tracker.http.address: This property defines the web address of the jobtracker. This web location helps in the monitoring of jobs submitted to the cluster.
- mapreduce.job.counters.max: Internally, Hadoop maintains several counters, for example, JobCounter and TaskCounter, to track job-related and task-related information during processing. However, it is also possible for developers to define their own counters. Because the jobtracker maintains these counters globally, an uncontrolled number of counters could cause issues; this property limits the number of counters that can be generated.
- mapred.output.compress: This is a Boolean property; if set to true, the job's output file is compressed.
- mapred.output.compression.type: This property defines the type of compression. The options are NONE, RECORD, or BLOCK.
- mapred.output.compression.codec: This property defines the codec to be used for compressing the job's output file.
- mapred.map.output.compression.codec: This property defines the codec that should be used to compress the map output files.
- mapred.compress.map.output: If this property is set to true, the map output files are compressed before they are sent across the network.
- io.sort.mb: This property defines the amount of memory set aside for in-memory sorting and is useful when tuning to reduce the number of spilled records.
- io.sort.factor: When the output data generated by a map task is small enough to fit into a tasktracker's memory, it is retained there and all operations are done in memory. However, if the data is larger than the tasktracker's memory, it is spilled (written) to disk. This property defines the number of open file handles (streams merged at once) used when sorting spilled files.
- mapred.reduce.parallel.copies: This property defines the number of parallel transfers performed by the reduce tasks during the shuffle phase.
- mapred.submit.replication: This property defines the replication factor for the job-related resources that are copied to HDFS when a job run is initiated.
- mapred.reduce.tasks: This property defines the number of reduce tasks for a single job.
- mapred.userlog.retain.hours: This property defines the retention period of the user logs after job completion.
- mapred.child.java.opts: This property defines the parameters that are passed to the tasktracker's child processes.
- mapred.job.reuse.jvm.num.tasks: The tasktracker creates a JVM for each task. This property alters that behavior so that more than one task can run per JVM.
- mapred.map.tasks.speculative.execution: Speculative execution is the ability of the jobtracker to identify slow-running tasks and start another instance of the same task in parallel. The result of the instance that finishes first is kept, and the incomplete instance is discarded. This helps when the nodes running the tasks face some kind of performance problem. If this property is set to true, two instances of the same map task can run in parallel.
- mapred.reduce.tasks.speculative.execution: If this property is set to true, multiple instances of the same reduce task can run in parallel.
- mapred.reduce.slowstart.completed.maps: This property defines the fraction of map tasks that must complete before the reducers are scheduled (here 0.8, that is, 80 percent of the maps).

The jobtracker user interface provides useful information related to the jobs executed on the cluster. The jobtracker status can be monitored via the following jobtracker URL:
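To inspect which of these properties are set on a node, the configuration file can be parsed with a few lines of Python. The `read_hadoop_config` helper below is our own sketch (not a Hadoop API), and the sample file it writes is a trimmed-down version of the example above:

```python
import xml.etree.ElementTree as ET

def read_hadoop_config(path):
    """Parse a Hadoop *-site.xml file into a dict mapping
    property names to their values."""
    root = ET.parse(path).getroot()
    return {prop.findtext("name"): prop.findtext("value")
            for prop in root.findall("property")}

# A minimal mapred-site.xml for demonstration:
with open("mapred-site.xml", "w") as f:
    f.write("""<?xml version="1.0"?>
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>node1.hcluster:8021</value>
  </property>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>4</value>
  </property>
</configuration>""")

config = read_hadoop_config("mapred-site.xml")
print(config["mapred.job.tracker"])  # node1.hcluster:8021
```

Note that all values come back as strings; numeric properties such as mapred.reduce.tasks must be converted explicitly if you want to do arithmetic on them.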
http://<serveraddress>:50030/
The UI provides complete information of the status of the jobtracker along with the progress information of the task. The jobtracker page is divided into the following sections:
The following screenshot shows the General Information section of the jobtracker web interface:
If a node is not performing correctly, the jobtracker can blacklist the node so that no new tasks are assigned to it. The Blacklisted Nodes column in the Cluster Summary table shows the number of blacklisted nodes in the cluster. The Excluded Nodes column shows the number of decommissioned nodes in the cluster.
The following screenshot shows the Cluster Summary section of the jobtracker web interface:
The following screenshot shows the Scheduling Information section of the jobtracker web interface:
The following screenshot shows the Running Jobs section of the jobtracker web interface:
The following screenshot shows the Completed Jobs section of the jobtracker web interface:
The following screenshot shows the Failed Jobs section of the jobtracker web interface:
Completed jobs are retired from this list after a configurable interval (mapred.jobtracker.retirejob.interval), which is set in the mapred-site.xml file.

The following screenshot shows the Retired Jobs section of the jobtracker web interface:
The following screenshot shows the Local Logs section of the jobtracker web interface:
One of the hyperlinks of this section is:
The following sections are displayed when the Jobid link available in the Running Jobs section is clicked:
The following screenshot shows the general information section:
The following screenshot shows the map and reduce progress information section:
The following screenshot shows the counter information section:
The following screenshot shows the map and reduce completion graphs:
All the preceding information is helpful for monitoring the status of a job.