Chapter 15. Customizing Hive File and Record Formats

Hive functionality can be customized in several ways. First, there are the variables and properties that we discussed in Variables and Properties. Second, you may extend Hive using custom UDFs, or user-defined functions, which were discussed in Chapter 13. Finally, you can customize the file and record formats, which we discuss now.

File Versus Record Formats

Hive draws a clear distinction between the file format, which determines how records are encoded in a file, and the record format, which determines how the stream of bytes for a given record is encoded.

In this book we have been using text files, with the default STORED AS TEXTFILE in CREATE TABLE statements (see Text File Encoding of Data Values), where each line in the file is a record. Most of the time those records have used the default separators, with occasional examples of data that use commas or tabs as field separators. However, a text file could contain JSON or XML “documents.”

For Hive, the file format choice is orthogonal to the record format. We’ll first discuss options for file formats, then we’ll discuss different record formats and how to use them in Hive.

Demystifying CREATE TABLE Statements

Throughout the book we have shown examples of creating tables. You may have noticed that CREATE TABLE supports a variety of optional syntax, for example STORED AS SEQUENCEFILE, ROW FORMAT DELIMITED, SERDE, INPUTFORMAT, and OUTPUTFORMAT. This chapter covers much of this syntax and gives examples, but as a preface, note that some syntax is sugar for other syntax, that is, syntax used to make concepts easier (sweeter) to understand. For example, specifying STORED AS SEQUENCEFILE is an alternative to specifying an INPUTFORMAT of org.apache.hadoop.mapred.SequenceFileInputFormat and an OUTPUTFORMAT of org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.
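
For example, the following statement is effectively equivalent to declaring the table with STORED AS SEQUENCEFILE (the class names are the ones that appear in the DESCRIBE output later in this section; the table name seq_explicit is a hypothetical placeholder):

-- seq_explicit is a hypothetical table name used for illustration
CREATE TABLE seq_explicit (x INT)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat';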

Let’s create some tables and use DESCRIBE EXTENDED to peel away the sugar and expose the internals. First, we will create and then describe a simple table (we have formatted the output here, as Hive otherwise would not have indented the output):

hive> create table text (x int) ;
hive> describe extended text;
OK
x       int

Detailed Table Information
Table(tableName:text, dbName:default, owner:edward, createTime:1337814583,
lastAccessTime:0, retention:0,
sd:StorageDescriptor(
  cols:[FieldSchema(name:x, type:int, comment:null)],
  location:file:/user/hive/warehouse/text,
  inputFormat:org.apache.hadoop.mapred.TextInputFormat,
  outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
  compressed:false,
  numBuckets:-1,
  serdeInfo:SerDeInfo(
    name:null,
    serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
    parameters:{serialization.format=1}
  ),
  bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[],
  parameters:{transient_lastDdlTime=1337814583},
  viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE
)

Now let’s create a table using STORED AS SEQUENCEFILE for comparison:

hive> CREATE TABLE seq (x int) STORED AS SEQUENCEFILE;
hive> DESCRIBE EXTENDED seq;
OK
x       int

Detailed Table Information
Table(tableName:seq, dbName:default, owner:edward, createTime:1337814571,
lastAccessTime:0, retention:0,
sd:StorageDescriptor(
  cols:[FieldSchema(name:x, type:int, comment:null)],
  location:file:/user/hive/warehouse/seq,
  inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
  outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
  compressed:false, numBuckets:-1,
  serdeInfo:SerDeInfo(
    name:null,
    serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
    parameters:{serialization.format=1}
  ),
  bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[],
  parameters:{transient_lastDdlTime=1337814571},
  viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE
)
Time taken: 0.107 seconds

Unless you have been blinded by Hive’s awesomeness, you will have noticed that STORED AS SEQUENCEFILE changed both the InputFormat and the OutputFormat:

inputFormat:org.apache.hadoop.mapred.TextInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,

inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,

Hive uses the InputFormat when reading data from the table, and it uses the OutputFormat when writing data to the table.

Note

The InputFormat reads key-value pairs from files; by default, Hive ignores the key and works only with the data found in the value. The reason is that the key, which comes from TextInputFormat, is a long integer representing the byte offset of the line in the file (which is not user data).

The rest of the chapter describes other aspects of the table metadata.

File Formats

We discussed in Text File Encoding of Data Values that the simplest data format to use is the text format, with whatever delimiters you prefer. It is also the default format, equivalent to creating a table with the clause STORED AS TEXTFILE.

The text file format is convenient for sharing data with other tools, such as Pig, Unix text tools like grep, sed, and awk, etc. It’s also convenient for viewing or editing files manually. However, the text format is not space efficient compared to binary formats. We can use compression, as we discussed in Chapter 11, but we can also gain more efficient usage of disk space and better disk I/O performance by using binary file formats.

SequenceFile

The first alternative is the SequenceFile format, which we can specify using the STORED AS SEQUENCEFILE clause during table creation.

Sequence files are flat files consisting of binary key-value pairs. When Hive converts queries to MapReduce jobs, it decides on the appropriate key-value pairs to use for a given record.

The sequence file is a standard format supported by Hadoop itself, so it is an acceptable choice when sharing files between Hive and other Hadoop-related tools. It’s less suitable for use with tools outside the Hadoop ecosystem. As we discussed in Chapter 11, sequence files can be compressed at the block and record level, which is very useful for optimizing disk space utilization and I/O, while still supporting the ability to split files on block boundaries for parallel processing.
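
As discussed in Chapter 11, compression for data written into a SEQUENCEFILE table is controlled through session properties. The following is a minimal sketch, reusing the text and seq tables created earlier; the property names follow the older Hadoop MapReduce configuration used throughout this book and may differ in newer releases:

-- a sketch: enable block-level compression for sequence file output
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

-- rewrite the seq table from the text table; the output is block-compressed
INSERT OVERWRITE TABLE seq SELECT x FROM text;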

Another efficient binary format that is supported natively by Hive is RCFile.

RCFile

Most Hadoop and Hive storage is row oriented, which is efficient in most cases. The efficiency can be attributed to several factors: most tables have a modest number (1-20) of columns; compression on blocks of a file handles repeating data efficiently; and many processing and debugging tools (more, head, awk) work well with row-oriented data.

Not all tools and data stores take a row-oriented approach; column-oriented organization is a good storage option for certain types of data and applications. For example, if a given table has hundreds of columns but most queries use only a few of them, it is wasteful to scan entire rows and then discard most of the data. If the data is stored by column instead of by row, only the data for the desired columns has to be read, improving performance.

It also turns out that compression on columns is typically very efficient, especially when the column has low cardinality (only a few distinct entries). Also, some column-oriented stores do not physically need to store null columns.

Hive’s RCFile is designed for these scenarios.

While books like Programming Hive are invaluable sources of information, sometimes the best place to find information is inside the source code itself. A good description of how Hive’s columnar storage format, RCFile, works can be found in the source code:

cd hive-trunk
find . -name "RCFile*"
vi ./ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
 * <p>
 * RCFile stores columns of a table in a record columnar way. It first
 * partitions rows horizontally into row splits. and then it vertically
 * partitions each row split in a columnar way. RCFile first stores the meta
 * data of a row split, as the key part of a record, and all the data of a row
 * split as the value part.
 * </p>

A powerful aspect of Hive is that converting data between different formats is simple. Storage information is kept in each table’s metadata. When a query SELECTs from one table and INSERTs into another, Hive uses the metadata about the tables and handles the conversion automatically. This makes it easy to evaluate the different options without writing one-off programs to convert data between the formats.

Creating a table using the ColumnarSerDe, RCFileInputFormat, and RCFileOutputFormat:

hive> select * from a;
OK
4       5
3       2
Time taken: 0.336 seconds
hive> create table columnTable (key int , value int)
    > ROW FORMAT SERDE
    >   'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
    > STORED AS
    >   INPUTFORMAT  'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
    >   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat';

hive> FROM a INSERT OVERWRITE TABLE columnTable SELECT a.col1, a.col2;

RCFiles cannot be opened with the tools that open typical sequence files. However, Hive provides an rcfilecat tool to display the contents of RCFiles:

$ bin/hadoop dfs -text /user/hive/warehouse/columntable/000000_0
text: java.io.IOException: WritableName can't load class:
org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer
$ bin/hive --service rcfilecat /user/hive/warehouse/columntable/000000_0
4       5
3       2

Example of a Custom Input Format: DualInputFormat

Many databases allow users to SELECT without FROM. This can be used to perform simple calculations, such as SELECT 1+2. If Hive did not allow this type of query, then a user would instead select from an existing table and limit the results to a single row. Or the user may create a table with a single row. Some databases provide a table named dual, which is a single row table to be used in this manner.

By default, a standard Hive table uses the TextInputFormat. The TextInputFormat calculates zero or more splits for the input. Splits are opened by the framework and a RecordReader is used to read the data. Each row of text becomes an input record. To create an input format that works with a dual table, we need to create an input format that returns one split with one row, regardless of the input path specified.[23]

In the example below, DualInputFormat returns a single split:

public class DualInputFormat implements InputFormat<Text, Text> {
  public InputSplit[] getSplits(JobConf jc, int numSplits) throws IOException {
    // Always return exactly one split, regardless of the input path.
    InputSplit[] splits = new DualInputSplit[1];
    splits[0] = new DualInputSplit();
    return splits;
  }
  public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf jc,
          Reporter reporter) throws IOException {
    return new DualRecordReader(jc, split);
  }
}

In the example below, the split represents a single row. There is nothing to serialize or deserialize:

public class DualInputSplit implements InputSplit {
  public long getLength() throws IOException {
    return 1;
  }
  public String[] getLocations() throws IOException {
    return new String [] { "localhost" };
  }
  public void write(DataOutput d) throws IOException {
  }
  public void readFields(DataInput di) throws IOException {
  }
}

The DualRecordReader has a boolean variable hasNext. After the first invocation of next(), its value is set to false. Thus, this record reader returns a single row of virtual input and is then finished:

public class DualRecordReader implements RecordReader<Text,Text>{
  boolean hasNext=true;
  public DualRecordReader(JobConf jc, InputSplit s) {
  }
  public DualRecordReader(){
  }
  public long getPos() throws IOException {
    return 0;
  }
  public void close() throws IOException {
  }
  public float getProgress() throws IOException {
    if (hasNext)
      return 0.0f;
    else
      return 1.0f;
  }
  public Text createKey() {
    return new Text("");
  }
  public Text createValue() {
    return new Text("");
  }
  public boolean next(Text k, Text v) throws IOException {
    if (hasNext){
      hasNext=false;
      return true;
    } else {
      return hasNext;
    }
  }
}

We can create a table using our DualInputFormat and the default HiveIgnoreKeyTextOutputFormat. Selecting from the table confirms that it returns a single empty row. InputFormats should be placed inside the Hadoop lib directory or preferably inside the Hive auxlib directory.

client.execute("add jar dual.jar");
client.execute("create table dual (fake string) "+
  "STORED AS INPUTFORMAT 'com.m6d.dualinputformat.DualInputFormat' "+
  "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
client.execute("select count(1) as cnt from dual");
String row = client.fetchOne();
assertEquals("1", row);
client.execute("select * from dual");
row = client.fetchOne();
assertEquals( "", row);

Record Formats: SerDes

SerDe is short for serializer/deserializer. A SerDe encapsulates the logic for converting the unstructured bytes in a record, which is stored as part of a file, into a record that Hive can use. SerDes are implemented using Java. Hive comes with several built-in SerDes and many other third-party SerDes are available.

Internally, the Hive engine uses the defined InputFormat to read a record of data. That record is then passed to the SerDe.deserialize() method.

A lazy SerDe does not fully materialize an object until individual attributes are necessary.

The following example uses a RegexSerDe to parse a standard formatted Apache web log. The RegexSerDe is included as a standard feature of the Hive distribution:

CREATE TABLE serde_regex(
  host STRING,
  identity STRING,
  user STRING,
  time STRING,
  request STRING,
  status STRING,
  size STRING,
  referer STRING,
  agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\[[^\]]*\])
    ([^ "]*|"[^"]*") (-|[0-9]*) (-|[0-9]*)(?: ([^ "]*|"[^"]*")
    ([^ "]*|"[^"]*"))?",
  "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
STORED AS TEXTFILE;

Now we can load data and write queries:

hive> LOAD DATA LOCAL INPATH "../data/files/apache.access.log" INTO TABLE serde_regex;
hive> LOAD DATA LOCAL INPATH "../data/files/apache.access.2.log" INTO TABLE serde_regex;

hive> SELECT * FROM serde_regex ORDER BY time;

(The long regular expression was wrapped to fit.)

CSV and TSV SerDes

What about CSV (comma-separated values) and TSV (tab-separated values) files? Of course, for simple data such as numerical data, you can just use the default text file format and specify the field delimiter, as we saw previously. However, this simplistic approach doesn’t handle strings with embedded commas or tabs, nor does it handle other common conventions, like whether to quote all strings or none, or the optional presence of a “column header” row as the first line in each file.
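
For reference, the simplistic approach just mentioned looks like the following (a minimal sketch; the table name and columns are hypothetical), and it works only when fields contain no embedded delimiters or quotes:

-- hypothetical table; suitable only for CSV data with no embedded
-- commas, quotes, or header row
CREATE TABLE simple_csv (id INT, name STRING, price FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;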

First, it’s generally safer to remove the column header row, if present. Then you can use one of several third-party SerDes for properly parsing CSV or TSV files. For CSV files, consider CSVSerde:

ADD JAR /path/to/csv-serde.jar;

CREATE TABLE stocks(ymd STRING, ...)
ROW FORMAT SERDE 'com.bizo.hive.serde.csv.CSVSerde'
STORED AS TEXTFILE
...;

While TSV support should be similar, there are no comparable third-party TSV SerDes available at the time of this writing.

ObjectInspector

Underneath the covers, Hive uses what is known as an ObjectInspector to transform raw records into objects that Hive can access.

Think Big Hive Reflection ObjectInspector

Think Big Analytics has created an ObjectInspector based on Java reflection called BeansStructObjectInspector. Using the JavaBeans model for introspection, any “property” on objects that are exposed through get methods or as public member variables may be referenced in queries.

An example of how to use the BeansStructObjectInspector is as follows:

public class SampleDeserializer implements Deserializer {
  @Override
  public ObjectInspector getObjectInspector() throws SerDeException {
    return BeansStructObjectInspector.getBeansObjectInspector(YourObject.class);
  }
}

XML UDF

XML is inherently unstructured, which makes Hive a powerful database platform for XML. One of the reasons Hadoop is ideal as an XML database platform is the complexity and resource consumption to parse and process potentially large XML documents. Because Hadoop parallelizes processing of XML documents, Hive becomes a perfect tool for accelerating XML-related data solutions. Additionally, HiveQL natively enables access to XML’s nested elements and values, then goes further by allowing joins on any of the nested fields, values, and attributes.

XPath (XML Path Language) is a global standard created by the W3C for addressing parts of an XML document. Using XPath as an expressive XML query language, Hive becomes extremely useful for extracting data from XML documents and into the Hive subsystem.

XPath models an XML document as a tree of nodes. Basic facilities are provided for access to primitive types, such as string, numeric, and Boolean types.

While commercial solutions such as Oracle XML DB and MarkLogic provide native XML database solutions, open source Hive leverages the advantages provided by the parallel petabyte processing of the Hadoop infrastructure to enable widely effective XML database vivification.

XPath-Related Functions

Hive contains a number of XPath-related UDFs since the 0.6.0 release (Table 15-1).

Table 15-1. XPath UDFs

Name                          Description
xpath                         Returns a Hive array of strings
xpath_string                  Returns a string
xpath_boolean                 Returns a Boolean
xpath_short                   Returns a short integer
xpath_int                     Returns an integer
xpath_long                    Returns a long integer
xpath_float                   Returns a floating-point number
xpath_double, xpath_number    Returns a double-precision floating-point number

Here are some examples where these functions are run on string literals:

hive> SELECT xpath('<a><b id="foo">b1</b><b id="bar">b2</b></a>','//@id')
    > FROM src LIMIT 1;
[foo","bar]
hive> SELECT xpath ('<a><b class="bb">b1</b><b>b2</b><b>b3</b><c class="bb">c1</c>
 <c>c2</c></a>', 'a/*[@class="bb"]/text()')
    > FROM src LIMIT 1;
[b1","c1]

(The long XML string was wrapped for space.)

hive> SELECT xpath_double ('<a><b>2</b><c>4</c></a>', 'a/b + a/c')
    > FROM src LIMIT 1;
6.0

JSON SerDe

What if you want to query JSON (JavaScript Object Notation) data with Hive? If each JSON “document” is on a separate line, you can use TEXTFILE as the input and output format, then use a JSON SerDe to parse each JSON document as a record.

There is a third-party JSON SerDe that started as a Google “Summer of Code” project and was subsequently cloned and forked by other contributors. Think Big Analytics created its own fork and added an enhancement we’ll go over in the discussion that follows.

In the following example, this SerDe is used to extract a few fields from JSON data for a fictitious messaging system. Not all the available fields are exposed. Those that are exposed become available as columns in the table:

CREATE EXTERNAL TABLE messages (
  msg_id      BIGINT,
  tstamp      STRING,
  text        STRING,
  user_id     BIGINT,
  user_name   STRING
)
ROW FORMAT SERDE "org.apache.hadoop.hive.contrib.serde2.JsonSerde"
WITH SERDEPROPERTIES (
  "msg_id"="$.id",
  "tstamp"="$.created_at",
  "text"="$.text",
  "user_id"="$.user.id",
  "user_name"="$.user.name"
)
LOCATION '/data/messages';

The WITH SERDEPROPERTIES clause is a Hive feature that allows the user to define properties that will be passed to the SerDe. The SerDe interprets those properties as it sees fit. Hive doesn’t know or care what they mean.

In this case, the properties are used to map fields in the JSON documents to columns in the table. A string like $.user.id means to take each record, represented by $, find the user key, which is assumed to be a JSON map in this case, and finally extract the value for the id key inside the user. This value for the id is used as the value for the user_id column.

Once defined, the user runs queries as always, blissfully unaware that the queries are actually getting data from JSON!
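
For instance, a query such as the following (a hypothetical example, using only the columns defined above) pulls its values out of the underlying JSON documents:

-- a hypothetical query against the messages table defined above
SELECT user_name, text
FROM messages
WHERE user_id IS NOT NULL
LIMIT 10;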

Avro Hive SerDe

Avro is a serialization system; its main feature is an evolvable, schema-driven binary data format. Initially, Avro’s goals appeared to be in conflict with Hive, since both wish to provide schema or metadata information. However, Hive and the Hive metastore have a pluggable design and can defer to the Avro support to infer the schema.

The Hive Avro SerDe system was created by LinkedIn and has the following features:

  • Infers the schema of the Hive table from the Avro schema

  • Reads all Avro files within a table against a specified schema, taking advantage of Avro’s backwards compatibility

  • Supports arbitrarily nested schemas

  • Translates all Avro data types into equivalent Hive types. Most types map exactly, but some Avro types do not exist in Hive and are converted automatically by the SerDe

  • Understands compressed Avro files

  • Transparently converts the Avro idiom of handling nullable types as Union[T, null] into just T and returns null when appropriate

  • Writes any Hive table to Avro files

Defining Avro Schema Using Table Properties

Create an Avro table by specifying the AvroSerDe, AvroContainerInputFormat, and AvroContainerOutputFormat. Avro has its own schema definition language, which can be stored in the table properties as a string literal using the property avro.schema.literal. The schema specifies three columns: number as int, first_name as string, and last_name as string.

CREATE TABLE doctors
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.literal'='{
  "namespace": "testing.hive.avro.serde",
  "name": "doctors",
  "type": "record",
  "fields": [
    {
      "name":"number",
      "type":"int",
      "doc":"Order of playing the role"
    },
    {
      "name":"first_name",
      "type":"string",
      "doc":"first name of actor playing role"
    },
    {
      "name":"last_name",
      "type":"string",
      "doc":"last name of actor playing role"
    }
  ]
}');

When the DESCRIBE command is run, Hive shows the names and types of the columns. In the output below you will notice that the third column states from deserializer. This shows that the SerDe itself provided the column information, rather than it coming from static values stored in the metastore:

hive> DESCRIBE doctors;
number          int     from deserializer
first_name      string  from deserializer
last_name       string  from deserializer

Defining a Schema from a URI

It is also possible to provide the schema as a URI. This can be a path to a file in HDFS or a URL to an HTTP server. To do this, specify avro.schema.url in table properties and do not specify avro.schema.literal.

The schema can be a file in HDFS:

TBLPROPERTIES ('avro.schema.url'='hdfs://hadoop:9020/path/to.schema')

The schema can also be stored on an HTTP server:

TBLPROPERTIES ('avro.schema.url'='http://site.com/path/to.schema')

Evolving Schema

Over time, fields may be added to or deprecated from data sets. Avro is designed with this in mind. An evolving schema is one that changes over time. Avro allows fields to be null. It also allows default values to be returned if the column is not defined in the data file.

For example, if the Avro schema is later changed to add a field, the default supplies a value for rows whose data files do not contain that column:

{
  "name":"extra_field",
  "type":"string",
  "doc":"an extra field not in the original file",
  "default":"fishfingers and custard"
}
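
As a hedged sketch of the effect (assuming extra_field has been appended to the doctors table’s Avro schema; the mechanics of updating avro.schema.literal or avro.schema.url are omitted), a query over data files written with the original three-field schema returns the declared default:

-- assumes extra_field was added to the doctors schema; older data files
-- lack the column, so the declared default is returned:
-- "fishfingers and custard"
SELECT extra_field FROM doctors LIMIT 1;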

Binary Output

There are several kinds of binary output. We have already seen compression of files, sequence files (compressed or not), and related file types.

Sometimes, it’s also useful to read and write streams of bytes. For example, you may have tools that expect a stream of bytes, without field separators of any kind, and you either use Hive to generate suitable files for those tools or you want to query such files with Hive. You may also want the benefits of storing numbers in compact binary forms instead of strings like “5034223,” which consume more space. A common example is to query the output of the tcpdump command to analyze network behavior.

The following table expects its own files to be in text format, but it writes query results as binary streams:

CREATE TABLE binary_table (num1 INT, num2 INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('serialization.last.column.takes.rest'='true')
STORED AS
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat';

Here’s a SELECT TRANSFORM query that reads binary data from a src table, streams it through the shell cat command and overwrites the contents of a destination1 table:

INSERT OVERWRITE TABLE destination1
SELECT TRANSFORM(*)
USING 'cat' AS mydata STRING
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('serialization.last.column.takes.rest'='true')
RECORDREADER 'org.apache.hadoop.hive.ql.exec.BinaryRecordReader'
FROM src;


[23] The source code for the DualInputFormat is available at: https://github.com/edwardcapriolo/DualInputFormat.
