Chapter 9. Advanced Usage

This chapter goes deeper into the various design implications imposed by HBase’s storage architecture. It is important to have a good understanding of how to design tables, row keys, column names, and so on, to take full advantage of the architecture.

Key Design

HBase has two fundamental key structures: the row key and the column key. Both can be used to convey meaning, by either the data they store, or by exploiting their sorting order. In the following sections, we will use these keys to solve commonly found problems when designing storage solutions.

Concepts

The first concept to explain in more detail is the logical layout of a table, compared to on-disk storage. HBase’s main unit of separation within a table is the column family—not the actual columns as expected from a column-oriented database in their traditional sense. Figure 9-1 shows the fact that, although you store cells in a table format logically, in reality these rows are stored as linear sets of the actual cells, which in turn contain all the vital information inside them.

The top-left part of the figure shows the logical layout of your data—you have rows and columns. The columns are the typical HBase combination of a column family name and a column qualifier, forming the column key. The rows also have a row key so that you can address all columns in one logical row.

Figure 9-1. Rows stored as linear sets of actual cells, which contain all the vital information

The top-right hand side shows how the logical layout is folded into the actual physical storage layout. The cells of each row are stored one after the other, in a separate storage file per column family. In other words, on disk you will have all cells of one family in a StoreFile, and all cells of another in a different file.

Since HBase is not storing any unset cells (also referred to as NULL values by RDBMSes) from the table, the on-disk file only contains the data that has been explicitly set. It therefore has to also store the row key and column key with every cell so that it can retain this vital piece of information.

In addition, multiple versions of the same cell are stored as separate, consecutive cells, adding the required timestamp of when the cell was stored. The cells are sorted in descending order by that timestamp so that a reader of the data will see the newest value first—which is the canonical access pattern for the data.

The entire cell, with the added structural information, is called KeyValue in HBase terms. It has not just the column and actual value, but also the row key and timestamp, stored for every cell for which you have set a value. The KeyValues are sorted by row key first, and then by column key in case you have more than one cell per row in one column family.

The lower-right part of the figure shows the resultant layout of the logical table inside the physical storage files. The HBase API has various means of querying the stored data, with decreasing granularity from left to right: you can select rows by row keys and effectively reduce the amount of data that needs to be scanned when looking for a specific row, or a range of rows. Specifying the column family as part of the query can eliminate the need to search the separate storage files. If you only need the data of one family, it is highly recommended that you specify the family for your read operation.

Although the timestamp—or version—of a cell is farther to the right, it is another important selection criterion. The store files retain the timestamp range for all stored cells, so if you are asking for a cell that was changed in the past two hours, but a particular store file only has data that is four or more hours old, it can be skipped completely. See also Read Path for details.

The next level of query granularity is the column qualifier. You can employ exact column lookups when reading data, or define filters that can include or exclude the columns you need to access. But as you will have to look at each KeyValue to check if it should be included, there is only a minor performance gain.

The value remains the last, and broadest, selection criterion, equaling the column qualifier’s effectiveness: you need to look at each cell to determine if it matches the read parameters. You can only use a filter to specify a matching rule, making it the least efficient query option. Figure 9-2 summarizes the effects of using the KeyValue fields.

Figure 9-2. Retrieval performance decreasing from left to right
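
To make these selection levels concrete, here is a minimal sketch using the standard client API; the table name, column names, time range, and filter value are placeholders and not prescribed by the figure:

HTable table = new HTable(HBaseConfiguration.create(), "testtable");

Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("row-100"));           // row key range: most effective
scan.setStopRow(Bytes.toBytes("row-200"));
scan.addFamily(Bytes.toBytes("colfam1"));             // limits the scan to one family's store files
long now = System.currentTimeMillis();
scan.setTimeRange(now - 2 * 60 * 60 * 1000L, now);    // store files outside the range can be skipped
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"));   // exact column lookup
scan.setFilter(new ValueFilter(CompareFilter.CompareOp.EQUAL,
  new BinaryComparator(Bytes.toBytes("some value")))); // value check: least effective

ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  // process the matching rows...
}
scanner.close();
table.close();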

The crucial part Figure 9-1 shows is the shift in the lower-lefthand side. Since the effectiveness of selection criteria greatly diminishes from left to right for a KeyValue, you can move all, or partial, details of the value into a more significant place—without changing how much data is stored.

Tall-Narrow Versus Flat-Wide Tables

At this time, you may be asking yourself where and how you should store your data. The two choices are tall-narrow and flat-wide. The former is a table with few columns but many rows, while the latter has fewer rows but many columns. Given the explained query granularity of the KeyValue information, it seems to be advisable to store parts of the cell’s data—especially the parts needed to query it—in the row key, as it has the highest cardinality.

In addition, HBase can only split at row boundaries, which also reinforces the recommendation to go with tall-narrow tables. Imagine you have all emails of a user in a single row. This will work for the majority of users, but there will be outliers with orders of magnitude more emails in their inbox—so many, in fact, that a single row could outgrow the maximum file/region size and work against the region split facility.

The better approach would be to store each email of a user in a separate row, where the row key is a combination of the user ID and the message ID. Looking at Figure 9-1 you can see that, on disk, this makes no difference: if the message ID is in the column qualifier, or in the row key, each cell still contains a single email message. Here is the flat-wide layout on disk, including some examples:

<userId> : <colfam> : <messageId> : <timestamp> : <email-message>

12345 : data : 5fc38314-e290-ae5da5fc375d : 1307097848 : "Hi Lars, ..."
12345 : data : 725aae5f-d72e-f90f3f070419 : 1307099848 : "Welcome, and ..."
12345 : data : cc6775b3-f249-c6dd2b1a7467 : 1307101848 : "To Whom It ..."
12345 : data : dcbee495-6d5e-6ed48124632c : 1307103848 : "Hi, how are ..."

The same information stored as a tall-narrow table has virtually the same footprint when stored on disk:

<userId>-<messageId> : <colfam> : <qualifier> : <timestamp> : <email-message>

12345-5fc38314-e290-ae5da5fc375d : data : : 1307097848 : "Hi Lars, ..."
12345-725aae5f-d72e-f90f3f070419 : data : : 1307099848 : "Welcome, and ..."
12345-cc6775b3-f249-c6dd2b1a7467 : data : : 1307101848 : "To Whom It ..."
12345-dcbee495-6d5e-6ed48124632c : data : : 1307103848 : "Hi, how are ..."

This layout makes use of the empty qualifier (see Column Families). The message ID is simply moved to the left, making it more significant when querying the data, but also transforming each email into a separate logical row. This results in a table that is easily splittable, with the additional benefit of having a more fine-grained query granularity.
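
As a brief sketch of what the tall-narrow write path could look like in code, assuming a table named inbox with the data column family (the table name and values are placeholders):

HTable table = new HTable(HBaseConfiguration.create(), "inbox");

String userId = "12345";
String messageId = "5fc38314-e290-ae5da5fc375d";
// combine the user ID and message ID into one row key, separated by a dash
byte[] rowKey = Bytes.toBytes(userId + "-" + messageId);

Put put = new Put(rowKey);
// empty qualifier: the message ID already lives in the row key
put.add(Bytes.toBytes("data"), Bytes.toBytes(""), Bytes.toBytes("Hi Lars, ..."));
table.put(put);
table.close();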

Partial Key Scans

The scan functionality of HBase, and the HTable-based client API, offers the second crucial part for transforming a table into a tall-narrow one, without losing query granularity: partial key scans.

In the preceding example, you have a separate row for each message, across all users. Before, you had one row per user, so a particular inbox was a single row and could be accessed as a whole. Each column was an email message of the user's inbox. The exact row key would be used to match the user ID when loading the data.

With the tall-narrow layout an arbitrary message ID is now appended to the user ID in each row key. If you do not have the exact combination of these two IDs you cannot retrieve a particular message. The way to get around this complication is to use partial key scans: you can specify a start key set to the exact user ID only, and a stop key set to userId + 1.

The start key of a scan is inclusive, while the stop key is exclusive. Setting the start key to the user ID triggers the internal lexicographic comparison mechanism of the scan to find the exact row key, or the one sorting just after it. Since the table does not have an exact match for the user ID, it positions the scan at the next row, which is:

<userId>-<lowest-messageId>

In other words, it is the row key with the lowest (in terms of sorting) user ID and message ID combination. The scan will then iterate over all the messages of a user and you can parse the row key to extract the message ID.
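
In code, such a partial key scan could look like the following sketch; it assumes the inbox table from before and numeric user IDs, both of which are assumptions for illustration:

HTable table = new HTable(HBaseConfiguration.create(), "inbox");

String userId = "12345";
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(userId));                                      // inclusive
scan.setStopRow(Bytes.toBytes(String.valueOf(Long.parseLong(userId) + 1)));   // exclusive: userId + 1

ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  String rowKey = Bytes.toString(result.getRow());
  // everything after the first dash is the message ID
  String messageId = rowKey.substring(rowKey.indexOf('-') + 1);
  // process the message...
}
scanner.close();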

The partial key scan mechanism is quite powerful, as you can use it as a lefthand index, with each added field adding to its cardinality. Consider the following row key structure:

<userId>-<date>-<messageId>-<attachmentId>

Note

Make sure that you pad the value of each field in the composite row key so that the lexicographical (binary, and ascending) sorting works as expected. You will need a fixed-length field structure to guarantee that the rows are sorted by each field, going from left to right.[94]

You can, with increasing precision, construct a start and stop key for the scan that selects the required rows. Usually you only create the start key and set the stop key to the same value as the start key, while increasing the least significant byte of its first field by one. For the preceding inbox example, the start key could be 12345, and the stop key 12346.

Table 9-1 shows the possible start keys and what they translate into.

Table 9-1. Possible start keys and their meaning
Command                                        Description
<userId>                                       Scan over all messages for a given user ID.
<userId>-<date>                                Scan over all messages on a given date for the given user ID.
<userId>-<date>-<messageId>                    Scan over all parts of a message for a given user ID and date.
<userId>-<date>-<messageId>-<attachmentId>     Scan over all attachments of a message for a given user ID and date.

These composite row keys are similar to what RDBMSes offer, yet you can control the sort order for each field separately. For example, you could do a bitwise inversion of the date expressed as a long value (the Unix epoch). This would then sort the rows in descending order by date. Another approach is to compute the following:

Long.MAX_VALUE - <date-as-long>

This will reverse the dates and guarantee that the sorting order of the date field is descending.
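
Here is a minimal sketch of assembling such a composite key, assuming fixed-width numeric user IDs and millisecond timestamps (both assumptions made for illustration only):

String userId = "12345";                         // assumed to be of fixed length
long date = 1307097848000L;                      // event time in milliseconds
String messageId = "5fc38314-e290-ae5da5fc375d";

// invert the date so that newer messages sort first (descending by time)
long reversedDate = Long.MAX_VALUE - date;

// pad the reversed date to a fixed width of 19 digits so that the binary,
// ascending sort of the row keys matches the intended field-by-field order
String rowKey = String.format("%s-%019d-%s", userId, reversedDate, messageId);
byte[] key = Bytes.toBytes(rowKey);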

In the preceding example, you have the date as the second field in the composite index for the row key. This is only one way to express such a combination. If you were to never query by date, you would want to drop the date from the key—and/or possibly use another, more suitable, dimension instead.

Note

While it seems like a good idea to always implement a composite row key as discussed in the preceding text, there is one major drawback to doing so: atomicity. Since the data is now spanning many rows for a single inbox, it is not possible to modify it in one operation. If you are not concerned with updating the entire inbox with all the user messages in an atomic fashion, the aforementioned design is appropriate. But if you need to have such guarantees, you may have to go back to flat-wide table design.

Pagination

Using the partial key scan approach, it is possible to iterate over subsets of rows. The principle is the same: you have to specify an appropriate start and stop key to limit the overall number of rows scanned. Then you take an offset and limit parameter, applying them to the rows on the client side.

Note

You can also use the PageFilter, or ColumnPaginationFilter to achieve pagination. The approach shown here is mainly to explain the concept of what a dedicated row key design can achieve.

For pure pagination, the ColumnPaginationFilter is also the recommended approach, as it avoids sending unnecessary data over the network to the client.

The steps are the following:

  1. Open a scanner at the start row.

  2. Skip offset rows.

  3. Read the next limit rows and return to the caller.

  4. Close the scanner.

Applying this to the inbox example, it is possible to paginate through all of the emails of a user. Assuming an average user has a few hundred emails in his inbox, it is quite common for a web-based email client to show only the first, for example, 50 emails. The remainder of the emails are then accessed by clicking the Next button to load the next page.

The client would set the start row to the user ID, and the stop row to the user ID + 1. The remainder of the process would follow the approach we just discussed, so for the first page, where the offset is zero, you can read the next 50 emails. When the user clicks the Next button, you would set the offset to 50, therefore skipping those first 50 rows, returning row 51 to 100, and so on.
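
The following sketch shows the client-side offset and limit handling just described; the table name and page size are placeholders:

HTable table = new HTable(HBaseConfiguration.create(), "inbox");

String userId = "12345";
int offset = 50;   // rows to skip, i.e., the emails shown on previous pages
int limit = 50;    // page size

Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(userId));
scan.setStopRow(Bytes.toBytes(String.valueOf(Long.parseLong(userId) + 1)));
scan.setCaching(limit);                          // fetch about a page worth of rows per RPC

ResultScanner scanner = table.getScanner(scan);
int skipped = 0, returned = 0;
for (Result result : scanner) {
  if (skipped < offset) {                        // step 2: skip the offset rows
    skipped++;
    continue;
  }
  // step 3: hand the next "limit" rows to the caller
  // render the email stored in "result"...
  if (++returned >= limit) break;
}
scanner.close();                                 // step 4: close the scanner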

This approach works well for a low number of pages. If you were to page through thousands of pages, a different approach would be required. You could add a sequential ID into the row key to directly position the start key at the right offset. Or you could use the date field of the key—if you are using one—to remember the date of the last displayed item and add the date to the start key, but probably dropping the hour part of it. If you were using epochs, you could compute the value for midnight of the last seen date. That way you can rescan that entire day and make a more knowledgeable decision regarding what to return.

There are many ways to design the row key to allow for efficient selection of subranges and enable pagination through records, such as the emails in the user inbox example. Using the composite row key with the user ID and date gives you a natural order, displaying the newest messages first, sorting them in descending order by date. But what if you also want to offer sorting by different fields so that the user can switch at will? One way to do this is discussed in Secondary Indexes.

Time Series Data

When dealing with stream processing of events, the most common use case is time series data. Such data could be coming from a sensor in a power grid, a stock exchange, or a monitoring system for computer systems. Its salient feature is that its row key represents the event time. This imposes a problem with the way HBase is arranging its rows: they are all stored sorted in a distinct range, namely regions with specific start and stop keys.

The sequential, monotonically increasing nature of time series data causes all incoming data to be written to the same region. And since this region is hosted by a single server, all the updates will only tax this one machine. This can cause regions to really run hot with the number of accesses, and in the process slow down the perceived overall performance of the cluster, because inserting data is now bound to the performance of a single machine.

It is easy to overcome this problem by ensuring that data is spread over all region servers instead. This can be done, for example, by prefixing the row key with a nonsequential prefix. Common choices include:

Salting

You can add a salting prefix to the key that guarantees a spread of all rows across all region servers. For example:

byte prefix = (byte) (Long.hashCode(timestamp) % <number of region servers>);
byte[] rowkey = Bytes.add(new byte[] { prefix }, Bytes.toBytes(timestamp));

This formula will generate enough prefix numbers to ensure that rows are sent to all region servers. Of course, the formula assumes a specific number of servers, and if you are planning to grow your cluster you should set this number to a multiple instead. The generated row keys might look like this:

0myrowkey-1, 1myrowkey-2, 2myrowkey-3, 0myrowkey-4, 1myrowkey-5, 
2myrowkey-6, ...

When these keys are sorted and sent to the various regions the order would be:

0myrowkey-1
0myrowkey-4
1myrowkey-2
1myrowkey-5
...

In other words, the updates for row keys 0myrowkey-1 and 0myrowkey-4 would be sent to one region (assuming they do not overlap two regions, in which case there would be an even broader spread), and 1myrowkey-2 and 1myrowkey-5 are sent to another.

The drawback of this approach is that access to a range of rows must be fanned out in your own code and read with <number of region servers> get or scan calls. On the upside, you could use multiple threads to read this data from distinct servers, therefore parallelizing read access. This is akin to a small map-only MapReduce job, and should result in increased I/O performance.
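
A sketch of such a fan-out read, using one scan per salt prefix, might look as follows; the bucket count, table name, and time range are assumptions, and each iteration could just as well run in its own thread:

int buckets = 3;                                 // assumed number of salt prefixes
long endTime = System.currentTimeMillis();
long startTime = endTime - 60 * 60 * 1000L;      // the last hour of data

HTable table = new HTable(HBaseConfiguration.create(), "metrics");
List<Result> merged = new ArrayList<Result>();
for (byte bucket = 0; bucket < buckets; bucket++) {
  Scan scan = new Scan();
  // every bucket holds its own contiguous slice of the requested time range
  scan.setStartRow(Bytes.add(new byte[] { bucket }, Bytes.toBytes(startTime)));
  scan.setStopRow(Bytes.add(new byte[] { bucket }, Bytes.toBytes(endTime)));
  ResultScanner scanner = table.getScanner(scan);
  for (Result result : scanner) {
    merged.add(result);
  }
  scanner.close();
}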

Field swap/promotion

Using the same approach as described in Partial Key Scans, you can move the timestamp field of the row key or prefix it with another field. This approach uses the composite row key concept to move the sequential, monotonically increasing timestamp to a secondary position in the row key.

If you already have a row key with more than one field, you can swap them. If you have only the timestamp as the current row key, you need to promote another field from the column keys, or even the value, into the row key.

There is also a drawback to moving the time to the righthand side in the composite key: you can only access data, especially time ranges, for a given swapped or promoted field.

Randomization

A totally different approach is to randomize the row key using, for example:

byte[] rowkey = MD5(timestamp)

Using a hash function like MD5 will give you a random distribution of the key across all available region servers. For time series data, this approach is obviously less than ideal, since there is no way to scan entire ranges of consecutive timestamps.

On the other hand, since you can re-create the row key by hashing the timestamp requested, it still is very suitable for random lookups of single rows. When your data is not scanned in ranges but accessed randomly, you can use this strategy.
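
A minimal sketch of the randomized variant, using the MD5Hash helper class that ships with HBase; the table and column names are placeholders:

HTable table = new HTable(HBaseConfiguration.create(), "events");
long timestamp = 1307097848000L;

// write: hash the timestamp; ordering is lost, but the keys spread evenly
byte[] rowkey = Bytes.toBytes(MD5Hash.getMD5AsHex(Bytes.toBytes(timestamp)));
Put put = new Put(rowkey);
put.add(Bytes.toBytes("data"), Bytes.toBytes("value"), Bytes.toBytes("..."));
table.put(put);

// read: the row key can simply be recomputed for the requested timestamp
Get get = new Get(Bytes.toBytes(MD5Hash.getMD5AsHex(Bytes.toBytes(timestamp))));
Result result = table.get(get);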

Summarizing the various approaches, you can see that it is not trivial to find the right balance between optimizing for read and write performance. It depends on your access pattern, which ultimately drives the decision on how to structure your row keys. Figure 9-3 shows the various solutions and how they affect sequential read and write performance.

Figure 9-3. Finding the right balance between sequential read and write performance

Using the salted or promoted field keys can strike a good balance of distribution for write performance, and sequential subsets of keys for read performance. If you are only doing random reads, it makes most sense to use random keys: this will avoid creating region hot-spots.

Time-Ordered Relations

In our preceding discussion, the time series data dealt with inserting new events as separate rows. However, you can also store related, time-ordered data using the columns of a table. Since all of the columns are sorted per column family, you can treat this sorting as a replacement for a secondary index, as available in RDBMSes. Multiple secondary indexes can be emulated by using multiple column families—although that is not the recommended way of designing a schema. But for a small number of indexes, this might be what you need.

Consider the earlier example of the user inbox, which stores all of the emails of a user in a single row. Since you want to display the emails in the order they were received, but, for example, also sorted by subject, you can make use of column-based sorting to achieve the different views of the user inbox.

Note

Given the advice to keep the number of column families in a table low—especially when mixing large families with small ones (in terms of stored data)—you could store the inbox inside one table, and the secondary indexes in another table. The drawback is that you cannot make use of the provided per-table row-level atomicity. Also see Secondary Indexes for strategies to overcome this limitation.

The first decision to make concerns what the primary sorting order is, in other words, how the majority of users have set the view of their inbox. Assuming they have set the view in descending order by date, you can use the same approach mentioned earlier, which reverses the timestamp of the email, effectively sorting all of them in descending order by time:

Long.MAX_VALUE - <date-as-long>

The email itself is stored in the main column family, while the sort indexes are in separate column families. You can extract the value to sort by, for example, the subject or the from address, from the email and add it to the column key to build the secondary sorting order. If you need descending sorting as well, you would need another family.

To circumvent the proliferation of column families, you can alternatively store all secondary indexes in a single column family that is separate from the main column family. Once again, you would make use of implicit sorting by prefixing the values with an index ID—for example, idx-subject-desc, idx-to-asc, and so on. Next, you would have to attach the actual sort value. The actual value of the cell is the key of the main index, which also stores the message. This also implies that you need to either load the message details from the main table, display only the information stored in the secondary index, or store the display details redundantly in the index, avoiding the random lookup on the main information source. Recall that denormalization is quite common in HBase to reduce the required read operations in favor of vastly improved user-facing responsiveness.

Putting the aforementioned schema into action might result in something like this:

12345 : data : 5fc38314-e290-ae5da5fc375d : 1307097848 : "Hi Lars, ..."
12345 : data : 725aae5f-d72e-f90f3f070419 : 1307099848 : "Welcome, and ..."
12345 : data : cc6775b3-f249-c6dd2b1a7467 : 1307101848 : "To Whom It ..."
12345 : data : dcbee495-6d5e-6ed48124632c : 1307103848 : "Hi, how are ..."
...
12345 : index : idx-from-asc-<from-address-1> : 1307099848 : 725aae5f-d72e...
12345 : index : idx-from-asc-<from-address-2> : 1307103848 : dcbee495-6d5e...
12345 : index : idx-from-asc-<from-address-3> : 1307097848 : 5fc38314-e290...
12345 : index : idx-from-asc-<from-address-4> : 1307101848 : cc6775b3-f249...
...
12345 : index : idx-subject-desc-\xa8\x90\x8d\x93\x9b\xde :
  1307103848 : dcbee495-6d5e-6ed48124632c
12345 : index : idx-subject-desc-\xb7\x9a\x93\x93\x90\xd3 :
  1307099848 : 725aae5f-d72e-f90f3f070419
...

In the preceding code, one index (idx-from-asc) is sorting the emails in ascending order by from address, and another (idx-subject-desc) in descending order by subject. The subject itself is not readable anymore as it was bit-inverted to achieve the descending sort order. For example:

String s = "Hello,";
for (int i = 0; i < s.length(); i++) {
  System.out.print(Integer.toString(s.charAt(i) ^ 0xFF, 16) + " ");
}

b7 9a 93 93 90 d3

All of the index values are stored in the column family index, using the prefixes mentioned earlier. A client application can read the entire column family and cache the content to let the user quickly switch the sorting order. Or, if the number of values is large, the client can read the first 10 columns starting with idx-subject-desc to show the first 10 email messages sorted in ascending order by the email subject lines. Using a scan with intra-row batching (see Caching Versus Batching) enables you to efficiently paginate through the subindexes. Another option is the ColumnPaginationFilter, combined with the ColumnPrefixFilter to iterate over an index page by page.
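
As a sketch of the filter-based option, the two filters can be combined to read one page of the subject index; the row key and prefix are taken from the example, while the page size and offset are arbitrary:

HTable table = new HTable(HBaseConfiguration.create(), "inbox");

Get get = new Get(Bytes.toBytes("12345"));        // the single inbox row
get.addFamily(Bytes.toBytes("index"));
List<Filter> filters = new ArrayList<Filter>();
// only the columns belonging to the descending subject index
filters.add(new ColumnPrefixFilter(Bytes.toBytes("idx-subject-desc-")));
// second page of ten index entries: limit 10, offset 10
filters.add(new ColumnPaginationFilter(10, 10));
get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, filters));

Result result = table.get(get);
for (KeyValue kv : result.raw()) {
  byte[] messageId = kv.getValue();               // points back to the main entry
  // load or render the referenced message...
}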

Advanced Schemas

So far we have discussed how to use the provided table schemas to map data into the column-oriented layout HBase supports. You will have to decide how to structure your row and column keys to access data in a way that is optimized for your application.

Each column value is then an actual data point, stored as an arbitrary array of bytes. While this type of schema, combined with the ability to create columns with arbitrary keys when needed, enables you to evolve with new client application releases, there are use cases that require more formal support of a more feature-rich, evolvable serialization API, where each value is a compact representation of a more complex, nestable record structure.

Possible solutions include the already discussed serialization packages—see Introduction to REST, Thrift, and Avro for details—listed here as examples:

Avro

An exemplary project using Avro to store complex records in each column is HAvroBase.[97] This project uses Avro's interface definition language (IDL) to define the actual schema, which is then used to store records in their serialized form within arbitrary table columns.

Protocol Buffers

Similar to Avro, you can use the Protocol Buffer’s IDL to define an external schema, which is then used to serialize complex data structures into HBase columns.

The idea behind this approach is that you get a definition language that allows you to define an initial schema, which you can then update by adding or removing fields. The serialization API takes care of reading older schemas with newer ones. Missing fields are ignored or filled in with defaults.

Secondary Indexes

Although HBase has no native support for secondary indexes, there are use cases that need them. The requirements are usually that you can look up a cell with not just the primary coordinates—the row key, column family name, and qualifier—but also an alternative coordinate. In addition, you can scan a range of rows from the main table, but ordered by the secondary index.

Similar to an index in RDBMSes, secondary indexes store a mapping between the new coordinates and the existing ones. Here is a list of possible solutions:

Client-managed

Moving the responsibility completely into the application layer, this approach typically combines a data table and one (or more) lookup/mapping tables. Whenever the code writes into the data table it also updates the lookup tables. Reading data requires either a direct lookup in the main table, or, if the key is from a secondary index, a lookup of the main row key, and then retrieval of the data in a second operation.

There are advantages and disadvantages to this approach. First, since the entire logic is handled in the client code, you have all the freedom to map the keys exactly the way they are needed. The list of shortcomings is longer, though: since you have no cross-row atomicity, for example, in the form of transactions, you cannot guarantee consistency of the main and dependent tables. This can be partially overcome using regular pruning jobs, for instance, using MapReduce to scan the tables and remove obsolete—or add missing—entries.

The missing transactional support could result in data being stored in the data table, but with no mapping in the secondary index tables, because the operation failed after the main table was updated, but before the index tables were written. This can be alleviated by writing to the secondary index tables first, and to the data table at the end of the operation. Should anything fail in the process, you are left with orphaned mappings, but those are subsequently removed by the asynchronous, regular pruning jobs.
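
A sketch of that write order, assuming a data table and one index table whose names and key layouts are purely illustrative:

Configuration conf = HBaseConfiguration.create();
HTable dataTable = new HTable(conf, "emails");
HTable indexTable = new HTable(conf, "emails-by-from");

byte[] mainKey = Bytes.toBytes("12345-5fc38314-e290-ae5da5fc375d");
byte[] indexKey = Bytes.toBytes("someone@example.com-12345");       // placeholder layout

// write the secondary index mapping first...
Put indexPut = new Put(indexKey);
indexPut.add(Bytes.toBytes("idx"), Bytes.toBytes(""), mainKey);     // value points to the data row
indexTable.put(indexPut);

// ...and the data table last: a failure in between leaves only an orphaned
// mapping, which the regular pruning job can remove later
Put dataPut = new Put(mainKey);
dataPut.add(Bytes.toBytes("data"), Bytes.toBytes(""), Bytes.toBytes("Hi Lars, ..."));
dataTable.put(dataPut);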

Having all the freedom to design the mapping between the primary and secondary indexes comes with the drawback of having to implement all the necessary wiring to store and look up the data. External keys need to be identified to access the correct table, for example:

myrowkey-1
@myrowkey-2

The first key denotes a direct data table lookup, while the second, using the prefix, is a mapping that has to be performed through a secondary index table. The name of the table could also be encoded as a number and added to the prefix. The flip side is that this is hardcoded in your application and needs to evolve with overall schema changes, and new requirements.

Indexed-Transactional HBase

A different solution is offered by the open source Indexed-Transactional HBase (ITHBase) project.[98] This solution extends HBase by adding special implementations of the client and server-side classes.

The core extension is the addition of transactions, which are used to guarantee that all secondary index updates are consistent. On top of this it adds index support, by providing a client-side IndexedTableDescriptor, defining how a data table is backed by a secondary index table.

Most client and server classes are replaced by ones that handle indexing support. For example, HTable is replaced with IndexedTable on the client side. It has a new method called getIndexedScanner(), which enables the iteration over rows in the data table using the ordering of a secondary index.

Just as with the client-managed index described earlier, this index stores the mappings between the primary and secondary keys in separate tables. In contrast, though, these are automatically created, and maintained, based on the descriptor. Combined with the transactional updates of these indexes, this solution provides a complete implementation of secondary indexes for HBase.

The drawback is that it may not support the latest version of HBase available, as it is not tied to its release cycle. It also adds a considerable amount of synchronization overhead that results in decreased performance, so you need to benchmark carefully.

Indexed HBase

Another solution that allows you to add secondary indexes to HBase is Indexed HBase (IHBase).[99] This solution forfeits the use of separate tables for each index but maintains them purely in memory. The indexes are generated when a region is opened for the first time, or when a memstore is flushed to disk—involving an entire region’s scan to build the index. Depending on your configured region size, this can take a considerable amount of time and I/O resources.

Only the on-disk information is indexed; the in-memory data is searched as-is: it uses the memstore data directly to search for index-related details. The advantage of this solution is that the index is never out of sync, and no explicit transactional control is necessary.

In comparison to table-based indexing, using this approach is very fast, as it has all the required details in memory and can perform a fast binary search to find matching rows. However, it requires a lot of extra heap to maintain the index. Depending on your requirements and the amount of data you want to index, you might run into a situation where IHBase cannot keep all the indexes you need.

The in-memory indexes are typed and allow for more fine-grained sorting, as well as more memory-efficient storage. There is support for BYTE, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE, BIG_DECIMAL, BYTE_ARRAY, and CHAR_ARRAY. There is no explicit control over the sorting order; thus data is always stored in ascending order. You will need to do the bitwise inversion of the value described earlier to sort in descending order.

The definition of an index revolves around the IdxIndexDescriptor class that defines the specific column of the data table that holds the index, and the type of the values it contains, taken from the list in the preceding paragraph.

Accessing an index is handled by the client-side IdxScan class, which extends the normal Scan class by adding support to define Expressions. A scan without an explicit expression defaults to normal scan behavior. Expressions provide basic boolean logic with an And and Or construct. For example:

Expression expression = Expression
  .or(
    Expression.comparison(columnFamily1, qualifier1, operator1, value1)
  )
  .or(
    Expression.and()
      .and(Expression.comparison(columnFamily2, qualifier2, operator2, value2))
      .and(Expression.comparison(columnFamily3, qualifier3, operator3, value3))
  );

The preceding example uses builder-style helper methods to generate a complex expression that combines three separate indexes. The lowest level of an expression is the Comparison, which allows you to specify the actual index, and a filter-like syntax to select values that match a comparison value and operator. Table 9-2 lists the possible operator choices.

Table 9-2. Possible values for the Comparison.Operator enumeration
Operator     Description
EQ           The equals operator
GT           The greater than operator
GTE          The greater than or equals operator
LT           The less than operator
LTE          The less than or equals operator
NEQ          The not equals operator

You have to specify a columnFamily, and a qualifier of an existing index, or else an IllegalStateException will be thrown.

The Comparison class has an optional includeMissing parameter, which works similarly to filterIfMissing, described in SingleColumnValueFilter. You can use it to fine-tune what is included in the scan depending on how the expression is evaluated.

The sorting order is defined by the first evaluated index in the expression, while the other indexes are used to intersect (for the and) or unite (for the or) the possible keys with the first index. In other words, using complex expressions is predictable only when using the same index, but with various comparisons.

The benefit of IHBase over ITHBase, for example, is that it achieves the same guarantees—namely maintaining a consistent index based on an existing column in a data table—but without the need to employ extra tables. It shares the same drawbacks, for the following reasons:

  • It is quite intrusive, as its installation requires additional JAR files plus a configuration that replaces vital client- and server-side classes.

  • It needs extra resources, although it trades extra memory for the I/O that table-based indexes would otherwise require.

  • It does random lookups on the data table, based on the sorting order defined by the secondary index.

  • It may not be available for the latest version of HBase.[100]

Coprocessor

There is work being done to implement an indexing solution based on coprocessors.[101] Using the server-side hooks provided by the coprocessor framework, it is possible to implement indexing similar to ITHBase, as well as IHBase while not having to replace any client- and server-side classes. The coprocessor would load the indexing layer for every region, which would subsequently handle the maintenance of the indexes.

The code can make use of the scanner hooks to transparently iterate over a normal data table, or an index-backed view on the same. The definition of the index would need to go into an external schema that is read by the coprocessor-based classes, or it could make use of the generic attributes a column family can store.

Note

Since this is in its early stages, there is not much that can be documented at this time. Watch the online issue tracking system for updates on the work if you are interested.

Search Integration

Using indexes gives you the ability to iterate over a data table in more than the implicit row key order. You are still confined to the available keys and need to use either filters or straight iterations to find the values you are looking for. A very common use case is to combine the arbitrary nature of keys with a search-based lookup, often backed by full search engine integration.

Common choices are the Apache Lucene-based solutions, such as Lucene itself, or Solr, a high-performance enterprise search server.[102] Similar to the indexing solutions, there are a few possible approaches:

Client-managed

These range from implementations using HBase as the data store, and using MapReduce jobs to build the search index, to those that use HBase as the backing store for Lucene. Another approach is to route every update of the data table to the adjacent search index. Implementing support for search indexes in combination with HBase is primarily driven by how the data is accessed, and if HBase is used as the data store, or as the index store.

A prominent implementation of a client-managed solution is the Facebook inbox search. The schema is built roughly like this:

  • Every row is a single inbox, that is, every user has a single row in the search table.

  • The columns are the terms indexed from the messages.

  • The versions are the message IDs.

  • The values contain additional information, such as the position of the term in the document.

With this schema it is easy to search a user’s inbox for messages containing specific words. Boolean operators, such as and or or, can be implemented in the client code, merging the lists of documents found. You can also efficiently implement type-ahead queries: the user can start typing a word and the search finds all messages that contain words that match the user’s input as a prefix.

Lucene

Using Lucene—or a derived solution—separately from HBase involves building the index using a MapReduce job. An externally hosted project[103] provides the BuildTableIndex class, which was formerly part of the contrib modules shipping with HBase. This class scans an entire table and builds the Lucene indexes, which ultimately end up as directories on HDFS—their count depends on the number of reducers used. These indexes can be downloaded to a Lucene-based server, and accessed locally using, for example, a MultiSearcher class, provided by Lucene.

Another approach is to merge the index parts by either running the MapReduce job with a single reducer, or using the index merge tool that comes with Lucene. A merged index usually provides better performance, but the time required to build, merge, and eventually serve the index is longer.

In general, this approach uses HBase only to store the data. If a search is performed through Lucene, usually only the matching row keys are returned. A random lookup into the data table is required to display the document. Depending on the number of lookups, this can take a considerable amount of time. A better solution would be something that combines the search directly with the stored data, thus avoiding the additional random lookup.

HBasene

The approach chosen by HBasene[104] is to build an entire search index directly inside HBase, while supporting the well-established Lucene API. The schema used stores each document field, or term, in a separate row, with the documents containing the term stored as columns inside that row.

The schema also reuses the same table to store various other details required to implement full Lucene support. It implements an IndexWriter that stores the documents directly into the HBase table, as they are inserted using the normal Lucene API. Searching is then done using the Lucene search API. Here is an example taken from the test class that comes with HBasene:

private static final String[] AIRPORTS = { "NYC", "JFK", "EWR", "SEA", 
  "SFO", "OAK", "SJC" };

private final Map<String, List<Integer>> airportMap = 
  new TreeMap<String, List<Integer>>();

protected HTablePool tablePool;

protected void doInitDocs() throws CorruptIndexException, IOException {
  Configuration conf = HBaseConfiguration.create();
  HBaseIndexStore.createLuceneIndexTable("idxtbl", conf, true);
  tablePool = new HTablePool(conf, 10);
  HBaseIndexStore hbaseIndex = new HBaseIndexStore(tablePool, conf, 
    "idxtbl");
  HBaseIndexWriter indexWriter = new HBaseIndexWriter(hbaseIndex, "id");
  for (int i = 100; i >= 0; --i) {
    Document doc = getDocument(i);
    indexWriter.addDocument(doc, new StandardAnalyzer(Version.LUCENE_30));
  }
}

private Document getDocument(int i) {
  Document doc = new Document();
  doc.add(new Field("id", "doc" + i, Field.Store.YES, Field.Index.NO));
  int randomIndex = (int) (Math.random() * 7.0f);
  doc.add(new Field("airport", AIRPORTS[randomIndex], Field.Store.NO,
    Field.Index.ANALYZED_NO_NORMS));
  doc.add(new Field("searchterm", Math.random() > 0.5f ? 
    "always" : "never",
    Field.Store.NO, Field.Index.ANALYZED_NO_NORMS));
  return doc;
}

public TopDocs search() throws IOException {
  HBaseIndexReader indexReader = new HBaseIndexReader(tablePool, "idxtbl", 
    "id");    
  HBaseIndexSearcher indexSearcher = new HBaseIndexSearcher(indexReader);
  TermQuery termQuery = new TermQuery(new Term("searchterm", "always"));
  Sort sort = new Sort(new SortField("airport", SortField.STRING));
  TopDocs docs = indexSearcher.search(termQuery
    .createWeight(indexSearcher), null, 25, sort, false);
  return docs;
}

public static void main(String[] args) throws IOException {
  // doInitDocs() and search() are instance methods of the surrounding test
  // class; the class name used here is only a placeholder for illustration
  TestHBasene test = new TestHBasene();
  test.doInitDocs();
  TopDocs docs = test.search();
  // use the returned documents...
}

The example creates a small test index and subsequently searches it. You may note that there is a lot of Lucene API usage, with small amendments to support the HBase-backed index writer.

Note

The project—as of this writing—is more a proof of concept than a production-ready implementation.

Coprocessors

Yet another approach to complement a data table with Lucene-based search functionality, and currently in development,[105] is based on coprocessors. It uses the provided hooks to maintain the index, which is stored directly on HDFS. Every region has its own index and search is distributed across them to gather the full result.

This is only one example of what is possible with coprocessors. Similar to the use of coprocessors to build secondary indexes, you have the choice of where to store the actual index: either in another table, or externally. The framework offers the enabling technology; the implementing code has the choice of how to use it.

Transactions

It seems somewhat counterintuitive to talk about transactions in regard to HBase. However, the secondary index example showed that for some use cases it is beneficial to abandon the simplified data model HBase offers, and in fact introduce concepts that are usually seen in traditional database systems.

One of those concepts is transactions, offering ACID compliance across more than one row, and more than one table. This is necessary in the absence of a matching schema pattern in HBase. For example, updating the main data table and the secondary index table requires transactions to be reliably consistent.

Often, transactions are not needed, as normalized data schemas can be folded into a single table and row design that does not need the overhead of distributed transaction support. If you cannot do without this extra control, here are a few possible solutions:

Transactional HBase

The Indexed Transactional HBase project comes with a set of extended classes that replace the default client- and server-side classes, while adding support for transactions across row and table boundaries. The region servers, and more precisely, each region, keeps a list of transactions, which are initiated with a beginTransaction() call, and are finalized with the matching commit() call. Every read and write operation then takes a transaction ID to guard the call against other transactions.

ZooKeeper

HBase requires a ZooKeeper ensemble to be present, acting as the seed, or bootstrap mechanism, for cluster setup. There are templates, or recipes, available that show how ZooKeeper can also be used as a transaction control backend. For example, the Cages project offers an abstraction to implement locks across multiple resources, and is scheduled to add a specialized transactions class—using ZooKeeper as the distributed coordination system.

ZooKeeper also comes with a lock recipe that can be used to implement a two-phase commit protocol. It uses a specific znode representing the transaction, and a child znode for every participating client. The clients can use their znodes to flag whether their part of the transaction was successful or failed. The other clients can monitor the peer znodes and take the appropriate action.[106]

Bloom Filters

Column Families introduced the syntax to declare Bloom filters at the column family level, and discussed specific use cases in which it makes sense to use them.

The reason to use Bloom filters at all is that the default mechanisms to decide if a store file contains a specific row key are limited to the available block index, which is, in turn, fairly coarse-grained: the index stores the start row key of each contained block only. Given the default block size of 64 KB, and a store file of, for example, 1 GB, you end up with 16,384 blocks, and the same amount of indexed row keys.

If we further assume your cell size is an average of 200 bytes, you will have more than 5 million of them stored in that single file. Given a random row key you are looking for, it is very likely that this key will fall in between two block start keys. The only way for HBase to figure out if the key actually exists is by loading the block and scanning it to find the key.

This problem is compounded by the fact that, for a typical application, you will expect a certain update rate, which results in flushing in-memory data to disk, and subsequent compactions aggregating them into larger store files. Since minor compactions only combine the last few store files, and only up to a configured maximum size, you will end up with a number of store files, all acting as possible candidates to have some cells of the requested row key. Consider the example in Figure 9-4.

Figure 9-4. Using Bloom filters to help reduce the number of I/O operations

The files are all from one column family and have a similar spread in row keys, although only a few really hold an update to a specific row. The block index has a spread across the entire row key range, and therefore always reports positive to contain a random row. The region server would need to load every block to check if the block actually contains a cell of the row or not.

On the other hand, enabling the Bloom filter does give you the immediate advantage of knowing if a file contains a particular row key or not. The nature of the filter is that it can give you a definitive answer if the file does not contain the row—but might report a false positive, claiming the file contains the data, where in reality it does not. The number of false positives can be tuned and is usually set to 1%, meaning that in 1% of all reports by the filter that a file contains a requested row, it is wrong—and a block is loaded and checked erroneously.

Note

This does not translate into an immediate performance gain on individual get operations, since HBase does the reads in parallel, and is ultimately bound by disk read latency. Reducing the number of unnecessary block loads improves the overall throughput of the cluster.

You can see from the example, however, that the number of block loads is greatly reduced, which can make a big difference in a heavily loaded system. For this to be efficient, you must also match a specific update pattern: if you modify all of the rows on a regular basis, the majority of the store files will have a piece of the row you are looking for, and therefore would not be a good use case for Bloom filters. But if you update data in batches so that each row is written into only a few store files at a time, the filter is a great feature to reduce the overall number of I/O operations.

Another place where you will find this to be advantageous is in the block cache. The hit rate of the cache should improve as loading fewer blocks results in less churn. Since the server is now loading blocks that contain the requested data most of the time, related data has a greater chance to remain in the block cache and subsequent read operations can make use of it.

Besides the update pattern, another driving factor to decide if a Bloom filter makes sense for your use case is the overhead it adds. Every entry in the filter requires about one byte of storage. Going back to the earlier example store file that was 1 GB in size, assuming you store only counters (i.e., long values encoded as eight bytes), and adding the overhead of the KeyValue information—which is its coordinates, or, the row key, column family name, column qualifier, timestamp, and type—then every cell is about 20 bytes (further assuming you use very short keys) in size. Then the Bloom filter would be 1/20th of your file, or about 51 MB.

Now assume your cells are, on average, 1 KB in size; in this case, the filter needs only 1 MB. Taking into account further optimizations, you often end up with a row-level Bloom filter of a few hundred kilobytes for a store file of one or more gigabytes. In that case, it seems that it would always be beneficial to enable the filter.

The final question is whether to use a row or a row+column Bloom filter. The answer depends on your usage pattern. If you are doing only row scans, having the more specific row+column filter will not help at all: having a row-level Bloom filter enables you to narrow down the number of files that need to be checked, even when you do row+column read operations, but not the other way around.

The row+column Bloom filter is useful when you cannot batch updates for a specific row, and end up with store files which all contain parts of the row. The more specific row+column filter can then identify which of the files contain the data you are requesting. Obviously, if you always load the entire row, this filter is once again hardly useful, as the region server will need to load the matching block out of each file anyway.

Since the row+column filter will require more storage, you need to do the math to determine whether it is worth the extra resources. It is also interesting to know that there is a maximum number of elements a Bloom filter can hold. If you have too many cells in your store file, you might exceed that number and would need to fall back to the row-level filter.

Figure 9-5 summarizes the selection criteria for the different Bloom filter levels.

Figure 9-5. Selection criteria for deciding what Bloom filter to use

Depending on your use case, it may be useful to enable Bloom filters, to increase the overall performance of your system. If possible, you should try to use the row-level Bloom filter, as it strikes a good balance between the additional space requirements and the gain in performance coming from its store file selection filtering. Only resort to the more costly row+column Bloom filter when you would otherwise gain no advantage from using the row-level one.
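
For reference, here is a sketch of enabling the filter when creating a table; it uses the generic attribute setter so that it stays version-neutral, and the table and family names are placeholders (depending on the HBase version, HColumnDescriptor may also offer a typed setter for the Bloom filter type):

Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);

HTableDescriptor tableDesc = new HTableDescriptor("testtable");
HColumnDescriptor colDesc = new HColumnDescriptor("colfam1");
// "ROW" enables the row-level filter, "ROWCOL" the row+column variant
colDesc.setValue("BLOOMFILTER", "ROW");
tableDesc.addFamily(colDesc);
admin.createTable(tableDesc);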

Versioning

Now that we have seen how data is stored and retrieved in HBase, it is time to revisit the subject of versioning. There are a few advanced techniques when using timestamps that—given that you understand their behavior—may be an option for specific use cases. They also expose a few intricacies you should be aware of.

Implicit Versioning

I pointed out before that you should ensure that the clocks on your servers are synchronized. Otherwise, when you store data in multiple rows across different servers, using the implicit timestamps, you may end up with completely different time settings.

For example, say you use the HBase URL Shortener and store three new shortened URLs for an existing user. All of the keys are considered fully distributed, so all three of the new rows end up on a different region server. Further, assuming that these servers are all one hour apart, if you were to scan from the client side to get the list of new shortened URLs within the past hour, you would miss a few, as they have been saved with a timestamp that is more than an hour different from what the client considers current.

This can be avoided by setting an agreed, or shared, timestamp when storing these values. The put operation allows you to set a client-side timestamp that is used instead, therefore overriding the server time. Obviously, the better approach is to rely on the servers doing this work for you, but you might be required to use this approach in some circumstances.[107]
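
A minimal sketch of such a shared timestamp, assuming a hypothetical table of shortened URLs; the single value is created once on the client and reused for every put, regardless of which server hosts the row:

HTable table = new HTable(HBaseConfiguration.create(), "urls");

long sharedTimestamp = System.currentTimeMillis();    // agreed on by the client

Put put = new Put(Bytes.toBytes("shortUrl-abc123"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("url"),
  sharedTimestamp, Bytes.toBytes("http://hbase.apache.org"));
table.put(put);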

Another issue with servers not being aligned by time is exposed by region splits. Assume you have saved a value on a server that is one hour ahead all other servers in the cluster, using the implicit timestamp of the server. Ten minutes later the region is split and the half with your update is moved to another server. Five minutes later you are inserting a new value for the same column, again using the automatic server time. The new value is now considered older than the initial one, because the first version has a timestamp one hour ahead of the current server’s time. If you do a standard get call to retrieve the newest version of the value, you will get the one that was stored first.

Once you have all the servers synchronized, there are a few more interesting side effects you should know about. First, it is possible—for a specific time—to make versions of a column reappear. This happens when you store more versions than are configured at the column family level. The default is to keep the last three versions of a cell, or value.

If you insert a new value 10 times into the same column, and request a complete list of all versions retained, using the setMaxVersions() call of the Get class, you will only ever receive up to what is configured in the table schema, that is, the last three versions by default.

But what would happen when you explicitly delete the last two versions? Example 9-1 demonstrates this.

Example 9-1. Application deleting with explicit timestamps
    for (int count = 1; count <= 6; count++) {                          // 1
      Put put = new Put(ROW1);
      put.add(COLFAM1, QUAL1, count, Bytes.toBytes("val-" + count));    // 2
      table.put(put);
    }

    Delete delete = new Delete(ROW1);                                   // 3
    delete.deleteColumn(COLFAM1, QUAL1, 5);
    delete.deleteColumn(COLFAM1, QUAL1, 6);
    table.delete(delete);

1. Store the same column six times.

2. The version is set to a specific value, using the loop variable.

3. Delete the newest two versions.

When you run the example, you should see the following output:

After put calls...
KV: row1/colfam1:qual1/6/Put/vlen=5, Value: val-6
KV: row1/colfam1:qual1/5/Put/vlen=5, Value: val-5
KV: row1/colfam1:qual1/4/Put/vlen=5, Value: val-4
After delete call...
KV: row1/colfam1:qual1/4/Put/vlen=5, Value: val-4
KV: row1/colfam1:qual1/3/Put/vlen=5, Value: val-3
KV: row1/colfam1:qual1/2/Put/vlen=5, Value: val-2

An interesting observation is that you have resurrected versions 2 and 3! This is caused by the fact that the servers delay the housekeeping to occur at well-defined times. The older versions of the column are still kept, so deleting newer versions makes the older versions come back.

This is only possible until a major compaction has been performed, after which the older versions are removed forever, using the predicate delete based on the configured maximum versions to retain.

Note

The example code has some commented-out code you can enable to enforce a flush and major compaction. If you rerun the example, you will see this result instead:

After put calls...
KV: row1/colfam1:qual1/6/Put/vlen=5, Value: val-6
KV: row1/colfam1:qual1/5/Put/vlen=5, Value: val-5
KV: row1/colfam1:qual1/4/Put/vlen=5, Value: val-4
After delete call...
KV: row1/colfam1:qual1/4/Put/vlen=5, Value: val-4

Since the older versions have been removed, they do not reappear anymore.

Finally, when dealing with timestamps, there is another issue to watch out for: delete markers. This refers to the fact that, in HBase, a delete is actually adding a tombstone marker into the store that has a specific timestamp. Based on that, it masks out versions that are either a direct match, or, in the case of a column delete marker, anything that is older than the given timestamp. Example 9-2 shows this using the shell.

Example 9-2. Deletes mask puts with explicit timestamps in the past
hbase(main):001:0> create 'testtable', 'colfam1'
0 row(s) in 1.1100 seconds

hbase(main):002:0> Time.now.to_i
=> 1308900346

hbase(main):003:0> put 'testtable', 'row1', 'colfam1:qual1', 'val1'    # 1
0 row(s) in 0.0290 seconds

hbase(main):004:0> scan 'testtable'
ROW   COLUMN+CELL
 row1 column=colfam1:qual1, timestamp=1308900355026, value=val1
1 row(s) in 0.0360 seconds

hbase(main):005:0> delete 'testtable', 'row1', 'colfam1:qual1'    # 2
0 row(s) in 0.0280 seconds

hbase(main):006:0> scan 'testtable'
ROW   COLUMN+CELL
0 row(s) in 0.0260 seconds

hbase(main):007:0> put 'testtable', 'row1', 'colfam1:qual1', 'val1',
  Time.now.to_i - 50000    # 3
0 row(s) in 0.0260 seconds

hbase(main):008:0> scan 'testtable'
ROW   COLUMN+CELL
0 row(s) in 0.0260 seconds

hbase(main):009:0> flush 'testtable'    # 4
0 row(s) in 0.2720 seconds 

hbase(main):010:0> major_compact 'testtable'
0 row(s) in 0.0420 seconds

hbase(main):011:0> put 'testtable', 'row1', 'colfam1:qual1', 'val1',
  Time.now.to_i - 50000    # 5
0 row(s) in 0.0280 seconds

hbase(main):012:0> scan 'testtable'
ROW   COLUMN+CELL
 row1 column=colfam1:qual1, timestamp=1308900423953, value=val1
1 row(s) in 0.0290 seconds
1. Store a value into the column of the newly created table, and run a scan to verify.

2. Delete all values from the column. This sets the delete marker with a timestamp of now.

3. Store the value again into the column, but use a time in the past. The subsequent scan fails to return the masked value.

4. Flush and conduct a major compaction of the table to remove the delete marker.

5. Store the value with the time in the past again. The subsequent scan now shows it as expected.

The example shows that there are sometimes situations where you might see something you do not expect to see. But this behavior is explained by the architecture of HBase, and is deterministic.

Custom Versioning

Since you can specify your own timestamp values—and therefore create your own versioning scheme—while overriding the server-side timestamp generation based on the synchronized server time, you are free to not use epoch-based versions at all.

For example, you could use the timestamp with a global number generator[108] that supplies you with ever increasing, sequential numbers starting at 1. Every time you insert a new value you retrieve a new number and use that when calling the put function.

You must do this for every put operation, or the server will insert an epoch-based timestamp instead. There is a flag in the table or column descriptors that indicates your use of custom timestamp values; in other words, your own versioning. If you fail to set the value, it is silently replaced with the server timestamp.
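
A sketch of such a custom version, using a local AtomicLong as a stand-in for the global number generator (a real setup would use a distributed generator, for example one built on ZooKeeper):

HTable table = new HTable(HBaseConfiguration.create(), "testtable");
AtomicLong versionGenerator = new AtomicLong(0);      // stand-in for a global generator

long version = versionGenerator.incrementAndGet();    // 1, 2, 3, ...
Put put = new Put(Bytes.toBytes("row1"));
// the custom sequence number takes the place of the epoch-based timestamp
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
  version, Bytes.toBytes("value"));
table.put(put);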

Warning

When using your own timestamp values, you need to test your solution thoroughly, as this approach has not been used widely in production.

Be aware that negative timestamp values are untested and, while they have been discussed a few times in HBase developer circles, they have never been confirmed to work properly.

Make sure to avoid collisions caused by using the same value for two separate updates to the same cell. Usually the last saved value is visible afterward.

With these warnings out of the way, here are a few use cases that show how a custom versioning scheme can be beneficial in the overall concept of table schema design:

Record IDs

A prominent example using this technique was discussed in Search Integration, that is, the Facebook inbox search. It uses the timestamp value to hold the message ID. Since these IDs are increasing over time, and the implicit sort order of versions in HBase is descending, you can retrieve, for example, the last 10 versions of a matching search term column to get the latest 10 messages, sorted by time, that contain said term.

Number generator

This follows on with the initially given example, making use of a distributed number generator. It may seem that a number generator would do the same thing as epoch-based timestamps do: sort all values ascending by a monotonically increasing value. The difference is subtler, because the resolution of the Java timer used is down to the millisecond, which means it is quite unlikely to store two values at the exact same time—but that can happen. If you were to require a solution in which you need an absolutely unique versioning scheme, using the number generator can solve this issue.

Using the time component of HBase is an interesting way to exploit this extra dimension offered by the architecture. You have less freedom, as it only accepts long values, as opposed to arbitrary binary keys supported by row and column keys. Nevertheless, it could solve your specific use case.



[94] You could, for example, use Orderly to generate the composite row keys.

[95] See the Mozilla wiki page on Socorro for details.

[96] See the OpenTSDB project website for details. In particular, the page that discusses the project’s schema is a recommended read, as it adds advanced key design concepts for an efficient storage format that also allows for high-performance querying of the stored data.

[97] See the HAvroBase GitHub project page.

[98] The ITHBase project started as a contrib module for HBase. It was subsequently moved to an external repository allowing it to address different versions of HBase, and to develop at its own pace. See the GitHub project page for details.

[99] Similar to ITHBase, IHBase started as a contrib project within HBase. It was moved to an external repository for the same reasons. See the GitHub project page for details. The original documentation of the JIRA issue is online at HBASE-2037.

[100] As of this writing, IHBase only supports HBase version 0.20.5.

[101] See HBASE-2038 in the JIRA issue tracking system for details.

[102] Solr is based on Lucene, but extends it to provide a fully featured search server. See the project’s website for details on either project.

[103] See the GitHub project page for details and to access the code.

[104] The GitHub page has the details, and source code.

[106] More details can be found on the ZooKeeper project page.

[107] One example, although very uncommon, is based on virtualized servers. See http://support.ntp.org/bin/view/Support/KnownOsIssues#Section_9.2.2, which lists an issue with NTP, the commonly used Network Time Protocol, on virtual machines.

[108] As an example for a number generator based on ZooKeeper, see the zk_idgen project.
