Chapter 5. Tutorials and Deep Dives

The following sections cover aspects of Impala that deserve a closer look. Brief examples illustrate interesting features for new users. More complex topics are covered by tutorials or deep dives into the inner workings.

Tutorial: From Unix Data File to Impala Table

Here is what your first Unix command-line session might look like when you’re using Impala. This example from a Bash shell session creates a couple of text files (which could be named anything), copies those files into the HDFS filesystem, and points an Impala table at the data so that it can be queried through SQL. The exact HDFS paths might differ based on your HDFS configuration and Linux users.

$ cat >csv.txt
1,red,apple,4
2,orange,orange,4
3,yellow,banana,3
4,green,apple,4
^D
$ cat >more_csv.txt
5,blue,bubblegum,0.5
6,indigo,blackberry,0.2
7,violet,edible flower,0.01
8,white,scoop of vanilla ice cream,3
9,black,licorice stick,0.2
^D
$ hadoop fs -mkdir /user/hive/staging
$ hadoop fs -put csv.txt /user/hive/staging
$ hadoop fs -put more_csv.txt /user/hive/staging

Note

Sometimes the user you are logged in as does not have permission to manipulate HDFS files. In that case, issue the commands with the permissions of the hdfs user, using the form:

sudo -u hdfs hadoop fs arguments

Now that the data files are in the HDFS filesystem, let’s go into the Impala shell and start working with them. (Some of the prompts and output are abbreviated here for easier reading by first-time users.) This example creates a new database, in case this experiment turns into a project with a whole set of related tables. Then we create a table inside this database, move the data files into the table, and run some queries.

$ impala-shell
> create database food_colors;
> use food_colors;
> create table food_data
    (id int, color string, food string, weight float)
    row format delimited fields terminated by ',';
> -- Here's where we move the data files from an arbitrary
  -- HDFS location to under Impala control.
> load data inpath '/user/hive/staging' into table food_data;
Query finished, fetching results ...
+----------------------------------------------------------+
| summary                                                  |
+----------------------------------------------------------+
| Loaded 2 file(s). Total files in destination location: 2 |
+----------------------------------------------------------+
> select food, color as "Possible Color" from food_data where
    food = 'apple';
Query finished, fetching results ...
+-------+----------------+
| food  | possible color |
+-------+----------------+
| apple | red            |
| apple | green          |
+-------+----------------+
Returned 2 row(s) in 0.66s
> select food as "Top 5 Heaviest Foods", weight
    from food_data
    order by weight desc limit 5;
Query finished, fetching results ...
+----------------------------+----------------------+
| top 5 heaviest foods       | weight               |
+----------------------------+----------------------+
| orange                     | 4                    |
| apple                      | 4                    |
| apple                      | 4                    |
| scoop of vanilla ice cream | 3                    |
| banana                     | 3                    |
+----------------------------+----------------------+
Returned 5 row(s) in 0.49s
> quit;

Back in the Unix shell, see how the CREATE DATABASE and CREATE TABLE statements created some new directories and how the LOAD DATA statement moved the original data files into an Impala-managed directory:

$ hadoop fs -ls -R /user/hive/warehouse/food_colors.db
drwxrwxrwt   - impala hive          0 2013-08-29 16:14 /user/h
ive/warehouse/food_colors.db/food_data
-rw-rw-rw-   3 hdfs   hive         66 2013-08-29 16:12 /user/h
ive/warehouse/food_colors.db/food_data/csv.txt
-rw-rw-rw-   3 hdfs   hive        139 2013-08-29 16:12 /user/h
ive/warehouse/food_colors.db/food_data/more_csv.txt

In one easy step, you’ve gone from a collection of human-readable text files to a SQL table that you can query using standard, widely known syntax. The data is automatically replicated and distributed across a cluster of networked machines by virtue of being put into an HDFS directory.

These same basic techniques scale up to enormous tables with billions of rows. By that point, you would likely be using a more compact and efficient data format than plain text files, and you might include a partitioning clause in the CREATE TABLE statement to split up the data files by date or category. Don’t worry, you can easily upgrade your Impala tables and rearrange the data as you learn the more advanced Impala features. In fact, that’s the subject of a later tutorial: Tutorial: The Journey of a Billion Rows.

Tutorial: Queries Without a Table

To understand how Impala works at the extreme ends of the spectrum, let’s consider for a moment the least intensive queries we could run. Impala does not have a built-in trivial table like the DUAL table in Oracle. Instead, to get back a single-row result of an expression, you construct a query with all constant expressions or function calls in the SELECT list, and leave off the FROM clause. Here are sample queries for you to run in the impala-shell interpreter; see if you can predict the results:

SELECT 1;
SELECT "hello world";
SELECT 2+2;
SELECT 10 > 5;
SELECT now();

You can use this very valuable table-less SELECT technique for experimenting with the detailed semantics of Impala functions, data types and casting, NULL handling, and all kinds of expressions.

These examples illustrate numeric expressions and arithmetic:

SELECT 1 + 0.5;
SELECT 1 / 3;
SELECT 1e6, 1.5e6;
SELECT 30000 BETWEEN min_smallint() AND max_smallint();
+-------------------------------------------------+
| 30000 between min_smallint() and max_smallint() |
+-------------------------------------------------+
| true                                            |
+-------------------------------------------------+

Note

The results of those floating-point expressions are more precise in Impala 1.4 and later than in previous releases, due to the introduction of the DECIMAL type.

These examples illustrate type conversions:

SELECT cast(1e6 AS string);
SELECT cast(true AS string);
SELECT cast(99.44 AS int);

These examples illustrate what happens when NULL is used in various contexts:

SELECT 1 + NULL, 1 = NULL, 1 > NULL, NULL = NULL, NULL IS NULL;
SELECT cast(NULL AS STRING), cast(NULL AS BIGINT), cast(NULL AS BOOLEAN);

These examples illustrate string literals and string functions:

SELECT 'hello
world';
SELECT "abc	012345	xyz";
SELECT concat('hello',NULL);
SELECT substr('hello',-2,2);

These examples illustrate regular expression comparisons and functions:

SELECT 'abc123xyz' REGEXP '[[:digit:]]{3}';
+-------------------------------------+
| 'abc123xyz' regexp '[[:digit:]]{3}' |
+-------------------------------------+
| true                                |
+-------------------------------------+

SELECT regexp_extract('>>>abc<<<','.*([a-z]+)',1);
+----------------------------------------------+
| regexp_extract('>>>abc<<<', '.*([a-z]+)', 1) |
+----------------------------------------------+
| abc                                          |
+----------------------------------------------+

SELECT regexp_replace('123456','(2|4|6)','x'),
+------------------------------------------+
| regexp_replace('123456', '(2|4|6)', 'x') |
+------------------------------------------+
| 1x3x5x                                   |
+------------------------------------------+

This example illustrates date/time expressions, functions, and arithmetic:

SELECT now() + INTERVAL 3 DAYS + INTERVAL 5 HOURS;
+--------------------------------------------+
| now() + interval 3 days + interval 5 hours |
+--------------------------------------------+
| 2014-08-03 16:48:44.201018000              |
+--------------------------------------------+

These types of queries can help you construct or debug the individual pieces of a complicated query. For example, you typically run a simple test to confirm that you have the right regex notation, function arguments, format strings, and so on, before applying a regular expression or date calculation to billions of rows.

Queries with no FROM clause are not subject to the limits imposed by the admission control feature on the number of concurrent queries; they are not parallelized and distributed across multiple cluster nodes. Instead, all work is done on the coordinator node. In terms of resource management, Impala still allocates a default amount of memory (several megabytes) for each such query, because even a query without a table might evaluate some enormously complicated expression or call a complex user-defined function.

Tutorial: The Journey of a Billion Rows

Now that we’ve seen how you can try out basic SQL features with no table or a tiny table, let’s get serious. In the Big Data field, there’s little point experimenting with thousands or even millions of rows. That volume of data is effectively the same as the smallest tables you could construct, so you don’t really learn much about distributed queries or Impala performance.

Let’s set up a table with a billion rows and see what we can learn. With a billion rows, by definition we’ll be working with gigabytes of data. Any inefficiency or wasted storage will be easy to spot. Any query that processes those gigabytes in only a few seconds will be cause for celebration.

Generating a Billion Rows of CSV Data

First, let’s generate a billion rows of general-purpose data. I intentionally chose an old-school technique, a simple Python script generating a single big file, to illustrate how Impala bridges the world of traditional Unix and databases, and the new Hadoop world of Big Data and distributed parallel computing.

#! /usr/bin/python

"""
multicols.py: Generate an arbitrary number of rows with random values.
"""

import sys
from random import *

# ---
# Load a list of the biggest US cities (289 of them),
# to pick a random city/state combination for an address.

usa_cities = []

def load_cities():
  global usa_cities
  lines = [line.rstrip() for line in open("usa_cities.lst").readlines()]
  usa_cities = [line.split(",") for line in lines]

def random_city():
  if usa_cities == []:
    load_cities()
  which = randint(0,len(usa_cities)-1)
  return usa_cities[which]

if __name__ == '__main__':

# Produce text format data with different kinds of separators.
  possible_separators = { "pipe": "|", "comma": ",", "csv": ",",
    "ctrl-a": "x01", "hash": "#", "bang": "!", "tab": "	",
    "tsv": "	" }

# Accept number of rows to generate as command-line argument.
  try:
    count = int(sys.argv[1])
  except:
    count = 1

# For random numeric values, define upper bound as another command-line argument.
# By default, values are 0-99999.
  try:
    upper = int(sys.argv[2])
  except:
    upper = 99999

# Accept mnemonic for separator characters as command-line argument.
  try:
    sep_arg = sys.argv[3]
    sep = possible_separators[sep_arg]
  except:
#    If no separator is specified, fall back to the Impala default.
    sep = "x01"

# Generate requested number of rows of data.
  for i in xrange(count):

# Column 1 is a sequential integer, starting from 1.
    c1 = str(i+1)

# Column 2 is a random integer, from 0 to the specified upper bound.
# 10% of the time, we substitute a NULL value instead of a number.
    chance = randint(1,10) % 10;
    if chance == 0:
      c2 = r"N"
    else:
      c2 = str(randint(0,upper))

# Column 3 is another random integer, but formatted with leading
# zeroes to exactly 6 characters.
    c3 = str(randint(0,upper)).zfill(6)

# Column 4 is a random string, from 4-22 characters.
# It is an initial capital letter, followed by 3 sequences of repeating letters.
# 1% of the time, we substitute a NULL value instead of a string.
    chance = randint(1,100) % 100;
    if chance == 0:
      c4 = r"N"
    else:
      cap = chr(randint(65,90))
      string1 = chr(randint(97,122)) * randint(1,7)
      string2 = chr(randint(97,122)) * randint(1,7)
      string3 = chr(randint(97,122)) * randint(1,7)
      c4 = cap + string1 + string2 + string3

# Column 5 is a random Boolean value.
# It's true 2/3 of the time, false 1/3 of the time.
    bool = randint(0,2)
    if bool == 0:
      c5 = "false"
    else:
      c5 = "true"

# We figure out a random city and state to use for a location field.
    (city,state) = random_city()
    c6 = city
    c7 = state

# Concatenate all the fields and print.
    row = (c1 + sep + c2 + sep + c3 + sep + c4 +
      sep + c5 + sep + c6 + sep + c7)
    print row

Impala is flexible in terms of the sequence of operations. You can prepare the data first and even bring it into HDFS, and then construct a database table that matches the structure of the data. Or you can use the traditional sequence of creating the table first and then preparing the data based on the table schema. For this exercise, we’ll prepare the data on the local filesystem, then set up the database table in Impala, and then move the data into the expected location in HDFS.

