Hadoop optimization

A well-optimized Hadoop cluster solves many problems for the other Hadoop ecosystem components running on it, such as HBase. So, let's now look at some of the factors we can tune to optimize Hadoop.

General optimization tips

These are some general optimization tips that will help us to optimize Hadoop:

  • Create a dedicated Hadoop/HBase user to run the daemons.
  • Try to use SSD for the NameNode metadata.
  • Back up the NameNode metadata periodically, either hourly or daily. If the cluster holds very valuable data, back it up every 5 to 10 minutes. We can write cron jobs that copy the metadata directory to a backup location.
  • We must have multiple metadata directories for the NameNode, specified using the dfs.name.dir (Hadoop 1) or dfs.namenode.name.dir (Hadoop 2) parameter; one can be located on a local disk and another on a network mount (NFS). This provides redundancy and robustness for the metadata in case of a failure (see the snippet after this list).
  • Set dfs.namenode.name.dir.restore to true to enable NameNode to try and recover previously failed dfs.namenode.name.dir directories during checkpointing.
  • The master node must be RAID enabled, with a RAID 1 (mirrored pair) architecture.
  • Keep plenty of space for the NameNode's log directory; these logs help us to debug or troubleshoot.
  • Since Hadoop is I/O bound, we must select the best possible (with respect to speed and throughput) storage.
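
The following is a minimal hdfs-site.xml sketch of the redundant metadata directories and the restore flag mentioned above; the paths /data/1/dfs/nn and /mnt/nfs/dfs/nn are illustrative placeholders, not values from the original text:

<property>
  <name>dfs.namenode.name.dir</name>
  <!-- Comma-separated list: one directory on a local disk, one on an NFS mount -->
  <value>/data/1/dfs/nn,/mnt/nfs/dfs/nn</value>
</property>
<property>
  <name>dfs.namenode.name.dir.restore</name>
  <!-- Let the NameNode try to recover a previously failed name directory during checkpointing -->
  <value>true</value>
</property>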

Optimizing Java GC

Follow these steps for Java GC optimization:

  • Use the latest version of Java
  • Use parallel GC for Java
  • Disable adaptive sizing in the JVM and fix the lower and upper bounds of heap memory (for example, by setting -Xms and -Xmx to the same value)
  • Enable JVM reuse by setting the mapred.job.reuse.jvm.num.tasks parameter to the number of tasks that should reuse a JVM (-1 means unlimited reuse); a sketch follows this list
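
As a rough sketch of the points above, the JVM options can go into hadoop-env.sh; the exact GC flags and heap size are assumptions for illustration, not values from the original text:

# hadoop-env.sh: parallel GC with a fixed heap (adaptive sizing disabled)
export HADOOP_OPTS="$HADOOP_OPTS -XX:+UseParallelGC -XX:-UseAdaptiveSizePolicy -Xms2g -Xmx2g"

The JVM-reuse setting goes into mapred-site.xml:

<!-- Reuse each JVM for up to 10 tasks (-1 means unlimited reuse) -->
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>10</value>
</property>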

Optimizing Linux OS

Use these techniques for Linux OS optimization:

  • By default, Linux file systems record the last access time (atime) of every file and directory; mounting with the noatime and nodiratime options disables these updates and gives a significant performance gain. These options can be set in the file systems table (fstab), found at /etc/fstab, as follows:
    /mnt/dev1/Vol1  /  ext4  defaults,noatime,nodiratime  1 1
    
  • Increase the file system's read-ahead buffer so that blocks of files are cached for faster access. We can set it using the following command:
    blockdev --setra 32768 /dev/sda
    
  • Lower vm.swappiness (the kernel's tendency to swap application memory out to disk) using the following command:
    sysctl -w vm.swappiness=0
    

    Note

    This value should be 0 or less than 10.

  • Increase ulimit to a higher value; the default value, 1,024, is not adequate, so set it between 32K and 34K, depending upon the system resources (see the limits.conf sketch after this list).
  • We should always have time synchronization in clusters. We will discuss how to do it using the NTP service.
  • Keep your OS bug free, apply available patches, and keep it updated (before updating, check for compatibility with Hadoop/HBase).
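
A minimal sketch of the ulimit change above, assuming the daemons run as a user named hadoop (the user name and the exact limit are illustrative); the open-file limit is raised in /etc/security/limits.conf:

# /etc/security/limits.conf: raise the open-file limit for the hadoop user
hadoop  soft  nofile  32768
hadoop  hard  nofile  32768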

Optimizing the Hadoop parameters

The following steps will help us to optimize the Hadoop parameters:

  • Enabling trash will help if a file is deleted accidentally; we will see how to do this in the administration chapter.
  • Keep the number of threads higher for RPC calls; this can be done using the following parameters:
    • The dfs.namenode.handler.count and mapred.jobtracker.handler.count parameters are 10 by default; we can raise them to between 50 and 100, depending upon the memory available.
    • The dfs.datanode.handler.count parameter sets the DataNode handler count; its default is 3. Raise it to between 5 and 10 if many HDFS clients read from and write to the cluster. The higher the number of threads, the higher the memory consumption, so set this parameter accordingly and don't make it too high.
    • Set dfs.replication lower or higher according to the value and size of the data. If the data is huge and not very valuable, we can keep it between 2 and 3; if the data is very valuable, keep it between 3 and 5 or so.
    • The dfs.block.size parameter defines the block size. It's better to keep it between 64 MB and 256 MB, depending upon the file's size on HDFS. For files less than 100 MB, a block of 64 MB will be better, and for files greater than 100 MB, or files whose size is in GBs, a 256 MB block will be better.

      Tip

      These and other parameters must be adjusted according to the setup and amount of resources available; the hdfs-site.xml sketch after this list shows a few of them together.
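
As a hedged example of the parameters discussed above (the values shown are illustrative, not prescriptions), the HDFS settings could look like this in hdfs-site.xml; mapred.jobtracker.handler.count would go in mapred-site.xml in the same way:

<property>
  <name>dfs.namenode.handler.count</name>
  <value>50</value>  <!-- RPC handler threads on the NameNode -->
</property>
<property>
  <name>dfs.datanode.handler.count</name>
  <value>8</value>  <!-- handler threads on each DataNode -->
</property>
<property>
  <name>dfs.replication</name>
  <value>3</value>  <!-- number of block replicas -->
</property>
<property>
  <name>dfs.block.size</name>
  <value>134217728</value>  <!-- 128 MB block size, in bytes -->
</property>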

Optimizing MapReduce

To optimize MapReduce, use the following techniques:

  • tasktracker.http.threads (renamed to mapreduce.tasktracker.http.threads in Hadoop 2) defines the number of worker threads for the TaskTracker's HTTP server. By default, it is 40; make it between 80 and 100 for large clusters.
  • mapreduce.task.io.sort.factor defines the sort factor and must be kept higher to reduce disk access. This also defines the number of open files at a time.
  • If mapreduce.map.speculative is set to true, duplicate copies of slow-running map tasks are launched in parallel; this speeds up execution when some tasks lag behind. For very long jobs, that is, jobs taking more than an hour or two, it should be set to false.
  • Compression of Map and Reduce outputs should be enabled for larger clusters and disabled for smaller clusters (for example, -jobconf mapred.output.compress=true).
  • Use as many mappers and reducers as the node can support; this should be set according to the number of CPU cores in the node. We will see how to set it later.
  • There are a few other parameters that should be tuned according to the cluster size and the type of jobs running; the mapred-site.xml sketch after this list shows the compression settings as an example.
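
The following is a minimal mapred-site.xml sketch of the compression settings mentioned above; the Snappy codec is an assumption for illustration, and any codec installed on the cluster can be used instead:

<!-- Compress intermediate map output -->
<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<!-- Compress the final job output -->
<property>
  <name>mapred.output.compress</name>
  <value>true</value>
</property>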

Rack awareness in Hadoop

In a production environment, we should enable rack awareness for a more robust cluster as it might happen that the whole rack will go down due to a problem with the central switch for this specific rack, or there might be a fire that will affect all the machines on the rack. If we enable and set up a cluster with rack awareness, we can avoid the whole cluster going down and always access the data that lies on nodes on other racks; meanwhile, the affected rack can be brought up. Now, let's see how to enable and configure this feature.

To make the cluster rack aware, we can write a script that lets the master node assign data blocks according to the topology, giving high availability in case of a rack failure. For this, we need to set the topology.script.file.name parameter (net.topology.script.file.name in Hadoop 2) in the hadoop-site.xml file (core-site.xml in newer releases); its value is the path of an executable script that, given a list of IPs or hostnames, returns the rack on which each of them resides:

<property>
  <name>topology.script.file.name</name>
  <value>/home/shashwat/hadoop/conf/hadoopRackAwareness.sh</value>
</property>

The following is the hadoopRackAwareness.sh script:

#!/bin/bash
# Source : http://wiki.apache.org/hadoop/topology_rack_awareness_scripts
# Credit : Apache Wiki

HADOOP_CONF=/home/shashwat/hadoop

# For every IP/hostname passed as an argument, print the rack it belongs to.
while [ $# -gt 0 ] ; do
  nodeArg=$1
  exec < ${HADOOP_CONF}/IPRackFile.list
  result=""
  # Look the node up in the IP-to-rack mapping file.
  while read line ; do
    ar=( $line )
    if [ "${ar[0]}" = "$nodeArg" ] ; then
      result="${ar[1]}"
    fi
  done
  shift
  # Fall back to the default rack if the node is not listed.
  if [ -z "$result" ] ; then
    echo -n "/default/rack "
  else
    echo -n "$result "
  fi
done

The following is the content of IPRackFile.list:

192.168.1.10    /rack1
192.168.1.11    /rack1
192.168.1.12    /rack1
10.3.0.3        /rack2
10.3.0.4        /rack2
10.3.0.5        /rack2
192.3.0.4       /rack3
192.3.0.5       /rack3
192.3.0.6       /rack3
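
Before restarting the cluster, the script can be tested from the shell; the expected output in the comment assumes the IPRackFile.list contents shown above:

# Should print the rack of each argument, for example "/rack2 /rack1 "
bash /home/shashwat/hadoop/conf/hadoopRackAwareness.sh 10.3.0.3 192.168.1.10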

After making these changes, restart the cluster and issue the following command; the report will now show a line with the rack information for each DataNode:

bin/hadoop dfsadmin -report

Number of Map and Reduce limits in configuration files

For the number of Map and Reduce limits, we need to consider the following parameters:

  • The maximum number of Map task slots to run simultaneously on a single TaskTracker:
    mapred.tasktracker.map.tasks.maximum
    
  • The maximum number of Reduce task slots to run simultaneously on a single TaskTracker:
    mapred.tasktracker.reduce.tasks.maximum
    

To set these two values, we need to consider the available CPU cores, disks, and memory. Suppose we have an eight-core processor: if the job is CPU intensive, we can set four map slots, and once the maps finish, the reducers start; for less CPU-intensive jobs, we can keep the map slots at 40 and the reduce slots at 20. After fixing these, we need to check whether tasks wait for a long time, and if so, reduce the numbers to make processing faster. We must understand that if we set too big a number, there will be a lot of context switching and swapping of data between memory and disk, which can slow down overall processing, so we need to balance it according to the system resources we have (a sketch of these two settings follows).
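
A minimal mapred-site.xml sketch of these two limits, assuming an eight-core TaskTracker running CPU-intensive jobs (the values are illustrative):

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>4</value>  <!-- map task slots per TaskTracker -->
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>2</value>  <!-- reduce task slots per TaskTracker -->
</property>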

Considering and deciding the maximum number of Map and Reduce tasks

Let's see how we can decide the maximum number of Map and Reduce tasks. It is not always right to run a lot of MapReduce tasks at a time, because if they can't fit in memory, tasks tend to fail or take a lot of time. So, we need to choose the maximum number of MapReduce tasks that can run at a time on a TaskTracker according to the number of CPU cores and the amount of memory available, so that only as many tasks as fit in memory run at once and data is not swapped in and out, since swapping increases I/O and degrades performance.

The following table lists the number of Map and Reduce tasks according to the existing number of CPU cores and memory:

CPU cores    Memory    Number of Map tasks    Number of Reduce tasks
1            1 GB      1                      1
1            5 GB      1                      1
4            5 GB      4                      2
16           32 GB     16                     8
16           64 GB     16                     8
24           64 GB     24                     12
24           128 GB    24                     12

Programmatically, we can set the number of Map and Reduce tasks too, which can override the value we set in configuration files. Generally, we can calculate it as follows:

mapred.tasktracker.map.tasks.maximum = 2 + cpu_number * 2/3
mapred.tasktracker.reduce.tasks.maximum = 2 + cpu_number * 1/3
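
As a quick worked example (the 12-core node is an illustrative figure), the rule of thumb gives:

mapred.tasktracker.map.tasks.maximum    = 2 + 12 * 2/3 = 10
mapred.tasktracker.reduce.tasks.maximum = 2 + 12 * 1/3 = 6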