Chapter 12. Cluster Administration

Once a cluster is in operation, it may become necessary to change its size or add extra measures for failover scenarios, all while the cluster is in use. Data should be backed up and/or moved between distinct clusters. In this chapter, we will look at how this can be done with minimal to no interruption.

Operational Tasks

This section introduces the various tasks necessary while operating a cluster, including adding and removing nodes.

Node Decommissioning

You can stop an individual region server by running the following script in the HBase directory on the particular server:

$ ./bin/hbase-daemon.sh stop regionserver

The region server will first close all regions and then shut itself down. On shutdown, its ephemeral node in ZooKeeper will expire. The master will notice that the region server is gone and will treat it as a crashed server: it will reassign the regions the server was carrying.

A downside to this method of stopping a region server is that regions could be offline for a good period of time—up to the configured ZooKeeper timeout period. Regions are closed in order: if there are many regions on the server, the first region to close may not be back online until all regions close and after the master notices the region server’s ZooKeeper znode being removed.

HBase 0.90.2 introduced the ability for a node to gradually shed its load and then shut itself down. This is accomplished with the graceful_stop.sh script. When you invoke this script without any parameters, you are presented with an explanation of its usage:

$ ./bin/graceful_stop.sh 
Usage: graceful_stop.sh [--config <conf-dir>] [--restart] [--reload]
                        [--thrift] [--rest] <hostname>
 thrift      If we should stop/start thrift before/after the hbase stop/start
 rest        If we should stop/start rest before/after the hbase stop/start
 restart     If we should restart after graceful stop
 reload      Move offloaded regions back on to the stopped server
 debug       Move offloaded regions back on to the stopped server
 hostname    Hostname of server we are to stop

When you want to decommission a loaded region server, run the following:

$ ./bin/graceful_stop.sh HOSTNAME

where HOSTNAME is the host carrying the region server you want to decommission.

Note

The HOSTNAME passed to graceful_stop.sh must match the hostname that HBase is using to identify region servers. Check the list of region servers in the master UI for how HBase is referring to each server. It is usually hostname, but it can also be an FQDN, such as hostname.foobar.com. Whatever HBase is using, this is what you should pass the graceful_stop.sh decommission script.

If you pass IP addresses, the script is not (yet) smart enough to make a hostname (or FQDN) out of it and will fail when it checks if the server is currently running: the graceful unloading of regions will not run.
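Because the script fails on IP addresses, it can help to reject them before the script is called. The following is a minimal sketch, not part of HBase: looks_like_ipv4 and safe_graceful_stop are hypothetical helper names, and the pattern check is deliberately rough (it does not validate the numeric range of each field).

```shell
#!/bin/sh
# Hypothetical helper, not part of HBase: returns 0 when the argument
# looks like a dotted IPv4 address, 1 otherwise.
looks_like_ipv4() {
  case "$1" in
    ''|*[!0-9.]*) return 1 ;;   # empty, or contains something other than digits and dots
    *.*.*.*.*)    return 1 ;;   # more than four dot-separated fields
    *.*.*.*)      return 0 ;;   # exactly four dot-separated fields
    *)            return 1 ;;
  esac
}

# Hypothetical wrapper: refuses IP addresses, otherwise hands the name
# to graceful_stop.sh unchanged. Run it from the HBase directory.
safe_graceful_stop() {
  if looks_like_ipv4 "$1"; then
    echo "refusing IP address '$1': pass the name shown in the master UI" >&2
    return 1
  fi
  ./bin/graceful_stop.sh "$1"
}
```

The wrapper only guards against the IP address case. It cannot tell whether HBase knows the server by its short hostname or its FQDN, so checking the master UI is still required.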

The graceful_stop.sh script will move the regions off the decommissioned region server one at a time to minimize region churn. It will verify the region deployed in the new location before it moves the next region, and so on, until the decommissioned server is carrying no more regions.

At this point, the graceful_stop.sh script tells the region server to stop. The master will notice the region server gone but all regions will have already been redeployed, and because the region server went down cleanly, there will be no WALs to split.

Rolling Restarts

You can also use the graceful_stop.sh script to restart a region server after the shutdown and move its old regions back into place. (You might do the latter to retain data locality.) A primitive rolling restart might be effected by running something like the following:

$ for i in `cat conf/regionservers|sort`; do ./bin/graceful_stop.sh \
  --restart --reload --debug $i; done &> /tmp/log.txt &

Tail the output of /tmp/log.txt to follow the script’s progress. The preceding code pertains to region servers only. Be sure to disable the load balancer before using this code.

You will need to perform the master update separately, and it is recommended that you do so before the rolling restart of the region servers. Here are some steps you can follow to accomplish a rolling restart:

  1. Unpack your release, make sure of its configuration, and then rsync it across the cluster. If you are using version 0.90.2, patch it with HBASE-3744 and HBASE-3756.

  2. Run hbck to ensure the cluster is consistent:

    $ ./bin/hbase hbck

    Effect repairs if inconsistent.

  3. Restart the master:

    $ ./bin/hbase-daemon.sh stop master; ./bin/hbase-daemon.sh start master

  4. Disable the region balancer:

    $ echo "balance_switch false" | ./bin/hbase shell

  5. Run the graceful_stop.sh script per region server. For example:

    $ for i in `cat conf/regionservers|sort`; do ./bin/graceful_stop.sh \
      --restart --reload --debug $i; done &> /tmp/log.txt &

    If you are running Thrift or REST servers on the region server, pass the --thrift or --rest option, as per the script’s usage instructions, shown earlier (i.e., run it without any command-line options to get the instructions).

  6. Restart the master again. This will clear out the dead servers list and reenable the balancer.

  7. Run hbck to ensure the cluster is consistent.
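Steps 2 through 7 can be strung together in a small wrapper. The following is only a sketch under a few assumptions: it is run from the HBase directory after the release has been distributed (step 1), and the DRY_RUN variable and the run and rolling_restart functions are illustrative names, not part of HBase. By default it only prints the commands, so you can review them first.

```shell
#!/bin/sh
# Illustrative wrapper around the rolling-restart steps; not part of HBase.
DRY_RUN=${DRY_RUN:-1}   # 1 = only print the commands, 0 = execute them

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "+ $*"
  else
    sh -c "$*"
  fi
}

# Usage: rolling_restart [regionservers-file]
rolling_restart() {
  servers_file=${1:-conf/regionservers}
  # Step 2: consistency check; the hbck output still has to be read by a person.
  run './bin/hbase hbck'
  # Step 3: restart the master.
  run './bin/hbase-daemon.sh stop master; ./bin/hbase-daemon.sh start master'
  # Step 4: disable the region balancer.
  run 'echo "balance_switch false" | ./bin/hbase shell'
  # Step 5: gracefully restart each region server, one after the other.
  for host in $(sort "$servers_file"); do
    run "./bin/graceful_stop.sh --restart --reload --debug $host"
  done
  # Step 6: restart the master again.
  run './bin/hbase-daemon.sh stop master; ./bin/hbase-daemon.sh start master'
  # Step 7: final consistency check.
  run './bin/hbase hbck'
}

# Example: review the commands first, then execute them.
#   rolling_restart conf/regionservers
#   DRY_RUN=0; rolling_restart conf/regionservers
```

The sketch does not stop when hbck reports an inconsistency. In practice you would pause after step 2, inspect the report, and repair the cluster before continuing.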

Adding Servers

One of the major features HBase offers is built-in scalability. As the load on your cluster increases, you need to be able to add new servers to compensate for the new requirements. Adding new servers is a straightforward process and can be done for clusters running in any of the distribution modes, as explained in Distributed Mode.

Pseudodistributed mode

It seems paradoxical to scale an HBase cluster in an all-local mode, even when all daemons are run in separate processes. However, pseudodistributed mode is the closest you can get to a real cluster setup, and during development or prototyping it is advantageous to be able to replicate a fully distributed setup on a single machine.

Since the processes have to share all the local resources, adding more processes obviously will not make your test cluster perform any better. In fact, pseudodistributed mode is really suitable only for a very small amount of data. However, it allows you to test most of the architectural features HBase has to offer.

For example, you can experiment with master failover scenarios, or regions being moved from one server to another. Obviously, this does not replace testing at scale on the real cluster hardware, with the load expected during production. However, it does help you to come to terms with the administrative functionality offered by the HBase Shell, for example.

Or you can use the administrative API as discussed in Chapter 5. Use it to develop tools that maintain schemas, or to handle shifting server loads. There are many applications for this in a production environment, and being able to develop and test a tool locally first is tremendously helpful.

Note

You need to have set up a pseudodistributed installation before you can add any servers in pseudodistributed mode, and it must be running to use the following commands. They add to the existing processes, but do not take care of spinning up the local cluster itself.

Adding a local backup master

Starting a local backup master process is accomplished by using the local-master-backup.sh script in the bin directory, like so:

$ ./bin/local-master-backup.sh start 1

The number at the end of the command signifies an offset that is added to the default ports of 60000 for RPC and 60010 for the web-based UI. In this example, a new master process would be started that reads the same configuration files as usual, but would listen on ports 60001 and 60011, respectively.

In other words, the parameter is required. It does not specify the number of servers to start, but the offset that determines which ports each server binds to. Starting more than one is also possible:

$ ./bin/local-master-backup.sh start 1 3 5

This starts three backup masters on ports 60001, 60003, and 60005 for RPC, plus 60011, 60013, and 60015 for the web UIs.
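The port arithmetic is simple enough to check before starting anything. The following sketch uses the base ports 60000 and 60010 given above; the function name backup_master_ports is an assumption made for illustration.

```shell
#!/bin/sh
# Prints the RPC and web UI ports a local backup master would use
# for each offset passed on the command line.
backup_master_ports() {
  for offset in "$@"; do
    echo "offset $offset: rpc=$((60000 + offset)) ui=$((60010 + offset))"
  done
}

backup_master_ports 1 3 5
# offset 1: rpc=60001 ui=60011
# offset 3: rpc=60003 ui=60013
# offset 5: rpc=60005 ui=60015
```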

Warning

Make sure you do not specify an offset that could collide with a port that is already in use by another process. For example, it is a bad idea to use 30 for the offset, since this would result in a master RPC port on 60030—which is usually already assigned to the first region server as its UI port.

The start script also adds the offset to the name of the logfile the process is using, thus differentiating it from the logfiles used by the other local processes. For an offset of 1, it would set the logfile name to be:

logs/hbase-${USER}-1-master-${HOSTNAME}.log

Note the added 1 in the name. Using an offset of, for instance, 10 would add that number into the logfile name.

Stopping the backup master(s) involves the same command, but replacing the start command with the aptly named stop, like so:

$ ./bin/local-master-backup.sh stop 1

You need to specify the offsets of those backup masters you want to stop, and you have the option to stop only one, or any other number, up to all of the ones you started: whatever offset you specify is used to stop the master matching that number.

Adding a local region server

In a similar vein, you are allowed to start additional local region servers. The script provided is called local-regionservers.sh, and it takes the same parameters as the related local-master-backup.sh script: you specify the command, that is, if you want to start or stop the server, and a list of offsets.

The difference is that these offsets are added to 60200 for RPC, and 60300 for the web UIs. For example:

$ ./bin/local-regionservers.sh start 1

This command will start an additional region server using port 60201 for RPC, and 60301 for the web UI. The logfile name has the offset added to it, and would result in:

logs/hbase-${USER}-1-regionserver-${HOSTNAME}.log

The same concerns apply: you need to ensure that you are specifying an offset that results in a port that is not already in use by another process, or you will receive a java.net.BindException: Address already in use exception—as expected.
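A quick check of the planned offsets against ports you already know to be taken can prevent the BindException. The sketch below assumes that you supply the list of used ports yourself, as a space-separated string. It does not probe the network, and check_regionserver_offsets is a hypothetical name.

```shell
#!/bin/sh
# Usage: check_regionserver_offsets "<used ports>" <offset>...
# Reports every offset whose RPC (60200 + offset) or web UI (60300 + offset)
# port appears in the list of used ports. Returns 1 if any collision is found.
check_regionserver_offsets() {
  used_ports=$1
  shift
  status=0
  for offset in "$@"; do
    for port in $((60200 + offset)) $((60300 + offset)); do
      case " $used_ports " in
        *" $port "*)
          echo "offset $offset: port $port already in use"
          status=1
          ;;
      esac
    done
  done
  return $status
}

# Example: offset 1 is already running, so only 2 and 3 are free.
#   check_regionserver_offsets "60201 60301" 1 2 3
```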

Starting more than one region server is accomplished by adding more offsets:

$ ./bin/local-regionservers.sh start 1 2 3

Note

You do not have to start with an offset of 1. Since these are added to the base port numbers, you are free to specify any offset you prefer.

Stopping any additional region server involves replacing the start command with the stop command:

$ ./bin/local-regionservers.sh stop 1

This would stop the region server using offset 1, or ports 60201 and 60301. If you specify the offsets of all previously started region servers, they will all be stopped.

Fully distributed cluster

Operating an HBase cluster typically involves adding new servers over time. This is more common for the region servers, as they are doing all the heavy lifting. For the master, you have the option to start backup instances.

Adding a backup master

To prevent an HBase cluster master server from being the single point of failure, you can add backup masters. These are typically located on separate physical machines so that in a worst-case scenario, where the machine currently hosting the active master is failing, the system can fall back to a backup master.

The master process uses ZooKeeper to negotiate which is the currently active master: there is a dedicated ZooKeeper znode that all master processes race to create, and the first one to create it wins. This happens at startup and the winning process moves on to become the current master. All other machines simply loop around the znode check and wait for it to disappear—triggering the race again.

The /hbase/master znode is ephemeral, and is the same kind the region servers use to report their presence. When the master process that created the znode fails, ZooKeeper will notice the end of the session with that server and remove the znode accordingly, triggering the election process.

Starting a server on multiple machines requires that it is configured just like the rest of the HBase cluster (see Configuration for details). The master servers usually share the same configuration with the other servers in the cluster. Once you have confirmed that this is set up appropriately, you can run the following command on a server that is supposed to host the backup master:

$ ./bin/hbase-daemon.sh start master

Assuming you already had a master running, this command will bring up the new master to the point where it waits for the znode to be removed.[125] If you want to start many masters in an automated fashion and dedicate a specific server to host the current one, while all the others are considered backup masters, you can add the --backup switch like so:

$ ./bin/hbase-daemon.sh start master --backup

This forces the newly started master to wait for the dedicated one (which is the one that was started using the normal start-hbase.sh script, or by the previous command but without the --backup parameter) to create the /hbase/master znode in ZooKeeper. Once this has happened, it moves on to the master election loop. Since there is now already a master present, it goes into idle mode as explained.

Note

If you started more than one master, and you experienced failovers, there is no easy way to tell which master is currently active. This causes a slight problem in that there is no way for you to know where the master’s web-based UI is located. You will need to try the http://hostname:60010 URL on all possible master servers to find the active one.[126]
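Trying each URL by hand becomes tedious with several masters, so the check can be scripted. The following is a rough sketch that assumes curl is installed. The function names master_ui_url and probe_masters are made up for this example, and the script only reports which hosts answer on port 60010; deciding which of them is the active master is still up to you.

```shell
#!/bin/sh
# Builds the web UI URL for a master host, using the default UI port 60010.
master_ui_url() {
  echo "http://$1:60010"
}

# Tries the UI URL on every given host and reports whether it answers.
probe_masters() {
  for host in "$@"; do
    url=$(master_ui_url "$host")
    if curl -s -o /dev/null --max-time 3 "$url"; then
      echo "$url responds"
    else
      echo "$url does not respond"
    fi
  done
}

# Example, with placeholder hostnames:
#   probe_masters master1.foo.com master2.foo.com
```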

Since HBase 0.90.x, there is also the option of creating a backup-masters file in the conf directory. This is akin to the regionservers file, listing one hostname per line that is supposed to start a backup master. For the example in Example Configuration, we could assume that we have three backup masters running on the ZooKeeper servers. In that case, the conf/backup-masters file would contain these entries:

zk1.foo.com
zk2.foo.com
zk3.foo.com

Adding these processes to the ZooKeeper machines is useful in a small cluster, as the master is more a coordinator in the overall design, and therefore does not need a lot of resources.

Note

You should start as many backup masters as you feel satisfies your requirements to handle machine failures. There is no harm in starting too many, but having too few might leave you with a weak spot in the setup. This is mitigated by the use of monitoring solutions that report the first master to fail. You can take action by repairing the server and adding it back to the cluster. Overall, having two or three backup masters seems a reasonable number.

Note that the backup master processes are started on the servers listed in backup-masters, using the --backup switch. This happens as the start-hbase.sh script starts the primary master, the region servers, and eventually the backup masters. Alternatively, you can invoke the hbase-backup.sh script to initiate the start of the backup masters.

Adding a region server

Adding a new region server is one of the more common procedures you will perform on a cluster. The first thing you should do is to edit the regionservers file in the conf directory, to enable the launcher scripts to automate the server start and stop procedure.[127] Simply add a new line to the file specifying the hostname to add.

Once you have updated the file, you need to copy it across all machines in the cluster. You also need to ensure that the newly added machine has HBase installed, and that the configuration is current.
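Editing the file can be scripted so that a hostname is never listed twice. This is a minimal sketch: add_regionserver is a hypothetical helper, and it assumes the default file location conf/regionservers relative to the HBase directory. Copying the file to the other machines is left as a comment, since the tool you use for that depends on your environment.

```shell
#!/bin/sh
# Usage: add_regionserver <hostname> [regionservers-file]
# Appends the hostname unless an identical line already exists.
add_regionserver() {
  host=$1
  file=${2:-conf/regionservers}
  # -x matches whole lines only, -F treats the hostname as a fixed string.
  if grep -qxF "$host" "$file" 2>/dev/null; then
    echo "$host already listed in $file"
  else
    echo "$host" >> "$file"
    echo "added $host to $file"
  fi
}

# Example, with a placeholder hostname:
#   add_regionserver slave4.foo.com
# Afterward, copy conf/regionservers to all machines in the cluster.
```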

Then you have a few choices to start the new region server process. One option is to run the start-hbase.sh script on the master machine. It will skip all machines that have a process already running. Since the new machine fails this check, it will appropriately start the region server daemon.

Another option is to use the launcher script directly on the new server. This is done like so:

$ ./bin/hbase-daemon.sh start regionserver

Note

This must be run on the server on which you want to start the new region server process.

The region server process will start and register itself by creating a znode with its hostname in ZooKeeper. It subsequently joins the collective and is assigned regions.

Data Tasks

When dealing with an HBase cluster, you also will deal with a lot of data, spread over one or more tables. Sometimes you may be required to move the data as a whole—or in parts—to either archive data for backup purposes or to bootstrap another cluster. The following describes the possible ways in which you can accomplish this task.

Import and Export Tools

HBase ships with a handful of useful tools, two of which are the Import and Export MapReduce jobs. They can be used to write subsets, or an entire table, to files in HDFS, and subsequently load them again. They are contained in the HBase JAR file and you need the hadoop jar command to get a list of the tools:

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar
An example program must be given as the first argument.
Valid program names are:
  CellCounter: Count cells in HBase table
  completebulkload: Complete a bulk data load.
  copytable: Export a table from local cluster to peer cluster
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table
  verifyrep: Compare the data from tables in two different clusters. 
    WARNING: It doesn't work for incrementColumnValues'd cells since the 
      timestamp is changed after being appended to the log.

Adding the export program name then displays the options for its usage:

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar export
ERROR: Wrong number of arguments: 0
Usage: Export [-D <property=value>]* <tablename> <outputdir> 
  [<versions> [<starttime> [<endtime>]] 
  [^[regex pattern] or [Prefix] to filter]]

  Note: -D properties will be applied to the conf used. 
  For example: 
   -D mapred.output.compress=true
   -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
   -D mapred.output.compression.type=BLOCK
  Additionally, the following SCAN properties can be specified
  to control/limit what is exported..
   -D hbase.mapreduce.scan.column.family=<familyName>

You can see how you can supply various options. The only two required parameters are tablename and outputdir. The others are optional and can be added as required. [128] Table 12-1 lists the possible options.

Table 12-1. Parameters for the Export tool
Name            Description
tablename       The name of the table to export.
outputdir       The location in HDFS to store the exported data.
versions        The number of versions per column to store. Default is 1.
starttime       The start time, further limiting the versions saved. See Introduction for details on the setTimeRange() method that is used.
endtime         The matching end time for the time range of the scan used.
regexp/prefix   When starting with ^ it is treated as a regular expression pattern, matching row keys; otherwise, it is treated as a row key prefix.

Note

The regexp parameter makes use of the RowFilter and RegexStringComparator, as explained in RowFilter, and the prefix version uses the PrefixFilter, discussed in PrefixFilter.

You do need to specify the parameters from left to right, and you cannot omit any in between. In other words, if you want to specify a row key filter, you must specify the versions, as well as the start and end times. If you do not need them, set them to their minimum and maximum values: for example, 0 for the start and 9223372036854775807 (since the time is given as a long value) for the end timestamp. This will ensure that the time range is not taken into consideration.
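Since the positional parameters cannot be skipped, a small helper can fill in the neutral values for you. The sketch below is only an illustration: export_args is a hypothetical function that prints the argument list, using the defaults described above (1 version, 0 as the start time, and 9223372036854775807 as the end time) wherever an argument is left empty.

```shell
#!/bin/sh
# Usage: export_args <tablename> <outputdir> [versions] [starttime] [endtime] [filter]
# Empty or missing optional arguments are replaced by their neutral defaults,
# so a row key filter can be given without working out the values in between.
export_args() {
  table=$1
  outdir=$2
  versions=${3:-1}
  starttime=${4:-0}
  endtime=${5:-9223372036854775807}
  filter=${6:-}
  if [ -n "$filter" ]; then
    echo "$table $outdir $versions $starttime $endtime $filter"
  else
    echo "$table $outdir $versions $starttime $endtime"
  fi
}

export_args testtable /user/larsgeorge/backup-testtable "" "" "" '^row-1'
# testtable /user/larsgeorge/backup-testtable 1 0 9223372036854775807 ^row-1
```

The printed list is meant to be reviewed and then appended to the export command. If your filter contains shell wildcard characters, quote it when you pass it on.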

Although you are supplying the HBase JAR file, there are a few extra dependencies that need to be satisfied before you can run this MapReduce job successfully. MapReduce requires access to the following JAR files: zookeeper-xyz.jar, guava-xyz.jar, and google-collections-xyz.jar. You need to make them available in such a way that the MapReduce task attempt has access to them. One way is to add them to the HADOOP_CLASSPATH variable in $HADOOP_HOME/conf/hadoop-env.sh.

Running the command will start the MapReduce job and print out the progress:

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar export \
  testtable /user/larsgeorge/backup-testtable
11/06/25 15:58:29 INFO mapred.JobClient: Running job: job_201106251558_0001
11/06/25 15:58:30 INFO mapred.JobClient:  map 0% reduce 0%
11/06/25 15:58:52 INFO mapred.JobClient:  map 6% reduce 0%
11/06/25 15:58:55 INFO mapred.JobClient:  map 9% reduce 0%
11/06/25 15:58:58 INFO mapred.JobClient:  map 15% reduce 0%
11/06/25 15:59:01 INFO mapred.JobClient:  map 21% reduce 0%
11/06/25 15:59:04 INFO mapred.JobClient:  map 28% reduce 0%
11/06/25 15:59:07 INFO mapred.JobClient:  map 34% reduce 0%
11/06/25 15:59:10 INFO mapred.JobClient:  map 40% reduce 0%
11/06/25 15:59:13 INFO mapred.JobClient:  map 46% reduce 0%
11/06/25 15:59:16 INFO mapred.JobClient:  map 53% reduce 0%
11/06/25 15:59:19 INFO mapred.JobClient:  map 59% reduce 0%
11/06/25 15:59:22 INFO mapred.JobClient:  map 65% reduce 0%
11/06/25 15:59:25 INFO mapred.JobClient:  map 71% reduce 0%
11/06/25 15:59:28 INFO mapred.JobClient:  map 78% reduce 0%
11/06/25 15:59:31 INFO mapred.JobClient:  map 84% reduce 0%
11/06/25 15:59:34 INFO mapred.JobClient:  map 90% reduce 0%
11/06/25 15:59:37 INFO mapred.JobClient:  map 96% reduce 0%
11/06/25 15:59:40 INFO mapred.JobClient:  map 100% reduce 0%
11/06/25 15:59:42 INFO mapred.JobClient: Job complete: job_201106251558_0001
11/06/25 15:59:42 INFO mapred.JobClient: Counters: 6
11/06/25 15:59:42 INFO mapred.JobClient:   Job Counters 
11/06/25 15:59:42 INFO mapred.JobClient:     Rack-local map tasks=32
11/06/25 15:59:42 INFO mapred.JobClient:     Launched map tasks=32
11/06/25 15:59:42 INFO mapred.JobClient:   FileSystemCounters
11/06/25 15:59:42 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=3648
11/06/25 15:59:42 INFO mapred.JobClient:   Map-Reduce Framework
11/06/25 15:59:42 INFO mapred.JobClient:     Map input records=0
11/06/25 15:59:42 INFO mapred.JobClient:     Spilled Records=0
11/06/25 15:59:42 INFO mapred.JobClient:     Map output records=0

Once the job is complete, you can check the filesystem for the exported data. Use the hadoop dfs command (the lines have been shortened to fit horizontally):

$ hadoop dfs -lsr /user/larsgeorge/backup-testtable 
drwxr-xr-x   - ...      0 2011-06-25 15:58 _logs
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00000
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00001
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00002
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00003
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00004
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00005
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00006
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00007
-rw-r--r--   1 ...    114 2011-06-25 15:58 part-m-00008
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00009
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00010
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00011
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00012
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00013
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00014
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00015
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00016
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00017
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00018
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00019
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00020
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00021
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00022
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00023
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00024
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00025
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00026
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00027
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00028
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00029
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00030
-rw-r--r--   1 ...    114 2011-06-25 15:59 part-m-00031

Each part-m-nnnnn file contains a piece of the exported data, and together they form the full backup of the table. You can now, for example, use the hadoop distcp command to move the directory from one cluster to another, and perform the import there.

Also, using the optional parameters, you can implement an incremental backup process: set the start time to the value of the last backup. The job will still scan the entire table, but only export what has been modified since.

It is usually OK to only export the last version of a column value, but if you want a complete table backup, set the number of versions to 2147483647, which means all of them.
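An incremental backup then comes down to choosing the right time range for each run. The following sketch shows one possible way to build the Export arguments. The function name incremental_export_args, the state file last_backup_ms, and the layout of one output directory per run are all assumptions made for illustration. It exports all versions (2147483647) that fall between the previous run and the current one, with timestamps in milliseconds.

```shell
#!/bin/sh
# Usage: incremental_export_args <tablename> <base-outputdir> <last-ms> <now-ms>
# Prints the Export arguments for everything modified since the last backup.
# Each run writes into its own directory, named after the end timestamp.
incremental_export_args() {
  table=$1
  outdir=$2
  last_ms=$3
  now_ms=$4
  echo "$table $outdir/$now_ms 2147483647 $last_ms $now_ms"
}

incremental_export_args testtable /user/larsgeorge/backup 0 1309017600000
# testtable /user/larsgeorge/backup/1309017600000 2147483647 0 1309017600000

# Possible use, with the last end time kept in a local file:
#   now_ms=$(( $(date +%s) * 1000 ))
#   last_ms=$(cat last_backup_ms 2>/dev/null || echo 0)
#   incremental_export_args testtable /user/larsgeorge/backup "$last_ms" "$now_ms"
#   ...run the export job with these arguments, and only when it succeeds:
#   echo "$now_ms" > last_backup_ms
```

Recording the new timestamp only after the job has completed means that a failed run is simply repeated with the same start time.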

Importing the data is the reverse operation. First we can get the usage details by invoking the command without any parameters, and then we can start the job with the tablename and inputdir (the directory containing the exported files):

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar import
ERROR: Wrong number of arguments: 0
Usage: Import <tablename> <inputdir>

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar import \
  testtable /user/larsgeorge/backup-testtable
11/06/25 17:09:48 INFO mapreduce.TableOutputFormat: Created table instance 
  for testtable
11/06/25 17:09:48 INFO input.FileInputFormat: Total input paths to process : 32
11/06/25 17:09:49 INFO mapred.JobClient: Running job: job_201106251558_0003
11/06/25 17:09:50 INFO mapred.JobClient:  map 0% reduce 0%
11/06/25 17:10:04 INFO mapred.JobClient:  map 6% reduce 0%
11/06/25 17:10:07 INFO mapred.JobClient:  map 12% reduce 0%
11/06/25 17:10:10 INFO mapred.JobClient:  map 18% reduce 0%
11/06/25 17:10:13 INFO mapred.JobClient:  map 25% reduce 0%
11/06/25 17:10:16 INFO mapred.JobClient:  map 31% reduce 0%
11/06/25 17:10:19 INFO mapred.JobClient:  map 37% reduce 0%
11/06/25 17:10:22 INFO mapred.JobClient:  map 43% reduce 0%
11/06/25 17:10:25 INFO mapred.JobClient:  map 50% reduce 0%
11/06/25 17:10:28 INFO mapred.JobClient:  map 56% reduce 0%
11/06/25 17:10:31 INFO mapred.JobClient:  map 62% reduce 0%
11/06/25 17:10:34 INFO mapred.JobClient:  map 68% reduce 0%
11/06/25 17:10:37 INFO mapred.JobClient:  map 75% reduce 0%
11/06/25 17:10:40 INFO mapred.JobClient:  map 81% reduce 0%
11/06/25 17:10:43 INFO mapred.JobClient:  map 87% reduce 0%
11/06/25 17:10:46 INFO mapred.JobClient:  map 93% reduce 0%
11/06/25 17:10:49 INFO mapred.JobClient:  map 100% reduce 0%
11/06/25 17:10:51 INFO mapred.JobClient: Job complete: job_201106251558_0003
11/06/25 17:10:51 INFO mapred.JobClient: Counters: 6
11/06/25 17:10:51 INFO mapred.JobClient:   Job Counters 
11/06/25 17:10:51 INFO mapred.JobClient:     Launched map tasks=32
11/06/25 17:10:51 INFO mapred.JobClient:     Data-local map tasks=32
11/06/25 17:10:51 INFO mapred.JobClient:   FileSystemCounters
11/06/25 17:10:51 INFO mapred.JobClient:     HDFS_BYTES_READ=3648
11/06/25 17:10:51 INFO mapred.JobClient:   Map-Reduce Framework
11/06/25 17:10:51 INFO mapred.JobClient:     Map input records=0
11/06/25 17:10:51 INFO mapred.JobClient:     Spilled Records=0
11/06/25 17:10:51 INFO mapred.JobClient:     Map output records=0

You can also use the Import job to store the data in a different table. As long as it has the same schema, you are free to specify a different table name on the command line.

The data from the exported files was read by the MapReduce job and stored in the specified table. Finally, this Export/Import combination is per-table only. If you have more than one table, you need to run them separately.

CopyTable Tool

Another supplied tool is CopyTable, which is primarily designed to bootstrap cluster replication. You can use it to make a copy of an existing table from the master cluster to the slave cluster. Here are its command-line options:

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar copytable
Usage: CopyTable [--rs.class=CLASS] [--rs.impl=IMPL] [--starttime=X] 
       [--endtime=Y] [--new.name=NEW] [--peer.adr=ADR] <tablename>

Options:
 rs.class     hbase.regionserver.class of the peer cluster
              specify if different from current cluster
 rs.impl      hbase.regionserver.impl of the peer cluster
 starttime    beginning of the time range
              without endtime means from starttime to forever
 endtime      end of the time range
 new.name     new table's name
 peer.adr     Address of the peer cluster given in the format
   hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent
 families     comma-seperated list of families to copy

Args:
 tablename    Name of the table to copy

Examples:
 To copy 'TestTable' to a cluster that uses replication for a 1 hour window:
 $ bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable 
  --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface
  --rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer 
  --starttime=1265875194289 --endtime=1265878794289 
  --peer.adr=server1,server2,server3:2181:/hbase TestTable

CopyTable comes with an example command at the end of the usage output, which you can use to set up your own copy process. The parameters are all documented in the output too, and you may notice that you also have the start and end time options, which you can use the same way as explained earlier for the Export/Import tool.

In addition, you can use the families parameter to limit the number of column families that are included in the copy. The copy only considers the latest version of a column value. Here is an example of copying a table within the same cluster:

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar copytable \
  --new.name=testtable3 testtable
11/06/26 15:20:07 INFO mapreduce.TableOutputFormat: Created table instance for 
testtable3
11/06/26 15:20:07 INFO mapred.JobClient: Running job: job_201106261454_0003
11/06/26 15:20:08 INFO mapred.JobClient:  map 0% reduce 0%
11/06/26 15:20:19 INFO mapred.JobClient:  map 6% reduce 0%
11/06/26 15:20:22 INFO mapred.JobClient:  map 12% reduce 0%
11/06/26 15:20:25 INFO mapred.JobClient:  map 18% reduce 0%
11/06/26 15:20:28 INFO mapred.JobClient:  map 25% reduce 0%
11/06/26 15:20:31 INFO mapred.JobClient:  map 31% reduce 0%
11/06/26 15:20:34 INFO mapred.JobClient:  map 37% reduce 0%
11/06/26 15:20:37 INFO mapred.JobClient:  map 43% reduce 0%
11/06/26 15:20:40 INFO mapred.JobClient:  map 50% reduce 0%
11/06/26 15:20:43 INFO mapred.JobClient:  map 56% reduce 0%
11/06/26 15:20:46 INFO mapred.JobClient:  map 62% reduce 0%
11/06/26 15:20:49 INFO mapred.JobClient:  map 68% reduce 0%
11/06/26 15:20:52 INFO mapred.JobClient:  map 75% reduce 0%
11/06/26 15:20:55 INFO mapred.JobClient:  map 81% reduce 0%
11/06/26 15:20:58 INFO mapred.JobClient:  map 87% reduce 0%
11/06/26 15:21:01 INFO mapred.JobClient:  map 93% reduce 0%
11/06/26 15:21:04 INFO mapred.JobClient:  map 100% reduce 0%
11/06/26 15:21:06 INFO mapred.JobClient: Job complete: job_201106261454_0003
11/06/26 15:21:06 INFO mapred.JobClient: Counters: 5
11/06/26 15:21:06 INFO mapred.JobClient:   Job Counters 
11/06/26 15:21:06 INFO mapred.JobClient:     Launched map tasks=32
11/06/26 15:21:06 INFO mapred.JobClient:     Data-local map tasks=32
11/06/26 15:21:06 INFO mapred.JobClient:   Map-Reduce Framework
11/06/26 15:21:06 INFO mapred.JobClient:     Map input records=0
11/06/26 15:21:06 INFO mapred.JobClient:     Spilled Records=0
11/06/26 15:21:06 INFO mapred.JobClient:     Map output records=0

The copy process requires the target table to exist: use the shell to get the definition of the source table, and create the target table using the same definition. You can omit the families you do not include in the copy command.

The example also uses the optional new.name parameter, which allows you to specify a table name that is different from the original. The copy of the table is stored on the same cluster, since the peer.adr parameter was not used.

Note

Note that for both the CopyTable and Export/Import tools you can only rely on row-level atomicity. In other words, if you export or copy a table while it is being modified by other clients, you may not be able to tell exactly what has been copied to the new location.

Especially when dealing with more than one table, for example, with secondary indexes, you need to ensure from the client side that you have copied a consistent view of all tables. One way to handle this is to use the start and end time parameters. This will allow you to run a second update job that only addresses the recently updated data.
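The two-pass approach can be sketched as follows. This is a minimal illustration, assuming the --starttime and --endtime options of CopyTable take epoch milliseconds; the helper copytable_args() is hypothetical and only assembles the argument list, it does not start a job.

```python
import time

def copytable_args(table, start_ms, end_ms, new_name=None):
    """Build the argument list for a CopyTable run limited to a time range."""
    args = ["copytable", f"--starttime={start_ms}", f"--endtime={end_ms}"]
    if new_name:
        args.append(f"--new.name={new_name}")
    args.append(table)
    return args

# First pass: copy everything written up to the moment the job is started.
first_cutoff = int(time.time() * 1000)
full_copy = copytable_args("testtable", 0, first_cutoff, "testtable3")

# Second pass: only the cells written while the first job was running.
second_cutoff = int(time.time() * 1000)
catch_up = copytable_args("testtable", first_cutoff, second_cutoff, "testtable3")
print(" ".join(catch_up))
```

The second job starts where the first one ended, so it only has to process the comparatively small amount of data that changed in the meantime.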

Bulk Import

HBase includes several methods of loading data into tables. The most straightforward method is to either use the TableOutputFormat class from a MapReduce job (see Chapter 7), or use the normal client APIs; however, these are not always the most efficient methods.

Another way to efficiently load large amounts of data is via a bulk import. The bulk load feature uses a MapReduce job to output table data in HBase’s internal data format, and then directly loads the data files into a running cluster. This feature uses less CPU and network resources than simply using the HBase API.

Note

A problem with loading data into HBase is that often this must be done in short bursts, but with those bursts being potentially very large. This will put additional stress on your cluster, and might overload it subsequently. Bulk imports are a way to alleviate this problem by not causing unnecessary churn on region servers.

Bulk load procedure

The HBase bulk load process consists of two main steps:

Preparation of data

The first step of a bulk load is to generate HBase data files from a MapReduce job using HFileOutputFormat. This output format writes out data in HBase’s internal storage format so that it can be later loaded very efficiently into the cluster.

In order to function efficiently, HFileOutputFormat must be configured such that each output HFile fits within a single region: jobs whose output will be bulk-loaded into HBase use Hadoop’s TotalOrderPartitioner class to partition the map output into disjoint ranges of the key space, corresponding to the key ranges of the regions in the table.

HFileOutputFormat includes a convenience function, configureIncrementalLoad(), which automatically sets up a TotalOrderPartitioner based on the current region boundaries of a table.
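The idea behind partitioning by region boundaries can be illustrated in a few lines. The following is a conceptual sketch only, not the TotalOrderPartitioner implementation: it assumes the region start keys are known as a sorted list, and the function name region_index() and the sample keys are made up for the example.

```python
from bisect import bisect_right

def region_index(start_keys, row_key):
    """Return the index of the region whose key range contains row_key.

    start_keys is the sorted list of region start keys; the first region
    starts at the empty key.
    """
    return bisect_right(start_keys, row_key) - 1

# Three hypothetical regions: ["", row-300), [row-300, row-600), [row-600, end)
start_keys = [b"", b"row-300", b"row-600"]
rows = [b"row-050", b"row-300", b"row-599", b"row-999"]

buckets = {}
for row in rows:
    buckets.setdefault(region_index(start_keys, row), []).append(row)
print(buckets)
```

Each bucket corresponds to one reducer, and therefore to output files whose keys all fall within a single region.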

Load data

After the data has been prepared using HFileOutputFormat, it is loaded into the cluster using the completebulkload tool. This tool iterates through the prepared data files, and for each one it determines the region the file belongs to. It then contacts the appropriate region server which adopts the HFile, moving it into its storage directory and making the data available to clients.

If the region boundaries have changed during the course of bulk load preparation, or between the preparation and completion steps, the completebulkload tool will automatically split the data files into pieces corresponding to the new boundaries. This process is not optimally efficient, so you should take care to minimize the delay between preparing a bulk load and importing it into the cluster, especially if other clients are simultaneously loading data through other means.

This mechanism makes use of the merge read already in place on the servers to scan memstores and on-disk file stores for KeyValue entries of a row. Adding the newly generated files from the bulk import adds an additional file to handle—similar to new store files generated by a memstore flush.

What is even more important is that all of these files are sorted by the timestamps the matching KeyValue instances have (see Read Path). In other words, you can bulk-import newer and older versions of a column value, while the region servers sort them appropriately. The end result is that you immediately have a consistent and coherent view of the stored rows.
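To see why the order in which files arrive does not matter, consider this simplified model of the merge read. It is a sketch under stated assumptions: entries are plain tuples rather than KeyValue instances, and the sort order (row and column ascending, timestamp descending) mirrors the behavior described above in a reduced form.

```python
import heapq

# Each entry: (row, column, timestamp, value). Files keep entries sorted by
# row and column ascending, and by timestamp descending.
def sort_key(kv):
    row, column, ts, _ = kv
    return (row, column, -ts)

flushed_file = sorted([("row1", "cf:a", 200, "flushed-v200"),
                       ("row2", "cf:a", 150, "flushed-v150")], key=sort_key)
bulk_file = sorted([("row1", "cf:a", 300, "bulk-v300"),
                    ("row1", "cf:a", 100, "bulk-v100")], key=sort_key)

def latest_versions(*files):
    """Merge sorted files and keep the newest version of every column."""
    result = {}
    for row, column, ts, value in heapq.merge(*files, key=sort_key):
        # The first entry seen for a column is the one with the highest timestamp.
        result.setdefault((row, column), (ts, value))
    return result

print(latest_versions(flushed_file, bulk_file))
```

The bulk-imported file contributes both a newer and an older version of the same column, and the merge still returns the newest one first.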

Using the importtsv tool

HBase ships with a command-line tool called importtsv which, when given files containing data in tab-separated value (TSV) format, can prepare this data for bulk import into HBase. This tool uses the HBase put() API by default to insert data into HBase one row at a time.

Alternatively, you can use the importtsv.bulk.output option so that importtsv will instead generate files using HFileOutputFormat. These can subsequently be bulk-loaded into HBase. Running the tool with no arguments prints brief usage information:

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar importtsv
Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

Imports the given input directory of TSV data into the specified table.

The column names of the TSV data must be specified using the -Dimporttsv.columns
option. This option takes the form of comma-separated column names, where each
column name is either a simple column family, or a columnfamily:qualifier. The 
special column name HBASE_ROW_KEY is used to designate that this column should
be used as the row key for each imported record. You must specify exactly one 
column to be the row key, and you must specify a column name for every column 
that exists in the input data.

By default importtsv will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimporttsv.bulk.output=/path/for/output
  Note: if you do not use this option, then the target table must already 
    exist in HBase

Other options that may be specified with -D include:
  -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
  '-Dimporttsv.separator=|' - eg separate on pipes instead of tabs
  -Dimporttsv.timestamp=currentTimeAsLong - use the specified timestamp for the import
  -Dimporttsv.mapper.class=my.Mapper - A user-defined Mapper to use instead  
    of org.apache.hadoop.hbase.mapreduce.TsvImporterMapper

The usage information is self-explanatory, so you simply need to run the tool while specifying the options it requires. It will start a job that reads the files from HDFS and prepares the bulk import store files.
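The rules for the importtsv.columns option can be made concrete with a small model of how one input line maps to a row. This is an illustrative sketch, not the TsvImporterMapper code; the function parse_tsv_line() and the sample column names are assumptions for the example.

```python
ROW_KEY = "HBASE_ROW_KEY"

def parse_tsv_line(line, columns, separator="\t"):
    """Split one line and map its fields to (row key, {column: value})."""
    if columns.count(ROW_KEY) != 1:
        raise ValueError("exactly one column must be HBASE_ROW_KEY")
    fields = line.rstrip("\n").split(separator)
    if len(fields) != len(columns):
        raise ValueError(f"expected {len(columns)} fields, got {len(fields)}")
    row_key = fields[columns.index(ROW_KEY)]
    cells = {c: v for c, v in zip(columns, fields) if c != ROW_KEY}
    return row_key, cells

# Equivalent to -Dimporttsv.columns=HBASE_ROW_KEY,colfam1:name,colfam1:city
columns = "HBASE_ROW_KEY,colfam1:name,colfam1:city".split(",")
print(parse_tsv_line("row1\tAlice\tBerlin\n", columns))
```

A line with a missing field raises an error here, which corresponds to what the usage text calls an invalid line.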

Using the completebulkload Tool

After a data import has been prepared, either by using the importtsv tool with the importtsv.bulk.output option, or by some other MapReduce job using the HFileOutputFormat, the completebulkload tool is used to import the data into the running cluster.

The completebulkload tool simply takes the output path where importtsv or your MapReduce job put its results, and the table name to import into. For example:

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar completebulkload \
  -conf ~/my-hbase-site.xml /user/larsgeorge/myoutput mytable

The optional -conf config-file parameter can be used to specify a file containing the appropriate HBase parameters, if not supplied already on the CLASSPATH. In addition, the CLASSPATH must contain the directory that has the ZooKeeper configuration file, if ZooKeeper is not managed by HBase.

Note

If the target table does not already exist in HBase, this tool will create it for you.

The completebulkload tool completes quickly, after which point the new data will be visible in the cluster.

Advanced usage

Although the importtsv tool is useful in many cases, advanced users may want to generate data programmatically, or import data from other formats. To get started doing so, peruse the ImportTsv.java class, and check the JavaDoc for HFileOutputFormat.

The import step of the bulk load can also be done from within your code: see the LoadIncrementalHFiles class for more information.

Replication

The architecture of the HBase replication feature was discussed in Replication. Here we will look at what is required to enable replication of a table between two clusters.

The first step is to edit the hbase-site.xml configuration file in the conf directory to turn the feature on for the entire cluster:

<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>zk1.foo.com,zk2.foo.com,zk3.foo.com</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://master.foo.com:8020/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
      <name>hbase.replication</name>
      <value>true</value>
  </property>
</configuration>

This example adds the new hbase.replication property, where setting it to true enables replication support. This puts certain required low-level features into place. Apart from that, you will not see any changes to your cluster setup and functionality. Do not forget to copy the changed configuration file to all machines in your cluster, and to restart the servers.

Now you can either alter an existing table—you need to disable it before you can do that—or create a new one with the replication scope set to 1 (also see Column Families for its value range):

hbase(main):001:0> create 'testtable1', 'colfam1'
hbase(main):002:0> disable 'testtable1'
hbase(main):003:0> alter 'testtable1', NAME => 'colfam1',  
  REPLICATION_SCOPE => '1'
hbase(main):004:0> enable 'testtable1'

hbase(main):005:0> create 'testtable2', { NAME => 'colfam1',  
  REPLICATION_SCOPE => 1}

Setting the scope further prepares the master cluster for its role as the replication source. Now it is time to add a slave—here also called a peer—cluster and start the replication:

hbase(main):006:0> add_peer '1', 'slave-zk1:2181:/hbase'
hbase(main):007:0> start_replication

The first command adds the ZooKeeper quorum details for the peer cluster so that modifications can be shipped to it subsequently. The second command starts the actual shipping of modification records to the peer cluster. For this to work as expected, you need to make sure that you have already created an identical copy of the table on the peer cluster: it can be empty, but it needs to have the same schema definition and table name.
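The peer address passed to add_peer packs three pieces of information into one string. The following sketch shows how it can be taken apart; the PeerAddress type and parse_peer_address() function are hypothetical helpers for illustration, not part of HBase.

```python
from typing import NamedTuple

class PeerAddress(NamedTuple):
    quorum: list        # ZooKeeper hosts of the peer cluster
    client_port: int    # ZooKeeper client port
    znode_parent: str   # root znode used by the peer cluster

def parse_peer_address(address):
    """Split 'host1,host2:port:/znode' into its three components."""
    quorum, port, znode_parent = address.split(":")
    if not znode_parent.startswith("/"):
        raise ValueError("znode parent must be an absolute path")
    return PeerAddress(quorum.split(","), int(port), znode_parent)

print(parse_peer_address("slave-zk1:2181:/hbase"))
```

Checking the string this way before adding a peer can catch a missing port or znode path early.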

For development and prototyping, you can use the approach of running two local clusters, described in Coexisting Clusters, and configure the peer address to point to the second local cluster:

hbase(main):006:0> add_peer '1', 'localhost:2181:/hbase-2'

There is one more change you need to apply to the hbase-site.xml file in the conf.2 directory on the secondary cluster:

<property>
    <name>hbase.replication</name>
    <value>true</value>
</property>

Adding this flag allows the secondary cluster to act as a peer for the master replication cluster.

Since replication is now enabled, you can add data into the master cluster, and within a few moments see the data appear in the peer cluster table with the same name.

No further changes need to be applied to the peer cluster. The replication feature uses the normal client API on the peer cluster to apply the changes locally. Stopping the replication and removing a peer is done in the same way, using the reverse commands:

hbase(main):008:0> stop_replication
hbase(main):009:0> remove_peer '1'

Note that stopping the replication will still complete the shipping of all queued modifications to the peer, but all further processing is ended.

Finally, verifying the replicated data on two clusters is easy to do in the shell when looking only at a few rows, but doing a systematic comparison requires more computing power. This is why the Verify Replication tool is provided; it is available as verifyrep using the hadoop jar command once more:

$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar verifyrep
Usage: verifyrep [--starttime=X] [--stoptime=Y] [--families=A] <peerid> 
       <tablename>

Options:
 starttime    beginning of the time range
              without endtime means from starttime to forever
 stoptime     end of the time range
 families     comma-separated list of families to copy

Args:
 peerid       Id of the peer used for verification, must match the one given 
              for replication
 tablename    Name of the table to verify

Examples:
 To verify the data replicated from TestTable for a 1 hour window with peer #5 
 $ bin/hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication 
  --starttime=1265875194289 --stoptime=1265878794289 5 TestTable

It has to be run on the master cluster and needs to be provided with a peer ID (the one provided when establishing a replication stream) and a table name. Other options let you specify a time range and specific families.
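The time range options expect epoch timestamps in milliseconds; in the usage example, the two values are exactly 3,600,000 ms (one hour) apart. A small sketch for computing such a window follows. The function verify_window() and the chosen end time are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone

def verify_window(end, hours=1):
    """Return (starttime, stoptime) in epoch milliseconds ending at `end`."""
    start = end - timedelta(hours=hours)

    def to_ms(moment):
        return int(moment.timestamp() * 1000)

    return to_ms(start), to_ms(end)

# Hypothetical end of the window to verify.
end = datetime(2011, 6, 26, 15, 0, tzinfo=timezone.utc)
start_ms, stop_ms = verify_window(end)
print(f"--starttime={start_ms} --stoptime={stop_ms}")
```

The printed options can be placed in front of the peer ID and table name when invoking verifyrep.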

Additional Tasks

On top of the operational and data tasks, there are additional tasks you may need to perform when setting up or running a test or production HBase cluster. We will discuss these tasks in the following subsections.

Coexisting Clusters

For testing purposes, it is useful to be able to run HBase in two separate instances, but on the same physical machine. This can be helpful, for example, when you want to prototype replication on your development machine.

Warning

Running multiple instances of HBase, including any of its daemons, on a distributed cluster is not recommended, and is not tested at all. None of HBase’s processes is designed to share a server with a second instance of itself in production. Be warned!

Presuming you have set up a local installation of HBase, as described in Chapter 2, and configured it to run in standalone mode, you can first make a copy of the configuration directory like so:

$ cd $HBASE_HOME
$ cp -pR conf conf.2

The next step is to edit the hbase-env.sh file in the new conf.2 directory:

# Where log files are stored.  $HBASE_HOME/logs by default.
export HBASE_LOG_DIR=${HBASE_HOME}/logs.2

# A string representing this instance of hbase. $USER by default.
export HBASE_IDENT_STRING=${USER}.2

This is required to have no overlap in local filenames. Lastly, you need to adjust the hbase-site.xml file:

<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:8020/hbase-2</value>
  </property>
  <property>
    <name>hbase.tmp.dir</name>
    <value>/tmp/hbase-2-${user.name}</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase-2</value>
  </property>
  <property>
    <name>hbase.master.port</name>
    <value>60100</value>
  </property>
  <property>
    <name>hbase.master.info.port</name>
    <value>60110</value>
  </property>
  <property>
    <name>hbase.regionserver.port</name>
    <value>60120</value>
  </property>
  <property>
    <name>hbase.regionserver.info.port</name>
    <value>60130</value>
  </property>
</configuration>

The values that differ from the primary instance (the root directory, the temporary directory, the ZooKeeper parent znode, and the ports) contain the required changes. You need to assign all ports differently so that you have a clear distinction between the two cluster instances. Operating the secondary cluster requires specification of the new configuration directory:

$ HBASE_CONF_DIR=conf.2 ./bin/start-hbase.sh
$ HBASE_CONF_DIR=conf.2 ./bin/hbase shell
$ HBASE_CONF_DIR=conf.2 ./bin/stop-hbase.sh

The first command starts the secondary local cluster, the middle one starts a shell connecting to it, and the last command stops the cluster.

Required Ports

The HBase processes, when started, bind to two separate ports: one for the RPCs, and another for the web-based UI. This applies to both the master and each region server. Since you are running each process type on one machine only, you need to consider two ports per server type—unless you run in a nondistributed setup. Table 12-2 lists the default ports.

Table 12-2. Default ports used by the HBase daemons
Node type | Port | Description
Master | 60000 | The RPC port the master listens on for client requests. Can be configured with the hbase.master.port configuration property.
Master | 60010 | The web-based UI port the master process listens on. Can be configured with the hbase.master.info.port configuration property.
Region server | 60020 | The RPC port the region server listens on for client requests. Can be configured with the hbase.regionserver.port configuration property.
Region server | 60030 | The web-based UI port the region server listens on. Can be configured with the hbase.regionserver.info.port configuration property.

In addition, if you want to configure a firewall, for example, you also have to ensure that the ports for the Hadoop subsystems, that is, MapReduce and HDFS, are configured so that the HBase daemons have access to them.[129]
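When two instances share one machine, as in Coexisting Clusters, a forgotten port override means both instances try to bind the same port. The following sketch compares two hbase-site.xml documents for such overlaps. It assumes the four default ports listed in Table 12-2; the function names and the sample XML are illustrative.

```python
import xml.etree.ElementTree as ET

# Default ports of the master and region server daemons (see Table 12-2).
DEFAULT_PORTS = {
    "hbase.master.port": 60000,
    "hbase.master.info.port": 60010,
    "hbase.regionserver.port": 60020,
    "hbase.regionserver.info.port": 60030,
}

def effective_ports(site_xml):
    """Return the four daemon ports, applying overrides from hbase-site.xml."""
    ports = dict(DEFAULT_PORTS)
    for prop in ET.fromstring(site_xml).iter("property"):
        name = prop.findtext("name")
        if name in ports:
            ports[name] = int(prop.findtext("value"))
    return ports

def shared_ports(primary_xml, secondary_xml):
    """List the ports that both instances would try to bind."""
    primary = set(effective_ports(primary_xml).values())
    secondary = set(effective_ports(secondary_xml).values())
    return sorted(primary & secondary)

primary = "<configuration></configuration>"
# Hypothetical secondary config in which one override was forgotten.
secondary = """<configuration>
  <property><name>hbase.master.port</name><value>60100</value></property>
  <property><name>hbase.master.info.port</name><value>60110</value></property>
  <property><name>hbase.regionserver.port</name><value>60120</value></property>
</configuration>"""
print(shared_ports(primary, secondary))
```

An empty result means the two configurations do not collide on any of the four ports.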

Changing Logging Levels

By default, HBase ships with a configuration which sets the log level of its processes to DEBUG, which is useful if you are in the installation and prototyping phase. It allows you to search through the files in case something goes wrong, as discussed in Analyzing the Logs.

For a production environment, you can switch to a less verbose level, such as INFO, or even WARN. This is accomplished by editing the log4j.properties file in the conf directory. Here is an example with the modified level for the HBase classes:

...
# Custom Logging levels

log4j.logger.org.apache.zookeeper=INFO
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
log4j.logger.org.apache.hadoop.hbase=INFO
# Make these two classes INFO-level. Make them DEBUG to see more zk debug.
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFO
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
#log4j.logger.org.apache.hadoop.dfs=DEBUG
# Set this class to log INFO only otherwise its OTT
...

This file needs to be copied to all servers, which need to be restarted subsequently for the changes to take effect.

Another option, either to change the level temporarily or to delay the restart after you have changed the properties file, is to use the web-based UIs and their log-level page. This is discussed and shown in Shared Pages. Since a log-level change made in the UI affects only the server it is loaded from, you will need to adjust the level separately for every server in your cluster.

Troubleshooting

This section deals with the things you can do to heal a cluster that does not work as expected.

HBase Fsck

HBase comes with a tool called hbck which is implemented by the HBaseFsck class. It provides various command-line switches that influence its behavior. You can get a full list of its usage information by running it with -h:

$ ./bin/hbase hbck -h
Unknown command line option : -h
Usage: fsck [opts] 
 where [opts] are:
   -details Display full report of all regions.
   -timelag {timeInSeconds}  Process only regions that  have not experienced 
            any metadata updates in the last  {{timeInSeconds} seconds.
   -fix Try to fix some of the errors.
   -sleepBeforeRerun {timeInSeconds} Sleep this many seconds before checking 
            if the fix worked if run with -fix
   -summary Print only summary of the tables and status.

The details switch prints out the most information when running hbck, while summary prints out the least. No option at all invokes the normal output detail, for example:

$ ./bin/hbase hbck
Number of Tables: 40
Number of live region servers: 19
Number of dead region servers: 0
Number of empty REGIONINFO_QUALIFIER rows in .META.: 0
Summary:
  -ROOT- is okay.
    Number of regions: 1
    Deployed on:  host1.foo.com:60020
  .META. is okay.
    Number of regions: 1
    Deployed on:  host4.foo.com:60020
  testtable is okay.
    Number of regions: 15
    Deployed on:  host7.foo.com:60020 host14.foo.com:60020
...
  testtable2 is okay.
    Number of regions: 1
    Deployed on:  host11.foo.com:60020
0 inconsistencies detected.
Status: OK

The extra parameters, such as timelag and sleepBeforeRerun, are explained in the usage details in the preceding code. They allow you to check subsets of data, as well as delay the eventual re-check run, to report any remaining issues.

Once started, the hbck tool will scan the .META. table to gather all the pertinent information it holds. It also scans the HDFS root directory HBase is configured to use. It then proceeds to compare the collected details to report on inconsistencies and integrity issues.

Consistency check

This check applies to a region on its own. The tool checks whether the region is listed in .META. and exists in HDFS, and whether it is assigned to exactly one region server.

Integrity check

This concerns a table as a whole. It compares the regions with the table details to find missing regions, or those that have holes or overlaps in their row key ranges.

The fix option allows you to repair some of these issues. Over time, this feature is going to be enhanced so that more problems can be fixed. As of this writing, the fix option can handle the following problems:

  • Assign .META. to a single new server if it is unassigned.

  • Reassign .META. to a single new server if it is assigned to multiple servers.

  • Assign a user table region to a new server if it is unassigned.

  • Reassign a user table region to a single new server if it is assigned to multiple servers.

  • Reassign a user table region to a new server if the current server does not match what the .META. table refers to.

Note

Be aware that hbck sometimes reports inconsistencies that are temporary or transitional only. For example, when regions are unavailable for short periods of time during the internal housekeeping process, hbck will report those as inconsistencies too. Add the details switch to get more information on what is going on and rerun the tool a few times to confirm a permanent problem.
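Rerunning the tool and comparing the results can be scripted. The sketch below only covers the evaluation step: it assumes the output of each run has been captured as text and contains a summary line of the form shown earlier ("0 inconsistencies detected."). The function names are hypothetical.

```python
import re

def inconsistency_count(hbck_output):
    """Extract the number from the 'N inconsistencies detected.' line."""
    match = re.search(r"^(\d+) inconsistencies detected\.", hbck_output, re.MULTILINE)
    if match is None:
        raise ValueError("no inconsistency summary found")
    return int(match.group(1))

def is_permanent(outputs):
    """Treat a problem as permanent only if every run reports inconsistencies."""
    return all(inconsistency_count(output) > 0 for output in outputs)

# Shortened, illustrative outputs of three consecutive runs.
runs = [
    "Summary:\n2 inconsistencies detected.\nStatus: INCONSISTENT",
    "Summary:\n1 inconsistencies detected.\nStatus: INCONSISTENT",
    "Summary:\n0 inconsistencies detected.\nStatus: OK",
]
print(is_permanent(runs))
```

Here the inconsistencies disappear by the third run, which points to a transitional state rather than a permanent problem.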

Analyzing the Logs

In rare cases it is necessary to directly access the logfiles created by the various HBase processes. They contain a mix of messages, some of which are printed for informational purposes and others representing internal warnings or error messages. While some of these messages are temporary, and do not mean that there is a permanent issue with the cluster, others state a system failure and are printed just before the process is forcefully ended.

Table 12-3 lists the various default HBase, ZooKeeper, and Hadoop logfiles. user is replaced with the user ID the process is started by, and hostname is the name of the machine the process is running on.

Table 12-3. The various server types and the logfiles they create
Server type | Logfile
HBase Master | $HBASE_HOME/logs/hbase-<user>-master-<hostname>.log
HBase RegionServer | $HBASE_HOME/logs/hbase-<user>-regionserver-<hostname>.log
ZooKeeper | Console log output only
NameNode | $HADOOP_HOME/logs/hadoop-<user>-namenode-<hostname>.log
DataNode | $HADOOP_HOME/logs/hadoop-<user>-datanode-<hostname>.log
JobTracker | $HADOOP_HOME/logs/hadoop-<user>-jobtracker-<hostname>.log
TaskTracker | $HADOOP_HOME/logs/hadoop-<user>-tasktracker-<hostname>.log

These locations can be changed by editing the configuration files of the respective system.

When you start analyzing the logfiles, it is useful to begin with the master logfile first, as it acts as the coordinator service of the entire cluster. It contains informational messages, such as the balancer printing out its background processing:

2011-06-03 09:12:55,448 INFO org.apache.hadoop.hbase.master.HMaster: balance 
hri=testtable,mykey1,1308610119005.dbccd6310dd7326f28ac09b60170a84c.,  
src=host1.foo.com,60020,1308239280769, dest=host3.foo.com,60020,1308239274789

or when a region is split on a region server, duly reporting back the event:

2011-06-03 09:12:55,344 INFO org.apache.hadoop.hbase.master.ServerManager: 
Received REGION_SPLIT: 
testtable,myrowkey5,1308647333895.0b8eeffeba8e2168dc7c06148d93dfcf.: 
Daughters; testtable,myrowkey5,1308647572030.bc7cc0055a3a4fd7a5f56df6f27a696b.,
testtable,myrowkey9,1308647572030.87882799b2d58020990041f588b6b31c. 
from host5.foo.com,60020,1308239280769

Many of these messages at the INFO level show you how your cluster evolved over time. You can use them to go back in time and see what happened earlier on. Typically the master is simply printing these messages on a regular basis, so when you look at specific time ranges you will see the common patterns.

If something fails, though, these patterns will change: the log messages are interrupted by others at the WARN (short for warning) or even ERROR level. You should find those patterns and move your analysis back to the point just before the common pattern was disturbed.

Note

An interesting metric you can use as a gauge for where to start is discussed in JVM Metrics, under System Event Metrics: the error log event metric. It gives you a graph showing you where the server(s) started logging an increasing number of error messages in the logfiles. Find the time before this graph started rising and use it as the entry point into your logs.

Once you have found where the processes began logging ERROR level messages, you should be able to identify the root cause. A lot of subsequent messages are often collateral damage: they are a side effect of the original problem.

Not all of the logged messages that indicate a pattern change are using an elevated log level. Here is an example of a region that has been in the transition table for too long:

2011-06-21 09:19:20,218 INFO org.apache.hadoop.hbase.master.AssignmentManager: 
Regions in transition timed out:  
testtable,myrowkey123,1308610119005.dbccd6310dd7326f28ac09b60170a84c. 
state=CLOSING, ts=1308647575449

2011-06-21 09:19:20,218 INFO org.apache.hadoop.hbase.master.AssignmentManager: 
Region has been CLOSING for too long, this should eventually complete or the 
server will expire, doing nothing

The message is logged at the INFO level because the system will eventually recover from it. But it could indicate the beginning of larger problems, for example, when the servers start to get overloaded. Make sure you reset your log analysis to where the normal patterns are disrupted.

Once you have investigated the master logs, move on to the region server logs. Use the monitoring metrics to see if any of them shows an increase in log messages, and scrutinize that server first.

If you find an error message, use the online resources to search[130] for the message in the public mailing lists (see http://hbase.apache.org/mail-lists.html). There is a good chance that this has been reported or discussed before, especially with recurring issues, such as the mentioned server overload scenarios: even errors follow a pattern at times.

Here is an example error message, caused by session loss between the region server and the ZooKeeper quorum:

2011-06-09 15:28:34,836 ERROR 
org.apache.hadoop.hbase.regionserver.HRegionServer: 
ZooKeeper session expired
2011-06-09 15:28:34,837 ERROR
org.apache.hadoop.hbase.regionserver.HRegionServer:
java.io.IOException: Server not running, aborting
...

You can search in the logfiles for occurrences of "ERROR" and "aborting" to find clues about the reasons the server in question stopped working.
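Such a search is easy to automate. The following is a minimal sketch, assuming the log has already been read into a list of lines; find_clues() is a hypothetical helper, and the sample lines are taken from the example above, joined onto single lines.

```python
def find_clues(lines, keywords=("ERROR", "aborting")):
    """Yield (line number, text) for log lines containing any keyword."""
    for number, line in enumerate(lines, start=1):
        if any(keyword in line for keyword in keywords):
            yield number, line.rstrip()

sample = [
    "2011-06-09 15:28:30,101 INFO org.apache.hadoop.hbase.regionserver.HRegionServer: ...",
    "2011-06-09 15:28:34,836 ERROR org.apache.hadoop.hbase.regionserver.HRegionServer: "
    "ZooKeeper session expired",
    "2011-06-09 15:28:34,837 ERROR org.apache.hadoop.hbase.regionserver.HRegionServer: "
    "java.io.IOException: Server not running, aborting",
]
for number, text in find_clues(sample):
    print(number, text)
```

The line numbers make it easy to jump to the surrounding context in the logfile, which usually holds the actual cause.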

Common Issues

The following gives you a list to run through when you encounter problems with your cluster setup.

Basic setup checklist

This section provides a checklist of things you should confirm for your cluster, before going into a deeper analysis in case of problems or performance issues.

File handles

The ulimit -n for the DataNode processes and the HBase processes should be set high. To verify the current ulimit setting you can also run the following:

$ cat /proc/<PID of JVM>/limits

You should see that the limit on the number of files is set reasonably high—it is safest to just bump this up to 32000, or even more. File handles and process limits has the full details on how to configure this value.
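The check can be scripted by parsing the limits file. The sketch below works on a text sample so that it runs anywhere; the sample content is illustrative, and max_open_files() is a hypothetical helper. On a Linux machine, the text would come from reading /proc/<PID of JVM>/limits.

```python
# Illustrative excerpt of a /proc/<pid>/limits file.
SAMPLE_LIMITS = """\
Limit                     Soft Limit           Hard Limit           Units
Max cpu time              unlimited            unlimited            seconds
Max open files            1024                 4096                 files
"""

def _to_number(text):
    """Convert a limit value, treating 'unlimited' as infinity."""
    return float("inf") if text == "unlimited" else int(text)

def max_open_files(limits_text):
    """Return the (soft, hard) limits from the 'Max open files' row."""
    label = "Max open files"
    for line in limits_text.splitlines():
        if line.startswith(label):
            soft, hard = line[len(label):].split()[:2]
            return _to_number(soft), _to_number(hard)
    raise ValueError("no 'Max open files' row found")

soft, hard = max_open_files(SAMPLE_LIMITS)
if soft < 32000:
    print(f"soft limit {soft} is below 32000, consider raising it")
```

The soft limit is the one that applies to the running process, so it is the value to compare against the recommendation.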

DataNode connections

The DataNodes should be configured with a large number of transceivers—at least 4,096, but potentially more. There’s no particular harm in setting it up to as high as 16,000 or so. See Datanode handlers for more information.

Compression

Compression should almost always be on, unless you are storing precompressed data. Compression discusses the details. Make sure that you have verified the installation so that all region servers can load the required compression libraries. If not, you will see errors like this:

hbase(main):007:0> create 'testtable', { NAME => 'colfam1', COMPRESSION => 'LZO' }
ERROR: org.apache.hadoop.hbase.client.NoServerForRegionException: 
  No server address listed in .META. for region 
  testtable,,1309713043529.8ec02f811f75d2178ad098dc40b4efcf.

In the logfiles of the servers, you will see the root cause for this problem (abbreviated and line-wrapped to fit the available width):

2011-07-03 19:10:43,725 INFO org.apache.hadoop.hbase.regionserver.HRegion:  
  Setting up tabledescriptor config now ...
2011-07-03 19:10:43,725 DEBUG org.apache.hadoop.hbase.regionserver.HRegion: 
  Instantiated testtable,,1309713043529.8ec02f811f75d2178ad098dc40b4efcf.
2011-07-03 19:10:43,839 ERROR org.apache.hadoop.hbase.regionserver.handler. 
OpenRegionHandler: Failed open of region=testtable,,1309713043529. 
8ec02f811f75d2178ad098dc40b4efcf.
java.io.IOException: java.lang.RuntimeException: 
  java.lang.ClassNotFoundException: com.hadoop.compression.lzo.LzoCodec
    at org.apache.hadoop.hbase.util.CompressionTest.testCompression
    at org.apache.hadoop.hbase.regionserver.HRegion.checkCompressionCodecs
    ...

The missing compression library triggers an error when the region server tries to open the region with the column family configured to use LZO compression.

Garbage collection/memory tuning

We discussed the common Java garbage collector settings in Garbage Collection Tuning. If enough memory is available, you should increase the region server heap up to at least 4 GB, preferably more like 8 GB. The recommended garbage collection settings ought to work for any heap size.

Also, if you are colocating the region server and MapReduce task tracker, be mindful of resource contention on the shared system. Edit the mapred-site.xml file to reduce the number of slots for nodes running with ZooKeeper, so you can allocate a good share of memory to the region server. Do the math on memory allocation, accounting for memory allocated to the task tracker and region server, as well as memory allocated for each child task (from mapred-site.xml and hadoop-env.sh) to make sure you are leaving enough memory for the region server but you’re not oversubscribing the system. Refer to the discussion in Requirements. You might want to consider separating MapReduce and HBase functionality if you are otherwise strapped for resources.

Lastly, HBase is also CPU-intensive. So even if you have enough memory, check your CPU utilization to determine if slots need to be reduced, using a simple Unix command such as top, or the monitoring described in Chapter 10.
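The memory arithmetic for a node that runs a region server next to MapReduce can be written down as a small calculation. All numbers below are hypothetical and need to be replaced with the values from your own configuration; the operating system reserve in particular is an assumption.

```python
def memory_headroom_mb(physical_mb, region_server_heap_mb, task_tracker_heap_mb,
                       data_node_heap_mb, task_slots, child_heap_mb,
                       os_reserve_mb=1024):
    """Return the memory left over after all configured heaps are accounted for.

    A negative result means the machine is oversubscribed and may start swapping.
    """
    committed = (region_server_heap_mb + task_tracker_heap_mb + data_node_heap_mb
                 + task_slots * child_heap_mb + os_reserve_mb)
    return physical_mb - committed

# A 24 GB machine with an 8 GB region server heap and 1 GB per child task.
print(memory_headroom_mb(24576, 8192, 1024, 1024, task_slots=8, child_heap_mb=1024))
print(memory_headroom_mb(24576, 8192, 1024, 1024, task_slots=16, child_heap_mb=1024))
```

With eight slots the node keeps some headroom, while sixteen slots oversubscribe it; reducing the slot count is the adjustment described above.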

Stability issues

In rare cases, a region server may shut itself down, or its process may be terminated unexpectedly. You can check the following:

  • Double-check that the JVM version is not 1.6.0u18 (which is known to have detrimental effects on running HBase processes).

  • Check the last lines of the region server logs—they probably have a message containing the word "aborting" (or "abort"), hopefully with a reason.

The latter is often an issue when the server is losing its ZooKeeper session. If that is the case, you can look into the following:

ZooKeeper problems

It is vital to ensure that ZooKeeper can perform its tasks as the coordination service for HBase. It is also important for the HBase processes to be able to communicate with ZooKeeper on a regular basis. Here is a checklist you can use to ensure that you do not run into commonly known problems with ZooKeeper:

Check that the region server and ZooKeeper machines do not swap

If machines start swapping, certain resources start to time out and the region servers will lose their ZooKeeper session, causing them to abort themselves. You can use Ganglia, for example, to graph the machines’ swap usage, or execute

$ vmstat 20

on the server(s) while running load against the cluster (e.g., a MapReduce job): make sure the "si" and "so" columns stay at 0. These columns show the amount of data swapped in or out. Also execute

$ free -m

to make sure that no swap space is used (the used column of the Swap row should state 0). Also consider tuning the kernel’s swappiness value (/proc/sys/vm/swappiness) down to 5 or 10. This should help if the total memory allocation adds up to less than the box’s available memory, yet swap is happening anyway.
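Watching the "si" and "so" columns by eye is tedious during a long load test, so the check can be scripted. The following is a minimal sketch, assuming the usual vmstat layout with a column header line that starts with "r"; the function name swap_activity is made up for this example.

```shell
#!/bin/sh
# Hypothetical helper: reads vmstat output on stdin, prints every sample
# with nonzero swap-in or swap-out, and returns 1 if any was found.
swap_activity() {
    awk '
        # The column header names the fields; locate si and so on it.
        $1 == "r" {
            for (i = 1; i <= NF; i++) {
                if ($i == "si") si = i
                if ($i == "so") so = i
            }
            next
        }
        # Data lines start with a number; the dashed banner line does not.
        si && so && $1 ~ /^[0-9]+$/ {
            if ($si + 0 > 0 || $so + 0 > 0) {
                print "swapping: si=" $si " so=" $so
                bad = 1
            }
        }
        END { exit bad + 0 }
    '
}

# Usage while a job is running (five samples, 20 seconds apart):
#   vmstat 20 5 | swap_activity && echo "no swap activity seen"
#
# Inspecting and lowering swappiness (the second command needs root):
#   cat /proc/sys/vm/swappiness
#   sysctl -w vm.swappiness=10
```

Note that the first sample vmstat prints reports averages since boot, so a nonzero value there does not necessarily mean the machine is swapping right now.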

Check network issues

If the network is flaky, region servers will lose their connections to ZooKeeper and abort.

Check ZooKeeper machine deployment

ZooKeeper should never be codeployed with task trackers or data nodes. It is permissible to deploy ZooKeeper with the name node, secondary name node, and job tracker on small clusters (e.g., fewer than 40 nodes).

It is preferable to deploy just one ZooKeeper peer shared with the name node/job tracker than to deploy three that are collocated with other processes: the other processes will stress the machine and ZooKeeper will start timing out.

Check pauses related to garbage collection

Check the region server’s logfiles for a message containing "slept"; for example, you might see something like "We slept 65000ms instead of 10000ms". If you see this, it is probably due to either garbage collection pauses or heavy swapping. If they are garbage collection pauses, refer to the tuning options mentioned in Basic setup checklist.
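To see how often and how badly a server overslept, you can extract the two numbers from each "slept" message. The sketch below assumes only the phrasing quoted above ("slept Nms instead of Mms"); the function name long_pauses and the default threshold of 10000 ms are choices made for this example, and the logfile path is supplied by you.

```shell
#!/bin/sh
# Hypothetical helper: lists pauses whose overshoot (actual minus expected
# sleep time) is at least the given threshold in milliseconds.
long_pauses() {
    # $1: region server logfile, $2: minimum overshoot in ms (default 10000)
    threshold=${2:-10000}
    # Pull "actual expected" pairs out of matching lines, then filter them.
    sed -n 's/.*slept \([0-9][0-9]*\)ms instead of \([0-9][0-9]*\)ms.*/\1 \2/p' "$1" |
        awk -v t="$threshold" '$1 - $2 >= t {
            print "overshoot " ($1 - $2) "ms (slept " $1 "ms, expected " $2 "ms)"
        }'
}

# Usage (the path is a placeholder for your own region server log):
#   long_pauses /path/to/regionserver.log
#   long_pauses /path/to/regionserver.log 30000
```

An overshoot that approaches the configured ZooKeeper session timeout is the kind of pause that leads to a lost session and an aborted server.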

Monitor slow disks

HBase does not degrade well when reading or writing a block on a data node with a slow disk. This problem can affect the entire cluster if the block holds data from the META region, causing compactions to slow and back up. Again, use monitoring to carefully keep these vital metrics under control.

“Could not obtain block” errors

Often, this is the xceiver problem discussed elsewhere in this book. Double-check the configured xceivers value. Also check the data node for log messages containing "exceeds the limit", which would indicate the xceiver issue. Check both the data node and region server log for "Too many open files" errors.
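The log checks above can be combined into one pass. This is a rough sketch: the function names are invented for the example, the log paths are whatever your installation uses, and the search strings are exactly the two phrases quoted in the text.

```shell
#!/bin/sh
# Hypothetical helpers for scanning logs for the two symptoms named above.

count_matches() {
    # $1: logfile, $2: fixed string to search for; prints the number of
    # matching lines (grep exits nonzero on zero matches, hence "|| true")
    grep -c -F -- "$2" "$1" || true
}

scan_block_errors() {
    # $1: data node log, $2: region server log
    echo "data node, exceeds the limit: $(count_matches "$1" 'exceeds the limit')"
    echo "data node, Too many open files: $(count_matches "$1" 'Too many open files')"
    echo "region server, Too many open files: $(count_matches "$2" 'Too many open files')"
}

# Usage (both paths are placeholders):
#   scan_block_errors /path/to/datanode.log /path/to/regionserver.log
```

A nonzero count on the first line points at the xceiver limit, while the other two point at the file descriptor limit of the user running the daemons.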



[125] As of this writing, the newly started master also has no web-based UI available. In other words, accessing the master info port on that server will not yield any results.

[126] There is an entry in the issue tracking system to rectify this inconvenience, which means it will improve over time. For now, you could use a script that reads the current master’s hostname from ZooKeeper and updates a DNS entry pointing a generic hostname to it.

[127] Note that some distributions for HBase do not require this, since they do not make use of the supplied start-hbase.sh script.

[128] There is an entry open in the issue tracking system to replace the parameter parsing with a more modern command-line parser. This will change how the job is parameterized in the future.

[129] Hadoop uses a similar layout for the port assignments, but since it has more process types it also has additional ports. See this blog post for more information.

[130] A dedicated service you can use is Search Hadoop.