First we run the script to produce the random data. On my Linux server, this takes an hour or more. (This is a good illustration of why it’s so attractive to parallelize operations involving so much data.)

$ python multicols.py 1000000000 99999 comma >billion_rows.csv

Within the impala-shell interpreter, we create a table, which will contain a billion rows after the data files go into HDFS. The attributes of the table (file format, and separator character for text format) match what we used in the raw data files.

$ impala-shell -i localhost
[localhost:21000] > create database oreilly;
[localhost:21000] > use oreilly;
[localhost:21000] > create table sample_data
                  > (id bigint, val int, zerofill string, name string,
                  > assertion boolean, city string, state string)
                  > row format delimited fields terminated by ",";
[localhost:21000] > desc sample_data;
+-----------+---------+---------+
| name      | type    | comment |
+-----------+---------+---------+
| id        | bigint  |         |
| val       | int     |         |
| zerofill  | string  |         |
| name      | string  |         |
| assertion | boolean |         |
| city      | string  |         |
| state     | string  |         |
+-----------+---------+---------+

Now we need to get the data into the right location in HDFS. To figure out where the data should go, we use the DESCRIBE FORMATTED statement:

[localhost:21000] > describe formatted sample_data;
...
| # Detailed Table Information | NULL
| Database:                    | oreilly
| Owner:                       | jrussell
| CreateTime:                  | Fri Jul 18 16:25:06 PDT 2014
| LastAccessTime:              | UNKNOWN
| Protect Mode:                | None
| Retention:                   | 0
| Location:                    | hdfs://a1730.abcde.example.com:8020 1
|                              | /user/impala/warehouse/oreilly.db/
|                              | sample_data
| Table Type:                  | MANAGED_TABLE
...
1

The Location: attribute represents the HDFS path to the table data. When using it with Hadoop commands, you can include the hdfs://host:port prefix or leave it out and specify it as a /user/whoever/... path.

Armed with this knowledge, we can run Linux utilities (or various kinds of Hadoop jobs) that deposit data in the appropriate HDFS directory. In this example, we do that from inside impala-shell using the ! command, which invokes an arbitrary Linux command.

[localhost:21000] > !hdfs dfs -put billion_rows.csv
                  > '/user/impala/warehouse/oreilly.db/sample_data';

Impala needs a reminder, in the form of a REFRESH statement, whenever data files are added or changed outside of Impala SQL statements such as CREATE TABLE AS SELECT or INSERT. At that point, we can query the table and see that the billion rows have arrived:

[localhost:21000] > refresh sample_data;
[localhost:21000] > select count(*) from sample_data;
+------------+
| count(*)   |
+------------+
| 1000000000 |
+------------+
Returned 1 row(s) in 45.31s

Now we’ve got a billion rows to play with, using all the familiar SQL techniques. Let’s try some simple queries that we know will produce small result sets:

[localhost:21000] > select max(name) from sample_data;
+------------------------+
| max(name)              |
+------------------------+
| Zzzzzzzzzzzzzzzzzzzzzz |
+------------------------+
Returned 1 row(s) in 50.73s
[localhost:21000] > select min(name) as first_in_alpha_order, assertion
                  > from sample_data group by assertion;
+----------------------+-----------+
| first_in_alpha_order | assertion |
+----------------------+-----------+
| Aaaa                 | true      |
| Aaaa                 | false     |
+----------------------+-----------+
Returned 2 row(s) in 37.35s
[localhost:21000] > select avg(val), min(name), max(name) from sample_data
                  > where name between 'A' and 'D';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 12.77s
[localhost:21000] > select count(name) as num_names, assertion
                  > from sample_data group by assertion;
+-----------+-----------+
| num_names | assertion |
+-----------+-----------+
| 660023128 | true      |
| 329974394 | false     |
+-----------+-----------+
Returned 2 row(s) in 45.61s

What’s behind the scenes with these billion rows? We started with one big CSV file and we put it straight into the Hadoop filesystem. The SHOW TABLE STATS statement displays the physical characteristics of the table:

[localhost:21000] > show table stats sample_data;
+-------+--------+---------+--------------+--------+
| #Rows | #Files | Size    | Bytes Cached | Format |
+-------+--------+---------+--------------+--------+
| -1    | 1      | 56.72GB | NOT CACHED   | TEXT   | 1
+-------+--------+---------+--------------+--------+
Returned 1 row(s) in 0.01s
1

That single big file is still there in HDFS, all 56.72 gigabytes.

The whole big file is being read every time we do a query, which explains why the queries all take several seconds or more.

At this point, let your inquisitive hacker imagination run free. If there is some way to reduce the data size by several gigabytes, would that translate into seconds shaved off each of these queries? Yes, it would. How about if we could arrange the data so it didn’t have to be entirely read for each query? Yes, that would speed up the queries proportionally: read 1/10th as much data, take roughly 1/10th as much time as the original query.

How a Table Works When It’s One Big File

Are we losing out on parallelism by having just one file? Not really, because it’s so big:

  • HDFS internally divides the data up into blocks, 128 MB each by default.
  • Each block is replicated to some number of hosts in our cluster; by default 3.
  • For each query, each 128 MB block is processed by one of our 4 nodes. Which node processes a given block? That’s not entirely predictable, but anyway it’s one of the 3 nodes that can read the block off their local disk, rather than asking for it to be sent across the network from a node that does have it. So all 4 of our nodes are kept busy by these queries, and the queries can finish in approximately 1/4 of the time it would take if we did them on a single machine.

Normalizing the Original Data

How are we going to shrink the data? First, let’s do a bit of normalization. The CITY and STATE fields only have 289 values total, representing the largest cities in the USA. We could move repeated strings such as “California” and “Mississippi” out of the data file and replace them with small integers.

[localhost:21000] > select avg(length(city)) + avg(length(state))
                  > from sample_data;
+----------------------------------------+
| avg(length(city)) + avg(length(state)) |
+----------------------------------------+
| 17.190299006                           | 1
+----------------------------------------+
Returned 1 row(s) in 15.18s
[localhost:21000] > quit;
1

The average combined length of the CITY and STATE fields is about 17 characters.

We’ll replace those with a single number, between 1 and 3 digits. So we could expect to save roughly 15–16 gigabytes of disk space, by replacing 18 characters (CITY and STATE plus comma delimiter) with 2–3 digit numbers.

After we get the data into Impala, we can use SQL skills to slice and dice it in all sorts of ways. Let’s set up a tiny lookup table with all the city and state data, and then make a new table with the original data in normalized form. Before transforming the data, we’ll use a view to double-check the correctness of the join query that pulls out the normalized values.

Recall that we started with a simple list of CITY,STATE values in usa_cities.lst. To use this data as a lookup table, we need a file with numeric IDs. That’s easy to prepare with basic Unix commands: just take the output of cat -n, trim off leading spaces, and turn the tab after the line number into our separator character (comma).

$ cat -n usa_cities.lst | sed -e 's/	/,/' | sed -e 's/^ *//' | tee usa_cities.csv

Now we pick up back in the impala-shell interpreter, inside the oreilly database where we’re running these experiments. Again, we load the data file into the right HDFS directory, finding the location with DESCRIBE FORMATTED and running the hdfs command from inside impala-shell:

$ impala-shell -i localhost -d oreilly
[localhost:21000] > describe formatted usa_cities;
...
| Location: | hdfs://a1730.abcde.example.com:8020/user/impala/warehouse
|           | /oreilly.db/usa_cities
...
[localhost:21000] > !hdfs dfs -put usa_cities.csv
                  > '/user/impala/warehouse/oreilly.db/usa_cities';
[localhost:21000] > refresh usa_cities;
[localhost:21000] > select count(*) from usa_cities;
+----------+
| count(*) |
+----------+
| 289      |
+----------+
[localhost:21000] > show table stats usa_cities;
+-------+--------+--------+--------------+--------+
| #Rows | #Files | Size   | Bytes Cached | Format |
+-------+--------+--------+--------------+--------+
| -1    | 1      | 6.44KB | NOT CACHED   | TEXT   |
+-------+--------+--------+--------------+--------+
Returned 1 row(s) in 0.01s
[localhost:21000] > select * from usa_cities limit 5;
+----+--------------+--------------+
| id | city         | state        |
+----+--------------+--------------+
| 1  | New York     | New York     |
| 2  | Los Angeles  | California   |
| 3  | Chicago      | Illinois     |
| 4  | Houston      | Texas        |
| 5  | Philadelphia | Pennsylvania |
+----+--------------+--------------+

Before doing any resource-intensive operation like reorganizing the original 56 GB table, I always double-check the logic first using a view, which helps to avoid typing long queries over and over. Let’s make sure that the CITY and STATE data from the original table match up with the values from the new lookup table:

[localhost:21000] > create view normalized_view as
                  > select one.id, one.val, one.zerofill, one.name,
                  >   one.assertion, two.id as location_id
                  > from sample_data one join usa_cities two 1
                  > on (one.city = two.city and one.state = two.state);

[localhost:21000] > select one.id, one.location_id,
                  >   two.id, two.city, two.state 2
                  > from normalized_view one join usa_cities two
                  > on (one.location_id = two.id)
                  > limit 5;
+----------+-------------+-----+-----------+------------+
| id       | location_id | id  | city      | state      |
+----------+-------------+-----+-----------+------------+
| 15840253 | 216         | 216 | Denton    | Texas      |
| 15840254 | 110         | 110 | Fontana   | California |
| 15840255 | 250         | 250 | Gresham   | Oregon     |
| 15840256 | 200         | 200 | Waco      | Texas      |
| 15840257 | 165         | 165 | Escondido | California |
+----------+-------------+-----+-----------+------------+
Returned 5 row(s) in 0.42s

[localhost:21000] > select id, city, state from sample_data 3
                  > where id in (15840253, 15840254, 15840255, 15840256, 15840257);
+----------+-----------+------------+
| id       | city      | state      |
+----------+-----------+------------+
| 15840253 | Denton    | Texas      |
| 15840254 | Fontana   | California |
| 15840255 | Gresham   | Oregon     |
| 15840256 | Waco      | Texas      |
| 15840257 | Escondido | California |
+----------+-----------+------------+
Returned 5 row(s) in 5.27s
1

The view gets some columns from the original SAMPLE_DATA table, but retrieves CITY and STATE from the small USA_CITIES lookup table.

2

The join query pulls CITY and STATE from the small lookup table by way of the view.

3

The final query confirms that the results are the same when CITY and STATE come from the original SAMPLE_DATA table.

Now we’re satisfied that the join query in the view pulls out the correct combination of CITY, STATE, and ID values from the lookup table. So let’s create a version of our billion-row table that matches the layout of the view, with the CITY and STATE columns replaced by a single numeric LOCATION_ID:

[localhost:21000] > create table normalized_text
                  > row format delimited fields terminated by ","
                  > as select * from normalized_view;
+----------------------------+
| summary                    |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 422.06s

[localhost:21000] > select * from normalized_text limit 5;
+-----------+-------+----------+------------------+-----------+-------------+
| id        | val   | zerofill | name             | assertion | location_id |
+-----------+-------+----------+------------------+-----------+-------------+
| 921623839 | 95546 | 001301   | Pwwwwwbbe        | false     | 217         |
| 921623840 | 38224 | 018053   | Clldddddddll     | true      | 127         |
| 921623841 | 73153 | 032797   | Csssijjjjjj      | true      | 124         |
| 921623842 | 35567 | 094193   | Uhhhhhrrrrrrvvv  | false     | 115         |
| 921623843 | 4694  | 051840   | Uccccqqqqqbbbbbb | true      | 138         |
+-----------+-------+----------+------------------+-----------+-------------+

