Getting acquainted with MapReduce

Now that you have a solid knowledge base in HDFS, it is time to dive into the processing module of Hadoop known as MapReduce. Once the data is in the cluster, we need a programming model to perform advanced operations on it. This is done using Hadoop's MapReduce.

The MapReduce programming model has been in existence for quite some time now. It was designed to process large volumes of data in parallel. Google implemented a version of MapReduce in-house to process the data stored on GFS. Later, Google released a paper explaining their implementation, and Hadoop's MapReduce implementation is based on this paper.

MapReduce in Hadoop is a Java-based distributed programming framework that leverages the features of HDFS to perform high-performance batch processing of the data stored in it.

The processing can be divided into two major functions, which are:

  • Map
  • Reduce

Since the primary focus of this book is on the administrative aspects of Hadoop, we will focus on the MapReduce architecture and how it works together with HDFS to process large volumes of data.

Understanding the map phase

In a MapReduce application, all the data read in the map function is read in the form of key and value pairs. The processed output of the map function is also in the form of key and value pairs. The processing of data as key and value pairs works well in a distributed computing environment.

Let's understand how MapReduce works with the help of an example. The word count program is known as the "Hello, World" program for MapReduce. The program counts the number of occurrences of each word in an input set of text files.

For this example, let's consider a file with the following line in it:

She sells sea shells on the sea shore where she also sells cookies.

So, if the preceding text is provided as an input to the word count program, the expected output would be as follows:

she, 2
sells, 2
sea, 2
shells, 1
on, 1
the, 1
shore, 1
where, 1
also, 1
cookies, 1

The three major components of a MapReduce program are:

  • Driver
  • Mapper
  • Reducer

The driver component of a MapReduce program is responsible for setting up the job configuration and submitting the job to the Hadoop cluster. This part of the program runs on the client computer.

The driver component of the word count program would take two parameters to submit the job to the Hadoop cluster:

  • The location of the input files
  • The location of the output file
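
The following is a minimal sketch of such a driver for the word count example. It is written against the newer org.apache.hadoop.mapreduce API and assumes the WordCountMapper and WordCountReducer classes sketched later in this section; treat it as an illustration rather than the only way to write a driver.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // args[0] is the location of the input files, args[1] the location of the output directory
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job to the cluster and poll for its completion status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}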

Once the job is submitted to the cluster, the mapper reads every line in the file as a <key, value> pair. So, if we consider a file with the line mentioned earlier, the key will be the byte offset of the line and the value will be the entire sentence.

The mapper reads the line as follows:

<0000, She sells sea shells on the sea shore where she also sells cookies>

Once read, the mapper logic would emit the <key, value> pairs for each word in the sentence as follows:

<she, 1>
<sells, 1>
<sea, 1>
<shells, 1>
<on, 1>
<the, 1>
<sea, 1>
<shore, 1>
<where, 1>
<she, 1>
<also, 1>
<sells, 1>
<cookies, 1>

The map function has emitted each word in the sentence as a key and the constant number 1 as the value for each key.
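
A minimal sketch of this mapper logic, assuming the newer org.apache.hadoop.mapreduce API, could look as follows (the punctuation stripping is only there so that a token such as cookies. is counted as cookies):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the byte offset of the line; value is the line itself
        StringTokenizer tokenizer = new StringTokenizer(value.toString().toLowerCase());
        while (tokenizer.hasMoreTokens()) {
            // Strip punctuation so that "cookies." is counted as "cookies"
            String token = tokenizer.nextToken().replaceAll("[^a-z0-9]", "");
            if (!token.isEmpty()) {
                word.set(token);
                // Emit <word, 1> for every word in the line
                context.write(word, ONE);
            }
        }
    }
}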

Understanding the reduce phase

The reduce function reads the intermediate <key, value> pairs emitted by the mapper and produces the final result.

These intermediate pairs are passed to the reducer grouped and sorted by key. The reducer logic then works on each key group; in this case, it sums up the values for each key and produces the final result as follows:

she, 2
sells, 2
sea, 2
shells, 1
on, 1
the, 1
shore, 1
where, 1
also, 1
cookies, 1
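
A matching reducer for the word count example might be sketched as follows (again assuming the newer org.apache.hadoop.mapreduce API):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum up the 1s emitted by the mappers for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}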

The following is a functional representation of the map and reduce functions:

Function    Input               Output
map         <k1, v1>            list(<k2, v2>)
reduce      <k2, list(v2)>      list(<k3, v3>)

The following diagram shows the flow of a MapReduce job starting from an input file right up to the generation of an output file:

[Diagram: the flow of a MapReduce job from an input file to an output file]

In the preceding diagram, you see a very simple flow of MapReduce. However, in real production scenarios, there are multiple mappers and reducers.

When there are multiple mappers and reducers involved, there is a phase between the mapper and the reducer known as the shuffle and sort phase. In this phase, all the keys are sorted and sent to the reducers. Each reducer works on the set of keys and values provided as input and generates its own output file, as shown in the following diagram:

[Diagram: a MapReduce job with multiple mappers and reducers, including the shuffle and sort phase]

Learning all about the MapReduce job flow

There are several operations and services involved in the submission and execution of a MapReduce job in a Hadoop cluster.

The two main services that are responsible for job execution are:

  • Jobtracker
  • Tasktracker

When a client initiates a job submission to the cluster, the jobtracker creates a new job ID and returns it to the client. After getting the ID, the client copies the job resources, along with information on the input splits of the data, to HDFS so that all the services in the cluster can access them. The client then polls the jobtracker every second to check the job's completion status.

The jobtracker then takes over and initializes the job in the cluster by accessing the job resources in HDFS. The jobtracker retrieves the input split information and decides which tasks to assign to the tasktrackers. It creates a map task for each input split and then assigns the map tasks to the tasktrackers. The tasktrackers are also responsible for running the reduce tasks once the map tasks complete. The jobtracker tries to assign map tasks to tasktrackers on nodes that are in close proximity to the data; this greatly improves performance by limiting the data transferred across the network (data locality).

The tasktracker is the service that actually runs a task. Tasktrackers run all the time, waiting for tasks to be assigned to them by the jobtracker. Each tasktracker is configured to run a specific number of map and reduce tasks concurrently; these are called slots.

The tasktracker sends a periodic heartbeat to the jobtracker to inform it that it is alive, along with the number of map and reduce slots it has available. The jobtracker assigns tasks in the return value of the heartbeat. Once a task is assigned, the tasktracker copies the client program (usually a set of compiled Java classes packaged as a JAR file) from HDFS to its local disk. All the intermediate data generated by a map task is stored locally on the node where the tasktracker runs.

After all the map and reduce tasks are completed, the jobtracker receives a notification of completion. The jobtracker marks the job as successful. The client that polls for the status of the job prints the completion notification on the client console.

Configuring MapReduce

All MapReduce-related configuration is done by adding/updating the properties in the mapred-site.xml file. The following is an example of a mapred-site.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>node1.hcluster:8021</value>
  </property>
  <property>
    <name>mapred.job.tracker.http.address</name>
    <value>0.0.0.0:50030</value>
  </property>
  <property>
    <name>mapreduce.job.counters.max</name>
    <value>120</value>
  </property>
  <property>
    <name>mapred.output.compress</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.output.compression.type</name>
    <value>BLOCK</value>
  </property>
  <property>
    <name>mapred.output.compression.codec</name>
    <value>org.apache.hadoop.io.compress.DefaultCodec</value>
  </property>
  <property>
    <name>mapred.map.output.compression.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>
  <property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
  </property>
  <property>
    <name>io.sort.mb</name>
    <value>50</value>
  </property>
  <property>
    <name>io.sort.factor</name>
    <value>64</value>
  </property>
  <property>
    <name>mapred.reduce.parallel.copies</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.submit.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>4</value>
  </property>
  <property>
    <name>mapred.userlog.retain.hours</name>
    <value>24</value>
  </property>
  <property>
    <name>mapred.child.java.opts</name>
    <value> -Xmx112889935</value>
  </property>
  <property>
    <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>1</value>
  </property>
  <property>
    <name>mapred.map.tasks.speculative.execution</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.reduce.tasks.speculative.execution</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.reduce.slowstart.completed.maps</name>
    <value>0.8</value>
  </property>
</configuration>

Let's discuss each property in detail:

  • mapred.job.tracker: This property defines the host and port on which the jobtracker runs. All communication with the jobtracker is done over the host and port.
  • mapred.job.tracker.http.address: This property defines the web address of the jobtracker. This web location helps in the monitoring of jobs submitted to the cluster.
  • mapreduce.job.counters.max: Internally, Hadoop maintains several counters, for example, JobCounter and TaskCounter, to track job- and task-related information during execution. However, it is also possible for developers to define their own counters. This freedom can cause issues if the number of counters is not controlled, as the jobtracker maintains these counters globally. This property helps in limiting the number of counters that can be created.
  • mapred.output.compress: This is a Boolean property, and if set to true, it will compress the job's output file.
  • mapred.output.compression.type: This property defines the type of compression that can be set. The options are NONE, RECORD, or BLOCK.
  • mapred.output.compression.codec: This property defines the codec to be used for compression of the job's output file.
  • mapred.map.output.compression.codec: This property defines the codec that should be used to compress the map output files.
  • mapred.compress.map.output: This property, if set to true, compresses the map output files before they are sent across the network.
  • io.sort.mb: This property defines the memory set to perform the in-memory sorting and is useful when tuning to reduce the number of spilled records.
  • io.sort.factor: When the output generated by a map task is small enough to fit into the task's in-memory sort buffer (sized by io.sort.mb), it is sorted entirely in memory. However, if the data is larger than the buffer, it is spilled (written) to disk. This property defines the number of spill files that are merged at once when sorting, which determines the number of open file handles used.
  • mapred.reduce.parallel.copies: This property defines the number of parallel transfers done by reduce during the shuffle phase.
  • mapred.submit.replication: This property defines the replication factor for the job-related resources that are copied to HDFS at the initiation of a job run.
  • mapred.reduce.tasks: This property defines the number of reduce tasks for a single job.
  • mapred.userlog.retain.hours: This property defines the retention period of the user logs after the job completion.
  • mapred.child.java.opts: This property defines the Java options (for example, the maximum heap size set through -Xmx) that are passed to the child JVMs launched by the tasktracker to run tasks.
  • mapred.job.reuse.jvm.num.tasks: The tasktracker creates a JVM for each task. This property helps to alter this behavior to run more than one task per JVM.
  • mapred.map.tasks.speculative.execution: Speculative execution is the ability of the jobtracker to identify slow running tasks and start another instance of the same task in parallel. The results of the task that finishes first will be considered and the incomplete task is discarded. This helps in situations when nodes that are running tasks face some kind of performance problems. If this property is set to true, two instances of the same map task could run in parallel.
  • mapred.reduce.tasks.speculative.execution: If this property is set to true, multiple instances of the same reduce task can run in parallel.
  • mapred.reduce.slowstart.completed.maps: This property defines the fraction of the job's map tasks that should be completed before reducers are scheduled for the job.
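
Note that many of these per-job properties can also be set programmatically from the driver code; values set on the job's Configuration object take precedence over mapred-site.xml for that job, unless the cluster marks the property as final. The following is a small, illustrative sketch (the values shown are examples, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ConfiguredJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.output.compress", true);                 // compress the job output
        conf.set("mapred.output.compression.type", "BLOCK");             // block-level compression
        conf.setInt("mapred.reduce.tasks", 8);                           // reduce tasks for this job
        conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.80f);  // schedule reducers after 80% of maps finish
        Job job = new Job(conf, "configured job");
        // ... set the mapper, reducer, and input/output paths as in the word count driver, then submit ...
    }
}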

Understanding the jobtracker UI

The jobtracker user interface provides useful information related to the jobs executed on the cluster. The jobtracker status can be monitored via the following jobtracker URL:

http://<serveraddress>:50030/

The UI provides complete information about the status of the jobtracker along with progress information for the submitted jobs and tasks. The jobtracker page is divided into the following sections:

  • General Information: The general information section displays some basic information about the jobtracker, such as the current status, the timestamp of when the service was started, the compilation information, and a unique identifier for the service.

    The following screenshot shows the General Information section of the jobtracker web interface:

    [Screenshot: the General Information section of the jobtracker web interface]
  • Cluster Summary: This section displays information about the tasks and nodes available in the cluster. Information such as the currently running map and reduce tasks and the number of nodes in the cluster is self-explanatory. Every tasktracker in the cluster has a configured number of map and reduce slots. The occupied map slots value shows the number of map slots currently in use out of the total number of slots available in the cluster. Similarly, the occupied reduce slots value shows the number of reduce slots currently in use out of the total available. The map task capacity and reduce task capacity values represent the maximum number of map and reduce tasks that can run on the cluster. The average tasks per node value shows the average number of map or reduce tasks that run per node in the cluster.

    If a node is not performing correctly, the jobtracker can blacklist the node so that no new tasks are assigned to it. The Blacklisted Nodes column in the Cluster Summary table shows the number of blacklisted nodes in the cluster. The excluded nodes show the number of decommissioned nodes in the cluster.

    The following screenshot shows the Cluster Summary section of the jobtracker web interface:

    [Screenshot: the Cluster Summary section of the jobtracker web interface]
  • Scheduling Information: This section shows the job scheduler information for the cluster. Here, the default job scheduler is a hyperlink that can be clicked to see information on the jobs currently in the queue. The default scheduler maintains a queue of all the submitted jobs and completes them sequentially. There are other schedulers that can be configured to run jobs in Hadoop, such as the fair scheduler and the capacity scheduler.

    The following screenshot shows the Scheduling Information section of the jobtracker web interface:

    [Screenshot: the Scheduling Information section of the jobtracker web interface]
  • Running Jobs: This section shows the details of the jobs currently running in the cluster and is very useful for monitoring their status. The Jobid column lists all the job IDs, and each of them is a hyperlink; clicking on one brings up more details of the job. Other basic job-related information such as the job priority, the user who submitted the job, and the name of the job is also displayed. The cumulative progress of the map and reduce tasks is shown as percentages, along with the total map tasks, completed map tasks, total reduce tasks, and completed reduce tasks.

    The following screenshot shows the Running Jobs section of the jobtracker web interface:

    [Screenshot: the Running Jobs section of the jobtracker web interface]
  • Completed Jobs: This section is very similar to the Running Jobs section, except that it lists only the completed jobs. It does not show any in-progress or failed jobs.

    The following screenshot shows the Completed Jobs section of the jobtracker web interface:

    [Screenshot: the Completed Jobs section of the jobtracker web interface]
  • Failed Jobs: The failed jobs section, as the name suggests, lists all the jobs that failed in the cluster.

    The following screenshot shows the Failed Jobs section of the jobtracker web interface:

    [Screenshot: the Failed Jobs section of the jobtracker web interface]
  • Retired Jobs: Jobs submitted to the Hadoop cluster stay in the jobtracker's memory after successful completion and are automatically retired (written to disk) after a configured period of time. This interval is set using the mapred.jobtracker.retirejob.interval property in the mapred-site.xml file.

    The following screenshot shows the Retired Jobs section of the jobtracker web interface:

    [Screenshot: the Retired Jobs section of the jobtracker web interface]
  • Local Logs: This section provides hyperlinks to the log directory and the jobtracker history for all the jobs on the cluster. The log directory consists of logs related to the jobtracker as well as tasktracker.

    The following screenshot shows the Local Logs section of the jobtracker web interface:

    [Screenshot: the Local Logs section of the jobtracker web interface]

    One of the hyperlinks in this section is:

    • Job Tracker History: This hyperlink from the Local Logs section brings up the list of all the jobs that ran on the cluster with details of when they were submitted, the ID, the job name, and the user who submitted the job. Here, the job ID is a hyperlink that brings up further details of the job. The following is a screenshot of the Hadoop Map/Reduce History Viewer section:
    [Screenshot: the Hadoop Map/Reduce History Viewer]

Getting MapReduce job information

The following sections are displayed when the Jobid link available in the Running Jobs section is clicked:

  • General information: This section provides basic details of the job such as the user, the name of the job, and the configuration file. Along with this, the section also shows the current status of the job, the running duration, and the job start timestamp.

    The following screenshot shows the general information section:

    [Screenshot: the general information section of the job page]
  • Map and reduce progress information: This section displays the complete details of the number of map and reduce tasks for the current job. Information such as completion status, total number of tasks, number of running tasks, number of completed tasks, and the number of killed tasks are also displayed.

    The following screenshot shows the map and reduce progress information section:

    [Screenshot: the map and reduce progress information section]
  • Counter information: Hadoop has several built-in counters that provide information about the job being executed. These counters help us understand the behavior of the job and also help us identify problems with it. Developers can also build custom counters to track certain aspects of their application (a small sketch of a custom counter follows this list). The counter information section provides filesystem counter information, job counter information, and statistics related to the MapReduce operations of the job.

    The following screenshot shows the counter information section:

    [Screenshot: the counter information section]
  • Map and reduce completion graphs: The map and reduce completion graphs provide a graphical representation of the map and reduce tasks for the job. The map completion graph shows progress information of all the map tasks submitted to the cluster for the job. The reduce completion graph shows progress information of each phase: the copy, sort, and reduce operations.

    The following screenshot shows the map and reduce completion graphs:

    [Screenshot: the map and reduce completion graphs]
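
As a small illustration of the custom counters mentioned in the counter information section, a task can increment its own counter through the task context. The following sketch uses a made-up counter group and name purely for illustration:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CountingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // Custom counter: the group ("WordCount") and name ("EMPTY_LINES") are arbitrary strings
            context.getCounter("WordCount", "EMPTY_LINES").increment(1);
            return;
        }
        context.write(new Text(line), ONE);
    }
}

The values of such custom counters appear in the counter information section of the job page alongside the built-in counters.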

All the preceding information is really helpful to monitor the status of a job.
