Chapter 11. Performance Tuning

Thus far, you have seen how to set up a cluster and make use of it. Using HBase in production often requires that you turn many knobs to make it hum as expected. This chapter covers various advanced techniques for tuning a cluster and testing it repeatedly to verify its performance.

Garbage Collection Tuning

One of the lower-level settings you need to adjust is the garbage collection parameters for the region server processes. Note that the master is not a problem here as it does not handle any heavy loads, and data does not pass through it. These parameters only need to be added to the region servers.

You might wonder why you have to tune the garbage collection parameters to run HBase efficiently. The problem is that the Java Runtime Environment comes with basic assumptions regarding what your programs are doing, how they create objects, how they allocate the heap to handle data, and so on. These assumptions work well in a lot of cases. In addition, the JRE has heuristic algorithms that adjust these assumptions as your process is running. Even with those in place, the JRE is limited to the implementation of such heuristics and can handle some use cases better than others.

The bottom line is that the JRE does not handle region servers very well. This is caused by certain workloads, especially write-heavy ones, stressing the memory allocation mechanisms to a degree that it cannot safely rely on the JRE assumptions alone: you need to use the provided JRE options to tweak the garbage collection strategies to suit the workload.

For write-heavy use cases, the memstores are creating and discarding objects at various times, and in varying sizes. As the data is collected in the in-memory buffers, it needs to remain there until it has outgrown the configured minimum flush size, set with hbase.hregion.memstore.flush.size or at the table level.

Once the data is greater than that number, it is flushed to disk, creating a new store file. Since the data that is written to disk mostly resides in different locations in the Java heap—assuming it was written by the client at different times—it leaves holes in the heap.

Depending on how long the data was in memory, it resided in different locations in the generational architecture of the Java heap: data that was inserted rapidly and is flushed equally fast is often still in the so-called young generation (also called new generation) of the heap. The space can be reclaimed quickly and no harm is done.

However, if the data stays in memory for a longer period of time—for example, within a column family that is less rapidly inserted into—it is promoted to the old generation (or tenured generation). The difference between the young and old generations is primarily size: the young generation is between 128 MB and 512 MB, while the old generation holds the remaining available heap, which is usually many gigabytes of memory.

Note

You can set the following garbage collection-related options by adding them in the hbase-env.sh configuration file to the HBASE_OPTS or the HBASE_REGIONSERVER_OPTS variable. The latter only affects the region server process (as opposed to the master, for example), and is the recommended way to set these options.

You can specify the young generation size like so:

-XX:MaxNewSize=128m -XX:NewSize=128m

Or you can use the newer and shorter specification which combines the preceding code into one convenient option:

-Xmn128m

Note

Using 128 MB is a good starting point, and further observation of the JVM metrics should be conducted to confirm satisfactory use of the new generation of the heap.

Note that the default value is too low for any serious region server load and must be increased. If you do not do this, you might notice a steep increase in CPU load on your servers, as they spend most of their time collecting objects from the new generation space.

Both generations need to be maintained by the JRE, to reuse the holes created by data that has been written to disk (and obviously any other object that was created and discarded subsequently). If the application ever requests a size of heap that does not fit into one of those holes, the JRE needs to compact the fragmented heap. This includes implicit requests, such as the promotion of longer-living objects from the young to the old generation. If this fails, you will see a promotion failure in your garbage collection logs.

Note

It is highly recommended that you enable the JRE’s log output for garbage collection details. This is done by adding the following JRE options:

-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
  -Xloggc:$HBASE_HOME/logs/gc-$(hostname)-hbase.log

Once the log is enabled, you can monitor it for occurrences of "concurrent mode failure" or "promotion failed" messages, which oftentimes precede long pauses.

Note that the logfile is not rolled like the other files are; you need to take care of this manually (e.g., by using a cron-based daily log roll task).

The process to rewrite the heap generation in question is called a garbage collection, and there are parameters for the JRE that you can use to specify different garbage collection implementations. The recommended values are:

-XX:+UseParNewGC and -XX:+UseConcMarkSweepGC

The first option is setting the garbage collection strategy for the young generation to use the Parallel New Collector: it stops the entire Java process to clean up the young generation heap. Since its size is small in comparison, this process does not take a long time, usually less than a few hundred milliseconds.

This is acceptable for the smaller young generation, but not for the old generation: in a worst-case scenario this can result in processes being stopped for seconds, if not minutes. Once you reach the configured ZooKeeper session timeout, this server is considered lost by the master and it is abandoned. Once it comes back from the garbage collection-induced stop, it is notified that it is abandoned and shuts itself down.

This is mitigated by using the Concurrent Mark-Sweep Collector (CMS), enabled with the latter option shown earlier. It works differently in that it tries to do as much work concurrently as possible, without stopping the Java process. This takes extra effort and an increased CPU load, but avoids the required stops to rewrite a fragmented old generation heap—until you hit the promotion error, which forces the garbage collector to stop everything and clean up the mess.

The CMS has an additional switch, which controls when it starts doing its concurrent mark and sweep check. This value can be set with this option:

-XX:CMSInitiatingOccupancyFraction=70

The value is a percentage that specifies when the background process starts, and it needs to be set to a level that avoids another issue: the concurrent mode failure. This occurs when the background process to mark and sweep the heap for collection is still running when the heap runs out of usable space (recall the holes analogy). In this case, the JRE must stop the Java process and free the space by forcefully removing discarded objects, or tenuring those that are old enough.

Setting the initiating occupancy fraction to 70% means that it is slightly larger than the configured 60% of heap usage by the region servers, which is the combination of the default 20% block cache and 40% memstore limits. It will start the concurrent collection process early enough before the heap runs out of space, but also not too early for it to run too often.

Putting the preceding settings together, you can use the following as a starting point for your configuration:

export HBASE_REGIONSERVER_OPTS="-Xmx8g -Xms8g -Xmn128m -XX:+UseParNewGC \
  -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc \
  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps \
  -Xloggc:$HBASE_HOME/logs/gc-$(hostname)-hbase.log"

Note

Note that -XX:+CMSIncrementalMode is not recommended on actual server hardware.

These settings combine the current best practices at the time of this writing. If you use a newer version than Java 6, make sure you carefully evaluate the new garbage collection implementations and choose one that fits your use case.

It is important to size the young generation space so that the tenuring of longer-living objects is not causing the older generation heap to fragment too quickly. On the other hand, it should not be too large either, as this might cause too many short pauses. Although this will not cause your region servers to be abandoned, it does affect the latency of your servers, as they frequently stop for a few hundred milliseconds.

Also, when tuning the block cache and memstore sizes, make sure you set the initiating occupancy fraction value to something slightly larger than their combined total. In addition, do not let these two values add up to an unreasonably large share of the heap, and definitely keep their sum below 100%: you need to account for general Java class management overhead, so the default total of 60% is reasonable. More on this in Configuration.

Memstore-Local Allocation Buffer

Version 0.90 of HBase introduced an advanced mechanism to mitigate the issue of heap fragmentation due to too much churn on the memstore instances of a region server: the memstore-local allocation buffers, or MSLAB for short.

The preceding section explained how tenured KeyValue instances, once they are flushed to disk, cause holes in the old generation heap. Once there is no longer enough space for a new allocation caused by the fragmentation, the JRE falls back to the stop-the-world garbage collector, which rewrites the entire heap space and compacts it to the remaining active objects.

The key to reducing these compacting collections is to reduce fragmentation, and the MSLABs were built to help with that. The idea behind them is that only objects of exactly the same size should be allocated from the heap. Once these objects tenure and eventually get collected, they leave holes in the heap of a specific size. Subsequent allocations of new objects of the exact same size will always reuse these holes: there is no promotion error, and therefore no stop-the-world compacting collection is required.

The MSLABs are buffers of fixed sizes containing KeyValue instances of varying sizes. Whenever a buffer cannot completely fit a newly added KeyValue, it is considered full and a new buffer is created, once again of the given fixed size.

The feature is enabled by default in version 0.92, and disabled in version 0.90 of HBase. You can use the hbase.hregion.memstore.mslab.enabled configuration property to override it either way. It is recommended that you thoroughly test your setup with this new feature, as it might only delay the inevitable—which is a good thing—so you may still have to deal with long garbage collection pauses eventually. If you are still experiencing these pauses, you could plan to restart the servers every few days, or weeks, before the pause happens.

Note

As of this writing, this feature is not yet widely tested in long-running production environments. Due diligence is advised.

The size of each allocated, fixed-sized buffer is controlled by the hbase.hregion.memstore.mslab.chunksize property. The default is 2 MB and is a sensible starting point. Based on your KeyValue instances, you may have to adjust this value: if you store larger cells, for example, 100 KB in size, you need to increase the MSLAB size to fit more than just a few cells.
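  
For example, a minimal hbase-site.xml sketch that raises the chunk size to 4 MB (a hypothetical value—derive the actual number from the size of your cells) could look like this:

<property>
  <name>hbase.hregion.memstore.mslab.chunksize</name>
  <!-- hypothetical value: 4 MB chunks, specified in bytes, to fit larger cells -->
  <value>4194304</value>
</property>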

There is also an upper boundary of what is stored in the buffers. It is set by the hbase.hregion.memstore.mslab.max.allocation property and defaults to 256 KB. Any cell that is larger will be directly allocated in the Java heap. If you are storing a lot of KeyValue instances that are larger than this upper limit, you will run into fragmentation-related pauses earlier.

The MSLABs do not come without a cost: they are more wasteful in regard to heap usage, as you will most likely not fill every buffer to the last byte. The remaining unused capacity of the buffer is wasted. Once again, it’s about striking a balance: you need to decide if you should use MSLABs and benefit from better garbage collection but incur the extra space that is required, or not use MSLABs and benefit from better memory efficiency but deal with the problem caused by garbage collection pauses.

Finally, because the buffers require an additional byte array copy operation, they are also slightly slower, compared to directly using the KeyValue instances. Measure the impact on your workload to verify that this has no adverse effect.

Compression

HBase comes with support for a number of compression algorithms that can be enabled at the column family level. It is recommended that you enable compression unless you have a reason not to do so—for example, when using already compressed content, such as JPEG images. For every other use case, compression usually will yield overall better performance, because the overhead of the CPU performing the compression and decompression is less than what is required to read more data from disk.

Available Codecs

You can choose from a fixed list of supported compression algorithms. They have different qualities when it comes to compression ratio, as well as CPU and installation requirements.

Note

Currently there is no support for pluggable compression algorithms. The provided ones either are part of Java itself or are added on the operating-system level. They require support libraries which are either built or shipped with HBase.

Before looking into each available compression algorithm, refer to Table 11-1 to see the compression algorithm comparison Google published in 2005.[118] While the numbers are old, they still can be used to compare the qualities of the algorithms.

Table 11-1. Comparison of compression algorithms
Algorithm      % remaining   Encoding    Decoding
GZIP           13.4%         21 MB/s     118 MB/s
LZO            20.5%         135 MB/s    410 MB/s
Zippy/Snappy   22.2%         172 MB/s    409 MB/s

Note that some of the algorithms have a better compression ratio while others are faster during encoding, and a lot faster during decoding. Depending on your use case, you can choose one that suits you best.

Note

Before Snappy was made available in 2011, the recommended algorithm was LZO, even if it did not have the best compression ratio. GZIP is very CPU-intensive and its slight advantage in storage savings is usually not worth the slower performance and CPU usage it exposes.

Snappy has similar qualities as LZO, it comes with a compatible license, and first tests have shown that it slightly outperforms LZO when used with Hadoop and HBase. Thus, as of this writing, you should consider Snappy over LZO.

Snappy

With Snappy, released by Google under the BSD License, you have access to the same compression used by Bigtable (where it is called Zippy). It is optimized to provide high speeds and reasonable compression, as opposed to being compatible with other compression libraries.

The code is written in C++, and HBase—as of version 0.92—ships with the required JNI[119] libraries to be able to use it. It requires that you first install the native executable binaries, by either using a packet manager, such as apt, rpm, or yum, or building them from the source code and installing them so that the JNI library can find them.

When setting up support for Snappy, you must install the native binary library on all region servers. Only then are they usable by the libraries.

LZO

Lempel-Ziv-Oberhumer (LZO) is a lossless data compression algorithm that is focused on decompression speed, and written in ANSI C. Similar to Snappy, it requires a JNI library for HBase to be able to use it.

Unfortunately, HBase cannot ship with LZO because of licensing issues: HBase uses the Apache License, while LZO is using the incompatible GNU General Public License (GPL). This means that the LZO installation needs to be performed separately, after HBase has been installed.[120]

GZIP

The GZIP compression algorithm will generally compress better than Snappy or LZO, but is slower in comparison. While this seems like a disadvantage, it comes with an additional savings in storage space.

The performance issue can be mitigated to some degree by using the native GZIP libraries that are available on your operating system. The libraries used by HBase (which are provided by Hadoop) automatically check if the native libraries are available[121] and will make use of them. If not, you will see this message in your logfiles: "Got brand-new compressor". This indicates a failure to load the native version while falling back to the Java code implementation instead. The compression will still work, but is slightly slower.

An additional disadvantage is that GZIP needs a considerable amount of CPU resources. This can put unwanted load on your servers and needs to be carefully monitored.

Verifying Installation

Once you have installed a supported compression algorithm, it is highly recommended that you check if the installation was successful. There are a few mechanisms in HBase to do that.

Compression test tool

HBase includes a tool to test if compression is set up properly. To run it, type ./bin/hbase org.apache.hadoop.hbase.util.CompressionTest. This will return information on how to run the tool:

$ ./bin/hbase org.apache.hadoop.hbase.util.CompressionTest
Usage: CompressionTest <path> none|gz|lzo|snappy

For example:
  hbase class org.apache.hadoop.hbase.util.CompressionTest file:///tmp/testfile gz

You need to specify a file that the tool will create and test in combination with the selected compression algorithm. For example, using a test file in HDFS and checking if GZIP is installed, you can run:

$ ./bin/hbase org.apache.hadoop.hbase.util.CompressionTest \
  /user/larsgeorge/test.gz gz
11/07/01 20:27:43 WARN util.NativeCodeLoader: Unable to load native-hadoop  
  library for your platform... using builtin-java classes where applicable
11/07/01 20:27:43 INFO compress.CodecPool: Got brand-new compressor
11/07/01 20:27:43 INFO compress.CodecPool: Got brand-new compressor
SUCCESS

The tool reports SUCCESS, and therefore confirms that you can use this compression type for a column family definition. Note how it also prints the "Got brand-new compressor" message explained earlier: the server did not find the native GZIP libraries, but it can fall back to the Java code-based library.

Trying the same tool with a compression type that is not properly installed will raise an exception:

$ ./bin/hbase org.apache.hadoop.hbase.util.CompressionTest \
  file:///tmp/test.lzo lzo
Exception in thread "main" java.lang.RuntimeException:  
 java.lang.ClassNotFoundException: com.hadoop.compression.lzo.LzoCodec
    at org.apache.hadoop.hbase.io.hfile.Compression$Algorithm$1.getCodec
    at org.apache.hadoop.hbase.io.hfile.Compression$Algorithm.getCompressor

If this happens, you need to go back and check the installation again. You also may have to restart the servers after you installed the JNI and/or native compression libraries.

Startup check

Even if the compression test tool reports success and confirms the proper installation of a compression library, you can still run into problems later on: since JNI requires that you first install the native libraries, it can happen that while you provision a new machine you miss this step. Subsequently, the server fails to open regions that contain column families using the native libraries (see Basic setup checklist).

This can be mitigated by specifying the (by default unset) hbase.regionserver.codecs property to list all of the required JNI libraries. Should one of them fail to find its native counterpart, it will prevent the entire region server from starting up. This way you get a fast failing setup where you notice the missing libraries, instead of running into issues later.

For example, this will check that the Snappy and LZO compression libraries are properly installed when the region server starts:

<property>
  <name>hbase.regionserver.codecs</name>
  <value>snappy,lzo</value>
</property>

If, for any reason, the JNI libraries fail to load the matching native ones, the server will abort at startup with an IOException stating "Compression codec <codec-name> not supported, aborting RS construction". Repair the setup and try to start the region server daemon again.

You can conduct this test for every compression algorithm supported by HBase. Do not forget to copy the changed configuration file to all region servers and to restart them afterward.

Enabling Compression

Enabling compression requires installation of the JNI and native compression libraries (unless you only want to use the Java code-based GZIP compression), as described earlier, and specifying the chosen algorithm in the column family schema.

One way to accomplish this is during table creation. The possible values are listed in Column Families.

hbase(main):001:0> create 'testtable', { NAME => 'colfam1', COMPRESSION => 'GZ' }  
0 row(s) in 1.1920 seconds

hbase(main):012:0> describe 'testtable'                                            
DESCRIPTION                                                 ENABLED
{NAME => 'testtable', FAMILIES => [{NAME => 'colfam1',      true 
BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS 
=> '3', COMPRESSION => 'GZ', TTL => '2147483647', BLOCKSIZE
=> '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0400 seconds

The describe shell command is used to read back the schema of the newly created table. You can see the compression is set to GZIP (using the shorter GZ value as required). Another option to enable—or change, or disable—the compression algorithm is to use the alter command for existing tables:

hbase(main):013:0> create 'testtable2', 'colfam1'
0 row(s) in 1.1920 seconds

hbase(main):014:0> disable 'testtable2'
0 row(s) in 2.0650 seconds

hbase(main):016:0> alter 'testtable2', { NAME => 'colfam1', COMPRESSION => 'GZ' }
0 row(s) in 0.2190 seconds

hbase(main):017:0> enable 'testtable2'
0 row(s) in 2.0410 seconds

Note how the table was first disabled. This is necessary to perform the alteration of the column family definition. The final enable command brings the table back online.

Changing the compression format to NONE will disable the compression for the given column family.

Optimizing Splits and Compactions

The built-in mechanisms of HBase to handle splits and compactions have sensible defaults and perform their duty as expected. Sometimes, though, it is useful to change their behavior to gain additional performance.

Managed Splitting

Usually HBase handles the splitting of regions automatically: once the regions reach the configured maximum size, they are split into two halves, which then can start taking on more data and grow from there. This is the default behavior and is sufficient for the majority of use cases.

There is one known problematic scenario, though, that can cause what is called split/compaction storms: when you grow your regions roughly at the same rate, eventually they all need to be split at about the same time, causing a large spike in disk I/O because of the required compactions to rewrite the split regions.

Rather than relying on HBase to handle the splitting, you can turn it off and manually invoke the split and major_compact commands. This is accomplished by setting the hbase.hregion.max.filesize for the entire cluster, or when defining your table schema at the column family level, to a very high number. Setting it to Long.MAX_VALUE is not recommended in case the manual splits fail to run. It is better to set this value to a reasonable upper boundary, such as 100 GB (which would result in a one-hour major compaction if triggered).

The advantage of running the commands to split and compact your regions manually is that you can time-control them. Running them staggered across all regions spreads the I/O load as much as possible, avoiding any split/compaction storm. You will need to implement a client that uses the administrative API to call the split() and majorCompact() methods. Alternatively, you can use the shell to invoke the commands interactively, or script their call using cron, for instance. Also see the RegionSplitter (added in version 0.90.2), discussed shortly, for another way to split existing regions: it has a rolling split feature you can use to carefully split the existing regions while waiting long enough for the involved compactions to complete (see the -r and -o command-line options).
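  
As a starting point, here is a minimal sketch of such a client using the HBaseAdmin class of the administrative API; the table name and the pause between the calls are placeholders you would replace with your own scheduling logic:

Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
String table = "testtable";   // placeholder: the table you manage manually
admin.split(table);           // ask the servers to split all regions of the table
Thread.sleep(60 * 1000);      // placeholder pause: give the splits time to settle
admin.majorCompact(table);    // rewrite the store files of the daughter regions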

An additional advantage to managing the splits manually is that you have better control over which regions are available at any time. This is good in the rare case that you have to do very low-level debugging, to, for example, see why a certain region had problems. With automated splits it might happen that by the time you want to check into a specific region, it has already been replaced with two daughter regions. These regions have new names and tracing the evolution of the original region over longer periods of time makes it much more difficult to find the information you require.

Region Hotspotting

Using the metrics discussed in Region Server Metrics,[122] you can determine if you are dealing with a write pattern that is causing a specific region to run hot.

If this is the case, refer to the approaches discussed in Chapter 9, especially those discussed in Key Design: you may need to salt the keys, or use random keys to distribute the load across all servers evenly.

Short of changing the key design, the only way to alleviate the situation is to manually split a hot region into one or more new regions, at exact boundaries. This will divide the region’s load over multiple region servers. As you split a region you can specify a split key, that is, the row key at which the given region is split into two. You can specify any row key within that region, so you are also able to generate halves that differ completely in size.

This might help only when you are not dealing with completely sequential key ranges, because those are always going to hit one region for a considerable amount of time.
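  
Assuming a client version whose HBaseAdmin offers split() with an explicit split point, a hot region of a hypothetical table could be divided at a chosen row key like this:

Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
// split the region of "testtable" that contains "row-57000" at exactly that key
admin.split("testtable", "row-57000");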

Presplitting Regions

Managing the splits is useful to tightly control when load is going to increase on your cluster. You still face the problem that when initially loading a table, you need to split the regions rather often, since you usually start out with a single region per table. Growing this single region to a very large size is not recommended; therefore, it is better to start with a larger number of regions right from the start. This is done by presplitting the regions of an existing table, or by creating a table with the required number of regions.

The createTable() method of the administrative API, as well as the shell’s create command, both take a list of split keys, which can be used to presplit a table when it is created. HBase also ships with a utility called RegionSplitter, which you can use to create a presplit table. Starting it without a parameter will show usage information:

$ ./bin/hbase org.apache.hadoop.hbase.util.RegionSplitter
usage: RegionSplitter <TABLE>
 -c <region count>        Create a new table with a pre-split number of
                                regions
 -D <property=value>      Override HBase Configuration Settings
 -f <family:family:...>   Column Families to create with new table.
                                Required with -c
 -h                             Print this usage help
 -o <count>               Max outstanding splits that have unfinished
                                major compactions
 -r                             Perform a rolling split of an existing region
    --risky                     Skip verification steps to complete
                                quickly.STRONGLY DISCOURAGED for production
                                systems.

By default, it uses the MD5StringSplit class to partition the row keys into ranges. You can define your own algorithm by implementing the provided SplitAlgorithm interface, and handing it to the utility using the -D split.algorithm=<your-algorithm-class> parameter. An example of using the supplied split algorithm class and creating a presplit table is:

$ ./bin/hbase org.apache.hadoop.hbase.util.RegionSplitter \
  -c 10 testtable -f colfam1

In the web UI of the master, you can click on the link with the newly created table name to see the generated regions:

testtable,,1309766006467.c0937d09f1da31f2a6c2950537a61093.
testtable,0ccccccc,1309766006467.83a0a6a949a6150c5680f39695450d8a.
testtable,19999998,1309766006467.1eba79c27eb9d5c2f89c3571f0d87a92.
testtable,26666664,1309766006467.7882cd50eb22652849491c08a6180258.
testtable,33333330,1309766006467.cef2853e36bd250c1b9324bac03e4bc9.
testtable,3ffffffc,1309766006467.00365940761359fee14d41db6a73ffc5.
testtable,4cccccc8,1309766006467.f0c5045c304c2ff5338be27e81ae698e.
testtable,59999994,1309766006467.2d854f337aa6c09232409f0ba1d4964b.
testtable,66666660,1309766006467.b1ec9df9fd90d91f54cb18da5edc2581.
testtable,7333332c,1309766006468.42e179b78663b64401079a8601d9bd06.

Or you can use the shell’s create command:

hbase(main):001:0> create 'testtable', 'colfam1', 
  { SPLITS => ['row-100', 'row-200', 'row-300', 'row-400'] }
0 row(s) in 1.1670 seconds

This generates the following regions:

testtable,,1309768272330.37377c4ab0a944a326ba8b6596a29396.
testtable,row-100,1309768272331.e6092cc777f58a08c61bf081aba14916.
testtable,row-200,1309768272331.63c9630a79b37ebce7b58cde0235dfe5.
testtable,row-300,1309768272331.eead6ad2ff3303ffe6a3126e0df3ff7a.
testtable,row-400,1309768272331.2bee7417fa67e4ac8c7210ce7325708e.
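  
The same result can be achieved programmatically; here is a minimal sketch using the administrative API, mirroring the table name, column family, and split keys of the shell example above:

Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor("testtable");
desc.addFamily(new HColumnDescriptor("colfam1"));
// the split keys define the region boundaries, just like SPLITS in the shell
byte[][] splits = {
  Bytes.toBytes("row-100"), Bytes.toBytes("row-200"),
  Bytes.toBytes("row-300"), Bytes.toBytes("row-400")
};
admin.createTable(desc, splits);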

As for the number of presplit regions to use, you can start low with 10 presplit regions per server and watch as data grows over time. It is better to err on the side of too few regions and using a rolling split later, as having too many regions is usually not ideal in regard to overall cluster performance.

Alternatively, you can determine how many presplit regions to use based on the largest store file in your region: with a growing data size, this will get larger over time, and you want the largest region to be just big enough so that it is not selected for major compaction—or you might face the mentioned compaction storms.

If you presplit your regions too thin, you can increase the major compaction interval by increasing the value for the hbase.hregion.majorcompaction configuration property. If your data size grows too large, use the RegionSplitter utility to perform a network I/O safe rolling split of all regions.

Use of manual splits and presplit regions is an advanced concept that requires a lot of planning and careful monitoring. On the other hand, it can help you to avoid the compaction storms that can happen for uniform data growth, or to shed load of hot regions by splitting them manually.

Load Balancing

The master has a built-in feature, called the balancer. By default, the balancer runs every five minutes, and it is configured by the hbase.balancer.period property. Once the balancer is started, it will attempt to even out the number of assigned regions per region server so that they are within one region of the average number per server. The call first determines a new assignment plan, which describes which regions should be moved where. Then it starts the process of moving the regions by calling the unassign() method of the administrative API iteratively.

The balancer has an upper limit on how long it is allowed to run, which is configured using the hbase.balancer.max.balancing property and defaults to half of the balancer period value, or two and a half minutes.

You can control the balancer by means of the balancer switch: either use the shell’s balance_switch command to toggle the balancer status between enabled and disabled, or use the balanceSwitch() API method to do the same. When you disable the balancer, it no longer runs—neither on its periodic schedule nor when invoked explicitly—until you switch it back on.

The balancer can be explicitly started using the shell’s balancer command, or using the balancer() API method. The time-controlled invocation mentioned previously calls this method implicitly. It will determine if there is any work to be done and return true if that is the case. The return value of false means that it was not able to run the balancer, because either it was switched off, there was no work to be done (all is balanced), or something else was prohibiting the process. One example for this is the region in transition list (see Main page): if there is a region currently in transition, the balancer will be skipped.
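  
A minimal sketch of driving the balancer from the administrative API could look like this:

Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
boolean previousState = admin.balanceSwitch(true); // enable the balancer, returns the prior state
boolean invoked = admin.balancer();                // trigger a balancer run explicitly
System.out.println("was enabled before: " + previousState + ", run accepted: " + invoked);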

Instead of relying on the balancer to do its work properly, you can use the move command and API method to assign regions to other servers. This is useful when you want to control where the regions of a particular table are assigned. See Region Hotspotting for an example.

Merging Regions

While it is much more common for regions to split automatically over time as you are adding data to the corresponding table, sometimes you may need to merge regions—for example, after you have removed a large amount of data and you want to reduce the number of regions hosted by each server.

HBase ships with a tool that allows you to merge two adjacent regions as long as the cluster is not online. You can use the command-line tool to get the usage details:

$ ./bin/hbase org.apache.hadoop.hbase.util.Merge
Usage: bin/hbase merge <table-name> <region-1> <region-2>

Here is an example of a table that has more than one region, all of which are subsequently merged:

$ ./bin/hbase shell

hbase(main):001:0> create 'testtable', 'colfam1', 
 {SPLITS => ['row-10','row-20','row-30','row-40','row-50']}
0 row(s) in 0.2640 seconds

hbase(main):002:0> for i in '0'..'9' do for j in '0'..'9' do 
 put 'testtable', "row-#{i}#{j}", "colfam1:#{j}", "#{j}" end end
0 row(s) in 1.0450 seconds

hbase(main):003:0> flush 'testtable'
0 row(s) in 0.2000 seconds

hbase(main):004:0> scan '.META.', { COLUMNS => ['info:regioninfo']}
ROW                                  COLUMN+CELL
 testtable,,1309614509037.612d1e0112 column=info:regioninfo, timestamp=130...
 406e6c2bb482eeaec57322.             STARTKEY => '', ENDKEY => 'row-10'
 testtable,row-10,1309614509040.2fba column=info:regioninfo, timestamp=130...
 fcc9bc6afac94c465ce5dcabc5d1.       STARTKEY => 'row-10', ENDKEY => 'row-20'
 testtable,row-20,1309614509041.e7c1 column=info:regioninfo, timestamp=130...
 6267eb30e147e5d988c63d40f982.       STARTKEY => 'row-20', ENDKEY => 'row-30'
 testtable,row-30,1309614509041.a9cd column=info:regioninfo, timestamp=130...
 e1cbc7d1a21b1aca2ac7fda30ad8.       STARTKEY => 'row-30', ENDKEY => 'row-40'
 testtable,row-40,1309614509041.d458 column=info:regioninfo, timestamp=130...
 236feae097efcf33477e7acc51d4.       STARTKEY => 'row-40', ENDKEY => 'row-50'
 testtable,row-50,1309614509041.74a5 column=info:regioninfo, timestamp=130...
 7dc7e3e9602d9229b15d4c0357d1.       STARTKEY => 'row-50', ENDKEY => ''
6 row(s) in 0.0440 seconds

hbase(main):005:0> exit

$ ./bin/stop-hbase.sh

$ ./bin/hbase org.apache.hadoop.hbase.util.Merge testtable \
 testtable,row-20,1309614509041.e7c16267eb30e147e5d988c63d40f982. \
 testtable,row-30,1309614509041.a9cde1cbc7d1a21b1aca2ac7fda30ad8.

The example creates a table with five split points, resulting in six regions. It then inserts some rows and flushes the data to ensure that there are store files for the subsequent merge. The scan is used to get the names of the regions, but you can also use the web UI of the master: click on the table name in the User Tables section to get the same list of regions.

Note

Note how the shell wraps the values in each column. The region name is split over two lines, which you need to copy and paste separately. The web UI is easier to use in that respect, as it has the names in one column and in a single line.

The content of the column values is abbreviated to the start and end keys. You can see how the create command using the split keys has created the regions. The example goes on to exit the shell, and stop the HBase cluster. Note that HDFS still needs to run for the merge to work, as it needs to read the store files of each region and merge them into a new, combined one.

Client API: Best Practices

When reading or writing data from a client using the API, there are a handful of optimizations you should consider to gain the best performance. Here is a list of the best practice options:

Disable auto-flush

When performing a lot of put operations, make sure the auto-flush feature of HTable is set to false, using the setAutoFlush(false) method. Otherwise, the Put instances will be sent one at a time to the region server. Puts added via HTable.put(Put) and HTable.put(List<Put>) wind up in the same write buffer. If auto-flushing is disabled, these operations are not sent until the write buffer is filled. To explicitly flush the messages, call flushCommits(). Calling close on the HTable instance will implicitly invoke flushCommits().
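  
For illustration, a minimal sketch of a client batching puts with auto-flush disabled might look as follows (table name, column family, and buffer size are just examples):

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "testtable");
table.setAutoFlush(false);                 // buffer puts on the client side
table.setWriteBufferSize(4 * 1024 * 1024); // example: 4 MB write buffer
for (int i = 0; i < 100000; i++) {
  Put put = new Put(Bytes.toBytes("row-" + i));
  put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val-" + i));
  table.put(put);                          // buffered until the write buffer fills up
}
table.flushCommits();                      // send any remaining buffered puts
table.close();                             // close() flushes implicitly as well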

Use scanner-caching

If HBase is used as an input source for a MapReduce job, for example, make sure the input Scan instance to the MapReduce job has setCaching() set to something greater than the default of 1. Using the default value means that the map task will make callbacks to the region server for every record processed. Setting this value to 500, for example, will transfer 500 rows at a time to the client to be processed. There is a cost to having the cache value be large because it costs more in memory for both the client and region servers, so bigger is not always better.
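  
In code, this is a single call on the scan instance handed to the job, for example:

Scan scan = new Scan();
scan.setCaching(500);  // transfer 500 rows per call to the region server instead of 1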

Limit scan scope

Whenever a Scan is used to process large numbers of rows (and especially when used as a MapReduce source), be aware of which attributes are selected. If Scan.addFamily() is called, all of the columns in the specified column family will be returned to the client. If only a small number of the available columns are to be processed, only those should be specified in the input scan because column overselection incurs a nontrivial performance penalty over large data sets.
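  
For example, instead of adding the whole family, a sketch that restricts the scan to the one column actually processed (the names are hypothetical):

Scan scan = new Scan();
// ship only the single column that is needed, not the entire column family
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"));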

Close ResultScanners

This isn’t so much about improving performance, but rather avoiding performance problems. If you forget to close ResultScanner instances, as returned by HTable.getScanner(), you can cause problems on the region servers.

Always have ResultScanner processing enclosed in a try/finally block, for example:

Scan scan = new Scan();
// configure scan instance
ResultScanner scanner = table.getScanner(scan);
try {
  for (Result result : scanner) {
    // process result...
  }
} finally {
  scanner.close();  // always close the scanner!
}
table.close();

Block cache usage

Scan instances can be set to use the block cache in the region server via the setCacheBlocks() method. For scans used with MapReduce jobs, this should be false. For frequently accessed rows, it is advisable to use the block cache.
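  
In code this is again a single call on the scan instance, for example:

Scan scan = new Scan();
scan.setCacheBlocks(false);  // do not evict hot blocks for the sake of a full scan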

Optimal loading of row keys

When performing a table scan where only the row keys are needed (no families, qualifiers, values, or timestamps), add a FilterList with a MUST_PASS_ALL operator to the scanner using setFilter(). The filter list should include both a FirstKeyOnlyFilter and a KeyOnlyFilter instance, as explained in Dedicated Filters. Using this filter combination will cause the region server to only load the row key of the first KeyValue (i.e., from the first column) found and return it to the client, resulting in minimized network traffic.
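  
A minimal sketch of this filter combination, assuming an existing HTable instance named table:

Scan scan = new Scan();
FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filters.addFilter(new FirstKeyOnlyFilter()); // only the first KeyValue of each row
filters.addFilter(new KeyOnlyFilter());      // strip the value, keep the key
scan.setFilter(filters);
ResultScanner scanner = table.getScanner(scan);
try {
  for (Result result : scanner) {
    byte[] rowKey = result.getRow();  // only the row key is of interest
    // process rowKey...
  }
} finally {
  scanner.close();
}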

Turn off WAL on Puts

A frequently discussed option for increasing throughput on Puts is to call setWriteToWAL(false) on the Put instances. Turning this off means that the region server will not write the Put to the write-ahead log, but rather only into the memstore. However, the consequence is that if there is a region server failure there will be data loss. If you use setWriteToWAL(false), do so with extreme caution. You may find that it actually makes little difference if your load is well distributed across the cluster.
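  
The flag is set per mutation; a minimal sketch, assuming an existing HTable instance named table:

Put put = new Put(Bytes.toBytes("row-1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val-1"));
put.setWriteToWAL(false);  // skip the write-ahead log: faster, but data is lost on a crash
table.put(put);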

In general, it is best to use the WAL for Puts, and where loading throughput is a concern to use the bulk loading techniques instead, as explained in Bulk Import.

Configuration

Many configuration properties are available for you to use to fine-tune your cluster setup. Configuration listed the ones you need to change or set to get your cluster up and running. There are advanced options you can consider adjusting based on your use case. Here is a list of the more commonly changed ones, and how to adjust them.

Note

The majority of the settings are properties in the hbase-site.xml configuration file. Edit the file, copy it to all servers in the cluster, and restart the servers to effect the changes.

Decrease ZooKeeper timeout

The default timeout between a region server and the ZooKeeper quorum is three minutes (specified in milliseconds), and is configured with the zookeeper.session.timeout property. This means that if a server crashes, it will be three minutes before the master notices this fact and starts recovery. You can tune the timeout down to a minute, or even less, so the master notices failures sooner.

Before changing this value, be sure you have your JVM garbage collection configuration under control, because otherwise, a long garbage collection that lasts beyond the ZooKeeper session timeout will take out your region server. You might be fine with this: you probably want recovery to start if a region server has been in a garbage collection-induced pause for a long period of time.

The reason for the default value being rather high is that it avoids problems during very large imports: such imports put a lot of stress on the servers, thereby increasing the likelihood that they will run into the garbage collection pause problem. Also see Stability issues for information on how to detect such pauses.
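  
For example, to lower the timeout to one minute, add something like the following to hbase-site.xml (the value is in milliseconds):

<property>
  <name>zookeeper.session.timeout</name>
  <value>60000</value>
</property>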

Increase handlers

The hbase.regionserver.handler.count configuration property defines the number of threads that are kept open to answer incoming requests to user tables. The default of 10 is rather low in order to prevent users from overloading their region servers when using large write buffers with a high number of concurrent clients. The rule of thumb is to keep this number low when the payload per request approaches megabytes (e.g., big puts, scans using a large cache) and high when the payload is small (e.g., gets, small puts, increments, deletes).

It is safe to set that number to the maximum number of incoming clients if their payloads are small, the typical example being a cluster that serves a website, since puts are typically not buffered, and most of the operations are gets.

The reason why it is dangerous to keep this setting high is that the aggregate size of all the puts that are currently happening in a region server may impose too much pressure on the server’s memory, or even trigger an OutOfMemoryError exception. A region server running on low memory will trigger its JVM’s garbage collector to run more frequently up to a point where pauses become noticeable (the reason being that all the memory used to keep all the requests’ payloads cannot be collected, no matter how hard the garbage collector tries). After some time, the overall cluster throughput is affected since every request that hits that region server will take longer, which exacerbates the problem.
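  
For example, a cluster serving mostly small gets and puts could raise the handler count along these lines (the value of 30 is only an illustration; size it to your client concurrency and payloads):

<property>
  <name>hbase.regionserver.handler.count</name>
  <value>30</value>
</property>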

Increase heap settings

HBase ships with a reasonable, conservative configuration that will work on nearly all machine types that people might want to test with. If you have larger machines—for example, where you can assign 8 GB or more to HBase—you should adjust the HBASE_HEAPSIZE setting in your hbase-env.sh file.

Consider using HBASE_REGIONSERVER_OPTS instead of changing the global HBASE_HEAPSIZE: this way the master will run with the default 1 GB heap, while you can increase the region server heap as needed independently.

This option is set in hbase-env.sh, as opposed to the hbase-site.xml file used for most of the other options.

Enable data compression

You should enable compression for the storage files—in particular, Snappy or LZO. It’s near-frictionless and, in most cases, boosts performance. See Compression for information on all the compression algorithms.

Increase region size

Consider going to larger regions to cut down on the total number of regions on your cluster. Generally, fewer regions to manage makes for a smoother-running cluster. You can always manually split the big regions later should one prove hot and you want to spread the request load over the cluster. Optimizing Splits and Compactions has the details.

By default, regions are 256 MB in size. You could run with 1 GB, or even larger regions. Keep in mind that this needs to be carefully assessed, since a large region also can mean longer pauses under high pressure, due to compactions.

Adjust hbase.hregion.max.filesize in your hbase-site.xml configuration file.
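  
For example, to run with the 1 GB regions mentioned above, you could set (value in bytes):

<property>
  <name>hbase.hregion.max.filesize</name>
  <value>1073741824</value>
</property>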

Adjust block cache size

The amount of heap used for the block cache is specified as a percentage, expressed as a float value, and defaults to 20% (set as 0.2). The property to change this percentage is hfile.block.cache.size. Carefully monitor your block cache usage (see Region Server Metrics) to see if you are encountering many block evictions. In this case, you could increase the cache to fit more blocks.

Another reason to increase the block cache size is if you have mainly reading workloads. Then the block cache is what is needed most, and increasing it will help to cache more data.

Warning

The total value of the block cache percentage and the upper limit of the memstore should not be 100%. You need to leave room for other purposes, or you will cause the server to run out of memory. The default total percentage is 60%, which is a reasonable value. Only go above that percentage when you are absolutely sure it will help you—and that it will have no adverse effect later on.

Adjust memstore limits

Memstore heap usage is set with the hbase.regionserver.global.memstore.upperLimit property, and it defaults to 40% (set to 0.4). In addition, the hbase.regionserver.global.memstore.lowerLimit property (set to 35%, or 0.35) is used to control the amount of flushing that will take place once the server is required to free heap space. Keep the upper and lower limits close to each other to avoid excessive flushing.

When you are dealing with mainly read-oriented workloads, you can consider reducing both limits to make more room for the block cache. On the other hand, when you are handling many writes, you should check the logfiles (or use the region server metrics as explained in Region Server Metrics) to see whether the flushes are mostly done at a very small size—for example, 5 MB—and if so, increase the memstore limits to reduce the excessive amount of I/O this causes.
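  
As an illustration, a read-heavy cluster might shift heap from the memstores to the block cache with settings along these lines (the exact percentages are assumptions you should validate against your own metrics, and their sum must stay well below 100%):

<property>
  <name>hfile.block.cache.size</name>
  <value>0.3</value>
</property>
<property>
  <name>hbase.regionserver.global.memstore.upperLimit</name>
  <value>0.3</value>
</property>
<property>
  <name>hbase.regionserver.global.memstore.lowerLimit</name>
  <value>0.25</value>
</property>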

Increase blocking store files

This value, set with the hbase.hstore.blockingStoreFiles property, defines when the region servers block further updates from clients to give compactions time to reduce the number of files. When you have a workload that sometimes spikes in regard to inserts, you should increase this value slightly—the default is seven files—to account for these spikes.

Use monitoring to graph the number of store files maintained by the region servers. If this number is consistently high, you might not want to increase this value, as you are only delaying the inevitable problems of overloading your servers.

Increase block multiplier

The property hbase.hregion.memstore.block.multiplier, set by default to 2, is a safety latch that blocks any further updates from clients when the memstores exceed the multiplier * flush size limit.

When you have enough memory at your disposal, you can increase this value to handle spikes more gracefully: instead of blocking updates to wait for the flush to complete, you can temporarily accept more data.

Decrease maximum logfiles

Setting the hbase.regionserver.maxlogs property allows you to control how often flushes occur based on the number of WAL files on disk. The default is 32, which can be high in a write-heavy use case. Lower it to force the servers to flush data more often to disk so that these logs can be subsequently discarded.

Load Tests

After installing your cluster, it is advisable to run performance tests to verify its functionality. These tests give you a baseline which you can refer to after making changes to the configuration of the cluster, or the schemas of your tables. Doing a burn-in of your cluster will show you how much you can gain from it, but this does not replace a test with the load as expected from your use case.

Performance Evaluation

HBase ships with its own tool to execute a performance evaluation. It is aptly named Performance Evaluation (PE) and its usage details can be gained from using it with no command-line parameters:

$ ./bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation
Usage: java org.apache.hadoop.hbase.PerformanceEvaluation 
  [--miniCluster] [--nomapred] [--rows=ROWS] <command> <nclients>

Options:
 miniCluster     Run the test on an HBaseMiniCluster
 nomapred        Run multiple clients using threads (rather than use mapreduce)
 rows            Rows each client runs. Default: One million
 flushCommits    Used to determine if the test should flush the table.  
                   Default: false
 writeToWAL      Set writeToWAL on puts. Default: True

Command:
 filterScan      Run scan test using a filter to find a specific row based 
                   on it's value (make sure to use --rows=20)
 randomRead      Run random read test
 randomSeekScan  Run random seek and scan 100 test
 randomWrite     Run random write test
 scan            Run scan test (read every row)
 scanRange10     Run random seek scan with both start and stop row (max 10 rows)
 scanRange100    Run random seek scan with both start and stop row (max 100 rows)
 scanRange1000   Run random seek scan with both start and stop row (max 1000 rows)
 scanRange10000  Run random seek scan with both start and stop row (max 10000 rows)
 sequentialRead  Run sequential read test
 sequentialWrite Run sequential write test

Args:
 nclients        Integer. Required. Total number of clients (and HRegionServers)
                 running: 1 <= value <= 500
Examples:
 To run a single evaluation client:
 $ bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 1

By default, the PE is executed as a MapReduce job—unless you specify a single client, or use the --nomapred parameter. You can see the default values from the usage information in the preceding code sample, which are reasonable starting points, and the command to run a test is given as well:

$ ./bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 1
11/07/03 13:18:34 INFO hbase.PerformanceEvaluation: Start class 
  org.apache.hadoop.hbase.PerformanceEvaluation$SequentialWriteTest at 
  offset 0 for 1048576 rows
...
11/07/03 13:18:41 INFO hbase.PerformanceEvaluation: 0/104857/1048576
...
11/07/03 13:18:45 INFO hbase.PerformanceEvaluation: 0/209714/1048576
...
11/07/03 13:20:03 INFO hbase.PerformanceEvaluation: 0/1048570/1048576
11/07/03 13:20:03 INFO hbase.PerformanceEvaluation: Finished class 
  org.apache.hadoop.hbase.PerformanceEvaluation$SequentialWriteTest 
  in 89062ms at offset 0 for 1048576 rows

The command starts a single client and performs a sequential write test. The output of the command shows the progress, until the final results are printed. You need to increase the number of clients (i.e., threads or MapReduce tasks) to a reasonable number, while making sure you are not overloading the client machine.

There is no need to specify a table name, nor a column family, as the PE code is generating its own schema: a table named TestTable with a family called info.

Note

The read tests require that you have previously executed the write tests. This will generate the table and insert the data to read subsequently.

Using the random or sequential read and write tests allows you to emulate these specific workloads. You cannot mix them, though, which means you must execute each test separately.

YCSB

The Yahoo! Cloud Serving Benchmark[124] (YCSB) is a suite of tools that can be used to run comparable workloads against different storage systems. While primarily built to compare various systems, it is also a reasonable tool for performing an HBase cluster burn-in—or performance test.

Before you can use YCSB you need to create the required test table, named usertable. While the name of the table is hardcoded, you are free to create a column family with a name of your choice. For example:

$ ./bin/hbase shell

hbase(main):001:0> create 'usertable', 'family'
0 row(s) in 0.3420 seconds

Starting YCSB without any options gives you its usage information:

$ java -cp build/ycsb.jar:db/hbase/lib/* com.yahoo.ycsb.Client
Usage: java com.yahoo.ycsb.Client [options]
Options:
  -threads n: execute using n threads (default: 1) - can also be specified as the 
              "threadcount" property using -p
  -target n: attempt to do n operations per second (default: unlimited) - can also
             be specified as the "target" property using -p
  -load:  run the loading phase of the workload
  -t:  run the transactions phase of the workload (default)
  -db dbname: specify the name of the DB to use (default: com.yahoo.ycsb.BasicDB) - 
              can also be specified as the "db" property using -p
  -P propertyfile: load properties from the given file. Multiple files can
                   be specified, and will be processed in the order specified
  -p name=value:  specify a property to be passed to the DB and workloads;
                  multiple properties can be specified, and override any
                  values in the propertyfile
  -s:  show status during run (default: no status)
  -l label:  use label for status (e.g. to label one experiment out of a whole 
             batch)

Required properties:
  workload: the name of the workload class to use 
            (e.g. com.yahoo.ycsb.workloads.CoreWorkload)

To run the transaction phase from multiple servers, start a separate client
on each. To run the load phase from multiple servers, start a separate client 
on each; additionally, use the "insertcount" and "insertstart" properties to 
divide up the records to be inserted

The first step to test a running HBase cluster is to load it with a number of rows, which are subsequently used to modify the same rows, or to add new rows to the existing table:

$ java -cp $HBASE_HOME/conf:build/ycsb.jar:db/hbase/lib/* \
com.yahoo.ycsb.Client -load -db com.yahoo.ycsb.db.HBaseClient \
-P workloads/workloada -p columnfamily=family -p recordcount=100000000 \
-s > ycsb-load.log

This will run for a while and create the rows. The layout of the row is controlled by the given workload file, here workloada, containing these settings:

$ cat workloads/workloada
# Yahoo! Cloud System Benchmark
# Workload A: Update heavy workload
#   Application example: Session store recording recent actions
#                        
#   Read/update ratio: 50/50
#   Default data size: 1 KB records (10 fields, 100 bytes each, plus key)
#   Request distribution: zipfian

recordcount=1000
operationcount=1000
workload=com.yahoo.ycsb.workloads.CoreWorkload

readallfields=true

readproportion=0.5
updateproportion=0.5
scanproportion=0
insertproportion=0

requestdistribution=zipfian

Refer to the online documentation of the YCSB project for details on how to modify, or set up your own, workloads. The description specifies the data size and number of columns that are created during the load phase. The output of the tool is redirected into a logfile, which will contain lines like these:

YCSB Client 0.1
Command line: -load -db com.yahoo.ycsb.db.HBaseClient -P workloads/workloada 
-p columnfamily=family -p recordcount=100000000 -s
[OVERALL], RunTime(ms), 915.0
[OVERALL], Throughput(ops/sec), 1092.896174863388
[INSERT], Operations, 1000
[INSERT], AverageLatency(ms), 0.457
[INSERT], MinLatency(ms), 0
[INSERT], MaxLatency(ms), 314
[INSERT], 95thPercentileLatency(ms), 1
[INSERT], 99thPercentileLatency(ms), 1
[INSERT], Return=0, 1000
[INSERT], 0, 856
[INSERT], 1, 143
[INSERT], 2, 0
[INSERT], 3, 0
[INSERT], 4, 0
...

This is useful to keep, as it states the observed write performance for the initial set of rows. The default record count of 1000 was increased to reflect a more real-world number. You can override any of the workload configuration options on the command line. If you are running the same workloads more often, create your own and refer to it on the command line using the -P parameter.

The second step for a YCSB performance test is to execute the workload on the prepared table. For example:

$ java -cp $HBASE_HOME:build/ycsb.jar:db/hbase/lib/* \
com.yahoo.ycsb.Client -t -db com.yahoo.ycsb.db.HBaseClient \
-P workloads/workloada -p columnfamily=family -p operationcount=1000000 -s \
-threads 10 > ycsb-test.log

As with the loading step shown earlier, you need to override a few values to make this test useful: increase (or use your own modified workload file) the number of operations to test, and set the number of concurrent threads that should perform them to something reasonable. If you use too many threads you may overload the test machine (the one you run YCSB on). In this case, it is more useful to run the same test at the same time from different physical machines.

The output is also redirected into a logfile so that you can evaluate the test run afterward. The output will contain lines like these:

$ cat transactions.dat 
YCSB Client 0.1
Command line: -t -db com.yahoo.ycsb.db.HBaseClient -P workloads/workloada -p 
columnfamily=family -p operationcount=1000 -s -threads 10
[OVERALL], RunTime(ms), 575.0
[OVERALL], Throughput(ops/sec), 1739.1304347826087
[UPDATE], Operations, 507
[UPDATE], AverageLatency(ms), 2.546351084812623
[UPDATE], MinLatency(ms), 0
[UPDATE], MaxLatency(ms), 414
[UPDATE], 95thPercentileLatency(ms), 1
[UPDATE], 99thPercentileLatency(ms), 1
[UPDATE], Return=0, 507
[UPDATE], 0, 455
[UPDATE], 1, 49
[UPDATE], 2, 0
[UPDATE], 3, 0
...
[UPDATE], 997, 0
[UPDATE], 998, 0
[UPDATE], 999, 0
[UPDATE], >1000, 0
[READ], Operations, 493
[READ], AverageLatency(ms), 7.711967545638945
[READ], MinLatency(ms), 0
[READ], MaxLatency(ms), 417
[READ], 95thPercentileLatency(ms), 3
[READ], 99thPercentileLatency(ms), 416
[READ], Return=0, 493
[READ], 0, 1
[READ], 1, 165
[READ], 2, 257
[READ], 3, 48
[READ], 4, 11
[READ], 5, 4
[READ], 6, 0
...
[READ], 998, 0
[READ], 999, 0
[READ], >1000, 0

Note that YCSB can hardly emulate the workload you will see in your use case, but it can still be useful to test a varying set of loads on your cluster. Use the supplied workloads, or create your own, to emulate cases that are bound to read, write, or both kinds of operations.

Also consider running YCSB while you are running batch jobs, such as a MapReduce process that scans subsets, or entire tables. This will allow you to measure the impact of either on the other.

Note

As of this writing, using YCSB is preferred over the HBase-supplied Performance Evaluation. It offers more options, and can combine read and write workloads.



[118] The video of the presentation is available online.

[119] Java uses the Java Native Interface (JNI) to integrate native libraries and applications.

[120] See the wiki page “Using LZO Compression” (http://wiki.apache.org/hadoop/UsingLzoCompression) for information on how to make LZO work with HBase.

[121] The Hadoop project has a page describing the required steps to build and/or install the native libraries, which includes the GZIP support.

[122] As an alternative, you can also look at the number of requests values reported on the master UI page; see Main page.

[123] Work has been done to improve this situation in HBase 0.92.0.

[124] See the project’s GitHub repository for details.