[localhost:21000] > show table stats normalized_text;
+-------+--------+---------+--------------+--------+
| #Rows | #Files | Size    | Bytes Cached | Format |
+-------+--------+---------+--------------+--------+
| -1    | 4      | 42.22GB | NOT CACHED   | TEXT   | 1
+-------+--------+---------+--------------+--------+
1

As predicted, we saved about 14.5 GB in our original table, by creating a lookup table that’s less than 7 KB. (From the perspective of the IT group, we’ve really saved 43.5 GB in total, because each unnecessary data block gets replicated across 3 nodes.)

When we do join queries to display the original city and state names in reports, that is a perfect illustration of the “broadcast” join technique: the lookup table that’s only a few KB will be transmitted to each node and cross-referenced against the data from the big table as that larger data set is read from local disk storage.

Q&A

How come when we asked for 5 rows with the LIMIT 5 clause, we didn’t always get the first 5 rows from the table? Some of those queries returned rows with IDs in the range of 15 million or even 921 million.

Remember that as the experiment progressed, the new tables we created had progressively more and more data files: first 4, then 64. Each of our 4 nodes was working on a subset of data, and whichever node came up with its 5 rows first, those are the rows we saw. Even when there was just a single 56 GB file, our 4 nodes were working in parallel on the individual 128 MB data blocks carved out of the original file, and the arbitrary rows we asked for could come back from any of those blocks.

Whenever you expect rows to be returned in a particular order, include an ORDER BY clause in the outermost block of a query, or in the query that references the view. The SQL standard does not guarantee sorted results from an ORDER BY in a subquery or a view definition.

Converting to Parquet Format

We’ve just saved several gigabytes of disk space and earned the gratitude of the IT department. Shall we stop there? No, we’re on a roll now!

When you read the official Impala documentation, you’ll see guidelines saying to use the Parquet file format for all your sizeable tables. This file format uses some clever tricks to shrink the overall size of the data and reduce I/O during queries. Let’s give that a try:

[localhost:21000] > create table normalized_parquet stored as parquet 1
                  > as select * from normalized_text;
+----------------------------+
| summary                    |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 183.63s
[localhost:21000] > select count(*) from normalized_parquet;
+------------+
| count(*)   |
+------------+
| 1000000000 |
+------------+
Returned 1 row(s) in 2.63s
[localhost:21000] > show table stats normalized_parquet;
+-------+--------+---------+--------------+---------+
| #Rows | #Files | Size    | Bytes Cached | Format  |
+-------+--------+---------+--------------+---------+
| -1    | 64     | 23.34GB | NOT CACHED   | PARQUET | 2
+-------+--------+---------+--------------+---------+
Returned 1 row(s) in 0.01s
1

As you gain more experience with Impala queries and performance tuning, you will start to get a warm, fuzzy feeling when you see the STORED AS PARQUET clause in CREATE TABLE statements.

2

Wow, we just reduced the size of the table again—by almost 20 more gigabytes this time. (Again, the 3x replication factor means we saved another 60 GB in total across the cluster).

The ultimate goal is for queries to be faster, so let’s see how the various tables we constructed perform with the same queries. We’ll run identical queries on the original 56 GB text table, the 42 GB normalized text table, and finally the 23 GB normalized Parquet table, expecting each to be faster than the preceding one:

[localhost:21000] > select max(name) from sample_data; 1
+------------------------+
| max(name) |
+------------------------+
| Zzzzzzzzzzzzzzzzzzzzzz |
+------------------------+
Returned 1 row(s) in 50.73s 1

[localhost:21000] > select max(name) from normalized_text; 2
+------------------------+
| max(name) |
+------------------------+
| Zzzzzzzzzzzzzzzzzzzzzz |
+------------------------+
Returned 1 row(s) in 24.15s 2

[localhost:21000] > select max(name) from normalized_parquet; 3
+------------------------+
| max(name) |
+------------------------+
| Zzzzzzzzzzzzzzzzzzzzzz |
+------------------------+
Returned 1 row(s) in 20.19s 3

[localhost:21000] > select avg(val), min(name), max(name) from sample_data 1
> where name between 'A' and 'D';
+-------------------+-----------+------------------------+
| avg(val) | min(name) | max(name) |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 26.36s 1

[localhost:21000] > select avg(val), min(name), max(name) from normalized_text 2
> where name between 'A' and 'D';
+-------------------+-----------+------------------------+
| avg(val) | min(name) | max(name) |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 21.17s 2

[localhost:21000] > select avg(val), min(name), max(name) from normalized_parquet 3
> where name between 'A' and 'D';
+-------------------+-----------+------------------------+
| avg(val) | min(name) | max(name) |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 12.11s 3
1

The SAMPLE_DATA is the biggest table, in text format with redundant string data. The queries for this table are the slowest.

2

The NORMALIZED_TEXT table is somewhat smaller, still in text format. The queries for this table are somewhat faster because of its smaller size, resulting in less I/O.

3

The NORMALIZED_PARQUET table is the smallest. The queries for this table are the fastest, because the overall data is smaller still, and Parquet reduces I/O even more by reading only the columns needed by the query.

Fun Fact

As I ran and reran these queries in my test environment, the times jumped up and down a bit, because sometimes the Linux servers had cached some of the data after reading it the first time. There again, having the data in the most compact format possible increases the chance that the data will still be cached later, instead of being evicted by reading data that’s bulkier than necessary.

As a final experiment with file formats, let’s see what happens if we convert the original 56 GB table directly to Parquet without the normalization step. Because we are not eliminating the redundant string data, we can predict that the overall size will be somewhere between the original 56 GB and the 23.34 GB of the NORMALIZED_PARQUET table:

localhost:21000] > create table denormalized_parquet stored as parquet as
                 > select * from sample_data;
+----------------------------+
| summary                    |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 225.69s
[localhost:21000] > show table stats denormalized_parquet;
+-------+--------+---------+--------------+---------+
| #Rows | #Files | Size    | Bytes Cached | Format  |
+-------+--------+---------+--------------+---------+
| -1    | 64     | 24.04GB | NOT CACHED   | PARQUET | 1
+-------+--------+---------+--------------+---------+
Returned 1 row(s) in 0.01s
1

The NORMALIZED_PARQUET table was 23.34 GB, while the DENORMALIZED_PARQUET table is only a little bigger at 24.04 GB.

Why isn’t there a bigger size difference like there was in text format? When the data was converted to Parquet, it was compacted (encoded) in multiple ways before the final compression step. One trick Parquet uses is to take columns with up to 16K of different values, and internally de-duplicate them, substituting numeric indexes instead of repeated strings. (That technique is known as dictionary encoding.) In a sense, Parquet did the same kind of normalization in the original data file, rather than making a separate lookup table. Ideally, you would still normalize such columns and use join queries to look up the original values, but either way, with minimal effort you can get substantial space savings.

Making a Partitioned Table

At this point, the Parquet file format is doing a lot of the heavy lifting to reduce the time for each query by reducing the overall I/O to read the data. If a column is not referenced in the query, Parquet lets the query avoid reading that column entirely, as opposed to text format. For example, in the SAMPLE_DATA and NORMALIZED_TEXT tables we’ve been using, each query reads 6 GB of data for the ZEROFILL column whether or not that column is used at all. And the compressed and encoded form of the column values means much less data is read even for columns that are needed.

Partitioning the table lets us use our domain knowledge of the data and corresponding queries to reduce the I/O even further. If you have not already read the guidelines for partitioned tables in Working with Partitioned Tables, familiarize yourself with those tips before tackling any real-world projects with partitioning.

In this thought experiment, let’s decide that our most common queries will target a subset of users based on the first letter of their names. All else being equal, we could analyze the data for the A users, D users, or X users in about 1/26th of the time it would take to process all users together. In real life, you commonly partition on date-related fields so that you can analyze a certain time period, or on location-related fields so that you can analyze different geographic regions.

Again, because we are going to reorganize several gigabytes of data, let’s first make a view that matches the columns of our partitioned table, with a new INITIAL column that represents the first letter of the name:

[localhost:21000] > desc normalized_parquet;
+-------------+----------+---------+
| name        | type     | comment |
+-------------+----------+---------+
| id          | bigint   |         |
| val         | int      |         |
| zerofill    | string   |         |
| name        | string   |         |
| assertion   | boolean  |         |
| location_id | smallint |         |
+-------------+----------+---------+
Returned 6 row(s) in 0.01s

[localhost:21000] > create view partitioned_normalized_view as
                  > select id, val, zerofill, name, assertion, location_id,
                  > substr(name,1,1) as initial 1
                  > from normalized_parquet;
Returned 0 row(s) in 2.89s

[localhost:21000] > select id, name, initial
                  > from partitioned_normalized_view limit 5;
+-----------+----------------------+---------+
| id        | name                 | initial |
+-----------+----------------------+---------+
| 663027574 | Ckkvvvvvvvmmmmmmm    | C       |
| 663027575 | Fkkkkkkkwwwwwwwyyyyy | F       |
| 663027576 | Orrrrrrrfmmmmm       | O       |
| 663027577 | Peeevvvvvvvvvv       | P       |
| 663027578 | Dmmmmhhhs            | D       |
+-----------+----------------------+---------+
Returned 5 row(s) in 4.65s
1

For partition key columns, we would normally use the verbatim column values from the original data where appropriate. In this case, however, we make a new partition key column by running a function on the original values.

After we’re satisfied that the new INITIAL column has the right values, we create a partitioned table using the PARTITIONED BY clause, and copy the data into it from the unpartitioned table:

[localhost:21000] > create table partitioned_normalized_parquet
                  > (id bigint, val int, zerofill string, name string,
                  > assertion boolean, location_id smallint)
                  > partitioned by (initial string) stored as parquet; 1
Returned 0 row(s) in 1.81s
[localhost:21000] > insert into partitioned_normalized_parquet partition(initial)
                  > select * from partitioned_normalized_view; 2
Inserted 1000000000 rows in 619.28s
1

The INITIAL column is referenced by the PARTITIONED BY clause, not in the regular column list.

2

The SELECT * portion of the INSERT statement requires that the regular columns come first, then any partition key columns last. This is another reason we use a view—to specify the columns in the most convenient order for the INSERT statement.

Now let’s examine how the data is broken down within the partitioned table:

