Chapter 16

Performance Tuning

WHAT’S IN THIS CHAPTER?

  • Understanding the factors that affect parallel scalable applications
  • Optimizing scalable processing, especially when it leverages the MapReduce model for processing
  • Presenting a set of best practices for parallel processing
  • Illustrating a few Hadoop performance tuning tips

Today, much of the big data analysis in the world of NoSQL rests on the shoulders of the MapReduce model of processing. Hadoop is built on it, and most NoSQL products that support huge data sizes leverage it. This chapter is a first look at optimizing scalable applications and tuning the way MapReduce-style processing works on large data sets. By no means does the chapter provide a prescriptive solution. Instead, it presents a few important concepts and good practices to bear in mind when optimizing a scalable parallel application. Each optimization problem is unique to its requirements and context, so a single universally applicable solution is probably not feasible.

GOALS OF PARALLEL ALGORITHMS

MapReduce makes scalable parallel processing easier than it had been in the past. By adhering to a model where data is not shared between parallel threads or processes, MapReduce creates a bottleneck-free way of scaling out as workloads increase. The underlying goal at all times is to reduce latency and increase throughput.

The Implications of Reducing Latency

Reducing latency simply means reducing the execution time of a program. The less time a program takes to produce the desired results for a given set of inputs, the better it fares on the latency metric. For a given set of inputs and outputs, latency reduction usually involves choosing optimal algorithms for producing the output and parallelizing the execution of sub-tasks. If a task can be broken down into a number of parallel and independent sub-tasks, running them in parallel reduces the overall time taken to complete the task. In such parallel programs, latency is effectively determined by the time taken to execute the longest 'atomic' sub-task, where 'atomic' denotes the smallest unit of work that cannot be broken down into further parallel sub-tasks. Where parallelization is not feasible, latency is a measure of the time taken to execute the entire program.

As far as optimizing algorithms goes, keep in mind that the given algorithm needs to fit the model of map and reduce functions. Of course, you can make multiple passes through these functions if required.

How to Increase Throughput

Throughput refers to the amount of input that can be processed to generate output within a given amount of time. For many large data sets, throughput takes center stage, sometimes even at the expense of increased latency, because analyzing large amounts of data is not trivial. For example, Kevin Weil of Twitter, in a presentation at Web 2.0 Expo in 2010 (www.slideshare.net/kevinweil/analyzing-big-data-at-twitter-web-20-expo-nyc-sep-2010), revealed that tweets added up to 12 terabytes per day. That amount of data takes more than 40 hours just to write to a single disk at a speed of about 80 MBps. The same story plays out at other companies, such as Facebook and Google, where heavy user traffic generates large amounts of user data every day.

Hadoop provides the capability to analyze large sets of data, even data that spreads beyond a single machine. In traditional single large systems, throughput is constrained by the available resources; for example, the amount of RAM or the number and power of CPUs determines how much processing a machine can do. As data grew, even the most powerful machines reached their limits. In a horizontally scaled Hadoop environment that leverages the Hadoop distributed filesystem (HDFS), such limits are less of a problem: adding nodes to a cluster enables Hadoop to take on more processing as data grows. As a side effect, parallelization also allows commodity hardware, with limited capabilities compared to powerful machines, to contribute effectively and help increase throughput.

Linear Scalability

In a typical horizontally scaled MapReduce-based model, the processing is parallel and the scalability is often linear. This means that if one node of a cluster can process x megabytes of data every second, then n nodes can process n times x. Flipping the argument around, as data grows by every multiple of x, you need another node to keep the same rate of processing. If all n nodes are equally balanced in terms of load, the processing time can be kept constant as long as a new node is available to take on the additional load. Alternatively, the time to process a given amount of data can be reduced proportionately by adding more nodes to the cluster.

Simple math to demonstrate this could be as follows:

  • Time taken to process y amounts of data on a single node = t seconds
  • Time taken to process y amounts of data on n nodes = t/n seconds

These simple mathematical formulas assume that tasks can be parallelized into equally balanced units and that each unit takes about the same time to process the given data set.
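Expressed as code, this ideal case is trivial but worth stating. The following is a minimal sketch, not tied to any Hadoop API; the one-hour single-node time and the ten-node cluster are arbitrary assumed values chosen only for illustration.

/** A minimal sketch of the ideal linear-scalability arithmetic described above. */
public class LinearScaling {

    // Ideal processing time on a cluster, assuming perfectly balanced,
    // independent sub-tasks (the same assumption the text makes).
    static double idealTimeSeconds(double singleNodeTimeSeconds, int nodes) {
        return singleNodeTimeSeconds / nodes;
    }

    public static void main(String[] args) {
        double t = 3600; // hypothetical: one node needs an hour for the data set
        System.out.println(idealTimeSeconds(t, 10)); // ten nodes: ideally 360 seconds
    }
}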

INFLUENCING EQUATIONS

Milind Bhandarkar, a key contributor to Hadoop, in a presentation on Scaling Hadoop Applications (www.slideshare.net/hadoop/scaling-hadoopapplications) nicely summarizes the influencing theory using three important and well-known equations:

  • Amdahl’s Law
  • Little’s Law
  • Message Cost Model

Amdahl’s Law

Amdahl’s Law provides a formula for finding the maximum improvement in performance of an overall system when only a part of the system is improved. Amdahl’s Law is named after Gene Amdahl, www.computer.org/portal/web/awards/amdahl, a well-known computer architect who contributed to the making of the IBM mainframes.

Amdahl’s Law can be explained succinctly with a simple example. Say you have a process that runs for 5 hours and can be divided into sub-tasks that can be parallelized. Assume that you can parallelize all but a small part of the program, which takes 25 minutes to run. That 25-minute part then ends up limiting the best speed the overall program can achieve. Essentially, the serial part of the program limits the performance.

In mathematical terms this example could be seen as follows:

  • Total time taken for the program to run: 5 hours (300 minutes)
  • Time taken for the serial part of the program: 25 minutes
  • Percentage of the overall program that can be parallelized: ~91.7
  • Percentage that cannot be parallelized (or is serial in nature): ~8.3
  • Therefore, the maximum speedup of the parallelized version over the non-parallelized version is 1 / (1 − 0.917) = ~12

In other words, the fully parallelized version could be up to 12 times faster than the non-parallel version of the same program.

Amdahl’s Law generalizes this calculation of speed improvement in an equation, which is as follows:

1 / ((1 – P) + P/S)

where P represents the proportion of the program that is parallelized and S is the factor by which the parallelized part is sped up compared to the non-parallelized version.

This generalized equation takes into account different levels of speedup for different parts of a program. For example, suppose a program is parallelized into four parts, P1, P2, P3, and P4, which account for 10%, 30%, 40%, and 20% of the running time, respectively. If P1 can be sped up by 2x, P2 by 3x, and P3 by 4x, but P4 cannot be sped up at all, then the overall running time, relative to the original, is as follows:

0.10/2 + 0.30/3 + 0.40/4 + 0.20/1 = 0.45

Therefore, the maximum speedup is 1/0.45, or about 2.22, which is more than double the speed of the non-parallel program.
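To make the arithmetic concrete, here is a minimal sketch in plain Java, not tied to any Hadoop API, that evaluates the generalized form of Amdahl’s Law for the four-part example above.

/** Evaluates the generalized Amdahl's Law for a program split into parts. */
public class AmdahlSpeedup {

    // fractions[i] is the proportion of the original running time taken by part i;
    // speedups[i] is the factor by which that part is accelerated (1 = unchanged).
    static double overallSpeedup(double[] fractions, double[] speedups) {
        double scaledTime = 0.0;
        for (int i = 0; i < fractions.length; i++) {
            scaledTime += fractions[i] / speedups[i];
        }
        return 1.0 / scaledTime;
    }

    public static void main(String[] args) {
        // The four-part example from the text: 10%, 30%, 40%, and 20% of the program,
        // sped up by 2x, 3x, 4x, and 1x respectively.
        double[] fractions = {0.10, 0.30, 0.40, 0.20};
        double[] speedups  = {2, 3, 4, 1};
        System.out.println(overallSpeedup(fractions, speedups)); // prints ~2.22
    }
}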

You can read more about Amdahl’s Law at www-inst.eecs.berkeley.edu/~n252/paper/Amdahl.pdf.

Amdahl’s Law applies as much to MapReduce parallelization as it does to multi-core programming.


Gustafson’s Law, http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.85.6348, reevaluates Amdahl’s Law. It states that, given more computing power, a larger or more complex problem can be solved in the same time that a simpler problem takes with less computing power. Gustafson’s Law therefore challenges the scalability limits imposed by the serial part of the program, especially when large, complex, repetitive tasks are carried out using more computing resources.

Little’s Law

Little’s Law applies to parallel computing but has its origins in economics and queuing theory. The law appears deceptively simple but provides a way of analyzing the load on stable systems that is independent of any particular probability distribution. The law states that the average number of customers in a stable system is the product of the average arrival rate and the average time each customer spends in the system. As a formula, it appears as follows:

  • L = kW
  • L is the average number of customers in a stable system
  • k is the average arrival rate
  • W is the average time a customer spends in the system

To understand this a bit further, consider a simple system: a small gas station with a single cash-only counter. If four customers arrive every hour and each customer spends about 15 minutes (0.25 hours) at the station, there is on average only one customer at the station at any point in time. If customers arrive faster than four per hour, it becomes clear that bottlenecks develop. And if customers get frustrated by waiting longer than normal and leave without filling up, the assumptions behind a stable system no longer hold and the system becomes unstable.

Viewed in terms of Little’s Law, if each active process in a parallel program takes a certain amount of time, say W, to complete and the system can handle at most L processes at any time, then the arrival rate cannot exceed L/W per unit of time. If the arrival rate exceeds this value, the system backs up, and both computation time and volume are affected.
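As a small illustration, the following sketch applies Little’s Law to a hypothetical task-processing system; the capacity of 100 concurrent tasks and the 2-second average task time are assumed values chosen only for the example.

/** A minimal sketch illustrating Little's Law (L = kW) for a task queue. */
public class LittlesLaw {
    public static void main(String[] args) {
        double maxConcurrentTasks = 100;  // L: the most tasks the system can hold at once (assumed)
        double avgTaskTimeSeconds = 2.0;  // W: the average time a task spends in the system (assumed)

        // For the system to remain stable, the arrival rate k cannot exceed L / W.
        double maxArrivalRate = maxConcurrentTasks / avgTaskTimeSeconds;
        System.out.println("Maximum sustainable arrival rate: " + maxArrivalRate + " tasks/second");
    }
}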

Message Cost Model

The third equation is the Message Cost Model, which breaks down the cost of sending a message from one end to the other into its fixed and variable components. Simply put, the Message Cost Model equation is as follows:

C = a + bN

  • C is the cost of sending the message from one end, say A, to the other, say B
  • a is the upfront cost for sending the message
  • b is the cost per byte of the message
  • N is the number of bytes of the message

This equation is simple to understand and there are two key takeaways from this model:

  • Transferring a message involves an upfront fixed cost, irrespective of the message’s size. For messages, overhead for connection establishment, handshake, and setup is quite common.
  • The cost of a message transfer is directly and linearly correlated with the message size.

The Message Cost Model provides some interesting insights into the costs associated with transmitting messages across a network. On gigabit Ethernet, a is about 300 microseconds (0.3 milliseconds) and b is 1 second per 125 MB: 1 gigabit is 1,000 megabits, or 125 MB, so gigabit Ethernet implies a transmission rate of 125 MBps. A cost of 1 second per 125 MB is the same as 1 ms per 125 KB, because 1,000 ms make up a second and 1,000 KB make up an MB. This means 100 messages of 10 KB each take 100 multiplied by (0.3 + 10/125) ms, which is 38 ms, whereas 10 messages of 100 KB each take only 10 multiplied by (0.3 + 100/125) ms, which is 11 ms. Therefore, one way to optimize message cost is to send as big a packet as possible each time, amortizing the upfront cost over a much larger payload.
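The following sketch encodes the cost model with the gigabit Ethernet figures quoted above and reproduces the 38 ms versus 11 ms comparison.

/** A minimal sketch of the Message Cost Model, C = a + bN, using the figures from the text. */
public class MessageCost {
    static final double FIXED_COST_MS = 0.3;          // a: per-message overhead in milliseconds
    static final double COST_PER_KB_MS = 1.0 / 125.0; // b: 1 ms per 125 KB of payload

    static double totalCostMs(int messageCount, double messageSizeKb) {
        return messageCount * (FIXED_COST_MS + COST_PER_KB_MS * messageSizeKb);
    }

    public static void main(String[] args) {
        System.out.println(totalCostMs(100, 10)); // 100 messages of 10 KB each: ~38 ms
        System.out.println(totalCostMs(10, 100)); // 10 messages of 100 KB each: ~11 ms
    }
}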


In theoretical calculations, a, the fixed cost in the Message Cost Model, is considered the same for all message sizes, but in practice that is usually not the case; the value of a varies with the message size.

PARTITIONING

Partitioning is a very important aspect of parallelization. In the MapReduce method of processing, each reducer forms a partition. During the map phase, key/value pairs are emitted, and the reducers consume them. MapReduce prefers a shared-nothing model of processing, so it’s necessary that all key/value pairs that share the same key go to the same partition and get manipulated by the same reducer.

In the Hadoop MapReduce framework, a default partitioner is defined: HashPartitioner. HashPartitioner uses the hashCode value of the key; the partition a key/value pair is sent to is determined by computing ‘hashCode value’ modulo ‘number of partitions’.

Hadoop uses an interface called Partitioner to determine which partition a key/value pair emitted by a map task goes to. The number of partitions, and therefore the number of reduce tasks, are known when a job starts. The Partitioner interface is as follows:

public interface Partitioner<K, V> extends JobConfigurable {
  int getPartition(K key, V value, int numPartitions);
}

The getPartition method takes the key, value, and the number of partitions as arguments and returns the partition number, which identifies the partition the key/value is sent to. For any two keys, k1 and k2, if k1 and k2 are equal then the partition number returned by getPartition is the same.
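As an illustration, the following is a sketch of a custom partitioner written against the classic org.apache.hadoop.mapred API shown above. FirstFieldPartitioner and its composite "field1|field2" key format are hypothetical; the point is simply that all keys sharing the same first field land in the same partition and therefore reach the same reducer. Such a class would be registered on a job with JobConf.setPartitionerClass.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/** Hypothetical partitioner that partitions on only the first field of a composite key. */
public class FirstFieldPartitioner implements Partitioner<Text, IntWritable> {

    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String firstField = key.toString().split("\\|")[0];
        // Mask the sign bit so the hash is non-negative before taking the modulus,
        // mirroring what the default HashPartitioner does with the full key.
        return (firstField.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public void configure(JobConf job) {
        // No configuration is needed for this sketch.
    }
}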

If the emitted key/value pairs are not balanced across partitions, there can be load imbalance or over-partitioning, neither of which is efficient. Load imbalance occurs when a few reducers take on most of the load while others remain idle, and it leads to increased latency. Machines and disks under full load also tend to slow down and hit boundary conditions where efficiency is reduced; load imbalance pushes some reducers into these states.

You know from the earlier illustration of Amdahl’s Law that any parallel optimization is limited by the longest serial task. In partitioned MapReduce processing, a long-running reduce task can likewise form a bottleneck. It can also lead to sequential waits, because the reduce and grouping stages complete the overall job only when all constituent key/value pairs have been processed.

SCHEDULING IN HETEROGENEOUS ENVIRONMENTS

Hadoop’s default scheduler uses a simple speculative-execution algorithm that compares each task’s progress to the average progress. The default scheduler assumes the following:

  • Nodes perform work at about the same rate
  • Tasks progress at a constant rate throughout

In heterogeneous environments, this default simple speculative algorithm does not perform optimally. Therefore, improvements have been made specifically to address the problems in heterogeneous environments.

The Longest Approximate Time to End (LATE) scheduler is an improvement on the default Hadoop scheduler. The LATE scheduler launches speculative tasks only on fast nodes, caps the number of tasks that can be speculated at any time, and uses a slow-task threshold to determine whether a task is slow enough to be speculated.

Although the LATE scheduler is an improvement on the default Hadoop scheduler, both these schedulers compute the progress of tasks in a static manner. SAMR, a self-adaptive MapReduce scheduling algorithm, outperforms both the default and LATE schedulers in heterogeneous environments. You can read more about SAMR in a paper titled “SAMR: A Self-adaptive MapReduce Scheduling Algorithm in Heterogeneous Environment” authored by Quan Chen, Daqiang Zhang, Minyi Guo, Qianni Deng, and Song Guo. The paper is catalogued online at http://portal.acm.org/citation.cfm?id=1901325.

ADDITIONAL MAPREDUCE TUNING

A number of configuration parameters that affect MapReduce can be configured appropriately to achieve better performance.

Communication Overheads

When data sets are very large, the algorithmic complexity of MapReduce is the least of the concerns; the focus is often on processing the large data set in the first place. However, bear in mind that some of the communication overhead, and the associated cost, can be eliminated by getting rid of the reduce task altogether, if possible; in such cases, map does everything. Where eliminating the reduce task is not an option, launching the reduce tasks before all map tasks have completed can improve performance, as the sketch that follows illustrates.
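The sketch below assumes the classic org.apache.hadoop.mapred API and shows both options: dropping the reduce phase entirely, and, where that is not possible, controlling how early reducers start through the mapred.reduce.slowstart.completed.maps property (verify the property name against your Hadoop version).

import org.apache.hadoop.mapred.JobConf;

public class CommunicationTuning {

    /** If the output can be produced by maps alone, eliminate the reduce phase entirely. */
    public static void makeMapOnly(JobConf conf) {
        conf.setNumReduceTasks(0); // no shuffle, sort, or reduce; map output goes straight to HDFS
    }

    /** Otherwise, let reducers start copying map output before every map has finished. */
    public static void startReducersEarly(JobConf conf) {
        // The fraction of map tasks that must complete before reduce tasks are scheduled;
        // a lower value launches reducers, and their copy phase, earlier.
        conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.05f);
    }
}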

Compression

Compressing data as it is transmitted between nodes, and between map and reduce jobs, can improve performance dramatically. Essentially, the communication overhead is reduced and avoidable bandwidth and network usage is eliminated. For large clusters and large jobs, compression can lead to substantial benefits.


Some data sets aren’t easily compressible or do not compress enough to provide substantial benefits.

Turning compression on is as simple as setting a single configuration parameter to true. This single parameter is:

mapred.compress.map.output

The compression codec can also be configured, using the mapred.map.output.compression.codec property.
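The following is a minimal sketch, assuming the classic JobConf API, of how both settings can be applied programmatically. GzipCodec is used here only because it ships with Hadoop; an LZO codec has to be installed separately.

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobConf;

public class CompressionTuning {
    public static void enableMapOutputCompression(JobConf conf) {
        // Equivalent to setting mapred.compress.map.output to true.
        conf.setCompressMapOutput(true);

        // Equivalent to setting mapred.map.output.compression.codec.
        conf.setMapOutputCompressorClass(GzipCodec.class);
    }
}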


LZO is a compression algorithm that is suitable for real-time compression. It favors speed over compression ratio. Read more about LZO at www.oberhumer.com/opensource/lzo/.

A further improvement is to use splittable LZO. Most MapReduce tasks are I/O bound, so if files on HDFS are compressed in a format that can be split and consumed directly by map tasks, I/O and overall performance improve. With ordinary gzip compression, a compressed file cannot be split for parallel processing, so each gzip file must be processed by a single mapper, which limits parallelization. With bzip2, files can be split and the portions sent to different mappers, but decompression is very CPU intensive, so the gains in I/O are lost in CPU time. LZO offers a good middle ground, where compressed sizes and decompression speeds are both reasonable. Learn more about splittable LZO online at https://github.com/kevinweil/hadoop-lzo.

File Block Size

HDFS, the underlying distributed filesystem in Hadoop, allows the storage of very large files. The default block size in HDFS is 64 MB. If your cluster is small and the data size is large, a large number of map tasks are spawned at the default block size. For example, 120 GB of input leads to 1,920 map tasks. This can be derived by a simple calculation as follows:

(120 * 1024)/64

Thus, increasing the block size seems logical in small clusters. However, it should not be increased to the point where some nodes in the cluster go unused.
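As a sketch, assuming the classic configuration property names, the block size for files a job writes can be raised by setting dfs.block.size in the job configuration; existing files keep the block size they were written with, so input data would need to be rewritten to benefit.

import org.apache.hadoop.mapred.JobConf;

public class BlockSizeTuning {
    public static void useLargerBlocks(JobConf conf) {
        // 128 MB blocks, specified in bytes. At this size, 120 GB of data maps to
        // roughly 960 blocks instead of the 1,920 produced with 64 MB blocks.
        conf.setLong("dfs.block.size", 128L * 1024 * 1024);
    }
}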

Parallel Copying

Map outputs are copied over to the reducers. When the output of a map task is large, copying the values over can be done in parallel by multiple threads. Increasing the number of threads increases CPU usage but reduces latency. The default number of such threads is 5. You can increase the number by setting the following property:

mapred.reduce.parallel.copies
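A minimal sketch, again assuming the classic JobConf API, that doubles the number of copy threads:

import org.apache.hadoop.mapred.JobConf;

public class CopyTuning {
    public static void increaseParallelCopies(JobConf conf) {
        // Raise the copy threads per reducer from the default of 5 to 10;
        // more threads use more CPU but shorten the shuffle's copy phase.
        conf.setInt("mapred.reduce.parallel.copies", 10);
    }
}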

HBASE COPROCESSORS

HBase coprocessors are inspired by the idea of coprocessors in Google Bigtable. A few simple operations, such as counting and aggregating, can be pushed up to the server to enhance performance; the coprocessor framework makes this possible.

Three interfaces in HBase (Coprocessor, RegionObserver, and Endpoint) implement the coprocessor framework in a flexible manner. The idea behind Coprocessor and RegionObserver is that you can insert user code by overriding upcall methods from these two related interfaces; the coprocessor framework handles the details of invoking the upcalls. More than one Coprocessor or RegionObserver can be loaded to extend functionality, and they are chained to execute sequentially, ordered on the basis of assigned priorities.

Through an endpoint on the server side and dynamic RPC provided by the client library, you can define your own extensions to HBase RPC transactions exchanged between clients and the region servers.
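As a rough illustration, the following sketch shows the shape of a region observer. GetCounterObserver is hypothetical, and coprocessor method signatures differ across HBase releases (this one follows the early, 0.92-era API), so treat it as a sketch rather than copy-and-paste code.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

/** Hypothetical observer that counts Get operations on the region server. */
public class GetCounterObserver extends BaseRegionObserver {

    private final AtomicLong getCount = new AtomicLong();

    @Override
    public void postGet(ObserverContext<RegionCoprocessorEnvironment> ctx,
                        Get get, List<KeyValue> results) throws IOException {
        // Runs on the region server after every Get, so simple bookkeeping such
        // as counting happens next to the data instead of in the client.
        getCount.incrementAndGet();
    }
}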

LEVERAGING BLOOM FILTERS

Bloom Filters were introduced in Chapter 13. Please review the definition if you aren’t sure what they are.

A get row call in HBase currently does a parallel N-way get of that row from all StoreFiles in a region. This implies N read requests from disk. Bloom Filters provide a lightweight in-memory structure that reduces those N disk reads to only the files likely to contain that row.
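To see why such a structure helps, the following toy sketch in plain Java (not HBase’s implementation, which uses tuned hash functions and sizing) shows the membership test: a definite "no" lets a StoreFile be skipped, whereas a "maybe" still requires the disk read.

import java.util.BitSet;

/** A toy Bloom Filter sketch; false positives are possible, false negatives are not. */
public class ToyBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    public ToyBloomFilter(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    private int hash(String key, int seed) {
        return ((key + "#" + seed).hashCode() & Integer.MAX_VALUE) % size;
    }

    /** Record that a row key is present in this (hypothetical) StoreFile. */
    public void add(String rowKey) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(hash(rowKey, i));
        }
    }

    /** False means the file definitely does not contain the row; true means it might. */
    public boolean mightContain(String rowKey) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(hash(rowKey, i))) {
                return false;
            }
        }
        return true;
    }
}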

Reads are done in parallel, so the performance gain on an individual get is minimal. Also, read performance is dominated by disk read latency. If you replaced the parallel get with a serial get, you would see the impact of Bloom Filters on read latency more clearly.

Bloom Filters can be more heavyweight than your data. This is one big reason why they aren’t enabled by default.

SUMMARY

This chapter presented a few perspectives on tuning the performance of parallel MapReduce-based processes. The MapReduce model enables the processing of large amounts of data using commodity hardware, but scaling MapReduce jobs requires some clever configuration, and optimal configuration of MapReduce tasks can tune performance considerably.

The chapter presented a few generic performance-tuning tips but used Hadoop and the associated set of tools for illustration.
