Chapter 8. Architecture

It is quite useful for advanced users (or those who are just plain adventurous) to fully comprehend how a system of their choice works behind the scenes. This chapter explains the various moving parts of HBase and how they work together.

Seek Versus Transfer

Before we look into the architecture itself, however, we will first address a more fundamental difference between typical RDBMS storage structures and alternative ones. Specifically, we will look briefly at B-trees, or rather B+ trees,[86] as they are commonly used in relational storage engines, and Log-Structured Merge Trees,[87] which (to some extent) form the basis for Bigtable’s storage architecture, as discussed in Building Blocks.

Note

Note that RDBMSes do not use B-tree-type structures exclusively, nor do all NoSQL solutions use different architectures. You will find a colorful variety of mix-and-match technologies, but with one common objective: use the best strategy for the problem at hand.

B+ Trees

B+ trees have some specific features that allow for efficient insertion, lookup, and deletion of records that are identified by keys. They represent dynamic, multilevel indexes with lower and upper bounds on the number of keys in each segment (also called a page). Using these segments, they achieve a much higher fanout compared to binary trees, resulting in a much lower number of I/O operations to find a specific key.

In addition, they also enable you to do range scans very efficiently, since the leaf nodes in the tree are linked and represent an in-order list of all keys, avoiding more costly tree traversals. That is one of the reasons why they are used for indexes in relational database systems.

In a B+ tree index, you get locality on a page level (where “page” is synonymous with “block” in other systems). For example, the leaf pages look something like this:

[link to previous page]
[link to next page]
key1 → rowid
key2 → rowid
key3 → rowid

To insert a new index entry, say key1.5, the tree updates the leaf page with a new key1.5 → rowid entry. That is not a problem until the page, which has a fixed size, exceeds its capacity. At that point the tree has to split the page into two new ones, and update the parent in the tree to point to the two new half-full pages. See Figure 8-1 for an example of a page that is full and would need to be split when adding another key.

An example B+ tree with one full page
Figure 8-1. An example B+ tree with one full page

The issue here is that the new pages aren’t necessarily next to each other on disk. So a range query from key1 to key3 now has to read two leaf pages that could be far apart from each other. That is also the reason why you will find an OPTIMIZE TABLE command in most systems based on B+ trees: it rewrites the table in order so that range queries become contiguous ranges on disk again.
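To make the split mechanics more concrete, here is a minimal, simplified sketch of a fixed-capacity leaf page that splits once it overflows. It is an illustration only, not how any particular storage engine implements it, and the class and constant names are invented for this example:

import java.util.TreeMap;

// Simplified B+ tree leaf page: keys map to row IDs, capacity is fixed.
class LeafPage {
  static final int CAPACITY = 3;
  final TreeMap<String, Long> entries = new TreeMap<>();

  // Returns null if the entry fit, or the newly allocated right-hand page after a split.
  LeafPage insert(String key, long rowId) {
    entries.put(key, rowId);
    if (entries.size() <= CAPACITY) {
      return null; // page still has room
    }
    // Page overflow: move the upper half of the keys into a new page.
    String splitKey = entries.keySet().toArray(new String[0])[entries.size() / 2];
    LeafPage right = new LeafPage();
    right.entries.putAll(entries.tailMap(splitKey));
    entries.tailMap(splitKey).clear();
    // The new page may be allocated far away on disk, which is what fragments
    // range scans until the table is rewritten in order.
    return right;
  }
}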

Log-Structured Merge-Trees

Log-structured merge-trees, also known as LSM-trees, follow a different approach. Incoming data is first stored in a logfile, completely sequentially. Once the modification has been saved in the log, the system updates an in-memory store that holds the most recent updates for fast lookup.

When the system has accrued enough updates and starts to fill up the in-memory store, it flushes the sorted list of key → record pairs to disk, creating a new store file. At this point, the updates to the log can be thrown away, as all modifications have been persisted.
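The write path just described can be condensed into a few lines of code. The following is a heavily simplified sketch of the idea, not HBase’s implementation; the class name, flush threshold, and file layout are all made up for the example:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Simplified LSM-tree ingest: append to a log, buffer in a sorted map, flush when full.
class SimpleLsmStore {
  static final int FLUSH_THRESHOLD = 100000;
  final BufferedWriter log;
  ConcurrentSkipListMap<String, String> memstore = new ConcurrentSkipListMap<>();

  SimpleLsmStore(String logPath) throws IOException {
    this.log = new BufferedWriter(new FileWriter(logPath, true));
  }

  void put(String key, String value) throws IOException {
    log.write(key + "\t" + value + "\n"); // 1. sequential append to the logfile
    log.flush();
    memstore.put(key, value);             // 2. update the in-memory store
    if (memstore.size() >= FLUSH_THRESHOLD) {
      flush();                            // 3. write a new, sorted store file
    }
  }

  void flush() throws IOException {
    ConcurrentSkipListMap<String, String> snapshot = memstore;
    memstore = new ConcurrentSkipListMap<>();
    try (BufferedWriter store = new BufferedWriter(
        new FileWriter("store-" + System.currentTimeMillis()))) {
      for (Map.Entry<String, String> e : snapshot.entrySet()) { // already sorted by key
        store.write(e.getKey() + "\t" + e.getValue() + "\n");
      }
    }
    // At this point the log entries covered by this flush could be discarded.
  }
}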

The store files are arranged similarly to B-trees, but are optimized for sequential disk access where all nodes are completely filled and stored as either single-page or multipage blocks. Updating the store files is done in a rolling merge fashion, that is, the system packs existing on-disk multipage blocks together with the flushed in-memory data until the block reaches its full capacity, at which point a new one is started.

Figure 8-2 shows how a multipage block is merged from the in-memory tree into the next on-disk tree. Merging writes out a new block with the combined result. Eventually, the trees are merged into the larger blocks.

Multipage blocks iteratively merged across LSM-trees
Figure 8-2. Multipage blocks iteratively merged across LSM-trees

As more flushes are taking place over time, creating many store files, a background process aggregates the files into larger ones so that disk seeks are limited to only a few store files. The on-disk tree can also be split into separate trees to spread updates across multiple store files. All of the stores are always sorted by key, so no reordering is required to fit new keys in between existing ones.

Lookups are done in a merging fashion in which the in-memory store is searched first, and the on-disk store files next. That way, all the stored data, no matter where it currently resides, forms a consistent view from a client’s perspective.

Deletes are a special case of update wherein a delete marker is stored and is used during the lookup to skip “deleted” keys. When the pages are rewritten asynchronously, the delete markers and the keys they mask are eventually dropped.
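Reads and deletes can be sketched in the same spirit. The following simplified example, with invented names, searches the in-memory store first and then the on-disk stores from newest to oldest, treating a delete marker as “no result”:

import java.util.List;
import java.util.NavigableMap;

// Simplified LSM-tree read: the newest occurrence of a key wins, and a delete
// marker hides any older value for that key.
class SimpleLsmReader {
  static final String DELETE_MARKER = "__deleted__";

  // memstore is the in-memory store; storeFiles are sorted maps loaded from
  // disk, ordered from newest to oldest.
  String get(String key, NavigableMap<String, String> memstore,
             List<NavigableMap<String, String>> storeFiles) {
    String value = memstore.get(key);
    if (value == null) {
      for (NavigableMap<String, String> store : storeFiles) {
        value = store.get(key);
        if (value != null) {
          break; // first (newest) match wins
        }
      }
    }
    return DELETE_MARKER.equals(value) ? null : value;
  }
}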

An additional feature of the background processing for housekeeping is the ability to support predicate deletions. These are triggered by setting a time-to-live (TTL) value that retires entries, for example, after 20 days. The merge processes will check the predicate and, if true, drop the record from the rewritten blocks.

The fundamental difference between B-trees and LSM-trees, though, is how their architecture makes use of modern hardware, especially disk drives.

To compare B+ trees and LSM-trees you need to understand their relative strengths and weaknesses. B+ trees work well until there are too many modifications, because they force you to perform costly optimizations to retain that advantage for a limited amount of time. The more data you add at random locations, and the faster you add it, the faster the pages become fragmented again. Eventually, you may take in data faster than the optimization process can rewrite the existing files. The updates and deletes are done at disk seek rates, rather than disk transfer rates.

LSM-trees work at disk transfer rates and scale much better to handle large amounts of data. They also guarantee a very consistent insert rate, as they transform random writes into sequential writes using the logfile plus in-memory store. The reads are independent from the writes, so you also get no contention between these two operations.

The stored data is always in an optimized layout. So, you have a predictable and consistent bound on the number of disk seeks to access a key, and reading any number of records following that key doesn’t incur any extra seeks. In general, what could be emphasized about an LSM-tree-based system is cost transparency: you know that if you have five storage files, access will take a maximum of five disk seeks, whereas you have no way to determine the number of disk seeks an RDBMS query will take, even if it is indexed.

Finally, HBase is an LSM-tree-based system, just like Bigtable. The next sections will explain the storage architecture, while referring back to earlier sections of the book where appropriate.

Storage

One of the least-known aspects of HBase is how data is actually stored. While the majority of users may never have to bother with this, you may have to get up to speed when you want to learn the meaning of the various advanced configuration options you have at your disposal. Chapter 11 lists the more common ones and Appendix A has the full reference list.

You may also want to know more about file storage if, for whatever reason, disaster strikes and you have to recover an HBase installation. At that point, it is important to know where all the data is stored and how to access it on the HDFS level. Of course, this should never happen, but who can guarantee that?

Overview

The first step in understanding the various moving parts in the storage layer of HBase is to understand the high-level picture. Figure 8-3 shows an overview of how HBase and Hadoop’s filesystem are combined to store data.

Overview of how HBase handles files in the filesystem, which stores them transparently in HDFS
Figure 8-3. Overview of how HBase handles files in the filesystem, which stores them transparently in HDFS

The figure shows that HBase handles basically two kinds of file types: one is used for the write-ahead log and the other for the actual data storage. The files are primarily handled by the HRegionServers. In certain cases, the HMaster will also have to perform low-level file operations. You may also notice that the actual files are divided into blocks when stored within HDFS. This is also one of the areas where you can configure the system to handle larger or smaller data records better. More on that in HFile Format.

The general communication flow is that a new client contacts the ZooKeeper ensemble (a separate cluster of ZooKeeper nodes) first when trying to access a particular row. It does so by retrieving the server name (i.e., hostname) that hosts the -ROOT- region from ZooKeeper. With this information it can query that region server to get the server name that hosts the .META. table region containing the row key in question. Both of these details are cached and only looked up once. Lastly, it can query the reported .META. server and retrieve the server name that has the region containing the row key the client is looking for.

Once it has been told in what region the row resides, it caches this information as well and contacts the HRegionServer hosting that region directly. So, over time, the client has a pretty complete picture of where to get rows without needing to query the .META. server again. See Region Lookups for more details.
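The following is a conceptual sketch of that three-level lookup and the client-side cache, not the actual client implementation; the method names are invented, and a real client caches region boundaries rather than individual row keys:

import java.util.concurrent.ConcurrentHashMap;

// Conceptual client-side region lookup: ZooKeeper -> -ROOT- -> .META. -> user region.
class RegionLocatorSketch {
  final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

  String locateRegionServer(String table, String rowKey) {
    String cached = cache.get(table + "/" + rowKey);
    if (cached != null) {
      return cached; // no round-trips needed at all
    }
    String rootServer = askZooKeeperForRootRegionServer();                 // step 1
    String metaServer = askServerForMetaRegion(rootServer, rowKey);        // step 2
    String userServer = askServerForUserRegion(metaServer, table, rowKey); // step 3
    cache.put(table + "/" + rowKey, userServer);
    return userServer;
  }

  String askZooKeeperForRootRegionServer() { return "server-hosting-root"; }
  String askServerForMetaRegion(String server, String rowKey) { return "server-hosting-meta"; }
  String askServerForUserRegion(String server, String table, String rowKey) { return "server-hosting-region"; }
}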

Note

The HMaster is responsible for assigning the regions to each HRegionServer when you start HBase. This also includes the special -ROOT- and .META. tables. See The Region Life Cycle for details.

The HRegionServer opens the region and creates a corresponding HRegion object. When the HRegion is opened it sets up a Store instance for each HColumnFamily that the user defined for the table beforehand. Each Store instance can, in turn, have one or more StoreFile instances, which are lightweight wrappers around the actual storage file called HFile. A Store also has a MemStore, and the HRegionServer a shared HLog instance (see Write-Ahead Log).

Write Path

The client issues an HTable.put(Put) request to the HRegionServer, which hands the details to the matching HRegion instance. The first step is to write the data to the write-ahead log (the WAL), represented by the HLog class.[89] The WAL is a standard Hadoop SequenceFile and it stores HLogKey instances. These keys contain a sequential number as well as the actual data and are used to replay not-yet-persisted data after a server crash.

Once the data is written to the WAL, it is placed in the MemStore. At the same time, it is checked to see if the MemStore is full and, if so, a flush to disk is requested. The request is served by a separate thread in the HRegionServer, which writes the data to a new HFile located in HDFS. It also saves the last written sequence number so that the system knows what was persisted so far.
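From the client’s point of view, this entire path is triggered by a single put call. Here is a minimal example using the Java client API against the testtable created in the next section; it assumes a running cluster reachable through the default configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");
    Put put = new Put(Bytes.toBytes("row-1"));
    // One cell: the server appends it to the WAL first, then adds it to the MemStore.
    put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
    table.put(put);
    table.close();
  }
}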

Files

HBase has a configurable root directory in HDFS, with the default set to "/hbase". Coexisting Clusters shows how to use a different root directory when sharing a central HDFS cluster. You can use the hadoop dfs -lsr command to look at the various files HBase stores. Before doing this, let us first create and fill a table with a handful of regions:

hbase(main):001:0> create 'testtable', 'colfam1', 
  { SPLITS => ['row-300', 'row-500', 'row-700' , 'row-900'] }
0 row(s) in 0.1910 seconds

hbase(main):002:0> for i in '0'..'9' do for j in '0'..'9' do 
  for k in '0'..'9' do put 'testtable', "row-#{i}#{j}#{k}", 
  "colfam1:#{j}#{k}", "#{j}#{k}" end end end
0 row(s) in 1.0710 seconds
0 row(s) in 0.0280 seconds
0 row(s) in 0.0260 seconds
...

hbase(main):003:0> flush 'testtable'
0 row(s) in 0.3310 seconds

hbase(main):004:0> for i in '0'..'9' do for j in '0'..'9' do 
  for k in '0'..'9' do put 'testtable', "row-#{i}#{j}#{k}", 
  "colfam1:#{j}#{k}", "#{j}#{k}" end end end
0 row(s) in 1.0710 seconds
0 row(s) in 0.0280 seconds
0 row(s) in 0.0260 seconds
...

The flush command writes the in-memory data to the store files; otherwise, we would have had to wait until more than the configured flush size of data was inserted into the stores. The last round of looping over the put command is to fill the write-ahead log again.

Here is the content of the HBase root directory afterward:

$ $HADOOP_HOME/bin/hadoop dfs -lsr /hbase
       ...
       0 /hbase/.logs
       0 /hbase/.logs/foo.internal,60020,1309812147645
       0 /hbase/.logs/foo.internal,60020,1309812147645/ 
foo.internal%2C60020%2C1309812147645.1309812151180
       0 /hbase/.oldlogs
      38 /hbase/hbase.id
       3 /hbase/hbase.version
       0 /hbase/testtable
     487 /hbase/testtable/.tableinfo
       0 /hbase/testtable/.tmp
       0 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855
       0 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/.oldlogs
     124 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/.oldlogs/ 
hlog.1309812163957
     282 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/.regioninfo
       0 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/.tmp
       0 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/colfam1
   11773 /hbase/testtable/1d562c9c4d3b8810b3dbeb21f5746855/colfam1/ 
646297264540129145
       0 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26
     311 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26/.regioninfo
       0 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26/.tmp
       0 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26/colfam1
    7973 /hbase/testtable/66b4d2adcc25f1643da5e6260c7f7b26/colfam1/ 
3673316899703710654
       0 /hbase/testtable/99c0716d66e536d927b479af4502bc91
     297 /hbase/testtable/99c0716d66e536d927b479af4502bc91/.regioninfo
       0 /hbase/testtable/99c0716d66e536d927b479af4502bc91/.tmp
       0 /hbase/testtable/99c0716d66e536d927b479af4502bc91/colfam1
    4173 /hbase/testtable/99c0716d66e536d927b479af4502bc91/colfam1/ 
1337830525545548148
       0 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827
     311 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827/.regioninfo
       0 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827/.tmp
       0 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827/colfam1
    7973 /hbase/testtable/d240e0e57dcf4a7e11f4c0b106a33827/colfam1/ 
316417188262456922
       0 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949
     311 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/.regioninfo
       0 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/.tmp
       0 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/colfam1
    7973 /hbase/testtable/d9ffc3a5cd016ae58e23d7a6cb937949/colfam1/ 
4238940159225512178

Note

The output was reduced to include just the file size and name to fit the available space. When you run the command on your cluster you will see more details.

The files can be divided into those that reside directly under the HBase root directory, and those that are in the per-table directories.

Root-level files

The first set of files are the write-ahead log files handled by the HLog instances, created in a directory called .logs underneath the HBase root directory. The .logs directory contains a subdirectory for each HRegionServer. In each subdirectory, there are several HLog files (because of log rotation). All regions from that region server share the same HLog files.

An interesting observation is that the logfile is reported to have a size of 0. This is fairly typical when the file was created recently, as HDFS is using built-in append support to write to this file, and only complete blocks are made available to readers—including the hadoop dfs -lsr command. Although the data of the put operations is safely persisted, the size of the logfile that is currently being written to is slightly off.

After, for example, waiting for an hour so that the logfile is rolled (see LogRoller Class for all reasons when logfiles are rolled), you will see the existing logfile reported with its proper size, since it is closed now and HDFS can state the “correct” size. The new logfile next to it again starts at zero size:

  249962 /hbase/.logs/foo.internal,60020,1309812147645/ 
foo.internal%2C60020%2C1309812147645.1309812151180
       0 /hbase/.logs/foo.internal,60020,1309812147645/ 
foo.internal%2C60020%2C1309812147645.1309815751223

When a logfile is no longer needed because all of the contained edits have been persisted into store files, it is decommissioned into the .oldlogs directory under the root HBase directory. This is triggered when the logfile is rolled based on the configured thresholds.

The old logfiles are deleted by the master after 10 minutes (by default), set with the hbase.master.logcleaner.ttl property. The master checks every minute (by default again) for those files. This is configured with the hbase.master.cleaner.interval property.

Note

The behavior for expired logfiles is pluggable. This is used, for instance, by the replication feature (see Replication) to have access to persisted modifications.

The hbase.id and hbase.version files contain the unique ID of the cluster, and the file format version:

$ hadoop dfs -cat /hbase/hbase.id
$e627e130-0ae2-448d-8bb5-117a8af06e97
$ hadoop dfs -cat /hbase/hbase.version
7

They are used internally and are otherwise not very interesting. In addition, there are a few more root-level directories that appear over time. The splitlog and .corrupt folders are used by the log split process to store the intermediate split files and the corrupted logs, respectively. For example:

      0 /hbase/.corrupt
      0 /hbase/splitlog/foo.internal,60020,1309851880898_hdfs%3A%2F%2F 
localhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C1309850971208%2F 
foo.internal%252C60020%252C1309850971208.1309851641956/testtable/ 
d9ffc3a5cd016ae58e23d7a6cb937949/recovered.edits/0000000000000002352

There are no corrupt logfiles in this example, but there is one staged split file. The log splitting process is explained in Replay.

Table-level files

Every table in HBase has its own directory, located under the HBase root directory in the filesystem. Each table directory contains a top-level file named .tableinfo, which stores the serialized HTableDescriptor (see Tables for details) for the table. This includes the table and column family schemas, and can be read, for example, by tools to gain insight on what the table looks like. The .tmp directory contains temporary data, and is used, for example, when the .tableinfo file is updated.

Region-level files

Inside each table directory, there is a separate directory for every region comprising the table. The names of these directories are the MD5 hash portion of a region name. For example, the following is taken from the master’s web UI, after clicking on the testtable link in the User Tables section:

testtable,row-500,1309812163930.d9ffc3a5cd016ae58e23d7a6cb937949.

The MD5 hash is d9ffc3a5cd016ae58e23d7a6cb937949 and is generated by encoding everything before the hash in the region name (minus the dividing dot), that is, testtable,row-500,1309812163930. The final dot after the hash is part of the complete region name: it indicates that this is a new style name. In previous versions of HBase, the region names did not include the hash.
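You can verify the encoded name yourself using the MD5 utility class that ships with HBase (any MD5 implementation works equally well). A small sketch, using the region name shown above:

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

public class EncodedNameCheck {
  public static void main(String[] args) {
    // Everything before the hash portion of the region name, minus the dividing dot.
    String regionName = "testtable,row-500,1309812163930";
    String encoded = MD5Hash.getMD5AsHex(Bytes.toBytes(regionName));
    System.out.println(encoded); // expected: d9ffc3a5cd016ae58e23d7a6cb937949
  }
}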

Note

The -ROOT- and .META. catalog tables are still using the old style format, that is, their region names include no hash, and therefore end without the trailing dot:

.META.,,1.1028785192

The encoding of the region names for the on-disk directories is also different: they use a Jenkins hash to encode the region name.

The hash guarantees that the directory names are always valid, in terms of filesystem rules: they do not contain any special character, such as the slash (“/”), which is used to divide the path. The overall layout for region files is then:

/<hbase-root-dir>/<tablename>/<encoded-regionname>/<column-family>/<filename>

In each column-family directory, you can see the actual data files, explained in HFile Format. Their name is just an arbitrary number, based on the Java built-in random generator. The code is smart enough to check for collisions, that is, where a file with a newly generated number already exists. It loops until it finds an unused one and uses that instead.

The region directory also has a .regioninfo file, which contains the serialized information of the HRegionInfo instance for the given region. Similar to the .tableinfo file, it can be used by external tools to gain insight into the metadata of a region. The hbase hbck tool uses this to generate missing meta table entries, for example.

The optional .tmp directory is created on demand, and is used to hold temporary files—for example, the rewritten files from a compaction. These are usually moved out into the region directory once the process has completed. In rare circumstances, you might find leftover files, which are cleaned out when the region is reopened.

During the replay of the write-ahead log, any edit that has not been committed is written into a separate file per region. These are staged first (see the splitlog directory in Root-level files) and then—assuming the log splitting process has completed successfully—moved into the optional recovered.edits directory atomically. When the region is opened the region server will see the recovery file and replay the entries accordingly.

Note

There is a clear distinction between the splitting of write-ahead logs (Replay) and the splitting of regions (Region splits). Sometimes it is difficult to distinguish the file and directory names in the filesystem, because both might refer to the term splits. Make sure you carefully identify their purpose to avoid confusion—or mistakes.

Once the region needs to split because it has exceeded the maximum configured region size, a matching splits directory is created, which is used to stage the two new daughter regions. If this process is successful—usually this happens in a few seconds or less—they are moved up into the table directory to form the two new regions, each representing one-half of the original region.

To summarize: when you see a region directory that has no .tmp directory, no compaction has been performed for it yet. When it has no recovered.edits directory, no write-ahead log replay has occurred for it yet.

Note

In HBase versions before 0.90.x there were additional files, which are now obsolete. One is oldlogfile.log, which contained the replayed write-ahead log edits for the given region. The oldlogfile.log.old file (note the extra .old extension) indicated that there was already an existing oldlogfile.log file when the new one was put into place.

Another noteworthy file is the compaction.dir file in older versions of HBase, which is now replaced by the .tmp directory.

This concludes the list of what is commonly contained in the various directories inside the HBase root folder. There are more intermediate files, created by the region split process. They are discussed separately in the next section.

Region splits

When a store file within a region grows larger than the configured hbase.hregion.max.filesize—or what is configured at the column family level using HColumnDescriptor—the region is split in two. This is done initially very quickly because the system simply creates two reference files for the new regions (also called daughters), each of which hosts one-half of the original region (referred to as the parent).

The region server accomplishes this by creating the splits directory in the parent region. Next, it closes the region so that it does not take on any more requests.

The region server then prepares the new daughter regions (using multiple threads) by setting up the necessary file structures inside the splits directory. This includes the new region directories and the reference files. If this process completes successfully, it moves the two new region directories into the table directory. The .META. table is updated for the parent to state that it is now split, and what the two daughter regions are. This prevents it from being reopened by accident. Here is an example of how this looks in the .META. table:

row: testtable,row-500,1309812163930.d9ffc3a5cd016ae58e23d7a6cb937949. 

  column=info:regioninfo, timestamp=1309872211559, value=REGION => {NAME => 
    'testtable,row-500,1309812163930.d9ffc3a5cd016ae58e23d7a6cb937949. 
     TableName => 'testtable', STARTKEY => 'row-500', ENDKEY => 'row-700', 
     ENCODED => d9ffc3a5cd016ae58e23d7a6cb937949, OFFLINE => true, 
     SPLIT => true,}
                                                                                   
  column=info:splitA, timestamp=1309872211559, value=REGION => {NAME => 
    'testtable,row-500,1309872211320.d5a127167c6e2dc5106f066cc84506f8. 
    TableName => 'testtable', STARTKEY => 'row-500', ENDKEY => 'row-550', 
    ENCODED => d5a127167c6e2dc5106f066cc84506f8,}                                                                                                                       
  column=info:splitB, timestamp=1309872211559, value=REGION => {NAME => 
    'testtable,row-550,1309872211320.de27e14ffc1f3fff65ce424fcf14ae42. 
    TableName => [B@62892cc5', STARTKEY => 'row-550', ENDKEY => 'row-700',  
    ENCODED => de27e14ffc1f3fff65ce424fcf14ae42,}

You can see how the original region was split into two regions, separated at row-550. The SPLIT => true in the info:regioninfo column value also indicates that this region is now split into the regions referred to in info:splitA and info:splitB.

The name of the reference file is another random number, but with the hash of the referenced region as a postfix, for instance:

/hbase/testtable/d5a127167c6e2dc5106f066cc84506f8/colfam1/ 
6630747383202842155.d9ffc3a5cd016ae58e23d7a6cb937949

This reference file represents one-half of the original region with the hash d9ffc3a5cd016ae58e23d7a6cb937949, which is the region shown in the preceding example. The reference files only hold a little information: the key the original region was split at, and whether it is the top or bottom reference. Of note is that these references are then used by the HalfHFileReader class (which was omitted from the earlier overview as it is only used temporarily) to read the original region data files, and either the top or the bottom half of the files.

Both daughter regions are now ready and will be opened in parallel by the same server. This includes updating the .META. table to list both regions as available regions—just like any other. After that, the regions are online and start serving requests.

The opening of the daughters also schedules a compaction for both—which rewrites the store files in the background from the parent region into the two halves, while replacing the reference files. This takes place in the .tmp directory of the daughter regions. Once the files have been generated, they atomically replace the reference.

The parent is eventually cleaned up when there are no more references to it, which means it is removed as the parent from the .META. table, and all of its files on disk are deleted. Finally, the master is informed about the split and can schedule for the new regions to be moved off to other servers for load balancing reasons.

Note

All of the steps involved in the split are tracked in ZooKeeper. This allows for other processes to reason about the state of a region in case of a server failure.

Compactions

The store files are monitored by a background thread to keep them under control. The flushes of memstores slowly build up an increasing number of on-disk files. If there are enough of them, the compaction process will combine them into a few, larger files. This goes on until the largest of these files exceeds the configured maximum store file size and triggers a region split (see Region splits).

Compactions come in two varieties: minor and major. Minor compactions are responsible for rewriting the last few files into one larger one. The number of files is set with the hbase.hstore.compaction.min property (which was previously called hbase.hstore.compactionThreshold, and although deprecated is still supported). It is set to 3 by default, and needs to be at least 2. A number too large would delay minor compactions, but would also require more resources and take longer once the compactions start.

The maximum number of files to include in a minor compaction is set to 10, and is configured with hbase.hstore.compaction.max. The list is further narrowed down by the hbase.hstore.compaction.min.size (set to the configured memstore flush size for the region), and the hbase.hstore.compaction.max.size (defaults to Long.MAX_VALUE) configuration properties. Any file larger than the maximum compaction size is always excluded. The minimum compaction size works slightly differently: it is a threshold rather than a per-file limit. It includes all files that are under that limit, up to the total number of files per compaction allowed.

Figure 8-4 shows an example set of store files. All files that fit under the minimum compaction threshold are included in the compaction process.

A set of store files showing the minimum compaction threshold
Figure 8-4. A set of store files showing the minimum compaction threshold

The algorithm uses hbase.hstore.compaction.ratio (defaults to 1.2, or 120%) to ensure that it includes enough files in the selection process. Based on the ratio, a file is also selected if its size is at most that ratio times the sum of the sizes of all newer store files. The evaluation always checks the files from the oldest to the newest, which ensures that older files are compacted first. The combination of these properties allows you to fine-tune how many files are included in a minor compaction.
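A simplified rendering of this selection logic is shown below. It is a sketch of the rules just described, not the actual compaction selection code, and the helper class and parameters are invented for the example:

import java.util.ArrayList;
import java.util.List;

// Simplified minor compaction selection, mirroring the rules described above.
class CompactionSelectorSketch {
  // sizes holds the store file sizes, ordered from oldest to newest.
  List<Integer> select(List<Long> sizes, long minSize, long maxSize,
                       double ratio, int minFiles, int maxFiles) {
    List<Integer> selected = new ArrayList<>();
    for (int i = 0; i < sizes.size() && selected.size() < maxFiles; i++) {
      long size = sizes.get(i);
      if (size > maxSize) {
        continue; // files above the maximum compaction size are always excluded
      }
      long sumNewer = 0; // sum of the sizes of all files newer than the current one
      for (int j = i + 1; j < sizes.size(); j++) {
        sumNewer += sizes.get(j);
      }
      // Include the file if it is below the minimum compaction size, or no
      // larger than the ratio times the sum of all newer files.
      if (size <= minSize || size <= sumNewer * ratio) {
        selected.add(i);
      }
    }
    // Only compact if the minimum number of files has been reached.
    return selected.size() >= minFiles ? selected : new ArrayList<Integer>();
  }
}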

In contrast to minor compactions, major compactions compact all files into a single file. Which compaction type is run is automatically determined when the compaction check is executed. The check is triggered either after a memstore has been flushed to disk, after the compact or major_compact shell commands or corresponding API calls have been invoked, or by a background thread. This background thread is called the CompactionChecker and each region server runs a single instance. It runs a check on a regular basis, controlled by hbase.server.thread.wakefrequency (and multiplied by hbase.server.thread.wakefrequency.multiplier, set to 1000, to run it less often than the other thread-based tasks).

If you call the major_compact shell command, or the majorCompact() API call, you force the major compaction to run. Otherwise, the server checks first if the major compaction is due, based on hbase.hregion.majorcompaction (set to 24 hours) from the first time it ran. The hbase.hregion.majorcompaction.jitter (set to 0.2, in other words, 20%) causes this time to be spread out for the stores. Without the jitter, all stores would run a major compaction at the same time, every 24 hours. See Managed Splitting for information on why this is a bad idea and how to manage this better.

If no major compaction is due, a minor compaction is assumed. Based on the aforementioned configuration properties, the server determines if enough files for a minor compaction are available and continues if that is the case.

Minor compactions might be promoted to major compactions when the former would include all store files, and there are fewer than the configured maximum files per compaction.

HFile Format

The actual storage files are implemented by the HFile class, which was specifically created to serve one purpose: store HBase’s data efficiently. They are based on Hadoop’s TFile class,[90] and mimic the SSTable format used in Google’s Bigtable architecture. The previous use of Hadoop’s MapFile class in HBase proved to be insufficient in terms of performance. Figure 8-5 shows the file format details.

The HFile structure
Figure 8-5. The HFile structure

The files contain a variable number of blocks, where the only fixed ones are the file info and trailer blocks. As Figure 8-5 shows, the trailer has the pointers to the other blocks. It is written after the data has been persisted to the file, finalizing the now immutable data store. The index blocks record the offsets of the data and meta blocks. Both the data and the meta blocks are actually optional. But considering how HBase uses the data files, you will almost always find at least data blocks in the store files.

The block size is configured by the HColumnDescriptor, which, in turn, is specified at table creation time by the user, or defaults to reasonable standard values. Here is an example as shown in the master web-based interface:

{NAME => 'testtable', FAMILIES => [{NAME => 'colfam1', 
BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3', 
COMPRESSION => 'NONE', TTL => '2147483647', BLOCKSIZE => '65536', 
IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}

The default is 64 KB (or 65,536 bytes). Here is what the HFile JavaDoc explains:

Minimum block size. We recommend a setting of minimum block size between 8KB to 1MB for general usage. Larger block size is preferred if files are primarily for sequential access. However, it would lead to inefficient random access (because there are more data to decompress). Smaller blocks are good for random access, but require more memory to hold the block index, and may be slower to create (because we must flush the compressor stream at the conclusion of each data block, which leads to an FS I/O flush). Further, due to the internal caching in Compression codec, the smallest possible block size would be around 20KB-30KB.
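The block size can also be set programmatically when creating a table through the Java client API. The following sketch sets a smaller block size for a new table; the table and family names are only examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class BlockSizeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HColumnDescriptor family = new HColumnDescriptor("colfam1");
    family.setBlocksize(32 * 1024); // 32 KB blocks, e.g., for more random access
    HTableDescriptor desc = new HTableDescriptor("newtable");
    desc.addFamily(family);
    admin.createTable(desc);
  }
}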

Each block contains a magic header, and a number of serialized KeyValue instances (see KeyValue Format for their format). If you are not using a compression algorithm, each block is about as large as the configured block size. This is not an exact science, as the writer has to fit whatever you give it: if you store a KeyValue that is larger than the block size, the writer has to accept that. But even with smaller values, the check for the block size is done after the last value was written, so in practice, the majority of blocks will be slightly larger.

When you are using a compression algorithm you will not have much control over block size. Compression codecs work best if they can decide how much data is enough to achieve an efficient compression ratio. For example, setting the block size to 256 KB and using LZO compression ensures that the blocks written will always be less than or equal to 256 KB, to suit the LZO internal buffer size.

Note

Many compression libraries come with a set of configuration properties you can use to specify the buffer size, and other options. Refer to the source code of the JNI library to find out what is available to you.

The writer does not know whether you have selected a compression algorithm or not: it follows the block size limit to write out raw data close to the configured amount. If you have compression enabled, less data is saved to disk. This means the final store file will contain the same number of blocks, but the total size will be smaller since each block is smaller.

One thing you may notice is that the default block size for files in HDFS is 64 MB, which is 1,024 times the HFile default block size. As such, the HBase storage file blocks do not match the Hadoop blocks. In fact, there is no correlation between these two block types. HBase stores its files transparently into a filesystem. The fact that HDFS uses blocks is a coincidence. And HDFS also does not know what HBase stores; it only sees binary files. Figure 8-6 demonstrates how the HFile content is simply spread across HDFS blocks.

HFile content spread across HDFS blocks when many smaller HFile blocks are transparently stored in two HDFS blocks that are much larger
Figure 8-6. HFile content spread across HDFS blocks when many smaller HFile blocks are transparently stored in two HDFS blocks that are much larger

Sometimes it is necessary to be able to access an HFile directly, bypassing HBase, for example, to check its health, or to dump its contents. The HFile.main() method provides the tools to do that:

$ ./bin/hbase org.apache.hadoop.hbase.io.hfile.HFile 
usage: HFile  [-a] [-b] [-e] [-f <arg>] [-k] [-m] [-p] [-r <arg>] [-v]
 -a,--checkfamily    Enable family check
 -b,--printblocks    Print block index meta data
 -e,--printkey       Print keys
 -f,--file <arg>     File to scan. Pass full-path; e.g.
                     hdfs://a:9000/hbase/.META./12/34
 -k,--checkrow       Enable row order check; looks for out-of-order keys
 -m,--printmeta      Print meta data of file
 -p,--printkv        Print key/value pairs
 -r,--region <arg>   Region to scan. Pass region name; e.g. '.META.,,1'
 -v,--verbose        Verbose output; emits file and meta data delimiters

Here is an example of what the output will look like (shortened):

$ ./bin/hbase org.apache.hadoop.hbase.io.hfile.HFile -f 
/hbase/testtable/de27e14ffc1f3fff65ce424fcf14ae42/colfam1/2518469459313898451 
-v -m -p
Scanning -> /hbase/testtable/de27e14ffc1f3fff65ce424fcf14ae42/colfam1/ 
2518469459313898451
K: row-550/colfam1:50/1309813948188/Put/vlen=2 V: 50
K: row-550/colfam1:50/1309812287166/Put/vlen=2 V: 50
K: row-551/colfam1:51/1309813948222/Put/vlen=2 V: 51
K: row-551/colfam1:51/1309812287200/Put/vlen=2 V: 51
K: row-552/colfam1:52/1309813948256/Put/vlen=2 V: 52
...
K: row-698/colfam1:98/1309813953680/Put/vlen=2 V: 98
K: row-698/colfam1:98/1309812292594/Put/vlen=2 V: 98
K: row-699/colfam1:99/1309813953720/Put/vlen=2 V: 99
K: row-699/colfam1:99/1309812292635/Put/vlen=2 V: 99
Scanned kv count -> 300
Block index size as per heapsize: 208
reader=/hbase/testtable/de27e14ffc1f3fff65ce424fcf14ae42/colfam1/ 
2518469459313898451, compression=none, inMemory=false, 
firstKey=row-550/colfam1:50/1309813948188/Put, 
lastKey=row-699/colfam1:99/1309812292635/Put, avgKeyLen=28, avgValueLen=2, 
entries=300, length=11773
fileinfoOffset=11408, dataIndexOffset=11664, dataIndexCount=1, 
metaIndexOffset=0, metaIndexCount=0, totalBytes=11408, entryCount=300, 
version=1
Fileinfo:
MAJOR_COMPACTION_KEY = xFF
MAX_SEQ_ID_KEY = 2020
TIMERANGE = 1309812287166....1309813953720
hfile.AVG_KEY_LEN = 28
hfile.AVG_VALUE_LEN = 2
hfile.COMPARATOR = org.apache.hadoop.hbase.KeyValue$KeyComparator
hfile.LASTKEY = x00x07row-699x07colfam199x00x00x010xF6xE5|x1Bx04
Could not get bloom data from meta block

The first part of the output is the actual data stored as serialized KeyValue instances. The second part dumps the internal HFile.Reader properties, as well as the trailer block details. The last part, starting with Fileinfo, is the file info block values.

The provided information is valuable to, for example, confirm whether a file is compressed or not, and with what compression type. It also shows you how many cells you have stored, as well as the average size of their keys and values. In the preceding example, the key is much larger than the value. This is caused by the overhead required by the KeyValue class to store the necessary data, explained next.

KeyValue Format

In essence, each KeyValue in the HFile is a low-level byte array that allows for zero-copy access to the data. Figure 8-7 shows the layout of the contained data.

The KeyValue format
Figure 8-7. The KeyValue format

The structure starts with two fixed-length numbers indicating the length of the key and the length of the value. With that information, you can offset into the array to, for example, get direct access to the value, ignoring the key. Otherwise, you can read the required information from the key. Once the information is parsed into a KeyValue Java instance, you can use getters to access the details, as explained in The KeyValue class.

The reason the average key in the preceding example is larger than the value has to do with the fields that make up the key part of a KeyValue. The key holds the row key, the column family name, the column qualifier, and so on. For a small payload, this results in quite a considerable overhead. If you deal with small values, try to keep the key small as well. Choose a short row and column key (the family name with a single byte, and the qualifier equally short) to keep the ratio in check.

On the other hand, compression should help mitigate the overwhelming key size problem, as it looks at finite windows of data, and all repeating data should compress well. The sorting of all KeyValues in the store file helps to keep similar keys (and possibly values too, in case you are using versioning) close together.
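For illustration, here is how a KeyValue can be created and inspected using the client API; the row, family, qualifier, timestamp, and value are made-up data:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyValueExample {
  public static void main(String[] args) {
    KeyValue kv = new KeyValue(Bytes.toBytes("row-1"), Bytes.toBytes("colfam1"),
        Bytes.toBytes("qual1"), 1309813948188L, Bytes.toBytes("val1"));
    // The getters parse the relevant parts out of the underlying byte array.
    System.out.println("row: " + Bytes.toString(kv.getRow()));
    System.out.println("family: " + Bytes.toString(kv.getFamily()));
    System.out.println("qualifier: " + Bytes.toString(kv.getQualifier()));
    System.out.println("timestamp: " + kv.getTimestamp());
    System.out.println("value: " + Bytes.toString(kv.getValue()));
  }
}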

Write-Ahead Log

The region servers keep data in-memory until enough is collected to warrant a flush to disk, avoiding the creation of too many very small files. While the data resides in memory it is volatile, meaning it could be lost if the server loses power, for example. This is a likely occurrence when operating at large scale, as explained in Seek Versus Transfer.

A common approach to solving this issue is write-ahead logging:[91] Each update (also called an “edit”) is written to a log, and only if that write has succeeded is the client informed that the operation has succeeded. The server then has the liberty to batch or aggregate the data in memory as needed.

Overview

The WAL is the lifeline that is needed when disaster strikes. Similar to a binary log in MySQL, the WAL records all changes to the data. This is important in case something happens to the primary storage. If the server crashes, the WAL can effectively replay the log to get everything up to where the server should have been just before the crash. It also means that if writing the record to the WAL fails, the whole operation must be considered a failure.

Overview shows how the WAL fits into the overall architecture of HBase. Since it is shared by all regions hosted by the same region server, it acts as a central logging backbone for every modification. Figure 8-8 shows how the flow of edits is split between the memstores and the WAL.

All modifications saved to the WAL, and then passed on to the memstores
Figure 8-8. All modifications saved to the WAL, and then passed on to the memstores

The process is as follows: first the client initiates an action that modifies data. This can be, for example, a call to put(), delete(), and increment(). Each of these modifications is wrapped into a KeyValue object instance and sent over the wire using RPC calls. The calls are (ideally) batched to the HRegionServer that serves the matching regions.

Once the KeyValue instances arrive, they are routed to the HRegion instances that are responsible for the given rows. The data is written to the WAL, and then put into the MemStore of the actual Store that holds the record. This is, in essence, the write path of HBase.

Eventually, when the memstores get to a certain size, or after a specific time, the data is persisted in the background to the filesystem. During that time, data is stored in a volatile state in memory. The WAL guarantees that the data is never lost, even if the server fails. Keep in mind that the actual log resides on HDFS, which is a replicated filesystem. Any other server can open the log and start replaying the edits—nothing on the failed physical server is needed to effect a full recovery.

HLog Class

The class that implements the WAL is called HLog. When an HRegion is instantiated, the single HLog instance that runs inside each region server is passed on as a parameter to the constructor of HRegion. When a region receives an update operation, it can save the data directly to the shared WAL instance.

The core of the HLog functionality is the append() method. Note that for performance reasons there is an option for Put, Delete, and Increment to be called with an extra parameter set: setWriteToWAL(false). If you invoke this method while setting up, for example, a Put instance, the writing to the WAL is bypassed! That is also why the downward arrow in Figure 8-8 was created with a dashed line to indicate the optional step. By default, you certainly want the WAL, no doubt about that. But say you run a large bulk import MapReduce job that you can rerun at any time. You gain extra performance when you disable the WAL, but at the cost of having to take extra care that no data was lost during the import.
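In code, disabling the WAL for a single mutation looks like the following sketch; the table, row, and column names are placeholders:

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class NoWalPut {
  // Bulk-import style put that skips the write-ahead log.
  static void putWithoutWal(HTable table) throws IOException {
    Put put = new Put(Bytes.toBytes("row-1"));
    put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
    put.setWriteToWAL(false); // faster, but the edit is lost if the server crashes
    table.put(put);
  }
}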

Warning

You are strongly advised not to lightheartedly turn off writing edits to the WAL. If you do so, you will lose data sooner or later. And no, HBase cannot recover data that is lost and that has not been written to the log first.

Another important feature of HLog is the ability to keep track of changes. It does this by using a sequence number. It uses an AtomicLong internally to be thread-safe and starts at either zero, or the last known number persisted to the filesystem. As the region opens its storage files, it reads the highest sequence number, which is stored as a meta field in each HFile, and sets the HLog sequence number to that value if it is higher than what was recorded before. So, after it has opened all the storage files, the HLog is initialized to reflect where persisting ended and where to continue.

Figure 8-9 shows three different regions, hosted on the same region server, with each of them covering a different row key range. Each region shares the same single instance of HLog, which means the data is written to the WAL in the order it arrives. This requires some extra work when a log needs to be replayed (see Replay). But since this happens rather seldom, the WAL is optimized to store data sequentially, giving it the best I/O performance.

The WAL saving edits in the order they arrive, spanning all regions of the same server
Figure 8-9. The WAL saving edits in the order they arrive, spanning all regions of the same server

HLogKey Class

Currently, the WAL uses a Hadoop SequenceFile, which stores records as sets of key/values. For the WAL, the value is simply the modification(s) sent from the client. The key is represented by an HLogKey instance: since the KeyValue only represents the row key, column family, column qualifier, timestamp, type, and value, there has to be a place to store what the KeyValue belongs to, in other words, the region and table name. That information is stored in the HLogKey. Also stored is the aforementioned sequence number. That number is incremented with each edit in order to keep a sequential order of edits.

This class also records the write time, which is a timestamp that denotes when the edit was written to the log. Finally, it stores the cluster ID, which is needed for replication across clusters.

WALEdit Class

Every modification sent by a client is wrapped into a WALEdit instance, which takes care of atomicity at the log level. Assume you update 10 columns in one row. Each column, or cell, is represented as a separate KeyValue instance. If the server writes five of them to the WAL and then fails, you will end up with a half-persisted row mutation.

Atomicity is guaranteed by bundling all updates that comprise multiple cells into a single WALEdit instance. This group of edits is then written in a single operation, ensuring that the row mutation is applied in full or not at all.

Note

Before version 0.90.x, HBase saved the KeyValue instances separately.

LogSyncer Class

The table descriptor allows you to set the so-called deferred log flush flag, as explained in Table Properties. The default is false and it means that every time an edit is sent to the servers, it will call the log writer’s sync() method. It is the call that forces the update to the log to be acknowledged by the filesystem so that you have durability.

Unfortunately, calling this method involves a pipelined write to N servers (where N is the replication factor set for the write-ahead log files). Since this is a rather costly operation, you have the option to slightly delay the call, and have it executed in a background process instead. Keep in mind that without the call to sync(), there is a chance of data loss in case of a server failure. Use this option carefully.

Setting the deferred log flush flag to true causes the edits to be buffered on the region server, and the LogSyncer class, running as a background thread on the server, is responsible for calling the sync() method at a very short interval. The default is one second and is configured by the hbase.regionserver.optionallogflushinterval property.

Note that this only applies to user tables: all catalog tables are always synced right away.
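Since the flag is part of the table descriptor, it can be changed through the administrative API. The following is a sketch that enables deferred log flush for an existing table; the table name is just an example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class DeferredFlushExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes("testtable"));
    desc.setDeferredLogFlush(true); // trade a small window of possible data loss for speed
    admin.disableTable("testtable");
    admin.modifyTable(Bytes.toBytes("testtable"), desc);
    admin.enableTable("testtable");
  }
}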

LogRoller Class

There are size restrictions when it comes to the logs that are written. The LogRoller class runs as a background thread and takes care of rolling logfiles at certain intervals. This is controlled by the hbase.regionserver.logroll.period property, set by default to one hour.

Every 60 minutes the log is closed and a new one is started. Over time, the system accumulates an increasing number of logfiles that need to be managed as well. The HLog.rollWriter() method, which is called by the LogRoller to roll the current logfile, takes care of that as well by subsequently calling HLog.cleanOldLogs().

It checks what the highest sequence number written to a storage file is. This is the edit sequence number of the last edit persisted out to the filesystem. It then checks if there is a log left that has edits that are all less than that number. If that is the case, it moves said logs into the .oldlogs directory, and leaves the remaining ones in place.

Note

You might see the following obscure message in your logs:

2011-06-15 01:45:48,427 INFO org.apache.hadoop.hbase.regionserver.HLog: 
  Too many hlogs: logs=130, maxlogs=96; forcing flush of 8 region(s):
  testtable,row-500,1309872211320.d5a127167c6e2dc5106f066cc84506f8., ...

This message is printed because the number of logfiles that need to be kept, because they still contain outstanding edits that have not yet been persisted, exceeds the configured maximum number of logfiles. This can occur when you stress out the filesystem to such an extent that it cannot persist the data at the rate at which new data is added. Otherwise, memstore flushes should take care of this.

Note, though, that when this message is printed the server goes into a special mode trying to force edits to be flushed out to reduce the number of outstanding WAL files.

The other parameters controlling log rolling are hbase.regionserver.hlog.blocksize (set to the filesystem default block size, or fs.local.block.size, defaulting to 32 MB) and hbase.regionserver.logroll.multiplier (set to 0.95), which will rotate logs when they are at 95% of the block size. So logs are switched out when they are considered full, or when a certain amount of time has passed—whatever comes first.

Replay

The master and region servers need to orchestrate the handling of logfiles carefully, especially when it comes to recovering from server failures. The WAL is responsible for retaining the edits safely; replaying the WAL to restore a consistent state is a much more complex exercise.

Single log

Since all edits are written to one HLog-based logfile per region server, you might ask: why is that the case? Why not write all edits for a specific region into its own logfile? Here is the related quote from the Bigtable paper:

If we kept the commit log for each tablet in a separate logfile, a very large number of files would be written concurrently in GFS. Depending on the underlying file system implementation on each GFS server, these writes could cause a large number of disk seeks to write to the different physical log files.

HBase followed that principle for pretty much the same reasons: writing too many files at the same time, plus the number of rolled logs that need to be kept, does not scale well.

What is the drawback, though? If you have to split a log because of a server crash, you need to divide it into suitable pieces, as described in the next section. The master cannot redeploy any region from a crashed server until the logs for that very server have been split. This can potentially take a considerable amount of time.

Log splitting

There are two situations in which logfiles have to be replayed: when the cluster starts, or when a server fails. When the master starts—and this includes a backup master taking over duty—it checks if there are any logfiles, in the .logs directory under the HBase root on the filesystem, that have no region server assigned to them. The logs’ names contain not just the server name, but also the start code of the server. This number is reset every time a region server restarts, and the master can use this number to verify whether a log has been abandoned—for example, due to a server crash.

The master is responsible for monitoring the servers using ZooKeeper, and if it detects a server failure, it immediately starts the process of recovering its logfiles, before reassigning the regions to new servers. This happens in the ServerShutdownHandler class.

Before the edits in the log can be replayed, they need to be separated into one logfile per region. This process is called log splitting: the combined log is read and all entries are grouped by the region they belong to. These grouped edits are then stored in a file next to the target region for subsequent recovery.

The actual process of splitting the logs is different in nearly every version of HBase: early versions would read the file in a single thread, directly on the master. This was improved to at least write the grouped edits per region in multiple threads. Version 0.92.0 finally introduces the concept of distributed log splitting, which removes the burden of doing the actual work from the master to all region servers.

Consider a larger cluster with many region servers and many (rather large) logfiles. In the past, the master had to recover each logfile separately, and—so it would not overload in terms of I/O as well as memory usage—it would do this sequentially. This meant that, for any region that had pending edits, it had to be blocked from opening until the log split and recovery had been completed.

The new distributed mode uses ZooKeeper to hand out each abandoned logfile to a region server. They monitor ZooKeeper for available work, and if the master indicates that a log is available for processing, they race to accept the task. The winning region server then proceeds to read and split the logfiles in a single thread (so as not to overload the already busy region server).

Note

You can turn the new distributed log splitting off by means of the hbase.master.distributed.log.splitting configuration property. Setting this property to false disables distributed splitting, and falls back to doing the work directly on the master only.

In nondistributed mode the writers are multithreaded, controlled by the hbase.regionserver.hlog.splitlog.writer.threads property, which is set to 3 by default. You need to be careful when increasing this number, as you are likely bound by the performance of the single log reader.

The split process writes the edits first into the splitlog staging directory under the HBase root folder. They are placed in the same path that is needed for the target region. For example:

      0 /hbase/.corrupt
      0 /hbase/splitlog/foo.internal,60020,1309851880898_hdfs%3A%2F%2F 
localhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C1309850971208%2F 
foo.internal%252C60020%252C1309850971208.1309851641956/testtable/ 
d9ffc3a5cd016ae58e23d7a6cb937949/recovered.edits/0000000000000002352

The path contains the logfile name itself to distinguish it from other, possibly concurrently executed, log split output. The path also contains the table name, region name (hash), and recovered.edits directory. Lastly, the name of the split file is the sequence ID of the first edit for the particular region.

The .corrupt directory contains any logfile that could not be parsed. This is influenced by the hbase.hlog.split.skip.errors property, which is set to true by default. It means that any edit that could not be read from a file causes the entire log to be moved to the .corrupt folder. If you set the flag to false, an IOException is thrown and the entire log splitting process is stopped.

Once a log has been split successfully, the per-region files are moved into the actual region directories. They are now ready to be recovered by the region itself. This is also why the splitting has to stall opening the affected regions, since it first has to provide the pending edits for replay.

Edits recovery

When a region is opened, either because the cluster is started or because it has been moved from one region server to another, it first checks for the presence of the recovered.edits directory. If it exists, it opens the contained files and starts reading the edits they contain. The files are sorted by their name, which contains the sequence ID. This allows the region to recover the edits in order.

Any edit that has a sequence ID that is less than or equal to what has been persisted in the on-disk store files is ignored, because it has already been applied. All other edits are applied to the matching memstore of the region to recover the previous state. At the end, a flush of the memstores is forced to write the current state to disk.
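A minimal sketch of that filtering rule, again using a hypothetical Edit class rather than the real WALEdit and store classes:

import java.util.ArrayList;
import java.util.List;

public class EditsRecoverySketch {

  // Hypothetical stand-in for a single recovered edit.
  static class Edit {
    final long sequenceId;
    Edit(long sequenceId) { this.sequenceId = sequenceId; }
  }

  // Only edits newer than what the store files already contain need to be
  // replayed into the memstore; everything else was persisted before the crash.
  static List<Edit> selectEditsToReplay(List<Edit> recoveredEdits,
      long maxSequenceIdInStoreFiles) {
    List<Edit> toReplay = new ArrayList<Edit>();
    for (Edit edit : recoveredEdits) {
      if (edit.sequenceId > maxSequenceIdInStoreFiles) {
        toReplay.add(edit);
      }
    }
    return toReplay;
  }
}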

The files in the recovered.edits folder are removed once they have been read and their edits persisted to disk. If a file cannot be read, the hbase.skip.errors property defines what happens next: the default value is false and causes the entire region recovery to fail. If this property is set to true, the file is renamed to the original filename plus .<currentTimeMillis>. Either way, you need to carefully check your logfiles to determine why the recovery has had issues and fix the problem to continue.

Durability

You want to be able to rely on the system to save all your data, no matter what newfangled algorithms are employed behind the scenes. As far as HBase and the log are concerned, you can set the log flush times to be as low as you want, or sync them for every edit, and you are still dependent on the underlying filesystem as mentioned earlier: the stream used to store the data is flushed, but is it written to disk yet? We are talking about fsync-style issues. For HBase, the filesystem being persisted to is most likely Hadoop's HDFS.

At this point, it should be abundantly clear that the log is what keeps data safe. It is being kept open for up to an hour (or more if configured to do so), and as data arrives a new key/value pair is written to the SequenceFile. Eventually, the log is rolled and a new one is created.

But that is not how Hadoop was designed to work. Hadoop provides an API tailored to MapReduce that allows you to open a file, write data into it (preferably a lot), and close it right away, leaving an immutable file for everyone else to read many times.

Only after a file is closed is it visible and readable to others. If a process dies while writing the data, the file is considered lost. For HBase to be able to work properly, what is required is a feature that allows you to read the log up to the point where the crashed server has written it. This was added to HDFS in later versions and is referred to as append.

HBase currently detects whether the underlying Hadoop library has support for syncFs() or hflush(). If a sync() is triggered on the log writer, it calls either method internally, or none if HBase runs in a nondurable setup. The sync() call uses the pipelined write process described in LogSyncer Class to guarantee the durability of the edits in the logfile. In case of a server crash, the system can safely read the abandoned logfile up to the last edit.
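The following sketch shows the underlying filesystem call in isolation, assuming a Hadoop version whose FSDataOutputStream supports hflush() (on older append-capable builds the equivalent call is sync()); the path is made up for the example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HflushSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/hflush-test"));
    out.write("some log edit".getBytes("UTF-8"));
    // Push the buffered data to all datanodes in the write pipeline, so a
    // reader can see it even if this writer dies before closing the file.
    out.hflush();
    out.close();
  }
}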

In summary, without Hadoop 0.21.0 and later, or a specially prepared 0.20.x with append support backported to it, you can very well face data loss. See Hadoop for more information.

Read Path

HBase uses multiple store files per column family, which contain the actual cells, or KeyValue instances. These files are created over time as modifications aggregated in the memstores are eventually flushed as store files to disk. The background process of compactions keeps the number of files under control by rewriting smaller files into larger ones. Major compactions eventually compact the entire set of files into a single one, after which the flushes start adding smaller files again.

Since all store files are immutable, there is no way to delete a particular value out of them, nor does it make sense to keep rewriting large store files to remove the deleted cells one by one. Instead, a tombstone marker is written, which masks out the “deleted” information—which can be a single cell, a range of cells, or entire rows.

Consider you are writing a column in a given row today. You keep adding data in other rows over a few more days, then you write a different column in the given row. The question is, given that the original column value has been persisted as a KeyValue on disk for quite some time, while the newly written column for the same row is still in the memstore, or has been flushed to disk, where does the logical row reside?

In other words, when you are using the shell to perform a get command on that row, how does the system know what to return? As a client, you want to see both columns being returned—as if they were stored in a single entity. But in reality, the data lives as separate KeyValue instances, spread across any number of store files.
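Using the Java client instead of the shell, such a request might look like the following sketch (the table, family, and row names are made up for the example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class RowGetSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");
    Get get = new Get(Bytes.toBytes("row-1"));
    get.addFamily(Bytes.toBytes("colfam1"));
    // Both columns come back in a single Result, no matter whether their
    // KeyValue instances live in the memstore or in any of the store files.
    Result result = table.get(get);
    System.out.println("Row: " + result);
    table.close();
  }
}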

If you are deleting the initial column value, and you perform the get again, you expect the value to be gone, when in fact it still exists somewhere, but the tombstone marker is indicating that you have deleted it. But that marker is most likely stored far away from the value it “buries.” A more formal explanation of the architecture behind this approach is provided in Seek Versus Transfer.

HBase solves the problem by using a QueryMatcher in combination with a ColumnTracker, which comes in a few variations: one for explicit matching, for when you specify a list of columns to retrieve, and another that includes all columns. Both allow you to set the maximum number of versions to match. They keep track of what needs to be included in the final result.

Before all the store files are read to find a matching entry, a quick exclusion check is conducted, which uses the timestamps and optional Bloom filter to skip files that definitely have no KeyValue belonging to the row in question. The remaining store files, including the memstore, are then scanned to find a matching key.

The scan is implemented by the RegionScanner class, which retrieves a StoreScanner for every Store instance—each representing a column family. If the read operation excludes certain column families, their stores are omitted as well.

The StoreScanner class combines the store files and memstore that the Store instance contains. It is also where the exclusion happens, based on the Bloom filter, or the timestamp. If you are asking for versions that are not more than 30 minutes old, for example, you can skip all storage files that are older than one hour: they will not contain anything of interest. See Key Design for details on the exclusion, and how to make use of it.
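For example, to ask only for versions written within the last 30 minutes, so that older store files can be excluded, a client could set a time range on the scan. This is a sketch with a made-up family name, assuming an already opened HTable instance:

import java.io.IOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class TimeRangeScanSketch {
  static ResultScanner scanLast30Minutes(HTable table) throws IOException {
    long now = System.currentTimeMillis();
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("colfam1"));
    // Only KeyValues with a timestamp in [now - 30 min, now) are of
    // interest, which lets the StoreScanner skip store files whose
    // time range lies entirely outside this window.
    scan.setTimeRange(now - 30L * 60 * 1000, now);
    return table.getScanner(scan);
  }
}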

The StoreScanner class also has the QueryMatcher (here the ScanQueryMatcher class), which will keep track of which KeyValues to include in the final result.

The RegionScanner internally uses a KeyValueHeap class to arrange all store scanners ordered by timestamps. The StoreScanner uses the same class to order the stores in the same way. This guarantees that KeyValues are read in their correct order (e.g., descending by timestamp).

When the store scanners are opened, they will position themselves at the requested row key, or—in the case of a get() call—on the next nonmatching row key. The scanner is now ready to read data. Figure 8-10 shows an example of what this looks like.

Rows stored and scanned across different stores, on disk or in memory
Figure 8-10. Rows stored and scanned across different stores, on disk or in memory

For a get() call, all the server has to do is to call next() on the RegionScanner. The call internally reads everything that should be part of the result. This includes all of the versions requested. Consider a column that has three versions, and you are requesting to retrieve all of them. The three KeyValue instances could be spread across any store, on disk or in memory. The next() call keeps reading from all store files until either the next row is reached, or enough versions have been found.

At the same time, it keeps track of delete markers too. As it scans through the KeyValues of the current row, it will come across these delete markers and note that anything with a timestamp that is less than or equal to the marker is considered erased.

Figure 8-10 also shows the logical row as a list of KeyValues, some in the same store file, some on other files, spanning multiple column families. A store file and a memstore were skipped because of the timestamp and Bloom filter exclusion process. The delete marker in the last store file is masking out entries, but they are still all part of the same row. The scanners—depicted as an arrow next to the stores—are either on the first matching entry in the file, or on the one that would follow the requested key, in case the store has no direct match.

Only scanners that are on the proper row are considered during the call to next(). The internal loop would read the KeyValues from the first and last stores one after the other, in time-descending order, until they also exceed the requested row key.

For scan operations, this is repeated by calling next() on the ResultScanner until either the stop row has been found, the end of the table has been reached, or enough rows have been read for the current batch (as set via scanner caching).
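A corresponding client-side sketch of such a scan, with a start and stop row and scanner caching enabled (all names are made up for the example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class RangeScanSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");
    Scan scan = new Scan(Bytes.toBytes("row-100"), Bytes.toBytes("row-200"));
    scan.setCaching(50);  // fetch 50 rows per RPC to the region server
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result result : scanner) {
        System.out.println(result);
      }
    } finally {
      scanner.close();  // release the server-side scanner resources
      table.close();
    }
  }
}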

The final result is a list of KeyValue instances that matched the given get or scan operation. The list is sent back to the client, which can then use the API methods to access the contained columns.

Region Lookups

For the clients to be able to find the region server hosting a specific row key range, HBase provides two special catalog tables, called -ROOT- and .META..[92]

The -ROOT- table is used to refer to all regions in the .META. table. The design considers only one root region, that is, the root region is never split to guarantee a three-level, B+ tree-like lookup scheme: the first level is a node stored in ZooKeeper that contains the location of the root table’s region—in other words, the name of the region server hosting that specific region. The second level is the lookup of a matching meta region from the -ROOT- table, and the third is the retrieval of the user table region from the .META. table.

The row keys in the catalog tables are the region names, which are a concatenation of the region’s table name, its start row, and an ID (usually the current time in milliseconds). As of HBase 0.90.0 these keys may have another hashed value attached to them. This is currently only used for user tables. See Region-level files for an example.

Note

Avoiding any concerns about the three-level location scheme, the Bigtable paper states that with average limits on the .META. region size at 128 MB it can address 2^34 regions, or 2^61 bytes in 128 MB regions. Since the size of the regions can be increased without any impact on the location scheme, this is a conservative number and can be increased as needed.
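The arithmetic behind those figures, assuming roughly 1 KB per location entry as the Bigtable paper does, works out as follows:

128 MB per meta region / 1 KB per entry = 2^27 / 2^10 = 2^17 entries per meta region
2^17 meta regions listed in root x 2^17 entries per meta region = 2^34 addressable regions
2^34 regions x 128 MB per region = 2^34 x 2^27 bytes = 2^61 bytes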

Although clients cache region locations, there is an initial need to figure out where to send requests when looking for a specific row key—or when the cache is stale and a region has since been split, merged, or moved. The client library uses a recursive discovery process moving up in the hierarchy to find the current information. It asks the corresponding region server hosting the matching .META. region for the given row key and retrieves the address. If that information is invalid, it backs out, asking the root table where the .META. region is. Eventually, if all else fails, it has to do a read of the ZooKeeper node to find the root table region.

In a worst-case scenario, it would need six network round-trips to discover the user region, since stale entries in the cache are only discovered when the lookup fails, because it is assumed that assignments, especially of meta regions, do not change too often. When the cache is empty, the client needs three network round-trips to update its cache. One way to mitigate future round-trips is to prefetch location information in a single request, thus updating the client cache ahead of time. Refer to Miscellaneous Features for details on how to influence this using the client-side API.
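From the client API, the result of this lookup machinery can be observed directly. The following sketch asks an HTable where a given row lives (the table and row names are made up); the first call triggers the lookup chain on a cold cache, and later calls for rows in the same region are answered from the cache:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");
    // Walks ZooKeeper, -ROOT-, and .META. when the cache is empty.
    HRegionLocation location = table.getRegionLocation(Bytes.toBytes("row-1"));
    System.out.println("Location: " + location);
    table.close();
  }
}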

Figure 8-11 shows the mapping of user table regions, through meta, and finally to the root table information. Once the user table region is known, it can be accessed directly without any further lookups. The lookups are numbered and assume an empty cache. However, if the cache were filled with only stale details, the client would fail on all three lookups, requiring a refresh of all three and resulting in the aforementioned six network round-trips.

Mapping of user table regions, starting with an empty cache and then performing three lookups
Figure 8-11. Mapping of user table regions, starting with an empty cache and then performing three lookups

The Region Life Cycle

The state of a region is tracked by the master, using the AssignmentManager class. It follows the region from its offline state, all the way through its life cycle. Table 8-1 lists the possible states of a region.

Table 8-1. Possible states of a region
State           Description
Offline         The region is offline.
Pending Open    A request to open the region was sent to the server.
Opening         The server has started opening the region.
Open            The region is open and fully operational.
Pending Close   A request to close the region has been sent to the server.
Closing         The server is in the process of closing the region.
Closed          The region is closed.
Splitting       The server started splitting the region.
Split           The region has been split by the server.

The transitions between states are commonly initiated by the master, but may also be initiated by the region server hosting the region. For example, the master assigns a region to a server, which is then opened by the assignee. On the other hand, the region server starts the split process, which in itself triggers multiple region close and open events.

Because of the distributed nature of these events, the servers are using ZooKeeper to track specific states in a dedicated znode.

ZooKeeper

Since version 0.20.x, HBase has been using ZooKeeper as its distributed coordination service. This includes tracking of region servers, where the root region is hosted, and more. Version 0.90.x introduced a new master implementation which has an even tighter integration with ZooKeeper. It enables HBase to remove critical heartbeat messages that needed to be sent between the master and the region servers. These are now moved into ZooKeeper, which informs either party of changes whenever they occur, as opposed to the fixed intervals that were used before.

HBase creates a list of znodes under its root node. The default is /hbase and is configured with the zookeeper.znode.parent property. Here is the list of the contained znodes and their purposes:

Note

The examples use the ZooKeeper command-line interface (CLI) to issue the commands. You can start it with:

$ $ZK_HOME/bin/zkCli.sh -server <quorum-server>

The output of each command has been shortened to omit ZooKeeper-internal details.

/hbase/hbaseid

Contains the cluster ID, as stored in the hbase.id file on HDFS. For example:

[zk: localhost(CONNECTED) 1] get /hbase/hbaseid        
e627e130-0ae2-448d-8bb5-117a8af06e97
/hbase/master

Holds the server name (see Cluster Status Information for details). For example:

[zk: localhost(CONNECTED) 2] get /hbase/master 
foo.internal,60000,1309859972983
/hbase/replication

Contains replication details. See Internals for details.

/hbase/root-region-server

Contains the server name of the region server hosting the -ROOT- region. This is used during the region lookup (see Region Lookups). For instance:

[zk: localhost(CONNECTED) 3] get /hbase/root-region-server 
rs1.internal,60000,1309859972983
/hbase/rs

Acts as the root node for all region servers to list themselves when they start. It is used to track server failures. Each znode inside is ephemeral and its name is the server name of the region server. For example:

[zk: localhost(CONNECTED) 4] ls /hbase/rs 
[rs1.internal,60000,1309859972983,rs2.internal,60000,1309859345233]
/hbase/shutdown

Is used to track the cluster state. It contains the time when the cluster was started, and is empty when it was shut down. For example:

[zk: localhost(CONNECTED) 5] get /hbase/shutdown 
Tue Jul 05 11:59:33 CEST 2011
/hbase/splitlog

The parent znode for all log-splitting-related coordination (see Log splitting for details). For example:

[zk: localhost(CONNECTED) 6] ls /hbase/splitlog 
[hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C 
1309850971208%2Ffoo.internal%252C60020%252C1309850971208.1309851636647,
hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C 
1309850971208%2Ffoo.internal%252C60020%252C1309850971208.1309851641956,
...
hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Ffoo.internal%2C60020%2C 
1309850971208%2Ffoo.internal%252C60020%252C1309850971208.1309851784396]

[zk: localhost(CONNECTED) 7] get /hbase/splitlog/ 
hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Fmemcache1.internal%2C 
60020%2C1309850971208%2Fmemcache1.internal%252C60020%252C1309850971208. 
1309851784396 
unassigned foo.internal,60000,1309851879862

[zk: localhost(CONNECTED) 8] get /hbase/splitlog/ 
hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Fmemcache1.internal%2C 
60020%2C1309850971208%2Fmemcache1.internal%252C60020%252C1309850971208. 
1309851784396 
owned foo.internal,60000,1309851879862

[zk: localhost(CONNECTED) 9] ls /hbase/splitlog
[RESCAN0000293834, hdfs%3A%2F%2Flocalhost%2Fhbase%2F.logs%2Fmemcache1. 
internal%2C60020%2C1309850971208%2Fmemcache1.internal%252C 
60020%252C1309850971208.1309851681118, RESCAN0000293827, RESCAN0000293828, 
RESCAN0000293829, RESCAN0000293838, RESCAN0000293837]

These examples list various things: you can see how a log to be split was first unassigned, and then owned by a region server. The RESCAN nodes signify that the workers, that is, the region servers, are supposed to check for more work, in case a split has failed on another machine.

/hbase/table

The parent znode under which disabled tables are listed. For each disabled table a child znode named after the table is created, and its content is the word DISABLED. For example:

[zk: localhost(CONNECTED) 10] ls /hbase/table
[testtable]
[zk: localhost(CONNECTED) 11] get /hbase/table/testtable
DISABLED
/hbase/unassigned

Is used by the AssignmentManager to track region states across the entire cluster. It contains znodes for those regions that are not open, but are in a transitional state. The name of the znode is the hash of the region. For example:

[zk: localhost(CONNECTED) 11] ls /hbase/unassigned
[8438203023b8cbba347eb6fc118312a7]
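The same information can be read programmatically with the plain ZooKeeper client API. A minimal sketch, assuming a quorum reachable at localhost:2181 and the default /hbase parent znode:

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class ZnodeInspectSketch {
  public static void main(String[] args) throws Exception {
    // No watcher is registered; this is a one-shot read.
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, null);
    byte[] master = zk.getData("/hbase/master", false, null);
    System.out.println("Master: " + new String(master, "UTF-8"));
    List<String> regionServers = zk.getChildren("/hbase/rs", false);
    System.out.println("Region servers: " + regionServers);
    zk.close();
  }
}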

Replication

HBase replication is a way to copy data between HBase deployments. It can serve as a disaster recovery solution and can contribute to higher availability at the HBase layer. It can also serve a more practical purpose; for example, as a way to easily copy edits from a web-facing cluster to a MapReduce cluster that will process old and new data and ship back the results automatically.

The basic architecture pattern used for HBase replication is “(HBase cluster) master-push”; this pattern makes it much easier to keep track of what is currently being replicated since each region server has its own write-ahead log (WAL or HLog), just like other well-known solutions, such as MySQL master/slave replication, where there is only one binary log to keep track of. One master cluster can replicate to any number of slave clusters, and each region server will participate to replicate its own stream of edits.

The replication is done asynchronously, meaning that the clusters can be geographically distant, the links between them can be offline for some time, and rows inserted on the master cluster will not be available at the same time on the slave clusters (eventual consistency).

Figure 8-12 shows an overview of how replication works.

Overview of the replication architecture
Figure 8-12. Overview of the replication architecture

The replication format used in this design is conceptually the same as MySQL’s statement-based replication.[93] Instead of SQL statements, whole WALEdits (consisting of multiple cell inserts coming from the clients’ Put and Delete) are replicated in order to maintain atomicity.

The HLogs from each region server are the basis of HBase replication, and must be kept in HDFS as long as they are needed to replicate data to any slave cluster. Each region server reads from the oldest log it needs to replicate and keeps the current position inside ZooKeeper to simplify failure recovery. That position can be different for every slave cluster, as can the queue of HLogs to process.

The clusters participating in replication can be of asymmetric sizes and the master cluster will make a best effort to balance the stream of replication on the slave clusters by relying on randomization.

Life of a Log Edit

The following sections describe the life of a single edit going from a client that communicates with a master cluster all the way to a single slave cluster.

Normal processing

The client uses an HBase API that sends a Put, Delete, or Increment to a region server. The key/values are transformed into a WALEdit by the region server and the WALEdit is inspected by the replication code that, for each family that is scoped for replication, adds the scope to the edit. The edit is appended to the current WAL and is then applied to its MemStore.

In a separate thread, the edit is read from the log (as part of a batch) and only the KeyValues that are replicable are kept, that is, those that belong to a column family scoped as GLOBAL in the family's schema and that are not part of a catalog table such as .META. or -ROOT-. When the buffer is filled, or the reader hits the end of the file, the buffer is sent to a random region server on the slave cluster.
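Scoping a column family for replication is done in the table schema. The following sketch creates a table with one family set to the GLOBAL replication scope (the table and family names are made up; in the versions described here, replication on the cluster itself is additionally switched on with the hbase.replication property):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class ReplicationScopeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor table = new HTableDescriptor("testtable");
    HColumnDescriptor family = new HColumnDescriptor("colfam1");
    // Only families scoped GLOBAL are picked up by the replication code.
    family.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(family);
    admin.createTable(table);
  }
}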

Synchronously, the region server that receives the edits reads them sequentially and separates each of them into buffers, one per table. Once all edits are read, each buffer is flushed using the normal HBase client (HTables managed by an HTablePool). This is done in order to leverage parallel insertion (MultiPut).

Back in the master cluster’s region server, the offset for the current WAL that is being replicated is registered in ZooKeeper.

Non-responding slave clusters

The edit is inserted in the same way. In a separate thread, the region server reads, filters, and buffers the log edits the same way as is done during normal processing. The slave region server that is contacted does not answer the RPC, so the master region server will sleep and retry up to a configured number of times. If the slave region server still is not available, the master cluster region server will select a new subset of region servers to replicate to and will try to send the buffer of edits again.

In the meantime, the WALs will be rolled and stored in a queue in ZooKeeper. Logs that are archived by their region server (archiving is basically moving a log from the region server’s logs directory to a central logs archive directory) will update their paths in the in-memory queue of the replicating thread.

When the slave cluster is finally available, the buffer will be applied the same way as during normal processing. The master cluster region server will then replicate the backlog of logs.

Internals

This section describes in depth how each of the replication’s internal features operates.

Choosing region servers to replicate to

When a master cluster region server initiates a replication source to a slave cluster, it first connects to the slave’s ZooKeeper ensemble using the provided cluster key (that key is composed of the value of hbase.zookeeper.quorum, zookeeper.znode.parent, and hbase.zookeeper.property.clientPort). It then scans the /hbase/rs directory to discover all the available sinks (region servers that are accepting incoming streams of edits to replicate) and will randomly choose a subset of them using a configured ratio (which has a default value of 10%). For example, if a slave cluster has 150 machines, 15 will be chosen as potential recipients for edits that this master cluster region server will be sending. Since this is done by all master cluster region servers, the probability that all slave region servers are used is very high, and this method works for clusters of any size. For example, a master cluster of 10 machines replicating to a slave cluster of five machines with a ratio of 10% means that the master cluster region servers will choose one machine each at random; thus the chance of overlapping and full usage of the slave cluster is higher.
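The selection itself boils down to picking a random subset of the discovered sinks. A simplified, self-contained sketch of that step (the actual implementation lives in the region server's replication source code):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SinkSelectionSketch {
  // Pick a random subset of the slave cluster's region servers, based on
  // the configured ratio (10% by default), but always at least one.
  static List<String> chooseSinks(List<String> slaveRegionServers, float ratio) {
    List<String> shuffled = new ArrayList<String>(slaveRegionServers);
    Collections.shuffle(shuffled);
    int count = Math.max(1, (int) Math.ceil(shuffled.size() * ratio));
    return shuffled.subList(0, count);
  }

  public static void main(String[] args) {
    List<String> slaves = new ArrayList<String>();
    for (int i = 1; i <= 150; i++) {
      slaves.add("rs" + i + ".slave.internal,60020,1309850971208");
    }
    // With 150 slave region servers and a 10% ratio, 15 sinks are chosen.
    System.out.println(chooseSinks(slaves, 0.1f).size());
  }
}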

Keeping track of logs

Every master cluster region server has its own znode in the replication znodes hierarchy. The parent znode contains one znode per peer cluster (if there are five slave clusters, five znodes are created), and each of these contains a queue of HLogs to process. Each of these queues will track the HLogs created by that region server, but they can differ in size. For example, if one slave cluster becomes unavailable for some time, the HLogs should not be deleted, and thus they need to stay in the queue (while the others are processed). See Region server failover for an example.

When a source is instantiated, it contains the current HLog that the region server is writing to. During log rolling, the new file is added to the queue of each slave cluster's znode just before it is made available. This ensures that all the sources are aware that a new log exists before the HLog is able to append edits into it, but this operation is now more expensive. A queue item is discarded when the replication thread cannot read any more entries from a file (because it reached the end of the last block) and there are other files in the queue. This means that if a source is up-to-date and replicates from the log that the region server writes to, reading up to the "end" of the current file will not delete the item in the queue.

When a log is archived (because it is not used anymore or because there are too many of them per hbase.regionserver.maxlogs, typically because the insertion rate is faster than the region flushing rate), it will notify the source threads that the path for that log has changed. If a particular source is already done with that log, it will just ignore the message. If the log is in the queue, the path will be updated in memory. If the log is currently being replicated, the change will be done atomically so that the reader does not try to open the file after it has already been moved. Also, moving a file is a NameNode operation, so if the reader is currently reading the log, it will not generate any exceptions.

Reading, filtering, and sending edits

By default, a source will try to read from a logfile and ship log entries as quickly as possible to a sink. This is first limited by the filtering of log entries; only KeyValues that are scoped GLOBAL and that do not belong to catalog tables will be retained. A second limit is imposed on the total size of the list of edits to replicate per slave, which by default is 64 MB. This means that a master cluster region server with three slaves will use, at most, 192 MB to store data to replicate. This does not take into account the data that was filtered but was not garbage-collected.

Once the maximum number of edits has been buffered or the reader has hit the end of the logfile, the source thread will stop reading and will randomly choose a sink to replicate to (from the list that was generated by keeping only a subset of slave region servers). It will directly issue an RPC to the chosen machine and will wait for the method to return. If it is successful, the source will determine if the current file has been emptied or if it should continue to read from it. If the former, it will delete the znode in the queue. If the latter, it will register the new offset in the log's znode. If the RPC threw an exception, the source will retry up to 10 times before trying to find a different sink.

Cleaning logs

If replication is not enabled, the master’s log cleaning thread will delete old logs using a configured TTL. This does not work well with replication since archived logs that are past their TTL may still be in a queue. Thus, the default behavior is augmented so that if a log is past its TTL, the cleaning thread will look up every queue until it finds the log (while caching the ones it finds). If it is not found, the log will be deleted. The next time it has to look for a log, it will first use its cache.

Region server failover

As long as region servers do not fail, keeping track of the logs in ZooKeeper does not add any value. Unfortunately, they do fail, and since ZooKeeper is highly available, we can count on it and its semantics to help us manage the transfer of the queues.

All the master cluster region servers keep a watcher on one another to be notified when one dies (just like the master does). When this happens, they all race to create a znode called lock inside the dead region server’s znode that contains its queues. The one that creates it successfully will proceed by transferring all the queues to its own znode (one by one, since ZooKeeper does not support the rename operation) and will delete all the old ones when it is done. The recovered queues’ znodes will be named with the ID of the slave cluster appended with the name of the dead server.
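The race itself relies on the atomicity of znode creation: only one create() call for a given path can succeed. The following sketch shows that step using the plain ZooKeeper API; the helper method and its parameters are illustrative, not the actual implementation, and the znode layout follows the example shown next:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class QueueLockSketch {
  // Try to claim the queues of a dead region server. Exactly one of the
  // racing survivors will succeed in creating the lock znode.
  static boolean tryToClaimQueues(ZooKeeper zk, String deadServerZnode,
      String myServerName) throws KeeperException, InterruptedException {
    try {
      zk.create(deadServerZnode + "/lock", myServerName.getBytes(),
          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      return true;  // we won the race and may now transfer the queues
    } catch (KeeperException.NodeExistsException e) {
      return false; // another region server was faster
    }
  }
}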

Once that is done, the master cluster region server will create one new source thread per copied queue, and each of them will follow the read/filter/ship pattern. The main difference is that those queues will never have new data since they do not belong to their new region server, which means that when the reader hits the end of the last log, the queue’s znode will be deleted and the master cluster region server will close that replication source.

For example, consider a master cluster with three region servers that is replicating to a single slave with an ID of 2. The following hierarchy represents what the znodes’ layout could be at some point in time. We can see that the region servers’ znodes all contain a peers znode that contains a single queue. The znode names in the queues represent the actual filenames on HDFS in the form address,port.timestamp.

/hbase/replication/rs/
                      1.1.1.1,60020,123456780/
                        peers/
                              2/
                                1.1.1.1,60020.1234  (Contains a position)
                                1.1.1.1,60020.1265
                      1.1.1.2,60020,123456790/
                        peers/
                              2/
                                1.1.1.2,60020.1214  (Contains a position)
                                1.1.1.2,60020.1248
                                1.1.1.2,60020.1312
                      1.1.1.3,60020,123456630/
                        peers/
                              2/
                                1.1.1.3,60020.1280  (Contains a position)

Now let’s say that 1.1.1.2 loses its ZooKeeper session. The survivors will race to create a lock, and for some reason 1.1.1.3 wins. It will then start transferring all the queues to its local peers znode by appending the name of the dead server. Right before 1.1.1.3 is able to clean up the old znodes, the layout will look like the following:

/hbase/replication/rs/
                      1.1.1.1,60020,123456780/
                        peers/
                              2/
                                1.1.1.1,60020.1234  (Contains a position)
                                1.1.1.1,60020.1265
                      1.1.1.2,60020,123456790/
                        lock
                        peers/
                              2/
                                1.1.1.2,60020.1214  (Contains a position)
                                1.1.1.2,60020.1248
                                1.1.1.2,60020.1312
                      1.1.1.3,60020,123456630/
                        peers/
                              2/
                                1.1.1.3,60020.1280  (Contains a position)

                              2-1.1.1.2,60020,123456790/
                                1.1.1.2,60020.1214  (Contains a position)
                                1.1.1.2,60020.1248
                                1.1.1.2,60020.1312

Sometime later, but before 1.1.1.3 is able to finish replicating the last HLog from 1.1.1.2, let’s say that it dies too (also, some new logs were created in the normal queues). The last region server will then try to lock 1.1.1.3’s znode and will begin transferring all the queues. The new layout will be:

/hbase/replication/rs/
                      1.1.1.1,60020,123456780/
                        peers/
                              2/
                                1.1.1.1,60020.1378  (Contains a position)

                              2-1.1.1.3,60020,123456630/
                                1.1.1.3,60020.1325  (Contains a position)
                                1.1.1.3,60020.1401

                              2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
                                1.1.1.2,60020.1312  (Contains a position)
                      1.1.1.3,60020,123456630/
                        lock
                        peers/
                              2/
                                1.1.1.3,60020.1325  (Contains a position)
                                1.1.1.3,60020.1401

                              2-1.1.1.2,60020,123456790/
                                1.1.1.2,60020.1312  (Contains a position)

Replication is still considered to be an experimental feature. Carefully evaluate whether it works for your use case before you consider using it.



[86] See “B+ trees” on Wikipedia.

[87] See “LSM-Tree” (http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.44.2782), O’Neil et al., 1996.

[88] From “Open Source Search” by Doug Cutting, December 5, 2005.

[89] In extreme cases, you may turn off this step by setting a flag using the Put.setWriteToWAL(boolean) method. This is not recommended as this will disable durability.

[90] See the JIRA issue HADOOP-3315 for details.

[91] For information on the term itself, read “Write-ahead logging” on Wikipedia.

[92] Subsequently, they are referred to interchangeably as root table and meta table, respectively, since, for example, "-ROOT-" is how the table is actually named in HBase and calling it a root table is stating its purpose.

[93] See the online manual for details.
