Hive works by leveraging and extending the components of Hadoop, common abstractions such as InputFormat, OutputFormat, Mapper, and Reducer, plus its own abstractions, like SerializerDeserializer (SerDe), User-Defined Functions (UDFs), and StorageHandlers.
These are all Java components, but Hive hides the complexity of implementing and using them by letting the user work with SQL abstractions rather than Java code.
Streaming offers an alternative way to transform data. During a streaming job, the Hadoop Streaming API opens an I/O pipe to an external process. Data is then passed to the process, which operates on the data it reads from the standard input and writes the results out through the standard output, and back to the Streaming API job. While Hive does not leverage the Hadoop streaming API directly, it works in a very similar way.
This pipeline computing model is familiar to users of Unix operating systems and their descendants, like Linux and Mac OS X.
Streaming is usually less efficient than coding the
comparable UDFs or InputFormat
objects.
Serializing and deserializing data to pass it in and out of the pipe is
relatively inefficient. It is also harder to debug the whole program in a
unified manner. However, it is useful for fast prototyping and for
leveraging existing code that is not written in Java. For Hive users who
don’t want to write Java code, it can be a very effective approach.
Hive provides several clauses to use streaming: MAP(), REDUCE(), and TRANSFORM(). An important point to note is that MAP() does not actually force streaming during the map phase, nor does REDUCE() force streaming to happen in the reduce phase. For this reason, the functionally equivalent yet more generic TRANSFORM() clause is suggested, to avoid misleading the reader of the query.
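For example, the following two queries behave the same way; this is a minimal sketch, assuming a hypothetical table named src with columns key and value:

-- Despite its name, MAP makes no promise about running in the map phase:
FROM src MAP key, value USING '/bin/cat' AS k, v;

-- TRANSFORM is equivalent, without the misleading name:
FROM src SELECT TRANSFORM (key, value) USING '/bin/cat' AS k, v;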
For our streaming examples we will use a small table named a, with columns named col1 and col2, both of type INT, and two rows:
hive> CREATE TABLE a (col1 INT, col2 INT)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
hive> SELECT * FROM a;
4	5
3	2
hive> DESCRIBE a;
col1	int
col2	int
The most basic streaming job is an identity operation. The /bin/cat command echoes the data sent to it and meets the requirements. In this example, /bin/cat is assumed to be installed on all TaskTracker nodes. Any Linux system should have it! Later, we will show how Hive can “ship” applications with the job when they aren’t already installed around the cluster:
hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/cat' AS newA, newB
    > FROM default.a;
4	5
3	2
The return columns from TRANSFORM are typed as strings, by default. There is an alternative syntax that casts the results to different types:
hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/cat' AS (newA INT, newB DOUBLE) FROM a;
4	5.0
3	2.0
The cut command can be used with streaming to extract or project specific fields. In other words, this behaves like the SELECT statement:
hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/cut -f1'
    > AS newA, newB FROM a;
4	NULL
3	NULL
Note that the query attempts to read more columns than are actually returned from the external process, so newB is always NULL. By default, TRANSFORM assumes two columns, but there can be any number of them:
hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/cut -f1'
    > AS newA FROM a;
4
3
The /bin/sed program (or /usr/bin/sed on Mac OS X systems) is a stream editor. It takes the input stream, edits it according to the user’s specification, and then writes the results to the output stream. The example below replaces the string 4 with the string 10:
hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/sed s/4/10/'
    > AS newA, newB FROM a;
10	5
3	2
All of the streaming examples thus far have used applications such as cat and sed that are core parts of Unix operating systems and their derivatives. When a query requires files that are not already installed on every TaskTracker, users can use the distributed cache to transmit data or program files across the cluster; these files are cleaned up when the job is complete.
This is helpful, because installing (and sometimes removing) lots of little components across large clusters can be a burden. Also, the cache keeps one job’s cached files separate from those files belonging to other jobs.
The following example is a bash shell script that converts degrees in Celsius to degrees in Fahrenheit:
#!/bin/bash
while read LINE
do
  res=$(echo "scale=2;((9/5) * $LINE) + 32" | bc)
  echo $res
done
To test this script, launch it locally. It will not prompt for input. Type 100 and then strike Enter. The process prints 212.00 to the standard output. Then enter another number and the program returns another result. You can continue entering numbers or use Control-D to end the input.
$ sh ctof.sh
100
212.00
0
32.00
^D
Hive’s ADD FILE feature adds files to the distributed cache. The added file is put in the current working directory of each task. This allows the transform task to use the script without needing to know where to find it:
hive> ADD FILE ${env:HOME}/prog_hive/ctof.sh;
Added resource: /home/edward/prog_hive/ctof.sh
hive> SELECT TRANSFORM (col1) USING 'ctof.sh' AS convert FROM a;
39.20
37.40
The examples shown thus far have taken one row of input and produced one row of output. Streaming can be used to produce multiple rows of output for each input row. This functionality produces output similar to the EXPLODE() UDF and the LATERAL VIEW syntax.[21]
Given an input file $HOME/kv_data.txt that looks like:
k1=v1,k2=v2
k4=v4,k5=v5,k6=v6
k7=v7,k7=v7,k3=v7
We would like the data in a tabular form. This will allow the rows to be processed by familiar HiveQL operators:
k1	v1
k2	v2
k4	v4
...
Create this Perl script and save it as $HOME/split_kv.pl:
#!/usr/bin/perl
while (<STDIN>) {
  my $line = $_;
  chomp($line);
  my @kvs = split(/,/, $line);
  foreach my $p (@kvs) {
    my @kv = split(/=/, $p);
    # emit one tab-separated key-value pair per line
    print $kv[0] . "\t" . $kv[1] . "\n";
  }
}
Create a kv_data table. The entire table is defined as a single string column. The row format does not need to be configured because the streaming script will do all the tokenization of the fields:
hive> CREATE TABLE kv_data (line STRING);
hive> LOAD DATA LOCAL INPATH '${env:HOME}/kv_data.txt'
    > INTO TABLE kv_data;
Use the transform script on the source table. The ragged, multiple-entry-per-row format is converted into a two-column result set of key-value pairs:
hive> SELECT TRANSFORM (line)
    > USING 'perl split_kv.pl'
    > AS (key, value) FROM kv_data;
k1	v1
k2	v2
k4	v4
k5	v5
k6	v6
k7	v7
k7	v7
k3	v7
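For comparison, a similar reshaping can be done natively with the EXPLODE() machinery mentioned above. The following sketch uses Hive’s built-in str_to_map() function to parse each line into a map, which explode() then unpacks into key-value rows; it assumes the same kv_data table:

hive> SELECT explode(str_to_map(line, ',', '=')) AS (key, value)
    > FROM kv_data;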
Streaming can also be used to do aggregating operations like Hive’s built-in SUM function. This is possible because streaming processes can return zero or more rows of output for every given input.
To accomplish aggregation in an external application, declare an accumulator before the loop that reads from the input stream and output the sum after the completion of the input:
#!/usr/bin/perl
my $sum = 0;
while (<STDIN>) {
  my $line = $_;
  chomp($line);
  $sum = ${sum} + ${line};
}
print $sum;
Create a table and populate it with integer data, one integer per line, for testing:
hive> CREATE TABLE sum (number INT);
hive> LOAD DATA LOCAL INPATH '${env:HOME}/data_to_sum.txt'
    > INTO TABLE sum;
hive> SELECT * FROM sum;
5
5
4
Add the streaming program to the distributed cache and use it in a TRANSFORM query. The process returns a single row, which is the sum of the input:
hive> ADD FILE ${env:HOME}/aggregate.pl;
Added resource: /home/edward/aggregate.pl
hive> SELECT TRANSFORM (number)
    > USING 'perl aggregate.pl' AS total FROM sum;
14
Unfortunately, it is not possible to do multiple TRANSFORMs in a single query, unlike the UDAF SUM(), which can be invoked more than once. For example:
hive> SELECT sum(number) AS one, sum(number) AS two FROM sum;
14	14
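By contrast, a query along the following lines, which attempts two TRANSFORM calls in a single SELECT list, is rejected at parse time; this hypothetical sketch is shown only to illustrate the restriction:

hive> SELECT TRANSFORM (number) USING 'perl aggregate.pl' AS one,
    >        TRANSFORM (number) USING 'perl aggregate.pl' AS two
    > FROM sum;   -- Hive rejects this query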
Also, without using CLUSTER BY or DISTRIBUTE BY for the intermediate data, this job may run as single, very long map and reduce tasks. While not all operations can be done in parallel, many can. The next section discusses how to do streaming in parallel, when possible.
Hive offers syntax to control how data is distributed and sorted. These features can be used on most queries, but they are particularly useful when doing streaming processes. For example, data for the same key may need to be sent to the same processing node, or data may need to be sorted by a specific column or by a function. Hive provides several ways to control this behavior. The first is the CLUSTER BY clause, which ensures that like data is routed to the same reduce task and sorted.
To demonstrate the use of CLUSTER BY, let’s see a nontrivial example: another way to perform the Word Count algorithm that we introduced in Chapter 1. Now, we will use the TRANSFORM feature and two Python scripts, one to tokenize lines of text into words, and the second to accept a stream of word occurrences and an intermediate count of the words (mostly the number “1”) and then sum up the counts for each word.
Here is the first Python script that tokenizes lines of text on whitespace (which doesn’t properly handle punctuation, etc.):
import sys

for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        # emit "word<TAB>1" for each occurrence
        print "%s\t1" % (word.lower())
Without explaining all the Python syntax, this script imports the sys module, then loops over each line on the “standard input,” stdin, splits each line on whitespace into a collection of words, then iterates over the words and writes each word, followed by a tab and the “count” of one.[22]
Before we show the second Python script, let’s discuss the data that’s passed to it. We’ll use CLUSTER BY for the words output from the first Python script in our TRANSFORM Hive query. This will have the effect of causing all occurrences of the word-1 “pairs” for a given word to be grouped together, one pair per line:
word1	1
word1	1
word1	1
word2	1
word3	1
word3	1
...
Hence, the second Python script will be more complex, because it needs to cache the word it’s currently processing and the count of occurrences seen so far. When the word changes, the script must output the count for the previous word and reset its caches. So, here it is:
import sys

(last_key, last_count) = (None, 0)
for line in sys.stdin:
    (key, count) = line.strip().split("\t")
    if last_key and last_key != key:
        # the word changed; emit the total for the previous word
        print "%s\t%d" % (last_key, last_count)
        (last_key, last_count) = (key, int(count))
    else:
        last_key = key
        last_count += int(count)

if last_key:
    print "%s\t%d" % (last_key, last_count)
We’ll assume that both Python scripts are in your home directory.
Finally, here is the Hive query that glues it all together. We’ll start by repeating a CREATE TABLE statement for an input table of lines of text, one that we used in Chapter 1. Any text file could serve as the data for this table. Next we’ll show the CREATE TABLE statement for the output of word count. It will have two columns, word and count, and the data will be tab-delimited. Last comes the TRANSFORM query itself:
hive> CREATE TABLE docs (line STRING);
hive> CREATE TABLE word_count (word STRING, count INT)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
hive> FROM (
    >   FROM docs
    >   SELECT TRANSFORM (line) USING '${env:HOME}/mapper.py'
    >   AS word, count
    >   CLUSTER BY word) wc
    > INSERT OVERWRITE TABLE word_count
    > SELECT TRANSFORM (wc.word, wc.count) USING '${env:HOME}/reducer.py'
    > AS word, count;
The USING clauses specify an absolute path to the Python scripts.
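Relative names work too, if the scripts are first shipped to the cluster with ADD FILE, just as we did with ctof.sh earlier; a sketch, assuming both scripts sit in your home directory:

hive> ADD FILE ${env:HOME}/mapper.py;
hive> ADD FILE ${env:HOME}/reducer.py;

The USING clauses could then invoke them simply as 'python mapper.py' and 'python reducer.py'.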
A more flexible alternative to CLUSTER BY is to use DISTRIBUTE BY and SORT BY. This is used in the general case when you wish to partition the data by one column and sort it by another. In fact, CLUSTER BY word is equivalent to DISTRIBUTE BY word SORT BY word ASC.
The following version of the TRANSFORM query outputs the word count results in reverse order:
FROM (
  FROM docs
  SELECT TRANSFORM (line) USING '/.../mapper.py'
  AS word, count
  DISTRIBUTE BY word SORT BY word DESC) wc
INSERT OVERWRITE TABLE word_count
SELECT TRANSFORM (wc.word, wc.count) USING '/.../reducer.py'
AS word, count;
Using either CLUSTER BY or DISTRIBUTE BY with SORT BY is important. Without these directives, Hive may not be able to parallelize the job properly. All the data might be sent to a single reducer, which would extend the job processing time.
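When the data is distributed well, the number of reduce tasks, and therefore the number of parallel copies of the streaming script, can also be raised explicitly; a sketch, assuming the Hadoop-era property name mapred.reduce.tasks:

hive> SET mapred.reduce.tasks=4;

Each reducer then streams its own partition of the data through a separate copy of reducer.py, which works here because CLUSTER BY guarantees all pairs for a given word land in the same partition.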
Typically, streaming is used to integrate non-Java code into Hive. Streaming works with applications written in essentially any language, as we saw. It is possible to use Java for streaming, and Hive includes a GenericMR API that attempts to give the feel of the Hadoop MapReduce API to streaming:
FROM (
  FROM src
  MAP value, key
  USING 'java -cp hive-contrib-0.9.0.jar
    org.apache.hadoop.hive.contrib.mr.example.IdentityMapper'
  AS k, v
  CLUSTER BY k) map_output
REDUCE k, v
USING 'java -cp hive-contrib-0.9.0.jar
  org.apache.hadoop.hive.contrib.mr.example.WordCountReduce'
AS k, v;
To understand how the IdentityMapper is written, we can take a look at the interfaces GenericMR provides. The Mapper interface is implemented to build custom Mapper implementations. It provides a map method where the column data is sent as a string array, String[]:
package org.apache.hadoop.hive.contrib.mr;

public interface Mapper {
  void map(String[] record, Output output) throws Exception;
}
The IdentityMapper makes no changes to the input data and passes it to the collector. This is functionally equivalent to the identity streaming done with /bin/cat earlier in the chapter:
package org.apache.hadoop.hive.contrib.mr.example;

import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Mapper;
import org.apache.hadoop.hive.contrib.mr.Output;

public final class IdentityMapper {
  public static void main(final String[] args) throws Exception {
    new GenericMR().map(System.in, System.out, new Mapper() {
      @Override
      public void map(final String[] record, final Output output) throws Exception {
        output.collect(record);
      }
    });
  }

  private IdentityMapper() {
  }
}
The Reducer interface provides the first column as a String, and the remaining columns are available through the record Iterator. Each iteration returns a pair of Strings, where the 0th element is the key repeated and the next element is the value. The output object is the same one used to emit results:
package org.apache.hadoop.hive.contrib.mr;

import java.util.Iterator;

public interface Reducer {
  void reduce(String key, Iterator<String[]> records, Output output) throws Exception;
}
WordCountReduce has an accumulator that is added to by each element taken from the records Iterator. When all the records have been counted, a single two-element array of the key and the count is emitted:
package org.apache.hadoop.hive.contrib.mr.example;

import java.util.Iterator;

import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Output;
import org.apache.hadoop.hive.contrib.mr.Reducer;

public final class WordCountReduce {

  private WordCountReduce() {
  }

  public static void main(final String[] args) throws Exception {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output)
          throws Exception {
        int count = 0;

        while (records.hasNext()) {
          // note we use col[1] -- the key is provided again as col[0]
          count += Integer.parseInt(records.next()[1]);
        }

        output.collect(new String[] {key, String.valueOf(count)});
      }
    });
  }
}
It’s common in MapReduce applications to join together records from multiple data sets and then stream them through a final TRANSFORM step. Using UNION ALL and CLUSTER BY, we can perform this generalization of a GROUP BY operation. (Pig provides a native COGROUP BY operation.)
Suppose we have several sources of logfiles, with similar schema, that we wish to bring together and analyze with a reduce_script:
FROM (
  FROM (
    FROM order_log ol
    -- User id, order id, and timestamp:
    SELECT ol.userid AS uid, ol.orderid AS id, ol.ts AS ts

    UNION ALL

    FROM clicks_log cl
    SELECT cl.userid AS uid, cl.id AS id, cl.ts AS ts
  ) union_msgs
  SELECT union_msgs.uid, union_msgs.id, union_msgs.ts
  CLUSTER BY union_msgs.uid, union_msgs.ts) map
INSERT OVERWRITE TABLE log_analysis
SELECT TRANSFORM (map.uid, map.id, map.ts)
USING 'reduce_script'
AS (uid, id, ...);
[21] The source code and concept for this example comes from Larry Ogrodnek, “Custom Map Scripts and Hive”, Bizo development (blog), July 14, 2009.
[22] This is the most naive approach. We could cache the counts of words seen and then write the final count. That would be faster, by minimizing I/O overhead, but it would also be more complex to implement.