Chapter 9. Schema Design

Hive looks and acts like a relational database. Users have a familiar nomenclature such as tables and columns, as well as a query language that is remarkably similar to SQL dialects they have used before. However, Hive is implemented and used in ways that are very different from conventional relational databases. Often, users try to carry over paradigms from the relational world that are actually Hive anti-patterns. This section highlights some Hive patterns you should use and some anti-patterns you should avoid.

Table-by-Day

Table-by-day is a pattern where a table named supply is appended with a timestamp such as supply_2011_01_01, supply_2011_01_02, etc. Table-by-day is an anti-pattern in the database world, but due to common implementation challenges of ever-growing data sets, it is still widely used:

hive> CREATE TABLE supply_2011_01_02 (id int, part string, quantity int);

hive> CREATE TABLE supply_2011_01_03 (id int, part string, quantity int);

hive> CREATE TABLE supply_2011_01_04 (id int, part string, quantity int);

hive> .... load data ...

hive> SELECT part,quantity supply_2011_01_02
    > UNION ALL
    > SELECT part,quantity from supply_2011_01_03
    > WHERE quantity < 4;

With Hive, a partitioned table should be used instead. Hive uses expressions in the WHERE clause to select input only from the partitions needed for the query. This query will run efficiently, and it is clean and easy on the eyes:

hive> CREATE TABLE supply (id int, part string, quantity int)
    > PARTITIONED BY (int day);

hive> ALTER TABLE supply add PARTITION (day=20110102);

hive> ALTER TABLE supply add PARTITION (day=20110103);

hive> ALTER TABLE supply add PARTITION (day=20110102);

hive> .... load data ...

hive> SELECT part,quantity FROM supply
    > WHERE day>=20110102 AND day<20110103 AND quantity < 4;

Over Partitioning

The partitioning feature is very useful in Hive. This is because Hive typically performs full scans over all input to satisfy a query (we’ll leave Hive’s indexing out for this discussion). However, a design that creates too many partitions may optimize some queries, but be detrimental for other important queries:

hive> CREATE TABLE weblogs (url string, time long )
    > PARTITIONED BY (day int, state string, city string);

hive> SELECT * FROM weblogs WHERE day=20110102;

HDFS was designed for many millions of large files, not billions of small files. The first drawback of having too many partitions is the large number of Hadoop files and directories that are created unnecessarily. Each partition corresponds to a directory that usually contains multiple files. If a given table contains thousands of partitions, it may have tens of thousands of files, possibly created every day. If the retention of this table is multiplied over years, it will eventually exhaust the capacity of the NameNode to manage the filesystem metadata. The NameNode must keep all metadata for the filesystem in memory. While each file requires a small number of bytes for its metadata (approximately 150 bytes/file), the net effect is to impose an upper limit on the total number of files that can be managed in an HDFS installation. Other filesystems, like MapR and Amazon S3 don’t have this limitation.

MapReduce processing converts a job into multiple tasks. In the default case, each task is a new JVM instance, requiring the overhead of start up and tear down. For small files, a separate task will be used for each file. In pathological scenarios, the overhead of JVM start up and tear down can exceed the actual processing time!

Hence, an ideal partition scheme should not result in too many partitions and their directories, and the files in each directory should be large, some multiple of the filesystem block size.

A good strategy for time-range partitioning, for example, is to determine the approximate size of your data accumulation over different granularities of time, and start with the granularity that results in “modest” growth in the number of partitions over time, while each partition contains files at least on the order of the filesystem block size or multiples thereof. This balancing keeps the partitions large, which optimizes throughput for the general case query. Consider when the next level of granularity is appropriate, especially if query WHERE clauses typically select ranges of smaller granularities:

hive> CREATE TABLE weblogs (url string, time long, state string, city string )
    > PARTITIONED BY (day int);
hive> SELECT * FROM weblogs WHERE day=20110102;

Another solution is to use two levels of partitions along different dimensions. For example, the first partition might be by day and the second-level partition might be by geographic region, like the state:

hive> CREATE TABLE weblogs (url string, time long, city string )
    > PARTITIONED BY (day int, state string);
hive> SELECT * FROM weblogs WHERE day=20110102;

However, since some states will probably result in lots more data than others, you could see imbalanced map tasks, as processing the larger states takes a lot longer than processing the smaller states.

If you can’t find good, comparatively sized partition choices, consider using bucketing as described in Bucketing Table Data Storage.

Unique Keys and Normalization

Relational databases typically use unique keys, indexes, and normalization to store data sets that fit into memory or mostly into memory. Hive, however, does not have the concept of primary keys or automatic, sequence-based key generation. Joins should be avoided in favor of denormalized data, when feasible. The complex types, Array, Map, and Struct, help by allowing the storage of one-to-many data inside a single row. This is not to say normalization should never be utilized, but star-schema type designs are nonoptimal.

The primary reason to avoid normalization is to minimize disk seeks, such as those typically required to navigate foreign key relations. Denormalizing data permits it to be scanned from or written to large, contiguous sections of disk drives, which optimizes I/O performance. However, you pay the penalty of denormalization, data duplication and the greater risk of inconsistent data.

