Chapter 11. Other File Formats and Compression

One of Hive’s unique features is that Hive does not force data to be converted to a specific format. Hive leverages Hadoop’s InputFormat APIs to read data from a variety of sources, such as text files, sequence files, or even custom formats. Likewise, the OutputFormat API is used to write data to various formats.

While Hadoop offers linear scalability in file storage for uncompressed data, storing data in compressed form has many benefits. Compression typically saves significant disk space; for example, text-based files may compress by 40% or more. Compression can also increase throughput and performance. This may seem counterintuitive because compressing and decompressing data incurs extra CPU overhead; however, the I/O savings from moving fewer bytes into memory can result in a net performance gain.

Hadoop jobs tend to be I/O bound rather than CPU bound, in which case compression will improve performance. However, if your jobs are CPU bound, then compression will probably lower your performance. The only way to know for sure is to experiment with different options and measure the results.

Determining Installed Codecs

Depending on your Hadoop version, different codecs will be available to you. The set command in Hive can be used to display the value of hiveconf or Hadoop configuration variables. The available codecs appear in a comma-separated list named io.compression.codecs:

# hive -e "set io.compression.codecs"
io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec

Choosing a Compression Codec

Using compression has the advantage of minimizing the disk space required for files and the overhead of disk and network I/O. However, compressing and decompressing files increases the CPU overhead. Therefore, compression is best used for I/O-bound jobs, where there is extra CPU capacity, or when disk space is at a premium.

All recent versions of Hadoop have built-in support for the GZip and BZip2 compression schemes, including native Linux libraries that accelerate compression and decompression for these formats. Bundled support for Snappy compression was recently added, but if your version of Hadoop doesn’t support it, you can add the appropriate libraries yourself.[18] Finally, LZO compression is often used.[19]
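Once a codec's libraries are installed on every node, the codec is registered with Hadoop by adding its class to the io.compression.codecs list shown above, typically in core-site.xml. The entry below is only a sketch: the org.apache.hadoop classes are the standard built-ins, while the final LzoCodec class is the one provided by the commonly used hadoop-lzo library, so adjust that class name to match whichever LZO package you install:

<property>
  <name>io.compression.codecs</name>
  <!-- The last entry assumes the hadoop-lzo library is installed;
       adjust the class name to match your package. -->
  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzoCodec</value>
  <description>The compression codec classes that can be used for
  compression/decompression.</description>
</property>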

So, why do we need different compression schemes? Each scheme makes a trade-off between speed and the size of the compressed output. BZip2 creates the smallest compressed output, but with the highest CPU overhead. GZip is next in the trade-off of compressed size versus speed. Hence, if minimizing disk space utilization and I/O overhead is the priority, both are attractive choices.

LZO and Snappy create larger files but are much faster, especially for decompression. They are good choices if disk space and I/O overhead are less important than rapid decompression of frequently read data.

Another important consideration is whether or not the compression format is splittable. MapReduce divides very large input files into splits (often one split per filesystem block, i.e., a multiple of 64 MB), and each split is sent to a separate map process. This can only work if Hadoop knows the record boundaries in the file. In text files, each line is a record, but these boundaries are obscured by GZip and Snappy. However, BZip2 and LZO provide block-level compression, where each block contains complete records, so Hadoop can split these files on block boundaries.

The desire for splittable files doesn't rule out GZip and Snappy. When you create your data files, you can partition the output so the files are approximately the desired size. The number of output files typically equals the number of reducers: with N reducers you get N output files. Be careful, though: if you end up with a large nonsplittable file, a single task will have to read the entire file from beginning to end.
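One practical lever is the number of reducers that write the data, since each reducer produces one output file. The sketch below is purely illustrative (gz_copy, big_table, id, and the reducer count are made up); with output compression enabled, as described later in this chapter, it would produce eight compressed files of roughly equal size:

hive> set mapred.reduce.tasks=8;
hive> INSERT OVERWRITE TABLE gz_copy
    > SELECT * FROM big_table
    > DISTRIBUTE BY id;

DISTRIBUTE BY forces a reduce phase so the reducer setting actually takes effect; a map-only query would instead produce one file per mapper.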

There’s much more we could say about compression, but instead we’ll refer you to Hadoop: The Definitive Guide by Tom White (O’Reilly) for more details, and we’ll focus now on how to tell Hive what format you’re using.

From Hive’s point of view, there are two aspects to the file format. One aspect is how the file is delimited into rows (records). Text files use \n (linefeed) as the default row delimiter. When you aren’t using the default text file format, you tell Hive the name of an InputFormat and an OutputFormat to use. Actually, you will specify the names of Java classes that implement these formats. The InputFormat knows how to read splits and partition them into records, and the OutputFormat knows how to write these splits back to files or to console output.

The second aspect is how records are partitioned into fields (or columns). Hive uses ^A by default to separate fields in text files. Hive uses the name SerDe, which is short for serializer/deserializer, for the “module” that partitions incoming records (the deserializer) and that also knows how to write records in this format (the serializer). This time you will specify a single Java class that performs both jobs.

All this information is specified as part of the table definition when you create the table. After creation, you query the table as you normally would, agnostic to the underlying format. Hence, if you’re a user of Hive, but not a Java developer, don’t worry about the Java aspects. The developers on your team will help you specify this information when needed, after which you’ll work as you normally do.
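For example, the following (hypothetical) table definition spells out the classes that Hive would otherwise use implicitly for the default text format; a custom format is declared the same way, with your own class names substituted:

hive> CREATE TABLE explicit_text_format (id INT, msg STRING)
    > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    > STORED AS
    >   INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    >   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';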

Enabling Intermediate Compression

Intermediate compression shrinks the data shuffled between the map and reduce tasks of a job. For intermediate compression, choosing a codec with lower CPU cost is typically more important than choosing the codec that compresses the most. The property hive.exec.compress.intermediate defaults to false; set it to true to enable this feature:

<property>
  <name>hive.exec.compress.intermediate</name>
  <value>true</value>
  <description> This controls whether intermediate files produced by Hive between
  multiple map-reduce jobs are compressed. The compression codec and other options
  are determined from hadoop config variables mapred.output.compress* </description>
</property>

Note

The property that controls intermediate compression for other Hadoop jobs is mapred.compress.map.output.

Hadoop compression uses DefaultCodec unless told otherwise. Changing the codec involves setting the mapred.map.output.compression.codec property. This is a Hadoop variable and can be set in $HADOOP_HOME/conf/mapred-site.xml or in $HIVE_HOME/conf/hive-site.xml. SnappyCodec is a good choice for intermediate compression because it combines good compression performance with low CPU cost:

<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  <description> The codec used to compress the map outputs, i.e., the
  intermediate data shuffled between the map and reduce tasks of a job.
  </description>
</property>
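If you prefer not to edit the XML files, the same two settings can be made per session (or per script) from the CLI, as we do later in this chapter:

hive> set hive.exec.compress.intermediate=true;
hive> set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;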

Final Output Compression

When Hive writes output to a table, that content can also be compressed. The property hive.exec.compress.output controls this feature. You may wish to leave this value set to false in the global configuration file, so that the default output is uncompressed clear text. Users can turn on final compression by setting the property to true on a query-by-query basis or in their scripts:

<property>
  <name>hive.exec.compress.output</name>
  <value>false</value>
  <description> This controls whether the final outputs of a query
  (to a local/hdfs file or a Hive table) is compressed. The compression
  codec and other options are determined from hadoop config variables
  mapred.output.compress* </description>
</property>

Note

The property that controls final compression for other Hadoop jobs is mapred.output.compress.

If hive.exec.compress.output is set to true, a codec can be chosen. GZip compression is a good choice for output compression because it typically reduces the size of files significantly, but remember that GZipped files aren’t splittable by subsequent MapReduce jobs:

<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.GzipCodec</value>
  <description>If the job outputs are compressed, how should they be compressed?
  </description>
</property>
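Again, individual users can opt in from the CLI for a single session or script rather than changing the global defaults. In this sketch the table names are hypothetical:

hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
hive> INSERT OVERWRITE TABLE compressed_export SELECT * FROM source_table;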

Sequence Files

Compressing files saves space, but one of the downsides of storing raw compressed files in Hadoop is that these files are often not splittable. Splittable files can be broken up and processed in parts by multiple mappers in parallel. Most compressed files are not splittable because you can only start reading them from the beginning.

The sequence file format supported by Hadoop breaks a file into blocks and then optionally compresses the blocks in a splittable way.

To use sequence files from Hive, add the STORED AS SEQUENCEFILE clause to a CREATE TABLE statement; for example (the columns shown are just placeholders):

CREATE TABLE a_sequence_file_table (id INT, message STRING) STORED AS SEQUENCEFILE;

Sequence files have three different compression options: NONE, RECORD, and BLOCK. RECORD is the default. However, BLOCK compression is usually more efficient and it still provides the desired splittability. Like many other compression properties, this one is not Hive-specific. It can be defined in Hadoop’s mapred-site.xml file, in Hive’s hive-site.xml, or as needed in scripts or before individual queries:

<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
  <description>If the job outputs are to be compressed as SequenceFiles,
  how should they be compressed? Should be one of NONE, RECORD or BLOCK.
  </description>
</property>

Compression in Action

We have introduced a number of compression-related properties in Hive, and different permutations of these options result in different output. Let’s use these properties in some examples and show what they produce. Remember that variables set by the CLI persist across the rest of the queries in the session, so between examples you should revert the settings or simply restart the Hive session:

hive> SELECT * FROM a;
4       5
3       2

hive> DESCRIBE a;
a       int
b       int

First, let’s enable intermediate compression. This won’t affect the final output; however, the job counters will show less physical data transferred for the job, since the shuffle-sort data was compressed:

hive> set hive.exec.compress.intermediate=true;
hive> CREATE TABLE intermediate_comp_on
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    > AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/intermediate_comp_on
Table default.intermediate_comp_on stats: [num_partitions: 0, num_files: 1,
num_rows: 2, total_size: 8, raw_data_size: 6]
...

As expected, intermediate compression did not affect the final output, which remains uncompressed:

hive> dfs -ls /user/hive/warehouse/intermediate_comp_on;
Found 1 items
/user/hive/warehouse/intermediate_comp_on/000000_0

hive> dfs -cat /user/hive/warehouse/intermediate_comp_on/000000_0;
4       5
3       2

We can also choose an intermediate compression codec other than the default. In this case we chose GZip, although Snappy is normally a better option. The first line is wrapped for space:

hive> set mapred.map.output.compression.codec
    =org.apache.hadoop.io.compress.GzipCodec;
hive> set hive.exec.compress.intermediate=true;

hive> CREATE TABLE intermediate_comp_on_gz
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    > AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/intermediate_comp_on_gz
Table default.intermediate_comp_on_gz stats:
[num_partitions: 0, num_files: 1, num_rows: 2, total_size: 8, raw_data_size: 6]

hive> dfs -cat /user/hive/warehouse/intermediate_comp_on_gz/000000_0;
4       5
3       2

Next, we can enable output compression:

hive> set hive.exec.compress.output=true;

hive> CREATE TABLE final_comp_on
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    > AS SELECT * FROM a;
Moving data to: file:/tmp/hive-edward/hive_2012-01-15_11-11-01_884_.../-ext-10001
Moving data to: file:/user/hive/warehouse/final_comp_on
Table default.final_comp_on stats:
[num_partitions: 0, num_files: 1, num_rows: 2, total_size: 16, raw_data_size: 6]

hive> dfs -ls /user/hive/warehouse/final_comp_on;
Found 1 items
/user/hive/warehouse/final_comp_on/000000_0.deflate

The output table statistics show that total_size is 16 while raw_data_size is 6. The extra space is overhead from the deflate algorithm, which dominates on such a tiny file. We can also see that the output file has a .deflate extension.

Trying to cat the file is not recommended, as you will just get binary output. However, Hive can query this data normally:

hive> dfs -cat /user/hive/warehouse/final_comp_on/000000_0.deflate;
... UGLYBINARYHERE ...

hive> SELECT * FROM final_comp_on;
4       5
3       2

This ability to work seamlessly with compressed files is not Hive-specific; Hadoop’s TextInputFormat is doing the work here. Although the name is confusing in this case, TextInputFormat recognizes file extensions such as .deflate or .gz and decompresses these files on the fly. Hive does not need to know whether the underlying files are uncompressed or compressed with any of the supported compression schemes.

Let’s change the codec used by output compression to see the results (another line wrap for space):

hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec
  =org.apache.hadoop.io.compress.GzipCodec;

hive> CREATE TABLE final_comp_on_gz
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    > AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/final_comp_on_gz
Table default.final_comp_on_gz stats:
[num_partitions: 0, num_files: 1, num_rows: 2, total_size: 28, raw_data_size: 6]

hive> dfs -ls /user/hive/warehouse/final_comp_on_gz;
Found 1 items
/user/hive/warehouse/final_comp_on_gz/000000_0.gz

As you can see, the output folder now contains one or more .gz files. Hive has a quick hack for executing local commands like zcat from inside the Hive shell. The ! tells Hive to fork and run the external command, blocking until the command returns a result. zcat is a command-line utility that decompresses its input and displays the output:

hive> ! /bin/zcat /user/hive/warehouse/final_comp_on_gz/000000_0.gz;
4       5
3       2
hive> SELECT * FROM final_comp_on_gz;
OK
4       5
3       2
Time taken: 0.159 seconds

Using output compression like this results in small, binary compressed files, so operations on them are very fast. However, recall that the number of output files is a side effect of how many mappers or reducers processed the data. In the worst-case scenario, you can end up with one large, nonsplittable binary file in a directory, which means that subsequent steps that read this data cannot work in parallel. The answer to this problem is to use sequence files:

hive> set mapred.output.compression.type=BLOCK;
hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

hive> CREATE TABLE final_comp_on_gz_seq
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    > STORED AS SEQUENCEFILE
    > AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/final_comp_on_gz_seq
Table default.final_comp_on_gz_seq stats:
[num_partitions: 0, num_files: 1, num_rows: 2, total_size: 199, raw_data_size: 6]

hive> dfs -ls /user/hive/warehouse/final_comp_on_gz_seq;
Found 1 items
/user/hive/warehouse/final_comp_on_gz_seq/000000_0

Sequence files are binary, but it is a nice exercise to look at the header to confirm the results are what we intended (output wrapped):

hive> dfs -cat /user/hive/warehouse/final_comp_on_gz_seq/000000_0;
SEQ[]org.apache.hadoop.io.BytesWritable[]org.apache.hadoop.io.BytesWritable[]
 org.apache.hadoop.io.compress.GzipCodec[]

Because of the meta-information embedded in the sequence file and in the Hive metastore, Hive can query the table without any specific settings. Hadoop also offers the dfs -text command to strip the header and compression away from sequence files and return the raw result:

hive> dfs -text /user/hive/warehouse/final_comp_on_gz_seq/000000_0;
        4       5
        3       2
hive> select * from final_comp_on_gz_seq;
OK
4       5
3       2

Finally, let’s use intermediate and output compression at the same time, set different compression codecs for each, and save the final output to sequence files! Settings like these are common in production environments where data sets are large, and they improve performance:

hive> set mapred.map.output.compression.codec
 =org.apache.hadoop.io.compress.SnappyCodec;
hive> set hive.exec.compress.intermediate=true;
hive> set mapred.output.compression.type=BLOCK;
hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec
 =org.apache.hadoop.io.compress.GzipCodec;

hive> CREATE TABLE final_comp_on_gz_int_compress_snappy_seq
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    > STORED AS SEQUENCEFILE AS SELECT * FROM a;

Archive Partition

Hadoop has a storage format known as HAR, which stands for Hadoop ARchive. A HAR file is like a TAR file that lives in the HDFS filesystem as a single file. However, internally it can contain multiple files and directories. In some use cases, older directories and files are accessed less often than newer files. If a particular partition contains thousands of files, the HDFS NameNode incurs significant overhead managing them. By archiving the partition, it is stored as a single, large file that can still be accessed by Hive. The trade-off is that HAR files are less efficient to query. Also, HAR files are not compressed, so they don’t save any space.

In the following example, we’ll use Hive’s own documentation as data.

First, create a partitioned table and load it with the text data from the Hive package:

hive> CREATE TABLE hive_text (line STRING) PARTITIONED BY (folder STRING);

hive> ! ls $HIVE_HOME;
LICENSE
README.txt
RELEASE_NOTES.txt

hive> ALTER TABLE hive_text ADD PARTITION (folder='docs');

hive> LOAD DATA INPATH '${env:HIVE_HOME}/README.txt'
    > INTO TABLE hive_text PARTITION (folder='docs');
Loading data to table default.hive_text partition (folder=docs)

hive> LOAD DATA INPATH '${env:HIVE_HOME}/RELEASE_NOTES.txt'
    > INTO TABLE hive_text PARTITION (folder='docs');
Loading data to table default.hive_text partition (folder=docs)

hive> SELECT * FROM hive_text WHERE line LIKE '%hive%' LIMIT 2;
  http://hive.apache.org/       docs
- Hive 0.8.0 ignores the hive-default.xml file, though we continue      docs

Some versions of Hadoop, such as Hadoop v0.20.2, require the JAR containing the Hadoop archive tools to be placed in Hive’s auxlib directory:

$ mkdir $HIVE_HOME/auxlib
$ cp $HADOOP_HOME/hadoop-0.20.2-tools.jar $HIVE_HOME/auxlib/

Take a look at the underlying structure of the table before we archive it. Note the location of the table’s partition data, since it’s a managed, partitioned table:

hive> dfs -ls /user/hive/warehouse/hive_text/folder=docs;
Found 2 items
/user/hive/warehouse/hive_text/folder=docs/README.txt
/user/hive/warehouse/hive_text/folder=docs/RELEASE_NOTES.txt

The ALTER TABLE ... ARCHIVE PARTITION statement converts the table into an archived table:

hive> SET hive.archive.enabled=true;
hive> ALTER TABLE hive_text ARCHIVE PARTITION (folder='docs');
intermediate.archived is
 file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
intermediate.original is
 file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ORIGINAL
Creating data.har for file:/user/hive/warehouse/hive_text/folder=docs
in file:/tmp/hive-edward/hive_..._3862901820512961909/-ext-10000/partlevel
Please wait... (this may take a while)
Moving file:/tmp/hive-edward/hive_..._3862901820512961909/-ext-10000/partlevel
to file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
Moving file:/user/hive/warehouse/hive_text/folder=docs
to file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ORIGINAL
Moving file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
to file:/user/hive/warehouse/hive_text/folder=docs

(We reformatted the output slightly so it would fit, and used ... to replace two timestamp strings in the original output.)

The underlying table has gone from two files to one Hadoop archive (HAR file):

hive> dfs -ls /user/hive/warehouse/hive_text/folder=docs;
Found 1 items
/user/hive/warehouse/hive_text/folder=docs/data.har

The ALTER TABLE ... UNARCHIVE PARTITION command extracts the files from the HAR and puts them back into HDFS:

ALTER TABLE hive_text UNARCHIVE PARTITION (folder='docs');
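Listing the partition directory again should show something like the original files restored in place of data.har:

hive> dfs -ls /user/hive/warehouse/hive_text/folder=docs;
Found 2 items
/user/hive/warehouse/hive_text/folder=docs/README.txt
/user/hive/warehouse/hive_text/folder=docs/RELEASE_NOTES.txt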

Compression: Wrapping Up

Hive’s ability to read and write different types of compressed files is a big performance win as it saves disk space and processing overhead. This flexibility also aids in integration with other tools, as Hive can query many native file types without the need to write custom “adapters” in Java.
