The following sections cover aspects of Impala that deserve a closer look. Brief examples illustrate interesting features for new users. More complex topics are covered by tutorials or deep dives into the inner workings.
Here is what your first Unix command-line session might look like when you’re using Impala. This example from a Bash shell session creates a couple of text files (which could be named anything), copies those files into the HDFS filesystem, and points an Impala table at the data so that it can be queried through SQL. The exact HDFS paths might differ based on your HDFS configuration and Linux users.
$cat >csv.txt
1,red,apple,4
2,orange,orange,4
3,yellow,banana,3
4,green,apple,4
^D
$cat >more_csv.txt
5,blue,bubblegum,0.5
6,indigo,blackberry,0.2
7,violet,edible flower,0.01
8,white,scoop of vanilla ice cream,3
9,black,licorice stick,0.2
^D
$hadoop fs -mkdir /user/hive/staging
$hadoop fs -put csv.txt /user/hive/staging
$hadoop fs -put more_csv.txt /user/hive/staging
Sometimes the user you are logged in as does not have permission to manipulate HDFS files. In that case, issue the commands with the permissions of the hdfs user, using the form:
sudo -u hdfs hadoop fs arguments
Now that the data files are in the HDFS filesystem, let’s go into the Impala shell and start working with them. (Some of the prompts and output are abbreviated here for easier reading by first-time users.) This example creates a new database, in case this experiment turns into a project with a whole set of related tables. Then we create a table inside this database, move the data files into the table, and run some queries.
$impala-shell
>create database food_colors;
>use food_colors;
>create table food_data (id int, color string, food string, weight float) row format delimited fields terminated by ',';
> -- Here's where we move the data files from an arbitrary
> -- HDFS location to under Impala control.
>load data inpath '/user/hive/staging' into table food_data;
Query finished, fetching results ...
+----------------------------------------------------------+
| summary                                                  |
+----------------------------------------------------------+
| Loaded 2 file(s). Total files in destination location: 2 |
+----------------------------------------------------------+
>select food, color as "Possible Color" from food_data where food = 'apple';
Query finished, fetching results ...
+-------+----------------+
| food  | possible color |
+-------+----------------+
| apple | red            |
| apple | green          |
+-------+----------------+
Returned 2 row(s) in 0.66s
>select food as "Top 5 Heaviest Foods", weight from food_data order by weight desc limit 5;
Query finished, fetching results ...
+----------------------------+----------------------+
| top 5 heaviest foods       | weight               |
+----------------------------+----------------------+
| orange                     | 4                    |
| apple                      | 4                    |
| apple                      | 4                    |
| scoop of vanilla ice cream | 3                    |
| banana                     | 3                    |
+----------------------------+----------------------+
Returned 5 row(s) in 0.49s
>quit;
Back in the Unix shell, see how the CREATE DATABASE and CREATE TABLE statements created some new directories and how the LOAD DATA statement moved the original data files into an Impala-managed directory:
$ hadoop fs -ls -R /user/hive/warehouse/food_colors.db
drwxrwxrwt - impala hive 0 2013-08-29 16:14 /user/hive/warehouse/food_colors.db/food_data
-rw-rw-rw- 3 hdfs hive 66 2013-08-29 16:12 /user/hive/warehouse/food_colors.db/food_data/csv.txt
-rw-rw-rw- 3 hdfs hive 139 2013-08-29 16:12 /user/hive/warehouse/food_colors.db/food_data/more_csv.txt
In one easy step, you’ve gone from a collection of human-readable text files to a SQL table that you can query using standard, widely known syntax. The data is automatically replicated and distributed across a cluster of networked machines by virtue of being put into an HDFS directory.
These same basic techniques scale up to enormous tables with billions of rows. By that point, you would likely be using a more compact and efficient data format than plain text files, and you might include a partitioning clause in the CREATE TABLE statement to split up the data files by date or category. Don’t worry, you can easily upgrade your Impala tables and rearrange the data as you learn the more advanced Impala features. In fact, that’s the subject of a later tutorial: Tutorial: The Journey of a Billion Rows.
To understand how Impala works at the extreme ends of the spectrum, let’s consider for a moment the least intensive queries we could run. Impala does not have a built-in trivial table like the DUAL table in Oracle. Instead, to get back a single-row result of an expression, you construct a query with all constant expressions or function calls in the SELECT list, and leave off the FROM clause. Here are sample queries for you to run in the impala-shell interpreter; see if you can predict the results:
SELECT 1;
SELECT "hello world";
SELECT 2+2;
SELECT 10 > 5;
SELECT now();
You can use this very valuable table-less SELECT technique for experimenting with the detailed semantics of Impala functions, data types and casting, NULL handling, and all kinds of expressions.
These examples illustrate numeric expressions and arithmetic:
SELECT 1 + 0.5;
SELECT 1 / 3;
SELECT 1e6, 1.5e6;
SELECT 30000 BETWEEN min_smallint() AND max_smallint();
+-------------------------------------------------+
| 30000 between min_smallint() and max_smallint() |
+-------------------------------------------------+
| true                                            |
+-------------------------------------------------+
The results of those floating-point expressions are more precise in Impala 1.4 and later than in previous releases, due to the introduction of the DECIMAL type.
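To see what that BETWEEN test is really checking, recall that SMALLINT is a 16-bit signed integer type, so min_smallint() and max_smallint() evaluate to its two bounds. Here is the same check sketched outside Impala in Python (the constant names are ours, not Impala's):

```python
# SMALLINT is a 16-bit signed integer, so the Impala functions
# min_smallint() and max_smallint() return these two bounds.
MIN_SMALLINT = -2**15       # -32768
MAX_SMALLINT = 2**15 - 1    # 32767

# The equivalent of: SELECT 30000 BETWEEN min_smallint() AND max_smallint();
in_range = MIN_SMALLINT <= 30000 <= MAX_SMALLINT
print(in_range)  # -> True
```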
These examples illustrate type conversions:
SELECT cast(1e6 AS string);
SELECT cast(true AS string);
SELECT cast(99.44 AS int);
These examples illustrate what happens when NULL is used in various contexts:
SELECT 1 + NULL, 1 = NULL, 1 > NULL, NULL = NULL, NULL IS NULL;
SELECT cast(NULL AS STRING), cast(NULL AS BIGINT), cast(NULL AS BOOLEAN);
These examples illustrate string literals and string functions:
SELECT 'hello world';
SELECT "abc 012345 xyz";
SELECT concat('hello',NULL);
SELECT substr('hello',-2,2);
These examples illustrate regular expression comparisons and functions:
SELECT 'abc123xyz' REGEXP '[[:digit:]]{3}';
+-------------------------------------+
| 'abc123xyz' regexp '[[:digit:]]{3}' |
+-------------------------------------+
| true                                |
+-------------------------------------+
SELECT regexp_extract('>>>abc<<<','.*([a-z]+)',1);
+----------------------------------------------+
| regexp_extract('>>>abc<<<', '.*([a-z]+)', 1) |
+----------------------------------------------+
| abc                                          |
+----------------------------------------------+
SELECT regexp_replace('123456','(2|4|6)','x');
+------------------------------------------+
| regexp_replace('123456', '(2|4|6)', 'x') |
+------------------------------------------+
| 1x3x5x                                   |
+------------------------------------------+
This example illustrates date/time expressions, functions, and arithmetic:
SELECT now() + INTERVAL 3 DAYS + INTERVAL 5 HOURS;
+--------------------------------------------+
| now() + interval 3 days + interval 5 hours |
+--------------------------------------------+
| 2014-08-03 16:48:44.201018000              |
+--------------------------------------------+
These types of queries can help you construct or debug the individual pieces of a complicated query. For example, you typically run a simple test to confirm that you have the right regex notation, function arguments, format strings, and so on, before applying a regular expression or date calculation to billions of rows.
Queries with no FROM clause are not subject to the limits imposed by the admission control feature on the number of concurrent queries; they are not parallelized and distributed across multiple cluster nodes. Instead, all work is done on the coordinator node. In terms of resource management, Impala still allocates a default amount of memory (several megabytes) for each such query, because even a query without a table might evaluate some enormously complicated expression or call a complex user-defined function.
Now that we’ve seen how you can try out basic SQL features with no table or a tiny table, let’s get serious. In the Big Data field, there’s little point experimenting with thousands or even millions of rows. That volume of data is effectively the same as the smallest tables you could construct, so you don’t really learn much about distributed queries or Impala performance.
Let’s set up a table with a billion rows and see what we can learn. With a billion rows, by definition we’ll be working with gigabytes of data. Any inefficiency or wasted storage will be easy to spot. Any query that processes those gigabytes in only a few seconds will be cause for celebration.
First, let’s generate a billion rows of general-purpose data. I intentionally chose an old-school technique, a simple Python script generating a single big file, to illustrate how Impala bridges the world of traditional Unix and databases, and the new Hadoop world of Big Data and distributed parallel computing.
#! /usr/bin/python
"""
multicols.py: Generate an arbitrary number of rows with random values.
"""
import sys
from random import randint

# Load a list of the biggest US cities (289 of them),
# to pick a random city/state combination for an address.
usa_cities = []

def load_cities():
    global usa_cities
    lines = [line.rstrip() for line in open("usa_cities.lst").readlines()]
    usa_cities = [line.split(",") for line in lines]

def random_city():
    if usa_cities == []:
        load_cities()
    which = randint(0, len(usa_cities) - 1)
    return usa_cities[which]

if __name__ == '__main__':
    # Produce text format data with different kinds of separators.
    possible_separators = {
        "pipe": "|", "comma": ",", "csv": ",", "ctrl-a": "\x01",
        "hash": "#", "bang": "!", "tab": "\t", "tsv": "\t"
    }
    # Accept number of rows to generate as command-line argument.
    try:
        count = int(sys.argv[1])
    except:
        count = 1
    # For random numeric values, define upper bound as another
    # command-line argument. By default, values are 0-99999.
    try:
        upper = int(sys.argv[2])
    except:
        upper = 99999
    # Accept mnemonic for separator character as command-line argument.
    try:
        sep_arg = sys.argv[3]
        sep = possible_separators[sep_arg]
    except:
        # If no separator is specified, fall back to the Impala default.
        sep = "\x01"
    # Generate requested number of rows of data.
    for i in xrange(count):
        # Column 1 is a sequential integer, starting from 1.
        c1 = str(i + 1)
        # Column 2 is a random integer, from 0 to the specified upper bound.
        # 10% of the time, we substitute a NULL value instead of a number.
        chance = randint(1, 10) % 10
        if chance == 0:
            c2 = r"\N"
        else:
            c2 = str(randint(0, upper))
        # Column 3 is another random integer, but formatted with leading
        # zeroes to exactly 6 characters.
        c3 = str(randint(0, upper)).zfill(6)
        # Column 4 is a random string, from 4-22 characters: an initial
        # capital letter, followed by 3 sequences of repeating letters.
        # 1% of the time, we substitute a NULL value instead of a string.
        chance = randint(1, 100) % 100
        if chance == 0:
            c4 = r"\N"
        else:
            cap = chr(randint(65, 90))
            string1 = chr(randint(97, 122)) * randint(1, 7)
            string2 = chr(randint(97, 122)) * randint(1, 7)
            string3 = chr(randint(97, 122)) * randint(1, 7)
            c4 = cap + string1 + string2 + string3
        # Column 5 is a random Boolean value.
        # It's true 2/3 of the time, false 1/3 of the time.
        coin = randint(0, 2)
        if coin == 0:
            c5 = "false"
        else:
            c5 = "true"
        # We figure out a random city and state to use for a location field.
        (city, state) = random_city()
        c6 = city
        c7 = state
        # Concatenate all the fields and print.
        row = (c1 + sep + c2 + sep + c3 + sep + c4 + sep +
               c5 + sep + c6 + sep + c7)
        print row
Impala is flexible in terms of the sequence of operations. You can prepare the data first and even bring it into HDFS, and then construct a database table that matches the structure of the data. Or you can use the traditional sequence of creating the table first and then preparing the data based on the table schema. For this exercise, we’ll prepare the data on the local filesystem, then set up the database table in Impala, and then move the data into the expected location in HDFS.
First we run the script to produce the random data. On my Linux server, this takes an hour or more. (This is a good illustration of why it’s so attractive to parallelize operations involving so much data.)
$ python multicols.py 1000000000 99999 comma >billion_rows.csv
Within the impala-shell interpreter, we create a table, which will contain a billion rows after the data files go into HDFS. The attributes of the table (file format, and separator character for text format) match what we used in the raw data files.
$ impala-shell -i localhost [localhost:21000] >create database oreilly;
[localhost:21000] >use oreilly;
[localhost:21000] >create table sample_data
>(id bigint, val int, zerofill string, name string,
>assertion boolean, city string, state string)
>row format delimited fields terminated by ",";
[localhost:21000] >desc sample_data;
+-----------+---------+---------+
| name      | type    | comment |
+-----------+---------+---------+
| id        | bigint  |         |
| val       | int     |         |
| zerofill  | string  |         |
| name      | string  |         |
| assertion | boolean |         |
| city      | string  |         |
| state     | string  |         |
+-----------+---------+---------+
Now we need to get the data into the right location in HDFS. To figure out where the data should go, we use the DESCRIBE FORMATTED statement:
[localhost:21000] > describe formatted sample_data;
...
| # Detailed Table Information | NULL
| Database: | oreilly
| Owner: | jrussell
| CreateTime: | Fri Jul 18 16:25:06 PDT 2014
| LastAccessTime: | UNKNOWN
| Protect Mode: | None
| Retention: | 0
| Location: | hdfs://a1730.abcde.example.com:8020
| | /user/impala/warehouse/oreilly.db/
| | sample_data
| Table Type: | MANAGED_TABLE
...
Armed with this knowledge, we can run Linux utilities (or various kinds of Hadoop jobs) that deposit data in the appropriate HDFS directory. In this example, we do that from inside impala-shell using the ! command, which invokes an arbitrary Linux command.
[localhost:21000] >!hdfs dfs -put billion_rows.csv
>'/user/impala/warehouse/oreilly.db/sample_data';
Impala needs a reminder, in the form of a REFRESH statement, whenever data files are added or changed outside of Impala SQL statements such as CREATE TABLE AS SELECT or INSERT. At that point, we can query the table and see that the billion rows have arrived:
[localhost:21000] >refresh sample_data;
[localhost:21000] >select count(*) from sample_data;
+------------+
| count(*)   |
+------------+
| 1000000000 |
+------------+
Returned 1 row(s) in 45.31s
Now we’ve got a billion rows to play with, using all the familiar SQL techniques. Let’s try some simple queries that we know will produce small result sets:
[localhost:21000] >select max(name) from sample_data;
+------------------------+
| max(name)              |
+------------------------+
| Zzzzzzzzzzzzzzzzzzzzzz |
+------------------------+
Returned 1 row(s) in 50.73s
[localhost:21000] >select min(name) as first_in_alpha_order, assertion
>from sample_data group by assertion;
+----------------------+-----------+
| first_in_alpha_order | assertion |
+----------------------+-----------+
| Aaaa                 | true      |
| Aaaa                 | false     |
+----------------------+-----------+
Returned 2 row(s) in 37.35s
[localhost:21000] >select avg(val), min(name), max(name) from sample_data
>where name between 'A' and 'D';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 12.77s
[localhost:21000] >select count(name) as num_names, assertion
>from sample_data group by assertion;
+-----------+-----------+
| num_names | assertion |
+-----------+-----------+
| 660023128 | true      |
| 329974394 | false     |
+-----------+-----------+
Returned 2 row(s) in 45.61s
What’s behind the scenes with these billion rows? We started with one big CSV file and we put it straight into the Hadoop filesystem. The SHOW TABLE STATS statement displays the physical characteristics of the table:
[localhost:21000] > show table stats sample_data;
+-------+--------+---------+--------------+--------+
| #Rows | #Files | Size | Bytes Cached | Format |
+-------+--------+---------+--------------+--------+
| -1 | 1 | 56.72GB | NOT CACHED | TEXT |
+-------+--------+---------+--------------+--------+
Returned 1 row(s) in 0.01s
The whole big file is being read every time we do a query, which explains why the queries all take several seconds or more.
At this point, let your inquisitive hacker imagination run free. If there is some way to reduce the data size by several gigabytes, would that translate into seconds shaved off each of these queries? Yes, it would. How about if we could arrange the data so it didn’t have to be entirely read for each query? Yes, that would speed up the queries proportionally: read 1/10th as much data, take roughly 1/10th as much time as the original query.
Are we losing out on parallelism by having just one file? Not really, because a file that big is split into many HDFS blocks, and the nodes can work on those blocks in parallel.
How are we going to shrink the data? First, let’s do a bit of normalization. The CITY and STATE fields only have 289 values total, representing the largest cities in the USA. We could move repeated strings such as “California” and “Mississippi” out of the data file and replace them with small integers.
[localhost:21000] >select avg(length(city)) + avg(length(state))
>from sample_data;
+----------------------------------------+
| avg(length(city)) + avg(length(state)) |
+----------------------------------------+
| 17.190299006                           |
+----------------------------------------+
Returned 1 row(s) in 15.18s
[localhost:21000] >quit;
We’ll replace those with a single number, between 1 and 3 digits. So we could expect to save roughly 15–16 gigabytes of disk space, by replacing 18 characters (CITY and STATE plus comma delimiter) with 2–3 digit numbers.
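The back-of-the-envelope arithmetic behind that estimate can be written out explicitly (a sketch; the 2.5-character average for a 1–3 digit ID is our assumption):

```python
rows = 1_000_000_000
avg_city_plus_state = 17.19  # from the avg(length(...)) query above
old_bytes_per_row = avg_city_plus_state + 1  # plus the comma between city and state
new_bytes_per_row = 2.5      # a 1-3 digit location_id averages ~2.5 characters (assumption)

saved_gb = rows * (old_bytes_per_row - new_bytes_per_row) / 10**9
print(round(saved_gb, 1))    # roughly 15.7 GB, matching the 15-16 GB estimate
```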
After we get the data into Impala, we can use SQL skills to slice and dice it in all sorts of ways. Let’s set up a tiny lookup table with all the city and state data, and then make a new table with the original data in normalized form. Before transforming the data, we’ll use a view to double-check the correctness of the join query that pulls out the normalized values.
Recall that we started with a simple list of CITY,STATE values in usa_cities.lst. To use this data as a lookup table, we need a file with numeric IDs. That’s easy to prepare with basic Unix commands: just take the output of cat -n, trim off leading spaces, and turn the tab after the line number into our separator character (comma).
$ cat -n usa_cities.lst | sed -e 's/^ *//' | sed -e 's/\t/,/' | tee usa_cities.csv
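If you prefer to stay in Python, the same numbering step can be sketched as a small helper (the function name is ours, purely illustrative):

```python
def number_lines(lines, sep=","):
    """Mimic `cat -n` plus the sed cleanup: prefix each line with a
    1-based ID, using `sep` in place of the tab and no leading spaces."""
    return ["%d%s%s" % (n, sep, line) for n, line in enumerate(lines, start=1)]

print(number_lines(["New York,New York", "Los Angeles,California"]))
# -> ['1,New York,New York', '2,Los Angeles,California']
```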
Now we pick up back in the impala-shell interpreter, inside the oreilly database where we’re running these experiments. Again, we load the data file into the right HDFS directory, finding the location with DESCRIBE FORMATTED and running the hdfs command from inside impala-shell:
$impala-shell -i localhost -d oreilly
[localhost:21000] >describe formatted usa_cities;
...
| Location: | hdfs://a1730.abcde.example.com:8020/user/impala/warehouse |
|           | /oreilly.db/usa_cities                                    |
...
[localhost:21000] >!hdfs dfs -put usa_cities.csv
>'/user/impala/warehouse/oreilly.db/usa_cities';
[localhost:21000] >refresh usa_cities;
[localhost:21000] >select count(*) from usa_cities;
+----------+
| count(*) |
+----------+
| 289      |
+----------+
[localhost:21000] >show table stats usa_cities;
+-------+--------+--------+--------------+--------+
| #Rows | #Files | Size   | Bytes Cached | Format |
+-------+--------+--------+--------------+--------+
| -1    | 1      | 6.44KB | NOT CACHED   | TEXT   |
+-------+--------+--------+--------------+--------+
Returned 1 row(s) in 0.01s
[localhost:21000] >select * from usa_cities limit 5;
+----+--------------+--------------+
| id | city         | state        |
+----+--------------+--------------+
| 1  | New York     | New York     |
| 2  | Los Angeles  | California   |
| 3  | Chicago      | Illinois     |
| 4  | Houston      | Texas        |
| 5  | Philadelphia | Pennsylvania |
+----+--------------+--------------+
Before doing any resource-intensive operation like reorganizing the original 56 GB table, I always double-check the logic first using a view, which helps to avoid typing long queries over and over. Let’s make sure that the CITY and STATE data from the original table match up with the values from the new lookup table:
[localhost:21000] >create view normalized_view as
>select one.id, one.val, one.zerofill, one.name,
>one.assertion, two.id as location_id
>from sample_data one join usa_cities two
>on (one.city = two.city and one.state = two.state);
[localhost:21000] >select one.id, one.location_id,
>two.id, two.city, two.state
>from normalized_view one join usa_cities two
>on (one.location_id = two.id)
>limit 5;
+----------+-------------+-----+-----------+------------+
| id       | location_id | id  | city      | state      |
+----------+-------------+-----+-----------+------------+
| 15840253 | 216         | 216 | Denton    | Texas      |
| 15840254 | 110         | 110 | Fontana   | California |
| 15840255 | 250         | 250 | Gresham   | Oregon     |
| 15840256 | 200         | 200 | Waco      | Texas      |
| 15840257 | 165         | 165 | Escondido | California |
+----------+-------------+-----+-----------+------------+
Returned 5 row(s) in 0.42s
[localhost:21000] >select id, city, state from sample_data
>where id in (15840253, 15840254, 15840255, 15840256, 15840257);
+----------+-----------+------------+
| id       | city      | state      |
+----------+-----------+------------+
| 15840253 | Denton    | Texas      |
| 15840254 | Fontana   | California |
| 15840255 | Gresham   | Oregon     |
| 15840256 | Waco      | Texas      |
| 15840257 | Escondido | California |
+----------+-----------+------------+
Returned 1 row(s) in 5.27s
The view gets some columns from the original SAMPLE_DATA table, but retrieves CITY and STATE from the small USA_CITIES lookup table.

The join query pulls CITY and STATE from the small lookup table by way of the view.

The final query confirms that the results are the same when CITY and STATE come from the original SAMPLE_DATA table.
Now we’re satisfied that the join query in the view pulls out the correct combination of CITY, STATE, and ID values from the lookup table. So let’s create a version of our billion-row table that matches the layout of the view, with the CITY and STATE columns replaced by a single numeric LOCATION_ID:
[localhost:21000] >create table normalized_text
>row format delimited fields terminated by ","
>as select * from normalized_view;
+----------------------------+
| summary                    |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 422.06s
[localhost:21000] >select * from normalized_text limit 5;
+-----------+-------+----------+------------------+-----------+-------------+
| id        | val   | zerofill | name             | assertion | location_id |
+-----------+-------+----------+------------------+-----------+-------------+
| 921623839 | 95546 | 001301   | Pwwwwwbbe        | false     | 217         |
| 921623840 | 38224 | 018053   | Clldddddddll     | true      | 127         |
| 921623841 | 73153 | 032797   | Csssijjjjjj      | true      | 124         |
| 921623842 | 35567 | 094193   | Uhhhhhrrrrrrvvv  | false     | 115         |
| 921623843 | 4694  | 051840   | Uccccqqqqqbbbbbb | true      | 138         |
+-----------+-------+----------+------------------+-----------+-------------+
[localhost:21000] >show table stats normalized_text;
+-------+--------+---------+--------------+--------+
| #Rows | #Files | Size    | Bytes Cached | Format |
+-------+--------+---------+--------------+--------+
| -1    | 4      | 42.22GB | NOT CACHED   | TEXT   |
+-------+--------+---------+--------------+--------+
When we do join queries to display the original city and state names in reports, that is a perfect illustration of the “broadcast” join technique: the lookup table that’s only a few KB will be transmitted to each node and cross-referenced against the data from the big table as that larger data set is read from local disk storage.
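The broadcast-join idea can be sketched in a few lines of Python (a toy model, not Impala's implementation): the small table becomes an in-memory hash map on every node, and each node probes it while streaming its local slice of the big table.

```python
# The "broadcast" side: the whole lookup table fits in memory on every node.
# (These two sample entries are taken from the query output above.)
usa_cities = {216: ("Denton", "Texas"), 110: ("Fontana", "California")}

# A slice of the big table as (id, location_id) pairs, read from local disk.
sample_rows = [(15840253, 216), (15840254, 110)]

# Each row is joined by a single hash lookup; no shuffling of the big table.
joined = [(row_id, *usa_cities[loc_id]) for row_id, loc_id in sample_rows]
print(joined[0])  # -> (15840253, 'Denton', 'Texas')
```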
How come when we asked for 5 rows with the LIMIT 5 clause, we didn’t always get the first 5 rows from the table? Some of those queries returned rows with IDs in the range of 15 million or even 921 million.
Remember that as the experiment progressed, the new tables we created had progressively more and more data files: first 4, then 64. Each of our 4 nodes was working on a subset of data, and whichever node came up with its 5 rows first, those are the rows we saw. Even when there was just a single 56 GB file, our 4 nodes were working in parallel on the individual 128 MB data blocks carved out of the original file, and the arbitrary rows we asked for could come back from any of those blocks.
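The arithmetic behind that claim is easy to check (a sketch, assuming the default 128 MB HDFS block size):

```python
file_size_gb = 56.72   # size of the original text file, from SHOW TABLE STATS
block_size_mb = 128    # default HDFS block size (an assumption)
nodes = 4

blocks = file_size_gb * 1024 / block_size_mb
print(round(blocks))          # roughly 454 blocks in the single file
print(round(blocks / nodes))  # roughly 113 blocks per node, plenty of parallelism
```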
Whenever you expect rows to be returned in a particular order, include an ORDER BY clause in the outermost block of a query, or in the query that references the view. The SQL standard does not guarantee sorted results from an ORDER BY in a subquery or a view definition.
We’ve just saved several gigabytes of disk space and earned the gratitude of the IT department. Shall we stop there? No, we’re on a roll now!
When you read the official Impala documentation, you’ll see guidelines saying to use the Parquet file format for all your sizeable tables. This file format uses some clever tricks to shrink the overall size of the data and reduce I/O during queries. Let’s give that a try:
[localhost:21000] >create table normalized_parquet stored as parquet
>as select * from normalized_text;
+----------------------------+
| summary                    |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 183.63s
[localhost:21000] >select count(*) from normalized_parquet;
+------------+
| count(*)   |
+------------+
| 1000000000 |
+------------+
Returned 1 row(s) in 2.63s
[localhost:21000] >show table stats normalized_parquet;
+------------+ | count(*) | +------------+ | 1000000000 | +------------+ Returned 1 row(s) in 2.63s [localhost:21000] >show table stats normalized_parquet;
+-------+--------+---------+--------------+---------+
| #Rows | #Files | Size    | Bytes Cached | Format  |
+-------+--------+---------+--------------+---------+
| -1    | 64     | 23.34GB | NOT CACHED   | PARQUET |
+-------+--------+---------+--------------+---------+
Returned 1 row(s) in 0.01s
As you gain more experience with Impala queries and performance tuning, you will start to get a warm, fuzzy feeling when you see the STORED AS PARQUET clause in CREATE TABLE statements.
Wow, we just reduced the size of the table again—by almost 20 more gigabytes this time. (Again, the 3x replication factor means we saved another 60 GB in total across the cluster).
The ultimate goal is for queries to be faster, so let’s see how the various tables we constructed perform with the same queries. We’ll run identical queries on the original 56 GB text table, the 42 GB normalized text table, and finally the 23 GB normalized Parquet table, expecting each to be faster than the preceding one:
[localhost:21000] > select max(name) from sample_data;
+------------------------+
| max(name)              |
+------------------------+
| Zzzzzzzzzzzzzzzzzzzzzz |
+------------------------+
Returned 1 row(s) in 50.73s
[localhost:21000] > select max(name) from normalized_text;
+------------------------+
| max(name)              |
+------------------------+
| Zzzzzzzzzzzzzzzzzzzzzz |
+------------------------+
Returned 1 row(s) in 24.15s
[localhost:21000] > select max(name) from normalized_parquet;
+------------------------+
| max(name)              |
+------------------------+
| Zzzzzzzzzzzzzzzzzzzzzz |
+------------------------+
Returned 1 row(s) in 20.19s
[localhost:21000] > select avg(val), min(name), max(name) from sample_data
                  > where name between 'A' and 'D';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 26.36s
[localhost:21000] > select avg(val), min(name), max(name) from normalized_text
                  > where name between 'A' and 'D';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 21.17s
[localhost:21000] > select avg(val), min(name), max(name) from normalized_parquet
                  > where name between 'A' and 'D';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 12.11s
The SAMPLE_DATA table is the biggest, in text format with redundant string data. The queries for this table are the slowest.

The NORMALIZED_TEXT table is somewhat smaller, still in text format. The queries for this table are somewhat faster because of its smaller size, resulting in less I/O.

The NORMALIZED_PARQUET table is the smallest. The queries for this table are the fastest, because the overall data is smaller still, and Parquet reduces I/O even more by reading only the columns needed by the query.
As I ran and reran these queries in my test environment, the times jumped up and down a bit, because sometimes the Linux servers had cached some of the data after reading it the first time. There again, having the data in the most compact format possible increases the chance that the data will still be cached later, instead of being evicted by reading data that’s bulkier than necessary.
As a final experiment with file formats, let’s see what happens if we convert the original 56 GB table directly to Parquet without the normalization step. Because we are not eliminating the redundant string data, we can predict that the overall size will be somewhere between the original 56 GB and the 23.34 GB of the NORMALIZED_PARQUET table:
[localhost:21000] >create table denormalized_parquet stored as parquet as
>select * from sample_data;
+----------------------------+
| summary                    |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 225.69s
[localhost:21000] >show table stats denormalized_parquet;
+-------+--------+---------+--------------+---------+
| #Rows | #Files | Size    | Bytes Cached | Format  |
+-------+--------+---------+--------------+---------+
| -1    | 64     | 24.04GB | NOT CACHED   | PARQUET |
+-------+--------+---------+--------------+---------+
Returned 1 row(s) in 0.01s
Why isn’t there a bigger size difference like there was in text format? When the data was converted to Parquet, it was compacted (encoded) in multiple ways before the final compression step. One trick Parquet uses is to take columns with up to 16K of different values, and internally de-duplicate them, substituting numeric indexes instead of repeated strings. (That technique is known as dictionary encoding.) In a sense, Parquet did the same kind of normalization in the original data file, rather than making a separate lookup table. Ideally, you would still normalize such columns and use join queries to look up the original values, but either way, with minimal effort you can get substantial space savings.
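A toy version of dictionary encoding makes the idea concrete (an illustration of the concept, not the actual Parquet format):

```python
def dictionary_encode(values):
    """Store each distinct value once, and replace the column
    with small integer codes pointing into that dictionary."""
    dictionary, index_of, codes = [], {}, []
    for v in values:
        if v not in index_of:
            index_of[v] = len(dictionary)
            dictionary.append(v)
        codes.append(index_of[v])
    return dictionary, codes

states = ["California", "Texas", "California", "Oregon", "Texas"]
print(dictionary_encode(states))
# -> (['California', 'Texas', 'Oregon'], [0, 1, 0, 2, 1])
```

With only 289 distinct city/state values in a billion rows, almost all the repeated strings collapse into tiny integer codes, which is why the denormalized Parquet table ends up nearly as small as the normalized one.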
At this point, the Parquet file format is doing a lot of the heavy lifting to reduce the time for each query by reducing the overall I/O to read the data. If a column is not referenced in the query, Parquet lets the query avoid reading that column entirely, as opposed to text format. For example, in the SAMPLE_DATA and NORMALIZED_TEXT tables we’ve been using, each query reads 6 GB of data for the ZEROFILL column whether or not that column is used at all. And the compressed and encoded form of the column values means much less data is read even for columns that are needed.
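The contrast between row-oriented text and a columnar layout can be sketched like this (a toy model; real Parquet adds encoding and compression on top):

```python
# Row-oriented (text): a query touching only `name` still reads every field.
rows = [(1, "000123", "Aaaa"), (2, "000456", "Bzzz")]
names_from_rows = [name for _id, _zerofill, name in rows]

# Columnar (Parquet-style): the same query reads just one column's values,
# skipping id and zerofill entirely.
columns = {
    "id":       [1, 2],
    "zerofill": ["000123", "000456"],
    "name":     ["Aaaa", "Bzzz"],
}
names_from_columns = columns["name"]
assert names_from_rows == names_from_columns
```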
Partitioning the table lets us use our domain knowledge of the data and corresponding queries to reduce the I/O even further. If you have not already read the guidelines for partitioned tables in Working with Partitioned Tables, familiarize yourself with those tips before tackling any real-world projects with partitioning.
In this thought experiment, let’s decide that our most common queries will target a subset of users based on the first letter of their names. All else being equal, we could analyze the data for the A users, D users, or X users in about 1/26th of the time it would take to process all users together. In real life, you commonly partition on date-related fields so that you can analyze a certain time period, or on location-related fields so that you can analyze different geographic regions.
Again, because we are going to reorganize several gigabytes of data, let’s first make a view that matches the columns of our partitioned table, with a new INITIAL column that represents the first letter of the name:
[localhost:21000] >desc normalized_parquet;
+-------------+----------+---------+
| name        | type     | comment |
+-------------+----------+---------+
| id          | bigint   |         |
| val         | int      |         |
| zerofill    | string   |         |
| name        | string   |         |
| assertion   | boolean  |         |
| location_id | smallint |         |
+-------------+----------+---------+
Returned 6 row(s) in 0.01s
[localhost:21000] >create view partitioned_normalized_view as
>select id, val, zerofill, name, assertion, location_id,
>substr(name,1,1) as initial
>from normalized_parquet;
Returned 0 row(s) in 2.89s
[localhost:21000] >select id, name, initial
>from partitioned_normalized_view limit 5;
+-----------+----------------------+---------+
| id        | name                 | initial |
+-----------+----------------------+---------+
| 663027574 | Ckkvvvvvvvmmmmmmm    | C       |
| 663027575 | Fkkkkkkkwwwwwwwyyyyy | F       |
| 663027576 | Orrrrrrrfmmmmm       | O       |
| 663027577 | Peeevvvvvvvvvv       | P       |
| 663027578 | Dmmmmhhhs            | D       |
+-----------+----------------------+---------+
Returned 5 row(s) in 4.65s
After we’re satisfied that the new INITIAL column has the right values, we create a partitioned table using the PARTITIONED BY clause, and copy the data into it from the unpartitioned table:
[localhost:21000] >create table partitioned_normalized_parquet
>(id bigint, val int, zerofill string, name string,
>assertion boolean, location_id smallint)
>partitioned by (initial string) stored as parquet;
Returned 0 row(s) in 1.81s
[localhost:21000] >insert into partitioned_normalized_parquet partition(initial)
>select * from partitioned_normalized_view;
Inserted 1000000000 rows in 619.28s
The INITIAL column is referenced by the PARTITIONED BY clause, not in the regular column list. The SELECT * portion of the INSERT statement requires that the regular columns come first, then any partition key columns last. This is another reason we use a view: to specify the columns in the most convenient order for the INSERT statement.
Now let’s examine how the data is broken down within the partitioned table:
[localhost:21000] > show table stats partitioned_normalized_parquet;
+---------+-------+--------+----------+--------------+---------+
| initial | #Rows | #Files | Size | Bytes Cached | Format |
+---------+-------+--------+----------+--------------+---------+
| A | -1 | 3 | 871.79MB | NOT CACHED | PARQUET |
| B | -1 | 3 | 871.72MB | NOT CACHED | PARQUET |
| C | -1 | 3 | 871.40MB | NOT CACHED | PARQUET |
| D | -1 | 3 | 871.64MB | NOT CACHED | PARQUET |
| E | -1 | 3 | 871.54MB | NOT CACHED | PARQUET |
| F | -1 | 3 | 871.11MB | NOT CACHED | PARQUET |
| G | -1 | 3 | 871.29MB | NOT CACHED | PARQUET |
| H | -1 | 3 | 871.42MB | NOT CACHED | PARQUET |
| K | -1 | 3 | 871.42MB | NOT CACHED | PARQUET |
| L | -1 | 3 | 871.31MB | NOT CACHED | PARQUET |
| M | -1 | 3 | 871.38MB | NOT CACHED | PARQUET |
| N | -1 | 3 | 871.25MB | NOT CACHED | PARQUET |
| O | -1 | 3 | 871.14MB | NOT CACHED | PARQUET |
| P | -1 | 3 | 871.44MB | NOT CACHED | PARQUET |
| Q | -1 | 3 | 871.55MB | NOT CACHED | PARQUET |
| R | -1 | 3 | 871.30MB | NOT CACHED | PARQUET |
| S | -1 | 3 | 871.50MB | NOT CACHED | PARQUET |
| T | -1 | 3 | 871.65MB | NOT CACHED | PARQUET |
| Y | -1 | 3 | 871.57MB | NOT CACHED | PARQUET |
| Z | -1 | 3 | 871.54MB | NOT CACHED | PARQUET |
| NULL | -1 | 1 | 147.30MB | NOT CACHED | PARQUET |
| I | -1 | 3 | 871.44MB | NOT CACHED | PARQUET |
| J | -1 | 3 | 871.32MB | NOT CACHED | PARQUET |
| U | -1 | 3 | 871.36MB | NOT CACHED | PARQUET |
| V | -1 | 3 | 871.39MB | NOT CACHED | PARQUET |
| W | -1 | 3 | 871.79MB | NOT CACHED | PARQUET |
| X | -1 | 3 | 871.95MB | NOT CACHED | PARQUET |
| Total | -1 | 79 | 22.27GB | 0B | |
+---------+-------+--------+----------+--------------+---------+
Returned 28 row(s) in 0.04s
Each partition has less than 1 GB of data.
The NULL partition is a reminder that our original data-generating script included some NULL values in the NAME column, which carried over to the INITIAL column we’re using as the partition key. This is something to check for during validation and cleansing operations, to make sure that rows do not become “orphaned” by having null partition keys that never get matched by the WHERE clauses in your queries.
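That orphaned-row hazard is easy to illustrate. Here is a hypothetical Python sketch (not Impala internals) of deriving the partition key the same way as SUBSTR(NAME, 1, 1), showing how missing names silently fall into the NULL partition:

```python
def partition_key(name):
    # Mirror substr(name, 1, 1): a NULL (None) or empty name yields a
    # NULL partition key rather than raising an error.
    return name[0] if name else None

names = ["Qaaa", None, "Aaaa", ""]
keys = [partition_key(n) for n in names]
# Rows whose key is None land in the NULL partition; a WHERE clause
# that matches specific letters will never touch them.
orphaned = sum(1 for k in keys if k is None)
```

A periodic check such as counting the rows with a null partition key catches this during data validation.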
Now when we run queries that target just one or a few partitions, the query reads 3 files totaling less than 1 GB for each partition that is processed.
Partitioned tables are best for queries that access a small proportion of the total partitions.
[localhost:21000] >select avg(val), min(name), max(name)
>from normalized_parquet where substr(name,1,1) = 'Q';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 50001.94660836487 | Qaaa      | Qzzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 5.74s
[localhost:21000] >select avg(val), min(name), max(name)
>from partitioned_normalized_parquet where initial = 'Q';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 50001.94660836487 | Qaaa      | Qzzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 4.75s
[localhost:21000] >select avg(val), min(name), max(name)
>from normalized_parquet
>where substr(name,1,1) between 'A' and 'C';
+------------------+-----------+------------------------+
| avg(val)         | min(name) | max(name)              |
+------------------+-----------+------------------------+
| 49994.3356542968 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+------------------+-----------+------------------------+
Returned 1 row(s) in 11.65s
[localhost:21000] >select avg(val), min(name), max(name)
>from partitioned_normalized_parquet
>where initial between 'A' and 'C';
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49992.47281834851 | Aaaa      | Czzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 8.91s
This query scans the whole table and analyzes the rows where the NAME column starts with a particular letter.
An equivalent query that touches one partition in the partitioned table is a little bit faster. It’s not 26 times faster though, due to the arithmetic having to do with block sizes, number of files, number of hosts in the cluster, and number of cores per host. Some of the resources across the cluster might sit idle during a particular query because there is just not enough data to require getting all hosts and cores involved. Here we are with a billion-row table, and still there is not enough data to really demonstrate all the potential performance benefits. On the other hand, the fact that there is still idle capacity is good news for scalability: the cluster could run many other concurrent queries without maxing out the available CPUs or storage devices.
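The ceiling on that speedup can be sketched with back-of-the-envelope Python. The block size, host count, and core count below are illustrative assumptions, not measurements from the cluster in this book:

```python
def max_parallel_speedup(data_bytes, block_bytes, hosts, cores_per_host):
    # A scan is split into roughly block-sized ranges; useful parallelism
    # is capped by whichever is smaller, the number of ranges or the
    # total cores in the cluster.
    scan_ranges = -(-data_bytes // block_bytes)  # ceiling division
    return min(scan_ranges, hosts * cores_per_host)

MB, GB = 1024 ** 2, 1024 ** 3

# One ~871 MB partition split into assumed 256 MB ranges on a 4-host,
# 4-core cluster: only 4 ranges exist, so most cores sit idle.
small_scan = max_parallel_speedup(871 * MB, 256 * MB, hosts=4, cores_per_host=4)

# The full ~22 GB table produces plenty of ranges to keep all 16 cores busy.
big_scan = max_parallel_speedup(22 * GB, 256 * MB, hosts=4, cores_per_host=4)
```

So a 26x smaller scan does not yield a 26x speedup: the small scan simply cannot occupy the whole cluster, while the idle capacity it leaves behind is available for other concurrent queries.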
This query against the unpartitioned table reads all the data and analyzes all rows where the NAME field starts with one of three different letters.
An equivalent query that touches three partitions in the partitioned table is again a little bit faster. The speedup is more noticeable as the volume of data in the table increases, and as the number of partitions increases.
Let’s see what happens with a query that scans the entire table:
[localhost:21000] >select avg(val), min(name), max(name)
>from partitioned_normalized_parquet;
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49998.04368627915 | Aaaa      | Zzzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 69.29s
[localhost:21000] >select avg(val), min(name), max(name)
>from normalized_parquet;
+-------------------+-----------+------------------------+
| avg(val)          | min(name) | max(name)              |
+-------------------+-----------+------------------------+
| 49998.04368627915 | Aaaa      | Zzzzzzzzzzzzzzzzzzzzzz |
+-------------------+-----------+------------------------+
Returned 1 row(s) in 68.26s
For a query that does a full-table scan, the partitioned table is actually a little slower than the unpartitioned one. Having to process all the different data files from the partition directories adds a bit of overhead. That’s why it’s important to partition on the columns that you actually use for filtering in your most important and most frequent queries.
At this point, we’ve done a reasonable job of optimizing single-table queries for our billion rows of sample data. From here, there are other kinds of scenarios to explore, starting with join queries that combine data from multiple tables.
When dealing with large and ever-growing tables, Impala can better optimize complex queries (especially join queries) the more it knows about the characteristics of the data, both at the table level and the column level. The Impala SQL statement to collect such information is COMPUTE STATS. Run this statement after loading substantial new data into a table.
First, we create a table with the same structure as our original billion-row table (Tutorial: The Journey of a Billion Rows). We will take a sample of a million rows from our billion rows of data, then do joins between the big table and the small table:
[localhost:21000] >create table stats_demo like sample_data;
[localhost:21000] >show table stats stats_demo;
+-------+--------+------+--------------+--------+
| #Rows | #Files | Size | Bytes Cached | Format |
+-------+--------+------+--------------+--------+
| -1    | 0      | 0B   | NOT CACHED   | TEXT   |
+-------+--------+------+--------------+--------+
[localhost:21000] >show column stats stats_demo;
+-----------+---------+------------------+--------+----------+----------+
| Column    | Type    | #Distinct Values | #Nulls | Max Size | Avg Size |
+-----------+---------+------------------+--------+----------+----------+
| id        | BIGINT  | -1               | -1     | 8        | 8        |
| val       | INT     | -1               | -1     | 4        | 4        |
| zerofill  | STRING  | -1               | -1     | -1       | -1       |
| name      | STRING  | -1               | -1     | -1       | -1       |
| assertion | BOOLEAN | -1               | -1     | 1        | 1        |
| city      | STRING  | -1               | -1     | -1       | -1       |
| state     | STRING  | -1               | -1     | -1       | -1       |
+-----------+---------+------------------+--------+----------+----------+
Initially, Impala knows basic physical properties based on the data files and the schema, such as the total data size and the sizes of numeric columns, which never vary in length.
The -1 numbers indicate properties where Impala does not know the values. The unknown values are most prominent for STRING columns, whose values vary in size.
In the following example, we load a million rows into the table and collect statistics for the data. To help Impala choose a good query plan for a join involving this table, it’s important to know the characteristics of the various columns.
[localhost:21000] >insert into stats_demo select * from sample_data limit 1000000;
[localhost:21000] >compute stats stats_demo;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 7 column(s). |
+-----------------------------------------+
[localhost:21000] >show table stats stats_demo;
+---------+--------+---------+--------------+--------+
| #Rows   | #Files | Size    | Bytes Cached | Format |
+---------+--------+---------+--------------+--------+
| 1000000 | 1      | 57.33MB | NOT CACHED   | TEXT   |
+---------+--------+---------+--------------+--------+
[localhost:21000] >show column stats stats_demo;
+-----------+---------+------------------+--------+----------+---------------+
| Column    | Type    | #Distinct Values | #Nulls | Max Size | Avg Size      |
+-----------+---------+------------------+--------+----------+---------------+
| id        | BIGINT  | 1023244          | -1     | 8        | 8             |
| val       | INT     | 139017           | -1     | 4        | 4             |
| zerofill  | STRING  | 101761           | -1     | 6        | 6             |
| name      | STRING  | 1005653          | -1     | 22       | 13.0006999969 |
| assertion | BOOLEAN | 2                | -1     | 1        | 1             |
| city      | STRING  | 282              | -1     | 16       | 8.78960037231 |
| state     | STRING  | 46               | -1     | 20       | 8.40079975128 |
+-----------+---------+------------------+--------+----------+---------------+
Currently, the number of nulls is not counted because the planner doesn’t use this information.
The ID and NAME columns contain essentially unique values. The NAME field tends to be longer than the CITY and STATE fields.
The number of distinct values is estimated rather than counted precisely, because the planner only needs a rough estimate to judge whether one approach is faster than another. For example, the estimate for the NAME column is slightly higher than the actual number of rows in the table. Impala automatically adjusts the estimate downward in such a case.
The CITY and STATE columns have very few distinct values.
In a join query involving tables of different sizes, Impala automatically determines the most efficient way to execute it, including the order in which to process the tables and which table’s data to transmit across the network to the other nodes.
You can see the results by looking at the EXPLAIN plan for a query, without the need to actually run it:
[localhost:21000] >explain select count(*) from sample_data join stats_demo
>using (id) where substr(sample_data.name,1,1) = 'G';
+--------------------------------------------------------------------+
| Explain String                                                     |
+--------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=5.75GB VCores=2            |
| WARNING: The following tables are missing relevant table           |
| and/or column statistics.                                          |
| oreilly.sample_data                                                |
|                                                                    |
| 06:AGGREGATE [MERGE FINALIZE]                                      |
| | output: sum(count(*))                                            |
| |                                                                  |
| 05:EXCHANGE [UNPARTITIONED]                                        |
| |                                                                  |
| 03:AGGREGATE                                                       |
| | output: count(*)                                                 |
| |                                                                  |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                               |
| | hash predicates: oreilly.stats_demo.id = oreilly.sample_data.id  |
| |                                                                  |
| |--04:EXCHANGE [BROADCAST]                                         |
| | |                                                                |
| | 00:SCAN HDFS [oreilly.sample_data]                               |
| |    partitions=1/1 size=56.72GB                                   |
| |    predicates: substr(sample_data.name, 1, 1) = 'G'              |
| |                                                                  |
| 01:SCAN HDFS [oreilly.stats_demo]                                  |
|    partitions=1/1 size=57.33MB                                     |
+--------------------------------------------------------------------+
Wait a second. That warning at the top of the plan output reminds us that although we just ran COMPUTE STATS for our new table, we neglected to do it for our oldest (and biggest) table.
When Impala reports that it is going to “scan HDFS” for the SAMPLE_DATA table and then “broadcast” the result, that is an expensive network operation: it sends the results from scanning SAMPLE_DATA and extracting the G names to each node, to compare against the STATS_DEMO table. That’s about 1/26th of 56.72 GB (about 2.2 GB) being sent to each of four nodes. It’s preferable to see a small amount of data being broadcast. Maybe we can reduce the amount of network I/O.
To understand the flow of the query, you read from bottom to top (after checking any warnings at the top). For a join query, you prefer to see the biggest table listed at the bottom, then the smallest, second smallest, third smallest, and so on.
When Impala sees a table with no statistics used in a join query (like SAMPLE_DATA in this case), it treats the table as if it were zero-sized, and as if sending it over the network were no problem. That is clearly wrong in this case, where SAMPLE_DATA is bigger and has more distinct values in both the ID and NAME columns referenced in the query.
Let’s collect statistics for the big (billion-row) SAMPLE_DATA table, too, and then try again:
[localhost:21000] >compute stats sample_data;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 7 column(s). |
+-----------------------------------------+
[localhost:21000] >show table stats sample_data;
+------------+--------+---------+--------------+--------+
| #Rows      | #Files | Size    | Bytes Cached | Format |
+------------+--------+---------+--------------+--------+
| 1000000000 | 1      | 56.72GB | NOT CACHED   | TEXT   |
+------------+--------+---------+--------------+--------+
[localhost:21000] >show column stats sample_data;
+-----------+---------+------------------+--------+----------+---------------+
| Column    | Type    | #Distinct Values | #Nulls | Max Size | Avg Size      |
+-----------+---------+------------------+--------+----------+---------------+
| id        | BIGINT  | 183861280        | 0      | 8        | 8             |
| val       | INT     | 139017           | 0      | 4        | 4             |
| zerofill  | STRING  | 101761           | 0      | 6        | 6             |
| name      | STRING  | 145636240        | 0      | 22       | 13.0002002716 |
| assertion | BOOLEAN | 2                | 0      | 1        | 1             |
| city      | STRING  | 282              | 0      | 16       | 8.78890037536 |
| state     | STRING  | 46               | 0      | 20       | 8.40139961242 |
+-----------+---------+------------------+--------+----------+---------------+
The COMPUTE STATS statement is the key to improving the efficiency of join queries. Now we’ve run it for all tables involved in the join.
The key item of information for the table stats is the number of rows.
In the column stats, Impala estimates the number of distinct values for each column and examines STRING columns to find the maximum and average length.
[localhost:21000] >explain select count(*) from sample_data join stats_demo
>using (id) where substr(sample_data.name,1,1) = 'G';
+--------------------------------------------------------------------+
| Explain String                                                     |
+--------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=3.77GB VCores=2            |
|                                                                    |
| 06:AGGREGATE [MERGE FINALIZE]                                      |
| | output: sum(count(*))                                            |
| |                                                                  |
| 05:EXCHANGE [UNPARTITIONED]                                        |
| |                                                                  |
| 03:AGGREGATE                                                       |
| | output: count(*)                                                 |
| |                                                                  |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                               |
| | hash predicates: oreilly.sample_data.id = oreilly.stats_demo.id  |
| |                                                                  |
| |--04:EXCHANGE [BROADCAST]                                         |
| | |                                                                |
| | 01:SCAN HDFS [oreilly.stats_demo]                                |
| |    partitions=1/1 size=57.33MB                                   |
| |                                                                  |
| 00:SCAN HDFS [oreilly.sample_data]                                 |
|    partitions=1/1 size=56.72GB                                     |
|    predicates: substr(sample_data.name, 1, 1) = 'G'                |
+--------------------------------------------------------------------+
This time, the smaller STATS_DEMO table is broadcast in its entirety to all four nodes. Instead of sending about 2.2 GB across the network to each node as in the previous query, we’re only sending about 57.33 MB, which is the size of the smaller table. We’ve just improved the efficiency of our query by a factor of about 38, without actually running either the slow or the fast version. That’s much better!
The data that’s broadcast is cross-checked against the big SAMPLE_DATA table. Each of our four nodes will read 1/4 of this table from local storage. For join queries, we always want to see the biggest table at the bottom of the plan, meaning that the data from that table is read locally rather than being sent over the network.
We know that most of the 56.72 GB will be ignored and not need to be cross-checked against the other table, because it will not match the predicate that checks for the first letter 'G'. Impala does not yet account for that aspect in the plan numbers. We’ll improve on that as we progress to using partitioned tables.
Just for kicks, let’s try this query out in real life:
[localhost:21000] >select count(*) from sample_data join stats_demo
>using (id) where substr(sample_data.name,1,1) = 'G';
+----------+
| count(*) |
+----------+
| 37763    |
+----------+
Returned 1 row(s) in 13.35s
By joining a table of a billion rows with a table of a million rows, we checked a million billion possible combinations. The results came back so fast, there was hardly enough time to play one move in Words with Friends™. (All timing numbers in this book are from a small cluster of modest capacity; I expect you to be able to beat them without much trouble.)
Remember that we demonstrated earlier that text tables are bulkier than they need to be, and we could trim things down and speed things up by converting to Parquet, doing some normalization, and introducing partitioning. Let’s try again with the more efficient tables we set up using that same data. (We don’t expect the count returned by the query to be exactly the same, because we’re taking a random sample of a million rows to copy into the new table.)
[localhost:21000] >create table stats_demo_parquet
>like partitioned_normalized_parquet;
Returned 0 row(s) in 1.14s
[localhost:21000] >insert into stats_demo_parquet partition (initial)
>[shuffle] select * from partitioned_normalized_parquet
>limit 1000000;
Inserted 1000000 rows in 39.72s
The CREATE TABLE LIKE statement preserves the file format of the original table, so we know the new one will use Parquet format also.
We use the [SHUFFLE] hint to avoid having each of the four nodes try to allocate 27 gigabyte-sized buffers to write separate data files for all the partition values. The “shuffle” operation takes a little longer, but avoids potential out-of-memory conditions. This is the default Impala uses when a table has no statistics, so strictly speaking, it is only necessary if Impala chooses the wrong execution plan for some reason, such as out-of-date statistics.
Again, we make sure to run the COMPUTE STATS statement for all the tables involved in the join query, after loading the data. In earlier examples with tables like PARTITIONED_NORMALIZED_PARQUET, we saw a little under 1 GB of data in each partition. In the smaller table containing a random sample of the data, each partition contains substantially less data.
[localhost:21000] >compute stats partitioned_normalized_parquet;
+------------------------------------------+
| summary                                  |
+------------------------------------------+
| Updated 26 partition(s) and 6 column(s). |
+------------------------------------------+
Returned 1 row(s) in 54.24s
[localhost:21000] >compute stats stats_demo_parquet;
+------------------------------------------+
| summary                                  |
+------------------------------------------+
| Updated 26 partition(s) and 6 column(s). |
+------------------------------------------+
Returned 1 row(s) in 4.86s
[localhost:21000] >show table stats stats_demo_parquet;
+---------+---------+--------+----------+--------------+---------+
| initial | #Rows   | #Files | Size     | Bytes Cached | Format  |
+---------+---------+--------+----------+--------------+---------+
| A       | 89088   | 1      | 2.34MB   | NOT CACHED   | PARQUET |
| B       | 46080   | 1      | 1.31MB   | NOT CACHED   | PARQUET |
| C       | 219136  | 1      | 5.28MB   | NOT CACHED   | PARQUET |
| D       | 63488   | 1      | 1.77MB   | NOT CACHED   | PARQUET |
| E       | 49152   | 1      | 1.39MB   | NOT CACHED   | PARQUET |
| F       | 32768   | 1      | 960.64KB | NOT CACHED   | PARQUET |
| G       | 11264   | 1      | 336.67KB | NOT CACHED   | PARQUET |
...
| W       | 16384   | 1      | 484.57KB | NOT CACHED   | PARQUET |
| X       | 51200   | 1      | 1.45MB   | NOT CACHED   | PARQUET |
| NULL    | -1      | 1      | 181.73KB | NOT CACHED   | PARQUET |
| Y       | 82944   | 1      | 2.21MB   | NOT CACHED   | PARQUET |
| Z       | 27648   | 1      | 816.00KB | NOT CACHED   | PARQUET |
| Total   | 1000000 | 27     | 26.99MB  | 0B           |         |
+---------+---------+--------+----------+--------------+---------+
Returned 28 row(s) in 0.02s
Now we go through the same exercise as before, running an EXPLAIN statement and examining the amount of data expected to be read from disk and transmitted across the network:
[localhost:21000] >explain select count(*) from partitioned_normalized_parquet
>join stats_demo_parquet using (id)
>where
>substr(partitioned_normalized_parquet.name,1,1) = 'G';
+-------------------------------------------------------------------------+
| Explain String                                                          |
+-------------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=194.31MB VCores=2               |
|                                                                         |
| 06:AGGREGATE [MERGE FINALIZE]                                           |
| | output: sum(count(*))                                                 |
| |                                                                       |
| 05:EXCHANGE [UNPARTITIONED]                                             |
| |                                                                       |
| 03:AGGREGATE                                                            |
| | output: count(*)                                                      |
| |                                                                       |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                                    |
| | hash predicates: oreilly.partitioned_normalized_parquet.id =          |
| | oreilly.stats_demo_parquet.id                                         |
| |                                                                       |
| |--04:EXCHANGE [BROADCAST]                                              |
| | |                                                                     |
| | 01:SCAN HDFS [oreilly.stats_demo_parquet]                             |
| |    partitions=27/27 size=26.99MB                                      |
| |                                                                       |
| 00:SCAN HDFS [oreilly.partitioned_normalized_parquet]                   |
|    partitions=27/27 size=22.27GB                                        |
|    predicates: substr(partitioned_normalized_parquet.name, 1, 1) = 'G' |
+-------------------------------------------------------------------------+
Returned 21 row(s) in 0.03s
Those “scan” figures at the bottom are looking better than with the text tables.
This query is a naive translation of the original, still filtering with the SUBSTR() call.
We’re going to transmit (“broadcast”) 26.99 MB across the network to each node.
We’re going to read 22.27 GB from disk. This is the I/O-intensive part of this query, which occurs on the nodes that hold data blocks from the biggest table. Because we usually read these plans bottom to top, this is the first figure to consider in evaluating if the query is executing the way we want it to.
Calling a function in the WHERE clause is not always a smart move, because that function can be called so many times. Now that the first letter is available in a column, maybe it would be more efficient to refer to the INITIAL column.
The following example improves the query for the partitioned table by testing the first letter directly, referencing the INITIAL column instead of calling SUBSTR(). The more we can refer to the partition key columns, the better Impala can ignore all the irrelevant partitions.
[localhost:21000] >explain select count(*) from partitioned_normalized_parquet
>join stats_demo_parquet using (id)
>where partitioned_normalized_parquet.initial = 'G';
+-----------------------------------------------------------------+
| Explain String                                                  |
+-----------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=106.31MB VCores=2       |
|                                                                 |
| 06:AGGREGATE [MERGE FINALIZE]                                   |
| | output: sum(count(*))                                         |
| |                                                               |
| 05:EXCHANGE [UNPARTITIONED]                                     |
| |                                                               |
| 03:AGGREGATE                                                    |
| | output: count(*)                                              |
| |                                                               |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                            |
| | hash predicates: oreilly.partitioned_normalized_parquet.id =  |
| | oreilly.stats_demo_parquet.id                                 |
| |                                                               |
| |--04:EXCHANGE [BROADCAST]                                      |
| | |                                                             |
| | 01:SCAN HDFS [oreilly.stats_demo_parquet]                     |
| |    partitions=27/27 size=26.99MB                              |
| |                                                               |
| 00:SCAN HDFS [oreilly.partitioned_normalized_parquet]           |
|    partitions=1/27 size=871.29MB                                |
+-----------------------------------------------------------------+
Returned 20 row(s) in 0.02s
We happen to know (although Impala doesn’t know) that rows with the same ID value will also have the same INITIAL value. Let’s add INITIAL to the USING clause and see if that helps.
[localhost:21000] >explain select count(*) from partitioned_normalized_parquet
>join stats_demo_parquet using (id,initial)
>where partitioned_normalized_parquet.initial = 'G';
+-----------------------------------------------------------------+
| Explain String                                                  |
+-----------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=98.27MB VCores=2        |
|                                                                 |
| 06:AGGREGATE [MERGE FINALIZE]                                   |
| | output: sum(count(*))                                         |
| |                                                               |
| 05:EXCHANGE [UNPARTITIONED]                                     |
| |                                                               |
| 03:AGGREGATE                                                    |
| | output: count(*)                                              |
| |                                                               |
| 02:HASH JOIN [INNER JOIN, BROADCAST]                            |
| | hash predicates: oreilly.partitioned_normalized_parquet.id =  |
| | oreilly.stats_demo_parquet.id,                                |
| | oreilly.partitioned_normalized_parquet.initial =              |
| | oreilly.stats_demo_parquet.initial                            |
| |                                                               |
| |--04:EXCHANGE [BROADCAST]                                      |
| | |                                                             |
| | 01:SCAN HDFS [oreilly.stats_demo_parquet]                     |
| |    partitions=1/27 size=336.67KB                              |
| |                                                               |
| 00:SCAN HDFS [oreilly.partitioned_normalized_parquet]           |
|    partitions=1/27 size=871.29MB                                |
+-----------------------------------------------------------------+
Returned 20 row(s) in 0.02s
This looks really promising. We’ve gone from transmitting gigabytes across the network for each query, to under a megabyte. Again, even as we iterated through several variations of the query, we didn’t have to actually try them and run the risk of executing a really slow, resource-intensive one.
Just to recap, we took the following optimization steps, starting from our original bulky text table:

- Converted the data to the compressed, encoded Parquet file format.
- Normalized the repeated string values into a separate lookup table.
- Partitioned the table and referenced the partition key column in our WHERE clauses.
- Used EXPLAIN to get an idea of the efficiency of possible queries as we iterated through several alternatives.
Now let’s see how the query performs in real life after going through several iterations of fine-tuning it and checking the EXPLAIN plan:
[localhost:21000] >select count(*) from partitioned_normalized_parquet
>join stats_demo_parquet using (id,initial)
>where partitioned_normalized_parquet.initial = 'G';
+----------+
| count(*) |
+----------+
| 11264    |
+----------+
Returned 1 row(s) in 1.87s
That’s a million billion potential combinations being evaluated in less than 2 seconds, on a 4-node cluster with modest hardware specs. (For example, these nodes have 48 GB of memory each, which is much less than in a typical Impala cluster.)
One common anti-pattern to avoid is what’s known as the “many small files” problem. Hadoop, HDFS, and Impala are all optimized to work with multimegabyte files. Ingesting data that was not originally organized for Hadoop can result in a profusion of tiny data files, leading to suboptimal performance even though the volume of data being read is small. The overhead of distributing a parallel query across a cluster isn’t worthwhile if the data is fragmented into a few kilobytes or even a few megabytes per file.
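A rough model shows why fragmentation hurts even when the total volume is unchanged. The per-file overhead and throughput numbers below are made-up illustrative values, not Impala measurements:

```python
def scan_seconds(total_mb, file_count,
                 per_file_overhead_s=0.01, throughput_mb_s=100.0):
    # Fixed cost to schedule and open each file, plus streaming the bytes.
    return file_count * per_file_overhead_s + total_mb / throughput_mb_s

# The same 1 GB of data, stored two different ways:
as_big_files = scan_seconds(1024, 8)        # a few ~128 MB files
as_tiny_files = scan_seconds(1024, 10000)   # ~100 KB per file
# The tiny-file layout spends most of its time on per-file overhead
# rather than actually reading data.
```

Under these assumptions the streaming cost is identical in both cases; only the per-file fixed cost changes, and with thousands of files it dominates.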
The techniques you want to avoid are:

- Loading data with many small INSERT ... VALUES statements, especially with a single item in the VALUES clause. If you need to build up a data file line by line, use a technique outside of Impala such as running Sqoop or Flume, or writing your own data-generation program (possibly running it as a MapReduce job).
- Loading a partitioned table with the dynamic form of the INSERT ... SELECT statement, which divides the data among multiple partitions at runtime, based on values in the SELECT query. The INSERT goes faster if you specify the partition key values as constants and operate on one partition at a time.
Ways to avoid or recover from this kind of problem include:
If you create a lookup table with a predictable set of hardcoded values, do it with a single VALUES clause:
INSERT INTO canada_regions VALUES
  ("Newfoundland and Labrador","NL"), ("Prince Edward Island","PE"),
  ("New Brunswick","NB"), ("Nova Scotia","NS"), ("Quebec","PQ"),
  ("Ontario","ON"), ("Manitoba","MB"), ("Saskatchewan","SK"),
  ("Alberta","AB"), ("British Columbia","BC"), ("Yukon","YT"),
  ("Northwest Territories","NT"), ("Nunavut","NU");
This technique generates a single data file; although it’s still tiny in comparison to the 128 MB block size in HDFS, it’s more efficient than a dozen separate data files containing one row each!
Coalesce existing small files by copying the data into a different table with an INSERT ... SELECT operation. The output data files will be reorganized based on the number of nodes in your cluster and the number of cores per node.
INSERT INTO sales_data PARTITION (year=2014, month=07)
  SELECT customer, product, amount, purchase_date
  FROM raw_data
  WHERE year = 2014 AND month = 07;
You can minimize disruption from coalescing data into a new table by pointing all your reporting queries at a view and switching the table that’s accessed by the view:
CREATE VIEW production_report AS SELECT ... FROM original_table WHERE ...;
INSERT INTO optimized_table SELECT * FROM original_table;
COMPUTE STATS optimized_table;
ALTER VIEW production_report AS SELECT ... FROM optimized_table WHERE ...;
This way, all your query-intensive applications can refer to a consistent name, even if you reorganize the data behind the scenes. The new table could use the Parquet file format, partitioning, or more compact data types than the original.
Use SHOW TABLE STATS and SHOW COLUMN STATS to check whether the stats are present and accurate (particularly the “number of rows” figure in the table statistics). Use the COMPUTE STATS statement to collect the statistics if that information is missing or substantially different from the current contents of the source table.
When inserting into a partitioned table, consider adding a [SHUFFLE] hint on the INSERT ... SELECT statement. This hint makes the INSERT statement take longer, but reduces the number of output files generated. This technique can both avoid the “many small files” problem and reduce the memory usage during the INSERT statement. (In the latest releases, Impala applies the [SHUFFLE] hint automatically if necessary, so this tip mainly applies to older Impala instances.)
INSERT INTO partitioned_parquet_table PARTITION (year, month, region) [SHUFFLE]
  SELECT c1, c2, c3, year, month, region FROM new_batch_of_raw_data;
One challenge in every programming language, operating system, or storage format is how to represent and manipulate date-based and time-based values. Let’s look at how this works in Impala.
In Impala, the one-stop shop for any temporal value is the TIMESTAMP data type. It can represent a date, a time, or both. It is stored in a consistent numeric format, relative to the Coordinated Universal Time (UTC) time zone to avoid issues with time zone translation. You can use TIMESTAMP as the data type for a table column, and pass or return values of that type using various built-in functions.
It has been traditional in Hadoop to represent date and time values as strings, and convert to a binary representation behind the scenes. Impala prefers to make TIMESTAMP a first-class data type; thus, some date- and time-related functions carried over from Hive have both STRING and TIMESTAMP variants in Impala.
Impala recognizes strings with the format YYYY-MM-DD HH:MM:SS.sssssssss and can automatically convert those to TIMESTAMP values. A date or a time is allowed by itself, and the fractional second portion is optional for time values. Turning a string in some other format into a TIMESTAMP requires a two-step process: convert to an integer value with the unix_timestamp() function, which takes a string format argument; then convert that integer back into a TIMESTAMP.
The following example shows how a string 2014-12-01 in the standard notation can be directly converted to a TIMESTAMP, while the string 2014/12/01 requires converting to an integer and then back to a TIMESTAMP:
[localhost:21000] >select cast('2014-12-01' as timestamp);
+---------------------------------+
| cast('2014-12-01' as timestamp) |
+---------------------------------+
| 2014-12-01 00:00:00             |
+---------------------------------+
[localhost:21000] >select unix_timestamp('2014/12/01','yyyy/MM/dd');
+--------------------------------------------+
| unix_timestamp('2014/12/01', 'yyyy/mm/dd') |
+--------------------------------------------+
| 1417392000                                 |
+--------------------------------------------+
[localhost:21000] >select from_unixtime(
                 >  unix_timestamp('2014/12/01','yyyy/MM/dd')
                 >);
+-----------------------------------------------------------+
| from_unixtime(unix_timestamp('2014/12/01', 'yyyy/mm/dd')) |
+-----------------------------------------------------------+
| 2014-12-01 00:00:00                                       |
+-----------------------------------------------------------+
[localhost:21000] >select from_unixtime(
                 >  unix_timestamp('12/01/2014','MM/dd/yyyy')
                 >);
+-----------------------------------------------------------+
| from_unixtime(unix_timestamp('12/01/2014', 'mm/dd/yyyy')) |
+-----------------------------------------------------------+
| 2014-12-01 00:00:00                                       |
+-----------------------------------------------------------+
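The same two-step logic can be prototyped outside Impala. Here is a rough Python sketch (not Impala code) of what unix_timestamp() and from_unixtime() do; note that Python's strptime patterns such as %Y/%m/%d differ from the yyyy/MM/dd patterns that Impala accepts:

```python
import calendar
import time

def unix_timestamp(s, fmt):
    # Parse the string with an explicit format, then count the seconds
    # since the Unix epoch, treating the value as UTC (as Impala does).
    return calendar.timegm(time.strptime(s, fmt))

def from_unixtime(ts):
    # Render the integer back into the standard YYYY-MM-DD HH:MM:SS form.
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(ts))

print(unix_timestamp('2014/12/01', '%Y/%m/%d'))                 # 1417392000
print(from_unixtime(unix_timestamp('12/01/2014', '%m/%d/%Y')))  # 2014-12-01 00:00:00
```

Working in UTC on both legs of the round trip is what keeps the result independent of the machine's local time zone.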
Sometimes it’s convenient to have access to the individual date and time fields. For example, if your table is partitioned by year and month, you can’t just designate a TIMESTAMP value as the partition key, because then there would be a different partition for every hour, minute, second, and even microsecond. The table needs separate YEAR and MONTH columns, even if it also preserves the full date and time information as a TIMESTAMP column.
The way to get the separate fields is through the EXTRACT() function (new in Impala 1.4). It’s important to keep these values as integer types—ideally, the smallest applicable ones such as TINYINT for anything up to 127, and SMALLINT for anything up to 32767—so they can be represented compactly in memory. That’s another reason to avoid storing dates as strings, even though it might be convenient to represent months by their names, or days with leading zeros.

It’s easy to overlook this optimization tip, because you might not notice any storage savings on disk if you use text data files (where string and numeric values consume equal space) or partitioned tables (where the partition key columns are used as directory names, so string and numeric values are represented the same way). The storage and performance benefits become apparent when billions or trillions of these values are being compared, stored in hash tables in memory, or transmitted across the network between different machines in the cluster.
This example shows how you can pull out each individual field from a TIMESTAMP value. We make a tiny lookup table with the symbolic names of all of the fields for easy reference later:
CREATE TABLE units (granularity TINYINT, unit STRING);
INSERT INTO units VALUES
  (1,'year'),(2,'month'),(3,'day'),(4,'hour'),
  (5,'minute'),(6,'second'),(7,'millisecond');

-- Get each date and time field from a single TIMESTAMP value.
SELECT unit, extract(now(), unit) FROM units ORDER BY granularity;
+-------------+----------------------+
| unit        | extract(now(), unit) |
+-------------+----------------------+
| year        | 2014                 |
| month       | 7                    |
| day         | 9                    |
| hour        | 13                   |
| minute      | 26                   |
| second      | 52                   |
| millisecond | 608                  |
+-------------+----------------------+
The TRUNC() function truncates a TIMESTAMP value down to the next lower year, week, day, quarter, and so on. This is a very useful technique for condensing a large number of date and time values down to a predictable number of combinations, either for doing GROUP BY queries or for using the truncated values as partition key columns.
INTERVAL expressions let you add specific date and time increments to TIMESTAMP values, or subtract them. Any time you need to calculate a delta value (such as when an online auction ends), you can compute the appropriate TIMESTAMP by adding or subtracting some number of days, weeks, months, hours, and so on. You can chain a series of INTERVAL additions and subtractions to create a very precise delta value.
For example, you might strip off the time portion of a TIMESTAMP value so that you are left with just the date. Then you could add an INTERVAL expression to add back a specific time. Or you could use other kinds of INTERVAL addition or subtraction to create specific dates and times for reminders, deadlines, or other relative kinds of temporal values.
-- Get just the current date, no time.
[localhost:21000] >select trunc(now(), 'DD')
                 >as "first thing this morning";
+--------------------------+
| first thing this morning |
+--------------------------+
| 2014-07-09 00:00:00      |
+--------------------------+
[localhost:21000] >select trunc(now(), 'DD') + interval 8 hours
                 >as "8 AM this morning";
+---------------------+
| 8 am this morning   |
+---------------------+
| 2014-07-09 08:00:00 |
+---------------------+
[localhost:21000] >select now() + interval 2 weeks
                 >as "2 weeks from right now";
+-------------------------------+
| 2 weeks from right now        |
+-------------------------------+
| 2014-07-23 15:11:01.526788000 |
+-------------------------------+
[localhost:21000] >select trunc(now(), 'DD') - interval 2 days + interval 15 hours
                 >as "3 PM, the day before yesterday";
+--------------------------------+
| 3 pm, the day before yesterday |
+--------------------------------+
| 2014-07-07 15:00:00            |
+--------------------------------+
Always double-check the unit argument when using the TRUNC() function, because the argument values and some of their meanings differ from the arguments to EXTRACT(). In particular, the 'DAY' argument to TRUNC() truncates to the first day of the week, while 'DD' truncates to the current day.
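To make the distinction concrete, here is a small Python sketch (not Impala code) of the two behaviors. The assumption that the week starts on Monday is mine, so verify TRUNC() against your own Impala version:

```python
from datetime import datetime, timedelta

def trunc_dd(ts):
    # 'DD': strip the time portion, keeping the current day.
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)

def trunc_day(ts):
    # 'DAY': back up to the first day of the week (assumed to be
    # Monday here), then strip the time portion.
    return trunc_dd(ts) - timedelta(days=ts.weekday())

wednesday = datetime(2014, 7, 9, 13, 26, 52)
print(trunc_dd(wednesday))   # 2014-07-09 00:00:00
print(trunc_day(wednesday))  # 2014-07-07 00:00:00, the preceding Monday
```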
Whenever I look at a new technology for storing and manipulating data, I ask myself whether that technology makes it more or less likely to run into Y2K-style problems. The Y2K problem arose because people designed data processing applications under the assumption that year values could be stored as 2 digits, with an implicit base year of 1900. This issue became critical in the year 2000, when the 2-digit years could no longer be used for date arithmetic. I judge software systems based on how easy it is to correct such problems if developers make assumptions that later turn out to be wrong. This kind of flexibility is one of the key strengths of the Hadoop software stack.
The root of the Y2K problem was a desire to save money on expensive disk storage by saving two bytes per date field. Could such cost considerations still occur today? Hmm, if there are a billion rows, each extra byte represents another gigabyte of storage. Imagine a big web property with hundreds of millions of customers, and for each of those customers, you have to record a birthday, date joined, date of last visit, and so on. Two bytes per date field per customer adds up to a substantial number of gigabytes. Although everyone now knows not to leave the century out of year values, developers might still cut corners in their schema design for cost reasons, and those bad decisions might come back to bite them later.
As a thought experiment, let’s construct a scenario with some Y2K-style bad data, and see how we could solve it in Impala.
This is a simplified example, not a comprehensive treatment of the subject. Your mileage may vary. No warranty express or implied.
We start off with a data file constructed way back in the 20th century, with some names and birth dates in MM-DD-YY format, and whether the person is still living:
$ cat >20th_century.dat
John Smith,06-04-52,false
Jane Doe,03-22-76,true
^D
In the pre-Hadoop days, the original code parsed the birth date values as 2-digit or 2-character values, and filled in the 19 prefix whenever it needed to print any reports or do any arithmetic for the birth dates. Now, the company (which has a minimum age limit of 14) is just starting to sign up its first customers born in the 21st century. The new data file uses 4-digit years:
$ cat >2014_new_customers.dat
Adam Millennial,01-01-2000,true
Maria Sanchez,03-29-2001,true
^D
With Impala, there are several ways to solve this problem. Let’s look at ways to make use of our SQL expertise (as opposed to just editing the original text data files):
CREATE TABLE inconsistent_data
  (name STRING, birthdate STRING, living BOOLEAN)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";

... Use 'hdfs dfs -put' command to move data files into appropriate Hadoop
... directories as demonstrated in earlier examples.

-- Make Impala aware of the newly added data files.
REFRESH inconsistent_data;

SELECT * FROM inconsistent_data;
+-----------------+------------+--------+
| name            | birthdate  | living |
+-----------------+------------+--------+
| Adam Millennial | 01-01-2000 | true   |
| Maria Sanchez   | 03-29-2001 | true   |
| John Smith      | 06-04-52   | false  |
| Jane Doe        | 03-22-76   | true   |
+-----------------+------------+--------+
At this point, we have a mixture of good and bad date values represented as strings. We’ll construct some expressions to parse out the different month, day, and year portions. As we build a set of useful queries to transform the original values through a series of steps, we’ll save each query as a view to keep each query readable and avoid a single monster query.
CREATE VIEW customer_data_separate_fields AS
  SELECT name,
    regexp_extract(birthdate,
      '([[:digit:]]+)-([[:digit:]]+)-([[:digit:]]+)', 1) month,
    regexp_extract(birthdate,
      '([[:digit:]]+)-([[:digit:]]+)-([[:digit:]]+)', 2) day,
    regexp_extract(birthdate,
      '([[:digit:]]+)-([[:digit:]]+)-([[:digit:]]+)', 3) year,
    living
  FROM inconsistent_data;

SELECT * FROM customer_data_separate_fields;
+-----------------+-------+-----+------+--------+
| name            | month | day | year | living |
+-----------------+-------+-----+------+--------+
| Adam Millennial | 01    | 01  | 2000 | true   |
| Maria Sanchez   | 03    | 29  | 2001 | true   |
| John Smith      | 06    | 04  | 52   | false  |
| Jane Doe        | 03    | 22  | 76   | true   |
+-----------------+-------+-----+------+--------+
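If you want to experiment with the same extraction logic outside Impala, here is a rough Python equivalent (an illustration, not part of the tutorial); note that Python's re module does not support POSIX classes such as [[:digit:]], so the sketch uses \d instead:

```python
import re

# Same three capture groups as the regexp_extract() calls above,
# with \d standing in for the POSIX [[:digit:]] class.
pattern = re.compile(r'(\d+)-(\d+)-(\d+)')

for birthdate in ['06-04-52', '01-01-2000']:
    month, day, year = pattern.match(birthdate).groups()
    print(month, day, year)
```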
The next step is to convert the separated-out fields to integer types instead of strings. Then we can do arithmetic on the dates.
CREATE VIEW customer_data_int_fields AS
  SELECT name,
    cast(month AS TINYINT) month,
    cast(day AS TINYINT) day,
    cast(year AS SMALLINT) year,
    living
  FROM customer_data_separate_fields;
Last, we identify the year values that were originally given as 2 digits, and convert those to 4-digit values from the 20th century. Any NULL values are passed through unchanged, as is any year with more than 2 digits. (For simplicity, let’s stipulate that this company does not have any customers born in the 1800s or earlier.)
CREATE VIEW customer_data_full_years AS
  SELECT name, month, day,
    CASE
      WHEN year IS NULL THEN NULL
      WHEN year < 100 THEN year + 1900
      ELSE year
    END AS year,
    living
  FROM customer_data_int_fields;
Here we made a logical arrangement of the data that is more flexible and easier to extend and analyze. Even without changing the underlying data files, we accounted for 2-digit and 4-digit year values; we split up the original 3-part strings into separate fields; and we made the year, month, and day values into integers so that we could do arithmetic on them.
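A rule like the CASE expression in that last view is easy to prototype outside Impala before committing to a view definition. A minimal Python sketch of the same widening logic:

```python
def widen_year(year):
    # Mirror the CASE expression: NULL (None) passes through, 2-digit
    # years are assumed to fall in the 20th century, and anything else
    # is already a full 4-digit year.
    if year is None:
        return None
    if year < 100:
        return year + 1900
    return year

print([widen_year(y) for y in (52, 76, 2000, None)])
# [1952, 1976, 2000, None]
```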
We can query the views to analyze the data in its cleaned up and reorganized form. Here we use a LIMIT clause to cap the number of rows returned, in case the back office loaded millions more rows in the meantime:
-- Double-check that the data is OK.
SELECT * FROM customer_data_full_years LIMIT 100;
+-----------------+-------+-----+------+--------+
| name            | month | day | year | living |
+-----------------+-------+-----+------+--------+
| John Smith      | 6     | 4   | 1952 | false  |
| Jane Doe        | 3     | 22  | 1976 | true   |
| Adam Millennial | 1     | 1   | 2000 | true   |
| Maria Sanchez   | 3     | 29  | 2001 | true   |
+-----------------+-------+-----+------+--------+
After running more queries to double-check that the data is entirely clean, we could make the new improved schema permanent and convert all the existing data files to a compact binary format. As we see in other examples using the Parquet format, the savings from this compression step are likely much greater than could be obtained by shortchanging the year values.
CREATE TABLE modernized_customer_data STORED AS PARQUET
  AS SELECT * FROM customer_data_full_years;
The Impala TIMESTAMP data type has a range that starts on January 1, 1400 AD. Thus, for anything before that date, you would store separate integer fields for the year, month, day, and any time-related fields, rather than a single TIMESTAMP value that includes both date and time.
Applying the principle of using the smallest practical integer type, MONTH and DAY could always be TINYINT columns, while YEAR would depend on the time scale involved. Historians could use SMALLINT for their YEAR column to record years back to –32768 BC. Paleontologists could use INT to date fossils back to –2147483648 BC. And cosmologists could use BIGINT to chart time from the Big Bang to the future Big Crunch or heat death of the universe.
The examples for schema evolution (Tutorial: When Schemas Evolve) show ways to deal with data where it is not immediately clear whether the existing values fit into the range for a particular integer type.
In this book, I switch between verbose output in impala-shell, when I need to show timing information for queries, and quiet mode, for demonstrating features unrelated to performance. By default, an impala-shell session looks like this:
$impala-shell -i localhost -d oreilly
...
[localhost:21000] >create table foo (x int);
Query: create table foo (x int)
Returned 0 row(s) in 1.13s
[localhost:21000] >select x from foo;
Query: select x from foo
Returned 0 row(s) in 0.19s
The way the statement is echoed back as a single line lets you copy and paste it, which is most useful for multiline statements that are hard to capture due to the continuation prompts.
The time measurement is useful when you’re comparing the performance of different query techniques and table structures, logging the output of a sequence of statements, or running the same statements multiple times to check if performance is consistent across runs.
A “quiet” session looks like this, without the query echoed back or the elapsed time for the query:
$impala-shell -i localhost -d oreilly --quiet
[localhost:21000] >create table bar (s string);
[localhost:21000] >select s from bar;
This more compact form lets you see what’s happening without all the extra informational messages.
The -B option produces an even more compact output style, with no ASCII boxes around the query results. You can think of -B as the “benchmark” option, because if all you want to do is get the results as fast as possible, suppressing the boxes lets impala-shell display the results much faster. The -B option is often used in combination with -q (run a single query) or -f (run all the statements in a file), for benchmarking, setup scripts, or any kinds of automated jobs.
This example runs a single SHOW TABLES statement and then massages the results to produce a set of DROP TABLE statements, which are then stored in a .sql script file:
$ impala-shell -B --quiet -q 'show tables in oreilly' |
sed -e 's/^/drop table /' | sed -e 's/$/;/' |
tee drop_all_tables.sql
drop table bar;
drop table billion_numbers;
drop table billion_numbers_compacted;
...
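If sed is not available, the same massaging can be done in any scripting language. Here is a hypothetical Python sketch, with the hardcoded table list standing in for the real impala-shell output:

```python
# Hypothetical list standing in for the lines produced by
#   impala-shell -B --quiet -q 'show tables in oreilly'
tables = ['bar', 'billion_numbers', 'billion_numbers_compacted']

# Wrap each bare table name in a DROP TABLE statement, one per line,
# ready to be written out as a .sql script.
statements = ['drop table %s;' % name for name in tables]
print('\n'.join(statements))
```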
This example runs a sequence of statements from an input file. Here we leave out the --quiet option because we are interested in the output showing the original query, and the time taken. We include the -d option to specify the database where all the queries should run, so that we do not need to use fully qualified table names.
$ impala-shell -d oreilly -B -f benchmark.sql
...some startup banner messages...
Query: use oreilly
Query: select count(*) from canada_facts
13
Returned 1 row(s) in 0.21s
Query: select count(*) from canada_regions
13
Returned 1 row(s) in 0.19s
Query: select count(*) from usa_cities
289
Returned 1 row(s) in 0.19s
One of the tenets of Hadoop is “schema on read,” meaning that you’re not required to do extensive planning up front about how your data is laid out, and you’re not penalized if you later need to change or fine-tune your original decisions. Historically, this principle has clashed with the traditional SQL model where a CREATE TABLE statement defines a precise layout for a table, and data is reorganized to match this layout during the load phase. Impala bridges these philosophies in clever ways:
A query simply returns NULL if fields are missing from the data file. You can rewrite the table definition to have more or fewer columns, and mix and match data files with the old and new column definitions.
The benefits of this approach include more flexibility, less time and effort spent converting data into a rigid format, and less resistance to the notion of fine-tuning the schema as needs change and you gain more experience. For example, if a counter exceeds the maximum value for an INT, you can promote it to a BIGINT with minimal hassle. If you originally stored postal codes or credit card numbers as integers and later received data values containing dashes or spaces, you could switch those columns to strings without reformatting the original data.
For example, the SAMPLE_DATA table used in several earlier examples has a column named ZEROFILL containing 6-digit integer values, including leading zeros where needed so that every value really has 6 characters. That field could be used to represent an encoded value where each digit or group of digits has some meaning, as with a credit card or telephone number. Treating that column as a STRING data type makes it easier to do SUBSTR() calls to pull out the first 3 digits, last 4 digits, or search and replace to get rid of optional punctuation characters, in the case of a phone number. Other times, it might be preferable to treat that column as a number, for example, to construct the next sequential value. Or what seems like a small range of values might later turn out to be a larger one, so you might initially treat it as a SMALLINT but then later change the column type to INT or BIGINT.
Regardless of how the column is defined, you can always use CAST() to convert its values to a different type during a query. What is the “best” type is a question of convenience, query readability, and efficiency. (Remember, your queries will likely process millions or billions of rows, so any unnecessary type conversions can add considerable overhead.) And when you convert to a binary file format such as Parquet or Avro, numbers can be stored more compactly than strings, potentially saving gigabytes of disk space for each byte you can remove from all the rows.
Impala lets you try out all these representations to see which one works best in practice. When the data is in a relatively unstructured file format, such as a delimited text file, you can make unlimited changes to the types and names of columns. Farther along the data pipeline, when the data files are in a structured format such as Parquet or Avro, the table schema is embedded in each data file and the changes you can make are more limited. For example, with Parquet you can change a column’s type between TINYINT, SMALLINT, and INT, but not to unrelated types such as STRING or TIMESTAMP.
You could also discover that some fields supplied in the data aren’t really needed and so remove them from the table definition, or that new fields are useful and so add those to the table definition and any new data files. These techniques work in all file formats, but apply only to the last columns in the table, so define any optional or less-important columns last.
In the following example, we first treat the ZEROFILL column as a string (its original definition) to find values starting with 0123:
SELECT zerofill FROM sample_data
  WHERE zerofill LIKE '0123%' LIMIT 5;
+----------+
| zerofill |
+----------+
| 012330   |
| 012372   |
| 012350   |
| 012301   |
| 012327   |
+----------+
Returned 5 row(s) in 0.57s
Next, we change the ZEROFILL column to a number, finding examples of even values, doing some arithmetic with the values, and ignoring the leading zeros:
ALTER TABLE sample_data CHANGE zerofill zerofill INT;
SELECT zerofill AS even, zerofill+1 AS odd FROM sample_data
  WHERE zerofill % 2 = 0 LIMIT 5;
+----------+--------+
| even     | odd    |
+----------+--------+
| 3838     | 3839   |
| 97464    | 97465  |
| 87046    | 87047  |
| 12158    | 12159  |
| 55478    | 55479  |
+----------+--------+
Returned 5 row(s) in 0.31s
Finally, we change ZEROFILL back to a string for some regular expression matching, to find values containing a sequence of three 1 digits. The CHANGE clause repeats the name ZEROFILL twice because it also sets a new name for the column; when changing only the data type, specify the same name again.
ALTER TABLE sample_data CHANGE zerofill zerofill STRING;
SELECT zerofill FROM sample_data
  WHERE zerofill REGEXP '1{3}' LIMIT 5;
+----------+
| zerofill |
+----------+
| 081116   |
| 031110   |
| 091118   |
| 011138   |
| 061110   |
+----------+
Returned 5 row(s) in 0.56s
If your table has values that are out of range for the specified integer type, they are returned as the minimum or maximum value for the type. Thus, if you see numbers that bump up against the edges of the range, you might need a bigger type for that column. Here is how you might deal with integer values where you do not know in advance whether their range will fit into the column type for an existing table. The column X starts off as a TINYINT, which can only hold a very limited range of values (–128 to 127).
CREATE TABLE unknown_range (x BIGINT);
INSERT INTO unknown_range VALUES (-50000), (-4000), (0), (75), (33000);
ALTER TABLE unknown_range CHANGE x x TINYINT;
We call the MIN_TINYINT() and MAX_TINYINT() functions, and later the equivalents for other types, so that we don’t have to remember the exact ranges.
SELECT x FROM unknown_range LIMIT 10;
+------+
| x    |
+------+
| -128 |
| -128 |
| 0    |
| 75   |
| 127  |
+------+
SELECT count(x) AS "Suspicious values" FROM unknown_range
  WHERE x IN (min_tinyint(), max_tinyint());
+-------------------+
| suspicious values |
+-------------------+
| 3                 |
+-------------------+
ALTER TABLE unknown_range CHANGE x x SMALLINT;
SELECT x FROM unknown_range LIMIT 10;
+--------+
| x      |
+--------+
| -32768 |
| -4000  |
| 0      |
| 75     |
| 32767  |
+--------+
SELECT count(x) AS "Suspicious values" FROM unknown_range
  WHERE x IN (min_smallint(), max_smallint());
+-------------------+
| suspicious values |
+-------------------+
| 2                 |
+-------------------+

ALTER TABLE unknown_range CHANGE x x INT;
SELECT x FROM unknown_range;
+--------+
| x      |
+--------+
| -50000 |
| -4000  |
| 0      |
| 75     |
| 33000  |
+--------+
SELECT count(x) AS "Suspicious values" FROM unknown_range
  WHERE x IN (min_int(), max_int());
+-------------------+
| suspicious values |
+-------------------+
| 0                 |
+-------------------+
At this point, you know the column is a large enough type to hold all the existing values without being larger than necessary and wasting space on disk and in memory.
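The clamping behavior itself is simple to model. Here is a Python sketch (the clamp_to_type() helper is my own illustration, not an Impala function) showing why the TINYINT version of the column reports three suspicious values:

```python
def clamp_to_type(value, min_val, max_val):
    # Out-of-range values are pinned to the type's minimum or maximum,
    # which is why -50000 shows up as -128 in a TINYINT column.
    return max(min_val, min(value, max_val))

MIN_TINYINT, MAX_TINYINT = -128, 127
values = [-50000, -4000, 0, 75, 33000]
clamped = [clamp_to_type(v, MIN_TINYINT, MAX_TINYINT) for v in values]
print(clamped)  # [-128, -128, 0, 75, 127]

# Count values sitting exactly at the type's edges, the same test the
# "Suspicious values" query performs with min_tinyint()/max_tinyint().
suspicious = sum(1 for v in clamped if v in (MIN_TINYINT, MAX_TINYINT))
print(suspicious)  # 3
```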
Just as a refresher, here are the ranges for the different integer types:
[localhost:21000] >select min_bigint(), max_bigint();
+----------------------+---------------------+
| min_bigint()         | max_bigint()        |
+----------------------+---------------------+
| -9223372036854775808 | 9223372036854775807 |
+----------------------+---------------------+
[localhost:21000] >select min_int(), max_int();
+-------------+------------+
| min_int()   | max_int()  |
+-------------+------------+
| -2147483648 | 2147483647 |
+-------------+------------+
[localhost:21000] >select min_smallint(), max_smallint();
+----------------+----------------+
| min_smallint() | max_smallint() |
+----------------+----------------+
| -32768         | 32767          |
+----------------+----------------+
[localhost:21000] >select min_tinyint(), max_tinyint();
+---------------+---------------+
| min_tinyint() | max_tinyint() |
+---------------+---------------+
| -128          | 127           |
+---------------+---------------+
If you need a larger integer than MAX_BIGINT(), you can define a DECIMAL(n). The maximum value for n is 38, which can hold up to 999999… (9 repeated 38 times).
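A quick arithmetic check, sketched here in Python, confirms how much headroom that gives you over BIGINT:

```python
# Largest value a DECIMAL(38) column can hold: 38 nines.
max_decimal_38 = 10**38 - 1
# Largest BIGINT value: 2^63 - 1, as reported by max_bigint().
max_bigint = 2**63 - 1

print(max_decimal_38)               # 38 nines
print(max_decimal_38 > max_bigint)  # True
```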
SQL shares some of the convenience of functional programming languages, where the end result is built from multiple layers, each performing some easily understood transformation. Whatever result you get from a query, you can enhance the results further by running individual columns through an additional function, or layering another query on top by using a WITH clause or a subquery, or pushing down the complexity by turning the query into a view.
In this example, we received some string data that is not in the optimal format. It is in all lowercase, and it has double quotes around the values, which is not appropriate for Impala text data. We run the string columns through a regular expression function to remove leading and trailing quotation marks. Then we run the result through another formatting function to capitalize the first letter. After finding the right combination of functions to achieve the desired output, we embed the details in a view, which hides the complexity of the function calls and makes subsequent queries more readable.
SELECT * FROM bad_format;
+------------+-----------+
| first_name | last_name |
+------------+-----------+
| "john"     | "smith"   |
| "jane"     | "doe"     |
+------------+-----------+

SELECT regexp_replace(first_name,'(^"|"$)','') AS first_name
  FROM bad_format;
+------------+
| first_name |
+------------+
| john       |
| jane       |
+------------+

SELECT initcap(regexp_replace(first_name,'(^"|"$)','')) AS first_name
  FROM bad_format;
+------------+
| first_name |
+------------+
| John       |
| Jane       |
+------------+

CREATE VIEW good_format AS
  SELECT initcap(regexp_replace(first_name,'(^"|"$)','')) AS first_name,
         initcap(regexp_replace(last_name,'(^"|"$)','')) AS last_name
  FROM bad_format;

SELECT * FROM good_format;
+------------+-----------+
| first_name | last_name |
+------------+-----------+
| John       | Smith     |
| Jane       | Doe       |
+------------+-----------+
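The same cleanup is easy to prototype outside Impala before building the view. Here is a rough Python equivalent of the regexp_replace() and initcap() calls; the clean_name() helper is my own name for the combination:

```python
import re

def clean_name(s):
    # Strip a leading or trailing double quote, like regexp_replace()
    # with the pattern (^"|"$), then capitalize the first letter of
    # each word, like Impala's initcap().
    stripped = re.sub(r'(^"|"$)', '', s)
    return ' '.join(word.capitalize() for word in stripped.split(' '))

print(clean_name('"john"'), clean_name('"smith"'))  # John Smith
```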
This example uses a subquery in the WITH clause to evaluate a temperature conversion formula and then runs calculations on the converted values. This is a handy technique to avoid repeating complicated expressions multiple times. Because the WITH clause does not create any permanent object, you avoid cluttering the namespace with new tables or views.
WITH celsius_temps AS
  (SELECT (degrees_f - 32) * 5 / 9 AS degrees_c FROM fahrenheit_temps)
SELECT min(degrees_c), max(degrees_c), avg(degrees_c) FROM celsius_temps;
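As a sanity check on the formula itself, here is the conversion sketched in Python:

```python
def fahrenheit_to_celsius(degrees_f):
    # Same expression as the WITH clause: (degrees_f - 32) * 5 / 9
    return (degrees_f - 32) * 5 / 9

for f in (32, 212):
    print(f, '->', fahrenheit_to_celsius(f))  # 32 -> 0.0, 212 -> 100.0
```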
This example encodes the Fahrenheit-to-Celsius conversion formula in a view, then filters the converted data by querying the view, referring only to the Celsius values.
CREATE VIEW celsius_temps AS
  SELECT (degrees_f - 32) * 5 / 9 AS degrees_c,
    year, month, day, location
  FROM fahrenheit_temps;
SELECT max(degrees_c), min(degrees_c) FROM celsius_temps
  WHERE year = 1999 AND degrees_c BETWEEN -40 AND 40;
This example builds another view on top of the first one, to take a numeric value and do some string formatting to make it suitable for use in a report. The final query doesn’t need to know anything about the original Fahrenheit values or the raw numbers used in the report.
CREATE VIEW celsius_pretty_printed AS
  SELECT concat(cast(degrees_c AS STRING)," degrees Celsius") AS degrees_c,
    year, month, day, location
  FROM celsius_temps;
SELECT degrees_c, year, month, day, location
  FROM celsius_pretty_printed
  WHERE year = 1998 ORDER BY year, month, day;