For example, consider our running example, the employees table. Here it is again with some changes for clarity:

CREATE TABLE employees (
  name         STRING,
  salary       FLOAT,
  subordinates ARRAY<STRING>,
  deductions   MAP<STRING, FLOAT>
  address      STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>);

The data model of this example breaks the traditional design rules in a few ways.

First, we are informally using name as the primary key, although we all know that names are often not unique! Ignoring that issue for now, a relational model would have a single foreign key relation from an employee record to the manager record, using the name key. We represented this relation the other way around: each employee has an ARRAY of names of subordinates.

Second, the value for each deduction is unique to the employee, but the map keys are duplicated data, even if you substitute “flags” (say, integers) for the actual key strings. A normal relational model would have a separate, two-column table for the deduction name (or flag) and value, with a one-to-many relationship between the employees and this deductions table.

Finally, chances are that at least some employees live at the same address, but we are duplicating the address for each employee, rather than using a one-to-one relationship to an addresses table.

It’s up to us to manage referential integrity (or deal with the consequences), and to fix the duplicates of a particular piece of data that has changed. Hive does not give us a convenient way to UPDATE single records.

Still, when you have 10s of terabytes to many petabytes of data, optimizing speed makes these limitations worth accepting.

Making Multiple Passes over the Same Data

Hive has a special syntax for producing multiple aggregations from a single pass through a source of data, rather than rescanning it for each aggregation. This change can save considerable processing time for large input data sets. We discussed the details previously in Chapter 5.

For example, each of the following two queries creates a table from the same source table, history:

hive> INSERT OVERWRITE TABLE sales
    > SELECT * FROM history WHERE action='purchased';
hive> INSERT OVERWRITE TABLE credits
    > SELECT * FROM history WHERE action='returned';

This syntax is correct, but inefficient. The following rewrite achieves the same thing, but using a single pass through the source history table:

hive> FROM history
    > INSERT OVERWRITE sales   SELECT * WHERE action='purchased'
    > INSERT OVERWRITE credits SELECT * WHERE action='returned';

The Case for Partitioning Every Table

Many ETL processes involve multiple processing steps. Each step may produce one or more temporary tables that are only needed until the end of the next job. At first it may appear that partitioning these temporary tables is unnecessary. However, imagine a scenario where a mistake in step’s query or raw data forces a rerun of the ETL process for several days of input. You will likely need to run the catch-up process a day at a time in order to make sure that one job does not overwrite the temporary table before other tasks have completed.

For example, this following design creates an intermediate table by the name of distinct_ip_in_logs to be used by a subsequent processing step:

$ hive -hiveconf dt=2011-01-01
hive> INSERT OVERWRITE table distinct_ip_in_logs
    > SELECT distinct(ip) as ip from weblogs
    > WHERE hit_date='${hiveconf:dt}';

hive> CREATE TABLE state_city_for_day (state string,city string);

hive> INSERT OVERWRITE state_city_for_day
    > SELECT distinct(state,city) FROM distinct_ip_in_logs
    > JOIN geodata ON (distinct_ip_in_logs.ip=geodata.ip);

This approach works, however computing a single day causes the record of the previous day to be removed via the INSERT OVERWRITE clause. If two instances of this process are run at once for different days they could stomp on each others’ results.

A more robust approach is to carry the partition information all the way through the process. This makes synchronization a nonissue. Also, as a side effect, this approach allows you to compare the intermediate data day over day:

$ hive -hiveconf dt=2011-01-01
hive> INSERT OVERWRITE table distinct_ip_in_logs
    > PARTITION (hit_date=${dt})
    > SELECT distinct(ip) as ip from weblogs
    > WHERE hit_date='${hiveconf:dt}';

hive> CREATE TABLE state_city_for_day (state string,city string)
    > PARTITIONED BY (hit_date string);

hive> INSERT OVERWRITE table state_city_for_day PARTITION(${hiveconf:df})
    > SELECT distinct(state,city) FROM distinct_ip_in_logs
    > JOIN geodata ON (distinct_ip_in_logs.ip=geodata.ip)
    > WHERE (hit_date='${hiveconf:dt}');

A drawback of this approach is that you will need to manage the intermediate table and delete older partitions, but these tasks are easy to automate.

Bucketing Table Data Storage

Partitions offer a convenient way to segregate data and to optimize queries. However, not all data sets lead to sensible partitioning, especially given the concerns raised earlier about appropriate sizing.

Bucketing is another technique for decomposing data sets into more manageable parts.

For example, suppose a table using the date dt as the top-level partition and the user_id as the second-level partition leads to too many small partitions. Recall that if you use dynamic partitioning to create these partitions, by default Hive limits the maximum number of dynamic partitions that may be created to prevent the extreme case where so many partitions are created they overwhelm the filesystem’s ability to manage them and other problems. So, the following commands might fail:

hive> CREATE TABLE weblog (url STRING, source_ip STRING)
    > PARTITIONED BY (dt STRING, user_id INT);

hive> FROM raw_weblog
    > INSERT OVERWRITE TABLE page_view PARTITION(dt='2012-06-08', user_id)
    > SELECT server_name, url, source_ip, dt, user_id;

