Hive looks and acts like a relational database. Users have a familiar nomenclature such as tables and columns, as well as a query language that is remarkably similar to SQL dialects they have used before. However, Hive is implemented and used in ways that are very different from conventional relational databases. Often, users try to carry over paradigms from the relational world that are actually Hive anti-patterns. This section highlights some Hive patterns you should use and some anti-patterns you should avoid.
Table-by-day is a pattern where the name of a table, such as supply, is suffixed with a date, producing tables such as supply_2011_01_01, supply_2011_01_02, etc. Table-by-day is an anti-pattern in the database world, but due to common implementation challenges of ever-growing data sets, it is still widely used:
hive> CREATE TABLE supply_2011_01_02 (id int, part string, quantity int);
hive> CREATE TABLE supply_2011_01_03 (id int, part string, quantity int);
hive> CREATE TABLE supply_2011_01_04 (id int, part string, quantity int);
hive> .... load data ...
hive> SELECT part, quantity FROM supply_2011_01_02
    > UNION ALL
    > SELECT part, quantity FROM supply_2011_01_03
    > WHERE quantity < 4;
With Hive, a partitioned table should be used instead. Hive uses expressions in the WHERE clause to select input only from the partitions needed for the query. This query will run efficiently, and it is clean and easy on the eyes:
hive> CREATE TABLE supply (id int, part string, quantity int)
    > PARTITIONED BY (day int);
hive> ALTER TABLE supply ADD PARTITION (day=20110102);
hive> ALTER TABLE supply ADD PARTITION (day=20110103);
hive> ALTER TABLE supply ADD PARTITION (day=20110104);
hive> .... load data ...
hive> SELECT part, quantity FROM supply
    > WHERE day >= 20110102 AND day < 20110103 AND quantity < 4;
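One way to picture partition pruning is as a filter over partition directories that runs before any data is read. The following Python sketch only illustrates that idea and is not how Hive implements it. The function name prune_partitions is hypothetical, and the directory names assume Hive's usual key=value layout for partitions.

```python
def prune_partitions(partition_days, lo, hi):
    """Return the partition directories a query with
    `WHERE day >= lo AND day < hi` would need to read."""
    return [f"supply/day={d}" for d in partition_days if lo <= d < hi]


# Partitions assumed to exist for the supply table.
days = [20110102, 20110103, 20110104]

selected = prune_partitions(days, 20110102, 20110103)
print(selected)  # ['supply/day=20110102']
```

Only one of the three directories is selected, so the remaining data is never scanned.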
The partitioning feature is very useful in Hive. This is because Hive typically performs full scans over all input to satisfy a query (we’ll leave Hive’s indexing out for this discussion). However, a design that creates too many partitions may optimize some queries, but be detrimental for other important queries:
hive> CREATE TABLE weblogs (url string, time bigint)
    > PARTITIONED BY (day int, state string, city string);
hive> SELECT * FROM weblogs WHERE day=20110102;
HDFS was designed for many millions of large files, not billions of small files. The first drawback of having too many partitions is the large number of Hadoop files and directories that are created unnecessarily. Each partition corresponds to a directory that usually contains multiple files. If a given table contains thousands of partitions, it may have tens of thousands of files, possibly created every day. If the retention of this table is multiplied over years, it will eventually exhaust the capacity of the NameNode to manage the filesystem metadata. The NameNode must keep all metadata for the filesystem in memory. While each file requires only a small number of bytes for its metadata (approximately 150 bytes/file), the net effect is to impose an upper limit on the total number of files that can be managed in an HDFS installation. Other filesystems, like MapR and Amazon S3, don’t have this limitation.
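To get a feel for how quickly file metadata adds up, here is a rough back-of-the-envelope calculation in Python. It uses the approximate figure of 150 bytes per file quoted above and counts only files, not directories or blocks. The table dimensions are made-up values chosen for illustration.

```python
BYTES_PER_FILE = 150  # approximate per-file metadata cost quoted above


def namenode_bytes(partitions_per_day, files_per_partition, days_retained):
    """Estimate NameNode memory consumed by one table's file metadata."""
    total_files = partitions_per_day * files_per_partition * days_retained
    return total_files * BYTES_PER_FILE


# Hypothetical table: 1,000 partitions a day, 10 files each, kept 3 years.
estimate = namenode_bytes(1_000, 10, 3 * 365)
print(f"{estimate / 1e9:.2f} GB")  # 1.64 GB
```

A single table with this layout would claim well over a gigabyte of NameNode memory, and a cluster typically hosts many tables.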
MapReduce processing converts a job into multiple tasks. In the default case, each task is a new JVM instance, requiring the overhead of start up and tear down. For small files, a separate task will be used for each file. In pathological scenarios, the overhead of JVM start up and tear down can exceed the actual processing time!
Hence, an ideal partition scheme should not result in too many partitions and their directories, and the files in each directory should be large, some multiple of the filesystem block size.
A good strategy for time-range partitioning, for example, is to determine the approximate size of your data accumulation over different granularities of time, and start with the granularity that results in “modest” growth in the number of partitions over time, while each partition contains files at least on the order of the filesystem block size or multiples thereof. This balancing keeps the partitions large, which optimizes throughput for the general case query. Consider when the next level of granularity is appropriate, especially if query WHERE clauses typically select ranges of smaller granularities:
hive> CREATE TABLE weblogs (url string, time bigint, state string, city string)
    > PARTITIONED BY (day int);
hive> SELECT * FROM weblogs WHERE day=20110102;
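The sizing rule described above can be sketched as a small calculation. This is only an illustration under stated assumptions: the 128 MB block size, the list of candidate granularities, and the function name finest_granularity are all hypothetical, and real data volumes are rarely this uniform.

```python
BLOCK_SIZE = 128 * 1024 * 1024  # assumed HDFS block size of 128 MB

# Candidate granularities, coarsest first, with their length in days.
GRANULARITIES = [("year", 365), ("month", 30), ("day", 1), ("hour", 1 / 24)]


def finest_granularity(bytes_per_day, min_blocks=1):
    """Pick the finest granularity whose partitions still hold at
    least `min_blocks` filesystem blocks of data."""
    chosen = None
    for name, days in GRANULARITIES:
        if bytes_per_day * days >= min_blocks * BLOCK_SIZE:
            chosen = name  # keep going: a finer level may still qualify
    return chosen


print(finest_granularity(10 * 1024**3))  # 10 GB/day -> hour
print(finest_granularity(50 * 1024**2))  # 50 MB/day -> month
```

The function returns None when even yearly partitions would be smaller than a block, which suggests the table may not need time partitioning at all.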
Another solution is to use two levels of partitions along different dimensions. For example, the first partition might be by day and the second-level partition might be by geographic region, like the state:
hive> CREATE TABLE weblogs (url string, time bigint, city string)
    > PARTITIONED BY (day int, state string);
hive> SELECT * FROM weblogs WHERE day=20110102;
However, since some states will probably result in much more data than others, you could see imbalanced map tasks, as processing the larger states takes much longer than processing the smaller states.
If you can’t find good, comparatively sized partition choices, consider using bucketing as described in Bucketing Table Data Storage.
Relational databases typically use unique keys, indexes, and normalization to store data sets that fit into memory or mostly into memory. Hive, however, does not have the concept of primary keys or automatic, sequence-based key generation. Joins should be avoided in favor of denormalized data, when feasible. The complex types, Array, Map, and Struct, help by allowing the storage of one-to-many data inside a single row. This is not to say normalization should never be utilized, but star-schema type designs are nonoptimal.
The primary reason to avoid normalization is to minimize disk seeks, such as those typically required to navigate foreign key relations. Denormalizing data permits it to be scanned from or written to large, contiguous sections of disk drives, which optimizes I/O performance. However, you pay the penalties of denormalization: data duplication and a greater risk of inconsistent data.
For example, consider our running example, the employees table. Here it is again with some changes for clarity:
CREATE TABLE employees (
  name STRING,
  salary FLOAT,
  subordinates ARRAY<STRING>,
  deductions MAP<STRING, FLOAT>,
  address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>);
The data model of this example breaks the traditional design rules in a few ways.
First, we are informally using name as the primary key, although we all know that names are often not unique! Ignoring that issue for now, a relational model would have a single foreign key relation from an employee record to the manager record, using the name key. We represented this relation the other way around: each employee has an ARRAY of names of subordinates.
Second, the value for each deduction is unique to the employee, but the map keys are duplicated data, even if you substitute “flags” (say, integers) for the actual key strings. A normal relational model would have a separate, two-column table for the deduction name (or flag) and value, with a one-to-many relationship between the employees and this deductions table.
Finally, chances are that at least some employees live at the same address, but we are duplicating the address for each employee, rather than using a one-to-one relationship to an addresses table.
It’s up to us to manage referential integrity (or deal with the consequences), and to fix the duplicates of a particular piece of data that has changed. Hive does not give us a convenient way to UPDATE single records.
Still, when you have tens of terabytes to many petabytes of data, optimizing speed makes these limitations worth accepting.
Hive has a special syntax for producing multiple aggregations from a single pass through a source of data, rather than rescanning it for each aggregation. This change can save considerable processing time for large input data sets. We discussed the details previously in Chapter 5.
For example, each of the following two queries populates a table from the same source table, history:
hive> INSERT OVERWRITE TABLE sales
    > SELECT * FROM history WHERE action='purchased';
hive> INSERT OVERWRITE TABLE credits
    > SELECT * FROM history WHERE action='returned';
This syntax is correct, but inefficient. The following rewrite achieves the same thing, but using a single pass through the source history table:
hive> FROM history
    > INSERT OVERWRITE TABLE sales SELECT * WHERE action='purchased'
    > INSERT OVERWRITE TABLE credits SELECT * WHERE action='returned';
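The difference between the two forms can be sketched outside Hive. In the following Python illustration, which is an analogy rather than a description of Hive's execution plan, two_passes reads the source once per output while single_pass reads it once and routes each record as it goes. The sample records and function names are hypothetical.

```python
def two_passes(history):
    """Scan the source once per output table."""
    sales = [r for r in history if r["action"] == "purchased"]
    credits = [r for r in history if r["action"] == "returned"]
    return sales, credits


def single_pass(history):
    """Scan the source once and route each record as it is read."""
    sales, credits = [], []
    for r in history:
        if r["action"] == "purchased":
            sales.append(r)
        elif r["action"] == "returned":
            credits.append(r)
    return sales, credits


history = [
    {"id": 1, "action": "purchased"},
    {"id": 2, "action": "returned"},
    {"id": 3, "action": "viewed"},
    {"id": 4, "action": "purchased"},
]

# Both strategies produce the same tables; only the scan count differs.
print(two_passes(history) == single_pass(history))  # True
```

With an in-memory list the saving is negligible, but when each scan means reading terabytes from disk, halving the number of scans matters.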
Many ETL processes involve multiple processing steps. Each step may produce one or more temporary tables that are only needed until the end of the next job. At first it may appear that partitioning these temporary tables is unnecessary. However, imagine a scenario where a mistake in a step’s query or raw data forces a rerun of the ETL process for several days of input. You will likely need to run the catch-up process a day at a time in order to make sure that one job does not overwrite the temporary table before other tasks have completed.
For example, the following design creates an intermediate table named distinct_ip_in_logs to be used by a subsequent processing step:
$ hive -hiveconf dt=2011-01-01
hive> INSERT OVERWRITE TABLE distinct_ip_in_logs
    > SELECT distinct(ip) as ip FROM weblogs
    > WHERE hit_date='${hiveconf:dt}';
hive> CREATE TABLE state_city_for_day (state string, city string);
hive> INSERT OVERWRITE TABLE state_city_for_day
    > SELECT DISTINCT state, city FROM distinct_ip_in_logs
    > JOIN geodata ON (distinct_ip_in_logs.ip=geodata.ip);
This approach works; however, computing a single day causes the record of the previous day to be removed via the INSERT OVERWRITE clause. If two instances of this process are run at once for different days, they could stomp on each other’s results.
A more robust approach is to carry the partition information all the way through the process. This makes synchronization a nonissue. Also, as a side effect, this approach allows you to compare the intermediate data day over day:
$ hive -hiveconf dt=2011-01-01
hive> INSERT OVERWRITE TABLE distinct_ip_in_logs
    > PARTITION (hit_date='${hiveconf:dt}')
    > SELECT distinct(ip) as ip FROM weblogs
    > WHERE hit_date='${hiveconf:dt}';
hive> CREATE TABLE state_city_for_day (state string, city string)
    > PARTITIONED BY (hit_date string);
hive> INSERT OVERWRITE TABLE state_city_for_day PARTITION (hit_date='${hiveconf:dt}')
    > SELECT DISTINCT state, city FROM distinct_ip_in_logs
    > JOIN geodata ON (distinct_ip_in_logs.ip=geodata.ip)
    > WHERE (hit_date='${hiveconf:dt}');
A drawback of this approach is that you will need to manage the intermediate table and delete older partitions, but these tasks are easy to automate.
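As one possible way to automate that cleanup, the following Python sketch generates ALTER TABLE ... DROP PARTITION statements for partitions older than a retention window. The function name drop_statements, the seven-day window, and the list of partition dates are assumptions for illustration. In practice you would obtain the partition list from SHOW PARTITIONS and feed the generated statements to Hive.

```python
from datetime import date, timedelta


def drop_statements(table, partition_dates, today, keep_days):
    """Build one ALTER TABLE ... DROP PARTITION statement for every
    hit_date partition older than `keep_days` days."""
    cutoff = today - timedelta(days=keep_days)
    return [
        f"ALTER TABLE {table} DROP IF EXISTS "
        f"PARTITION (hit_date='{d.isoformat()}');"
        for d in sorted(partition_dates)
        if d < cutoff
    ]


# Hypothetical partitions currently present in the intermediate table.
partitions = [date(2011, 1, 1), date(2011, 1, 2), date(2011, 1, 10)]

for stmt in drop_statements("distinct_ip_in_logs", partitions,
                            today=date(2011, 1, 11), keep_days=7):
    print(stmt)
```

With these inputs the cutoff is 2011-01-04, so statements are produced for the 2011-01-01 and 2011-01-02 partitions and the 2011-01-10 partition is kept.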
Partitions offer a convenient way to segregate data and to optimize queries. However, not all data sets lead to sensible partitioning, especially given the concerns raised earlier about appropriate sizing.
Bucketing is another technique for decomposing data sets into more manageable parts.
For example, suppose a table using the date dt as the top-level partition and the user_id as the second-level partition leads to too many small partitions. Recall that if you use dynamic partitioning to create these partitions, by default Hive limits the maximum number of dynamic partitions that may be created, to prevent the extreme case where so many partitions are created that they overwhelm the filesystem’s ability to manage them and cause other problems. So, the following commands might fail:
hive> CREATE TABLE weblog (url STRING, source_ip STRING)
    > PARTITIONED BY (dt STRING, user_id INT);
hive> FROM raw_weblog
    > INSERT OVERWRITE TABLE weblog PARTITION (dt='2012-06-08', user_id)
    > SELECT url, source_ip, user_id;
Instead, if we bucket the weblog table and use user_id as the bucketing column, the value of this column will be hashed by a user-defined number into buckets. Records with the same user_id will always be stored in the same bucket. Assuming the number of users is much greater than the number of buckets, each bucket will have many users:
hive> CREATE TABLE weblog (user_id INT, url STRING, source_ip STRING)
    > PARTITIONED BY (dt STRING)
    > CLUSTERED BY (user_id) INTO 96 BUCKETS;
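The idea of hashing a column value into a fixed number of buckets can be sketched in a few lines of Python. This is a simplified model, not Hive's code: it assumes the hash of an integer is the integer itself and that the bucket is the non-negative hash modulo the bucket count. Hive's actual hash function depends on the column type and may differ between versions.

```python
from collections import Counter

NUM_BUCKETS = 96  # matches INTO 96 BUCKETS in the table definition


def bucket_for(user_id, num_buckets=NUM_BUCKETS):
    """Assign an integer user_id to a bucket by hashing modulo the
    bucket count. Assumption: the hash of an INT is the value itself."""
    return (user_id & 0x7FFFFFFF) % num_buckets


# The same user_id always lands in the same bucket.
print(bucket_for(7), bucket_for(7))    # 7 7
# Different users can share a bucket.
print(bucket_for(7), bucket_for(103))  # 7 7

# With far more users than buckets, every bucket holds many users.
sizes = Counter(bucket_for(uid) for uid in range(9600))
print(len(sizes), min(sizes.values()), max(sizes.values()))  # 96 100 100
```

Because the bucket count is fixed in the table definition, the number of files per partition stays constant no matter how many distinct users appear in the data.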
However, it is up to you to insert data correctly into the table! The specification in CREATE TABLE only defines metadata, but has no effect on commands that actually populate the table.
This is how to populate the table correctly, when using an INSERT … TABLE statement. First, we set a property that forces Hive to choose the correct number of reducers corresponding to the target table’s bucketing setup. Then we run a query to populate the partitions. For example:
hive> SET hive.enforce.bucketing = true;
hive> FROM raw_logs
    > INSERT OVERWRITE TABLE weblog
    > PARTITION (dt='2009-02-25')
    > SELECT user_id, url, source_ip WHERE dt='2009-02-25';
If we didn’t use the hive.enforce.bucketing property, we would have to set the number of reducers to match the number of buckets, using set mapred.reduce.tasks=96. Then the INSERT query would require a CLUSTER BY clause after the SELECT clause.
As for all table metadata, specifying bucketing doesn’t ensure that the table is properly populated. Follow the previous example to ensure that you correctly populate bucketed tables.
Bucketing has several advantages. The number of buckets is fixed so it does not fluctuate with data. Buckets are ideal for sampling. If two tables are bucketed by user_id, Hive can create a logically correct sampling. Bucketing also aids in doing efficient map-side joins, as we discussed in Map-side Joins.
Hive allows the definition of a schema over raw data files, unlike many databases that force the conversion and importation of data following a specific format. A benefit of this separation of concerns is the ability to adapt a table definition easily when new columns are added to the data files.
Hive offers the SerDe abstraction, which enables the extraction of data from input. The SerDe also enables the output of data, though the output feature is not used as frequently because Hive is used primarily as a query mechanism. A SerDe usually parses from left to right, splitting rows by specified delimiters into columns. The SerDes tend to be very forgiving. For example, if a row has fewer columns than expected, the missing columns will be returned as null. If the row has more columns than expected, they will be ignored. Adding new columns to the schema involves a single ALTER TABLE ADD COLUMNS command. This is very useful as log formats tend to only add more information to a message:
hive> CREATE TABLE weblogs (version BIGINT, url STRING)
    > PARTITIONED BY (hit_date int)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ';
hive> ! cat log1.txt;
1 /mystuff
1 /toys
hive> LOAD DATA LOCAL INPATH 'log1.txt' INTO TABLE weblogs PARTITION (hit_date=20110101);
hive> SELECT * FROM weblogs;
1 /mystuff 20110101
1 /toys 20110101
Over time a new column may be added to the underlying data. In the following example the column user_id is added to the data. Note that some older raw data files may not have this column:
hive> ! cat log2.txt;
2 /cars bob
2 /stuff terry
hive> ALTER TABLE weblogs ADD COLUMNS (user_id string);
hive> LOAD DATA LOCAL INPATH 'log2.txt' INTO TABLE weblogs PARTITION (hit_date=20110102);
hive> SELECT * FROM weblogs;
1 /mystuff 20110101 NULL
1 /toys 20110101 NULL
2 /cars 20110102 bob
2 /stuff 20110102 terry
Note that with this approach, columns cannot be added in the beginning or the middle.
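The forgiving, left-to-right behavior of a delimited SerDe can be imitated with a short Python function. This is a simplified stand-in written for illustration, not Hive's SerDe code. The function name parse_row is hypothetical, and no type conversion is attempted.

```python
def parse_row(line, columns, delimiter=" "):
    """Split a delimited line left to right and fit it to the schema:
    missing trailing fields become None, extra fields are dropped."""
    fields = line.split(delimiter)
    fields = fields[:len(columns)]                    # ignore extras
    fields += [None] * (len(columns) - len(fields))   # pad missing
    return dict(zip(columns, fields))


old_schema = ["version", "url"]
new_schema = ["version", "url", "user_id"]

# An old row read with the new schema: the missing column is None.
print(parse_row("1 /mystuff", new_schema))
# {'version': '1', 'url': '/mystuff', 'user_id': None}

# A new row read with the old schema: the extra field is ignored.
print(parse_row("2 /cars bob", old_schema))
# {'version': '2', 'url': '/cars'}
```

Because fields are matched to columns purely by position, a column inserted at the beginning or in the middle would shift every later field into the wrong column, which is why new columns can only go at the end.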
Hive typically uses row-oriented storage; however, Hive also has a columnar SerDe that stores information in a hybrid row-column oriented form. While this format can be used for any type of data, there are some data sets for which it is optimal.
Given enough rows, fields like state and age will have the same data repeated many times. This type of data benefits from column-based storage.
state | uid | age |
---|---|---|
NY | Bob | 40 |
NJ | Sara | 32 |
NY | Peter | 14 |
NY | Sandra | 4 |
The table below has a large number of columns.
state | uid | age | server | tz | many_more … |
---|---|---|---|---|---|
NY | Bob | 40 | web1 | est | stuff |
NJ | Sara | 32 | web1 | est | stuff |
NY | Peter | 14 | web3 | pst | stuff |
NY | Sandra | 4 | web45 | pst | stuff |
Queries typically only use a single column or a small set of columns. Column-based storage will make analyzing the table data faster:
hive> SELECT distinct(state) from weblogs;
NY
NJ
You can reference the section RCFile to see how to use this format.
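Both benefits of column-based storage, reading fewer columns and compact encoding of repeated values, can be seen in a small Python model built from the sample rows above. This is a conceptual sketch only. The actual RCFile layout and its compression are more involved, and the run_length helper is a hypothetical illustration of why repeated values store compactly.

```python
from itertools import groupby

rows = [
    ("NY", "Bob", 40),
    ("NJ", "Sara", 32),
    ("NY", "Peter", 14),
    ("NY", "Sandra", 4),
]

# Row-oriented: a query on one field still walks whole records.
distinct_from_rows = {row[0] for row in rows}

# Column-oriented: each column is stored contiguously, so a query on
# `state` touches only that list.
columns = {
    "state": [r[0] for r in rows],
    "uid": [r[1] for r in rows],
    "age": [r[2] for r in rows],
}
distinct_from_column = set(columns["state"])
print(distinct_from_rows == distinct_from_column)  # True


def run_length(values):
    """Collapse consecutive repeats into (value, count) pairs."""
    return [(v, len(list(g))) for v, g in groupby(values)]


# Repeated values within a column collapse into a few pairs.
print(run_length(sorted(columns["state"])))  # [('NJ', 1), ('NY', 3)]
```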
In almost all cases, compression makes data smaller on disk, which usually makes queries faster by reducing I/O overhead. Hive works seamlessly with many compression types. The only compelling reason to not use compression is when the data produced is intended for use by an external system, and an uncompressed format, such as text, is the most compatible.
But compression and decompression consume CPU resources. MapReduce jobs tend to be I/O bound, so the extra CPU overhead is usually not a problem. However, for workflows that are CPU intensive, such as some machine-learning algorithms, compression may actually reduce performance by stealing valuable CPU resources from more essential operations.
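The trade-off can be observed on a small scale with Python's standard zlib module. This sketch uses made-up, highly repetitive log-like text, so the ratio it prints is not representative of any particular data set or of the codecs Hive is typically configured with.

```python
import zlib

# Repetitive, log-like text stands in for typical input data.
line = "NY,web1,est,/index.html\n"
raw = (line * 10_000).encode("utf-8")

compressed = zlib.compress(raw)        # costs CPU when writing
restored = zlib.decompress(compressed)  # costs CPU again when reading

print(len(raw), len(compressed))  # far fewer bytes to read and write
print(restored == raw)            # True
```

The compressed form is much smaller, which reduces I/O, but both directions of the conversion consume CPU time that a CPU-bound job may not be able to spare.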
See Chapter 11 for more on how to use compression.