[localhost:21000] > show table stats partitioned_normalized_parquet;
+---------+-------+--------+----------+--------------+---------+
| initial | #Rows | #Files | Size     | Bytes Cached | Format  |
+---------+-------+--------+----------+--------------+---------+
| A       | -1    | 3      | 871.79MB | NOT CACHED   | PARQUET | 1
| B       | -1    | 3      | 871.72MB | NOT CACHED   | PARQUET |
| C       | -1    | 3      | 871.40MB | NOT CACHED   | PARQUET |
| D       | -1    | 3      | 871.64MB | NOT CACHED   | PARQUET |
| E       | -1    | 3      | 871.54MB | NOT CACHED   | PARQUET |
| F       | -1    | 3      | 871.11MB | NOT CACHED   | PARQUET |
| G       | -1    | 3      | 871.29MB | NOT CACHED   | PARQUET |
| H       | -1    | 3      | 871.42MB | NOT CACHED   | PARQUET |
| K       | -1    | 3      | 871.42MB | NOT CACHED   | PARQUET |
| L       | -1    | 3      | 871.31MB | NOT CACHED   | PARQUET |
| M       | -1    | 3      | 871.38MB | NOT CACHED   | PARQUET |
| N       | -1    | 3      | 871.25MB | NOT CACHED   | PARQUET |
| O       | -1    | 3      | 871.14MB | NOT CACHED   | PARQUET |
| P       | -1    | 3      | 871.44MB | NOT CACHED   | PARQUET |
| Q       | -1    | 3      | 871.55MB | NOT CACHED   | PARQUET |
| R       | -1    | 3      | 871.30MB | NOT CACHED   | PARQUET |
| S       | -1    | 3      | 871.50MB | NOT CACHED   | PARQUET |
| T       | -1    | 3      | 871.65MB | NOT CACHED   | PARQUET |
| Y       | -1    | 3      | 871.57MB | NOT CACHED   | PARQUET |
| Z       | -1    | 3      | 871.54MB | NOT CACHED   | PARQUET |
| NULL    | -1    | 1      | 147.30MB | NOT CACHED   | PARQUET | 2
| I       | -1    | 3      | 871.44MB | NOT CACHED   | PARQUET |
| J       | -1    | 3      | 871.32MB | NOT CACHED   | PARQUET |
| U       | -1    | 3      | 871.36MB | NOT CACHED   | PARQUET |
| V       | -1    | 3      | 871.39MB | NOT CACHED   | PARQUET |
| W       | -1    | 3      | 871.79MB | NOT CACHED   | PARQUET |
| X       | -1    | 3      | 871.95MB | NOT CACHED   | PARQUET |
| Total   | -1    | 79     | 22.27GB  | 0B           |         |
+---------+-------+--------+----------+--------------+---------+
Returned 28 row(s) in 0.04s
1

Each partition has less than 1 GB of data.

2

The NULL partition is a reminder that our original data-generating script included some NULL values in the NAME column, which carried over to the INITIAL column we’re using as the partition key. This is something to check for during validation and cleansing operations, to make sure that some rows do not become “orphaned” by having null partition keys that never get matched by the WHERE clauses in your queries.

Now when we run queries that target just one or a few partitions, the query reads 3 files totalling less than 1 GB for each partition that is processed.

Partitioned tables are best for queries that access a small proportion of the total partitions.

[localhost:21000] > select avg(val), min(name), max(name)
                  > from normalized_parquet where substr(name,1,1) =  'Q';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 50001.94660836487 | Qaaa      | Qzzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 5.74s 1

[localhost:21000] > select avg(val), min(name), max(name)
                  > from partitioned_normalized_parquet where initial =  'Q';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 50001.94660836487 | Qaaa      | Qzzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 4.75s 2

[localhost:21000] > select avg(val), min(name), max(name)
                  > from normalized_parquet
                  > where substr(name,1,1) between 'A' and 'C';
+------------------+-----------+------------------------+
| avg(val)         | min(name) | max(name)              |
+------------------+-----------+------------------------+
| 49994.3356542968 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+------------------+-----------+------------------------+
Returned 1 row(s) in 11.65s 3

[localhost:21000] > select avg(val), min(name), max(name)
                  > from partitioned_normalized_parquet
                  > where initial between 'A' and 'C';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 8.91s 4
1

This query scans the whole table and analyzes the rows where the NAME column starts with a particular letter.

2

An equivalent query that touches one partition in the partitioned table is a little bit faster. It’s not 26 times faster though, due to the arithmetic having to do with block sizes, number of files, number of hosts in the cluster, and number of cores per host. Some of the resources across the cluster might sit idle during a particular query because there is just not enough data to require getting all hosts and cores involved. Here we are with a billion-row table, and still there is not enough data to really demonstrate all the potential performance benefits. On the other hand, the fact that there is still idle capacity is good news for scalability: the cluster could run many other concurrent queries without maxing out the available CPUs or storage devices.

3

This query against the unpartitioned table reads all the data and analyzes all rows where the NAME field starts with one of three different letters.

4

An equivalent query that touches three partitions in the partitioned table is again a little bit faster. The speedup is more noticeable as the volume of data in the table increases, and as the number of partitions increases.

Let’s see what happens with a query that scans the entire table:

[localhost:21000] > select avg(val), min(name), max(name)
                  > from partitioned_normalized_parquet;
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49998.04368627915 | Aaaa      | Zzzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 69.29s 1
[localhost:21000] > select avg(val), min(name), max(name)
                  > from normalized_parquet;
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49998.04368627915 | Aaaa      | Zzzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 68.26s 1
1

For a query that does a full-table scan, the partitioned table is actually a little slower than the unpartitioned one. Having to process all the different data files from the partition directories adds a bit of overhead. That’s why it’s important to partition on the columns that you actually use for filtering in your most important and most frequent queries.

Next Steps

At this point, we’ve done a reasonable job of optimizing single-table queries for our billion rows of sample data. From here, there are two other kinds of scenarios to explore:

  • If you know that certain tables or partitions will be queried intensively, you can enable HDFS caching to ensure they are held in memory. To use this feature in production and to understand the performance and scalability aspects typically requires coordinating with your system administrator (see Helping to Plan for Performance (Stats, HDFS Caching)).
  • Going farther with normalization or cross-referencing different kinds of data sets means doing a lot of join queries. Join queries have their own set of performance considerations, as shown in the next section, .

Deep Dive: Joins and the Role of Statistics

When dealing with large and ever-growing tables, Impala can better optimize complex queries (especially join queries) the more it knows about the characteristics of the data, both at the table level and the column level. The Impala SQL statement to collect such information is COMPUTE STATS. Run this statement after loading substantial new data into a table.

Creating a Million-Row Table to Join With

First, we create a table with the same structure as our original billion-row table (Tutorial: The Journey of a Billion Rows). We will take a sample of a million rows from our billion rows of data, then do joins between the big table and the small table:

[localhost:21000] > create table stats_demo like sample_data;
[localhost:21000] > show table stats stats_demo;
+-------+--------+------+--------------+--------+
| #Rows | #Files | Size | Bytes Cached | Format |
+-------+--------+------+--------------+--------+
| -1    | 0      | 0B   | NOT CACHED   | TEXT   |
+-------+--------+------+--------------+--------+
[localhost:21000] > show column stats stats_demo;
+-----------+---------+------------------+--------+----------+----------+
| Column    | Type    | #Distinct Values | #Nulls | Max Size | Avg Size |
+-----------+---------+------------------+--------+----------+----------+
| id        | BIGINT  | -1               | -1     | 8        | 8        | 1
| val       | INT     | -1               | -1     | 4        | 4        |
| zerofill  | STRING  | -1               | -1     | -1       | -1       | 2
| name      | STRING  | -1               | -1     | -1       | -1       |
| assertion | BOOLEAN | -1               | -1     | 1        | 1        |
| city      | STRING  | -1               | -1     | -1       | -1       |
| state     | STRING  | -1               | -1     | -1       | -1       |
+-----------+---------+------------------+--------+----------+----------+
1

Initially, Impala knows basic physical properties based on the data files and the schema, such as the total data size and the sizes of numeric columns, which never vary in length.

2

The -1 numbers indicate properties where Impala does not know the values. The unknown values are most prominent for STRING columns, with values that vary in size.

Loading Data and Computing Stats

In the following example, we load a million rows into the table and collect statistics for the data. To help Impala choose a good query plan for a join involving this table, it’s important to know the characteristics of the various columns.

[localhost:21000] > insert into stats_demo select * from sample_data limit 1000000;
[localhost:21000] > compute stats stats_demo;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 7 column(s). |
+-----------------------------------------+
[localhost:21000] > show table stats stats_demo;
+---------+--------+---------+--------------+--------+
| #Rows   | #Files | Size    | Bytes Cached | Format |
+---------+--------+---------+--------------+--------+
| 1000000 | 1      | 57.33MB | NOT CACHED   | TEXT   |
+---------+--------+---------+--------------+--------+
[localhost:21000] > show column stats stats_demo;
+-----------+---------+------------------+--------+----------+-------------+
| Column    | Type    | #Distinct Vals | #Nulls | Max Size | Avg Size      |
+-----------+---------+----------------+--------+----------+---------------+
| id        | BIGINT  | 1023244        | -1     | 8        | 8             | 1
| val       | INT     | 139017         | -1     | 4        | 4             |
| zerofill  | STRING  | 101761         | -1     | 6        | 6             |
| name      | STRING  | 1005653        | -1     | 22       | 13.0006999969 | 2 3
| assertion | BOOLEAN | 2              | -1     | 1        | 1             |
| city      | STRING  | 282            | -1     | 16       | 8.78960037231 | 4
| state     | STRING  | 46             | -1     | 20       | 8.40079975128 | 4
+-----------+---------+----------------+--------+----------+---------------+
1

Currently, the number of nulls is not counted because the planner doesn’t use this information.

2

The ID and NAME columns contain essentially unique values. The NAME field tends to be longer than the CITY and STATE fields.

3

The number of distinct values is estimated rather than counted precisely, because the planner only needs a rough estimate to judge whether one approach is faster than another. For example, the estimate for the NAME column is slightly higher than the actual number of rows in the table. Impala automatically adjusts the estimate downward in such a case.

4

The CITY and STATE columns have very few distinct values.

Reviewing the EXPLAIN Plan

In a join query involving tables of different sizes, Impala automatically determines the following:

  • Which tables to read from local storage devices on the data nodes.
  • Which tables are small enough to send in their entirety to each node.
  • Which tables to split up and transmit smaller pieces to different nodes.
  • The optimal order of these operations, to minimize data transmission and the size of the intermediate result sets from each stage of join processing.

You can see the results by looking at the EXPLAIN plan for a query, without the need to actually run it:

[localhost:21000] > explain select count(*) from sample_data join stats_demo
                  > using (id) where substr(sample_data.name,1,1) = 'G';
+--------------------------------------------------------------------+
| Explain String                                                     |
+--------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=5.75GB VCores=2            |
| WARNING: The following tables are missing relevant table           |
|          and/or column statistics.                                 | 1
| oreilly.sample_data                                                |
|                                                                    |
| 06:AGGREGATE [MERGE FINALIZE]                                      |
| |  output: sum(count(*))                                           |
| |                                                                  |
| 05:EXCHANGE [UNPARTITIONED]                                        |
| |                                                                  |
| 03:AGGREGATE                                                       |
| |  output: count(*)                                                |
| |                                                                  |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                               |
| |  hash predicates: oreilly.stats_demo.id = oreilly.sample_data.id |
| |                                                                  |
| |--04:EXCHANGE [BROADCAST]                                         |
| |  |                                                               |
| |  00:SCAN HDFS [oreilly.sample_data]                              | 2
| |     partitions=1/1 size=56.72GB                                  |
| |     predicates: substr(sample_data.name, 1, 1) = 'G'             |
| |                                                                  |
| 01:SCAN HDFS [oreilly.stats_demo]                                  | 3
|    partitions=1/1 size=57.33MB                                     |
+--------------------------------------------------------------------+
1

Wait a second. That warning at the top of the plan output reminds us that although we just ran COMPUTE STATS for our new table, we neglected to do it for our oldest (and biggest) table.

2

When Impala reports that it is going to “scan HDFS” for the SAMPLE_DATA table and then “broadcast” the result, that is an expensive network operation: it sends the results from scanning SAMPLE_DATA and extracting the G names to each node to compare and contrast against the STATS_DEMO table. That’s about 1/26th of 56.72 GB (about 2.2 GB) being sent to each of four nodes. It’s preferable to see a small amount of data being broadcast. Maybe we can reduce the amount of network I/O.

3

To understand the flow of the query, you read from bottom to top. (After checking any warnings at the top.) For a join query, you prefer to see the biggest table listed at the bottom, then the smallest, second smallest, third smallest, and so on.

When Impala sees a table with no statistics used in a join query (like SAMPLE_DATA in this case), it treats the table like it is zero-sized, as if it is no problem to send over the network. That is clearly wrong in this case, where SAMPLE_DATA is bigger and has more different values in both the ID and NAME columns referenced in the query.

