Chapter 17. Storage Handlers and NoSQL

Storage handlers are a combination of an InputFormat, an OutputFormat, a SerDe, and handler-specific code that Hive uses to treat an external entity as a standard Hive table. This allows the user to issue queries seamlessly, whether the table represents a text file stored in Hadoop or a column family stored in a NoSQL database such as Apache HBase, Apache Cassandra, or Amazon DynamoDB. Storage handlers are not limited to NoSQL databases; a storage handler could be designed for many different kinds of data stores.

Note

A specific storage handler may only implement some of the capabilities. For example, a given storage handler may allow read-only access or impose some other restriction.

Storage handlers offer a streamlined system for ETL. For example, a Hive query could select from a table backed by sequence files and write its output to a table backed by a different storage handler, such as an HBase table.
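
A hedged sketch of that flow follows; the source table here is hypothetical, and hbase_stocks is the HBase-backed table defined later in this chapter:

-- Hypothetical source table backed by sequence files on HDFS
CREATE TABLE stock_history(key INT, name STRING, price FLOAT)
STORED AS SEQUENCEFILE;

-- A single query reads through the file-based table and writes
-- through the HBase storage handler
INSERT OVERWRITE TABLE hbase_stocks
SELECT key, name, price FROM stock_history;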

Storage Handler Background

Hadoop has an abstraction known as InputFormat that allows data from different sources and formats to be used as input for a job. TextInputFormat is a concrete implementation of InputFormat: it tells Hadoop how to split a given path into multiple tasks, and it provides a RecordReader with methods for reading data from each split.

Hadoop also has an abstraction known as OutputFormat, which takes the output from a job and writes it to an entity. TextOutputFormat is a concrete implementation of OutputFormat: it persists output to a file, which could be stored on HDFS or locally.

Input and output that represent physical files are common in Hadoop; however, the InputFormat and OutputFormat abstractions can also be used to load and persist data from other sources, including relational databases, NoSQL stores like Cassandra or HBase, or anything else that an InputFormat or OutputFormat can be designed around.
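
These classes surface directly in Hive DDL. For example, a plain text table is just a pairing of TextInputFormat with Hive's text output format; the following is equivalent to STORED AS TEXTFILE, with the classes named explicitly:

CREATE TABLE plain_text_example(line STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';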

In the HiveQL chapter, we demonstrated the Word Count example written in Java code, and then demonstrated an equivalent solution written in Hive. Hive uses abstractions such as tables, types, row formats, and other metadata to understand the source data. Once Hive understands the source data, the query engine can process it using familiar HiveQL operators.

Many NoSQL databases have implemented Hive connectors using custom adapters.

HiveStorageHandler

HiveStorageHandler is the primary interface Hive uses to connect with NoSQL stores such as HBase, Cassandra, and others. An examination of the interface shows that an implementation must supply a custom InputFormat, OutputFormat, and SerDe. The storage handler enables both reading from and writing to the underlying storage subsystem: Hive can run SELECT queries against the data system, and it can also write into the data system, for example to store the results of a report.
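
A storage handler is wired into a table with the STORED BY clause, which takes the place of the ROW FORMAT and STORED AS clauses of an ordinary table definition. A minimal sketch follows; the handler class and property names here are placeholders, not a real library:

CREATE TABLE custom_backed_table(key STRING, value STRING)
STORED BY 'com.example.MyStorageHandler'              -- hypothetical handler class
WITH SERDEPROPERTIES ("example.column.mapping" = "key,value")  -- handler-specific
TBLPROPERTIES ("example.remote.table" = "my_table");           -- handler-specific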

When executing Hive queries over NoSQL databases, performance is lower than that of normal Hive and MapReduce jobs on HDFS because of the overhead the NoSQL system adds. Some of the reasons include the socket connection to the database server and the merging of multiple underlying files on the server side, whereas typical access from HDFS is completely sequential I/O, which is very fast on modern hard drives.

A common technique for combining NoSQL databases with Hadoop in an overall system architecture is to use the NoSQL cluster for real-time work and the Hadoop cluster for batch-oriented work. If the NoSQL system is the master data store and that data needs to be queried in batch jobs with Hadoop, bulk exporting is an efficient way to convert the NoSQL data into HDFS files. Once the HDFS files are created via an export, batch Hadoop jobs can be executed with maximum efficiency.
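
If the NoSQL table is already exposed through a storage handler, one simple way to perform such an export is a one-time copy through Hive itself, paying the NoSQL read overhead once. A sketch, assuming the hbase_stocks table defined in the next section and a hypothetical target table name:

-- Materialize an HDFS-resident copy of the HBase-backed table for batch jobs
CREATE TABLE stocks_hdfs
STORED AS SEQUENCEFILE
AS SELECT * FROM hbase_stocks;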

HBase

The following creates a Hive table and an HBase table using HiveQL:

CREATE TABLE hbase_stocks(key INT, name STRING, price FLOAT)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,stock:name,stock:price")
TBLPROPERTIES ("hbase.table.name" = "stocks");

To create a Hive table that points to an existing HBase table, the CREATE EXTERNAL TABLE HiveQL statement must be used:

CREATE EXTERNAL TABLE hbase_stocks(key INT, name STRING, price FLOAT)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,stock:name,stock:price")
TBLPROPERTIES("hbase.table.name" = "stocks");

Instead of scanning the entire HBase table for a given Hive query, filter pushdowns constrain the rows that HBase returns to Hive.

Examples of the types of predicates that are converted into pushdowns are:

  • key < 20

  • key = 20

  • key < 20 and key > 10

Any other, more complex types of predicates will be ignored and will not utilize the pushdown feature.

The following is an example of creating a simple table and a query that will use the filter pushdown feature. Note that the pushdown is always on the HBase key, not on the column values of a column family:

CREATE TABLE hbase_pushdown(key int, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string");

SELECT * FROM hbase_pushdown WHERE key = 90;

The following query will not result in a pushdown because it contains an OR on the predicate:

SELECT * FROM hbase_pushdown
WHERE key <= 80 OR key >= 100;

Hive with HBase supports joins of HBase tables to HBase tables, and of HBase tables to non-HBase tables.
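
For example, the HBase-backed hbase_stocks table can be joined to an ordinary HDFS-backed table with standard join syntax; only the table scans go through the storage handler. A sketch, assuming a hypothetical dividends table:

-- Hypothetical HDFS-backed table
CREATE TABLE dividends(symbol STRING, dividend FLOAT);

SELECT s.name, s.price, d.dividend
FROM hbase_stocks s
JOIN dividends d ON s.name = d.symbol;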

By default, pushdowns are turned on, however they may be turned off with the following:

set hive.optimize.ppd.storage=false;

It is important to note when inserting data into HBase from Hive that HBase requires unique keys, whereas Hive has no such constraint.
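
One consequence is that rows sharing a key collapse into a single HBase row, with the last write winning. If that matters, deduplicate in the query before writing; a sketch with a hypothetical staging table:

-- Hypothetical staging table that may contain duplicate keys
CREATE TABLE stocks_staging(key INT, name STRING, price FLOAT);

-- Reduce to one row per key before writing, since HBase keeps one row per unique key
INSERT OVERWRITE TABLE hbase_stocks
SELECT key, max(name), max(price)
FROM stocks_staging
GROUP BY key;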

A few notes on column mapping between Hive and HBase:

  • There is no way to access the HBase row timestamp, and only the latest version of a row is returned

  • The HBase key must be defined explicitly

Cassandra

Cassandra has implemented the HiveStorageHandler interface in a similar way to HBase. The implementation was originally done by DataStax as part of the Brisk project.

The model is fairly straightforward: a Cassandra column family maps to a Hive table, and Cassandra column names map directly to Hive column names.

Static Column Mapping

Static column mapping is useful when the user has specific columns inside Cassandra which they wish to map to Hive columns. The following is an example of creating an external Hive table that maps to an existing Cassandra keyspace and column family:

CREATE EXTERNAL TABLE Weblog(rowkey string, useragent string, ipaddress string, timestamp string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
  "cassandra.columns.mapping" = ":key,user_agent,ip_address,time_stamp")
TBLPROPERTIES (
  "cassandra.range.size" = "200",
  "cassandra.slice.predicate.size" = "150" );

Transposed Column Mapping for Dynamic Columns

Some use cases of Cassandra involve dynamic columns, where a given column family does not have fixed, named columns; instead, the column names themselves carry data. This is often used for time series data, where the column name represents a time and the column value represents the reading at that time. It is also useful if the column names are not known in advance or you wish to retrieve all of them:

CREATE EXTERNAL TABLE Weblog(rowkey string, column_name string, column_value string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
  "cassandra.columns.mapping" = ":key,:column,:value");

Cassandra SerDe Properties

The following properties in Table 17-1 can be declared in a WITH SERDEPROPERTIES clause:

Table 17-1. Cassandra SerDe storage handler properties

Name                             Description
cassandra.columns.mapping        Mapping of Hive to Cassandra columns
cassandra.cf.name                Column family name in Cassandra
cassandra.host                   IP of a Cassandra node to connect to
cassandra.port                   Cassandra RPC port: default 9160
cassandra.partitioner            Partitioner: default RandomPartitioner

The following properties in Table 17-2 can be declared in a TBLPROPERTIES clause:

Table 17-2. Cassandra table properties

Name                             Description
cassandra.ks.name                Cassandra keyspace name
cassandra.ks.repfactor           Cassandra replication factor: default 1
cassandra.ks.strategy            Replication strategy: default SimpleStrategy
cassandra.input.split.size       MapReduce split size: default 64 * 1024
cassandra.range.size             MapReduce range batch size: default 1000
cassandra.slice.predicate.size   MapReduce slice predicate size: default 1000
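
To show where these properties go, here is a hedged variant of the earlier Weblog definition that sets the connection and keyspace properties explicitly; the host, keyspace, and split-size values are placeholders:

CREATE EXTERNAL TABLE weblog_configured(
  rowkey string, useragent string, ipaddress string, timestamp string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
  "cassandra.columns.mapping" = ":key,user_agent,ip_address,time_stamp",
  "cassandra.cf.name" = "Weblog",
  "cassandra.host" = "10.0.0.5",     -- placeholder Cassandra node address
  "cassandra.port" = "9160")
TBLPROPERTIES (
  "cassandra.ks.name" = "WeblogKeyspace",   -- placeholder keyspace name
  "cassandra.input.split.size" = "65536");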

DynamoDB

Amazon’s Dynamo was one of the first NoSQL databases. Its design influenced many other databases, including Cassandra and HBase. Despite its influence, Dynamo was restricted to internal use at Amazon until recently, when Amazon released DynamoDB, another database influenced by the original Dynamo.

DynamoDB is in the family of key-value databases. In DynamoDB, a table is a collection of items, and each table is required to have a primary key. An item consists of a key and an arbitrary number of attributes; the set of attributes can vary from item to item.

You can query a DynamoDB table with Hive, and you can move data to and from Amazon S3. Here is another example of a Hive table for stocks, this time backed by a DynamoDB table:

CREATE EXTERNAL TABLE dynamo_stocks(
  key INT, symbol STRING,
  ymd STRING, price FLOAT)
STORED BY
  'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES (
  "dynamodb.table.name" = "Stocks",
  "dynamodb.column.mapping" =
    "key:Key,symbol:Symbol,ymd:YMD,price:Close");
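
As a sketch of the S3 movement mentioned above, the DynamoDB-backed table can be copied into an external table whose LOCATION points at an S3 bucket. This assumes an Amazon EMR cluster, where Hive can read and write s3:// locations directly; the bucket name is a placeholder:

CREATE EXTERNAL TABLE stocks_s3(
  key INT, symbol STRING, ymd STRING, price FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://my-example-bucket/stocks/';   -- placeholder bucket

INSERT OVERWRITE TABLE stocks_s3
SELECT key, symbol, ymd, price FROM dynamo_stocks;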

See http://aws.amazon.com/dynamodb/ for more information about DynamoDB.
