It is quite useful for advanced users (or those who are just plain adventurous) to fully comprehend how a system of their choice works behind the scenes. This chapter explains the various moving parts of HBase and how they work together.
Before we look into the architecture itself, however, we will first address a more fundamental difference between typical RDBMS storage structures and alternative ones. Specifically, we will look briefly at B-trees, or rather B+ trees,[86] as they are commonly used in relational storage engines, and Log-Structured Merge Trees,[87] which (to some extent) form the basis for Bigtable’s storage architecture, as discussed in Building Blocks.
Note that RDBMSes do not use B-tree-type structures exclusively, nor do all NoSQL solutions use different architectures. You will find a colorful variety of mix-and-match technologies, but with one common objective: use the best strategy for the problem at hand.
B+ trees have some specific features that allow for efficient insertion, lookup, and deletion of records that are identified by keys. They represent dynamic, multilevel indexes with lower and upper bounds as far as the number of keys in each segment (also called page) is concerned. Using these segments, they achieve a much higher fanout compared to binary trees, resulting in a much lower number of I/O operations to find a specific key.
In addition, they also enable you to do range scans very efficiently, since the leaf nodes in the tree are linked and represent an in-order list of all keys, avoiding more costly tree traversals. That is one of the reasons why they are used for indexes in relational database systems.
In a B+ tree index, you get locality on a page level (where “page” is synonymous with “block” in other systems). For example, the leaf pages look something like this:
[link to previous page] | [link to next page] | key1 → rowid | key2 → rowid | key3 → rowid
In order to insert a new index entry, say key1.5, it will update the leaf page with a new key1.5 → rowid entry. That is not a problem until the page, which has a fixed size, exceeds its capacity. Then it has to be split into two new pages, and the parent in the tree has to be updated to point to the two new half-full pages. See Figure 8-1 for an example of a page that is full and would need to be split when adding another key.
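The mechanics of such a split can be illustrated with a minimal sketch. This is not RDBMS code; the page capacity of four keys and the list-based page representation are arbitrary choices for the example:

```python
import bisect

PAGE_CAPACITY = 4  # fixed page size, expressed here as a key count

def insert_with_split(page, key):
    """Insert a key into a sorted leaf page; split the page into two
    half-full pages when its capacity is exceeded.
    Returns a list of one or two pages."""
    bisect.insort(page, key)
    if len(page) <= PAGE_CAPACITY:
        return [page]
    mid = len(page) // 2
    # The parent node would now have to be updated to point at both new
    # pages, which may end up far apart from each other on disk.
    return [page[:mid], page[mid:]]
```

Inserting key 1.5 into a full page holding keys 1 through 4 yields the two half-full pages described above: `insert_with_split([1.0, 2.0, 3.0, 4.0], 1.5)` returns `[[1.0, 1.5], [2.0, 3.0, 4.0]]`.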
The issue here is that the new pages aren’t necessarily next to each other on disk. So if you now query a range from key1 to key3, the system has to read two leaf pages that could be far apart from each other. That is also the reason why you will find an OPTIMIZE TABLE command in most systems based on B+ trees—it basically rewrites the table in order so that range queries become ranges on disk again.
Log-structured merge-trees, also known as LSM-trees, follow a different approach. Incoming data is stored in a logfile first, completely sequentially. Once the log has the modification saved, it then updates an in-memory store that holds the most recent updates for fast lookup.
When the system has accrued enough updates and starts to fill up the in-memory store, it flushes the sorted list of key → record pairs to disk, creating a new store file. At this point, the updates to the log can be thrown away, as all modifications have been persisted.
The store files are arranged similarly to B-trees, but are optimized for sequential disk access: all nodes are completely filled and stored as either single-page or multipage blocks. Updating the store files is done in a rolling merge fashion; that is, the system packs existing on-disk multipage blocks together with the flushed in-memory data until the block reaches its full capacity, at which point a new one is started.
Figure 8-2 shows how a multipage block is merged from the in-memory tree into the next on-disk tree. Merging writes out a new block with the combined result. Eventually, the trees are merged into the larger blocks.
As more flushes are taking place over time, creating many store files, a background process aggregates the files into larger ones so that disk seeks are limited to only a few store files. The on-disk tree can also be split into separate trees to spread updates across multiple store files. All of the stores are always sorted by key, so no reordering is required to fit new keys in between existing ones.
Lookups are done in a merging fashion in which the in-memory store is searched first, and then the on-disk store files are searched next. That way, all the stored data, no matter where it currently resides, forms a consistent view from a client’s perspective.
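The write path and the merging lookup described above can be sketched in a few lines. This is an illustrative toy, not HBase code; the class name, the flush threshold, and the use of plain dictionaries as stand-ins for sorted store files are all invented for the example:

```python
class TinyLsm:
    """Toy LSM store: sequential log, in-memory store, flushed store files."""

    def __init__(self, flush_size=3):
        self.log = []          # stands in for the sequential logfile
        self.memstore = {}     # holds the most recent updates
        self.store_files = []  # newest first; each is an immutable snapshot
        self.flush_size = flush_size

    def put(self, key, value):
        self.log.append((key, value))   # 1. persist the edit sequentially
        self.memstore[key] = value      # 2. update the in-memory store
        if len(self.memstore) >= self.flush_size:
            self.flush()

    def flush(self):
        # Write the sorted key -> record pairs as a new store file; the
        # matching log entries can now be thrown away.
        self.store_files.insert(0, dict(sorted(self.memstore.items())))
        self.memstore = {}
        self.log = []

    def get(self, key):
        # Merging lookup: memstore first, then store files newest to oldest,
        # so the client always sees a consistent view.
        if key in self.memstore:
            return self.memstore[key]
        for store_file in self.store_files:
            if key in store_file:
                return store_file[key]
        return None
```

A newer value in the memstore shadows an older one already flushed to disk, which is exactly the "consistent view" property the merging lookup provides.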
Deletes are a special case of update wherein a delete marker is stored and is used during the lookup to skip “deleted” keys. When the pages are rewritten asynchronously, the delete markers and the key they mask are eventually dropped.
An additional feature of the background processing for housekeeping is the ability to support predicate deletions. These are triggered by setting a time-to-live (TTL) value that retires entries, for example, after 20 days. The merge processes will check the predicate and, if true, drop the record from the rewritten blocks.
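A predicate deletion of this kind amounts to a filter applied while blocks are rewritten. The following sketch mirrors the 20-day TTL example from the text; the record layout (key, value, timestamp) is an assumption made for illustration:

```python
TTL_SECONDS = 20 * 24 * 3600  # the 20-day TTL from the example above

def rewrite_block(records, now):
    """Rewrite a block during a merge, dropping records whose
    time-to-live has expired. records: list of (key, value, timestamp)."""
    return [(key, value, ts) for (key, value, ts) in records
            if now - ts < TTL_SECONDS]
```

Records older than the TTL simply do not make it into the rewritten block, which is how the merge process "retires" them.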
The fundamental difference between B-trees and LSM-trees, though, is how their architectures make use of modern hardware, especially disk drives.
To compare B+ trees and LSM-trees you need to understand their relative strengths and weaknesses. B+ trees work well until there are too many modifications, because they force you to perform costly optimizations to retain their advantage, and each optimization helps only for a limited amount of time. The more data you add, and the more random the locations, the faster the pages become fragmented again. Eventually, you may take in data at a higher rate than the optimization process can rewrite the existing files. The updates and deletes are done at disk seek rates, rather than disk transfer rates.
LSM-trees work at disk transfer rates and scale much better to handle large amounts of data. They also guarantee a very consistent insert rate, as they transform random writes into sequential writes using the logfile plus in-memory store. The reads are independent from the writes, so you also get no contention between these two operations.
The stored data is always in an optimized layout. So, you have a predictable and consistent boundary on the number of disk seeks to access a key, and reading any number of records following that key doesn’t incur any extra seeks. In general, what could be emphasized about an LSM-tree-based system is cost transparency: you know that if you have five storage files, access will take a maximum of five disk seeks, whereas you have no way to determine the number of disk seeks an RDBMS query will take, even if it is indexed.
Finally, HBase is an LSM-tree-based system, just like Bigtable. The next sections will explain the storage architecture, while referring back to earlier sections of the book where appropriate.
One of the least-known aspects of HBase is how data is actually stored. While the majority of users may never have to bother with this, you may have to get up to speed when you want to learn the meaning of the various advanced configuration options you have at your disposal. Chapter 11 lists the more common ones and Appendix A has the full reference list.
You may also want to know more about file storage if, for whatever reason, disaster strikes and you have to recover an HBase installation. At that point, it is important to know where all the data is stored and how to access it on the HDFS level. Of course, this shall not happen, but who can guarantee that?
The first step in understanding the various moving parts in the storage layer of HBase is to understand the high-level picture. Figure 8-3 shows an overview of how HBase and Hadoop’s filesystem are combined to store data.
The figure shows that HBase handles basically two kinds of file types: one is used for the write-ahead log and the other for the actual data storage. The files are primarily handled by the HRegionServers. In certain cases, the HMaster will also have to perform low-level file operations. You may also notice that the actual files are divided into blocks when stored within HDFS. This is also one of the areas where you can configure the system to handle larger or smaller data records better. More on that in HFile Format.
The general communication flow is that a new client contacts the ZooKeeper ensemble (a separate cluster of ZooKeeper nodes) first when trying to access a particular row. It does so by retrieving the server name (i.e., hostname) that hosts the -ROOT- region from ZooKeeper. With this information it can query that region server to get the server name that hosts the .META. table region containing the row key in question. Both of these details are cached and only looked up once. Lastly, it can query the reported .META. server and retrieve the server name that has the region containing the row key the client is looking for.
Once it has been told in what region the row resides, it caches this information as well and contacts the HRegionServer hosting that region directly. So, over time, the client has a pretty complete picture of where to get rows without needing to query the .META. server again. See Region Lookups for more details.
The HMaster is responsible for assigning the regions to each HRegionServer when you start HBase. This also includes the special -ROOT- and .META. tables. See The Region Life Cycle for details.
The HRegionServer opens the region and creates a corresponding HRegion object. When the HRegion is opened it sets up a Store instance for each HColumnFamily for every table as defined by the user beforehand. Each Store instance can, in turn, have one or more StoreFile instances, which are lightweight wrappers around the actual storage file called HFile. A Store also has a MemStore, and the HRegionServer a shared HLog instance (see Write-Ahead Log).
The client issues an HTable.put(Put) request to the HRegionServer, which hands the details to the matching HRegion instance. The first step is to write the data to the write-ahead log (the WAL), represented by the HLog class.[89] The WAL is a standard Hadoop SequenceFile and it stores HLogKey instances. These keys contain a sequential number as well as the actual data and are used to replay not-yet-persisted data after a server crash.
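The replay logic can be sketched as follows. This is a simplification, not the real HLog/HLogKey classes; in particular, representing a WAL entry as a (sequence number, key, value) tuple is an assumption for the example:

```python
def replay(wal_entries, last_persisted_seq):
    """Return the edits that were logged but never flushed to a store file.
    wal_entries: list of (sequence_number, key, value) tuples, as a stand-in
    for HLogKey plus edit. last_persisted_seq: highest sequence number known
    to be safely persisted in store files."""
    return [(key, value) for (seq, key, value) in wal_entries
            if seq > last_persisted_seq]
```

After a crash, only the entries with a sequence number above the last persisted one need to be applied again, which is why the flush also records the last written sequence number, as described below.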
Once the data is written to the WAL, it is placed in the MemStore. At the same time, it is checked to see if the MemStore is full and, if so, a flush to disk is requested. The request is served by a separate thread in the HRegionServer, which writes the data to a new HFile located in HDFS. It also saves the last written sequence number so that the system knows what was persisted so far.
HBase has a configurable root directory in HDFS, with the default set to "/hbase". Coexisting Clusters shows how to use a different root directory when sharing a central HDFS cluster. You can use the hadoop dfs -lsr command to look at the various files HBase stores. Before doing this, let us first create and fill a table with a handful of regions:
hbase(main):001:0> create 'testtable', 'colfam1', \
  { SPLITS => ['row-300', 'row-500', 'row-700', 'row-900'] }
0 row(s) in 0.1910 seconds

hbase(main):002:0> for i in '0'..'9' do for j in '0'..'9' do \
  for k in '0'..'9' do put 'testtable', "row-#{i}#{j}#{k}", \
  "colfam1:#{j}#{k}", "#{j}#{k}" end end end
0 row(s) in 1.0710 seconds
0 row(s) in 0.0280 seconds
0 row(s) in 0.0260 seconds
...

hbase(main):003:0> flush 'testtable'
0 row(s) in 0.3310 seconds

hbase(main):004:0> for i in '0'..'9' do for j in '0'..'9' do \
  for k in '0'..'9' do put 'testtable', "row-#{i}#{j}#{k}", \
  "colfam1:#{j}#{k}", "#{j}#{k}" end end end
0 row(s) in 1.0710 seconds
0 row(s) in 0.0280 seconds
0 row(s) in 0.0260 seconds
...
The flush command writes the in-memory data to the store files; otherwise, we would have had to wait until more than the configured flush size of data was inserted into the stores. The last round of looping over the put command is to fill the write-ahead log again.
Here is the content of the HBase root directory afterward:
$ $HADOOP_HOME/bin/hadoop dfs -lsr /hbase
...
     0 /hbase/.logs
     0 /hbase/.logs/foo.internal,60020,1309812147645
     0 /hbase/.logs/foo.internal,60020,1309812147645/foo.internal%2C60020%2C1309812147645.1309812151180
     0 /hbase/.oldlogs
    38 /hbase/hbase.id
     3 /hbase/hbase.version
     0 /hbase/testtable
   487 /hbase/testtable/.tableinfo
     0 /hbase/testtable/.tmp
     0 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855
     0 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/.oldlogs
   124 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/.oldlogs/hlog.1309812163957
   282 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/.regioninfo
     0 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/.tmp
     0 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/colfam1
 11773 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/colfam1/646297264540129145
     0 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26
   311 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26/.regioninfo
     0 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26/.tmp
     0 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26/colfam1
  7973 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26/colfam1/3673316899703710654
     0 /hbase/testtable/99c0716d66e536d927b479af4502bc91
   297 /hbase/testtable/99c0716d66e536d927b479af4502bc91/.regioninfo
     0 /hbase/testtable/99c0716d66e536d927b479af4502bc91/.tmp
     0 /hbase/testtable/99c0716d66e536d927b479af4502bc91/colfam1
  4173 /hbase/testtable/99c0716d66e536d927b479af4502bc91/colfam1/1337830525545548148
     0 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827
   311 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827/.regioninfo
     0 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827/.tmp
     0 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827/colfam1
  7973 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827/colfam1/316417188262456922
     0 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949
   311 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/.regioninfo
     0 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/.tmp
     0 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/colfam1
  7973 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/colfam1/4238940159225512178
The output was reduced to include just the file size and name to fit the available space. When you run the command on your cluster you will see more details.
The files can be divided into those that reside directly under the HBase root directory, and those that are in the per-table directories.
The first set of files are the write-ahead log files handled by the HLog instances, created in a directory called .logs underneath the HBase root directory. The .logs directory contains a subdirectory for each HRegionServer. In each subdirectory, there are several HLog files (because of log rotation). All regions from that region server share the same HLog files.
An interesting observation is that the logfile is reported to have a size of 0. This is fairly typical when the file was created recently, as HDFS is using built-in append support to write to this file, and only complete blocks are made available to readers—including the hadoop dfs -lsr command. Although the data of the put operations is safely persisted, the size of the logfile that is currently being written to is slightly off.
After, for example, waiting for an hour so that the logfile is rolled (see LogRoller Class for all reasons when logfiles are rolled), you will see the existing logfile reported with its proper size, since it is closed now and HDFS can state the “correct” size. The new logfile next to it again starts at zero size:
249962 /hbase/.logs/foo.internal,60020,1309812147645/foo.internal%2C60020%2C1309812147645.1309812151180
     0 /hbase/.logs/foo.internal,60020,1309812147645/foo.internal%2C60020%2C1309812147645.1309815751223
When a logfile is no longer needed because all of the contained edits have been persisted into store files, it is decommissioned into the .oldlogs directory under the HBase root directory. This is triggered when the logfile is rolled based on the configured thresholds.
The old logfiles are deleted by the master after 10 minutes (by default), set with the hbase.master.logcleaner.ttl property. The master checks every minute (again by default) for those files. This is configured with the hbase.master.cleaner.interval property.
The behavior for expired logfiles is pluggable. This is used, for instance, by the replication feature (see Replication) to have access to persisted modifications.
The hbase.id and hbase.version files contain the unique ID of the cluster, and the file format version:
$ hadoop dfs -cat /hbase/hbase.id
e627e130-0ae2-448d-8bb5-117a8af06e97
$ hadoop dfs -cat /hbase/hbase.version
7
They are used internally and are otherwise not very interesting. In addition, there are a few more root-level directories that appear over time. The splitlog and .corrupt folders are used by the log split process to store the intermediate split files and the corrupted logs, respectively. For example:
0 /hbase/.corrupt
0 /hbase/splitlog/foo.internal,60020,1309851880898_hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C1309850971208%2Ffoo.internal%252C60020%252C1309850971208.1309851641956/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/recovered.edits/0000000000000002352
There are no corrupt logfiles in this example, but there is one staged split file. The log splitting process is explained in Replay.
Every table in HBase has its own directory, located under the HBase root directory in the filesystem. Each table directory contains a top-level file named .tableinfo, which stores the serialized HTableDescriptor (see Tables for details) for the table. This includes the table and column family schemas, and can be read, for example, by tools to gain insight on what the table looks like. The .tmp directory contains temporary data, and is used, for example, when the .tableinfo file is updated.
Inside each table directory, there is a separate directory for every region comprising the table. The names of these directories are the MD5 hash portion of a region name. For example, the following is taken from the master’s web UI, after clicking on the testtable link in the User Tables section:

testtable,row-500,1309812163930.d9ffc3a5cd016ae58e23d7a6cb937949.

The MD5 hash is d9ffc3a5cd016ae58e23d7a6cb937949 and is generated by encoding everything before the hash in the region name (minus the dividing dot), that is, testtable,row-500,1309812163930. The final dot after the hash is part of the complete region name: it indicates that this is a new-style name. In previous versions of HBase, the region names did not include the hash.
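The derivation of the encoded name can be sketched with Python's standard hashlib. This is a simplification: splitting at the first dot works for this example but is not how HBase parses region names in general (row keys may themselves contain dots), and the function name is invented:

```python
import hashlib

def encoded_region_name(full_region_name):
    """Sketch of deriving the on-disk directory name for a new-style
    region name such as
    'testtable,row-500,1309812163930.d9ffc3a5cd016ae58e23d7a6cb937949.':
    MD5 over everything before the dividing dot."""
    # Simplification: assumes the row key contains no dots.
    prefix = full_region_name.split(".")[0]
    return hashlib.md5(prefix.encode("utf-8")).hexdigest()
```

The result is always a 32-character hexadecimal string, which is what makes it safe to use as a directory name.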
The -ROOT- and .META. catalog tables are still using the old-style format; that is, their region names include no hash, and therefore end without the trailing dot:

.META.,,1.1028785192
The encoding of the region names for their on-disk directories is also different: these use a Jenkins hash to encode the region name.
The hash guarantees that the directory names are always valid, in terms of filesystem rules: they do not contain any special character, such as the slash (“/”), which is used to divide the path. The overall layout for region files is then:
/<hbase-root-dir>/<tablename>/<encoded-regionname>/<column-family>/<filename>
In each column-family directory, you can see the actual data files, explained in HFile Format. Their name is just an arbitrary number, based on the Java built-in random generator. The code is smart enough to check for collisions, that is, where a file with a newly generated number already exists. It loops until it finds an unused one and uses that instead.
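The collision check can be sketched as a simple loop. This is illustrative only; the function name, the 63-bit range, and the set-based existence check stand in for the actual directory lookup:

```python
import random

def next_store_file_name(existing, rng=random):
    """Pick a numeric store file name at random, regenerating until it
    does not collide with an existing file name.
    existing: set of file names already present in the directory."""
    while True:
        candidate = str(rng.randint(0, 2**63 - 1))
        if candidate not in existing:
            return candidate
```

With a 63-bit space, collisions are vanishingly rare, but the loop guarantees a unique name regardless.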
The region directory also has a .regioninfo file, which contains the serialized information of the HRegionInfo instance for the given region. Similar to the .tableinfo file, it can be used by external tools to gain insight into the metadata of a region. The hbase hbck tool uses this to generate missing meta table entries, for example.
The optional .tmp directory is created on demand, and is used to hold temporary files—for example, the rewritten files from a compaction. These are usually moved out into the region directory once the process has completed. In rare circumstances, you might find leftover files, which are cleaned out when the region is reopened.
During the replay of the write-ahead log, any edit that has not been committed is written into a separate file per region. These are staged first (see the splitlog directory in Root-level files) and then—assuming the log splitting process has completed successfully—moved into the optional recovered.edits directory atomically. When the region is opened the region server will see the recovery file and replay the entries accordingly.
There is a clear distinction between the splitting of write-ahead logs (Replay) and the splitting of regions (Region splits). Sometimes it is difficult to distinguish the file and directory names in the filesystem, because both might refer to the term splits. Make sure you carefully identify their purpose to avoid confusion—or mistakes.
Once the region needs to split because it has exceeded the maximum configured region size, a matching splits directory is created, which is used to stage the two new daughter regions. If this process is successful—usually this happens in a few seconds or less—they are moved up into the table directory to form the two new regions, each representing one-half of the original region.
In other words, when you see a region directory that has no .tmp directory, no compaction has been performed for it yet. When it has no recovered.edits file, no write-ahead log replay has occurred for it yet.
In HBase versions before 0.90.x there were additional files, which are now obsolete. One is oldlogfile.log, which contained the replayed write-ahead log edits for the given region. The oldlogfile.log.old file (note the extra .old extension) indicated that there was already an existing oldlogfile.log file when the new one was put into place.
Another noteworthy file is the compaction.dir file in older versions of HBase, which is now replaced by the .tmp directory.
This concludes the list of what is commonly contained in the various directories inside the HBase root folder. There are more intermediate files, created by the region split process. They are discussed separately in the next section.
When a store file within a region grows larger than the configured hbase.hregion.max.filesize—or what is configured at the column family level using HColumnDescriptor—the region is split in two. This is done initially very quickly because the system simply creates two reference files for the new regions (also called daughters), each of which hosts half of the original region (referred to as the parent).
The region server accomplishes this by creating the splits directory in the parent region. Next, it closes the region so that it does not take on any more requests.
The region server then prepares the new daughter regions (using multiple threads) by setting up the necessary file structures inside the splits directory. This includes the new region directories and the reference files. If this process completes successfully, it moves the two new region directories into the table directory. The .META. table is updated for the parent to state that it is now split, and what the two daughter regions are. This prevents it from being reopened by accident. Here is an example of how this looks in the .META. table:
row: testtable,row-500,1309812163930.d9ffc3a5cd016ae58e23d7a6cb937949.
  column=info:regioninfo, timestamp=1309872211559, value=REGION => {NAME =>
    'testtable,row-500,1309812163930.d9ffc3a5cd016ae58e23d7a6cb937949.
    TableName => 'testtable', STARTKEY => 'row-500', ENDKEY => 'row-700',
    ENCODED => d9ffc3a5cd016ae58e23d7a6cb937949, OFFLINE => true, SPLIT => true,}
  column=info:splitA, timestamp=1309872211559, value=REGION => {NAME =>
    'testtable,row-500,1309872211320.d5a127167c6e2dc5106f066cc84506f8.
    TableName => 'testtable', STARTKEY => 'row-500', ENDKEY => 'row-550',
    ENCODED => d5a127167c6e2dc5106f066cc84506f8,}
  column=info:splitB, timestamp=1309872211559, value=REGION => {NAME =>
    'testtable,row-550,1309872211320.de27e14ffc1f3fff65ce424fcf14ae42.
    TableName => [B@62892cc5', STARTKEY => 'row-550', ENDKEY => 'row-700',
    ENCODED => de27e14ffc1f3fff65ce424fcf14ae42,}
You can see how the original region was split into two regions, separated at row-550. The SPLIT => true in the info:regioninfo column value also indicates that this region is now split into the regions referred to in info:splitA and info:splitB.
The name of the reference file is another random number, but with the hash of the referenced region as a postfix, for instance:
/hbase/testtable/d5a127167c6e2dc5106f066cc84506f8/colfam1/6630747383202842155.d9ffc3a5cd016ae58e23d7a6cb937949
This reference file represents one-half of the original region with the hash d9ffc3a5cd016ae58e23d7a6cb937949, which is the region shown in the preceding example. The reference files only hold a little information: the key the original region was split at, and whether it is the top or bottom reference. Of note is that these references are then used by the HalfHFileReader class (which was omitted from the earlier overview as it is only used temporarily) to read the original region data files, and either the top or the bottom half of the files.
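The half-file idea can be sketched as a filter over the parent's sorted data. This is not the real HalfHFileReader; the function name and the list-of-tuples representation are invented, and it assumes (as described above) that "bottom" covers the keys below the split key and "top" the rest:

```python
def read_half(parent_rows, split_key, top):
    """Read one half of a parent region's sorted data through a reference.
    parent_rows: sorted list of (key, value) pairs from the parent file.
    split_key: the key the original region was split at.
    top: False selects keys below the split key, True selects the rest."""
    if top:
        return [(k, v) for (k, v) in parent_rows if k >= split_key]
    return [(k, v) for (k, v) in parent_rows if k < split_key]
```

The parent file itself is never rewritten at split time; each daughter simply reads its half through the reference until compaction replaces it.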
Both daughter regions are now ready and will be opened in parallel by the same server. This includes updating the .META. table to list both regions as available regions—just like any other. After that, the regions are online and start serving requests.
The opening of the daughters also schedules a compaction for both—which rewrites the store files in the background from the parent region into the two halves, while replacing the reference files. This takes place in the .tmp directory of the daughter regions. Once the files have been generated, they atomically replace the reference.
The parent is eventually cleaned up when there are no more references to it, which means it is removed as the parent from the .META. table, and all of its files on disk are deleted. Finally, the master is informed about the split and can schedule for the new regions to be moved off to other servers for load balancing reasons.
The store files are monitored by a background thread to keep them under control. The flushes of memstores slowly build up an increasing number of on-disk files. If there are enough of them, the compaction process will combine them to a few, larger files. This goes on until the largest of these files exceeds the configured maximum store file size and triggers a region split (see Region splits).
Compactions come in two varieties: minor and major. Minor compactions are responsible for rewriting the last few files into one larger one. The number of files is set with the hbase.hstore.compaction.min property (previously called hbase.hstore.compactionThreshold, which, although deprecated, is still supported). It is set to 3 by default, and needs to be at least 2. A number too large would delay minor compactions, but also would require more resources and take longer once the compactions start.
The maximum number of files to include in a minor compaction is set to 10, and is configured with hbase.hstore.compaction.max. The list is further narrowed down by the hbase.hstore.compaction.min.size (set to the configured memstore flush size for the region) and hbase.hstore.compaction.max.size (defaults to Long.MAX_VALUE) configuration properties. Any file larger than the maximum compaction size is always excluded. The minimum compaction size works slightly differently: it is a threshold rather than a per-file limit. It includes all files that are under that limit, up to the total number of files per compaction allowed.
Figure 8-4 shows an example set of store files. All files that fit under the minimum compaction threshold are included in the compaction process.
The algorithm uses hbase.hstore.compaction.ratio (defaults to 1.2, or 120%) to ensure that it does include enough files in the selection process. The ratio will also select files that are up to that size compared to the sum of the store file sizes of all newer files. The evaluation always checks the files from the oldest to the newest. This ensures that older files are compacted first. The combination of these properties allows you to fine-tune how many files are included in a minor compaction.
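The selection just described can be sketched as follows. This is a simplified model of the behavior the text explains, not the actual HBase code, and it omits several cases (for example the max-size exclusion); the function and parameter names mirror the configuration properties but are otherwise invented:

```python
def select_files(sizes, min_files=3, max_files=10,
                 min_size=128 * 1024 * 1024, ratio=1.2):
    """Sketch of minor-compaction file selection.
    sizes: store file sizes ordered oldest to newest.
    Returns the sizes chosen for compaction, or [] if too few qualify."""
    start = 0
    # Walk from oldest to newest, skipping files that are too large:
    # a file qualifies once it is no bigger than
    # max(min_size, ratio * total size of all newer files).
    while start < len(sizes):
        newer_total = sum(sizes[start + 1:])
        if sizes[start] <= max(min_size, ratio * newer_total):
            break
        start += 1
    # Compact the qualifying file together with all newer ones, capped
    # at the maximum number of files per compaction.
    selected = sizes[start:start + max_files]
    return selected if len(selected) >= min_files else []
```

A large, already-compacted old file is skipped because it dwarfs the combined size of everything newer, while the recent, similar-sized flush files fall within the ratio and get picked up together.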
In contrast to minor compactions, major compactions compact all files into a single file. Which compaction type is run is automatically determined when the compaction check is executed. The check is triggered either after a memstore has been flushed to disk, after the compact or major_compact shell commands or corresponding API calls have been invoked, or by a background thread. This background thread is called the CompactionChecker, and each region server runs a single instance. It runs a check on a regular basis, controlled by hbase.server.thread.wakefrequency (multiplied by hbase.server.thread.wakefrequency.multiplier, set to 1000, to run it less often than the other thread-based tasks).
If you call the major_compact shell command, or the majorCompact() API call, you force the major compaction to run. Otherwise, the server checks first if the major compaction is due, based on hbase.hregion.majorcompaction (set to 24 hours) from the first time it ran. The hbase.hregion.majorcompaction.jitter (set to 0.2, in other words, 20%) causes this time to be spread out for the stores. Without the jitter, all stores would run a major compaction at the same time, every 24 hours. See Managed Splitting for information on why this is a bad idea and how to manage this better.
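The effect of the jitter can be sketched in a few lines. This is an illustrative model of the spread, not the HBase implementation; the function name and the symmetric random offset are assumptions:

```python
import random

def next_major_compaction_delay(base_seconds=24 * 3600, jitter=0.2,
                                rng=random):
    """Sketch of jittered scheduling: a 24-hour base period spread out
    by +/- 20%, so stores do not all compact at the same moment."""
    offset = (rng.random() * 2 - 1) * jitter * base_seconds
    return base_seconds + offset
```

Each store ends up with its own delay somewhere in the window from 19.2 to 28.8 hours instead of all firing at exactly 24 hours.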
If no major compaction is due, a minor compaction is assumed. Based on the aforementioned configuration properties, the server determines if enough files for a minor compaction are available and continues if that is the case.
Minor compactions might be promoted to major compactions when the former would include all store files, and there are fewer than the configured maximum number of files per compaction.
The actual storage files are implemented by the HFile class, which was specifically created to serve one purpose: store HBase’s data efficiently. They are based on Hadoop’s TFile class,[90] and mimic the SSTable format used in Google’s Bigtable architecture. The previous use of Hadoop’s MapFile class in HBase proved to be insufficient in terms of performance. Figure 8-5 shows the file format details.
The files contain a variable number of blocks, where the only fixed ones are the file info and trailer blocks. As Figure 8-5 shows, the trailer has the pointers to the other blocks. It is written after the data has been persisted to the file, finalizing the now immutable data store. The index blocks record the offsets of the data and meta blocks. Both the data and the meta blocks are actually optional. But considering how HBase uses the data files, you will almost always find at least data blocks in the store files.
The block size is configured by the HColumnDescriptor, which, in turn, is specified at table creation time by the user, or defaults to reasonable standard values. Here is an example as shown in the master web-based interface:
{NAME => 'testtable', FAMILIES => [{NAME => 'colfam1', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION => 'NONE', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
The default is 64 KB (or 65,536 bytes). Here is what the HFile JavaDoc explains:
Minimum block size. We recommend a setting of minimum block size between 8KB to 1MB for general usage. Larger block size is preferred if files are primarily for sequential access. However, it would lead to inefficient random access (because there are more data to decompress). Smaller blocks are good for random access, but require more memory to hold the block index, and may be slower to create (because we must flush the compressor stream at the conclusion of each data block, which leads to an FS I/O flush). Further, due to the internal caching in Compression codec, the smallest possible block size would be around 20KB-30KB.
Each block contains a
magic header, and a number of serialized KeyValue
instances (see KeyValue Format for their format). If you are
not using a compression algorithm, each block is
about as large as the configured block size. This
is not an exact science, as the writer has to fit whatever you give it:
if you store a KeyValue
that is
larger than the block size, the writer has to accept that. But even with
smaller values, the check for the block size is done after the last
value was written, so in practice, the majority of blocks will be
slightly larger.
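The effect of this after-the-fact size check can be sketched in a few lines of Java. This is an illustrative model, not the actual HFile writer code; the blockSizes() method and the sample value sizes are made up for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the block-boundary rule described above: the
// size check happens only *after* a value is written, so each finished
// block is allowed to slightly exceed the configured block size.
public class BlockBoundarySketch {
    // Returns the size of each finished block for the given value sizes.
    public static List<Integer> blockSizes(int[] valueSizes, int blockSize) {
        List<Integer> blocks = new ArrayList<>();
        int current = 0;
        for (int size : valueSizes) {
            current += size;            // the writer must accept the value first
            if (current >= blockSize) { // the check is done after the write
                blocks.add(current);
                current = 0;
            }
        }
        if (current > 0) blocks.add(current); // trailing partial block
        return blocks;
    }

    public static void main(String[] args) {
        // 40 KB values against a 64 KB block size: every block closes at
        // 80 KB, i.e. slightly larger than the configured limit.
        int[] values = {40960, 40960, 40960, 40960};
        System.out.println(blockSizes(values, 65536)); // [81920, 81920]
    }
}
```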
When you are using a compression algorithm you will not have much control over block size. Compression codecs work best if they can decide how much data is enough to achieve an efficient compression ratio. For example, setting the block size to 256 KB and using LZO compression ensures that blocks will always be written to be less than or equal to 256 KB to suit the LZO internal buffer size.
Many compression libraries come with a set of configuration properties you can use to specify the buffer size, and other options. Refer to the source code of the JNI library to find out what is available to you.
The writer does not know whether you have a compression algorithm selected: it follows the block size limit to write out raw data close to the configured amount. If you have compression enabled, less data will be saved. This means the final store file will contain the same number of blocks, but the total size will be smaller since each block is smaller.
One thing you may notice is that the default
block size for files in HDFS is 64 MB, which is 1,024 times the HFile
default block size. As such, the HBase
storage file blocks do not match the Hadoop blocks.
In fact, there is no correlation between these two block types. HBase
stores its files transparently into a filesystem.
The fact that HDFS uses blocks is a coincidence. And HDFS also does not
know what HBase stores; it only sees binary files. Figure 8-6 demonstrates how the HFile content is
simply spread across HDFS blocks.
Sometimes it is necessary to be able to access
an HFile
directly, bypassing HBase,
for example, to check its health, or to dump its contents. The HFile.main()
method provides the tools to do
that:
$
./bin/hbase org.apache.hadoop.hbase.io.hfile.HFile
usage: HFile [-a] [-b] [-e] [-f &lt;arg&gt;] [-k] [-m] [-p] [-r &lt;arg&gt;] [-v]
 -a,--checkfamily    Enable family check
 -b,--printblocks    Print block index meta data
 -e,--printkey       Print keys
 -f,--file &lt;arg&gt;     File to scan. Pass full-path; e.g.
                     hdfs://a:9000/hbase/.META./12/34
 -k,--checkrow       Enable row order check; looks for out-of-order keys
 -m,--printmeta      Print meta data of file
 -p,--printkv        Print key/value pairs
 -r,--region &lt;arg&gt;   Region to scan. Pass region name; e.g. '.META.,,1'
 -v,--verbose        Verbose output; emits file and meta data delimiters
Here is an example of what the output will look like (shortened):
$
./bin/hbase org.apache.hadoop.hbase.io.hfile.HFile -f
/hbase/testtable/de27e14ffc1f3fff65ce424fcf14ae42/colfam1/2518469459313898451
-v -m -p
Scanning -> /hbase/testtable/de27e14ffc1f3fff65ce424fcf14ae42/colfam1/2518469459313898451
K: row-550/colfam1:50/1309813948188/Put/vlen=2 V: 50
K: row-550/colfam1:50/1309812287166/Put/vlen=2 V: 50
K: row-551/colfam1:51/1309813948222/Put/vlen=2 V: 51
K: row-551/colfam1:51/1309812287200/Put/vlen=2 V: 51
K: row-552/colfam1:52/1309813948256/Put/vlen=2 V: 52
...
K: row-698/colfam1:98/1309813953680/Put/vlen=2 V: 98
K: row-698/colfam1:98/1309812292594/Put/vlen=2 V: 98
K: row-699/colfam1:99/1309813953720/Put/vlen=2 V: 99
K: row-699/colfam1:99/1309812292635/Put/vlen=2 V: 99
Scanned kv count -> 300
Block index size as per heapsize: 208
reader=/hbase/testtable/de27e14ffc1f3fff65ce424fcf14ae42/colfam1/2518469459313898451,
  compression=none, inMemory=false,
  firstKey=row-550/colfam1:50/1309813948188/Put,
  lastKey=row-699/colfam1:99/1309812292635/Put,
  avgKeyLen=28, avgValueLen=2, entries=300, length=11773
fileinfoOffset=11408, dataIndexOffset=11664, dataIndexCount=1,
  metaIndexOffset=0, metaIndexCount=0, totalBytes=11408,
  entryCount=300, version=1
Fileinfo:
MAJOR_COMPACTION_KEY = \xFF
MAX_SEQ_ID_KEY = 2020
TIMERANGE = 1309812287166....1309813953720
hfile.AVG_KEY_LEN = 28
hfile.AVG_VALUE_LEN = 2
hfile.COMPARATOR = org.apache.hadoop.hbase.KeyValue$KeyComparator
hfile.LASTKEY = \x00\x07row-699\x07colfam199\x00\x00\x010\xF6\xE5|\x1B\x04
Could not get bloom data from meta block
The first part of the output is the actual
data stored as serialized KeyValue
instances. The second part dumps the internal HFile.Reader
properties, as well as the
trailer block details. The last part, starting with Fileinfo
, is the file info block
values.
The provided information is valuable to, for
example, confirm whether a file is compressed or not, and with what
compression type. It also shows you how many cells you have stored, as
well as the average size of their keys and values. In the preceding
example, the key is much larger than the value. This is caused by the
overhead required by the KeyValue
class to store the necessary data, explained next.
In essence, each KeyValue
in the HFile
is a low-level byte
array that allows for zero-copy access to the data.
Figure 8-7 shows the layout of the contained
data.
The structure starts with two fixed-length
numbers indicating the length of the key and the value part. With that information,
you can offset into the array to, for example, get direct access to the
value, ignoring the key. Otherwise, you can get the required information
from the key. Once the information is parsed into a KeyValue
Java instance, you can use getters to
access the details, as explained in The KeyValue class.
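The offset arithmetic can be modeled with a much simplified layout: two fixed-length integers up front, followed by the key and value bytes. The real KeyValue format contains more fields, so treat this as a sketch of the idea only:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified sketch of the idea behind the KeyValue layout: two
// fixed-length integers up front (key length, value length) let you
// jump straight to the value without parsing the key. The real format
// carries more fields; this only models the offset arithmetic.
public class KeyValueSketch {
    public static byte[] encode(byte[] key, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(8 + key.length + value.length);
        buf.putInt(key.length).putInt(value.length).put(key).put(value);
        return buf.array();
    }

    // Direct access to the value: read the two lengths, then skip the key.
    public static byte[] value(byte[] kv) {
        ByteBuffer buf = ByteBuffer.wrap(kv);
        int keyLen = buf.getInt();
        int valLen = buf.getInt();
        byte[] out = new byte[valLen];
        buf.position(8 + keyLen);
        buf.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] kv = encode("row-550/colfam1:50".getBytes(StandardCharsets.UTF_8),
                           "50".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(value(kv), StandardCharsets.UTF_8)); // 50
    }
}
```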
The reason the average key in the preceding
example is larger than the value has to do with the fields that make up
the key part of a KeyValue
. The key
holds the row key, the column family name, the column qualifier, and so
on. For a small payload, this results in quite a considerable overhead.
If you deal with small values, try to keep the key small as well. Choose
a short row and column key (the family name with a single byte, and the
qualifier equally short) to keep the ratio in check.
On the other hand, compression should help
mitigate the overwhelming key size problem, as it looks at finite
windows of data, and all repeating data should compress well. The
sorting of all KeyValue
s in the store
file helps to keep similar keys (and possibly values too, in case you
are using versioning) close together.
The region servers keep data in-memory until enough is collected to warrant a flush to disk, avoiding the creation of too many very small files. While the data resides in memory it is volatile, meaning it could be lost if the server loses power, for example. This is a likely occurrence when operating at large scale, as explained in Seek Versus Transfer.
A common approach to solving this issue is write-ahead logging:[91] Each update (also called an “edit”) is written to a log, and only if the update has succeeded is the client informed that the operation has succeeded. The server then has the liberty to batch or aggregate the data in memory as needed.
The WAL is the lifeline that is needed when disaster strikes. Similar to a binary log in MySQL, the WAL records all changes to the data. This is important in case something happens to the primary storage. If the server crashes, the WAL can effectively replay the log to get everything up to where the server should have been just before the crash. It also means that if writing the record to the WAL fails, the whole operation must be considered a failure.
Overview shows how the WAL fits into the overall architecture of HBase. Since it is shared by all regions hosted by the same region server, it acts as a central logging backbone for every modification. Figure 8-8 shows how the flow of edits is split between the memstores and the WAL.
The process is as follows: first the client
initiates an action that modifies data. This can be, for example, a call
to put()
, delete()
, and increment()
. Each of these modifications is
wrapped into a KeyValue
object
instance and sent over the wire using RPC calls. The calls are (ideally)
batched to the HRegionServer
that
serves the matching regions.
Once the KeyValue
instances arrive, they are routed to
the HRegion
instances that are
responsible for the given rows. The data is written to the WAL, and then
put into the MemStore
of the actual
Store
that holds the record. This is,
in essence, the write path of HBase.
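Reduced to its essence, the write path is two steps in a fixed order. The following is a toy model with made-up class names, not the actual HRegion code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Minimal sketch of the write path order described above: the edit is
// appended to the WAL first, and only then inserted into the in-memory
// store. Class and field names are illustrative, not HBase's own.
public class WritePathSketch {
    final List<String> wal = new ArrayList<>();               // stands in for HLog
    final TreeMap<String, String> memstore = new TreeMap<>(); // stands in for MemStore

    public void put(String rowKey, String value) {
        wal.add(rowKey + "=" + value); // 1. durability first: append to the log
        memstore.put(rowKey, value);   // 2. then make it readable in memory
    }

    public static void main(String[] args) {
        WritePathSketch region = new WritePathSketch();
        region.put("row-550", "50");
        System.out.println(region.wal);      // [row-550=50]
        System.out.println(region.memstore); // {row-550=50}
    }
}
```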
Eventually, when the memstores get to a certain size, or after a specific time, the data is persisted in the background to the filesystem. During that time, data is stored in a volatile state in memory. The WAL guarantees that the data is never lost, even if the server fails. Keep in mind that the actual log resides on HDFS, which is a replicated filesystem. Any other server can open the log and start replaying the edits—nothing on the failed physical server is needed to effect a full recovery.
The class that implements the WAL is called HLog
. When an HRegion
is instantiated, the single HLog
instance that runs inside each region
server is passed on as a parameter to the constructor of HRegion
. When a region receives an update
operation, it can save the data directly to the shared WAL
instance.
The core of the HLog
functionality is the append()
method.
Note that for performance reasons there is an option for Put
, Delete
, and Increment
to be called with an extra parameter
set: setWriteToWAL(false)
. If you
invoke this method while setting up, for example, a Put
instance, the writing to the WAL is
bypassed! That is also why the downward arrow in Figure 8-8 was created with a dashed line to indicate the
optional step. By default, you certainly want the WAL, no doubt about
that. But say you run a large bulk import MapReduce job that you can
rerun at any time. You gain extra performance when you disable the WAL,
but at the cost of having to take extra care that no data was lost
during the import.
You are strongly advised not to lightheartedly turn off writing edits to the WAL. If you do so, you will lose data sooner or later. And no, HBase cannot recover data that is lost and that has not been written to the log first.
Another important feature of HLog
is the ability to keep track of changes.
It does this by using a sequence number. It uses an
AtomicLong
internally to be
thread-safe and starts at either zero, or the last known number
persisted to the filesystem: as the region is opening its storage files,
it reads the highest sequence number, which is stored as a meta field in
each HFile
, and sets the HLog
sequence number to that value if it is
higher than what was recorded before. So, after it has opened all the
storage files, the HLog
is
initialized to reflect where persisting ended and where to
continue.
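A rough sketch of that initialization, assuming the per-file maximum sequence IDs have already been read from the HFile meta fields:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the sequence number initialization described above: scan
// the per-file sequence IDs recorded in the HFile meta blocks and start
// the log's counter at the highest one seen. Names are illustrative.
public class SequenceIdSketch {
    final AtomicLong logSequence = new AtomicLong(0);

    public void initFromStoreFiles(long[] maxSeqIdsPerFile) {
        for (long seqId : maxSeqIdsPerFile) {
            // only move forward if the file recorded a higher number
            logSequence.updateAndGet(cur -> Math.max(cur, seqId));
        }
    }

    public static void main(String[] args) {
        SequenceIdSketch hlog = new SequenceIdSketch();
        hlog.initFromStoreFiles(new long[]{1500, 2020, 1999});
        System.out.println(hlog.logSequence.get()); // 2020
    }
}
```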
Figure 8-9 shows three
different regions, hosted on the same region server, with each of them
covering a different row key range. Each region shares the same single
instance of HLog
. This means the data
is written to the WAL in the order it arrives, so some extra
work is needed when a log needs to be replayed (see Replay). But since this happens rather seldom, the
WAL is optimized to store data sequentially, giving it the best I/O
performance.
Currently, the WAL uses a Hadoop SequenceFile
, which stores records as sets of
key/values. For the WAL, the value is simply the modification(s) sent
from the client. The key is represented by an HLogKey
instance: since the KeyValue
only represents the row key, column
family, column qualifier, timestamp, type, and value, there has to be a
place to store what the KeyValue belongs to, in other words, the region
and table name. That information is stored in the HLogKey
. Also stored is the aforementioned
sequence number. That number is incremented with each edit in order to
keep a sequential order of edits.
This class also records the write time, which is a timestamp that denotes when the edit was written to the log. Finally, it stores the cluster ID, which is needed for replication across clusters.
Every modification sent by a client is wrapped into a WALEdit
instance, which takes care
of atomicity at the log level. Assume you update 10 columns
in one row. Each column, or cell, is represented as a separate KeyValue
instance. If the server writes five
of them to the WAL and then fails, you will end up with a half-persisted
row mutation.
Atomicity is guaranteed by bundling all
updates that comprise multiple cells into a single WALEdit
instance. This group of edits is then
written in a single operation, ensuring that the row mutation is applied
in full or not at all.
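The bundling rule can be sketched as follows; the names are illustrative and not taken from the actual WALEdit implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the WALEdit idea: instead of appending each cell of a row
// mutation separately, all cells are bundled and appended as a single
// record, so a crash cannot leave a half-persisted mutation behind.
public class WalEditSketch {
    // one bundle == one atomic append to the log
    static final List<List<String>> log = new ArrayList<>();

    public static void append(List<String> cellsOfOneMutation) {
        log.add(new ArrayList<>(cellsOfOneMutation)); // single operation
    }

    public static void main(String[] args) {
        List<String> rowMutation = new ArrayList<>();
        for (int i = 0; i < 10; i++) rowMutation.add("row-1/colfam1:" + i);
        append(rowMutation);
        System.out.println(log.size());        // 1 record in the log
        System.out.println(log.get(0).size()); // containing all 10 cells
    }
}
```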
The table descriptor allows you to set the so-called
deferred log flush flag, as explained in Table Properties. The default is false
and it means that every time an edit is
sent to the servers, it will call the log writer’s sync()
method. It is the call that forces the
update to the log to be acknowledged by the filesystem so that you have
durability.
Unfortunately, calling this method involves a
pipelined write to N servers (where
N is the replication factor set for the write-ahead
log files). Since this is a rather costly operation, you have the option
to slightly delay the call, and have it executed in a background process
instead. Keep in mind that without the call to sync()
, there is a chance of data
loss in case of a server failure. Use this option
carefully.
Setting the deferred log flush flag to
true
causes the edits to be buffered
on the region server, and the LogSyncer
class, running as a background
thread on the server, is responsible for calling the sync()
method at a very short interval. The
default is one second and is configured by the hbase.regionserver.optionallogflushinterval
property.
Note that this only applies to user tables: all catalog tables are always synced right away.
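The deferred flush mechanism boils down to a background task calling sync() on a fixed interval. A sketch, with the real LogSyncer machinery reduced to a scheduled executor:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of deferred log flushing: edits are buffered and a background
// task calls sync() on a fixed interval instead of once per edit. The
// one-second interval mirrors the default of the
// hbase.regionserver.optionallogflushinterval property.
public class LogSyncerSketch {
    final AtomicInteger buffered = new AtomicInteger();

    void sync() {
        int flushed = buffered.getAndSet(0); // flush everything buffered so far
        System.out.println("synced " + flushed + " edits");
    }

    public static void main(String[] args) throws InterruptedException {
        LogSyncerSketch log = new LogSyncerSketch();
        ScheduledExecutorService syncer = Executors.newSingleThreadScheduledExecutor();
        // run sync() every second in the background, like the LogSyncer thread
        syncer.scheduleAtFixedRate(log::sync, 1, 1, TimeUnit.SECONDS);
        for (int i = 0; i < 5; i++) log.buffered.incrementAndGet(); // edits arrive
        Thread.sleep(1500); // give the background sync a chance to run
        syncer.shutdown();
    }
}
```

Until sync() runs, the buffered edits are exposed to loss on a crash, which is exactly the trade-off the deferred flush flag makes.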
There are size restrictions when it comes to the logs that are
written. The LogRoller
class runs as
a background thread and takes care of rolling logfiles at certain
intervals. This is controlled by the hbase.regionserver.logroll.period
property,
set by default to one hour.
Every 60 minutes the log is closed and a new
one is started. Over time, the system accumulates an increasing number
of logfiles that need to be managed as well. The HLog.rollWriter()
method, which is called by
the LogRoller
to roll the current
logfile, takes care of that as well by subsequently calling HLog.cleanOldLogs()
.
It checks what the highest sequence number written to a storage file is. This is the edit sequence number of the last edit persisted out to the filesystem. It then checks if there is a log left that has edits that are all less than that number. If that is the case, it moves said logs into the .oldlogs directory, and leaves the remaining ones in place.
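The archiving decision can be expressed compactly. This sketch assumes we know, per logfile, the highest sequence number it contains, along with the highest sequence number persisted to store files:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the old-log cleanup rule: a logfile whose edits all carry
// sequence numbers at or below the highest number already persisted to
// the store files is safe to move to the .oldlogs directory.
public class CleanOldLogsSketch {
    // logfile name -> highest sequence number it contains
    public static List<String> archivable(Map<String, Long> logs,
                                          long highestPersistedSeqId) {
        List<String> old = new ArrayList<>();
        for (Map.Entry<String, Long> e : logs.entrySet()) {
            if (e.getValue() <= highestPersistedSeqId) {
                old.add(e.getKey()); // all of its edits are already flushed
            }
        }
        return old;
    }

    public static void main(String[] args) {
        Map<String, Long> logs = new LinkedHashMap<>();
        logs.put("hlog.1309851641956", 1800L);
        logs.put("hlog.1309851701956", 2020L);
        logs.put("hlog.1309851761956", 2400L);
        // with edits persisted up to 2020, the first two logs can be archived
        System.out.println(archivable(logs, 2020L));
    }
}
```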
You might see the following obscure message in your logs:
2011-06-15 01:45:48,427 INFO org.apache.hadoop.hbase.regionserver.HLog: Too many hlogs: logs=130, maxlogs=96; forcing flush of 8 region(s): testtable,row-500,1309872211320.d5a127167c6e2dc5106f066cc84506f8., ...
This message is printed because the number of logfiles that must be kept (they still contain outstanding edits that have not yet been persisted) exceeds the configured maximum number of logfiles. This can occur when you stress out the filesystem to such an extent that it cannot persist the data at the rate at which new data is added. Otherwise, memstore flushes should take care of this.
Note, though, that when this message is printed the server goes into a special mode trying to force edits to be flushed out to reduce the number of outstanding WAL files.
The other parameters controlling log rolling
are hbase.regionserver.hlog.blocksize
(set to the
filesystem default block size, or fs.local.block.size
, defaulting to 32 MB)
and hbase.regionserver.logroll.multiplier
(set to
0.95
), which will rotate logs when
they are at 95% of the block size. So logs are switched out when they
are considered full, or when a certain amount of time has
passed—whatever comes first.
The master and region servers need to orchestrate the handling of logfiles carefully, especially when it comes to recovering from server failures. The WAL is responsible for retaining the edits safely; replaying the WAL to restore a consistent state is a much more complex exercise.
Since all edits are written to one HLog
-based logfile per region server, you
might ask: why is that the case? Why not write all edits for a
specific region into its own logfile? Here is the related quote from
the Bigtable paper:
If we kept the commit log for each tablet in a separate logfile, a very large number of files would be written concurrently in GFS. Depending on the underlying file system implementation on each GFS server, these writes could cause a large number of disk seeks to write to the different physical log files.
HBase followed that principle for pretty much the same reasons: writing too many files at the same time, plus the number of rolled logs that need to be kept, does not scale well.
What is the drawback, though? If you have to split a log because of a server crash, you need to divide it into suitable pieces, as described in the next section. The master cannot redeploy any region from a crashed server until the logs for that very server have been split. This can potentially take a considerable amount of time.
There are two situations in which logfiles have to be replayed: when the cluster starts, or when a server fails. When the master starts—and this includes a backup master taking over duty—it checks if there are any logfiles, in the .logs directory under the HBase root on the filesystem, that have no region server assigned to them. The logs’ names contain not just the server name, but also the start code of the server. This number is reset every time a region server restarts, and the master can use this number to verify whether a log has been abandoned—for example, due to a server crash.
The master is responsible for monitoring
the servers using ZooKeeper, and if it detects a server failure, it
immediately starts the process of recovering its logfiles, before
reassigning the regions to new servers. This happens in the ServerShutdownHandler
class.
Before the edits in the log can be replayed, they need to be separated into one logfile per region. This process is called log splitting: the combined log is read and all entries are grouped by the region they belong to. These grouped edits are then stored in a file next to the target region for subsequent recovery.
The actual process of splitting the logs is different in nearly every version of HBase: early versions would read the file in a single thread, directly on the master. This was improved to at least write the grouped edits per region in multiple threads. Version 0.92.0 finally introduces the concept of distributed log splitting, which removes the burden of doing the actual work from the master to all region servers.
Consider a larger cluster with many region servers and many (rather large) logfiles. In the past, the master had to recover each logfile separately, and—so it would not overload in terms of I/O as well as memory usage—it would do this sequentially. This meant that, for any region that had pending edits, it had to be blocked from opening until the log split and recovery had been completed.
The new distributed mode uses ZooKeeper to hand out each abandoned logfile to a region server. They monitor ZooKeeper for available work, and if the master indicates that a log is available for processing, they race to accept the task. The winning region server then proceeds to read and split the logfiles in a single thread (so as not to overload the already busy region server).
You can turn the new distributed log
splitting off by means of the hbase.master.distributed.log.splitting
configuration property. Setting this property to false
disables distributed splitting, and
falls back to doing the work directly on the master only.
In nondistributed mode the writers are
multithreaded, controlled by the hbase.regionserver.hlog.splitlog.writer.threads
property, which is set to 3
by default. You need to be careful when
increasing this number, as you are likely bound by the performance
of the single log reader.
The split process writes the edits first into the splitlog staging directory under the HBase root folder. They are placed in the same path that is needed for the target region. For example:
0   /hbase/.corrupt
0   /hbase/splitlog/foo.internal,60020,1309851880898_hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C1309850971208%2Ffoo.internal%252C60020%252C1309850971208.1309851641956/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/recovered.edits/0000000000000002352
The path contains the logfile name itself to distinguish it from other, possibly concurrently executed, log split output. The path also contains the table name, region name (hash), and recovered.edits directory. Lastly, the name of the split file is the sequence ID of the first edit for the particular region.
The .corrupt directory
contains any logfile that could not be parsed. This is influenced by
the hbase.hlog.split.skip.errors
property, which
is set to true
by default. It means
that any edit that could not be read from a file causes the entire log
to be moved to the .corrupt
folder. If you set the flag to false
, an IOException
is thrown and the
entire log splitting process is stopped.
Once a log has been split successfully, the per-region files are moved into the actual region directories. They are now ready to be recovered by the region itself. This is also why the splitting has to stall opening the affected regions, since it first has to provide the pending edits for replay.
When a region is opened, either because the cluster is started or because it has been moved from one region server to another, it first checks for the presence of the recovered.edits directory. If it exists, it opens the contained files and starts reading the edits they contain. The files are sorted by their name, which contains the sequence ID. This allows the region to recover the edits in order.
Any edit that has a sequence ID that is less than or equal to what has been persisted in the on-disk store files is ignored, because it has already been applied. All other edits are applied to the matching memstore of the region to recover the previous state. At the end, a flush of the memstores is forced to write the current state to disk.
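The replay filter is simple to model: given the sequence ID recorded in the store files, only newer edits are applied. A sketch with hypothetical method names:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the recovery rule above: edits whose sequence ID is less
// than or equal to what the store files already contain are skipped,
// everything newer is re-applied to the memstore.
public class ReplaySketch {
    public static List<Long> editsToReplay(long[] editSeqIds, long persistedSeqId) {
        List<Long> replay = new ArrayList<>();
        for (long seqId : editSeqIds) {
            if (seqId > persistedSeqId) replay.add(seqId); // not yet on disk
        }
        return replay;
    }

    public static void main(String[] args) {
        // store files were flushed up to sequence ID 2352,
        // so only the edits with IDs 2353 and 2360 are applied
        System.out.println(editsToReplay(new long[]{2350, 2352, 2353, 2360}, 2352));
    }
}
```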
The files in the recovered.edits folder are removed once
they have been read and their edits persisted to disk. If a file
cannot be read, the hbase.skip.errors
property defines what happens next: the default value is false
and causes the entire region recovery
to fail. If this property is set to true
, the file is renamed to the original
filename plus .<currentTimeMillis>
. Either way, you
need to carefully check your logfiles to determine why the recovery
has had issues and fix the problem to continue.
You want to be able to rely on the system to save all your data, no matter what newfangled algorithms are employed behind the scenes. As far as HBase and the log are concerned, you can set the log flush times as low as you want, or sync them for every edit—you are still dependent on the underlying filesystem as mentioned earlier: the stream used to store the data is flushed, but is it written to disk yet? We are talking about fsync-style issues. For HBase, we are most likely dealing with Hadoop's HDFS as the filesystem being persisted to.
At this point, it should be abundantly clear
that the log is what keeps data safe. It is being kept open for up to an
hour (or more if configured to do so), and as data arrives a new
key/value pair is written to the SequenceFile
. Eventually, the log is rolled
and a new one is created.
But that is not how Hadoop was designed to work. Hadoop provides an API tailored to MapReduce that allows you to open a file, write data into it (preferably a lot), and close it right away, leaving an immutable file for everyone else to read many times.
Only after a file is closed is it visible and readable to others. If a process dies while writing the data, the file is considered lost. For HBase to be able to work properly, what is required is a feature that allows you to read the log up to the point where the crashed server has written it. This was added to HDFS in later versions and is referred to as append.
HBase currently detects whether the underlying
Hadoop library has support for syncFs()
or hflush()
. If a sync()
is triggered on the log writer, it
calls either method internally—or none if HBase runs in a nondurable
setup. The sync()
is using the
pipelined write process described in LogSyncer Class
to guarantee the durability of the edits in the logfile. In case of a
server crash, the system can safely read the abandoned logfile up to the
last edits.
In summary, without Hadoop 0.21.0 or later, or a specially prepared 0.20.x with append support backported to it, you can very well face data loss. See Hadoop for more information.
HBase uses multiple store files per column family, which contain
the actual cells, or KeyValue
instances. These files are created over time as modifications aggregated
in the memstores are eventually flushed as store files to disk. The
background process of compactions keeps the number of files under control
by rewriting smaller files into larger ones. Major compactions eventually
compact the entire set of files into a single one, after which the flushes
start adding smaller files again.
Since all store files are immutable, there is no way to delete a particular value out of them, nor does it make sense to keep rewriting large store files to remove the deleted cells one by one. Instead, a tombstone marker is written, which masks out the “deleted” information—which can be a single cell, a range of cells, or entire rows.
Consider you are writing a column in a given
row today. You keep adding data in other rows over a few more days, then
you write a different column in the given row. The question is, given that
the original column value has been persisted as a KeyValue
on disk for quite some time, while the
newly written column for the same row is still in the memstore, or has
been flushed to disk, where does the logical row reside?
In other words, when you are using the shell to
perform a get command on that row, how
does the system know what to return? As a client, you want to see both
columns being returned—as if they were stored in a single entity. But in
reality, the data lives as separate KeyValue
instances, spread across any number of
store files.
If you are deleting the initial column value, and you perform the get again, you expect the value to be gone, when in fact it still exists somewhere, but the tombstone marker is indicating that you have deleted it. But that marker is most likely stored far away from the value it “buries.” A more formal explanation of the architecture behind this approach is provided in Seek Versus Transfer.
HBase solves the problem by using a QueryMatcher
in combination with a ColumnTracker
, which comes
in a few variations: one for explicit matching, for when you specify a
list of columns to retrieve, and another that includes all columns. Both
allow you to set the maximum number of versions to match. They keep track
of what needs to be included in the final result.
Before all the store files are read to find a
matching entry, a quick exclusion check is conducted, which uses the
timestamps and optional Bloom filter to skip files that definitely have no KeyValue
belonging to the row in question. The
remaining store files, including the memstore, are then scanned to find a
matching key.
The scan is implemented by the RegionScanner
class, which retrieves a StoreScanner
for every Store
instance—each representing a column
family. If the read operation excludes certain column families, their
stores are omitted as well.
The StoreScanner
class
combines the store files and memstore that the Store
instance contains. It is also where the exclusion happens, based on the
Bloom filter, or the timestamp. If you are asking for versions that are
not more than 30 minutes old, for example, you can skip all storage files
that are older than one hour: they will not contain anything of interest.
See Key Design for details on the exclusion, and how
to make use of it.
The StoreScanner
class also has the QueryMatcher
(here the ScanQueryMatcher
class), which will keep track
of which KeyValue
s to include in the
final result.
The RegionScanner
internally is using a KeyValueHeap
class to arrange all store scanners
ordered by timestamps. The StoreScanner
is using the same to order the stores the same way. This guarantees that
you are reading KeyValue
s in their
correct order (e.g., descending by
timestamp).
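The heap-based merge can be sketched with a standard priority queue: each store contributes entries already sorted newest-first, and the heap always yields the globally newest one next. This is a simplification of what KeyValueHeap does, using bare timestamps instead of full keys:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of the heap-based merge: each store contributes an iterator of
// timestamps (already sorted descending within a store), and a priority
// queue always yields the globally newest entry next, just as the
// KeyValueHeap orders the store scanners.
public class ScannerHeapSketch {
    static final class Cursor {
        final Iterator<Long> it;
        long head;
        Cursor(Iterator<Long> it) { this.it = it; head = it.next(); }
    }

    public static List<Long> merge(List<List<Long>> stores) {
        PriorityQueue<Cursor> heap =
            new PriorityQueue<>((a, b) -> Long.compare(b.head, a.head)); // descending
        for (List<Long> s : stores) if (!s.isEmpty()) heap.add(new Cursor(s.iterator()));
        List<Long> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();              // globally newest head
            out.add(c.head);
            if (c.it.hasNext()) { c.head = c.it.next(); heap.add(c); }
        }
        return out;
    }

    public static void main(String[] args) {
        // two store files and a memstore, each sorted newest-first
        List<List<Long>> stores = Arrays.asList(
            Arrays.asList(1309813948222L, 1309812287200L),
            Arrays.asList(1309813953720L),
            Arrays.asList(1309813948188L, 1309812287166L));
        System.out.println(merge(stores)); // globally newest-first
    }
}
```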
When the store scanners are opened, they will
position themselves at the requested row key, or—in the case of a get()
call—on the next nonmatching row key. The
scanner is now ready to read data. Figure 8-10 shows
an example of what this looks like.
For a get()
call, all the server has to do is to call next()
on the RegionScanner
. The call internally reads
everything that should be part of the result. This includes all of the
versions requested. Consider a column that has three versions, and you are
requesting to retrieve all of them. The three KeyValue
instances could be spread across any
store, on disk or in memory. The next()
call keeps reading from all store files until either the next row is
reached, or enough versions have been found.
At the same time, it keeps track of delete
markers too. As it scans through the KeyValue
s of the current row, it will come
across these delete markers and note that anything with a timestamp that
is less than or equal to the marker is considered erased.
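The masking rule itself is a simple timestamp comparison, sketched here for a single column with a hypothetical visible() helper:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of tombstone masking during a read: a delete marker hides any
// cell of the same column whose timestamp is less than or equal to the
// marker's timestamp, even though the cells still exist in the files.
public class TombstoneSketch {
    public static List<Long> visible(long[] cellTimestamps, long deleteMarkerTs) {
        List<Long> result = new ArrayList<>();
        for (long ts : cellTimestamps) {
            if (ts > deleteMarkerTs) result.add(ts); // newer than the delete survives
        }
        return result;
    }

    public static void main(String[] args) {
        // three versions of a cell; the delete marker carries the middle timestamp
        System.out.println(visible(new long[]{100, 200, 300}, 200)); // [300]
    }
}
```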
Figure 8-10 also shows the
logical row as a list of KeyValue
s, some in the same store file, some on
other files, spanning multiple column families. A store file and a
memstore were skipped because of the timestamp and Bloom filter exclusion
process. The delete marker in the last store file is masking out entries,
but they are still all part of the same row. The scanners—depicted as an
arrow next to the stores—are either on the first matching entry in the
file, or on the one that would follow the requested key, in case the store
has no direct match.
Only scanners that are on the proper row are
considered during the call to next()
.
The internal loop would read the KeyValue
s from the first and last stores one
after the other, in time-descending order, until they also exceed the
requested row key.
For scan operations, this is repeated by
calling next()
on the ResultScanner
until either the stop row has been
found, the end of the table has been reached, or enough rows have been
read for the current batch (as set via scanner caching).
The final result is a list of KeyValue
instances that matched the given get or
scan operation. The list is sent back to the client, which can then use
the API methods to access the contained columns.
For the clients to be able to find the region server hosting a
specific row key range, HBase provides two special catalog tables, called
-ROOT-
and .META.
.[92]
The -ROOT-
table is used to refer to all regions in the .META.
table. The design considers only one root
region, that is, the root region is never split to guarantee a
three-level, B+ tree-like lookup
scheme: the first level is a node stored in ZooKeeper that contains the
location of the root table’s region—in other words, the name of the region
server hosting that specific region. The second level is the lookup of a
matching meta region from the -ROOT-
table, and the third is the retrieval of the user table region from the
.META.
table.
The row keys in the catalog tables are the region names, which are a concatenation of the region’s table name, its start row, and an ID (usually the current time in milliseconds). As of HBase 0.90.0 these keys may have another hashed value attached to them. This is currently only used for user tables. See Region-level files for an example.
To allay any concerns about the three-level location scheme, the Bigtable paper states that with average limits on the .META. region size at 128 MB it can address 2^34 regions, or 2^61 bytes in 128 MB regions. Since the size of the regions can be increased without any impact on the location scheme, this is a conservative number and can be increased as needed.
Although clients cache region locations, there
is an initial need to figure out where to send requests when looking for a
specific row key—or when the cache is stale and a region has since been
split, merged, or moved. The client library uses a recursive discovery
process moving up in the hierarchy to find the current information. It
asks the corresponding region server hosting the matching .META.
region for the given row key and
retrieves the address. If that information is invalid, it backs out,
asking the root table where the .META.
region is. Eventually, if all else fails, it has to do a read of the
ZooKeeper node to find the root table region.
In a worst-case scenario, it would need six network round-trips to discover the user region, since stale entries in the cache are only discovered when the lookup fails, because it is assumed that assignments, especially of meta regions, do not change too often. When the cache is empty, the client needs three network round-trips to update its cache. One way to mitigate future round-trips is to prefetch location information in a single request, thus updating the client cache ahead of time. Refer to Miscellaneous Features for details on how to influence this using the client-side API.
Figure 8-11 shows the mapping of user table regions, through meta, and finally to the root table information. Once the user table region is known, it can be accessed directly without any further lookups. The lookups are numbered and assume an empty cache. However, if the cache were filled with only stale details, the client would fail on all three lookups, requiring a refresh of all three and resulting in the aforementioned six network round-trips.
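The essence of each catalog level is a sorted lookup: a row is served by the region whose start key is the greatest one that is less than or equal to the requested row key. The following is a simplified simulation of that lookup for one level, assuming made-up server names and using a TreeMap in place of a real .META. region; it is not the actual client code.

```java
import java.util.TreeMap;

// Simplified simulation of one catalog lookup level: region start keys map
// to locations, and a row is served by the region whose start key is the
// greatest one <= the row key (TreeMap.floorEntry).
public class RegionLookupSketch {
    // .META. content for one user table: start row -> hosting server (made up)
    static final TreeMap<String, String> META = new TreeMap<>();
    static {
        META.put("",        "rs1.internal,60020,1309859972983"); // first region
        META.put("row-500", "rs2.internal,60020,1309859345233");
    }

    static String locate(String rowKey) {
        // The empty start key guarantees floorEntry() never returns null here.
        return META.floorEntry(rowKey).getValue();
    }

    public static void main(String[] args) {
        System.out.println(locate("row-100")); // served by the first region (rs1)
        System.out.println(locate("row-700")); // served by the second region (rs2)
    }
}
```

The same floor-style lookup is conceptually applied at each of the three levels: ZooKeeper points at -ROOT-, -ROOT- at the matching .META. region, and .META. at the user table region.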
The state of a region is tracked by the master, using the
AssignmentManager
class. It follows the
region from its offline state, all the way through
its life cycle. Table 8-1 lists the possible
states of a region.
State | Description |
Offline | The region is offline. |
Pending Open | A request to open the region was sent to the server. |
Opening | The server has started opening the region. |
Open | The region is open and fully operational. |
Pending Close | A request to close the region has been sent to the server. |
Closing | The server is in the process of closing the region. |
Closed | The region is closed. |
Splitting | The server started splitting the region. |
Split | The region has been split by the server. |
The transitions between states are commonly initiated by the master, but may also be initiated by the region server hosting the region. For example, the master assigns a region to a server, which is then opened by the assignee. On the other hand, the region server starts the split process, which in itself triggers multiple region close and open events.
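The states and a few of their transitions can be sketched as a simple state machine. The transition table below is illustrative only—it is derived from the descriptions in Table 8-1, not from the actual AssignmentManager logic, and the enum is a hypothetical stand-in.

```java
import java.util.EnumMap;
import java.util.EnumSet;

// Illustrative sketch of the region life cycle from Table 8-1. The allowed
// transitions are a plausible reading of the state descriptions, not the
// real AssignmentManager implementation.
public class RegionStateSketch {
    enum State { OFFLINE, PENDING_OPEN, OPENING, OPEN,
                 PENDING_CLOSE, CLOSING, CLOSED, SPLITTING, SPLIT }

    static final EnumMap<State, EnumSet<State>> NEXT = new EnumMap<>(State.class);
    static {
        NEXT.put(State.OFFLINE,       EnumSet.of(State.PENDING_OPEN));
        NEXT.put(State.PENDING_OPEN,  EnumSet.of(State.OPENING));
        NEXT.put(State.OPENING,       EnumSet.of(State.OPEN));
        NEXT.put(State.OPEN,          EnumSet.of(State.PENDING_CLOSE, State.SPLITTING));
        NEXT.put(State.PENDING_CLOSE, EnumSet.of(State.CLOSING));
        NEXT.put(State.CLOSING,       EnumSet.of(State.CLOSED));
        NEXT.put(State.CLOSED,        EnumSet.of(State.OFFLINE));
        NEXT.put(State.SPLITTING,     EnumSet.of(State.SPLIT));
        NEXT.put(State.SPLIT,         EnumSet.noneOf(State.class));
    }

    static boolean canMove(State from, State to) {
        return NEXT.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }
}
```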
Because of the distributed nature of these events, the servers are using ZooKeeper to track specific states in a dedicated znode.
Since version 0.20.x, HBase has been using ZooKeeper as its distributed coordination service. This includes tracking of region servers, where the root region is hosted, and more. Version 0.90.x introduced a new master implementation which has an even tighter integration with ZooKeeper. It enables HBase to remove critical heartbeat messages that needed to be sent between the master and the region servers. These are now moved into ZooKeeper, which informs either party of changes whenever they occur, as opposed to the fixed intervals that were used before.
HBase creates a list of znodes under its root
node. The default is /hbase
and is
configured with the zookeeper.znode.parent
property. Here is the list of the contained znodes and their
purposes:
The examples use the ZooKeeper command-line interface (CLI) to issue the commands. You can start it with:
$ $ZK_HOME/bin/zkCli.sh -server <quorum-server>
The output of each command has been shortened to omit ZooKeeper-internal details.
/hbase/hbaseid
Contains the cluster ID, as stored in the hbase.id file on HDFS. For example:
[zk: localhost(CONNECTED) 1]
get /hbase/hbaseid
e627e130-0ae2-448d-8bb5-117a8af06e97
/hbase/master
Holds the server name (see Cluster Status Information for details). For example:
[zk: localhost(CONNECTED) 2]
get /hbase/master
foo.internal,60000,1309859972983
/hbase/replication
Contains replication details. See Internals for details.
/hbase/root-region-server
Contains the server name of the region server hosting the -ROOT- region. This is used during the region lookup (see Region Lookups). For instance:
[zk: localhost(CONNECTED) 3]
get /hbase/root-region-server
rs1.internal,60000,1309859972983
/hbase/rs
Acts as the root node for all region servers to list themselves when they start. It is used to track server failures. Each znode inside is ephemeral and its name is the server name of the region server. For example:
[zk: localhost(CONNECTED) 4]
ls /hbase/rs
[rs1.internal,60000,1309859972983,rs2.internal,60000,1309859345233]
/hbase/shutdown
Is used to track the cluster state. It contains the time when the cluster was started, and is empty when it was shut down. For example:
[zk: localhost(CONNECTED) 5]
get /hbase/shutdown
Tue Jul 05 11:59:33 CEST 2011
/hbase/splitlog
The parent znode for all log-splitting-related coordination (see Log splitting for details). For example:
[zk: localhost(CONNECTED) 6]
ls /hbase/splitlog
[hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C 1309850971208%2Ffoo.internal%252C60020%252C1309850971208.1309851636647, hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C 1309850971208%2Ffoo.internal%252C60020%252C1309850971208.1309851641956, ... hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C 1309850971208%2Ffoo.internal%252C60020%252C1309850971208.1309851784396][zk: localhost(CONNECTED) 7]
get /hbase/splitlog/
hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Fmemcache1.internal%2C
60020%2C1309850971208%2Fmemcache1.internal%252C60020%252C1309850971208.
1309851784396
unassigned foo.internal,60000,1309851879862[zk: localhost(CONNECTED) 8]
get /hbase/splitlog/
hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Fmemcache1.internal%2C
60020%2C1309850971208%2Fmemcache1.internal%252C60020%252C1309850971208.
1309851784396
owned foo.internal,60000,1309851879862[zk: localhost(CONNECTED) 9]
ls /hbase/splitlog
[RESCAN0000293834, hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Fmemcache1. internal%2C60020%2C1309850971208%2Fmemcache1.internal%252C 60020%252C1309850971208.1309851681118, RESCAN0000293827, RESCAN0000293828, RESCAN0000293829, RESCAN0000293838, RESCAN0000293837]
These examples list various things: you can see how a log to be split was first unassigned, and then owned by a region server. The RESCAN nodes signify that the workers, that is, the region servers, are supposed to check for more work, in case a split has failed on another machine.
/hbase/table
The parent znode under which disabled tables are listed. When a table is disabled, a child znode named after the table is created, and its content is the word DISABLED. For example:
[zk: localhost(CONNECTED) 10]
ls /hbase/table
[testtable][zk: localhost(CONNECTED) 11]
get /hbase/table/testtable
DISABLED
/hbase/unassigned
Is used by the AssignmentManager
to track region states
across the entire cluster. It contains znodes for those regions that
are not open, but are in a transitional state. The name of the znode
is the hash of the region. For example:
[zk: localhost(CONNECTED) 11]
ls /hbase/unassigned
[8438203023b8cbba347eb6fc118312a7]
HBase replication is a way to copy data between HBase deployments. It can serve as a disaster recovery solution and can contribute to higher availability at the HBase layer. It can also serve a more practical purpose; for example, as a way to easily copy edits from a web-facing cluster to a MapReduce cluster that will process old and new data and ship back the results automatically.
The basic architecture pattern used for HBase replication is “(HBase cluster) master-push”; this pattern makes it much easier to keep track of what is currently being replicated, since each region server has its own write-ahead log (WAL or HLog), just like other well-known solutions, such as MySQL master/slave replication, where there is only one binary log to keep track of. One master cluster can replicate to any number of slave clusters, and each region server will participate in replicating its own stream of edits.
The replication is done asynchronously, meaning that the clusters can be geographically distant, the links between them can be offline for some time, and rows inserted on the master cluster will not be available at the same time on the slave clusters (eventual consistency).
Figure 8-12 shows an overview of how replication works.
The replication format used in this design is conceptually the same as MySQL’s statement-based replication.[93] Instead of SQL statements, whole WALEdits (consisting of multiple cell inserts coming from the clients’ Put and Delete calls) are replicated in order to maintain atomicity.
The HLogs from each region server are the basis of HBase replication, and must be kept in HDFS as long as they are needed to replicate data to any slave cluster. Each region server reads from the oldest log it needs to replicate and keeps the current position inside ZooKeeper to simplify failure recovery. That position can be different for every slave cluster, as can the queue of HLogs to process.
The clusters participating in replication can be of asymmetric sizes and the master cluster will do its best effort to balance the stream of replication on the slave clusters by relying on randomization.
The following sections describe the life of a single edit going from a client that communicates with a master cluster all the way to a single slave cluster.
The client uses an HBase API that sends a Put, Delete, or Increment to a region server. The key/values are transformed into a WALEdit by the region server, and the WALEdit is inspected by the replication code that, for each family that is scoped for replication, adds the scope to the edit. The edit is appended to the current WAL and is then applied to its MemStore.
In a separate thread, the edit is read from the log (as part of a batch) and only the KeyValues that are replicable are kept (i.e., they are part of a family that is scoped as GLOBAL in the family’s schema and are noncatalog, so not part of .META. or -ROOT-). When the buffer is filled, or the reader hits the end of the file, the buffer is sent to a random region server on the slave cluster.
Synchronously, the region server that receives the edits reads them sequentially and separates each of them into buffers, one per table. Once all edits are read, each buffer is flushed using the normal HBase client (HTables managed by an HTablePool). This is done in order to leverage parallel insertion (MultiPut).
Back in the master cluster’s region server, the offset for the current WAL that is being replicated is registered in ZooKeeper.
The edit is inserted in the same way. In a separate thread, the region server reads, filters, and buffers the log edits the same way as is done during normal processing. The slave region server that is contacted does not answer the RPC, so the master region server will sleep and retry up to a configured number of times. If the slave region server still is not available, the master cluster region server will select a new subset of region servers to replicate to and will try to send the buffer of edits again.
In the meantime, the WALs will be rolled and stored in a queue in ZooKeeper. Logs that are archived by their region server (archiving is basically moving a log from the region server’s logs directory to a central logs archive directory) will update their paths in the in-memory queue of the replicating thread.
When the slave cluster is finally available, the buffer will be applied the same way as during normal processing. The master cluster region server will then replicate the backlog of logs.
This section describes in depth how each of the replication’s internal features operates.
When a master cluster region server initiates a replication source to a slave cluster, it first connects to the slave’s ZooKeeper ensemble using the provided cluster key (that key is composed of the value of hbase.zookeeper.quorum, zookeeper.znode.parent, and hbase.zookeeper.property.clientPort). It then scans the /hbase/rs directory to discover all the available sinks (region servers that are accepting incoming streams of edits to replicate) and will randomly choose a subset of them using a configured ratio (which has a default value of 10%). For example, if a slave cluster has 150 machines, 15 will be chosen as potential recipients for edits that this master cluster region server will be sending. Since this is done by all master cluster region servers, the probability that all slave region servers are used is very high, and this method works for clusters of any size. For example, a master cluster of 10 machines replicating to a slave cluster of five machines with a ratio of 10% means that the master cluster region servers will choose one machine each at random; thus the chance of overlapping and full usage of the slave cluster is higher.
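The sink-selection step can be sketched as a shuffle-and-take over the discovered slave servers. This is a minimal simulation of the behavior described above (ratio of 10%, but always at least one sink); the class and method names are made up, not the actual ReplicationSource code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Sketch of sink selection: shuffle the slave region servers discovered
// under /hbase/rs and keep a configured ratio of them, at least one.
public class SinkSelectionSketch {
    static List<String> chooseSinks(List<String> slaves, double ratio, Random rnd) {
        List<String> shuffled = new ArrayList<>(slaves);
        Collections.shuffle(shuffled, rnd);
        // 150 slaves at 10% -> 15 sinks; 5 slaves at 10% -> still 1 sink.
        int n = Math.max(1, (int) Math.ceil(slaves.size() * ratio));
        return shuffled.subList(0, n);
    }
}
```

Because every master cluster region server makes its own independent random choice, the union of all chosen subsets tends to cover the whole slave cluster.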
Every master cluster region server has its own znode in the replication znodes hierarchy. The parent znode contains one znode per peer cluster (if there are five slave clusters, five znodes are created), and each of these contains a queue of HLogs to process. Each of these queues will track the HLogs created by that region server, but they can differ in size. For example, if one slave cluster becomes unavailable for some time, the HLogs should not be deleted, and thus they need to stay in the queue (while the others are processed). See Region server failover for an example.
When a source is instantiated, it contains the current HLog that the region server is writing to. During log rolling, the new file is added to the queue of each slave cluster’s znode just before it is made available. This ensures that all the sources are aware that a new log exists before HLog is able to append edits into it, but this operation is now more expensive. The queue items are discarded when the replication thread cannot read more entries from a file (because it reached the end of the last block) and there are other files in the queue. This means that if a source is up-to-date and replicates from the log that the region server writes to, reading up to the “end” of the current file will not delete the item in the queue.
When a log is archived (because it is not used anymore or because there are too many of them per hbase.regionserver.maxlogs, typically because the insertion rate is faster than the region flushing rate), it will notify the source threads that the path for that log changed.
If a particular source was already done with it, it will just ignore
the message. If it is in the queue, the path will be updated in
memory. If the log is currently being replicated, the change will be
done atomically so that the reader does not try to open the file when
it is already moved. Also, moving a file is a NameNode operation; so,
if the reader is currently reading the log, it will not generate any
exceptions.
By default, a source will try to read from a logfile and ship log entries as quickly as possible to a sink. This is first limited by the filtering of log entries; only KeyValues that are scoped GLOBAL and that do not belong to catalog tables will be retained. A second limit is imposed on the total size of the list of edits to replicate per slave, which by default is 64 MB. This means that a master cluster region server with three slaves will use, at most, 192 MB to store data to replicate. This does not take into account the data that was filtered but was not garbage-collected.
Once the maximum number of edits has been buffered or the reader has hit the end of the logfile, the source thread will stop reading and will randomly choose a sink to replicate to (from the list that was generated by keeping only a subset of slave region servers). It will directly issue an RPC to the chosen machine and will wait for the method to return. If it is successful, the source will determine if the current file has been emptied or if it should continue to read from it. If the former, it will delete the znode in the queue. If the latter, it will register the new offset in the log’s znode. If the RPC threw an exception, the source will retry up to 10 times before trying to find a different sink.
If replication is not enabled, the master’s log cleaning thread will delete old logs using a configured TTL. This does not work well with replication since archived logs that are past their TTL may still be in a queue. Thus, the default behavior is augmented so that if a log is past its TTL, the cleaning thread will look up every queue until it finds the log (while caching the ones it finds). If it is not found, the log will be deleted. The next time it has to look for a log, it will first use its cache.
As long as region servers do not fail, keeping track of the logs in ZooKeeper does not add any value. Unfortunately, they do fail, so since ZooKeeper is highly available, we can count on it and its semantics to help us manage the transfer of the queues.
All the master cluster region servers keep a watcher on one another to be notified when one dies (just like the master does). When this happens, they all race to create a znode called lock inside the dead region server’s znode that contains its queues. The one that creates it successfully will proceed by transferring all the queues to its own znode (one by one, since ZooKeeper does not support the rename operation) and will delete all the old ones when it is done. The recovered queues’ znodes will be named with the ID of the slave cluster appended with the name of the dead server.
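The naming of a recovered queue follows directly from that rule and can be sketched in a few lines; the helper below is illustrative, not the actual znode-handling code. Applying it twice produces the doubly suffixed names seen after a second failover.

```java
// Sketch of the recovered-queue naming used during failover: the surviving
// server copies each queue under a znode named as the slave cluster ID with
// the dead server's name appended.
public class QueueTransferSketch {
    static String recoveredQueueName(String peerId, String deadServer) {
        return peerId + "-" + deadServer;
    }

    public static void main(String[] args) {
        // First failover: 2-1.1.1.2,60020,123456790
        String once = recoveredQueueName("2", "1.1.1.2,60020,123456790");
        System.out.println(once);
        // Second failover chains the names:
        // 2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630
        System.out.println(recoveredQueueName(once, "1.1.1.3,60020,123456630"));
    }
}
```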
Once that is done, the master cluster region server will create one new source thread per copied queue, and each of them will follow the read/filter/ship pattern. The main difference is that those queues will never have new data since they do not belong to their new region server, which means that when the reader hits the end of the last log, the queue’s znode will be deleted and the master cluster region server will close that replication source.
For example, consider a master cluster with three region servers that is replicating to a single slave with an ID of 2. The following hierarchy represents what the znodes’ layout could be at some point in time. We can see that the region servers’ znodes all contain a peers znode that contains a single queue. The znode names in the queues represent the actual filenames on HDFS in the form address,port.timestamp.
/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    peers/
      2/
        1.1.1.1,60020.1234  (Contains a position)
        1.1.1.1,60020.1265
  1.1.1.2,60020,123456790/
    peers/
      2/
        1.1.1.2,60020.1214  (Contains a position)
        1.1.1.2,60020.1248
        1.1.1.2,60020.1312
  1.1.1.3,60020,123456630/
    peers/
      2/
        1.1.1.3,60020.1280  (Contains a position)
Now let’s say that 1.1.1.2 loses its ZooKeeper session. The survivors will race to create a lock, and, for some reason, 1.1.1.3 wins. It will then start transferring all the queues to its local peers znode by appending the name of the dead server. Right before 1.1.1.3 is able to clean up the old znodes, the layout will look like the following:
/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    peers/
      2/
        1.1.1.1,60020.1234  (Contains a position)
        1.1.1.1,60020.1265
  1.1.1.2,60020,123456790/
    lock
    peers/
      2/
        1.1.1.2,60020.1214  (Contains a position)
        1.1.1.2,60020.1248
        1.1.1.2,60020.1312
  1.1.1.3,60020,123456630/
    peers/
      2/
        1.1.1.3,60020.1280  (Contains a position)
    2-1.1.1.2,60020,123456790/
      1.1.1.2,60020.1214  (Contains a position)
      1.1.1.2,60020.1248
      1.1.1.2,60020.1312
Sometime later, but before 1.1.1.3 is able to finish replicating the last HLog from 1.1.1.2, let’s say that it dies too (also, some new logs were created in the normal queues). The last region server will then try to lock 1.1.1.3’s znode and will begin transferring all the queues. The new layout will be:
/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    peers/
      2/
        1.1.1.1,60020.1378  (Contains a position)
    2-1.1.1.3,60020,123456630/
      1.1.1.3,60020.1325  (Contains a position)
      1.1.1.3,60020.1401
    2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
      1.1.1.2,60020.1312  (Contains a position)
  1.1.1.3,60020,123456630/
    lock
    peers/
      2/
        1.1.1.3,60020.1325  (Contains a position)
        1.1.1.3,60020.1401
    2-1.1.1.2,60020,123456790/
      1.1.1.2,60020.1312  (Contains a position)
Replication is still considered to be an experimental feature. Carefully evaluate whether it works for your use case before you consider using it.
[86] See “B+ trees” on Wikipedia.
[87] See “LSM-Tree” (http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.44.2782), O’Neil et al., 1996.
[88] From “Open Source Search” by Doug Cutting, December 5, 2005.
[89] In extreme cases, you may turn off this step by setting a flag
using the Put.setWriteToWAL(boolean)
method. This is
not recommended as this will disable durability.
[90] See the JIRA issue HADOOP-3315 for details.
[91] For information on the term itself, read “Write-ahead logging” on Wikipedia.
[92] Subsequently, they are referred to interchangeably as the root table and the meta table, since, for example, "-ROOT-" is how the table is actually named in HBase, and calling it the root table states its purpose.