Let’s collect statistics for the big (billion-row) SAMPLE_DATA table, too, and then try again:

[localhost:21000] > compute stats sample_data; 1
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 7 column(s). |
+-----------------------------------------+
[localhost:21000] > show table stats sample_data;
+------------+--------+---------+--------------+--------+
| #Rows      | #Files | Size    | Bytes Cached | Format |
+------------+--------+---------+--------------+--------+
| 1000000000 | 1      | 56.72GB | NOT CACHED   | TEXT   | 2
+------------+--------+---------+--------------+--------+
[localhost:21000] > show column stats sample_data;
+-----------+---------+----------------+--------+----------+---------------+
| Column    | Type    | #Distinct Vals | #Nulls | Max Size | Avg Size      |
+-----------+---------+----------------+--------+----------+---------------+
| id        | BIGINT  | 183861280      | 0      | 8        | 8             |
| val       | INT     | 139017         | 0      | 4        | 4             |
| zerofill  | STRING  | 101761         | 0      | 6        | 6             |
| name      | STRING  | 145636240      | 0      | 22       | 13.0002002716 | 3
| assertion | BOOLEAN | 2              | 0      | 1        | 1             |
| city      | STRING  | 282            | 0      | 16       | 8.78890037536 |
| state     | STRING  | 46             | 0      | 20       | 8.40139961242 |
+-----------+---------+----------------+--------+----------+---------------+
1

The COMPUTE STATS statement is the key to improving the efficiency of join queries. Now we’ve run it for all tables involved in the join.

2

The key item of information for the table stats is the number of rows.

3

In the column stats, Impala estimates the number of distinct values for each column and examines STRING columns to find the maximum and average length.

[localhost:21000] > explain select count(*) from sample_data join stats_demo
                  > using (id) where substr(sample_data.name,1,1) = 'G';
+--------------------------------------------------------------------+
| Explain String                                                     |
+--------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=3.77GB VCores=2            |
|                                                                    |
| 06:AGGREGATE [MERGE FINALIZE]                                      |
| |  output: sum(count(*))                                           |
| |                                                                  |
| 05:EXCHANGE [UNPARTITIONED]                                        |
| |                                                                  |
| 03:AGGREGATE                                                       |
| |  output: count(*)                                                |
| |                                                                  |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                               |
| |  hash predicates: oreilly.sample_data.id = oreilly.stats_demo.id |
| |                                                                  |
| |--04:EXCHANGE [BROADCAST]                                         |
| |  |                                                               |
| |  01:SCAN HDFS [oreilly.stats_demo]                               | 1
| |     partitions=1/1 size=57.33MB                                  |
| |                                                                  |
| 00:SCAN HDFS [oreilly.sample_data]                                 | 2
|    partitions=1/1 size=56.72GB                                     |
|    predicates: substr(sample_data.name, 1, 1) = 'G'                | 3
+--------------------------------------------------------------------+
1

This time, the smaller STATS_DEMO table is broadcast in its entirety to all the four nodes. Instead of sending about 2.2 GB across the network to each node as in the previous query, we’re only sending about 57.33 MB, which is the size of the smaller table. We’ve just improved the efficiency of our query by a factor of about 38, without actually running either the slow or the fast version. That’s much better!

2

The data that’s broadcasted is cross-checked against the big SAMPLE_DATA table. Each of our four nodes will read 1/4 of this table from local storage. For join queries, we always want to see the biggest table at the bottom of the plan, meaning that the data from that table is read locally rather than being sent over the network.

3

We know that most of the 56.72 GB will be ignored and not need to be cross-checked against the other table, because it will not match the predicate that checks for the first letter 'G'. Impala does not yet account for that aspect in the plan numbers. We’ll improve on that as we progress to using partitioned tables.

Trying a Real Query

Just for kicks, let’s try this query out in real life:

[localhost:21000] > select count(*) from sample_data join stats_demo
                  > using (id) where substr(sample_data.name,1,1) = 'G';
+----------+
| count(*) |
+----------+
| 37763    |
+----------+
Returned 1 row(s) in 13.35s

By joining a table of a billion rows with a table of a million rows, we checked a million billion possible combinations. The results came back so fast, there was hardly enough time to play one move in Words with Friends™. (All timing numbers in this book are from a small cluster of modest capacity; I expect you to be able to beat them without much trouble.)

Remember that we demonstrated earlier that text tables are bulkier than they need to be, and we could trim things down and speed things up by converting to Parquet, doing some normalization, and introducing partitioning. Let’s try again with the more efficient tables we set up using that same data. (We don’t expect the count returned by the query to be exactly the same, because we’re taking a random sample of a million rows to copy into the new table.)

[localhost:21000] > create table stats_demo_parquet
                  >   like partitioned_normalized_parquet; 1
Returned 0 row(s) in 1.14s

[localhost:21000] > insert into stats_demo_parquet partition (initial)
                  > [shuffle] select * from partitioned_normalized_parquet 2
                  > limit 1000000;
Inserted 1000000 rows in 39.72s
1

The CREATE TABLE LIKE statement preserves the file format of the original table, so we know the new one will use Parquet format also.

2

We use the [SHUFFLE] hint technique to avoid having each of the four nodes try to allocate 27 GB-sized buffers to write separate data files for all the partition values. The “shuffle” operation takes a little longer, but avoids potential out-of-memory conditions. This is the default Impala uses when a table has no statistics, so strictly speaking, it is only necessary if Impala chooses the wrong execution plan for some reason, such as out-of-date statistics.

Again, we make sure to run the COMPUTE STATS statement for all the tables involved in the join query, after loading the data. In earlier examples with tables like PARTITIONED_NORMALIZED_PARQUET, we saw a little under 1 GB of data in each partition. In the smaller table containing a random sample of the data, each partition contains substantially less data.

[localhost:21000] > compute stats partitioned_normalized_parquet;
+------------------------------------------+
| summary                                  |
+------------------------------------------+
| Updated 26 partition(s) and 6 column(s). |
+------------------------------------------+
Returned 1 row(s) in 54.24s
[localhost:21000] > compute stats stats_demo_parquet;
+------------------------------------------+
| summary                                  |
+------------------------------------------+
| Updated 26 partition(s) and 6 column(s). |
+------------------------------------------+
Returned 1 row(s) in 4.86s
[localhost:21000] > show table stats stats_demo_parquet;
+---------+---------+--------+----------+--------------+---------+
| initial | #Rows   | #Files | Size     | Bytes Cached | Format  |
+---------+---------+--------+----------+--------------+---------+
| A       | 89088   | 1      | 2.34MB   | NOT CACHED   | PARQUET |
| B       | 46080   | 1      | 1.31MB   | NOT CACHED   | PARQUET |
| C       | 219136  | 1      | 5.28MB   | NOT CACHED   | PARQUET |
| D       | 63488   | 1      | 1.77MB   | NOT CACHED   | PARQUET |
| E       | 49152   | 1      | 1.39MB   | NOT CACHED   | PARQUET |
| F       | 32768   | 1      | 960.64KB | NOT CACHED   | PARQUET |
| G       | 11264   | 1      | 336.67KB | NOT CACHED   | PARQUET |
...
| W       | 16384   | 1      | 484.57KB | NOT CACHED   | PARQUET |
| X       | 51200   | 1      | 1.45MB   | NOT CACHED   | PARQUET |
| NULL    | -1      | 1      | 181.73KB | NOT CACHED   | PARQUET |
| Y       | 82944   | 1      | 2.21MB   | NOT CACHED   | PARQUET |
| Z       | 27648   | 1      | 816.00KB | NOT CACHED   | PARQUET |
| Total   | 1000000 | 27     | 26.99MB  | 0B           |         |
+---------+---------+--------+----------+--------------+---------+
Returned 28 row(s) in 0.02s

Now we go through the same exercise as before, running an EXPLAIN statement and examining the amount of data expected to be read from disk and transmitted across the network:

[localhost:21000] > explain select count(*) from partitioned_normalized_parquet
                  > join stats_demo_parquet using (id)
                  > where
                  > substr(partitioned_normalized_parquet.name,1,1) = 'G'; 1
+-------------------------------------------------------------------------+
| Explain String                                                          |
+-------------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=194.31MB VCores=2               |
|                                                                         |
| 06:AGGREGATE [MERGE FINALIZE]                                           |
| |  output: sum(count(*))                                                |
| |                                                                       |
| 05:EXCHANGE [UNPARTITIONED]                                             |
| |                                                                       |
| 03:AGGREGATE                                                            |
| |  output: count(*)                                                     |
| |                                                                       |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                                    |
| |  hash predicates: oreilly.partitioned_normalized_parquet.id =         |
| |    oreilly.stats_demo_parquet.id                                      |
| |                                                                       |
| |--04:EXCHANGE [BROADCAST]                                              |
| |  |                                                                    |
| |  01:SCAN HDFS [oreilly.stats_demo_parquet]                            |
| |     partitions=27/27 size=26.99MB                                     | 2
| |                                                                       |
| 00:SCAN HDFS [oreilly.partitioned_normalized_parquet]                   |
|    partitions=27/27 size=22.27GB                                        | 3
|    predicates: substr(partitioned_normalized_parquet.name, 1, 1) = 'G'  | 4
+-------------------------------------------------------------------------+
Returned 21 row(s) in 0.03s

Those “scan” figures at the bottom are looking better than with the text tables.

1

The query does a naive translation of the original query with the SUBSTR() call.

2

We’re going to transmit (“broadcast”) 26.99 MB across the network to each node.

3

We’re going to read 22.27 GB from disk. This is the I/O-intensive part of this query, which occurs on the nodes that hold data blocks from the biggest table. Because we usually read these plans bottom to top, this is the first figure to consider in evaluating if the query is executing the way we want it to.

4

Calling a function in the WHERE clause is not always a smart move, because that function can be called so many times. Now that the first letter is available in a column, maybe it would be more efficient to refer to the INITIAL column.

The following example improves the query for the partitioned table by testing the first letter directly, referencing the INITIAL column instead of calling SUBSTR(). The more we can refer to the partition key columns, the better Impala can ignore all the irrelevant partitions.

[localhost:21000] > explain select count(*) from partitioned_normalized_parquet
                  > join stats_demo_parquet using (id)                  1
                  > where partitioned_normalized_parquet.initial = 'G'; 2
+-----------------------------------------------------------------+
| Explain String                                                  |
+-----------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=106.31MB VCores=2       |
|                                                                 |
| 06:AGGREGATE [MERGE FINALIZE]                                   |
| |  output: sum(count(*))                                        |
| |                                                               |
| 05:EXCHANGE [UNPARTITIONED]                                     |
| |                                                               |
| 03:AGGREGATE                                                    |
| |  output: count(*)                                             |
| |                                                               |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                            |
| |  hash predicates: oreilly.partitioned_normalized_parquet.id = |
| |    oreilly.stats_demo_parquet.id                              |
| |                                                               |
| |--04:EXCHANGE [BROADCAST]                                      |
| |  |                                                            |
| |  01:SCAN HDFS [oreilly.stats_demo_parquet]                    |
| |     partitions=27/27 size=26.99MB                             |
| |                                                               |
| 00:SCAN HDFS [oreilly.partitioned_normalized_parquet]           |
|    partitions=1/27 size=871.29MB                                | 2
+-----------------------------------------------------------------+
Returned 20 row(s) in 0.02s
1

Our join clause is USING(id) because all the corresponding rows have matching ID values.

2

By replacing the SUBSTR() call with a reference to the partition key column, we really chopped down how much data has to be read from disk in the first phase: now it’s less than 1 GB instead of 22.27 GB.