Instead, if we bucket the weblog table and use user_id as the bucketing column, the value of this column will be hashed by a user-defined number into buckets. Records with the same user_id will always be stored in the same bucket. Assuming the number of users is much greater than the number of buckets, each bucket will have many users:

hive> CREATE TABLE weblog (user_id INT, url STRING, source_ip STRING)
    > PARTITIONED BY (dt STRING)
    > CLUSTERED BY (user_id) INTO 96 BUCKETS;

However, it is up to you to insert data correctly into the table! The specification in CREATE TABLE only defines metadata, but has no effect on commands that actually populate the table.

This is how to populate the table correctly, when using an INSERT … TABLE statement. First, we set a property that forces Hive to choose the correct number of reducers corresponding to the target table’s bucketing setup. Then we run a query to populate the partitions. For example:

hive> SET hive.enforce.bucketing = true;

hive> FROM raw_logs
    > INSERT OVERWRITE TABLE weblog
    > PARTITION (dt='2009-02-25')
    > SELECT user_id, url, source_ip WHERE dt='2009-02-25';

If we didn’t use the hive.enforce.bucketing property, we would have to set the number of reducers to match the number of buckets, using set mapred.reduce.tasks=96. Then the INSERT query would require a CLUSTER BY clause after the SELECT clause.

Warning

As for all table metadata, specifying bucketing doesn’t ensure that the table is properly populated. Follow the previous example to ensure that you correctly populate bucketed tables.

Bucketing has several advantages. The number of buckets is fixed so it does not fluctuate with data. Buckets are ideal for sampling. If two tables are bucketed by user_id, Hive can create a logically correct sampling. Bucketing also aids in doing efficient map-side joins, as we discussed in Map-side Joins.

Adding Columns to a Table

Hive allows the definition of a schema over raw data files, unlike many databases that force the conversion and importation of data following a specific format. A benefit of this separation of concerns is the ability to adapt a table definition easily when new columns are added to the data files.

Hive offers the SerDe abstraction, which enables the extraction of data from input. The SerDe also enables the output of data, though the output feature is not used as frequently because Hive is used primarily as a query mechanism. A SerDe usually parses from left to right, splitting rows by specified delimiters into columns. The SerDes tend to be very forgiving. For example, if a row has fewer columns than expected, the missing columns will be returned as null. If the row has more columns than expected, they will be ignored. Adding new columns to the schema involves a single ALTER TABLE ADD COLUMN command. This is very useful as log formats tend to only add more information to a message:

hive> CREATE TABLE weblogs (version LONG, url STRING)
    > PARTITIONED BY (hit_date int)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '	';

hive> ! cat log1.txt
1  /mystuff
1  /toys

hive> LOAD DATA LOCAL INPATH 'log1.txt' int weblogs partition(20110101);

hive> SELECT * FROM weblogs;
1  /mystuff  20110101
1  /toys     20110101

Over time a new column may be added to the underlying data. In the following example the column user_id is added to the data. Note that some older raw data files may not have this column:

hive> ! cat log2.txt
2  /cars   bob
2  /stuff  terry

hive> ALTER TABLE weblogs ADD COLUMNS (user_id string);

hive> LOAD DATA LOCAL INPATH 'log2.txt' int weblogs partition(20110102);

hive> SELECT * from weblogs
1  /mystuff  20110101  NULL
1  /toys     20110101  NULL
2  /cars     20110102  bob
2  /stuff    20110102  terry

Note that with this approach, columns cannot be added in the beginning or the middle.

Using Columnar Tables

Hive typically uses row-oriented storage, however Hive also has a columnar SerDe that stores information in a hybrid row-column orientated form. While this format can be used for any type of data there are some data sets that it is optimal for.

Repeated Data

Given enough rows, fields like state and age will have the same data repeated many times. This type of data benefits from column-based storage.

stateuidage

NY

Bob

40

NJ

Sara

32

NY

Peter

14

NY

Sandra

4

Many Columns

The table below has a large number of columns.

stateuidageservertzmany_more …

NY

Bob

40

web1

est

stuff

NJ

Sara

32

web1

est

stuff

NY

Peter

14

web3

pst

stuff

NY

Sandra

4

web45

pst

stuff

Queries typically only use a single column or a small set of columns. Column-based storage will make analyzing the table data faster:

hive> SELECT distinct(state) from weblogs;
NY
NJ

You can reference the section RCFile to see how to use this format.

(Almost) Always Use Compression!

In almost all cases, compression makes data smaller on disk, which usually makes queries faster by reducing I/O overhead. Hive works seamlessly with many compression types. The only compelling reason to not use compression is when the data produced is intended for use by an external system, and an uncompressed format, such as text, is the most compatible.

But compression and decompression consumes CPU resources. MapReduce jobs tend to be I/O bound, so the extra CPU overhead is usually not a problem. However, for workflows that are CPU intensive, such as some machine-learning algorithms, compression may actually reduce performance by stealing valuable CPU resources from more essential operations.

See Chapter 11 for more on how to use compression.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.226.28.197