Chapter 11. Implementation of HBase as a Master Data Management Tool

In Chapter 10, we reviewed the implementation of a customer 360 solution. In addition to HBase, it uses several different applications, including MapReduce and Hive. On the HBase side, we described how MapReduce is used to do lookups or to generate HBase files. In addition, as discussed in the previous chapter, Collective plans to improve its architecture by using Kafka. None of this should be new to you, as we covered it in detail in previous chapters. However, Collective is also planning to use Spark, and this is where things start to get interesting. Over the last several years, when applications needed to process HBase data, they have usually used MapReduce or the Java API. With Spark becoming more and more popular, however, we are seeing people start to implement solutions using Spark on top of HBase.

Because we already covered Kafka, MapReduce, and the Java API in previous chapters, instead of going over those technologies again to provide you with very similar examples, we will focus here on Spark over HBase. The example we are going to implement will still put the customer 360 description into action, but as with the other implementation examples, it can be reused for any other use case.

MapReduce Versus Spark

Before we continue, we should establish the pros and cons of using Spark versus using MapReduce. Although we will not provide a lengthy discussion on this topic, we will briefly highlight some points for you to consider as you narrow down your choice.

Spark is a recent technology, while MapReduce has been used for years. Although Spark has been proven to be stable, you might be more comfortable with a technology deployed in hundreds of thousands of production applications—if so, you should build your project around MapReduce. On the other hand, if you prefer to rely on recent technologies, then Spark will be a good fit.

For companies with good MapReduce knowledge that already have many MapReduce projects deployed, it might be easier, faster, and cheaper to stay with something they know well. However, if they start a brand-new project and are planning many projects, they might want to consider Spark.

Some use cases require fast and efficient data processing. MapReduce comes with a big overhead. It gives you reliability and many other benefits, but at the price of a performance hit. If your use case has tight SLAs, you might want to consider Spark. Otherwise, MapReduce is still a good option.

One last consideration is the development language. Spark is compatible with both Java and Scala code, while MapReduce is mostly for Java developers.

So which one should you choose? If you are still deciding between the two, we recommend trying Spark, as it has nice benefits over MapReduce.

Get Spark Interacting with HBase

As with MapReduce, there are two main ways for Spark to interact with HBase. The first is to run a Spark application on top of an HBase table. The second is to interact with HBase while running Spark on other data. We will assume that you have at least basic Spark knowledge; refer to the Spark website for more information.

Run Spark over an HBase Table

When we run a MapReduce job on top of an HBase table, each mapper processes a single region. You can also run a simple Java application that scans the entire HBase table. It is very similar with Spark: the HBase Spark API can return an RDD representing your entire HBase table, and you can partition this RDD so that it is processed by multiple executors, each of which handles a single region.

Calling HBase from Spark

The HBase Java API is accessible from Spark code. Therefore, you can perform lookups like the ones we did when implementing our Flume interceptor. You can also use Spark to enrich and store data into HBase, leveraging Puts and Increments. Last, the same way you can use MapReduce to generate HFiles for bulk loads, you can use Spark to generate the same kind of files that you will later bulk load into HBase.
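
Puts are demonstrated at length later in this chapter, but Increments are not, so here is a minimal sketch of what an Increment from Spark could look like. Everything in it that is not part of the standard HBase and Spark APIs is an assumption: the visits.txt input of user IDs and the visitor:visits counter column are purely illustrative.

import java.util.Iterator;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class SparkIncrementSketch {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("SparkIncrementSketch")
                                         .setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Hypothetical input: one user ID per line.
    JavaRDD<String> userIds =
        jsc.textFile("hdfs://localhost/user/cloudera/visits.txt");

    userIds.foreachPartition(new VoidFunction<Iterator<String>>() {
      @Override
      public void call(Iterator<String> ids) throws Exception {
        // Open one connection per Spark partition, using the regular Java API.
        Connection connection =
            ConnectionFactory.createConnection(HBaseConfiguration.create());
        Table table = connection.getTable(TableName.valueOf("user"));
        while (ids.hasNext()) {
          // Bump a counter column for each user seen in this partition.
          Increment increment = new Increment(Bytes.toBytes(ids.next()));
          increment.addColumn(Bytes.toBytes("visitor"),
                              Bytes.toBytes("visits"), 1L);
          table.increment(increment);
        }
        table.close();
        connection.close();
      }
    });
    jsc.close();
  }
}

Because Increments are not idempotent, rerunning such a job will bump the counters again, so keep that in mind if the input can be replayed.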

Implementing Spark with HBase

To illustrate using Spark with HBase, we will begin by implementing a Spark job to process a text file where each line is a record that we will use to enrich HBase data. Later, we will process another source of data that we will use to create HFiles to BulkLoad into HBase tables. Finally, we will implement a Spark job over the same HBase table to perform data aggregation.

Tip

You might find different versions of the Spark HBase API. The one we will use here is extracted from the Apache HBase 2.0 project. If you are using another version of the Spark HBase API, you might face some compilation issues, but the logic behind it will still be the same. You might be able to adjust the examples to compile with your code and run them.

All the operations we are going to implement will be done over the same table. Use the following command to create this table:

create 'user', {NAME => 'edge', BLOOMFILTER => 'NONE',
                COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},
               {NAME => 'segment', BLOOMFILTER => 'NONE',
                COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},
               {NAME => 'visitor', BLOOMFILTER => 'NONE',
                COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'}

Let’s take a closer look at the table creation command and the reasons for the different options we have chosen. First, we disabled the Bloom filters. Every time we query HBase for a row, we will use its row key, so we already know the row exists. Also, most of the time the entire dataset will be processed and not just a single row, so having to check the Bloom filter just to confirm that the row exists is pure overhead. In most other situations, though, it is a good idea to keep Bloom filters on; if you are not sure, just keep them enabled.

Second, we enabled Snappy compression. Even though its compression ratio is lower than that of some other algorithms, Snappy compresses quickly with very small impact on the CPU. Unless you are storing data that is already compressed, such as images or sound, it is always a good idea to enable it.

Last, we enabled data block encoding. Data block encoding is very useful to reduce the overhead of storing the key for every column of the same row. Given that a user can have many columns, and given that the row key can be pretty long, activating the FAST_DIFF block encoding will provide us with extra savings.

Spark and HBase: Puts

There are two ways to enrich HBase data while processing an input file. The first is to do it in “real time,” updating data in HBase as we parse the input file. This allows you to see the results in HBase as soon as they are read by Spark, and you can implement a stream processing pipeline using similar code and Spark Streaming. If you do not have the requirement to get the data into HBase in real time, you can instead process the input to generate HBase HFiles to be bulk loaded. While the streaming approach gives you a more real-time way of updating HBase, the bulk load approach provides better throughput, as it doesn’t need to interact with HBase while processing.

The very first thing we will need is a text file big enough to get a sense of Spark’s performance capabilities. As in “Kafka”, we will create a small utility to generate this file. This utility can be called as follows:

java -classpath ~/ahae/target/ahae.jar:`hbase classpath`
  com.architecting.ch11.DataGenerator 100000 > ~/ahae/resources/ch11/data.txt

After the file has been generated, it must be pushed into HDFS for processing:

hdfs dfs -put ~/ahae/resources/ch11/data.txt

Running Spark over a text file doesn’t require any specific HBase API. The code in Example 11-1 simply reads a file from HDFS and processes all the lines. To keep it simple, we will only count those lines here, but later on we will enrich this example.

Example 11-1. Simple line count using Spark
 SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample ")
                                                             .setMaster("local[2]");
 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
 JavaRDD<String> textFile = jsc.textFile("hdfs://localhost/user/cloudera/data.txt");
 JavaPairRDD<String, Integer> pairs = textFile.
                             mapToPair(new PairFunction<String, String, Integer>() {
   public Tuple2<String, Integer> call(String s) {
     return new Tuple2<String, Integer>(s.substring(0, s.indexOf("|")), 1); }
   });
 JavaPairRDD<String, Integer> counts = pairs.
                            reduceByKey(new Function2<Integer, Integer, Integer>() {
   public Integer call(Integer a, Integer b) { return a + b; }
   });
 System.out.println("We have generated " + counts.count() + " users");

This example should be straightforward—from each line, we extract the user ID only, then we group them together for a count. Because this is a very basic example, we will simply run it from Eclipse using the local environment.

Despite all the long DEBUG and INFO logs, what you are looking for is something like this:

We have generated 999952 users

Now that we have a skeleton for our application, everything else will be standard HBase Java API calls mixed with Spark. As you might have figured, each entry in the input file represents information for someone. What we want to do is update HBase with this information. There are two different ways to do that. The first option, illustrated in Example 11-2, has the benefit of being very easy to read and understand. It creates one mutation per line of the input file and buffers it to be sent to the HBase table.

Example 11-2. Performing HBase BulkPut using Spark
SparkConf sparkConf = new SparkConf().setAppName("IngestLines ")
                                     .setMaster("local[2]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
JavaRDD<String> textFile =
                   jsc.textFile("hdfs://localhost/user/cloudera/data.txt");

hbaseContext.bulkPut(textFile, TABLE_NAME, new Function<String, Put>() {
  @Override
  public Put call(String v1) throws Exception {
    String[] tokens = v1.split("\\|");
    Put put = new Put(Bytes.toBytes(tokens[0]));
    put.addColumn(Bytes.toBytes("segment"),
                  Bytes.toBytes(tokens[1]),
                  Bytes.toBytes(tokens[2]));
    return put;
  }
});
jsc.close();

This code creates one mutation (a Put) for every single column and simply returns it to the HBase Spark bulkPut framework. If we ignore duplicates, we have at most seven columns per row, so it is acceptable to have one mutation for each. However, if you have tens of columns, you might improve performance by regrouping them first, then creating the related Puts and emitting them directly to HBase. This approach is illustrated in Example 11-3, and if you are new to Spark, the code will be much more difficult to read and understand. However, running on a local VM environment, with seven columns and one million lines, the code in Example 11-3 is about 10% faster than the code in Example 11-2. If you increase the number of columns, the difference will be even more significant.

Example 11-3. Text file to HBase Spark processing
SparkConf sparkConf = new SparkConf().setAppName("IngestLines ")
                                      .setMaster("local[2]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
JavaRDD<String> textFile =
                   jsc.textFile("hdfs://localhost/user/cloudera/data.txt");

PairFunction<String, String, String> linesplit = // 1
new PairFunction<String, String, String>() {
  public Tuple2<String, String> call(String s) {
    int index = s.indexOf("|");
    return new Tuple2<String, String>(s.substring(0, index),
                                      s.substring(index + 1));
  }
};

JavaPairRDD<String, String> pairs = textFile.mapToPair(linesplit);
Function<String, List<String>> createCombiner =
new Function<String, List<String>>() {
  public List<String> call(String s) {
    List<String> list = new ArrayList<String>();
    list.add(s);
    return list;
  }
};

Function2<List<String>, String, List<String>> mergeValue =
new Function2<List<String>, String, List<String>>() {
  @Override
  public List<String> call(List<String> v1, String v2) throws Exception {
    v1.add(v2);
    return v1;
  }
};

Function2<List<String>, List<String>, List<String>> mergeCombiners =
new Function2<List<String>, List<String>, List<String>>() {
  @Override
  public List<String> call(List<String> v1, List<String> v2) throws Exception {
    v2.addAll(v1);
    return v2;
  }
};

JavaPairRDD<String, List<String>> keyValues = // 2
            pairs.combineByKey(createCombiner, mergeValue, mergeCombiners);

JavaRDD<Put> keyValuesPuts = keyValues.map( // 3
new Function<Tuple2<String, List<String>>, Put>() {
  @Override
  public Put call(Tuple2<String, List<String>> v1) throws Exception {
    Put put = new Put(Bytes.toBytes(v1._1));
    ListIterator<String> iterator = v1._2.listIterator();
    while (iterator.hasNext()) {
      String colAndVal = iterator.next();
      int indexDelimiter = colAndVal.indexOf("|");
      String columnQualifier = colAndVal.substring(0, indexDelimiter);
      String value = colAndVal.substring(indexDelimiter + 1);
      put.addColumn(COLUMN_FAMILY, Bytes.toBytes(columnQualifier),
                                   Bytes.toBytes(value));
    }
    return put;
  }
});

hbaseContext.foreachPartition(keyValuesPuts, // 4
  new VoidFunction<Tuple2<Iterator<Put>, Connection>>() {
    @Override
    public void call(Tuple2<Iterator<Put>, Connection> t) throws Exception {
      Table table = t._2().getTable(TABLE_NAME);
      BufferedMutator mutator = t._2().getBufferedMutator(TABLE_NAME);
      while (t._1().hasNext()) {
        Put put = t._1().next();
        mutator.mutate(put);
      }

      mutator.flush();
      mutator.close();
      table.close();
    }
  });
jsc.close();
1. Extract the key from the lines to create pairs that can be regrouped together.

2. Combine all the entries based on the key, so that each key has a list of “column qualifier|value” strings associated with it.

3. Transform one key and all its related “column qualifier|value” strings into a single HBase Put object.

4. Emit all the Puts to HBase.

Basically, we regroup all the lines based on the key extracted from each string, transform each group into a single Put, and then send those Puts to HBase.

You can run this example directly from Eclipse.

Tip

Because of the way it is configured, this example only runs with two local threads. If you want to run it on YARN, remove the .setMaster("local[2]") call from the code and run the example again, adding the --master yarn and --deploy-mode client parameters to the spark-submit command.

Because the output of this example is quite verbose, we will not reproduce it here. However, when it is done, you can query your HBase table to make sure data has been processed:

hbase(main):005:0> scan 'user', LIMIT => 2
ROW               COLUMN+CELL
 0000003542a7-... column=segment:postalcode, ts=1457057812022, value=34270
 0000013542a7-... column=segment:birthdate, ts=1457057756713, value=20/05/1946
2 row(s) in 0.0330 seconds

Similar to what we did in Chapter 9 to enrich data as we ingested it into HBase, you can use the regular HBase API to perform lookups before creating the mutations and enrich them with the required information.
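
To make this more concrete, the following fragment is a minimal sketch of that pattern, reusing the setup and imports from Examples 11-2 and 11-3 (plus Get and Result from the HBase client API). The referential table named geo, keyed by postal code and holding the city name in a d:city column, is purely hypothetical:

hbaseContext.foreachPartition(textFile,
  new VoidFunction<Tuple2<Iterator<String>, Connection>>() {
    @Override
    public void call(Tuple2<Iterator<String>, Connection> t) throws Exception {
      // Hypothetical referential table mapping postal codes to city names.
      Table geoTable = t._2().getTable(TableName.valueOf("geo"));
      BufferedMutator mutator = t._2().getBufferedMutator(TABLE_NAME);
      while (t._1().hasNext()) {
        String[] tokens = t._1().next().split("\\|");
        Put put = new Put(Bytes.toBytes(tokens[0]));
        put.addColumn(Bytes.toBytes("segment"), Bytes.toBytes(tokens[1]),
                      Bytes.toBytes(tokens[2]));
        if ("postalcode".equals(tokens[1])) {
          // Lookup before the mutation so we can enrich it with the city name.
          Result result = geoTable.get(new Get(Bytes.toBytes(tokens[2])));
          byte[] city = result.getValue(Bytes.toBytes("d"), Bytes.toBytes("city"));
          if (city != null) {
            put.addColumn(Bytes.toBytes("segment"), Bytes.toBytes("city"), city);
          }
        }
        mutator.mutate(put);
      }
      mutator.flush();
      mutator.close();
      geoTable.close();
    }
  });

If you perform many lookups per partition, caching the most frequently used referential rows in a local map will save a lot of round-trips to HBase.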

Spark on HBase: Bulk Load

In the previous section, we discussed a real-time approach to updating an HBase table. If this is not required for your use case, you will achieve better throughput by using the HBase bulk load option. Let’s reuse the same example, but this time, instead of interacting directly with HBase, we will generate HBase HFiles that we will load later on.

The data will remain the same; the main difference is on the Spark side. HBase splits tables into regions based on key boundaries. Each HFile we create has to belong to one of those regions and must contain only keys that fall within that region’s boundaries.
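
If you want to see the boundaries your HFiles will have to respect, you can list them with the regular client API. The following standalone sketch prints the start key of every region of the user table created earlier:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class ShowRegionBoundaries {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         RegionLocator locator =
             connection.getRegionLocator(TableName.valueOf("user"))) {
      // Each start key is the lower boundary of one region; every key written
      // into a given HFile must fall between one start key and the next.
      for (byte[] startKey : locator.getStartKeys()) {
        System.out.println("Region start key: " + Bytes.toStringBinary(startKey));
      }
    }
  }
}

The hbaseBulkLoad call used in Example 11-4 takes care of grouping and sorting the data along these boundaries for you; the snippet above is only meant to illustrate what it has to respect.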

At the time of writing, the Java API to perform bulk loads from Spark is not complete. This is tracked by the JIRA HBASE-14217. Until this JIRA is resolved, only Scala can be used to perform this operation. Therefore, the code in Example 11-4 is written in Scala.

Example 11-4. HBase BulkLoad example in Scala using Spark
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

object SparkBulkLoad {
  def main(args: Array[String]) {

    val columnFamily1 = "segment"
    val stagingFolder = "/tmp/user"
    val tableNameString = "user"
    val tableName = TableName.valueOf(tableNameString)

    val sc = new SparkContext(new SparkConf().setAppName("Spark BulkLoad")
                                             .setMaster("local[2]"))
    val config = HBaseConfiguration.create
    val hbaseContext = new HBaseContext(sc, config)

    val textFile = sc.textFile("hdfs://localhost/user/cloudera/data.txt")
    val toByteArrays = textFile.map(line => { // 1
      val tokens = line.split("\\|")
      (Bytes.toBytes(tokens(0)), (Bytes.toBytes(columnFamily1),
                                  Bytes.toBytes(tokens(1)),
                                  Bytes.toBytes(tokens(2))))
    })

    toByteArrays.hbaseBulkLoad(hbaseContext, tableName, // 2
      t => { // 3
        val rowKey = t._1
        val family:Array[Byte] = t._2._1
        val qualifier = t._2._2
        val value = t._2._3

        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

        Seq((keyFamilyQualifier, value)).iterator
      },
      stagingFolder)

    val load = new LoadIncrementalHFiles(config)
    load.run(Array(stagingFolder, tableNameString)) // 4
  }
}
1. Apply a transformation on the input file to turn each line into byte arrays representing the key, the column family, the column qualifier, and the value.

2. Based on the transformed input, make use of the Spark HBase API to generate the related HFiles.

3. Describe how the byte arrays are going to be transformed into the format required by the bulk load.

4. After the HFiles are generated, make use of the LoadIncrementalHFiles API to load the generated files into the target table.

There are a few steps required to run this application. First, because you will most probably not run this application as the spark user, you will need permission to write into the applicationHistory directory. Use the following command to help you with this (you might need to set a password for the hdfs user):

sudo -u hdfs hadoop fs -chmod 1777 /user/spark/applicationHistory

Another important consideration to keep in mind is that, on a nonsecured cluster, HBase LoadIncrementalHFiles has to run as the hbase user. Indeed, it pushes files into the /hbase tree, which is read and written by the HBase processes. Pushing a file there as another user will leave HBase unable to delete it after a compaction, which will cause failures.

You can run this small example using the following command:

su -l hbase -- spark-submit --class com.architecting.ch11.SparkBulkLoad \
            --master local ~/ahae/target/ahae.jar

This will add the following content to your HBase table:

hbase(main):013:0> scan 'user', LIMIT =>
ROW               COLUMN+CELL
 0000123542a7-... column=segment:postalcode, ts=1457567998199, value=34270
 0000153542a7-... column=segment:lastname, ts=1457567998199, value=Smith
 0000173542a7-... column=segment:birthdate, ts=1457567998199, value=06/03/1942
 0000173542a7-... column=segment:status, ts=1457567998199, value=maried

Tip

Spark Streaming is typically viewed as a micro-batch processing engine: it processes a small batch of entries at a time, then takes the next batch, and so on. The bulk load approach creates HFiles, and the bigger the files, the better the throughput. Because Spark Streaming would create HFiles for each small batch, combining it with bulk load does not make sense and is not recommended.

Spark Over HBase

The final example we will look at processes the content of an HBase table using Spark. Just as we ran MapReduce over an HBase table, we will now run Spark over an HBase table. The concept is similar for Spark: it will process the different regions in parallel.

The examples we’ll look at in this section are very simple, and they achieve the same goal—that is, they count the lines of an HBase table. Although this is very simple, you will see later how it can be extended for more complex use cases.

We’ll break the example down into a few pieces. The most important part is shown in Example 11-5.

Example 11-5. HBase Spark over HBase initialization
  // SparkConf sc = new SparkConf().setAppName("ProcessTable").setMaster("local[2]");
  SparkConf sc = new SparkConf().setAppName("ProcessTable");
  JavaSparkContext jsc = new JavaSparkContext(sc);
  Configuration conf = HBaseConfiguration.create();

  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

  Scan scan = new Scan();
  scan.setCaching(100);
  KeyOnlyFilter kof = new KeyOnlyFilter();
  scan.setFilter(kof);

  JavaRDD<Tuple2<ImmutableBytesWritable, Result>> data =
                          hbaseContext.hbaseRDD(TABLE_NAME, scan);

This is where all the magic happens. We start with the normal Spark and HBase initialization calls. As with MapReduce over an HBase table, we have to initialize a Scan that filters out the rows we do not want and returns the ones we are looking for. Here we want all the rows; however, because we only want to count them, we do not need the values, so we apply a KeyOnlyFilter to keep only the keys. This should already be familiar to you. The last line is the interesting part: using the HBaseContext, and given the table name and the Scan object, you get back an RDD representation of your HBase table. Processing this RDD will process the entire HBase table. Example 11-6 is a very basic count operation on the table rows.

Example 11-6. Spark HBase RDD count
System.out.println("data.count() = " + data.count());

This simple line of code will trigger a count on the Spark RDD. Because the Spark RDD represents the entire HBase table, it will simply count all the rows in the table. This is very simple, but it doesn’t really let you do any specific operation on the rows.

Examples 11-7 and 11-8 also perform a count on the HBase table. However, as you will see, even though they just count the rows, it is very easy to modify these examples to perform any manipulation of the rows you might want.

Example 11-7. HBase Spark over HBase partitions reduce count
  FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, Integer> setup =
  new FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, Integer>() {
    @Override
    public Iterable<Integer>
                      call(Iterator<Tuple2<ImmutableBytesWritable, Result>> input) {
      int a = 0;
      while (input.hasNext()) {
        a++; // 1
        input.next();
      }
      ArrayList<Integer> ret = new ArrayList<Integer>();
      ret.add(a);
      return ret;
    }
  };
  Function2<Integer, Integer, Integer> combine =
  new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) {
      return a+b; // 2
    }
  };

  System.err.println("data.mapPartitions(setup).reduce(combine) = " +
      data.mapPartitions(setup).reduce(combine));
  long time3 = System.currentTimeMillis();
  System.err.println("Took " + (time3 - time2) + " milliseconds");
1. For each Spark partition, we simply increment a counter for each line we find. Here, it would be very easy to accumulate a value extracted from the HBase cell or do any other kind of aggregation we want.

2. Now that all the partitions are aggregated, to combine them we simply need to sum all the values.

Example 11-7 looks very similar to what we were doing in previous chapters with MapReduce. Example 11-8 is a bit different because it doesn’t map by partition, but the output will again be the same.

Example 11-8. HBase Spark over HBase aggregate count
  Function2<Integer, Tuple2<ImmutableBytesWritable, Result>, Integer> aggregator =
  new Function2<Integer, Tuple2<ImmutableBytesWritable, Result>, Integer>() {
    @Override
    public Integer call(Integer v1, Tuple2<ImmutableBytesWritable, Result> v2)
        throws Exception {
      return v1 + 1; // 1
    }
  };
  Function2<Integer, Integer, Integer> combiner =
  new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
      return v1 + v2; // 2
    }
  };

  System.err.println("data.aggregate(0, aggregator, combiner) = " +
                                           data.aggregate(0, aggregator, combiner));
  long time4 = System.currentTimeMillis();
  System.err.println("Took " + (time4 - time3) + " milliseconds");

  jsc.close();
}

public static void main(String[] args) {
  processVersion1();
}

}
1. Because we just want to count the rows, we simply emit 1 for each one. However, think about something more complicated: imagine that the cell contains an Avro object representing an order. If you want to sum all the order amounts, this is the place where you would do it. Simply extract the Avro object from the parameter and aggregate its amount (a sketch of this follows the list).

2. Again, to combine the values together, we simply have to sum them.
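
Here is a minimal sketch of that idea, written as a variation of the aggregator from Example 11-8. The edge:order column holding an Avro-encoded order, and its static schema constant ORDER_SCHEMA with a double field named amount, are assumptions made for the sake of the example; you also need the Avro generic and io imports, and you must drop the KeyOnlyFilter from the scan in Example 11-5 so that the values are actually returned:

Function2<Double, Tuple2<ImmutableBytesWritable, Result>, Double> amountAggregator =
new Function2<Double, Tuple2<ImmutableBytesWritable, Result>, Double>() {
  @Override
  public Double call(Double total, Tuple2<ImmutableBytesWritable, Result> row)
      throws Exception {
    // Hypothetical column containing an Avro-encoded order.
    byte[] payload = row._2().getValue(Bytes.toBytes("edge"), Bytes.toBytes("order"));
    if (payload == null) return total;
    DatumReader<GenericRecord> reader =
        new GenericDatumReader<GenericRecord>(ORDER_SCHEMA);
    GenericRecord order =
        reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
    return total + (Double) order.get("amount");
  }
};

Function2<Double, Double, Double> amountCombiner =
new Function2<Double, Double, Double>() {
  @Override
  public Double call(Double v1, Double v2) throws Exception {
    return v1 + v2;
  }
};

System.out.println("total amount = " +
    data.aggregate(0d, amountAggregator, amountCombiner));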

Examples 11-7 and 11-8 can very easily be modified to achieve much more processing on the data you have:

  • Think about doing data correlation where you perform an HBase lookup on the map side to enrich an output file that will generate Parquet data for analytic queries.

  • Think about doing aggregation to enrich your HBase table with daily, weekly, and monthly values.

  • Think about feeding a Kafka queue to allow subsequent systems to receive and process the HBase data (a sketch of this follows below).

Everything you were previously doing with MapReduce can now be achieved using Spark.
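
As an illustration of the last point, here is a minimal sketch that forwards every row key of the data RDD from Example 11-5 to Kafka. The user-events topic and the broker address are assumptions, and you need the org.apache.kafka.clients.producer and java.util.Properties imports on top of those already used in this section:

data.foreachPartition(
  new VoidFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>>() {
    @Override
    public void call(Iterator<Tuple2<ImmutableBytesWritable, Result>> rows)
        throws Exception {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
      // One producer per partition so that nothing needs to be serialized
      // with the Spark closure.
      KafkaProducer<String, String> producer =
          new KafkaProducer<String, String>(props);
      while (rows.hasNext()) {
        String rowKey = Bytes.toString(rows.next()._2().getRow());
        // Downstream consumers can use the row key to fetch the full profile.
        producer.send(new ProducerRecord<String, String>("user-events",
                                                         rowKey, rowKey));
      }
      producer.close();
    }
  });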

Going Further

If you want to extend the examples presented in this chapter, the following list offers some options you can try based on our discussion:

Partitions

To clearly see the benefit of partitioning the RDDs, create a bigger HBase table with multiple regions and try to create two versions of your Spark code: one that runs over all the data with a single executor, and another that partitions the data to run it over multiple executors. Instead of a single executor processing the entire table, you should see one executor per HBase region of your table. The difference in execution time should be significant.

Streaming

Try to convert the put example into Spark Streaming. Instead of processing the entire file at once, try to process it in micro batches. This should allow you to start seeing results in HBase sooner than when all the puts are sent at once.
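
If you want a starting point, the following sketch adapts Example 11-2 to Spark Streaming. It assumes that your version of the Spark HBase API exposes streamBulkPut (the one extracted from the Apache HBase 2.0 project does), that new input files are dropped into a hypothetical /user/cloudera/incoming directory in HDFS, and that the Spark Streaming Java classes are imported in addition to those from Example 11-2:

SparkConf sparkConf = new SparkConf().setAppName("StreamingIngestLines")
                                     .setMaster("local[2]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Each micro batch covers the files that appeared during the last 10 seconds.
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(10));
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

JavaDStream<String> lines =
    jssc.textFileStream("hdfs://localhost/user/cloudera/incoming");

hbaseContext.streamBulkPut(lines, TABLE_NAME, new Function<String, Put>() {
  @Override
  public Put call(String line) throws Exception {
    String[] tokens = line.split("\\|");
    Put put = new Put(Bytes.toBytes(tokens[0]));
    put.addColumn(Bytes.toBytes("segment"), Bytes.toBytes(tokens[1]),
                  Bytes.toBytes(tokens[2]));
    return put;
  }
});

jssc.start();
jssc.awaitTermination();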

Scan modification

Try to modify the scanner initialization to select only certain rows, or to extract only certain columns from your table. The Scan object you give to your Spark RDD determines what data you are going to receive in the subsequent calls.
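
For instance, the Scan initialization from Example 11-5 could be narrowed down as follows. The row boundaries are only placeholders, and withStartRow/withStopRow assume the HBase 2.0 client (older clients use setStartRow/setStopRow):

Scan scan = new Scan();
scan.setCaching(100);
// Only scan a (placeholder) range of row keys...
scan.withStartRow(Bytes.toBytes("00000"));
scan.withStopRow(Bytes.toBytes("00010"));
// ...and only bring back the segment:postalcode column.
scan.addColumn(Bytes.toBytes("segment"), Bytes.toBytes("postalcode"));

JavaRDD<Tuple2<ImmutableBytesWritable, Result>> data =
    hbaseContext.hbaseRDD(TABLE_NAME, scan);

Everything the RDD receives afterward is limited to what this Scan returns, so the subsequent counts and aggregations only see the selected rows and columns.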