We happen to know (although Impala doesn’t know) that rows with the same ID value will also have the same INITIAL value. Let’s add INITIAL to the USING clause and see if that helps.

[localhost:21000] > explain select count(*) from partitioned_normalized_parquet
                  > join stats_demo_parquet using (id,initial) 1
                  > where partitioned_normalized_parquet.initial = 'G'; 
+-----------------------------------------------------------------+
| Explain String                                                  |
+-----------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=98.27MB VCores=2        |
|                                                                 |
| 06:AGGREGATE [MERGE FINALIZE]                                   |
| |  output: sum(count(*))                                        |
| |                                                               |
| 05:EXCHANGE [UNPARTITIONED]                                     |
| |                                                               |
| 03:AGGREGATE                                                    |
| |  output: count(*)                                             |
| |                                                               |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                            |
| |  hash predicates: oreilly.partitioned_normalized_parquet.id = |
| |     oreilly.stats_demo_parquet.id,                            |
| |     oreilly.partitioned_normalized_parquet.initial =          |
| |     oreilly.stats_demo_parquet.initial                        |
| |                                                               |
| |--04:EXCHANGE [BROADCAST]                                      |
| |  |                                                            |
| |  01:SCAN HDFS [oreilly.stats_demo_parquet]                    |
| |     partitions=1/27 size=336.67KB                             | 2
| |                                                               |
| 00:SCAN HDFS [oreilly.partitioned_normalized_parquet]           |
|    partitions=1/27 size=871.29MB                                |
+-----------------------------------------------------------------+
Returned 20 row(s) in 0.02s
1

Now the USING clause references two columns that must both match in both tables.

2

Now instead of transmitting 26.99 MB (the entire smaller table) across the network, we’re transmitting 336.67 KB, the size of the G partition in the smaller table.

This looks really promising. We’ve gone from transmitting gigabytes across the network for each query, to under a megabyte. Again, even as we iterated through several variations of the query, we didn’t have to actually try them and run the risk of executing a really slow, resource-intensive one.

The Story So Far

Just to recap, we took the following optimization steps, starting from our original bulky text table:

  1. Converted the data to Parquet file format.
  2. Normalized the data to reduce redundancy.
  3. Partitioned the data to quickly locate ranges of values.
  4. Computed the stats for both tables involved in the join query.
  5. Referenced the partition key columns wherever practical in the query itself, especially in the join and WHERE clauses.
  6. Used EXPLAIN to get an idea of the efficiency of possible queries as we iterated through several alternatives.

Final Join Query with 1B x 1M Rows

Now let’s see how the query performs in real life after going through several iterations of fine-tuning it and checking the EXPLAIN plan:

[localhost:21000] > select count(*) from partitioned_normalized_parquet
                  > join stats_demo_parquet using (id,initial)
                  > where partitioned_normalized_parquet.initial = 'G';
+----------+
| count(*) |
+----------+
| 11264    |
+----------+
Returned 1 row(s) in 1.87s

That’s a million billion potential combinations being evaluated in less than 2 seconds, on a 4-node cluster with modest hardware specs. (For example, these nodes have 48 GB of memory each, which is much less than in a typical Impala cluster.)

Anti-Pattern: A Million Little Pieces

One common anti-pattern to avoid is what’s known as the “many small files” problem. Hadoop, HDFS, and Impala are all optimized to work with multimegabyte files. Ingesting data that was not originally organized for Hadoop can result in a profusion of tiny data files, leading to suboptimal performance even though the volume of data being read is small. The overhead of distributing a parallel query across a cluster isn’t worthwhile if the data is fragmented into a few kilobytes or even a few megabytes per file.

The techniques you want to avoid are:

  • Running a sequence of INSERT ... VALUES statements, especially with a single item in the VALUES clause. If you need to build up a data file line by line, use a technique outside of Impala such as running Sqoop or Flume, or writing your own data-generation program (possibly running it as a MapReduce job).
  • Partitioning down to the most granular level possible, so that the table contains thousands or tens of thousands of partitions, and each partition has only a tiny amount of data. Sometimes, Impala tables do best with one less level of partitioning than you might be used to, such as year and month rather than year, month, and day.
  • Inserting into a table with lots of partitions, using a dynamic INSERT ... SELECT statement. The dynamic form of this statement divides the data among multiple partitions at runtime, based on values in the SELECT query. The INSERT goes faster if you specify the partition key values as constants and operate on one partition at a time.

Ways to avoid or recover from this kind of problem include:

  • If you create a lookup table with a predictable set of hardcoded values, do it with a single VALUES clause:

    INSERT INTO canada_regions VALUES
      ("Newfoundland and Labrador" ,"NL"),
      ("Prince Edward Island","PE"),
      ("New Brunswick","NB"), ("Nova Scotia","NS"),
      ("Quebec","PQ"), ("Ontario","ON"),
      ("Manitoba","MB"), ("Saskatchewan","SK"), ("Alberta","AB"),
      ("British Columbia","BC"), ("YT","Yukon"),
      ("Northwest Territories","NT"), ("Nunavut","NU");

    This technique generates a single data file; although it’s still tiny in comparison to the 128 MB block size in HDFS, it’s more efficient than a dozen separate data files containing one row each!

  • If you have a table with an inefficient file layout, coalesce the data by copying the entire contents to a different table with an INSERT ... SELECT operation. The output data files will be reorganized based on the number of nodes in your cluster and the number of cores per node.
  • When loading into a partitioned table, where practical, insert the data one partition at a time:
INSERT INTO sales_data PARTITION (year=2014, month=07)
  SELECT customer, product, amount, purchase_date FROM raw_data
  WHERE year = 2014 AND month = 07;
  • You can minimize disruption from coalescing data into a new table by pointing all your reporting queries at a view and switching the table that’s accessed by the view:

    CREATE VIEW production_report AS SELECT ... FROM original_table WHERE ...;
    INSERT INTO optimized_table SELECT * FROM original_table;
    COMPUTE STATS optimized_table;
    ALTER VIEW production_report AS SELECT ... FROM optimized_table WHERE ...;

    This way, all your query-intensive applications can refer to a consistent name, even if you reorganize the data behind the scenes. The new table could use the Parquet file format, partitioning, or more compact data types than the original.

  • When inserting into a partitioned table, have accurate table and column statistics on the table holding the original data. Use the SHOW TABLE STATS and SHOW COLUMN STATS to check if the stats are present and accurate (particularly the “number of rows” figure in the table statistics). Use the COMPUTE STATS statement to collect the statistics if that information is missing or substantially different from the current contents of the source table.
  • When doing an insert operation across multiple partitions in a Parquet table, consider using the [SHUFFLE] hint on the INSERT ... SELECT statement. This hint makes the INSERT statement take longer, but reduces the number of output files generated. This technique can both avoid the “many small files” problem, and reduce the memory usage during the INSERT statement. (In the latest releases, Impala applies the [SHUFFLE] hint automatically if necessary, so this tip mainly applies to older Impala instances.)
INSERT INTO partitioned_parquet_table PARTITION (year, month, region)
  [SHUFFLE] SELECT c1, c2, c3, year, month, region FROM new_batch_of_raw_data;

Tutorial: Across the Fourth Dimension

One challenge in every programming language, operating system, or storage format is how to represent and manipulate date-based and time-based values. Let’s look at how this works in Impala.

TIMESTAMP Data Type

In Impala, the one-stop shop for any temporal value is the TIMESTAMP data type. It can represent a date, a time, or both. It is stored in a consistent numeric format, relative to the Coordinated Universal Time (UTC) time zone to avoid issues with time zone translation. You can use TIMESTAMP as the data type for a table column, and pass or return values of that type using various built-in functions.

It has been traditional in Hadoop to represent date and time values as strings, and convert to a binary representation behind the scenes. Impala prefers to make TIMESTAMP a first-class data type; thus, some date- and time-related functions carried over from Hive have both STRING and TIMESTAMP variants in Impala.

Format Strings for Dates and Times

Impala recognizes strings with the format YYYY-MM-DD HH:MM:SS.sssssssss and can automatically convert those to TIMESTAMP values. A date or a time is allowed by itself, and the fractional second portion is optional for time values. To turn a string in some other format into a TIMESTAMP requires a two-step process: convert to an integer value with the unix_timestamp() function, which takes a string format argument; then convert that integer back into a TIMESTAMP.

The following example shows how a string 2014-12-01 in the standard notation can be directly converted to a TIMESTAMP, while the string 2014/12/01 requires converting to an integer and then back to a TIMESTAMP:

[localhost:21000] > select cast('2014-12-01' as timestamp);
+---------------------------------+
| cast('2014-12-01' as timestamp) |
+---------------------------------+
| 2014-12-01 00:00:00             |
+---------------------------------+
[localhost:21000] > select unix_timestamp('2014/12/01','yyyy/MM/dd'),
+--------------------------------------------+
| unix_timestamp('2014/12/01', 'yyyy/mm/dd') |
+--------------------------------------------+
| 1417392000                                 |
+--------------------------------------------+
[localhost:21000] > select from_unixtime(
                  >   unix_timestamp('2014/12/01','yyyy/MM/dd')
                  > );
+-----------------------------------------------------------+
| from_unixtime(unix_timestamp('2014/12/01', 'yyyy/mm/dd')) |
+-----------------------------------------------------------+
| 2014-12-01 00:00:00                                       |
+-----------------------------------------------------------+
[localhost:21000] > select from_unixtime(
                  >   unix_timestamp('12/01/2014','MM/dd/yyyy')
                  > );
+-----------------------------------------------------------+
| from_unixtime(unix_timestamp('12/01/2014', 'mm/dd/yyyy')) |
+-----------------------------------------------------------+
| 2014-12-01 00:00:00                                       |
+-----------------------------------------------------------+

Working with Individual Date and Time Fields

Sometimes it’s convenient to have access to the individual date and time fields. For example, if your table is partitioned by year and month, you can’t just designate a TIMESTAMP value as the partition key, because then there would be a different partition for every hour, minute, second, and even microsecond. The table needs separate YEAR and MONTH columns, even if it also preserves the full date and time information as a TIMESTAMP column.

The way to get the separate fields is through the EXTRACT() function (new in Impala 1.4). It’s important to keep these values as integer types—ideally, the smallest applicable ones such as TINYINT for anything up to 127, and SMALLINT for anything up to 32767—so they can be represented compactly in memory. That’s another reason to avoid storing dates as strings, even though it might be convenient to represent months by their names, or days with leading zeros. It’s easy to overlook this optimization tip, because you might not notice any storage savings on disk if you use text data files (where string and numeric values consume equal space) or partitioned tables (where the partition key columns are used as directory names, so string and numeric values are represented the same way). The storage and performance benefits become apparent when billions or trillions of these values are being compared, stored in hash tables in memory, or transmitted across the network between different machines in the cluster.

This example shows how you can pull out each individual field from a TIMESTAMP value. We make a tiny lookup table with the symbolic names of all of the fields for easy reference later:

CREATE TABLE UNITS (granularity TINYINT, unit STRING);
INSERT INTO units VALUES (1,'year'),(2,'month'),(3,'day'),(4,'hour'),
  (5,'minute'),(6,'second'),(7,'millisecond'),

-- Get each date and time field from a single TIMESTAMP value.
SELECT unit, extract(now(), unit) FROM units ORDER BY granularity;
+-------------+----------------------+
| unit        | extract(now(), unit) |
+-------------+----------------------+
| year        | 2014                 |
| month       | 7                    |
| day         | 9                    |
| hour        | 13                   |
| minute      | 26                   |
| second      | 52                   |
| millisecond | 608                  |
+-------------+----------------------+

