Chapter 14. Streaming

Hive works by leveraging and extending the components of Hadoop, common abstractions such as InputFormat, OutputFormat, Mapper, and Reducer, plus its own abstractions, like Serializer/Deserializer (SerDe), User-Defined Functions (UDFs), and StorageHandlers.

These components are all Java components, but Hive hides the complexity of implementing and using these components by letting the user work with SQL abstractions, rather than Java code.

Streaming offers an alternative way to transform data. During a streaming job, the Hadoop Streaming API opens an I/O pipe to an external process. Data is passed to the process, which reads it from the standard input, operates on it, and writes the results to the standard output, where the Streaming API job picks them up. Hive does not use the Hadoop Streaming API directly, but it works in a very similar way.

This pipeline computing model is familiar to users of Unix operating systems and their descendants, like Linux and Mac OS X.

Note

Streaming is usually less efficient than coding the comparable UDFs or InputFormat objects. Serializing and deserializing data to pass it in and out of the pipe is relatively inefficient. It is also harder to debug the whole program in a unified manner. However, it is useful for fast prototyping and for leveraging existing code that is not written in Java. For Hive users who don’t want to write Java code, it can be a very effective approach.

Hive provides several clauses to use streaming: MAP(), REDUCE(), and TRANSFORM(). An important point to note is that MAP() does not actually force streaming during the map phase, nor does REDUCE() force streaming to happen in the reduce phase. For this reason, the functionally equivalent, yet more generic, TRANSFORM() clause is suggested to avoid misleading the reader of the query.

For our streaming examples we will use a small table named a, with columns named col1 and col2, both of type INT, and two rows:

hive> CREATE TABLE a (col1 INT, col2 INT)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

hive> SELECT * FROM a;
4       5
3       2

hive> DESCRIBE a;
col1    int
col2    int

Identity Transformation

The most basic streaming job is an identity operation. The /bin/cat command echoes the data sent to it and meets the requirements. In this example, /bin/cat is assumed to be installed on all TaskTracker nodes. Any Linux system should have it! Later, we will show how Hive can “ship” applications with the job when they aren’t already installed around the cluster:

hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/cat' AS newA, newB
    > FROM default.a;
4       5
3       2
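
Since MAP() is functionally equivalent to TRANSFORM(), the same identity query can also be written in the following form (the same style the GenericMR examples later in this chapter use); the keyword does not change when or where the streaming happens, and the output is identical:

hive> FROM default.a
    > MAP col1, col2
    > USING '/bin/cat' AS newA, newB;
4       5
3       2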

Changing Types

By default, the columns returned from TRANSFORM are typed as strings. An alternative syntax casts the results to different types:

hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/cat' AS (newA INT, newB DOUBLE) FROM a;
4       5.0
3       2.0

Projecting Transformation

The cut command can be used with streaming to extract or project specific fields. In other words, it behaves like the column projection of a SELECT clause:

hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/cut -f1'
    > AS newA, newB FROM a;
4       NULL
3       NULL

Note that the query attempts to read more columns than are actually returned from the external process, so newB is always NULL. By default, TRANSFORM assumes two columns but there can be any number of them:

hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/cut -f1'
    > AS newA FROM a;
4
3

Manipulative Transformations

The /bin/sed program (or /usr/bin/sed on Mac OS X systems) is a stream editor. It takes the input stream, edits it according to the user’s specification, and then writes the results to the output stream. The example below replaces the string 4 with the string 10:

hive> SELECT TRANSFORM (col1, col2)
    > USING '/bin/sed s/4/10/'
    > AS newA, newB FROM a;
10      5
3       2

Using the Distributed Cache

All of the streaming examples thus far have used applications such as cat and sed that are core parts of Unix operating systems and their derivatives. When a query requires files that are not already installed on every TaskTracker, users can use the distributed cache to transmit data or program files across the cluster that will be cleaned up when the job is complete.

This is helpful, because installing (and sometimes removing) lots of little components across large clusters can be a burden. Also, the cache keeps one job’s cached files separate from those files belonging to other jobs.

The following example is a bash shell script that converts degrees in Celsius to degrees in Fahrenheit:

#!/bin/bash
while read LINE
do
  res=$(echo "scale=2;((9/5) * $LINE) + 32" | bc)
  echo $res
done

To test this script, save it as ctof.sh and launch it locally. It will not prompt for input. Type 100 and press Enter; the process prints 212.00 to the standard output. Enter another number and the program returns another result. You can continue entering numbers or press Control-D to end the input.

$ sh ctof.sh
100
212.00
0
32.00
^D

Hive’s ADD FILE feature adds files to the distributed cache. The added file is put in the current working directory of each task. This allows the transform task to use the script without needing to know where to find it:

hive> ADD FILE ${env:HOME}/prog_hive/ctof.sh;
Added resource: /home/edward/prog_hive/ctof.sh

hive> SELECT TRANSFORM(col1) USING 'ctof.sh' AS convert FROM a;
39.20
37.40

Producing Multiple Rows from a Single Row

The examples shown thus far have taken one row of input and produced one row of output. Streaming can be used to produce multiple rows of output for each input row. This functionality produces output similar to the EXPLODE() UDF and the LATERAL VIEW syntax[21].

Given an input file $HOME/kv_data.txt that looks like:

k1=v1,k2=v2
k4=v4,k5=v5,k6=v6
k7=v7,k7=v7,k3=v7

We would like the data in a tabular form. This will allow the rows to be processed by familiar HiveQL operators:

k1      v1
k2      v2
k4      v4

Create this Perl script and save it as $HOME/split_kv.pl:

#!/usr/bin/perl
while (<STDIN>) {
    my $line = $_;
    chomp($line);
    my @kvs = split(/,/, $line);
    foreach my $p (@kvs) {
        my @kv = split(/=/, $p);
        print $kv[0] . "\t" . $kv[1] . "\n";
    }
}

Create a kv_data table. The entire table is defined as a single string column. The row format does not need to be configured because the streaming script will do all the tokenization of the fields:

hive> CREATE TABLE kv_data ( line STRING );

hive> LOAD DATA LOCAL INPATH '${env:HOME}/kv_data.txt' INTO TABLE kv_data;

Use the transform script on the source table. The ragged, multiple-entry-per-row format is converted into a two-column result set of key-value pairs:

hive> SELECT TRANSFORM (line)
    > USING 'perl split_kv.pl'
    > AS (key, value) FROM kv_data;
k1      v1
k2      v2
k4      v4
k5      v5
k6      v6
k7      v7
k7      v7
k3      v7
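
For comparison, a rough built-in equivalent of this transformation can be sketched with split(), explode(), and the LATERAL VIEW syntax mentioned above. This is just a sketch against the same kv_data table, shown to illustrate the parallel with the streaming approach:

hive> SELECT split(pair, '=')[0] AS key, split(pair, '=')[1] AS value
    > FROM kv_data LATERAL VIEW explode(split(line, ',')) pairs AS pair;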

Calculating Aggregates with Streaming

Streaming can also be used to do aggregating operations like Hive’s built-in SUM function. This is possible because streaming processes can return zero or more rows of output for every given input.

To accomplish aggregation in an external application, declare an accumulator before the loop that reads from the input stream and output the sum after the completion of the input:

#!/usr/bin/perl
my $sum=0;
while (<STDIN>) {
  my $line = $_;
  chomp($line);
  $sum=${sum}+${line};
}
print $sum;

Create a table and populate it with integer data, one integer per line, for testing:

hive> CREATE TABLE sum (number INT);

hive> LOAD DATA LOCAL INPATH '${env:HOME}/data_to_sum.txt' INTO TABLE sum;

hive> SELECT * FROM sum;
5
5
4

Add the streaming program to the distributed cache and use it in a TRANSFORM query. The process returns a single row, which is the sum of the input:

hive> ADD FILE ${env:HOME}/aggregate.pl;
Added resource: /home/edward/aggregate.pl

hive> SELECT TRANSFORM (number)
    > USING 'perl aggregate.pl' AS total FROM sum;
14

Unfortunately, it is not possible to use multiple TRANSFORM clauses in a single query, the way a built-in UDAF such as SUM() can be used multiple times. For example:

hive> SELECT sum(number) AS one, sum(number) AS two FROM sum;
14      14

Also, without using CLUSTER BY or DISTRIBUTE BY on the intermediate data, this job may run as a single, very long map and reduce task. While not all operations can be done in parallel, many can. The next section discusses how to do streaming in parallel, when possible.

CLUSTER BY, DISTRIBUTE BY, SORT BY

Hive offers syntax to control how data is distributed and sorted. These features can be used on most queries, but are particularly useful when doing streaming processes. For example, data for the same key may need to be sent to the same processing node, or data may need to be sorted by a specific column, or by a function. Hive provides several ways to control this behavior.

The first way to control this behavior is the CLUSTER BY clause, which ensures like data is routed to the same reduce task and sorted.

To demonstrate the use of CLUSTER BY, let's look at a nontrivial example: another way to perform the Word Count algorithm that we introduced in Chapter 1. This time, we will use the TRANSFORM feature and two Python scripts: one to tokenize lines of text into words, and the other to accept a stream of word/count pairs (where the count is mostly the number "1") and sum up the counts for each word.

Here is the first Python script that tokenizes lines of text on whitespace (which doesn’t properly handle punctuation, etc.):

import sys

for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        print "%s	1" % (word.lower())

Without explaining all the Python syntax, this script imports the sys module, loops over each line on the “standard input,” stdin, splits each line on whitespace into a collection of words, then iterates over the words and writes each word, followed by a tab (\t) and the “count” of one.[22]

Before we show the second Python script, let’s discuss the data that’s passed to it. We’ll use CLUSTER BY for the words output from the first Python script in our TRANSFORM Hive query. This will have the effect of causing all the word-1 “pairs” for a given word to be grouped together, one pair per line:

word1   1
word1   1
word1   1
word2   1
word3   1
word3   1
...

Hence, the second Python script will be more complex, because it needs to cache the word it’s currently processing and the count of occurrences seen so far. When the word changes, the script must output the count for the previous word and reset its caches. So, here it is:

import sys

(last_key, last_count) = (None, 0)
for line in sys.stdin:
  (key, count) = line.strip().split("\t")
  if last_key and last_key != key:
    print "%s	%d" % (last_key, last_count)
    (last_key, last_count) = (key, int(count))
  else:
    last_key = key
    last_count += int(count)

if last_key:
    print "%s	%d" % (last_key, last_count)

We’ll assume that both Python scripts are in your home directory.

Finally, here is the Hive query that glues it all together. We’ll start by repeating the CREATE TABLE statement for an input table of lines of text, one that we used in Chapter 1. Any text file could serve as the data for this table. Next comes the CREATE TABLE statement for the word count output table; it has two columns, the word and the count, and its data will be tab-delimited. Last is the TRANSFORM query itself:

hive> CREATE TABLE docs (line STRING);

hive> CREATE TABLE word_count (word STRING, count INT)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

hive> FROM (
    >   FROM docs
    >   SELECT TRANSFORM (line) USING '${env:HOME}/mapper.py'
    >   AS word, count
    >   CLUSTER BY word) wc
    > INSERT OVERWRITE TABLE word_count
    > SELECT TRANSFORM (wc.word, wc.count) USING '${env:HOME}/reducer.py'
    > AS word, count;

The USING clauses specify an absolute path to the Python scripts.
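
Alternatively, the scripts can be shipped to the task nodes with ADD FILE, as we did earlier with ctof.sh, and then invoked by name. Here is a minimal sketch of that variation; it assumes a python interpreter is available on every task node:

hive> ADD FILE ${env:HOME}/mapper.py;
hive> ADD FILE ${env:HOME}/reducer.py;

hive> FROM (
    >   FROM docs
    >   SELECT TRANSFORM (line) USING 'python mapper.py'
    >   AS word, count
    >   CLUSTER BY word) wc
    > INSERT OVERWRITE TABLE word_count
    > SELECT TRANSFORM (wc.word, wc.count) USING 'python reducer.py'
    > AS word, count;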

A more flexible alternative to CLUSTER BY is to use DISTRIBUTE BY and SORT BY. This is used in the general case when you wish to partition the data by one column and sort it by another. In fact, CLUSTER BY word is equivalent to DISTRIBUTE BY word SORT BY word ASC.

The following version of the TRANSFORM query outputs the word count results in reverse order:

FROM (
  FROM docs
  SELECT TRANSFORM (line) USING '/.../mapper.py'
  AS word, count
  DISTRIBUTE BY word SORT BY word DESC) wc
INSERT OVERWRITE TABLE word_count
SELECT TRANSFORM (wc.word, wc.count) USING '/.../reducer.py'
AS word, count;

Using either CLUSTER BY or DISTRIBUTE BY with SORT BY is important. Without these directives, Hive may not be able to parallelize the job properly. All the data might be sent to a single reducer, which would extend the job processing time.
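
The number of reduce tasks that receive the distributed data can also be set explicitly before running either of the queries above. As a minimal sketch, using the Hadoop property for the reducer count in this era of Hive (the value shown is arbitrary; the right number depends on your cluster):

hive> SET mapred.reduce.tasks=4;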

GenericMR Tools for Streaming to Java

Typically, streaming is used to integrate non-Java code into Hive. Streaming works with applications written in essentially any language, as we saw. It is possible to use Java for streaming, and Hive includes a GenericMR API that attempts to give the feel of the Hadoop MapReduce API to streaming:

FROM (
  FROM src
  MAP value, key
  USING 'java -cp hive-contrib-0.9.0.jar
   org.apache.hadoop.hive.contrib.mr.example.IdentityMapper'
  AS k, v
  CLUSTER BY k) map_output
REDUCE k, v
USING 'java -cp hive-contrib-0.9.0.jar
  org.apache.hadoop.hive.contrib.mr.example.WordCountReduce'
AS k, v;

To understand how IdentityMapper is written, we can take a look at the interfaces GenericMR provides. Custom mappers implement the Mapper interface, whose map() method receives each row’s column data as a string array, String[]:

package org.apache.hadoop.hive.contrib.mr;

public interface Mapper {
  void map(String[] record, Output output) throws Exception;
}

The IdentityMapper makes no changes to the input data and passes it to the collector. This is functionally equivalent to the identity streaming done with /bin/cat earlier in the chapter:

package org.apache.hadoop.hive.contrib.mr.example;

import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Mapper;
import org.apache.hadoop.hive.contrib.mr.Output;

public final class IdentityMapper {
  public static void main(final String[] args) throws Exception {
    new GenericMR().map(System.in, System.out, new Mapper() {
      @Override
      public void map(final String[] record, final Output output) throws Exception {
        output.collect(record);
      }
    });
  }

  private IdentityMapper() {
  }
}

The Reducer interface provides the key as the first argument, a String, and the rows for that key are available through the records Iterator. Each iteration returns an array of Strings, where the 0th element is the key repeated and the next element is the value. The output object is the same one used to emit results:

package org.apache.hadoop.hive.contrib.mr;

import java.util.Iterator;

public interface Reducer {
  void reduce(String key, Iterator<String[]> records, Output output)
      throws Exception;
}

WordCountReduce keeps an accumulator that is incremented by the count in each element taken from the records Iterator. When all the records have been counted, a single two-element array of the key and the count is emitted:

package org.apache.hadoop.hive.contrib.mr.example;

import java.util.Iterator;
import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Output;
import org.apache.hadoop.hive.contrib.mr.Reducer;

public final class WordCountReduce {

  private WordCountReduce() {
  }

  public static void main(final String[] args) throws Exception {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output)
          throws Exception {
        int count = 0;
        while (records.hasNext()) {
          // note we use col[1] -- the key is provided again as col[0]
          count += Integer.parseInt(records.next()[1]);
        }
        output.collect(new String[] {key, String.valueOf(count)});
      }
    });
  }
}

Calculating Cogroups

It’s common in MapReduce applications to join together records from multiple data sets and then stream them through a final TRANSFORM step. Using UNION ALL and CLUSTER BY, we can perform this generalization of a GROUP BY operation.

Note

Pig provides a native COGROUP BY operation.

Suppose we have several sources of logfiles, with similar schema, that we wish to bring together and analyze with a reduce_script:

FROM (
  FROM (
    FROM order_log ol
    -- User Id, order Id, and timestamp:
    SELECT ol.userid AS uid, ol.orderid AS id, ol.ts AS ts

    UNION ALL

    FROM clicks_log cl
    SELECT cl.userid AS uid, cl.id AS id, cl.ts AS ts
  ) union_msgs
SELECT union_msgs.uid, union_msgs.id, union_msgs.ts
CLUSTER BY union_msgs.uid, union_msgs.ts) map
INSERT OVERWRITE TABLE log_analysis
SELECT TRANSFORM(map.uid, map.id, map.ts) USING 'reduce_script'
AS (uid, id, ...);
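
For reference, the query assumes source tables roughly like the following. This is only a sketch: the column types (all STRING here) are assumptions for illustration, and the log_analysis output table would be created separately with a schema matching the reduce_script output:

hive> CREATE TABLE order_log (userid STRING, orderid STRING, ts STRING);

hive> CREATE TABLE clicks_log (userid STRING, id STRING, ts STRING);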


[21] The source code and concept for this example comes from Larry Ogrodnek, “Custom Map Scripts and Hive”, Bizo development (blog), July 14, 2009.

[22] This is the most naive approach. We could cache the counts of words seen and then write the final count. That would be faster, by minimizing I/O overhead, but it would also be more complex to implement.
