Once a cluster is in operation, it may become necessary to change its size or add extra measures for failover scenarios, all while the cluster is in use. Data may also need to be backed up or moved between distinct clusters. In this chapter, we will look at how this can be done with minimal to no interruption.
This section introduces the various tasks necessary while operating a cluster, including adding and removing nodes.
You can stop an individual region server by running the following script in the HBase directory on the particular server:

$ ./bin/hbase-daemon.sh stop regionserver
The region server will first close all regions and then shut itself down. On shutdown, its ephemeral node in ZooKeeper will expire. The master will notice that the region server is gone and will treat it as a crashed server: it will reassign the regions the server was carrying.
A downside to this method of stopping a region server is that regions could be offline for a good period of time—up to the configured ZooKeeper timeout period. Regions are closed in order: if there are many regions on the server, the first region to close may not be back online until all regions have closed and the master has noticed that the region server's ZooKeeper znode was removed.
HBase 0.90.2 introduced the ability for a node to gradually shed its load and then shut itself down. This is accomplished with the graceful_stop.sh script. When you invoke this script without any parameters, you are presented with an explanation of its usage:
$ ./bin/graceful_stop.sh
Usage: graceful_stop.sh [--config <conf-dir>] [--restart] [--reload] [--debug] \
  [--thrift] [--rest] <hostname>
 thrift      If we should stop/start thrift before/after the hbase stop/start
 rest        If we should stop/start rest before/after the hbase stop/start
 restart     If we should restart after graceful stop
 reload      Move offloaded regions back on to the stopped server
 debug       Move offloaded regions back on to the stopped server
 hostname    Hostname of server we are to stop
When you want to decommission a loaded region server, run the following:

$ ./bin/graceful_stop.sh HOSTNAME
where HOSTNAME is the host carrying the region server you want to decommission.

The HOSTNAME passed to graceful_stop.sh must match the hostname that HBase is using to identify region servers. Check the list of region servers in the master UI to see how HBase is referring to each server. It is usually a hostname, but it can also be an FQDN, such as hostname.foobar.com. Whatever HBase is using, this is what you should pass to the graceful_stop.sh decommission script.

If you pass IP addresses, the script is not (yet) smart enough to make a hostname (or FQDN) out of them and will fail when it checks if the server is currently running: the graceful unloading of regions will not run.
The graceful_stop.sh script will move the regions off the decommissioned region server one at a time to minimize region churn. It will verify the region deployed in the new location before it moves the next region, and so on, until the decommissioned server is carrying no more regions.
At this point, the graceful_stop.sh script tells the region server to stop. The master will notice the region server gone but all regions will have already been redeployed, and because the region server went down cleanly, there will be no WALs to split.
You can also use the graceful_stop.sh script to restart a region server after the shutdown and move its old regions back into place. (You might do the latter to retain data locality.) A primitive rolling restart might be effected by running something like the following:
$ for i in `cat conf/regionservers|sort`; do ./bin/graceful_stop.sh \
    --restart --reload --debug $i; done &> /tmp/log.txt &
Tail the output in /tmp/log.txt to follow the script's progress. The preceding command pertains to region servers only; be sure to disable the load balancer before running it.
You will need to perform the master update separately, and it is recommended that you do the rolling restart of the region servers afterward. Here are the steps you can follow to accomplish a rolling restart:
Unpack your release, verify its configuration, and then rsync it across the cluster. If you are using version 0.90.2, patch it with HBASE-3744 and HBASE-3756 first.
Run hbck to ensure the cluster is consistent:

$ ./bin/hbase hbck

Restart the master:

$ ./bin/hbase-daemon.sh stop master; ./bin/hbase-daemon.sh start master

Disable the region balancer:

$ echo "balance_switch false" | ./bin/hbase shell
Run the graceful_stop.sh script per region server. For example:

$ for i in `cat conf/regionservers|sort`; do ./bin/graceful_stop.sh \
    --restart --reload --debug $i; done &> /tmp/log.txt &
If you are running Thrift or REST servers on the region server, pass the --thrift or --rest option, as per the script's usage instructions, shown earlier (i.e., run it without any command-line options to get the instructions).
Restart the master again. This will clear out the dead servers list and reenable the balancer.
One of the major features HBase offers is built-in scalability. As the load on your cluster increases, you need to be able to add new servers to compensate for the new requirements. Adding new servers is a straightforward process and can be done for clusters running in any of the distribution modes, as explained in Distributed Mode.
It seems paradoxical to scale an HBase cluster in an all-local mode, even when all daemons are run in separate processes. However, pseudodistributed mode is the closest you can get to a real cluster setup, and during development or prototyping it is advantageous to be able to replicate a fully distributed setup on a single machine.
Since the processes have to share all the local resources, adding more processes obviously will not make your test cluster perform any better. In fact, pseudodistributed mode is really suitable only for a very small amount of data. However, it allows you to test most of the architectural features HBase has to offer.
For example, you can experiment with master failover scenarios, or regions being moved from one server to another. Obviously, this does not replace testing at scale on the real cluster hardware, with the load expected during production. However, it does help you to come to terms with the administrative functionality offered by the HBase Shell, for example.
Or you can use the administrative API as discussed in Chapter 5. Use it to develop tools that maintain schemas, or to handle shifting server loads. There are many applications for this in a production environment, and being able to develop and test a tool locally first is tremendously helpful.
You need to have set up a pseudodistributed installation before you can add any servers in pseudodistributed mode, and it must be running to use the following commands. They add to the existing processes, but do not take care of spinning up the local cluster itself.
Starting a local backup master process is accomplished by using the local-master-backup.sh script in the bin directory, like so:
$ ./bin/local-master-backup.sh start 1
The number at the end of the command signifies an offset that is added to the default ports of 60000 for RPC and 60010 for the web-based UI. In this example, a new master process would be started that reads the same configuration files as usual, but would listen on ports 60001 and 60011, respectively.
In other words, the parameter is required and does not represent the number of servers to start, but the offset at which their ports are bound. Starting more than one is also possible:
$ ./bin/local-master-backup.sh start 1 3 5
This starts three backup masters on ports 60001, 60003, and 60005 for RPC, plus 60011, 60013, and 60015 for the web UIs.
Make sure you do not specify an offset that could collide with a port that is already in use by another process. For example, it is a bad idea to use 30 for the offset, since this would result in a master RPC port of 60030, which is usually already assigned to the first region server as its UI port.
The start script also adds the offset to the name of the logfile the process is using, thus differentiating it from the logfiles used by the other local processes. For an offset of 1, it would set the logfile name to be:

logs/hbase-${USER}-1-master-${HOSTNAME}.log

Note the added 1 in the name. Using an offset of, for instance, 10 would add that number into the logfile name instead.
Stopping the backup master(s) involves the same command, but with the start command replaced by the aptly named stop, like so:

$ ./bin/local-master-backup.sh stop 1
You need to specify the offsets of those backup masters you want to stop, and you have the option to stop only one, or any other number, up to all of the ones you started: whatever offset you specify is used to stop the master matching that number.
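For example, if you started backup masters with offsets 1, 3, and 5 as shown earlier, you could stop just two of them while leaving the third running:

```shell
$ ./bin/local-master-backup.sh stop 1 3
```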
In a similar vein, you are allowed to start additional local region servers. The script provided is called local-regionservers.sh, and it takes the same parameters as the related local-master-backup.sh script: you specify the command, that is, whether you want to start or stop the server, and a list of offsets.

The difference is that these offsets are added to 60200 for RPC, and 60300 for the web UIs. For example:
$ ./bin/local-regionservers.sh start 1
This command will start an additional region server using port 60201 for RPC, and 60301 for the web UI. The logfile name has the offset added to it, and would result in:
logs/hbase-${USER}-1-regionserver-${HOSTNAME}.log
The same concerns apply: you need to ensure that you are specifying an offset that results in a port that is not already in use by another process, or you will receive a java.net.BindException: Address already in use exception, as expected.
Starting more than one region server is accomplished by adding more offsets:
$ ./bin/local-regionservers.sh start 1 2 3
You do not have to start with an offset of 1. Since these are added to the base port numbers, you are free to specify any offset you prefer.
Stopping any additional region server involves replacing the start command with the stop command:

$ ./bin/local-regionservers.sh stop 1
This would stop the region server using offset 1, or ports 60201 and 60301. If you specify the offsets of all previously started region servers, they will all be stopped.
Operating an HBase cluster typically involves adding new servers over time. This is more common for the region servers, as they are doing all the heavy lifting. For the master, you have the option to start backup instances.
To prevent an HBase cluster master server from being a single point of failure, you can add backup masters. These are typically located on separate physical machines so that in a worst-case scenario, where the machine currently hosting the active master fails, the system can fall back to a backup master.
The master process uses ZooKeeper to negotiate which is the currently active master: there is a dedicated ZooKeeper znode that all master processes race to create, and the first one to create it wins. This happens at startup and the winning process moves on to become the current master. All other machines simply loop around the znode check and wait for it to disappear—triggering the race again.
The /hbase/master znode is ephemeral, and is the same kind the region servers use to report their presence. When the master process that created the znode fails, ZooKeeper will notice the end of the session with that server and remove the znode accordingly, triggering the election process.
Starting a server on multiple machines requires that it is configured just like the rest of the HBase cluster (see Configuration for details). The master servers usually share the same configuration with the other servers in the cluster. Once you have confirmed that this is set up appropriately, you can run the following command on a server that is supposed to host the backup master:
$ ./bin/hbase-daemon.sh start master
Assuming you already had a master running, this command will bring up the new master to the point where it waits for the znode to be removed.[125] If you want to start many masters in an automated fashion and dedicate a specific server to host the current one, while all the others are considered backup masters, you can add the --backup switch like so:
$ ./bin/hbase-daemon.sh start master --backup
This forces the newly started master to wait for the dedicated one (the one that was started using the normal start-hbase.sh script, or by the previous command but without the --backup parameter) to create the /hbase/master znode in ZooKeeper. Once this has happened, they move on to the master election loop. Since there is now already a master present, they go into idle mode as explained.
If you started more than one master, and you experienced failovers, there is no easy way to tell which master is currently active. This causes a slight problem in that there is no way for you to know where the master's web-based UI is located. You will need to try the http://hostname:60010 URL on all possible master servers to find the active one.[126]
Since HBase 0.90.x, there is also the option of creating a backup-masters file in the conf directory. This is akin to the regionservers file, listing one hostname per line that is supposed to start a backup master. For the example in Example Configuration, we could assume that we have three backup masters running on the ZooKeeper servers. In that case, the conf/backup-masters file would contain these entries:

zk1.foo.com
zk2.foo.com
zk3.foo.com
Adding these processes to the ZooKeeper machines is useful in a small cluster, as the master is more a coordinator in the overall design, and therefore does not need a lot of resources.
You should start as many backup masters as you feel satisfies your requirements to handle machine failures. There is no harm in starting too many, but having too few might leave you with a weak spot in the setup. This is mitigated by the use of monitoring solutions that report the first master to fail. You can take action by repairing the server and adding it back to the cluster. Overall, having two or three backup masters seems a reasonable number.
Note that the servers listed in backup-masters are the ones the backup master processes are started on, using the --backup switch. This happens as the start-hbase.sh script starts the primary master, the region servers, and eventually the backup masters. Alternatively, you can invoke the hbase-backup.sh script to initiate the start of the backup masters.
Adding a new region server is one of the more common procedures you will perform on a cluster. The first thing you should do is edit the regionservers file in the conf directory, to enable the launcher scripts to automate the server start and stop procedure.[127] Simply add a new line to the file specifying the hostname to add.
Once you have updated the file, you need to copy it across all machines in the cluster. You also need to ensure that the newly added machine has HBase installed, and that the configuration is current.
Then you have a few choices to start the new region server process. One option is to run the start-hbase.sh script on the master machine. It will skip all machines that have a process already running. Since the new machine fails this check, it will appropriately start the region server daemon.
Another option is to use the launcher script directly on the new server. This is done like so:
$ ./bin/hbase-daemon.sh start regionserver
The region server process will start and register itself by creating a znode with its hostname in ZooKeeper. It subsequently joins the collective and is assigned regions.
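You can confirm that the new server has joined the cluster by dumping the cluster status as seen by ZooKeeper from the HBase Shell; the zk_dump command lists, among other details, the currently registered region servers:

```shell
$ echo "zk_dump" | ./bin/hbase shell
```

Alternatively, the master's web-based UI shows the updated region server list.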
When dealing with an HBase cluster, you also will deal with a lot of data, spread over one or more tables. Sometimes you may be required to move the data as a whole—or in parts—to either archive data for backup purposes or to bootstrap another cluster. The following describes the possible ways in which you can accomplish this task.
HBase ships with a handful of useful tools, two of which are the Import and Export MapReduce jobs. They can be used to write subsets, or an entire table, to files in HDFS, and subsequently load them again. They are contained in the HBase JAR file and you need the hadoop jar command to get a list of the tools:
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar
An example program must be given as the first argument.
Valid program names are:
  CellCounter: Count cells in HBase table
  completebulkload: Complete a bulk data load.
  copytable: Export a table from local cluster to peer cluster
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table
  verifyrep: Compare the data from tables in two different clusters.
    WARNING: It doesn't work for incrementColumnValues'd cells since the
    timestamp is changed after being appended to the log.
Adding the export program name then displays the options for its usage:
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar export
ERROR: Wrong number of arguments: 0
Usage: Export [-D <property=value>]* <tablename> <outputdir>
  [<versions> [<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]

  Note: -D properties will be applied to the conf used.
  For example:
   -D mapred.output.compress=true
   -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
   -D mapred.output.compression.type=BLOCK
  Additionally, the following SCAN properties can be specified
  to control/limit what is exported..
   -D hbase.mapreduce.scan.column.family=<familyName>
You can see how you can supply various options. The only two required parameters are tablename and outputdir. The others are optional and can be added as required.[128] Table 12-1 lists the possible options.
| Name | Description |
| --- | --- |
| tablename | The name of the table to export. |
| outputdir | The location in HDFS to store the exported data. |
| versions | The number of versions per column to store. Default is 1. |
| starttime | The start time, further limiting the versions saved. See Introduction for details on the setTimeRange() method that is used. |
| endtime | The matching end time for the time range of the scan used. |
| regexp/prefix | When starting with ^ it is treated as a regular expression pattern, matching row keys; otherwise, it is treated as a row key prefix. |
The regexp parameter makes use of the RowFilter and RegexStringComparator, as explained in RowFilter, and the prefix version uses the PrefixFilter, discussed in PrefixFilter.
You do need to specify the parameters from left to right, and you cannot omit any in between. In other words, if you want to specify a row key filter, you must also specify the versions, as well as the start and end times. If you do not need them, set them to their minimum and maximum values: for example, 0 for the start and 9223372036854775807 (since the time is given as a long value) for the end timestamp. This will ensure that the time range is not taken into consideration.
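Putting this together, an invocation that exports all versions of rows with a given key prefix, while neutralizing the time range, might look like the following (the table name, output path, and prefix are placeholders):

```shell
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar export \
    testtable /user/larsgeorge/backup-testtable \
    2147483647 0 9223372036854775807 row-abc
```

Since the prefix does not start with ^, it is treated as a row key prefix rather than a regular expression.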
Although you are supplying the HBase JAR file, there are a few extra dependencies that need to be satisfied before you can run this MapReduce job successfully. MapReduce requires access to the following JAR files: zookeeper-xyz.jar, guava-xyz.jar, and google-collections-xyz.jar. You need to make them available in such a way that the MapReduce task attempt has access to them. One way is to add them to the HADOOP_CLASSPATH variable in $HADOOP_HOME/conf/hadoop-env.sh.
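For example, you could extend hadoop-env.sh along these lines (the JAR version numbers are placeholders; use the versions actually shipped in your $HBASE_HOME/lib directory):

```shell
# in $HADOOP_HOME/conf/hadoop-env.sh; adjust the version numbers to match
# the JARs found in $HBASE_HOME/lib
export HADOOP_CLASSPATH="$HBASE_HOME/lib/zookeeper-x.y.z.jar:\
$HBASE_HOME/lib/guava-xyz.jar:\
$HBASE_HOME/lib/google-collections-xyz.jar:$HADOOP_CLASSPATH"
```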
Running the command will start the MapReduce job and print out the progress:
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar export \
    testtable /user/larsgeorge/backup-testtable
11/06/25 15:58:29 INFO mapred.JobClient: Running job: job_201106251558_0001
11/06/25 15:58:30 INFO mapred.JobClient:  map 0% reduce 0%
11/06/25 15:58:52 INFO mapred.JobClient:  map 6% reduce 0%
11/06/25 15:58:55 INFO mapred.JobClient:  map 9% reduce 0%
11/06/25 15:58:58 INFO mapred.JobClient:  map 15% reduce 0%
11/06/25 15:59:01 INFO mapred.JobClient:  map 21% reduce 0%
11/06/25 15:59:04 INFO mapred.JobClient:  map 28% reduce 0%
11/06/25 15:59:07 INFO mapred.JobClient:  map 34% reduce 0%
11/06/25 15:59:10 INFO mapred.JobClient:  map 40% reduce 0%
11/06/25 15:59:13 INFO mapred.JobClient:  map 46% reduce 0%
11/06/25 15:59:16 INFO mapred.JobClient:  map 53% reduce 0%
11/06/25 15:59:19 INFO mapred.JobClient:  map 59% reduce 0%
11/06/25 15:59:22 INFO mapred.JobClient:  map 65% reduce 0%
11/06/25 15:59:25 INFO mapred.JobClient:  map 71% reduce 0%
11/06/25 15:59:28 INFO mapred.JobClient:  map 78% reduce 0%
11/06/25 15:59:31 INFO mapred.JobClient:  map 84% reduce 0%
11/06/25 15:59:34 INFO mapred.JobClient:  map 90% reduce 0%
11/06/25 15:59:37 INFO mapred.JobClient:  map 96% reduce 0%
11/06/25 15:59:40 INFO mapred.JobClient:  map 100% reduce 0%
11/06/25 15:59:42 INFO mapred.JobClient: Job complete: job_201106251558_0001
11/06/25 15:59:42 INFO mapred.JobClient: Counters: 6
11/06/25 15:59:42 INFO mapred.JobClient:   Job Counters
11/06/25 15:59:42 INFO mapred.JobClient:     Rack-local map tasks=32
11/06/25 15:59:42 INFO mapred.JobClient:     Launched map tasks=32
11/06/25 15:59:42 INFO mapred.JobClient:   FileSystemCounters
11/06/25 15:59:42 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=3648
11/06/25 15:59:42 INFO mapred.JobClient:   Map-Reduce Framework
11/06/25 15:59:42 INFO mapred.JobClient:     Map input records=0
11/06/25 15:59:42 INFO mapred.JobClient:     Spilled Records=0
11/06/25 15:59:42 INFO mapred.JobClient:     Map output records=0
Once the job is complete, you can check the filesystem for the exported data. Use the hadoop dfs command (the lines have been shortened to fit horizontally):
$ hadoop dfs -lsr /user/larsgeorge/backup-testtable
drwxr-xr-x   - ...     0 2011-06-25 15:58 _logs
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00000
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00001
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00002
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00003
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00004
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00005
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00006
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00007
-rw-r--r--   1 ...   114 2011-06-25 15:58 part-m-00008
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00009
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00010
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00011
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00012
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00013
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00014
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00015
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00016
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00017
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00018
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00019
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00020
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00021
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00022
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00023
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00024
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00025
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00026
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00027
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00028
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00029
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00030
-rw-r--r--   1 ...   114 2011-06-25 15:59 part-m-00031
Each part-m-nnnnn file contains a piece of the exported data, and together they form the full backup of the table. You can now, for example, use the hadoop distcp command to move the directory from one cluster to another, and perform the import there.
Also, using the optional parameters, you can implement an incremental backup process: set the start time to the value of the last backup. The job will still scan the entire table, but only export what has been modified since.
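An incremental run could therefore look like the following, using the epoch timestamp of the previous backup as the start time and the time of this run as the end (the timestamps and output path are placeholders):

```shell
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar export \
    testtable /user/larsgeorge/backup-testtable-incr \
    1 1309039200000 1309125600000
```

Keeping each incremental run in its own output directory lets you replay the backups in order with the Import job later.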
It is usually OK to only export the last version of a column value, but if you want a complete table backup, set the number of versions to 2147483647, which means all of them.
Importing the data is the reverse operation. First we can get the usage details by invoking the command without any parameters, and then we can start the job with the tablename and inputdir (the directory containing the exported files):
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar import
ERROR: Wrong number of arguments: 0
Usage: Import <tablename> <inputdir>
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar import \
    testtable /user/larsgeorge/backup-testtable
11/06/25 17:09:48 INFO mapreduce.TableOutputFormat: Created table instance for testtable
11/06/25 17:09:48 INFO input.FileInputFormat: Total input paths to process : 32
11/06/25 17:09:49 INFO mapred.JobClient: Running job: job_201106251558_0003
11/06/25 17:09:50 INFO mapred.JobClient:  map 0% reduce 0%
11/06/25 17:10:04 INFO mapred.JobClient:  map 6% reduce 0%
11/06/25 17:10:07 INFO mapred.JobClient:  map 12% reduce 0%
11/06/25 17:10:10 INFO mapred.JobClient:  map 18% reduce 0%
11/06/25 17:10:13 INFO mapred.JobClient:  map 25% reduce 0%
11/06/25 17:10:16 INFO mapred.JobClient:  map 31% reduce 0%
11/06/25 17:10:19 INFO mapred.JobClient:  map 37% reduce 0%
11/06/25 17:10:22 INFO mapred.JobClient:  map 43% reduce 0%
11/06/25 17:10:25 INFO mapred.JobClient:  map 50% reduce 0%
11/06/25 17:10:28 INFO mapred.JobClient:  map 56% reduce 0%
11/06/25 17:10:31 INFO mapred.JobClient:  map 62% reduce 0%
11/06/25 17:10:34 INFO mapred.JobClient:  map 68% reduce 0%
11/06/25 17:10:37 INFO mapred.JobClient:  map 75% reduce 0%
11/06/25 17:10:40 INFO mapred.JobClient:  map 81% reduce 0%
11/06/25 17:10:43 INFO mapred.JobClient:  map 87% reduce 0%
11/06/25 17:10:46 INFO mapred.JobClient:  map 93% reduce 0%
11/06/25 17:10:49 INFO mapred.JobClient:  map 100% reduce 0%
11/06/25 17:10:51 INFO mapred.JobClient: Job complete: job_201106251558_0003
11/06/25 17:10:51 INFO mapred.JobClient: Counters: 6
11/06/25 17:10:51 INFO mapred.JobClient:   Job Counters
11/06/25 17:10:51 INFO mapred.JobClient:     Launched map tasks=32
11/06/25 17:10:51 INFO mapred.JobClient:     Data-local map tasks=32
11/06/25 17:10:51 INFO mapred.JobClient:   FileSystemCounters
11/06/25 17:10:51 INFO mapred.JobClient:     HDFS_BYTES_READ=3648
11/06/25 17:10:51 INFO mapred.JobClient:   Map-Reduce Framework
11/06/25 17:10:51 INFO mapred.JobClient:     Map input records=0
11/06/25 17:10:51 INFO mapred.JobClient:     Spilled Records=0
11/06/25 17:10:51 INFO mapred.JobClient:     Map output records=0
You can also use the Import job to store the data in a different table. As long as it has the same schema, you are free to specify a different table name on the command line.
The data from the exported files was read by the MapReduce job and stored in the specified table. Finally, note that this Export/Import combination works per table only. If you have more than one table, you need to run the jobs for each of them separately.
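A simple way to handle several tables is a shell loop over the table names (the table list and backup path here are examples):

```shell
$ for t in testtable customers orders; do \
    hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar export \
      $t /user/larsgeorge/backup-$t; \
  done
```

The matching import would iterate over the same list, swapping export for import.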
Another supplied tool is CopyTable, which is primarily designed to bootstrap cluster replication. You can use it to make a copy of an existing table from the master cluster to the slave cluster. Here are its command-line options:
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar copytable
Usage: CopyTable [--rs.class=CLASS] [--rs.impl=IMPL] [--starttime=X]
  [--endtime=Y] [--new.name=NEW] [--peer.adr=ADR] <tablename>

Options:
 rs.class     hbase.regionserver.class of the peer cluster
              specify if different from current cluster
 rs.impl      hbase.regionserver.impl of the peer cluster
 starttime    beginning of the time range
              without endtime means from starttime to forever
 endtime      end of the time range
 new.name     new table's name
 peer.adr     Address of the peer cluster given in the format
              hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent
 families     comma-seperated list of families to copy

Args:
 tablename    Name of the table to copy

Examples:
 To copy 'TestTable' to a cluster that uses replication for a 1 hour window:
 $ bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
     --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface \
     --rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer \
     --starttime=1265875194289 --endtime=1265878794289 \
     --peer.adr=server1,server2,server3:2181:/hbase TestTable
CopyTable comes with an example command at the end of the usage output, which you can use to set up your own copy process. The parameters are all documented in the output too, and you may notice that you also have the start and end time options, which you can use the same way as explained earlier for the Export/Import tool.
In addition, you can use the families parameter to limit the number of column families that are included in the copy. Note that the copy only considers the latest version of a column value. Here is an example of copying a table within the same cluster:
$ hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar copytable \
    --new.name=testtable3 testtable
11/06/26 15:20:07 INFO mapreduce.TableOutputFormat: Created table instance for testtable3
11/06/26 15:20:07 INFO mapred.JobClient: Running job: job_201106261454_0003
11/06/26 15:20:08 INFO mapred.JobClient:  map 0% reduce 0%
11/06/26 15:20:19 INFO mapred.JobClient:  map 6% reduce 0%
11/06/26 15:20:22 INFO mapred.JobClient:  map 12% reduce 0%
11/06/26 15:20:25 INFO mapred.JobClient:  map 18% reduce 0%
11/06/26 15:20:28 INFO mapred.JobClient:  map 25% reduce 0%
11/06/26 15:20:31 INFO mapred.JobClient:  map 31% reduce 0%
11/06/26 15:20:34 INFO mapred.JobClient:  map 37% reduce 0%
11/06/26 15:20:37 INFO mapred.JobClient:  map 43% reduce 0%
11/06/26 15:20:40 INFO mapred.JobClient:  map 50% reduce 0%
11/06/26 15:20:43 INFO mapred.JobClient:  map 56% reduce 0%
11/06/26 15:20:46 INFO mapred.JobClient:  map 62% reduce 0%
11/06/26 15:20:49 INFO mapred.JobClient:  map 68% reduce 0%
11/06/26 15:20:52 INFO mapred.JobClient:  map 75% reduce 0%
11/06/26 15:20:55 INFO mapred.JobClient:  map 81% reduce 0%
11/06/26 15:20:58 INFO mapred.JobClient:  map 87% reduce 0%
11/06/26 15:21:01 INFO mapred.JobClient:  map 93% reduce 0%
11/06/26 15:21:04 INFO mapred.JobClient:  map 100% reduce 0%
11/06/26 15:21:06 INFO mapred.JobClient: Job complete: job_201106261454_0003
11/06/26 15:21:06 INFO mapred.JobClient: Counters: 5
11/06/26 15:21:06 INFO mapred.JobClient:   Job Counters
11/06/26 15:21:06 INFO mapred.JobClient:     Launched map tasks=32
11/06/26 15:21:06 INFO mapred.JobClient:     Data-local map tasks=32
11/06/26 15:21:06 INFO mapred.JobClient:   Map-Reduce Framework
11/06/26 15:21:06 INFO mapred.JobClient:     Map input records=0
11/06/26 15:21:06 INFO mapred.JobClient:     Spilled Records=0
11/06/26 15:21:06 INFO mapred.JobClient:     Map output records=0
The copy process requires the target table to exist: use the shell to get the definition of the source table, and create the target table with the same definition. You can omit any families that you are not including in the copy.
The example also uses the optional new.name parameter, which allows you to specify a table name that is different from the original. The copy of the table is stored on the same cluster, since the peer.adr parameter was not used.
Note that for both the CopyTable and Export/Import tools you can only rely on row-level atomicity. In other words, if you export or copy a table while it is being modified by other clients, you may not be able to tell exactly what has been copied to the new location.
Especially when dealing with more than one table, such as the secondary indexes, you need to ensure from the client side that you have copied a consistent view of all tables. One way to handle this is to use the start and end time parameters. This will allow you to run a second update job that only addresses the recently updated data.
HBase includes several methods of loading data into tables. The most straightforward method is to either use the TableOutputFormat class from a MapReduce job (see Chapter 7), or use the normal client APIs; however, these are not always the most efficient methods.
Another way to efficiently load large amounts of data is via a bulk import. The bulk load feature uses a MapReduce job to output table data in HBase's internal data format, and then directly loads the data files into a running cluster. This feature uses less CPU and network resources than simply using the HBase API.
A problem with loading data into HBase is that often this must be done in short bursts, but with those bursts being potentially very large. This will put additional stress on your cluster, and might overload it subsequently. Bulk imports are a way to alleviate this problem by not causing unnecessary churn on region servers.
The HBase bulk load process consists of two main steps:
The first step of a bulk load is to generate HBase
data files from a MapReduce job using HFileOutputFormat
. This output format
writes out data in HBase’s internal storage format so that it
can be later loaded very efficiently into the cluster.
In order to function efficiently,
HFileOutputFormat
must be
configured such that each output HFile fits within a single
region: jobs whose output will be bulk-loaded into HBase use
Hadoop’s TotalOrderPartitioner
class to
partition the map output into disjoint ranges of the key space,
corresponding to the key ranges of the regions in the
table.
HFileOutputFormat
includes a convenience function, configureIncrementalLoad()
, which
automatically sets up a TotalOrderPartitioner
based on the
current region boundaries of a table.
After the data has been prepared using HFileOutputFormat
, it is loaded into
the cluster using the completebulkload
tool. This tool
iterates through the prepared data files, and for each one it
determines the region the file belongs to. It then contacts the
appropriate region server which adopts the HFile, moving it into
its storage directory and making the data available to
clients.
If the region boundaries have changed
during the course of bulk load preparation, or between the
preparation and completion steps, the completebulkload
tool will
automatically split the data files into pieces corresponding to
the new boundaries. This process is not optimally efficient, so
you should take care to minimize the delay between preparing a
bulk load and importing it into the cluster, especially if other
clients are simultaneously loading data through other
means.
This mechanism makes use of the
merge read already in place on the servers to
scan memstores and on-disk file stores for KeyValue
entries of a row. Adding the newly
generated files from the bulk import adds an additional file to
handle—similar to new store files generated by a memstore
flush.
What is even more important is that all of
these files are sorted by the timestamps the matching KeyValue
instances have (see Read Path). In other words, you can bulk-import
newer and older versions of
a column value, while the region servers sort them appropriately. The
end result is that you immediately have a consistent and coherent view
of the stored rows.
HBase ships with a command-line tool called importtsv
which, when given files containing
data in tab-separated value (TSV) format, can
prepare this data for bulk import into HBase. This tool uses the HBase
put()
API by default to insert data into HBase one row at
a time.
Alternatively, you can use the importtsv.bulk.output
option so that
importtsv
will instead generate
files using HFileOutputFormat
.
These can subsequently be bulk-loaded into HBase. Running the tool
with no arguments prints brief usage information:
$
hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar importtsv
Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

Imports the given input directory of TSV data into the specified table.

The column names of the TSV data must be specified using the -Dimporttsv.columns
option. This option takes the form of comma-separated column names, where each
column name is either a simple column family, or a columnfamily:qualifier. The
special column name HBASE_ROW_KEY is used to designate that this column should
be used as the row key for each imported record. You must specify exactly one
column to be the row key, and you must specify a column name for every column
that exists in the input data.

By default importtsv will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimporttsv.bulk.output=/path/for/output
  Note: if you do not use this option, then the target table must already
  exist in HBase

Other options that may be specified with -D include:
  -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
  '-Dimporttsv.separator=|' - eg separate on pipes instead of tabs
  -Dimporttsv.timestamp=currentTimeAsLong - use the specified timestamp for
    the import
  -Dimporttsv.mapper.class=my.Mapper - A user-defined Mapper to use instead
    of org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
The usage information is self-explanatory, so you simply need to run the tool, specifying the options it requires. It will start a job that reads the files from HDFS and prepares the bulk import store files.
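As a sketch, an invocation that prepares store files instead of inserting through the put() API might look as follows. The command is only assembled and printed here; the column mapping and input path are hypothetical, while the output path and table name match the completebulkload example shown next:

```shell
# Assemble an importtsv command that writes HFiles via the
# importtsv.bulk.output option; printed only in this sketch, since executing
# it requires a live Hadoop/HBase installation.
CMD="hadoop jar \$HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,colfam1:col1 \
-Dimporttsv.bulk.output=/user/larsgeorge/myoutput \
mytable /user/larsgeorge/input"
echo "$CMD"
```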
After a data import has been prepared, either by using the
importtsv
tool with the importtsv.bulk.output
option, or by some
other MapReduce job using the HFileOutputFormat
, the completebulkload
tool is used to import the
data into the running cluster.
The completebulkload
tool simply takes the
output path where importtsv
or your
MapReduce job put its results, and the table name to import into. For
example:
$
hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar completebulkload
-conf ~/my-hbase-site.xml /user/larsgeorge/myoutput mytable
The optional -conf
config-file
parameter can be used to specify a file
containing the appropriate HBase parameters, if not supplied already
on the CLASSPATH
. In addition, the
CLASSPATH
must contain the
directory that has the ZooKeeper configuration file, if ZooKeeper is
not managed by HBase.
If the target table does not already exist in HBase, this tool will create it for you.
The completebulkload
tool completes quickly,
after which point the new data will be visible in the cluster.
Although the importtsv
tool is
useful in many cases, advanced users may want to generate data
programmatically, or import data from other formats. To get started doing so, peruse the
ImportTsv.java
class, and check the JavaDoc for HFileOutputFormat
.
The import step of the bulk load can also be done from
within your code: see the LoadIncrementalHFiles
class for
more information.
The architecture of the HBase replication feature was discussed in Replication. Here we will look at what is required to enable replication of a table between two clusters.
The first step is to edit the hbase-site.xml configuration file in the conf directory to turn the feature on for the entire cluster:
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>zk1.foo.com,zk2.foo.com,zk3.foo.com</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://master.foo.com:8020/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.replication</name>
    <value>true</value>
  </property>
</configuration>
This example adds the new hbase.replication
property, where setting it
to true
enables replication support.
This puts certain low-level features into place that are required.
Otherwise, you will not see any changes to your cluster setup and
functionality. Do not forget to copy the changed configuration file to
all machines in your cluster, and to restart the servers.
Now you can either alter an existing
table—you need to disable it before you can do that—or create a new one
with the replication scope set to 1
(also see Column Families
for its value range):
hbase(main):001:0>
create 'testtable1', 'colfam1'
hbase(main):002:0>
disable 'testtable1'
hbase(main):003:0>
alter 'testtable1', NAME => 'colfam1',
REPLICATION_SCOPE => '1'
hbase(main):004:0>
enable 'testtable1'
hbase(main):005:0>
create 'testtable2', { NAME => 'colfam1',
REPLICATION_SCOPE => 1}
Setting the scope further prepares the master cluster for its role as the replication source. Now it is time to add a slave—here also called a peer—cluster and start the replication:
hbase(main):006:0>
add_peer '1', 'slave-zk1:2181:/hbase'
hbase(main):007:0>
start_replication
The first command adds the ZooKeeper quorum details for the peer cluster so that modifications can be shipped to it subsequently. The second command starts the actual shipping of modification records to the peer cluster. For this to work as expected, you need to make sure that you have already created an identical copy of the table on the peer cluster: it can be empty, but it needs to have the same schema definition and table name.
For development and prototyping, you can use the approach of running two local clusters, described in Coexisting Clusters, and configure the peer address to point to the second local cluster:
hbase(main):006:0>
add_peer '1', 'localhost:2181:/hbase-2'
There is one more change you need to apply to the hbase-site.xml file in the conf.2 directory on the secondary cluster:
<property>
  <name>hbase.replication</name>
  <value>true</value>
</property>
Adding this flag allows the secondary cluster to act as a peer for the master replication cluster.
Since replication is now enabled, you can add data into the master cluster, and within a few moments see the data appear in the peer cluster table with the same name.
No further changes need to be applied to the peer cluster. The replication feature uses the normal client API on the peer cluster to apply the changes locally. Removing a peer and stopping the replication are done with the reverse commands:
hbase(main):008:0>
stop_replication
hbase(main):009:0>
remove_peer '1'
Note that stopping the replication will still complete the shipping of all queued modifications to the peer, but all further processing is ended.
Finally, verifying the replicated data on two clusters is easy to
do in the shell when looking only at a few rows, but doing a systematic
comparison requires more computing power. This is why the
Verify Replication tool is provided; it is
available as verifyrep
using the
hadoop jar command once more:
$
hadoop jar $HBASE_HOME/hbase-0.91.0-SNAPSHOT.jar verifyrep
Usage: verifyrep [--starttime=X] [--stoptime=Y] [--families=A] <peerid> <tablename>

Options:
 starttime    beginning of the time range
              without endtime means from starttime to forever
 stoptime     end of the time range
 families     comma-separated list of families to copy

Args:
 peerid       Id of the peer used for verification, must match the one given
              for replication
 tablename    Name of the table to verify

Examples:
 To verify the data replicated from TestTable for a 1 hour window with peer #5
 $ bin/hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
     --starttime=1265875194289 --stoptime=1265878794289 5 TestTable
The tool has to be run on the master cluster and needs to be provided with a peer ID (the one used when establishing the replication stream) and a table name. Other options let you specify a time range and specific families.
On top of the operational and data tasks, there are additional tasks you may need to perform when setting up or running a test or production HBase cluster. We will discuss these tasks in the following subsections.
For testing purposes, it is useful to be able to run HBase in two separate instances, but on the same physical machine. This can be helpful, for example, when you want to prototype replication on your development machine.
Running multiple instances of HBase, including any of its daemons, on a distributed cluster is not recommended, and is not tested at all. None of HBase’s processes is designed to share the same server in production, nor is doing so part of its design. Be warned!
Presuming you have set up a local installation of HBase, as described in Chapter 2, and configured it to run in standalone mode, you can first make a copy of the configuration directory like so:
$
cd $HBASE_HOME
$
cp -pR conf conf.2
The next step is to edit the hbase-env.sh file in the new conf.2 directory:
# Where log files are stored. $HBASE_HOME/logs by default.
export HBASE_LOG_DIR=${HBASE_HOME}/logs.2

# A string representing this instance of hbase. $USER by default.
export HBASE_IDENT_STRING=${USER}.2
This is required to have no overlap in local filenames. Lastly, you need to adjust the hbase-site.xml file:
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:8020/hbase-2</value>
  </property>
  <property>
    <name>hbase.tmp.dir</name>
    <value>/tmp/hbase-2-${user.name}</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase-2</value>
  </property>
  <property>
    <name>hbase.master.port</name>
    <value>60100</value>
  </property>
  <property>
    <name>hbase.master.info.port</name>
    <value>60110</value>
  </property>
  <property>
    <name>hbase.regionserver.port</name>
    <value>60120</value>
  </property>
  <property>
    <name>hbase.regionserver.info.port</name>
    <value>60130</value>
  </property>
</configuration>
The properties that differ from the primary cluster's configuration are the root directory, the temporary directory, the znode parent, and the four port settings. You need to assign all ports differently so that you have a clear distinction between the two cluster instances. Operating the secondary cluster requires specification of the new configuration directory:
$
HBASE_CONF_DIR=conf.2 bin/start-hbase.sh
$
HBASE_CONF_DIR=conf.2 ./bin/hbase shell
$
HBASE_CONF_DIR=conf.2 ./bin/stop-hbase.sh
The first command starts the secondary local cluster, the middle one starts a shell connecting to it, and the last command stops the cluster.
The HBase processes, when started, bind to two separate ports: one for the RPCs, and another for the web-based UI. This applies to both the master and each region server. Since you are running each process type on one machine only, you need to consider two ports per server type—unless you run in a nondistributed setup. Table 12-2 lists the default ports.
In addition, if you want to configure a firewall, for example, you also have to ensure that the ports for the Hadoop subsystems, that is, MapReduce and HDFS, are configured so that the HBase daemons have access to them.[129]
By default, HBase ships with a configuration which sets the
log level of its processes to DEBUG
,
which is useful if you are in the installation and prototyping phase. It
allows you to search through the files in case something goes wrong, as
discussed in Analyzing the Logs.
For a production environment, you can switch to a less verbose
level, such as INFO
, or even WARN
. This is accomplished by editing
the log4j.properties file
in the conf
directory. Here is an example with the modified level for the HBase
classes:
...
# Custom Logging levels
log4j.logger.org.apache.zookeeper=INFO
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
log4j.logger.org.apache.hadoop.hbase=INFO
# Make these two classes INFO-level. Make them DEBUG to see more zk debug.
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFO
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
#log4j.logger.org.apache.hadoop.dfs=DEBUG
# Set this class to log INFO only otherwise its OTT
...
This file needs to be copied to all servers, which need to be restarted subsequently for the changes to take effect.
Another option, to either temporarily change the level or to apply a modified properties file without an immediate restart, is to use the web-based UIs and their log-level page. This is discussed and shown in Shared Pages. Since a log-level change through the UI only affects the server it is loaded from, you will need to adjust the level separately for every server in your cluster.
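For many servers, clicking through each UI gets tedious; the log-level page can also be reached with a plain HTTP request. The sketch below only assembles the URL: the /logLevel servlet path is an assumption based on the Hadoop-derived web UIs, and the hostname and port are examples, so verify both against your installation before scripting this:

```shell
# Hypothetical region server and its info port (see the default port table).
HOST=host1.foo.com
PORT=60030
LOGGER=org.apache.hadoop.hbase

# URL understood by the log-level page of the web UI; repeat per server.
URL="http://$HOST:$PORT/logLevel?log=$LOGGER&level=WARN"
echo "curl '$URL'"
```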
This section deals with the things you can do to heal a cluster that does not work as expected.
HBase comes with a tool called hbck which
is implemented by the HBaseFsck
class. It provides various command-line switches that influence its
behavior. You can get a full list of its usage information by running it
with -h
:
$
./bin/hbase hbck -h
Unknown command line option : -h
Usage: fsck [opts]
 where [opts] are:
   -details Display full report of all regions.
   -timelag {timeInSeconds} Process only regions that have not experienced
     any metadata updates in the last {timeInSeconds} seconds.
   -fix Try to fix some of the errors.
   -sleepBeforeRerun {timeInSeconds} Sleep this many seconds before checking
     if the fix worked if run with -fix
   -summary Print only summary of the tables and status.
The -details switch prints the most information when running hbck, while -summary prints the least. Running with no option at all produces the normal level of detail, for example:
$
./bin/hbase hbck
Number of Tables: 40
Number of live region servers: 19
Number of dead region servers: 0
Number of empty REGIONINFO_QUALIFIER rows in .META.: 0
Summary:
  -ROOT- is okay.
    Number of regions: 1
    Deployed on: host1.foo.com:60020
  .META. is okay.
    Number of regions: 1
    Deployed on: host4.foo.com:60020
  testtable is okay.
    Number of regions: 15
    Deployed on: host7.foo.com:60020 host14.foo.com:60020
...
  testtable2 is okay.
    Number of regions: 1
    Deployed on: host11.foo.com:60020
0 inconsistencies detected.
Status: OK
The extra parameters, such as -timelag and -sleepBeforeRerun, are explained in the usage details in the preceding output. They allow you to check subsets of data, as well as delay the eventual re-check run, to report any remaining issues.
Once started, the hbck tool
will scan the .META.
table to gather
all the pertinent information it holds. It also scans the HDFS root
directory HBase is configured to use. It then proceeds to compare the
collected details to report on inconsistencies and integrity
issues.
This check applies to a region on its own. It verifies that the region is listed in .META. and exists in HDFS, and that it is assigned to exactly one region server.
This concerns a table as a whole. It compares the regions with the table details to find missing regions, or those that have holes or overlaps in their row key ranges.
The fix
option allows you to repair a list of these issues. Over time, this
feature is going to be enhanced so that more problems can be fixed. As
of this writing, the fix
option can
handle the following problems:
Assign .META.
to a single
new server if it is unassigned.
Reassign .META.
to a single
new server if it is assigned to multiple
servers.
Assign a user table region to a new server if it is unassigned.
Reassign a user table region to a single new server if it is assigned to multiple servers.
Reassign a user table region to a new server if the current
server does not match what the .META.
table refers to.
Be aware that sometimes hbck reports inconsistencies that are only temporary, or transitional. For example, when regions are unavailable for short periods of time during the internal housekeeping process, hbck will report those as inconsistencies too. Add the -details switch to get more information on what is going on, and rerun the tool a few times to confirm a permanent problem.
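The rerun advice is easy to script. The loop below only prints the hbck invocations it would issue (a sketch; on a live cluster you would execute them and add a pause of a minute or so between runs, then compare the reports):

```shell
RUNS=3
# Collect the commands that would be issued; drop the echo to actually
# run hbck, and insert a sleep between iterations on a real cluster.
OUT=$(for i in $(seq 1 $RUNS); do
  echo "run $i: ./bin/hbase hbck -details"
done)
echo "$OUT"
```

An inconsistency that appears in every run is worth investigating; one that vanishes was most likely transitional.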
In rare cases it is necessary to directly access the logfiles created by the various HBase processes. They contain a mix of messages, some of which are printed for informational purposes and others representing internal warnings or error messages. While some of these messages are temporary, and do not mean that there is a permanent issue with the cluster, others state a system failure and are printed just before the process is forcefully ended.
Table 12-3
lists the various default HBase, ZooKeeper, and Hadoop
logfiles. user
is replaced with the
user ID the process is started by, and hostname
is the name of the machine the
process is running on.
Server type | Logfile |
HBase Master | $HBASE_HOME/logs/hbase-<user>-master-<hostname>.log |
HBase RegionServer | $HBASE_HOME/logs/hbase-<user>-regionserver-<hostname>.log |
ZooKeeper | Console log output only |
NameNode | $HADOOP_HOME/logs/hadoop-<user>-namenode-<hostname>.log |
DataNode | $HADOOP_HOME/logs/hadoop-<user>-datanode-<hostname>.log |
JobTracker | $HADOOP_HOME/logs/hadoop-<user>-jobtracker-<hostname>.log |
TaskTracker | $HADOOP_HOME/logs/hadoop-<user>-tasktracker-<hostname>.log |
Obviously, this can be modified by editing the configuration files for either of these systems.
When you start analyzing the logfiles, it is useful to begin with the master logfile first, as it acts as the coordinator service of the entire cluster. It contains informational messages, such as the balancer printing out its background processing:
2011-06-03 09:12:55,448 INFO org.apache.hadoop.hbase.master.HMaster: balance hri=testtable,mykey1,1308610119005.dbccd6310dd7326f28ac09b60170a84c., src=host1.foo.com,60020,1308239280769, dest=host3.foo.com,60020,1308239274789
or when a region is split on a region server, duly reporting back the event:
2011-06-03 09:12:55,344 INFO org.apache.hadoop.hbase.master.ServerManager: Received REGION_SPLIT: testtable,myrowkey5,1308647333895.0b8eeffeba8e2168dc7c06148d93dfcf.: Daughters; testtable,myrowkey5,1308647572030.bc7cc0055a3a4fd7a5f56df6f27a696b., testtable,myrowkey9,1308647572030.87882799b2d58020990041f588b6b31c. from host5.foo.com,60020,1308239280769
Many of these messages at the INFO
level show you how your cluster evolved
over time. You can use them to go back in time and see what happened
earlier on. Typically the master is simply printing these messages on a
regular basis, so when you look at specific time ranges you will see the
common patterns.
If something fails, though, these patterns will change: the log messages are interrupted by others at the WARN (short for warning) or even ERROR level. You should find those pattern changes and start reading just before the common pattern was disturbed.
An interesting metric you can use as a gauge for where to start is discussed in JVM Metrics, under System Event Metrics: the error log event metric. It gives you a graph showing you where the server(s) started logging an increasing number of error messages in the logfiles. Find the time before this graph started rising and use it as the entry point into your logs.
Once you have found where the processes began
logging ERROR
level messages, you
should be able to identify the root cause. A lot of subsequent messages
are often collateral damage: they are a side effect of the original
problem.
Not all of the logged messages that indicate a pattern change are using an elevated log level. Here is an example of a region that has been in the transition table for too long:
2011-06-21 09:19:20,218 INFO org.apache.hadoop.hbase.master.AssignmentManager: Regions in transition timed out: testtable,myrowkey123,1308610119005.dbccd6310dd7326f28ac09b60170a84c. state=CLOSING, ts=1308647575449
2011-06-21 09:19:20,218 INFO org.apache.hadoop.hbase.master.AssignmentManager: Region has been CLOSING for too long, this should eventually complete or the server will expire, doing nothing
The message is logged on the info level because the system will eventually recover from it. But it could indicate the beginning of larger problems—for example, when the servers start to get overloaded. Make sure you reset your log analysis to where the normal patterns are disrupted.
Once you have investigated the master logs, move on to the region server logs. Use the monitoring metrics to see if any of them shows an increase in log messages, and scrutinize that server first.
If you find an error message, use the online resources to search[130] for the message in the public mailing lists (see http://hbase.apache.org/mail-lists.html). There is a good chance that this has been reported or discussed before, especially with recurring issues, such as the mentioned server overload scenarios: even errors follow a pattern at times.
Here is an example error message, caused by session loss between the region server and the ZooKeeper quorum:
2011-06-09 15:28:34,836 ERROR org.apache.hadoop.hbase.regionserver.HRegionServer: ZooKeeper session expired
2011-06-09 15:28:34,837 ERROR org.apache.hadoop.hbase.regionserver.HRegionServer: java.io.IOException: Server not running, aborting
...
You can search in the logfiles for
occurrences of "ERROR"
and "aborting"
to find clues about the reasons the
server in question stopped working.
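The search itself is a simple grep. The snippet below demonstrates it on a tiny fabricated log modeled on the messages above; on a real cluster, point it at the actual region server logfile instead:

```shell
# Create a small sample region server log (fabricated content, modeled on
# the session-expiry messages shown earlier).
LOG=/tmp/sample-regionserver.log
cat > "$LOG" <<'EOF'
2011-06-09 15:28:33,000 INFO org.apache.hadoop.hbase.regionserver.HRegionServer: serving requests
2011-06-09 15:28:34,836 ERROR org.apache.hadoop.hbase.regionserver.HRegionServer: ZooKeeper session expired
2011-06-09 15:28:34,837 ERROR org.apache.hadoop.hbase.regionserver.HRegionServer: java.io.IOException: Server not running, aborting
EOF

# -n adds line numbers so you can jump straight to the first failure.
MATCHES=$(grep -n -e "ERROR" -e "aborting" "$LOG")
echo "$MATCHES"
```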
The following gives you a list to run through when you encounter problems with your cluster setup.
This section provides a checklist of things you should confirm for your cluster, before going into a deeper analysis in case of problems or performance issues.
The ulimit -n for the DataNode processes and the HBase processes should be set high. To verify the current ulimit setting you can also run the following:
$
cat /proc/<PID of JVM>/limits
You should see that the limit on the number of files is set reasonably high—it is safest to just bump this up to 32000, or even more. File handles and process limits has the full details on how to configure this value.
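As a sketch, reading the limit for a given process can be scripted as follows; the current shell's PID ($$) stands in for the JVM process ID here:

```shell
# Read the open-file limit of a running process from /proc; substitute the
# PID of the data node or region server JVM for $$ on a real cluster.
PID=$$
LIMIT_LINE=$(grep "Max open files" /proc/$PID/limits)
echo "$LIMIT_LINE"
```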
The DataNodes should be configured with a large number of transceivers—at least 4,096, but potentially more. There’s no particular harm in setting it up to as high as 16,000 or so. See Datanode handlers for more information.
Compression should almost always be on, unless you are storing precompressed data. Compression discusses the details. Make sure that you have verified the installation so that all region servers can load the required compression libraries. If not, you will see errors like this:
hbase(main):007:0>
create 'testtable', { NAME => 'colfam1', COMPRESSION => 'LZO' }
ERROR: org.apache.hadoop.hbase.client.NoServerForRegionException: No server address listed in .META. for region testtable2,,1309713043529.8ec02f811f75d2178ad098dc40b4efcf.
In the logfiles of the servers, you will see the root cause for this problem (abbreviated and line-wrapped to fit the available width):
2011-07-03 19:10:43,725 INFO org.apache.hadoop.hbase.regionserver.HRegion: Setting up tabledescriptor config now ...
2011-07-03 19:10:43,725 DEBUG org.apache.hadoop.hbase.regionserver.HRegion: Instantiated testtable,,1309713043529.8ec02f811f75d2178ad098dc40b4efcf.
2011-07-03 19:10:43,839 ERROR org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler: Failed open of region=testtable,,1309713043529.8ec02f811f75d2178ad098dc40b4efcf.
java.io.IOException: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.hadoop.compression.lzo.LzoCodec
  at org.apache.hadoop.hbase.util.CompressionTest.testCompression
  at org.apache.hadoop.hbase.regionserver.HRegion.checkCompressionCodecs
...
The missing compression library triggers an error when the region server tries to open the region with the column family configured to use LZO compression.
We discussed the common Java garbage collector settings in Garbage Collection Tuning. If enough memory is available, you should increase the region server heap up to at least 4 GB, preferably more like 8 GB. The recommended garbage collection settings ought to work for any heap size.
Also, if you are colocating the region server and MapReduce task tracker, be mindful of resource contention on the shared system. Edit the mapred-site.xml file to reduce the number of slots for nodes running with ZooKeeper, so you can allocate a good share of memory to the region server. Do the math on memory allocation, accounting for memory allocated to the task tracker and region server, as well as memory allocated for each child task (from mapred-site.xml and hadoop-env.sh) to make sure you are leaving enough memory for the region server but you’re not oversubscribing the system. Refer to the discussion in Requirements. You might want to consider separating MapReduce and HBase functionality if you are otherwise strapped for resources.
Lastly, HBase is also CPU-intensive. So even if you have enough memory, check your CPU utilization to determine if slots need to be reduced, using a simple Unix command such as top, or the monitoring described in Chapter 10.
In rare cases, a region server may shut itself down, or its process may be terminated unexpectedly. You can check the following:
The latter is often an issue when the server is losing its ZooKeeper session. If that is the case, you can look into the following:
It is vital to ensure that ZooKeeper can perform its tasks as the coordination service for HBase. It is also important for the HBase processes to be able to communicate with ZooKeeper on a regular basis. Here is a checklist you can use to ensure that you do not run into commonly known problems with ZooKeeper:
If machines start swapping, certain resources start to time out and the region servers will lose their ZooKeeper session, causing them to abort themselves. You can use Ganglia, for example, to graph the machines’ swap usage, or execute
$
vmstat 20
on the server(s) while running load
against the cluster (e.g., a MapReduce job): make sure the
"si"
and "so"
columns stay at 0
. These columns show the amount of
data swapped in or out. Also execute
$
free -m
to make sure that no swap space is
used (the swap column should state 0
). Also consider tuning the
kernel’s swappiness value (/proc/sys/vm/swappiness
) down to
5
or 10
. This should help if the total
memory allocation adds up to less than the box’s available
memory, yet swap is happening anyway.
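The two checks can be combined into a small script that reads the standard free -m output and the kernel's swappiness setting:

```shell
# Current swap usage in MB; anything above 0 on an HBase node deserves a look.
SWAP_USED=$(free -m | awk '/^Swap:/ {print $3}')
# Current swappiness; the text suggests tuning this down to 5 or 10.
SWAPPINESS=$(cat /proc/sys/vm/swappiness)
echo "swap used: ${SWAP_USED} MB, vm.swappiness: ${SWAPPINESS}"
```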
If the network is flaky, region servers will lose their connections to ZooKeeper and abort.
ZooKeeper should never be codeployed with task trackers or data nodes. It is permissible to deploy ZooKeeper with the name node, secondary name node, and job tracker on small clusters (e.g., fewer than 40 nodes).
It is preferable to deploy just one ZooKeeper peer shared with the name node/job tracker than to deploy three that are collocated with other processes: the other processes will stress the machine and ZooKeeper will start timing out.
Check the region server’s logfiles
for a message containing "slept"
; for example, you might see
something like "We slept 65000ms
instead of 10000ms"
. If you see this, it is probably
due to either garbage collection pauses or heavy swapping. If
they are garbage collection pauses, refer to the tuning
options mentioned in Basic setup checklist.
HBase does not degrade well when reading or writing a block on a data node with a slow disk. This problem can affect the entire cluster if the block holds data from the META region, causing compactions to slow and back up. Again, use monitoring to carefully keep these vital metrics under control.
Often, this is the xceiver problem, discussed in Datanode handlers. Double-check the configured xceivers value. Also check the data node for log messages containing "exceeds the limit", which would indicate the xceiver issue. Check both the data node and region server logs for "Too many open files" errors.
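These log checks can likewise be done with grep. The sample below runs against fabricated data node messages (the exact wording on your cluster may differ slightly); substitute the real Hadoop logfile path:

```shell
LOG=/tmp/sample-datanode.log
# Fabricated examples of the two telltale messages.
cat > "$LOG" <<'EOF'
2011-07-03 19:10:10,000 ERROR org.apache.hadoop.hdfs.server.datanode.DataXceiver: xceiverCount 4097 exceeds the limit of concurrent xcievers 4096
2011-07-03 19:10:11,000 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Too many open files
EOF

XCEIVER_HITS=$(grep -c "exceeds the limit" "$LOG")
FILE_HITS=$(grep -c "Too many open files" "$LOG")
echo "xceiver limit messages: $XCEIVER_HITS, open file errors: $FILE_HITS"
```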
[125] As of this writing, the newly started master also has no web-based UI available. In other words, accessing the master info port on that server will not yield any results.
[126] There is an entry in the issue tracking system to rectify this inconvenience, which means it will improve over time. For now, you could use a script that reads the current master’s hostname from ZooKeeper and updates a DNS entry pointing a generic hostname to it.
[127] Note that some distributions for HBase do not require this, since they do not make use of the supplied start-hbase.sh script.
[128] There is an entry open in the issue tracking system to replace the parameter parsing with a more modern command-line parser. This will change how the job is parameterized in the future.
[129] Hadoop uses a similar layout for the port assignments, but since it has more process types it also has additional ports. See this blog post for more information.
[130] A dedicated service you can use is Search Hadoop.