Date and Time Arithmetic

The TRUNC() function truncates a TIMESTAMP value down to the next lower year, week, day, quarter, and so on. This is a very useful technique for condensing a large number of date and time values down to a predictable number of combinations, either for doing GROUP BY queries or using the truncated values as partition key columns.

INTERVAL expressions let you add and subtract specific date and time increments to TIMESTAMP values. Any time you need to calculate a delta value (such as when an online auction ends), you can compute the appropriate TIMESTAMP by adding or subtracting some number of days, weeks, months, hours, and so on. You can chain a series of INTERVAL additions and subtractions to create a very precise delta value.

For example, you might strip off the time portion of a TIMESTAMP value so that you were left with just the date. Then you could add an INTERVAL expression to add back a specific time. Or you could use other kinds of INTERVAL addition or subtraction to create specific dates and times for reminders, deadlines, or other relative kinds of temporal values.

-- Get just the current date, no time.
[localhost:21000] > select trunc(now(), 'DD')
                  > as "first thing this morning";
+--------------------------+
| first thing this morning |
+--------------------------+
| 2014-07-09 00:00:00      |
+--------------------------+

[localhost:21000] > select trunc(now(), 'DD') + interval 8 hours
                  > as "8 AM this morning";
+---------------------+
| 8 am this morning   |
+---------------------+
| 2014-07-09 08:00:00 |
+---------------------+

[localhost:21000] > select now() + interval 2 weeks
                  > as "2 weeks from right now";
+-------------------------------+
| 2 weeks from right now        |
+-------------------------------+
| 2014-07-23 15:11:01.526788000 |
+-------------------------------+

[localhost:21000] > select trunc(now(), 'DD') - interval 2 days + interval 15 hours
                  > as "3 PM, the day before yesterday";
+--------------------------------+
| 3 pm, the day before yesterday |
+--------------------------------+
| 2014-07-07 15:00:00            |
+--------------------------------+

Note

Always double-check the unit argument when using the TRUNC() function, because the argument values and some of their meanings differ from the arguments to EXTRACT(). In particular, the 'DAY' argument to TRUNC() truncates to the first day of the week, while DD truncates to the current day.

Let’s Solve the Y2K Problem

Whenever I look at a new technology for storing and manipulating data, I ask myself whether that technology makes it more or less likely to run into Y2K-style problems. The Y2K problem arose because people designed data processing applications under the assumption that year values could be stored as 2 digits, with an implicit base year of 1900. This issue became critical in the year 2000, when the 2-digit years could no longer be used for date arithmetic. I judge software systems based on how easy it is to correct such problems if developers make assumptions that later turn out to be wrong. This kind of flexibility is one of the key strengths of the Hadoop software stack.

The root of the Y2K problem was a desire to save money on expensive disk storage by saving two bytes per date field. Could such cost considerations still occur today? Hmm, if there are a billion rows, each extra byte represents another gigabyte of storage. Imagine a big web property with hundreds of millions of customers, and for each of those customers, you have to record a birthday, date joined, date of last visit, and so on. Two bytes per date field per customer adds up to a substantial number of gigabytes. Although everyone now knows not to leave the century out of year values, developers might still cut corners in their schema design for cost reasons, and those bad decisions might come back to bite them later.

As a thought experiment, let’s construct a scenario with some Y2K-style bad data, and see how we could solve it in Impala.

Note

This is a simplified example, not a comprehensive treatment of the subject. Your mileage may vary. No warranty express or implied.

We start off with a data file constructed way back in the 20th century, with some names and birth dates in MM-DD-YY format, and whether the person is still living:

$ cat >20th_century.dat
John Smith,06-04-52,false
Jane Doe,03-22-76,true
^D

In the pre-Hadoop days, the original code parsed the birth date values as 2-digit or 2-character values, and filled in the 19 prefix whenever it needed to print any reports or do any arithmetic for the birth dates. Now, the company (which has a minimum age limit of 14) is just starting to sign up its first customers born in the 21st century. The new data file uses 4-digit years:

$ cat >2014_new_customers.dat
Adam Millennial,01-01-2000,true
Maria Sanchez,03-29-2001,true
^D

With Impala, there are several ways to solve this problem. Let’s look at ways to make use of our SQL expertise (as opposed to just editing the original text data files):

CREATE TABLE inconsistent_data (name STRING, birthdate STRING, living BOOLEAN)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";
... Use 'hdfs dfs -put' command to move data files into appropriate Hadoop
... directories as demonstrated in earlier examples.

-- Make Impala aware of the newly added data files.
REFRESH inconsistent_data;
SELECT * FROM inconsistent_data;
+-----------------+------------+--------+
| name            | birthdate  | living |
+-----------------+------------+--------+
| Adam Millennial | 01-01-2000 | true   |
| Maria Sanchez   | 03-29-2001 | true   |
| John Smith      | 06-04-52   | false  |
| Jane Doe        | 03-22-76   | true   |
+-----------------+------------+--------+

At this point, we have a mixture of good and bad date values represented as strings. We’ll construct some expressions to parse out the different month, day, and year portions. As we build a set of useful queries to transform the original values through a series of steps, we’ll save each query as a view to keep each query readable and avoid a single monster query.

CREATE VIEW customer_data_separate_fields AS
  SELECT
   name,
   regexp_extract(birthdate,'([[:digit:]]+)-([[:digit:]]+)-([[:digit:]]+)', 1) month,
   regexp_extract(birthdate,'([[:digit:]]+)-([[:digit:]]+)-([[:digit:]]+)', 2) day,
   regexp_extract(birthdate,'([[:digit:]]+)-([[:digit:]]+)-([[:digit:]]+)', 3) year,
   living
  FROM inconsistent_data;
SELECT * FROM customer_data_separate_fields;
+-----------------+-------+-----+------+--------+
| name            | month | day | year | living |
+-----------------+-------+-----+------+--------+
| Adam Millennial | 01    | 01  | 2000 | true   |
| Maria Sanchez   | 03    | 29  | 2001 | true   |
| John Smith      | 06    | 04  | 52   | false  |
| Jane Doe        | 03    | 22  | 76   | true   |
+-----------------+-------+-----+------+--------+

The next step is to convert the separated-out fields to integer types instead of strings. Then we can do arithmetic on the dates.

CREATE VIEW customer_data_int_fields AS
  SELECT name, cast(month AS TINYINT) month,
    cast(day AS TINYINT) day,
    cast(year AS SMALLINT) year,
    living
    FROM customer_data_separate_fields;

Last, we identify the year values that were originally given as 2 digits, and convert those to 4-digit values from the 20th century. Any NULL values are passed through unchanged. Any year greater than 2 digits is passed through unchanged. (For simplicity, let’s stipulate that this company does not have any customers born in the 1800s or earlier.)

CREATE VIEW customer_data_full_years AS
  SELECT name, month, day,
    CASE
      WHEN year IS NULL THEN NULL
      WHEN year < 100 THEN year + 1900
      ELSE year
    END
    AS year,
    living
  FROM customer_data_int_fields;

Here we made a logical arrangement of the data that is more flexible and easier to extend and analyze. Even without changing the underlying data files, we accounted for 2-digit and 4-digit year values; we split up the original 3-part strings into separate fields; and we made the year, month, and day values into integers so that we could do arithmetic on them.

We can query the views to analyze the data in its cleaned up and reorganized form. Here we use a LIMIT clause to cap the number of rows returned, in case the back office loaded millions more rows in the meantime:

-- Doublecheck that the data is OK.
SELECT * FROM customer_data_full_years LIMIT 100;
+-----------------+-------+-----+------+--------+
| name            | month | day | year | living |
+-----------------+-------+-----+------+--------+
| John Smith      | 6     | 4   | 1952 | false  |
| Jane Doe        | 3     | 22  | 1976 | true   |
| Adam Millennial | 1     | 1   | 2000 | true   |
| Maria Sanchez   | 3     | 29  | 2001 | true   |
+-----------------+-------+-----+------+--------+

After running more queries to double-check that the data is entirely clean, we could make the new improved schema permanent and convert all the existing data files to a compact binary format. As we see in other examples using the Parquet format, the savings from this compression step are likely much greater than could be obtained by shortchanging the year values.

CREATE TABLE modernized_customer_data
  STORED AS PARQUET
  AS SELECT * FROM customer_data_full_years;

More Fun with Dates

The Impala TIMESTAMP data type has a range that starts on January 1, 1400 AD. Thus, for anything before that date, you would store separate integer fields for the year, month, day, and any time-related fields, rather than a single TIMESTAMP value that includes both date and time.

Applying the principle of using the smallest practical integer type, that means MONTH and DAY could always be TINYINT columns, and YEAR would depend on the time scale involved. Historians could use SMALLINT for their YEAR column to record years back to –32768 BC. Paleontologists could use INT to date fossils back to –2147483648 BC. And cosmologists could use BIGINT to chart time from the Big Bang to the future Big Crunch or heat death of the universe.

Pro Tip

The examples for schema evolution (Tutorial: When Schemas Evolve) show ways to deal with data where it is not immediately clear whether the existing values fit into the range for a particular integer type.

Tutorial: Verbose and Quiet impala-shell Output

In this book, I switch between verbose output in impala-shell when I need to show timing information for queries, or quiet mode for demonstrating features unrelated to performance. By default, an impala-shell session looks like this:

$ impala-shell -i localhost -d oreilly
...
[localhost:21000] > create table foo (x int);
Query: create table foo (x int); 1
Returned 0 row(s) in 1.13s 2
[localhost:21000] > select x from foo;
Query: select x from foo;
Returned 0 row(s) in 0.19s
1

The way the statement is echoed back as a single line lets you copy and paste it, which is most useful for multiline statements that are hard to capture due to the continuation prompts.

2

The time measurement is useful when you’re comparing the performance of different query techniques and table structures, logging the output of a sequence of statements, or running the same statements multiple times to check if performance is consistent across runs.

A “quiet” session looks like this, without the query echoed back or the elapsed time for the query:

$ impala-shell -i localhost -d oreilly --quiet
[localhost:21000] > create table bar (s string);
[localhost:21000] > select s from bar;

This more compact form lets you see what’s happening without all the extra informational messages.

The -B option produces an even more compact output style, with no ASCII boxes around the query results. You can think of -B as the “benchmark” option, because if all you want to do is get the results as fast as possible, suppressing the boxes lets impala-shell display the results much faster. The -B option is often used in combination with -q (run a single query) or -f (run all the statements in a file), for benchmarking, setup scripts, or any kinds of automated jobs.

This example runs a single SHOW TABLES statement and then massages the results to produce a set of DROP TABLE statements, which are then stored in a .sql script file:

$ impala-shell -B --quiet -q 'show tables in oreilly' | 
  sed -e 's/^/drop table /' | sed -e 's/$/;/' | 
  tee drop_all_tables.sql
drop table bar;
drop table billion_numbers;
drop table billion_numbers_compacted;
...

This example runs a sequence of statements from an input file. Here we leave out the --quiet option because we are interested in the output showing the original query, and the time taken. We include the -d option to specify the database where all the queries should run, so that we do not need to use fully qualified table names.

$ impala-shell -d oreilly -B -f benchmark.sql
...some startup banner messages...
Query: use oreilly
Query: select count(*) from canada_facts
13
Returned 1 row(s) in 0.21s
Query: select count(*) from canada_regions
13
Returned 1 row(s) in 0.19s
Query: select count(*) from usa_cities
289
Returned 1 row(s) in 0.19s

Tutorial: When Schemas Evolve

