Chapter 7. Implementation of an Underlying Storage Engine

In the previous chapter, we described how Omneo uses the different Hadoop technologies to implement its use case. In this chapter, we will look more closely at all the different parts involving HBase. We will not discuss each and every implementation detail, but will cover all the required tools and examples to help you understand what is important during this phase.

As usual when implementing an HBase project, the first consideration is the table schema, which is the most important part of every HBase project. Designing an HBase schema can be straightforward, but depending on the use case, can also be quite complex and require significant planning and testing. It is a good practice to always start with this task, keeping in mind how data is received from your application (write path) and how you will need to retrieve it (read path). Read and write access patterns will dictate most of the table design.

Table Design

As we have said and will continue to say throughout the book, table design is one of the most important parts of your project. The table schema, the key you choose, and the different parameters you configure will all have an impact not only on the performance of your application but also on its consistency. This is why, for all the use cases we describe, we spend a lot of time on table design. After your application has been running for weeks and storing terabytes of data, migrating from a bad table design to a good one will require duplicating the entire dataset, a lot of time, and usually an update to the client application. Those are all costly operations that you can avoid by spending the right amount of time on this phase.

Table Schema

Table design for the Omneo use case is pretty easy, but let’s work through the steps so you can apply a similar approach to your own table schema design. We want both read and write paths to be efficient. In Omneo’s case, data is received from external systems in bulk. Therefore, unlike other ingestion patterns where data is inserted one single value at a time, here it can be processed directly in bulk format and doesn’t require single random writes or updates based on the key. On the read side, the user needs to be able to retrieve all the information for a specific sensor very quickly by searching on any combination of sensor ID, event ID, date, and event type. There is no way we can design a key to allow all those retrieval criteria to be efficient. We will need to rely on an external index, which given all of our criteria, will return a key that we will use to query HBase. Because the key will be retrieved from this external index and we don’t need to look up or scan for it, we can simply use a hash of the sensor ID, with the column qualifier being the event ID. You can refer to “Generate Test Data” to see a preview of the data format.

Sensors can have very similar IDs, such as 42, 43, and 44. However, sensor IDs can also have a wide range (e.g., 40,000–49,000). If we use the original sensor ID as the key, we might encounter hotspots on specific regions due to the keys’ sequential nature. You can read more about hotspotting in Chapter 16.

Hashing keys

One option for dealing with hotspotting is to simply presplit the table based on those different known IDs to make sure they are correctly distributed across the cluster. However, what if the distribution of those IDs changes in the future? In that case, the splits might no longer be appropriate, and we might again end up with hot spots on some regions. If today all IDs are between 40xxxx and 49xxxx, the regions will be split from the beginning of the keyspace to 41xxxx, from 41xxxx to 42xxxx, from 42xxxx to 43xxxx, and so on. But if tomorrow a new group of sensors is added with IDs in the 30xxxx to 39xxxx range, they will all end up in the first region. Because it is not possible to forecast what the future IDs will be, we need to find a solution that ensures a good distribution whatever the IDs are. When keys are hashed, even two initially close keys produce very different results. In this example, 42 produces 50a2fabfdd276f573ff97ace8b11c5f4 as its md5 hash, while 43 produces f0287f33eba7192e2a9c6a14f829aa1a. As you can see, unlike the original sensor IDs 42 and 43, sorting those two md5 hashes puts them far from one another. And even as new IDs arrive, because they are translated into a hexadecimal value, they will always be distributed between 0 and F. Using such a hashing approach ensures a good distribution of the data across all the regions, while still giving us direct access to the data of a specific sensor ID.
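To make this concrete, the following is a minimal sketch of the keying scheme, using the same Apache Commons Codec DigestUtils helper that the bulk-load mapper relies on later in this chapter (the RowKeyHashing class name is ours, for illustration only):

import org.apache.commons.codec.digest.DigestUtils;

public class RowKeyHashing {

  // Build the row key for a given sensor ID by hashing it with MD5,
  // exactly as the ConvertToHFiles mapper does later in this chapter.
  public static byte[] toRowKey(String sensorId) {
    return DigestUtils.md5(sensorId);
  }

  public static void main(String[] args) {
    // Two consecutive sensor IDs end up far apart once hashed,
    // matching the md5 values quoted in the text.
    System.out.println(DigestUtils.md5Hex("42"));
    System.out.println(DigestUtils.md5Hex("43"));
  }
}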

Warning

The hash approach cannot be used when you need to scan your data keeping the initial order of the key, as the md5 version of the key disrupts the original ordering, distributing the rows throughout the table.

Column qualifier

Regarding the column qualifier, the event ID will be used. The event ID is a hash value received from the downstream system, unique for the given event for this specific sensor. Each event has a specific type, such as “alert”, “warning”, or “RMA” (which stands for return merchandise authorization). At first, we considered using the event type as a column qualifier. However, a sensor can encounter a single event type multiple times. Each “warning” a sensor encountered would overwrite the previous “warning”, unless we used HBase’s “versions” feature. Using the unique event ID as the column qualifier allows us to have multiple events with the same type for the same sensor being stored without having to code extra logic to use HBase’s “versions” feature to retrieve all of a sensor’s events.
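To picture the resulting cell layout, here is a hedged sketch of what a single event would look like if it were written through the regular client API. The SingleEventPut class is hypothetical and shown for illustration only; the production pipeline described in this chapter relies on bulk load rather than individual puts, and it assumes the single column family v that we create later in this chapter:

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SingleEventPut {
  public static void write(Connection connection, String sensorId,
                           String eventId, byte[] serializedEvent) throws Exception {
    try (Table table = connection.getTable(TableName.valueOf("sensors"))) {
      Put put = new Put(DigestUtils.md5(sensorId));   // row key = md5 of the sensor ID
      put.addColumn(Bytes.toBytes("v"),               // column family
                    Bytes.toBytes(eventId),           // column qualifier = unique event ID
                    serializedEvent);                 // value = serialized event
      table.put(put);
    }
  }
}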

Table Parameters

To get the best performance possible, we have to look at all the parameters and set them according to our needs and usage. However, only the parameters that apply to this specific use case are listed in this section.

Compression

The first parameter we’ll examine is the compression algorithm used when writing table data to disk. HBase writes the data into HFiles in a block format. Each block is 64 KB by default, and is not compressed. Blocks store the data belonging to one region and column family. A table’s columns usually contain related information, which normally results in a common data pattern, so compressing those blocks almost always gives good results. For example, column families containing logs or customer information will usually compress well. HBase supports multiple compression algorithms: LZO, GZ (for GZip), SNAPPY, and LZ4. Each compression algorithm has its own pros and cons. For each algorithm, consider the performance impact of compressing and decompressing the data versus the compression ratio (i.e., was the data sufficiently compressed to warrant running the compression algorithm?).

Snappy will be very fast in almost all operations but will have a lower compression ratio, while GZ will be more resource intensive but will normally compress better. The algorithm you choose depends on your use case. It is recommended to test a few of them on a sample dataset to validate compression ratio and performance. As an example, a 1.6 GB CSV file generates 2.2 GB of uncompressed HFiles, while the exact same dataset takes only 1.5 GB when compressed with LZ4. Snappy-compressed HFiles for the same dataset also take 1.5 GB. Because read and write latencies are important for us, we will use Snappy for our table. Be aware of the availability of the various compression libraries on different Linux distributions. For example, Debian does not include Snappy libraries by default. Due to licensing, LZO libraries are usually not bundled with common Apache Hadoop distributions and must be installed separately.
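If you want to compare codecs on a table that already holds data, one possible approach (sketched here, not part of Omneo’s pipeline) is to alter the column family from the HBase shell and then force a major compaction so the HFiles are rewritten with the new codec:

hbase(main):001:0> alter 'sensors', {NAME => 'v', COMPRESSION => 'GZ'}
hbase(main):002:0> major_compact 'sensors'

Existing HFiles keep their original compression until they are rewritten, which is why the major compaction is needed before comparing sizes (for example, with hadoop fs -du on the table directory). The sensors table and its v column family are the ones we create later in this chapter.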

Tip

Keep in mind that compression ratio might vary based on the data type. Indeed, if you try to compress a text file, it will compress much better than a PNG image. For example, a 143,976 byte PNG file will only compress to 143,812 bytes (a space savings of barely 0.1%), whereas a 143,509 byte XML file can compress as small as 6,284 bytes (a 95.6% space savings!). It is recommended that you test the different algorithms on your dataset before selecting one. If the compression ratio is not significant, avoid using compression and save processor overhead.

Data block encoding

Data block encoding is an HBase feature where keys are encoded and compressed based on the previous key. One of the encoding options (FAST_DIFF) asks HBase to store only the difference between the current key and the previous one. HBase stores each cell individually, with its key and value. When a row has many cells, much space can be consumed by writing the same key for each cell. Therefore, activating data block encoding can yield significant space savings. It is almost always helpful to activate data block encoding, so if you are not sure, activate FAST_DIFF. The current use case will benefit from this encoding because a given row can have thousands of columns.

Bloom filter

Bloom filters are useful in reducing unnecessary I/O by letting HBase skip store files that cannot contain the requested key. A Bloom filter tells HBase whether a given key might be in a given file or is definitely not in it; a positive answer does not guarantee the key is actually present.

However, there are certain situations where Bloom filters are not required. For the current use case, files are loaded once a day, and then a major compaction is run on the table. As a result, there will almost always be only a single file per region. Also, queries to the HBase table will be based on results returned by Solr. This means read requests will always succeed and return a value. Because of that, the Bloom filter will always return true, and HBase will always open the file. As a result, for this specific use case, the Bloom filter will be an overhead and is not required.

Because Bloom filters are activated by default, in this case we will need to explicitly disable them.

Presplitting

Presplits are not really table parameters. Presplit information is not stored within the metadata of the table and is used only at the time of table creation. However, it’s important to have an understanding of this step before moving on to the implementation. Presplitting a table means asking HBase to split the table into multiple regions when it is created. HBase comes with different presplit algorithms. The goal of presplitting a table is to make sure the initial load will be correctly distributed across all the regions and will not hotspot a single region. Granted, data would be distributed over time as region splits occur automatically, but presplitting provides the distribution from the onset.

Implementation

Now that we have decided which parameters we want to set for our table, it’s time to create it. We will keep all the default parameters except the ones we just discussed. Run the following command in the HBase shell to create a table called “sensors” with a single column family and the parameters we just discussed, presplit into 15 regions (NUMREGIONS and SPLITALGO are the two parameters used to instruct HBase to pre-split the table):

hbase(main):001:0> create 'sensors', {NUMREGIONS => 15,
                                      SPLITALGO => 'HexStringSplit'}, 
                                     {NAME => 'v', COMPRESSION => 'SNAPPY',
                                      BLOOMFILTER => 'NONE',
                                      DATA_BLOCK_ENCODING => 'FAST_DIFF'}

When your table is created, you can see its details using the HBase WebUI interface or the following shell command:

hbase(main):002:0> describe 'sensors'
Table sensors is ENABLED
sensors
COLUMN FAMILIES DESCRIPTION
{NAME => 'v', DATA_BLOCK_ENCODING => 'FAST_DIFF', BLOOMFILTER => 'NONE',
REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'SNAPPY',
MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE',
BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.1410 seconds
Tip

The NUMREGIONS and SPLITALGO parameters are used for the table creation but are not stored within the metadata of the table. It is not possible to retrieve this information after the table has been created.

As you can see, the parameters we specified are listed in the output, along with the default table parameters. The default parameters might vary based on the HBase version you are using. However, BLOOMFILTER, DATA_BLOCK_ENCODING, and COMPRESSION should be configured as we specified here.

Now that we have our table ready, we can move forward with the data preparation.

Data conversion

To be able to implement and test the described use case, we need data to ingest into our system. Therefore, we first have to generate some test data that we will later process and transform.

Generate Test Data

The next goal is to generate a set of representative test data to run through our process and verify the results. The first thing we will create is some data files with test values. The goal is to have a dataset to allow you to run the different commands and programs.

In the examples, you will find a class called CSVGenerator, which creates data resembling the code shown here:

1b87,58f67b33-5264-456e-938a-9d2e9c5f4db8,ALERT,NE-565,0-0000-000,1,ECEGYFFL ...
3244,350cee9e-55fc-409d-b389-6780a8af9e76,RETURNED,NE-382,0-0000-000,1,OOQTY ...
727d,b97df483-f0bd-4f24-8ff3-6988d8eff88c,ALERT,NE-858,0-0000-000,1,MSWOCQXM ...
53d4,d8c39bf8-6f5f-4311-8ee5-9d3bce3e18d7,RETURNED,NE-881,0-0000-000,1,PMKMW ...
1fa8,4a0bf5b3-680d-4b87-8d9e-e55f06614ae4,ALERT,NE-523,0-0000-000,1,IYIZSHKA ...

Each line contains a random sensor ID comprised of four characters (0 to 65535, represented in hexadecimal), then a random event ID, document type, part name, part number, version, and a payload formed of random letters (64 to 128 characters in length). To generate a different workload, you can rerun the CSVGenerator code any time you want. Subsequent parts of the example code will read this file from the ~/ahae/resources/ch07 folder. This class will create files relative to where it’s run; therefore we need to run the class from the ~/ahae folder. If you want to increase or reduce the size of the dataset, simply update the following line:

for (int index = 0; index < 1000000; index++) {

You can run this data generator directly from Eclipse without any parameter or from the shell into the ~/ahae folder using the following command:

hbase -classpath ~/ahae/target/ahae.jar com.architecting.ch07.CSVGenerator

This will create the file ~/ahae/resources/ch07/omneo.csv.

Create Avro Schema

Now that we have some data to start with, we need to define an Avro schema that will reflect the format of the data generated. Based on the search schema provided in the previous chapter, we will need the following Avro schema:

{"namespace": "com.architecting.ch07",
 "type": "record",
 "name": "Event",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "eventid",  "type": "string"},
     {"name": "docType",  "type": "string"},
     {"name": "partName",  "type": "string"},
     {"name": "partNumber",  "type": "string"},
     {"name": "version",  "type": "long"},
     {"name": "payload",  "type": "string"}
 ]
}

You can find the schema in the omneo.avsc file, which is available in the resources/ch07 directory. Because it has already been compiled and imported into the project, it is not required to compile it. However, if you want to modify it, you can recompile it using the following command:

java -jar ~/ahae/lib/avro-tools-1.7.7.jar compile schema omneo.avsc ~/ahae/src/

This creates the file ~/ahae/src/com/architecting/ch07/Event.java containing the Event object that will be used to store the Event Avro object into HBase.

Implement MapReduce Transformation

As shown in Example 7-1, the first step of the production process is to parse the received CSV file to generate HBase HFiles, which will be the input to the next step. The HFiles will match the format of the previously created table.

Our production data will be large files, so we will implement this transformation using MapReduce to benefit from parallelism. The input of this MapReduce job will be the text file, and the output will be the HFiles. This dictates the way you should configure your MapReduce job.

Example 7-1. Convert to HFiles example
Table table = connection.getTable(tableName);

Job job = Job.getInstance(conf, "ConvertToHFiles: Convert CSV to HFiles");

HFileOutputFormat2.configureIncrementalLoad(job, table,
                                 connection.getRegionLocator(tableName)); 1
job.setInputFormatClass(TextInputFormat.class); 2

job.setJarByClass(ConvertToHFiles.class); 3
job.setJar("/home/cloudera/ahae/target/ahae.jar"); 3

job.setMapperClass(ConvertToHFilesMapper.class); 4
job.setMapOutputKeyClass(ImmutableBytesWritable.class); 5
job.setMapOutputValueClass(KeyValue.class); 6

FileInputFormat.setInputPaths(job, inputPath);
HFileOutputFormat2.setOutputPath(job, new Path(outputPath));
1

HBase provides a helper class that will do most of the configuration for you. This is the first thing to call when you want to configure your MapReduce job to provide HFiles as the output.

2

Here we want to read a text file with CSV data, so we will use TextInputFormat.

3

When running from the command line, all the required classes are bundled into a client JAR, which is referenced by the setJarByClass method. However, when running from Eclipse, it is necessary to manually provide the JAR path because the class that we are running is from the Eclipse environment, which MapReduce is not aware of. Because of that, we need to provide MapReduce with the path of an external file where the given class is also available.

4

Defines the mapper you want to use to parse your CSV content and create the Avro output.

5

We need to define ImmutableBytesWritable as the mapper output key class. It is the format we will use to write the key.

6

We need to define KeyValue as the mapper output value class. This will represent the data we want to store into our HFiles.

Warning

The reducer used to create the HFiles needs to load all the columns of a single row into memory and sort them before it can write them out. If some rows have a very large number of columns, they might not fit into memory. This should be fixed in a future release, when HBASE-13897 is implemented.

The operations on the mapper side are simple. The goal is just to split the line into different fields, assign them to an Avro object, and provide this Avro object to the HBase framework to be stored into HFiles ready to be loaded.

As shown in Example 7-2, the first thing we need to do is define a set of variables that we will reuse for each and every iteration of the mapper. This is done to reduce the number of objects created.

Example 7-2. Convert to HFiles mapper
  public static final ByteArrayOutputStream out = new ByteArrayOutputStream();
  public static final DatumWriter<Event> writer = new SpecificDatumWriter<Event>
                                                            (Event.getClassSchema());
  public static final BinaryEncoder encoder = encoderFactory.binaryEncoder(out,null);
  public static final Event event = new Event();
  public static final ImmutableBytesWritable rowKey = new ImmutableBytesWritable();

Those objects are all reused in the map method shown in Example 7-3.

Example 7-3. Convert to HFiles mapper
    // Extract the different fields from the received line.
    String[] line = value.toString().split(","); 1

    event.setId(line[0]);
    event.setEventId(line[1]);
    event.setDocType(line[2]);
    event.setPartName(line[3]);
    event.setPartNumber(line[4]);
    event.setVersion(Long.parseLong(line[5]));
    event.setPayload(line[6]);  2

    // Serialize the AVRO object into a ByteArray
    out.reset(); 3
    writer.write(event, encoder); 4
    encoder.flush();

    byte[] rowKeyBytes = DigestUtils.md5(line[0]);
    rowKey.set(rowKeyBytes); 5
    context.getCounter("Convert", line[2]).increment(1);

    KeyValue kv = new KeyValue(rowKeyBytes,
                            CF,
                            Bytes.toBytes(line[1]),
                            out.toByteArray()); 6
    context.write (rowKey, kv); 7
1

First, we split the line into fields so that we can have individual direct access to each of them.

2

Instead of creating a new Avro object at each iteration, we reuse the same object for all the map calls and simply assign it the new received values.

3

This is another example of object reuse. The fewer objects you create in your mapper code, the less garbage collection you will have to do and the faster your code will execute. The map method is called for each and every line of your input file. Creating a single ByteArrayOutputStream and reusing it and its internal buffer for each map iteration saves millions of object creations.

4

Serialize the Avro object into an array of bytes to store them into HBase, reusing existing objects as much as possible.

5

Construct our HBase key from the sensor ID.

6

Construct our HBase KeyValue object from our key, our column family, our eventid as the column qualifier and our Avro object as the value.

7

Emit our KeyValue object so the reducers can regroup them and write the required HFiles. The row key will only be used for partitioning the data. When the data is written into the underlying files, only the KeyValue data will be used for both the key and the value.

Tip

When implementing a MapReduce job, avoid creating objects when not required. If you need to access a small subset of fields in a String, it is not recommended to use the string split() method to extract the fields. Using split() on 10 million strings having 50 fields each will create 500 million objects that will be garbage collected. Instead, parse the string to find the few fields’ locations and use the substring() method. Also consider using the com.google.common.base.Splitter object from Guava libraries.
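As a hedged illustration of that advice, the following hypothetical helper extracts only two fields from a CSV line with Guava’s Splitter, which walks the string lazily instead of materializing one String per field the way split() does:

import com.google.common.base.Splitter;
import java.util.Iterator;

public class FieldExtraction {

  private static final Splitter COMMA = Splitter.on(',');

  // Return only the sensor ID (field 0) and the document type (field 2)
  // without creating a String object for every field of the line.
  public static String[] idAndDocType(String line) {
    Iterator<String> fields = COMMA.split(line).iterator();
    String id = fields.next();       // field 0: sensor ID
    fields.next();                   // field 1: event ID, skipped
    String docType = fields.next();  // field 2: document type
    return new String[] { id, docType };
  }
}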

Again, the example can be run directly from Eclipse or from the command line. In both cases, you will need to specify the input file, the output folder, and the table name as the parameters. The table name is required for HBase to find the region’s boundaries to create the required splits in the output data, but also to look up the column family parameters such as the compression and the encoding. The MapReduce job will produce HFiles in the output folder based on the table regions and the column family parameters.

The following command will create the HFiles on HDFS (if you are running the standalone version and need the files to be generated on the local disk, simply update the destination folder):

hbase -classpath ~/ahae/target/ahae.jar:`hbase classpath` 
com.architecting.ch07.ConvertToHFiles  1
file:///home/cloudera/ahae/resources/ch07/omneo.csv  2
hdfs://localhost/user/cloudera/ch07/hfiles/ sensors 3
1

The class called for the conversion

2

Our input file

3

Output folder and table name

If you start the class from Eclipse, make sure to add the parameters by navigating to Run → Run Configurations/Arguments.

Because this will start a MapReduce job, the output will be verbose and will give you lots of information. Pay attention to the following lines:

Map-Reduce Framework
        Map input records=1000000
        Map output records=1000000
        Reduce input groups=65536

The Map input records value represents the number of lines in your CSV file. Because for each line we emit one and only one Avro object, it matches the value of the Map output records counter. The Reduce input groups represents the number of unique keys. So here we can see that there were one million lines for 65,536 different rows, which gives us an average of 15 columns per row.

At the end of this process, your folder content should look like the following:

[cloudera@quickstart ~]$ hadoop fs -ls -R ch07/
drwxr-xr-x     0 2015-05-08 19:23 ch07/hfiles
-rw-r--r--     0 2015-05-08 19:23 ch07/hfiles/_SUCCESS
drwxr-xr-x     0 2015-05-08 19:23 ch07/hfiles/v
-rw-r--r-- 10480 2015-05-18 19:57 ch07/hfiles/v/345c5c462c6e4ff6875c3185ec84c48e
-rw-r--r-- 10475 2015-05-18 19:56 ch07/hfiles/v/46d20246053042bb86163cbd3f9cd5fe
-rw-r--r-- 10481 2015-05-18 19:56 ch07/hfiles/v/6419434351d24624ae9a49c51860c80a
-rw-r--r-- 10468 2015-05-18 19:57 ch07/hfiles/v/680f817240c94f9c83f6e9f720e503e1
-rw-r--r-- 10409 2015-05-18 19:58 ch07/hfiles/v/69f6de3c5aa24872943a7907dcabba8f
-rw-r--r-- 10502 2015-05-18 19:56 ch07/hfiles/v/75a255632b44420a8462773624c30f45
-rw-r--r-- 10401 2015-05-18 19:56 ch07/hfiles/v/7c4125bfa37740ab911ce37069517a36
-rw-r--r-- 10441 2015-05-18 19:57 ch07/hfiles/v/9accdf87a00d4fd68b30ebf9d7fa3827
-rw-r--r-- 10584 2015-05-18 19:58 ch07/hfiles/v/9ee5c28cf8e1460c8872f9048577dace
-rw-r--r-- 10434 2015-05-18 19:57 ch07/hfiles/v/c0adc6cfceef49f9b1401d5d03226c12
-rw-r--r-- 10460 2015-05-18 19:57 ch07/hfiles/v/c0c9e4483988476ab23b991496d8c0d5
-rw-r--r-- 10481 2015-05-18 19:58 ch07/hfiles/v/ccb61f16feb24b4c9502b9523f1b02fe
-rw-r--r-- 10586 2015-05-18 19:56 ch07/hfiles/v/d39aeea4377c4d76a43369eb15a22bff
-rw-r--r-- 10438 2015-05-18 19:57 ch07/hfiles/v/d3b4efbec7f140d1b2dc20a589f7a507
-rw-r--r-- 10483 2015-05-18 19:56 ch07/hfiles/v/ed40f94ee09b434ea1c55538e0632837

Owner and group information was condensed to fit the page. All the files belong to the user who has started the MapReduce job.

As you can see in the filesystem, the MapReduce job created as many HFiles as we have regions in the table.

Warning

When generating the input files, be careful to provide the correct column family. Indeed, it is a common mistake to not provide the right column family name to the MapReduce job, which will create the directory structure based on that name. This will cause the bulk load phase to fail.

The folder within which the files are stored is named based on the column family name we have specified in our code—“v” in the given example.

HFile Validation

Throughout the process, all the information we get in the console is related to the MapReduce framework and tasks. However, even if they succeed, the content they have generated might not be good. For example, we might have used the wrong column family, forgotten to configure the compression when we created our table, or taken some other misstep.

HBase comes with a tool to read HFiles and extract the metadata. This tool, called the HFilePrettyPrinter, can be invoked using the following command line:

hbase hfile -printmeta -f ch07/hfiles/v/345c5c462c6e4ff6875c3185ec84c48e

The only parameter this tool takes is the HFile location in HDFS.

Here we show part of the output of the previous command (some sections have been omitted, as they are not relevant for this chapter):

Block index size as per heapsize: 161264
reader=ch07/hfiles/v/345c5c462c6e4ff6875c3185ec84c48e,
    compression=snappy, 1
    cacheConf=CacheConfig:disabled,
    firstKey=7778/v:03afef80-7918-4a46-a903-f6e35b629926/1432004229936/Put, 2
    lastKey=8888/v:fc69a89f-4a78-4e2d-ae0a-b22dc93c962c/1432004229936/Put, 3
    avgKeyLen=53, 4
    avgValueLen=171, 5
    entries=666591, 6
    length=104861200 7

Let’s now take a look at the important parts of this output:

1

This shows you the compression format used for your file, which should reflect what you have configured when you created the table (we initially chose to use Snappy, but if you configured a different one, you should see it here).

2

Key of the first cell of this HFile, as well as column family name.

3

The last key contained in the HFile (only keys between 7778 and 8888 are present in the file; it is used by HBase to skip entire files when the key you are looking for is not between the first and last key).

4

Average size of the keys.

5

Average size of the values.

6

Number of cells present in the HFile.

7

Total size of the HFile.

Using the output of this command, you can validate that there is data in the files you have generated and that the format of the data matches your expectations (compression, Bloom filters, average key size, etc.).

Bulk Loading

Bulk loading inserts multiple pre-generated HFiles into HBase instead of performing puts one by one using the HBase API. Bulk loads are the most efficient way to insert a large quantity of values into the system. Here we will show you how to perform a bulk load.

The HDFS content of your table should look as follows (to fit the page width, file permissions and owner information was removed, and /hbase/data/default/sensors was abbreviated to …/s):

  0 2015-05-18 19:46 .../s/.tabledesc
287 2015-05-18 19:46 .../s/.tabledesc/.tableinfo.0000000001
  0 2015-05-18 19:46 .../s/.tmp
  0 2015-05-18 19:46 .../s/0cc853926c7c10d3d12959bbcacc55fd
 58 2015-05-18 19:46 .../s/0cc853926c7c10d3d12959bbcacc55fd/.regioninfo
  0 2015-05-18 19:46 .../s/0cc853926c7c10d3d12959bbcacc55fd/recovered.edits
  0 2015-05-18 19:46 .../s/0cc853926...3d12959bbcacc55fd/recovered.edits/2.seqid
  0 2015-05-18 19:46 .../s/0cc853926c7c10d3d12959bbcacc55fd/v

If your table is empty, you will still have all the region folders, because we have pre-split the table. HFiles might be present in the regions folders if data already existed prior to loading. We show only one region’s directory in the preceding snippet, and you can see that this region’s column family v is empty because it doesn’t contain any HFiles.

Our HFiles have been generated by the MapReduce job, and we now need to tell HBase to place the HFiles into the given table. This is done using the following command:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles ch07/hfiles sensors

In this command, we provide HBase the location of the HFiles we have generated (ch07/hfiles) and the table into which we want to insert those files (sensors). If the target table splits or merges some regions before the files are bulk loaded, the splits and merges of the input HFiles are handled on the client side by the load application. Indeed, the application used to push the HFiles into the HBase table validates that each HFile still belongs to a single region. If a region was split before we pushed the file, the load tool splits the input file the same way before pushing the pieces into the table. On the other hand, if two regions have been merged, the corresponding input HFiles are simply pushed into the same region.

When it runs, it will produce this output in the console:

$ hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 
                                        ch07/hfiles sensors
2015-05-18 20:09:29,701 WARN [main] mapreduce.LoadIncrementalHFiles: Skipping
non-directory hdfs://quickstart.cloudera:8020/user/cloudera/ch07/hfiles/_SUCCESS
2015-05-18 20:09:29,768 INFO [main] Configuration.deprecation: hadoop.native.lib
is deprecated. Instead, use io.native.lib.available
2015-05-18 20:09:30,476 INFO [LoadIncrementalHFiles-0] compress.CodecPool: Got
brand-new decompressor [.snappy]

After completion of the bulk load, you should find your files in HDFS under the table and the regions they belong to. Looking again at HDFS should show you something like this (again, to fit the page width, file permissions and owner information was removed, /hbase/data/default/sensors was abbreviated to …/s, and the region encoded name has been truncated):

  0 2015-05-18 19:46 .../s/0cc...
 58 2015-05-18 19:46 .../s/0cc.../.regioninfo
  0 2015-05-18 19:46 .../s/0cc.../recovered.edits
  0 2015-05-18 19:46 .../s/0cc.../recovered.edits/2.seqid
  0 2015-05-18 20:09 .../s/0cc.../v
836 2015-05-18 19:56 .../s/0cc.../v/c0ab6873aa184cbb89c6f9d02db69e4b_SeqId_4_ 1
1

You can see that we now have a file in our previously empty region. This is one of the HFiles we initially created. By looking at the size of this file and by comparing it to the initial HFiles created by the MapReduce job, we can match it to ch07/hfiles/v/ed40f94ee09b434ea1c55538e0632837. You can also look at the other regions and map them to the other input HFiles.
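Before moving on, note that the same load can also be triggered programmatically rather than from the command line. The following is a minimal sketch, assuming an HBase 1.x client where LoadIncrementalHFiles exposes a doBulkLoad method taking an Admin, a Table, and a RegionLocator; the BulkLoad class name and the hardcoded path are ours, for illustration only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoad {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("sensors");
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin();
         Table table = connection.getTable(tableName);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
      // Point the loader at the folder produced by the ConvertToHFiles job.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.doBulkLoad(new Path("ch07/hfiles"), admin, table, locator);
    }
  }
}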

Data Validation

Now that data is in the table, we need to verify that it is as expected. We will first check that we have as many rows as expected. Then we will verify that the records contain what we expect.

Table Size

Looking into an HFile using the HFilePrettyPrinter gives us the number of cells within a single HFile, but how many unique rows does it really represent? Because an HFile only represents a subset of rows, we need to count rows at the table level. HBase provides two different mechanisms to count the rows.

Counting from the shell

Counting the rows from the shell is pretty straightforward and simple. It does a full table scan and counts the rows one by one. While this works well for small tables, it can take a lot of time for big tables, so we will use this method only when we are sure our tables are small.

Here is the command to count our rows:

hbase(main):003:0> count 'sensors', INTERVAL => 40000, CACHE => 40000
Current count: 40000, row: 9c3f
65536 row(s) in 1.1870 seconds

The count command takes up to three parameters. The first parameter, which is mandatory, is the name of the table whose rows you want to count. The second and third parameters are optional; the second parameter tells the shell to display a progress status only every 40,000 rows, and the third parameter is the size of the cache we want to use to do our full table scan. The third parameter is used to set up the setCaching value of the underlying scan object.
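For reference, what the shell does here is roughly equivalent to the following hedged Java sketch (the CountRows class is hypothetical); the FirstKeyOnlyFilter keeps the scan cheap by returning only the first cell of each row:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class CountRows {
  public static long count(Connection connection) throws Exception {
    try (Table table = connection.getTable(TableName.valueOf("sensors"))) {
      Scan scan = new Scan();
      scan.setCaching(40000);                    // same effect as CACHE => 40000 in the shell
      scan.setFilter(new FirstKeyOnlyFilter());  // only the first cell of each row is returned
      long rows = 0;
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          rows++;
        }
      }
      return rows;
    }
  }
}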

Counting from MapReduce

The second way to count the number of rows in an HBase table is to use the RowCounter MapReduce tool. The benefit of using MapReduce to count your rows is HBase will create one mapper per region in your table. For a very big table, this will distribute the work on multiple nodes to perform the count operation in parallel instead of scanning regions sequentially, which is what the shell’s count command does.

This tool is called from the command line by passing the table name only:

hbase org.apache.hadoop.hbase.mapreduce.RowCounter sensors

Here is the most important part of the output (some sections have been removed in order to focus attention on key information and to reduce the size of this snippet):

2015-05-18 20:21:02,493 INFO  [main] mapreduce.Job: Counters: 31
	Map-Reduce Framework
		Map input records=65536 1
		Map output records=0
		Input split bytes=1304
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=2446
		CPU time spent (ms)=48640
		Physical memory (bytes) snapshot=3187818496
		Virtual memory (bytes) snapshot=24042749952
		Total committed heap usage (bytes)=3864526848
	org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters
		ROWS=65536 2

Let’s now take a look at the important fields to consider here:

1

Because the input records of this job are the HBase rows, we will have as many input records as we have rows.

2

The number of rows we have in the table, which will match the number of input records. Indeed, this MapReduce job simply increments a ROWS counter for each input record.

Tip

HBase also provides a MapReduce tool called CellCounter to count not just the number of rows in a table, but also the number of columns and the number of versions for each of them. However, this tool needs to create a Hadoop counter for each and every unique row key found in the table. Hadoop has a default limit of 120 counters. It is possible to increase this limit, but increasing it to the number of rows we have in the table might create some issues. If you are working on a small dataset, this tool might be useful to test and debug your application, but it generally cannot be run on a big table. Some work has been done on the Apache repository to fix this limitation; refer to HBASE-15773 for more details.

File Content

We have our table with the number of rows we expected and the format we asked for. But what does the data in the table really look like? Are we able to read what we wrote? Let’s look at two ways to inspect our data.

Using the shell

The easiest and quickest way to read data from HBase is to use the HBase shell. Using the shell, you can issue commands to retrieve the data you want. The first command is get, which will give you a single row. If you specify a column family, only the columns for that family are returned. If you specify both a column family and a column qualifier (separated with a colon), it will return only the specific value if it exists. The second option is to use scan, which will return a certain number of rows that we can limit using the LIMIT parameter or the STARTROW and STOPROW parameters. Both of the following commands will return all the columns for the row with row key value 000a:

get 'sensors', '000a', {COLUMN => 'v'}
scan 'sensors', {COLUMNS => ['v'], STARTROW => '000a', LIMIT => 1 }

Now as you will see in the output, there might be many columns for each row. If you want to limit the output to a specific column qualifier, you need to specify it in both commands the following way:

get 'sensors', '000a', {COLUMN => 'v:f92acb5b-079a-42bc-913a-657f270a3dc1'}
scan 'sensors', { COLUMNS => ['v:f92acb5b-079a-42bc-913a-657f270a3dc1'], 
                  STARTROW => '000a', STOPROW => '000a' }

The output of the get should then look like this:

COLUMN       CELL
 v:f9acb...  timestamp=1432088038576, value=x08000aHf92acb5b-079a-42bc-913a...
1 row(s) in 0.0180 seconds

Because the value is an Avro object, it contains some nonprintable characters, which are displayed as x08, but most of it should still be readable. This shows us that our table contains the expected key and data that matches what we are looking for.

Using Java

Using the shell, we have been able to validate that our table contains some data resembling Avro data, but to make sure it is exactly what we are expecting, we will need to implement a piece of Java code to retrieve the value, convert it into an Avro object, and retrieve the fields from it (see Example 7-4).

Example 7-4. Read Avro object from HBase
    try (Connection connection = ConnectionFactory.createConnection(config);
         Table sensorsTable = connection.getTable(sensorsTableName)) {  1
      Scan scan = new Scan ();
      scan.setCaching(1); 2

      ResultScanner scanner = sensorsTable.getScanner(scan);
      Result result = scanner.next(); 3
      if (result != null && !result.isEmpty()) {
        Event event = new Util().cellToEvent(result.listCells().get(0), null); 4
        LOG.info("Retrived AVRO content: " + event.toString());
      } else {
        LOG.error("Impossible to find requested cell");
      }
    }
1

Retrieves the table from the HBase connection.

2

Make sure we return from the scan after we get the first row. Because we don’t want to print more than one row, there is no need to wait for HBase to send us back more data.

3

Executes the scan against the table and returns the result.

4

Transforms the cell we received as the value into an Avro object.

Once again, you can run this example from Eclipse or from the command line. You should see output similar to what is shown here:

2015-05-20 18:30:24,214 INFO  [main] ch07.ReadFromHBase: Retrieved Avro object
 with ID 000a
2015-05-20 18:30:24,215 INFO  [main] ch07.ReadFromHBase: Avro content: {"id":
 "000a", "eventid": "f92acb5b-079a-42bc-913a-657f270a3dc1", "docType": "FAILURE",
 "partName": "NE-858", "partNumber": "0-0000-000", "version": 1, "payload":
 "SXOAXTPSIUFPPNUCIEVQGCIZHCEJBKGWINHKIHFRHWHNATAHAHQBFRAYLOAMQEGKLNZIFM 000a"}

With this very small piece of code, we have been able to perform the last step of the validation process: we retrieved, deserialized, and printed an Avro object from the table. To summarize, we have validated the size of the HFiles, their format, the number of entries in the HFiles and in the table, and the table content itself. We can now confirm that our data has been correctly and fully loaded into the table.

Data Indexing

The next and last step of the implementation consists of indexing the table we have just loaded, to be able to quickly search for any of the records using Solr. Indexing is an incremental process. Indeed, Omneo receives new files daily. As seen in the previous chapter, data from those files is loaded into a main table, which contains data from the previous days, and an indexation table. The goal is to add the indexation result into the Solr index built from previous days’ indexations. At the end, the index will reference all that has been uploaded to the main table. To implement this last example, you will need to have a Solr instance running on your environment. If you are comfortable with it, you can install and run it locally; however, HBase needs to run in pseudodistributed mode because the Solr indexer cannot work with the local jobrunner. Alternatively, you can execute this example in a virtual machine where Solr is already installed.

Most of the MapReduce indexing code has been built from the Solr examples and has been modified and simplified to index an HBase table.

After confirming that you have a working local Solr environment, running the following commands will create the Solr collection with a single shard and the provided schema:

export PROJECT_HOME=~/ahae/resources/ch07/search
rm -rf $PROJECT_HOME
solrctl instancedir --generate $PROJECT_HOME
mv $PROJECT_HOME/conf/schema.xml $PROJECT_HOME/conf/schema.old
cp $PROJECT_HOME/../schema.xml $PROJECT_HOME/conf/
solrctl instancedir --create Ch07-Collection $PROJECT_HOME
solrctl collection --create Ch07-Collection -s 1

In a production environment, to scale your application, you might want to consider using more shards.

The most important file to define your index is its schema.xml file. This file is available in the book’s GitHub repository and contains many tags. The most important section of the schema is the following:

<field name="id" type="string" indexed="true" stored="true" required="true"
                                                          multiValued="false" />
<field name="rowkey" type="binary" indexed="false" stored="true" omitNorms="true"
                                                               required="true"/>
<field name="eventId" type="string" indexed="true" stored="false"
                                              omitNorms="true" required="true"/>
<field name="docType" type="string" indexed="true" stored="false"
                                                              omitNorms="true"/>
<field name="partName" type="lowercase" indexed="true" stored="false"
                                                              omitNorms="true"/>
<field name="partNumber" type="lowercase" indexed="true" stored="false"
                                                              omitNorms="true"/>
<field name="version" type="long" indexed="true" stored="false" required="true"
                                                          multiValued="false" />
<field name="payload" type="string" indexed="true" stored="false" required="true"
                                                          multiValued="false" />
<field name="_version_" type="long" indexed="true" stored="true"/>

Because this book is focused on HBase, we won’t go into all the details of this file and all its fields, but invite you to look at the Solr online documentation.

If, for any reason, you want to delete your collection, use the following commands:

solrctl collection --delete Ch07-Collection
solrctl instancedir --delete Ch07-Collection
solrctl instancedir --delete search

The steps to get the table indexed are pretty straightforward. The first thing we need to do is scan the entire HBase table using MapReduce to create Solr index files. The second step is to bulk load those files into Solr, similar to how we bulk loaded our HFiles into HBase. The entire code will not be shown here due to its size; however, there are a few pieces we want to show you.

Example 7-5 demonstrates how we need to configure our MapReduce job in the driver class.

Example 7-5. Index HBase Avro table to Solr using MapReduce driver
    scan.setCaching(500);        1
    scan.setCacheBlocks(false);  2

    TableMapReduceUtil.initTableMapperJob( 3
      options.inputTable,              // Input HBase table name
      scan,                            // Scan instance to control what to index
      HBaseAvroToSOLRMapper.class,     // Mapper to parse cells content
      Text.class,                      // Mapper output key
      SolrInputDocumentWritable.class, // Mapper output value
      job);

    FileOutputFormat.setOutputPath(job, outputReduceDir);

    job.setJobName(getClass().getName() + "/"
                            + Utils.getShortClassName(HBaseAvroToSOLRMapper.class));
    job.setReducerClass(SolrReducer.class); 4
    job.setPartitionerClass(SolrCloudPartitioner.class); 5
    job.getConfiguration().set(SolrCloudPartitioner.ZKHOST, options.zkHost);
    job.getConfiguration().set(SolrCloudPartitioner.COLLECTION, options.collection);
    job.getConfiguration().setInt(SolrCloudPartitioner.SHARDS, options.shards);

    job.setOutputFormatClass(SolrOutputFormat.class);
    SolrOutputFormat.setupSolrHomeCache(options.solrHomeDir, job);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(SolrInputDocumentWritable.class);
    job.setSpeculativeExecution(false);
1

By default, scans cache only one row at a time. To reduce the number of remote procedure calls (RPCs) and improve throughput, we want to increase the size of the cache.

2

Because we are going to scan the entire table once and only once, caching the blocks is not required and will just put pressure on the RegionServers’ blockcache. It is always recommended to disable the blockcache when running a MapReduce job over a table.

3

Again, we are using HBase utility classes to configure required MapReduce input formats and output formats as well as the required mapper.

4

Use the default Apache Solr reducer class.

5

Also use the default Apache Solr partitioner class.

Everything else in the class should be pretty straightforward to understand.

Now let’s have a look at the mapper. The goal of the mapper is to read the content from HBase and translate it for Solr. We have already written a class to create an Avro object from an HBase cell. We reuse the same code here, as this is exactly what we want to achieve. We want to read each cell, convert it back to an Avro object, and provide to Solr the data we want to index (Example 7-6).

Example 7-6. Index HBase Avro table to Solr using MapReduce mapper
        event = util.cellToEvent(cell, event); 1

        inputDocument.clear(); 2
        inputDocument.addField("id", UUID.randomUUID().toString()); 3
        inputDocument.addField("rowkey", row.get());
        inputDocument.addField("eventId", event.getEventId().toString());
        inputDocument.addField("docType", event.getDocType().toString());
        inputDocument.addField("partName", event.getPartName().toString());
        inputDocument.addField("partNumber", event.getPartNumber().toString());
        inputDocument.addField("version", event.getVersion());
        inputDocument.addField("payload", event.getPayload().toString());

        context.write(new Text(cell.getRowArray()),
                          new SolrInputDocumentWritable(inputDocument)); 4
1

Transform the received cell into an Avro object reusing the event instance to avoid creation of new objects.

2

Here again we want to reuse existing objects as much as possible and therefore will simply reinitialize and reuse the Solr input document.

3

Assign to the Solr input document all the fields we want to index or store from the Avro event object.

4

Write the Solr document to the context for indexing.

If you want to run the indexing from the command line, you will have to use the following command:

hbase -classpath ~/ahae/target/ahae.jar:`hbase classpath` 
com.architecting.ch07.MapReduceIndexerTool

You can also execute it from Eclipse without any specific parameter.

Data Retrieval

At this point, we have generated test data, transformed it into Avro format stored into HFiles, loaded it into a table, and indexed it into Solr. The only remaining piece is to make sure we can query Solr to find what we are looking for and then retrieve the related information from HBase. The HBase retrieval part is the same as what we have already seen before. You can query Solr using the code shown in Example 7-7.

Example 7-7. Retrieve Avro data from HBase based on Solr
    CloudSolrServer solr = new CloudSolrServer("localhost:2181/solr"); 1
    solr.setDefaultCollection("Ch09-Collection"); 2
    solr.connect();

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("qt", "/select");
    params.set("q", "docType:ALERT AND partName:NE-555"); 3

    QueryResponse response = solr.query(params); 4
    SolrDocumentList docs = response.getResults();

    LOG.info("Found " + docs.getNumFound() + " matching documents.");
    if (docs.getNumFound() == 0) return;
    byte[] firstRowKey = (byte[]) docs.get(0).getFieldValue("rowkey");
    LOG.info("First document rowkey is " + Bytes.toStringBinary(firstRowKey));

    // Retrieve and print the first 10 columns of the first returned document
    Configuration config = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(config);
        Admin admin = connection.getAdmin();
        Table sensorsTable = connection.getTable(sensorsTableName)) {
      Get get = new Get(firstRowKey); 5

      Result result = sensorsTable.get(get);
      Event event = null;
      if (result != null && !result.isEmpty()) { 6
        for (int index = 0; index < 10; index++) { // Print first 10 columns
          if (!result.advance())
            break; // There are no more columns and we have not reached 10
          event = new Util().cellToEvent(result.current(), event);
          LOG.info("Retrieved AVRO content: " + event.toString());
        }
      } else {
        LOG.error("Impossible to find requested cell");
      }
    }
1

Connect to your Solr cluster. Adjust this if you are not running Solr on the same cluster as HBase.

2

Define the Solr collection you want to use.

3

Configure the request you want Solr to execute. Here we ask it for all the ALERT documents for the NE-555 part.

4

Execute the Solr request and retrieve the response from the server.

5

Call HBase, specifying the row key of the first document sent back by Solr.

6

Iterate over the columns for the given key and display the first 10 Avro objects retrieved from those columns.

Going Further

If you want to extend the examples presented in this chapter, the following list offers some options you can try based on our discussions from this chapter:

Bigger input file
To make sure examples run pretty fast, the dataset we worked with was pretty small. What about using a bigger dataset? Depending on the available disk space you have and the performance of your environment, try to create a significantly bigger input file and verify it’s processed the exact same way.
Single region table
Because it’s a good practice to avoid hotspotting, we have created a table with multiple regions split based on the key we used. Therefore the different MapReduce jobs have generated multiple files, one per region. What if we create a table with a single region instead? Try to modify the create table statement to have a single region and load more than 10 GB of data into it. You should see the region splitting after the data is inserted; however, since we are using bulk load, you should still not see any hotspotting on this region. You can validate your table splits and the content of each region by looking in HDFS, as discussed in “Bulk Loading”.
Impact on table parameters
We have created our table using the parameters that are good for our current use case. We recommend modifying the various parameters and rerunning the process to measure the impact.
Compression
Try to use different types of compression and compare. If you used Snappy (which is fast), try to configure GZ (which is slower, but compresses better), and compare the overall time it takes to process everything in relation to the size of your files.
Block encoding
Because of the format of the key we store into this table, we configured it to use the FAST_DIFF data block encoding. Here again, look at the performance and the overall data size at the end.
Bloom filter
When doing reads, Bloom filters are useful for skipping HBase store files where we can confirm the key we are looking for is not present. However, here we knew that the data we are looking for will always be present in the file, so we disabled the Bloom filters. Create a list of tens of keys and columns that you know are present in the table and measure how long it takes to read them all. Activate the Bloom filter on the table and run a major compaction, which will create the Bloom filters. There should be no noticeable performance gain on subsequent tests for this specific use case.
Warning

It is almost always good to have Bloom filters activated. We disabled them here because this use case is very specific. If you are not sure, just keep them on.