One of the tenets of Hadoop is “schema on read,” meaning that you’re not required to do extensive planning up front about how your data is laid out, and you’re not penalized if you later need to change or fine-tune your original decisions. Historically, this principle has clashed with the traditional SQL model where a CREATE TABLE statement defines a precise layout for a table, and data is reorganized to match this layout during the load phase. Impala bridges these philosophies in clever ways:

  • Impala lets you define a schema for data files that you already have and immediately begin querying that data with no change to the underlying raw files.
  • Impala does not require any length constraints for strings. No more trying to predict how much room to allow for the longest possible name, address, phone number, product ID, and so on.
  • In the simplest kind of data file (using text format), fields can be flexibly interpreted as strings, numbers, timestamps, or other kinds of values.
  • Impala allows data files to have more or fewer columns than the corresponding table. It ignores extra fields in the data file, and returns NULL if fields are missing from the data file. You can rewrite the table definition to have more or fewer columns and mix and match data files with the old and new column definitions.
  • You can redefine a table to have more columns, fewer columns, or different data types at any time. The data files are not changed in any way.
  • In a partitioned table, if newer data arrives in a different file format, you can change the definition of the table only for certain partitions, rather than going back and reformatting or converting all the old data.
  • Impala can query data files stored outside its standard data repository. You could even point multiple tables (with different column definitions) at the same set of data files—for example, to treat a certain value as a string for some queries and a number for other queries.

The benefits of this approach include more flexibility, less time and effort spent converting data into a rigid format, and less resistance to the notion of fine-tuning the schema as needs change and you gain more experience. For example, if a counter exceeds the maximum value for an INT, you can promote it to a BIGINT with minimal hassle. If you originally stored postal codes or credit card numbers as integers and later received data values containing dashes or spaces, you could switch those columns to strings without reformatting the original data.

For example, the SAMPLE_DATA table used in several earlier examples has a column named ZEROFILL containing 6-digit integer values, including leading zeros where needed so that every value really has 6 characters. That field could be used to represent an encoded value where each digit or group of digits has some meaning, as with a credit card or telephone number. Treating that column as a STRING data type makes it easier to do SUBSTR() calls to pull out the first 3 digits, last 4 digits, or search and replace to get rid of optional punctuation characters, in the case of a phone number. Other times, it might be preferable to treat that column as a number, for example, to construct the next sequential value. Or what seems like a small range of values might later turn out to be a larger one, so you might initially treat it as a SMALLINT but then later change the column type to INT or BIGINT.

Regardless of how the column is defined, you can always use CAST() to convert its values to a different type during a query. What is the “best” type is a question of convenience, query readability, and efficiency. (Remember, your queries will likely process millions or billions of rows, so any unnecessary type conversions can add considerable overhead.) And when you convert to a binary file format such as Parquet or Avro, numbers can be stored more compactly than strings, potentially saving gigabytes of disk space for each byte you can remove from all the rows.

Impala lets you try out all these representations to see which one works best in practice. When the data is in a relatively unstructured file format, such as a delimited text file, you can make unlimited changes to the types and names of columns. Farther along the data pipeline, when the data files are in a structured format such as Parquet or Avro, the table schema is embedded in each data file and the changes you can make are more limited. For example, with Parquet you can change a column’s type between TINYINT, SMALLINT, and INT, but not between other types such as STRING or TIMESTAMP.

You could also discover that some fields supplied in the data aren’t really needed and so remove them from the table definition, or that new fields are useful and so add those to the table definition and any new data files. These techniques work in all file formats, but apply only to the last columns in the table, so define any optional or less-important columns last.

Numbers Versus Strings

In the following example, we first treat the ZEROFILL column as a string (its original definition) to find values starting with 0123:

SELECT zerofill FROM sample_data
  WHERE zerofill LIKE '0123%' LIMIT 5;
+----------+
| zerofill |
+----------+
| 012330   |
| 012372   |
| 012350   |
| 012301   |
| 012327   |
+----------+
Returned 5 row(s) in 0.57s

Next, we change the ZEROFILL column to a number, finding examples of even values, doing some arithmetic with the values, and ignoring the leading zeros:

ALTER TABLE sample_data
  CHANGE zerofill zerofill INT;
SELECT zerofill AS even, zerofill+1 AS odd
  FROM sample_data
  WHERE zerofill % 2 = 0 LIMIT 5;
+----------+--------+
| even     | odd    |
+----------+--------+
| 3838     | 3839   |
| 97464    | 97465  |
| 87046    | 87047  |
| 12158    | 12159  |
| 55478    | 55479  |
+----------+--------+
Returned 5 row(s) in 0.31s

Finally, we change ZEROFILL back to a string for some regular expression matching, to find values containing a sequence of three 1 digits.

Tip

The CHANGE clause repeats the name ZEROFILL twice because it also sets a new name for the column; when changing only the data type, specify the same name again.

ALTER TABLE sample_data
  CHANGE zerofill zerofill STRING;
SELECT zerofill FROM sample_data
  WHERE zerofill REGEXP '1{3}' LIMIT 5;
+----------+
| zerofill |
+----------+
| 081116   |
| 031110   |
| 091118   |
| 011138   |
| 061110   |
+----------+
Returned 5 row(s) in 0.56s

Dealing with Out-of-Range Integers

If your table has values that are out of range for the specified integer type, they will be returned as the maximum value for the type. Thus, if you see numbers that bump up against the top of the range, you might need a bigger type for that column. Here is how you might deal with integer values where you do not know in advance whether their range will fit into the column type for an existing table.

Setup: Construct a table with some values that do not “fit” into the type of an integer column.

The column X starts off as a TINYINT, which can only hold a very limited range of values (–128 to 127).

CREATE TABLE unknown_range (x BIGINT);
INSERT INTO unknown_range VALUES (-50000), (-4000), (0), (75), (33000);
ALTER TABLE unknown_range CHANGE x x TINYINT;
Problem: We don’t know if the –128 and 127 values are real, or signify out-of-range numbers.

We call the MIN_TINYINT() and MAX_TINYINT() functions, and later the equivalents for other types, so that we don’t have to remember the exact ranges.

SELECT x FROM unknown_range LIMIT 10;
+------+
| x    |
+------+
| -128 |
| -128 |
| 0    |
| 75   |
| 127  |
+------+

SELECT count(x) AS "Suspicious values" FROM unknown_range
  WHERE x IN (min_tinyint(), max_tinyint());
+-------------------+
| suspicious values |
+-------------------+
| 3                 |
+-------------------+
Solution: Increase the size of the column and check against the allowed range, until there are no more suspicious values.
ALTER TABLE unknown_range CHANGE x x SMALLINT;
SELECT x FROM unknown_range LIMIT 10;
+--------+
| x      |
+--------+
| -32768 |
| -4000  |
| 0      |
| 75     |
| 32767  |
+--------+
SELECT count(x) AS "Suspicious values" FROM unknown_range
  WHERE x IN (min_smallint(), max_smallint());
+-------------------+
| suspicious values |
+-------------------+
| 2                 |
+-------------------+

ALTER TABLE unknown_range CHANGE x x INT;
SELECT x FROM unknown_range;
+--------+
| x      |
+--------+
| -50000 |
| -4000  |
| 0      |
| 75     |
| 33000  |
+--------+

SELECT count(x) AS "Suspicious values" FROM unknown_range
  WHERE x IN (min_smallint(), max_smallint());
+-------------------+
| suspicious values |
+-------------------+
| 0                 |
+-------------------+

At this point, you know the column is a large enough type to hold all the existing values without being larger than necessary and wasting space on disk and in memory.

Just as a refresher, here are the ranges for the different integer types:

[localhost:21000] > select min_bigint(), max_bigint();
+----------------------+---------------------+
| min_bigint()         | max_bigint()        |
+----------------------+---------------------+
| -9223372036854775808 | 9223372036854775807 |
+----------------------+---------------------+
[localhost:21000] > select min_int(), max_int();
+-------------+------------+
| min_int()   | max_int()  |
+-------------+------------+
| -2147483648 | 2147483647 |
+-------------+------------+
[localhost:21000] > select min_smallint(), max_smallint();
+----------------+----------------+
| min_smallint() | max_smallint() |
+----------------+----------------+
| -32768         | 32767          |
+----------------+----------------+
[localhost:21000] > select min_tinyint(), max_tinyint();
+---------------+---------------+
| min_tinyint() | max_tinyint() |
+---------------+---------------+
| -128          | 127           |
+---------------+---------------+

If you need a larger integer than MAX_BIGINT(), you can define a DECIMAL(n). The maximum value for n is 38, which can hold up to 999999… (9 repeated 38 times).

Tutorial: Levels of Abstraction

SQL shares some of the convenience of functional programming languages, where the end result is built from multiple layers, each performing some easily understood transformation. Whatever result you get from a query, you can enhance the results further by running individual columns through an additional function, or layering another query on top by using a WITH clause or a subquery, or pushing down the complexity by turning the query into a view.

String Formatting

In this example, we received some string data that is not in the optimal format. It is in all lowercase, and it has double quotes around the values, which is not appropriate for Impala text data. We run the string columns through a regular expression function to remove leading and trailing quotation marks. Then we run the result through another formatting function to capitalize the first letter. After finding the right combination of functions to achieve the desired output, we embed the details in a view, which hides the complexity of the function calls and makes subsequent queries more readable.

SELECT * FROM bad_format;
+------------+-----------+
| first_name | last_name |
+------------+-----------+
| "john"     | "smith"   |
| "jane"     | "doe"     |
+------------+-----------+

SELECT regexp_replace(first_name,'(^"|"$)','') AS first_name
  FROM bad_format;
+------------+
| first_name |
+------------+
| john       |
| jane       |
+------------+

SELECT initcap(regexp_replace(first_name,'(^"|"$)','')) AS first_name
  FROM bad_format;
+------------+
| first_name |
+------------+
| John       |
| Jane       |
+------------+

CREATE VIEW good_format AS
  SELECT initcap(regexp_replace(first_name,'(^"|"$)','')) AS first_name,
    initcap(regexp_replace(last_name,'(^"|"$)','')) AS last_name
  FROM bad_format;

SELECT * FROM good_format;
+------------+-----------+
| first_name | last_name |
+------------+-----------+
| John       | Smith     |
| Jane       | Doe       |
+------------+-----------+

Temperature Conversion

This example uses a subquery in the WITH clause to evaluate a temperature conversion formula and then runs calculations on the converted values. This is a handy technique to avoid repeating complicated expressions multiple times. Because the WITH clause does not create any permanent object, you avoid cluttering the namespace with new tables or views.

WITH celsius_temps AS
  (SELECT (degrees_f - 32) * 5 / 9 AS degrees_c FROM fahrenheit_temps)
SELECT min(degrees_c), max(degrees_c), avg(degrees_c) FROM celsius_temps;

This example encodes the Fahrenheit-to-Celsius conversion formula in a view, then filters the converted data by querying the view, referring only to the Celsius values.

CREATE VIEW celsius_temps AS SELECT (degrees_f - 32) * 5 / 9 AS degrees_c,
  year, month, day, location FROM fahrenheit_temps;
SELECT max(degrees_c), min(degrees_c) FROM celsius_temps
  WHERE year = 1999 AND degrees_c BETWEEN -40 and 40;

This example builds another view on top of the first one, to take a numeric value and do some string formatting to make it suitable for use in a report. The final query doesn’t need to know anything about the original Fahrenheit values or the raw numbers used in the report.

CREATE VIEW celsius_pretty_printed AS
  SELECT concat(cast(degrees_c as string)," degrees Celsius") AS degrees_c,
  year, month, day, location FROM celsius_temps;
SELECT degrees_c, year, month, day location FROM celsius_pretty_printed
  WHERE year = 1998 ORDER BY year, month, day;
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.119.124.49