Chapter 3. Impala for the Database Developer

As a database developer who knows SQL, you are in an ideal position to jump in and start using Impala right away.

This section covers some of the background information and coding techniques that help you start down the path to high performance and scalability, as you graduate from dedicated development and test environments to huge data sets on production clusters running under heavy load.

When you come to Impala from a background with a traditional relational database product, you find the same familiar SQL query language and DDL statements. Data warehouse experts will already be familiar with the notion of partitioning. If you have only dealt with smaller OLTP-style (online transaction processing) databases, the emphasis on large data volumes will expand your horizons.

Note

You might find that a certain SQL idiom is not yet supported in Impala, or your favorite built-in function from database system XYZ is not available yet. If so, don’t be discouraged. You can often work around these with a simple query change. Because Impala is often used in organizations that already have substantial database infrastructure, prioritize which kinds of workloads you can try out with Impala in the short term. You might find that you can save many hours in your data pipeline or queries, even when only using Impala in a few places. Impala roadmap items are regularly introduced, so check the New Features page often.

The SQL Language

The great thing about coming to Impala with relational database experience is that the query language is completely familiar: it’s just SQL! (See the Impala SQL Language Reference for all the supported statements and functions.) The SELECT syntax works like you’re used to, with joins, views, relational operators, aggregate functions, ORDER BY and GROUP BY, casts, column aliases, built-in functions, and so on.

The original core column data types are STRING for all string and character data; INT and its cousins such as TINYINT and BIGINT for integers; FLOAT and DOUBLE for floating-point values; TIMESTAMP for all date- and time-related values; and BOOLEAN for true/false conditions. The Impala 1.4 release introduced DECIMAL, an exact fixed-point type for high-precision values (especially currency).
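
For illustration, here is a minimal table declaration touching each of those core types; the table and column names are invented for this example:

CREATE TABLE type_demo
(
  name     STRING,       -- all string and character data
  quantity INT,          -- 32-bit integer; TINYINT, SMALLINT, and BIGINT cover other ranges
  ratio    DOUBLE,       -- 64-bit floating point; FLOAT is the 32-bit flavor
  happened TIMESTAMP,    -- date and time values
  flag     BOOLEAN,      -- true/false conditions
  price    DECIMAL(9,2)  -- exact base-10 values (Impala 1.4 and later)
);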

The CREATE TABLE and INSERT statements incorporate some of the format clauses that you might expect to be part of a separate data-loading utility, because Impala is all about the shortest path to ingest and analyze data.

The EXPLAIN statement provides a logical overview of statement execution. Instead of showing how a query uses indexes, the Impala EXPLAIN output illustrates how parts of the query are distributed among the nodes in a cluster, and how intermediate results are combined at the end to produce the final result set.
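
For example, prefixing any query with EXPLAIN shows the plan without running it; the table here is the same hypothetical sales_data table used later in this chapter:

-- Display the distributed plan, including scan, aggregation, and exchange steps.
EXPLAIN SELECT store_id, count(*) FROM sales_data GROUP BY store_id;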

Standard SQL

Impala implements SQL-92 standard features for queries, with some enhancements from later SQL standards.

Limited DML

Because Hadoop Distributed File System (HDFS) is optimized for bulk insert and append operations, Impala currently doesn’t have OLTP-style Data Manipulation Language (DML) operations such as DELETE or UPDATE. It also does not have indexes, constraints, or foreign keys; data warehousing experts traditionally minimize their reliance on these relational features because they involve performance overhead that can be too much when dealing with large amounts of data.

If you have new raw data files, you use LOAD DATA to move them into an Impala table directory. If you have data in one table that you want to copy into another table, optionally filtering, transforming, and converting in the process, you use an INSERT ... SELECT statement. If there is something not satisfactory about some set of data, you replace entire tables or partitions with an INSERT OVERWRITE statement.
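
As a rough sketch of those three idioms (the paths, table names, and columns here are hypothetical):

-- Move raw data files that are already in HDFS into the table's directory.
LOAD DATA INPATH '/staging/events_2014_07' INTO TABLE raw_events;

-- Copy from one table to another, filtering and transforming along the way.
INSERT INTO clean_events
  SELECT event_id, lower(event_type), event_time
  FROM raw_events WHERE event_time IS NOT NULL;

-- Replace the entire contents of a table with a corrected result set.
INSERT OVERWRITE clean_events
  SELECT event_id, lower(event_type), event_time
  FROM raw_events WHERE event_id > 0;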

No Transactions

The typical OLTP example of depositing to a bank account and withdrawing at the same time is not really appropriate for a data warehousing context. That’s only one row! Impala is intended to analyze what happens across millions or billions of banking operations, ticket purchases, web page visits, and so on.

Impala only appends or replaces; it never actually updates existing data. In write operations, Impala deals not with one row at a time, but millions of rows through statements such as LOAD DATA and INSERT ... SELECT. Even on a transactional DBMS, this volume of new data can be impractical to roll back.

Rather than deleting arbitrary rows with a DELETE statement, you delete large groups of related rows at once, either through DROP TABLE or ALTER TABLE ... DROP PARTITION. If you make a mistake, the original files are still recoverable from the HDFS trashcan.
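
For example, assuming a table partitioned by date (the names here are made up), you discard data in coarse chunks rather than row by row:

-- Remove one day's worth of data by dropping its partition.
ALTER TABLE web_logs DROP PARTITION (year = 2014, month = 7, day = 1);
-- Or discard an entire staging table once it is no longer needed.
DROP TABLE staging_web_logs;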

Operations performed directly in Impala work like the “autocommit” settings available on some database systems. All Impala nodes in the cluster are notified about new data from a LOAD DATA or INSERT statement, or DDL operations such as CREATE TABLE and DROP TABLE.

Remember that Impala has the flexibility to operate on data produced by external programs and pipelines too. When new files are deposited in an Impala table directory by some non-Impala command, the Impala REFRESH table_name statement acts much like a traditional COMMIT statement, causing Impala to re-evaluate the data in that table at the current moment.
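
For example, if a nightly batch job copies new files into the table's HDFS directory outside of Impala, a quick REFRESH makes them visible (again, the table name is just a placeholder):

-- After an external process adds files under the table's directory in HDFS:
REFRESH web_logs;
-- Subsequent queries see the newly added rows.
SELECT count(*) FROM web_logs;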

Numbers

Early releases of Impala included binary-style numeric types: 8-bit, 16-bit, 32-bit, and 64-bit integer types, and 32-bit and 64-bit IEEE-754-style floating-point types. These types are well-suited to the kinds of scientific processing and numerical analysis done by many of the Impala early adopters. The following example shows queries with the basic integer and floating-point types.

-- STORE_ID is a SMALLINT, a 16-bit integer that holds values up to 32,767.
SELECT DISTINCT(store_id) FROM sales_data;
-- DEGREES is a DOUBLE, a floating-point number from a sensor.
SELECT cos(degrees) FROM telemetry_data;

Impala 1.4 adds support for the DECIMAL data type, which represents base 10 values with varying numbers of significant digits. This type is well-suited for currency calculations, opening up Impala for a range of use cases with financial data. You can also use it to hold integer values with a larger range than the INT or even BIGINT types.

This example shows how with a DECIMAL value (in this case, three digits after the decimal point and nine digits total precision), you get back exactly the value you started with. For some fractional values, FLOAT and DOUBLE are very close but cannot represent them precisely. The extra predictability and accuracy during mathematical operations makes DECIMAL convenient for columns used with GROUP BY, comparisons to literal values, summing large numbers of values, and other cases where the inexact fractional representation of FLOAT and DOUBLE could cause problems.

CREATE TABLE dec_vs_float (dec DECIMAL(9,3), flt FLOAT, dbl DOUBLE);
INSERT INTO dec_vs_float VALUES (98.6,cast(98.6 AS FLOAT),98.6);
SELECT * FROM dec_vs_float;
+--------+-------------------+-------------------+
| dec    | flt               | dbl               |
+--------+-------------------+-------------------+
| 98.600 | 98.59999847412109 | 98.59999999999999 |
+--------+-------------------+-------------------+

Note

In this example, the DEC column can represent the value 98.600 exactly, but we didn’t define enough fractional digits to precisely represent 98.6001 or 98.5999. The 3 in the DECIMAL(9,3) declaration means the column only stores 3 digits after the decimal point. Therefore, with DECIMAL values, you decide in advance how many overall digits and fractional digits are required to represent all the relevant values.

Recent Additions

Impala’s SQL functionality grows with each release. Here are some of the high points from the Impala 1.4 (July 2014) release. Because this book was finalized well in advance of the Impala 2.0 release, the first edition doesn’t include examples of those new features. Always check the Impala new features documentation page to see what SQL enhancements (along with other kinds of features) were added recently.

Early Impala releases required intermediate results for ORDER BY queries to fit in memory. In terms of syntax, all ORDER BY queries had to also include a LIMIT clause to cap the size of the result set. Impala 1.4 lifts this restriction, saving intermediate sort results to a disk scratch area when necessary.
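
So a query like the following, which earlier releases would have rejected without a LIMIT clause, now runs as-is (the table and columns are invented for this example):

-- As of Impala 1.4, no LIMIT clause is required; a large sort spills to disk if needed.
SELECT customer_id, total_spend FROM annual_summary ORDER BY total_spend DESC;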

The DECIMAL data type, introduced in Impala 1.4, lets Impala represent currency data with the kind of accuracy and rounding characteristics that are ideal for financial analysis. See Numbers for more on the DECIMAL type.

Each release of Impala includes additional built-in functions, particularly for math and date/time operations. Impala 1.4 introduced EXTRACT() and TRUNC() for date/time values, and STDDEV() and VARIANCE() for statistical processing.
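
Here is a quick sketch of those functions in action, using the hypothetical telemetry_data table from earlier in this chapter plus invented reading_time and temperature columns:

-- Pull out parts of dates, truncate timestamps to a boundary, and compute
-- statistical measures, all new in Impala 1.4.
SELECT
  extract(reading_time, 'year') AS yr,
  trunc(reading_time, 'MM') AS month_start,
  stddev(temperature) AS temp_stddev,
  variance(temperature) AS temp_variance
FROM telemetry_data
GROUP BY extract(reading_time, 'year'), trunc(reading_time, 'MM');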

Big Data Considerations

The guidelines throughout this book typically apply to use cases that involve Big Data. But how big is Big, and what are the implications for your workflow, database schema, and SQL code?

Billions and Billions of Rows

Although Impala can work with data of any volume, its performance and scalability shine when the data is large enough that you can’t produce, manipulate, and analyze it in reasonable time on a single server. Therefore, after you do your initial experiments to learn how all the pieces fit together, you very quickly scale up to working with tables containing billions of rows and gigabytes, terabytes, or even larger of total volume. The queries that you tinker with might involve data sets bigger than you ever used before.

You might have to rethink your benchmarking techniques if you’re used to smaller volumes, meaning millions of rows or a few tens of gigabytes. You’ll start relying on the results of analytic queries, because the scale is bigger than you can grasp through intuition alone. You’ll get used to adding a LIMIT clause to many exploratory queries to prevent unexpected huge result sets.

Terminology Tip

In this book, when I refer to “a billion” of anything, I mean the US billion: one thousand million, or 10⁹ (the same as 100 Indian crore). When talking about gigabytes, I am referring to the disk or network gigabyte (a round number of one billion bytes) rather than the memory gigabyte (2³⁰ bytes, also sometimes called a gibibyte).

The main exception to this rule is for Parquet files, where the data is buffered in memory up to one gibibyte and then that same amount is written to disk.

For problems that do not tax the capabilities of a single machine, many alternative techniques offer about the same performance. After all, if all you want to do is sort or search through a few files, you can do that plenty fast with Perl scripts or Unix commands such as grep. The Big Data issues come into play when the files are too large to fit on a single machine, when you want to run hundreds of such operations concurrently, or when an operation that takes only a few seconds for megabytes of data takes hours or even days when the data volume is scaled up to gigabytes or petabytes.

You can learn the basics of Impala SQL and confirm that all the prerequisite software is configured correctly using tiny data sets, as in most examples in Chapters 1-4. That’s what we call a canary test, to make sure all the pieces of the system are hooked up properly.

To start exploring scenarios involving performance testing, scalability, and multinode cluster configurations, you typically use much, much larger data sets. Later on, in Tutorial: The Journey of a Billion Rows, we’ll generate a billion rows of synthetic data. Then when the raw data is in Impala, we’ll experiment with different combinations of file formats, compression codecs, and partitioning schemes. We’ll even try some join queries involving a million billion combinations.

Don’t put too much faith in performance results that involve only a few megabytes or gigabytes of data. Only when you blow past the data volume that a single server could reasonably handle, or saturate the I/O channels of your storage array, can you fully appreciate the performance increase of Impala over competing solutions and the effects of the various tuning techniques. To really be sure, do trials using volumes of data similar to your real-world system.

If today your data volume is not at this level, next year it might be. Don’t wait until your storage is almost full (or even half full) to set up a big pool of HDFS storage on cheap commodity hardware. Whether or not your organization has already adopted the Apache Hadoop software stack, experimenting with Cloudera Impala is a valuable exercise to future-proof your enterprise.

HDFS Block Size

Because an HDFS data block holds up to 128 MB by default, you can think of any table less than 128 MB as small (tiny, even). That data could be represented in a single data block, which would be processed by a single core on a single server, with no parallel execution at all. In a partitioned table, the data for each partition is physically split up, so a table partition of less than 128 MB is in the same situation, with limited opportunity for parallel execution. It’s true that such a small table or partition might be split across several smaller files that are processed in parallel. Still, with such small amounts of data, it’s hardly worth the overhead to send the work to different servers across the cluster.

Parquet Files: The Biggest Blocks of All

When it comes to Parquet files, Impala writes data files with a default block size of 1 GB. This design choice means that Impala is perfectly happy to process tables or even partitions with many, many gigabytes. For example, if you have a 100-node cluster with 16 cores per node, Impala could potentially process 1.6 TB of Parquet data in parallel, if nothing else were running on the cluster. Larger data volumes would only require a little waiting for the initial set of data blocks to be processed.

Because many organizations do not have those kinds of data volumes, you can decrease the block size before inserting data into a Parquet table. This technique creates a greater number of smaller files. You still want to avoid an overabundance of tiny files, but you might find a sweet spot at 256 MB, 128 MB, 64 MB, or even a little smaller for the Parquet block size. The key is to have enough data files to keep the nodes of the cluster busy, without those files being so small that the overhead of parallelizing the query swamps the performance benefit of parallel execution.
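
For example, one way to get smaller Parquet files is to set the PARQUET_FILE_SIZE query option before the INSERT statement that writes the data. (The exact form of the value varies by release; older releases take a size in bytes, while newer ones also accept suffixes such as 128m, so treat this as a sketch and check the documentation for your version.)

-- Write 128 MB Parquet files instead of the default 1 GB ones,
-- so more hosts and cores can work on the data in parallel.
SET PARQUET_FILE_SIZE=134217728;
INSERT OVERWRITE parquet_table SELECT * FROM text_table;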

How Impala Is Like a Data Warehouse

With Impala, you can unlearn some notions from the RDBMS world. Long-time data warehousing users might already be in the right mindset, because some of the traditional database best practices naturally fall by the wayside as data volumes grow and raw query speed becomes the main consideration. With Impala, you will do less planning for normalization, skip the time and effort that goes into designing and creating indexes, and worry less about full-table scans.

Impala, as with many other parts of the Hadoop software stack, is optimized for fast bulk read and data load operations. Many data warehouse-style queries involve either reading all the data (“What is the highest number of different visitors our website ever had in one day?”) or reading some large set of values organized by criteria such as time (“What were the total sales for the company in the fourth quarter of last year?”). Impala divides up the work of reading large data files across the nodes of a cluster. Impala also does away with the performance and disk space overhead of creating and maintaining indexes, instead taking advantage of the multimegabyte HDFS block size to read and process high volumes of data in parallel across multiple networked servers. As soon as you load the data, it’s ready to be queried. Impala can run efficient ad hoc queries against any columns, not just preplanned queries using a small set of indexed columns.

In a traditional database, normalizing the data and setting up primary key/foreign key relationships can be time-consuming for large data volumes. That is why data warehouses (and also Impala) are more tolerant of denormalized data, with values that are duplicated and possibly stored in raw string form rather than condensed to numeric IDs. The Impala query engine works very well for data warehouse-style input data by doing bulk reads and distributing the work among nodes in a cluster. Impala can even automatically condense bulky, raw data into a data-warehouse-friendly layout as part of a conversion to the Parquet file format.

When executing a query involves sending requests to several servers in a cluster, Impala minimizes total resource consumption (disk I/O, network traffic, and so on) by making each server do as much local processing as possible before sending back the results. Impala queries typically work on data files in the multimegabyte or gigabyte range, whereas a server can read through large blocks of data very quickly. Impala does as much filtering and computation as possible on the server that reads the data, to reduce overall network traffic and resource usage on the other nodes in the cluster. Thus, Impala can very efficiently perform full table scans of large tables, the kinds of queries that are common in analytical workloads.

Impala makes use of partitioning, another familiar notion from the data warehouse world. Partitioning is one of the major optimization techniques you’ll employ to reduce disk I/O and maximize the scalability of Impala queries. Partitioned tables physically divide the data based on one or more criteria, typically by date or geographic region, so that queries can filter out irrelevant data and skip the corresponding data files entirely. Although Impala can quite happily read and process huge volumes of data, your query will be that much faster and more scalable if a query for a single month only reads one-twelfth of the data for that year, or if a query for a single US state only reads one-fiftieth of the data for the entire country. Partitioning typically does not impose much overhead on the data loading phase; the partitioning scheme usually matches the way data files are already divided, such as when you load a group of new data files each day. In Working with Partitioned Tables, we’ll see some examples of partitioned tables and queries.
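
As a preview, here is a sketch of a partitioned table and a query that prunes partitions; the table and columns are invented for illustration:

CREATE TABLE sales_by_region (id BIGINT, amount DECIMAL(9,2), notes STRING)
  PARTITIONED BY (year INT, month INT, state STRING);

-- Only the data files under year=2014/month=6/state=CA are read;
-- all other partitions are skipped entirely.
SELECT sum(amount) FROM sales_by_region
  WHERE year = 2014 AND month = 6 AND state = 'CA';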

Physical and Logical Data Layouts

When you’re thinking in SQL, you’re primarily concerned with the logical level. Your data is divided into tables, which have columns, and each column has a data type. Views let you impose a different logical arrangement without changing the actual tables and columns. Built-in functions and user-defined functions help to hide implementation details for complicated comparisons and computations.

Impala does not enforce constraints such as unique columns, NOT NULL constraints, or foreign keys. You validate those aspects earlier in the data pipeline.
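
For example, instead of declaring a primary key, you might run a quick uniqueness check as part of the pipeline; this is just a sketch with invented names:

-- Any ID that appears more than once signals a problem upstream.
SELECT customer_id, count(*) AS num_copies
  FROM new_customers
  GROUP BY customer_id
  HAVING count(*) > 1;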

The physical aspects come into play for performance. When you have a day’s worth of data to ingest, can you finish all the necessary file operations before the next day’s data is ready? That question depends on whether you need to copy, reorganize, or convert the data files. When you run queries, how much data is read from disk, how much memory is required, and how fast do the responses come back? The answer depends on physical aspects such as file format and partitioning.

The HDFS Storage Model

Data managed by Impala is stored in HDFS, a distributed filesystem spread across one or more Linux servers. When a file is stored in HDFS, the underlying filesystem takes care of making it available across the cluster. Each data block within the file is automatically replicated to some number of hosts (typically at least three), so that all the data is still retrievable if one or two machines experience a hardware, software, or network problem. And when a block needs to be read and processed, that work can be farmed out to any of the servers that hold a copy of that block.

HDFS data blocks are much, much larger than you might have encountered on previous systems: the block size is typically 64 MB or 128 MB, a size more like what you see with data warehouse software or dedicated analytic hardware appliances than with a general-purpose filesystem. HDFS avoids the issue of wasteful small writes by being an append-only filesystem. By reducing the number of possible operations, it focuses on doing a few things well: speed, reliability, and low cost.

Distributed Queries

Distributed queries are the heart and soul of Impala. Once upon a time, you needed a doctorate in parallel computing to even think about doing such esoteric, obscure operations. Now, with Impala running on Hadoop, you just need…a laptop! And ideally, also an IT department with a cluster of Linux servers running CDH (Cloudera’s Distribution including Apache Hadoop). But in a pinch, a single laptop with a VM will work for development and prototyping.

When an Impala query runs on a Hadoop cluster, Impala breaks down the work into multiple stages and automatically sends the appropriate requests to the nodes in the cluster. This automatic division of labor is why Impala is so well-suited to Big Data use cases. Queries that would strain the capacity of a single machine are a snap to run when the work can be divided between 4, 10, 100, or more machines. There is some overhead to dividing up the work and scheduling it across so many machines, which is why it is important to organize your schema for efficient query processing, and to help Impala estimate how much work is involved for a particular query.

The execution steps for each distributed query go something like this (greatly simplified):

  1. Node #1, core #1: Read this portion of that gigantic data file. I know you have the relevant data block on your local storage device.
  2. Node #1, core #2: Read a different portion of the same file. Each request like this goes to a node that has one of the replicated copies of the particular block. The multicore aspect means that each server can potentially process 4, 8, 16, or more data blocks in parallel.
  3. Node #2, core #1: Read this entire small data file. It is small enough to fit in a single HDFS block, so you’ll process the whole thing.
  4. Repeat for all the data nodes in the cluster and cores on each node, up to the number of disks available for each node. Keep going until all relevant HDFS blocks have been processed.
  5. Only columns X, Y, and Z are needed to process the query and produce the result set. Each node: Ignore the data from all other columns. (With Parquet format, this ignored data never gets read at all.) This operation is known as projection.
  6. Each node: As you read the data file, ignore all the rows that don’t match the WHERE clause. This is a filtering operation; the conditions in the WHERE clause are referred to as predicates.
  7. Each node: Now take the more manageable amount of data that remains and do summing, sorting, grouping, function calls, or other manipulation on it.
  8. Go through these steps for all the relevant data files in the table until all the needed data has been read and each core on each node has its own portion of the result set.
  9. Condense the result set even more if the query has a LIMIT clause. Each node: Assume you are the one that found all the “top N” results, and send back a result set with only N rows.
  10. Now if there is a JOIN or a UNION clause, each node transmits the intermediate results where necessary to other nodes that perform cross-checking, duplicate elimination, and so on. Repeat for all join and union clauses.
  11. When all of the intermediate results are ready for all stages of the query, do as much consolidation as possible on the remote nodes, and then send final or almost-final results back to whichever node initiated the query in the first place. This coordinator node does any necessary final sorting, grouping, and aggregating. For example, the final determinations such as “top 10 visitors” can only be made when all the intermediate results can be compared against each other.

All of this parallel processing has implications for the nature of the results:

  • Any write operations potentially produce multiple output files, with each node sending back its own contribution as a separate file.
  • Which data is processed by which node is not fixed ahead of time. Thus, there’s some degree of performance variation on consecutive queries.
  • You cannot rely on the results on disk being returned in a particular order by subsequent queries. The work might be spread out differently among the nodes, or intermediate results might be returned in a different order depending on how fast each node finishes its part.
  • The planning phase for each query tries to resolve as many unknowns as possible before distributing the work across the cluster. Impala turns expressions into constants wherever possible rather than re-evaluating them on each node. When you call a time-related function such as NOW(), that moment in time is captured at the start of the query, and the same value is used on all the nodes. It is not re-evaluated at the exact moment that each node starts working.
  • The time needed to transmit final results back to the coordinator node is proportional to the size of the result set. Thus, Impala queries typically avoid SELECT * for wide tables, and typically include restrictive WHERE predicates, a LIMIT clause, or aggregate functions to condense the results to a relatively small volume and minimize network overhead.

Normalized and Denormalized Data

One of the great debates in the database world is about normalization and denormalization.

Normalization means splitting columns into separate tables, and referring to the original column values through unique IDs, instead of repeating long strings such as a city name. This is a very popular technique in OLTP systems, where rarely updated data is separated out to speed up updates to reservation, sales, or similar fast-changing information. It is also used in data warehousing systems (under names like star schema and snowflake schema) where queries on big tables can do their initial work using the compact IDs, and only retrieve the bulky string data at the final stage (say, after you’ve decided which customers should receive an advertisement, and now you need to get their addresses).

Denormalization is when the pendulum swings the other way, and you find that for convenience or performance it’s better to have fewer tables with more columns. Perhaps you receive data all bundled together in a format that would take too long to reorganize. Or you’re recording events in real time, and it’s easier to store a value like the browser “user agent” string verbatim rather than figuring out that this is browser number such-and-such and storing the corresponding ID number. This technique is mainly used in data warehouse systems.

Impala can work just fine in either paradigm. When data is normalized, join queries cross-reference data from multiple tables, with Impala automatically deducing the most efficient way to parallelize the work across the cluster. (See Deep Dive: Joins and the Role of Statistics for a demonstration of how to optimize your join queries.) With the Parquet file format, you can use normalized or denormalized data. Parquet uses a column-oriented layout, which avoids the performance overhead normally associated with wide tables (those with many columns). The compression and encoding of Parquet data minimizes storage overhead for repeated values.
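
For instance, with a normalized schema, a typical Impala query joins a big fact table against a small lookup table; the tables and columns here are hypothetical:

-- Impala typically broadcasts the small CITIES table to the nodes scanning
-- the much larger PAGE_VIEWS table, then aggregates the matches.
SELECT c.city_name, count(*) AS visits
  FROM page_views v JOIN cities c ON (v.city_id = c.city_id)
  GROUP BY c.city_name;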

File Formats

You can use different file formats for Impala data files, similar to the notion of storage engines or special kinds of tables in other database systems. Some file formats are more convenient to produce, such as human-readable text. Others are more compact because of compression, or faster for data-warehouse-style queries because of column-oriented layout. The key advantage for Impala is that each file format is open, documented, and can be written and read by multiple Hadoop components, rather than being a “black box” where the data can only be accessed by custom-built software. So you can pick the best tool for each job: collecting data (typically with Sqoop and Flume), transforming it as it moves through the data pipeline (typically with Pig or Hive), and analyzing it with SQL queries (Impala, naturally) or programs written for frameworks such as MapReduce or Spark.

For this book, I’ll focus on the two file formats that are polar opposites: text (most convenient and flexible) and Parquet (most compact and query-optimized). The other formats that Impala supports (Avro, RCFile, and SequenceFile) are ones you might be familiar with if your organization is already using Hadoop. But they are not optimized for the kinds of analytic queries that you do with Impala. If you’re using Impala to produce data intended for use with analytic queries, use Parquet format for best results.

Text File Format

I’m always of two minds when talking about text format. It’s the most familiar and convenient for beginners. It’s the default file format for CREATE TABLE commands. It’s very flexible, with a choice of delimiter characters. If you download freely available data sets, such as from a government agency, the data is probably in some sort of text format. You can create textual data files with a simple Unix command, or a Perl or Python script on any computer whether or not it’s running Hadoop. You can fix format errors with any text editor. For small volumes of data, you can even do your own searches, sorts, and so on with simple Unix commands like grep, awk, and sort. Within Impala, you can change your mind at any time about whether a column is a STRING, INT, TINYINT, BIGINT, and so on. One minute you’re summing numbers, the next you’re doing SUBSTR() calls to check for leading zeros in a string, as illustrated in Numbers Versus Strings.

And yet, it’s also the bulkiest format, thus the least efficient for serious Big Data applications. The number 1234567 takes up 7 bytes on disk; -1234567 takes up 8 bytes; -1234.567 takes up 9 bytes. The current date and time, such as 2014-07-09 15:31:01.409820000, takes up 29 bytes. When you’re dealing with billions of rows, each unnecessary character represents gigabytes of wasted space on disk, and a proportional amount of wasted I/O, wasted memory, and wasted CPU cycles during queries.

Therefore, I’m going to advise again and again to prefer Parquet tables over text tables wherever practical. The column-oriented layout and compact storage format, with compression added on top, make Parquet the obvious choice when you are dealing with Big-Data-scale volume.

Pro Tip

It’s easy to fall into the habit of using minimal CREATE TABLE statements, which default to using text format. Always stay conscious of the file format of each new table by including a STORED AS clause. Even when you intentionally create a text table, include the clause STORED AS TEXTFILE. This clause goes after any ROW FORMAT clause that defines the separator and escape character.

These examples demonstrate creating tables for text data files. Depending on the format of the input data, we specify different delimiter characters with the rather lengthy ROW FORMAT clause. STORED AS TEXTFILE is optional because that is the default format for CREATE TABLE. The default separator character is hex 01 (ASCII Ctrl-A), a character you’re unlikely to find or enter by accident in textual data.

CREATE TABLE text_default_separator
  (c1 STRING, c2 STRING, c3 STRING);

CREATE TABLE text_including_stored_as_clause
  (c1 STRING, c2 STRING, c3 STRING) STORED AS TEXTFILE;

CREATE TABLE csv (c1 STRING, c2 STRING, c3 STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY "," STORED AS TEXTFILE;

CREATE TABLE tsv (c1 STRING, c2 STRING, c3 STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORED AS TEXTFILE;

CREATE TABLE psv (c1 STRING, c2 STRING, c3 STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY "|" STORED AS TEXTFILE;

Parquet File Format

The Parquet file format, which originated from collaboration between Twitter and Cloudera, is optimized for data-warehouse-style queries. Let’s explore what that means and how it affects you as a developer.

Parquet is a binary file format. Numeric values are represented with consistent sizes, packed into a small number of bytes (either 4 or 8) depending on the range of the type. TIMESTAMP values are also represented in relatively few bytes. BOOLEAN values are packed into a single bit, rather than the strings true and false as in a text table. So all else being equal, a Parquet file is smaller than the equivalent text file.

But Parquet has other tricks up its sleeve. If the same value is repeated over and over, Parquet uses run-length encoding to condense that sequence down to two values: the value that’s repeated, and how many times it’s repeated. If a column has a modest number of different values, up to 16K, Parquet uses dictionary encoding for that column: it makes up numeric IDs for the values and stores those IDs in the data file along with one copy of the values, rather than repeating the values over and over. This automatically provides space reduction if you put denormalized data straight into Parquet. For example, if a data file contains a million rows, and each has one column with a state name such as California or Mississippi, the data file is essentially the same whether you convert those strings to state #1, #2, and so on and store the numbers, or if you let Parquet’s dictionary encoding come up with the numeric IDs behind the scenes. The limit of 16K distinct values applies to each data file, so if your address table has more than 16K city names, but the table is partitioned by state so that the California cities are in one data file and the Mississippi cities are in a different data file, each data file could still use dictionary encoding for the CITY column.

See Tutorial: The Journey of a Billion Rows for a demonstration of how much space you can save with Parquet format when all the compression and encoding techniques are combined.

Parquet is a column-oriented format. This means that the data for all values in a column are arranged physically adjacent to each other on disk. This technique speeds up certain kinds of queries that do not need to examine or retrieve all the columns, but do need to examine all or most of the values from particular columns:

-- The query can read all the values of a column without having to
-- read (and ignore) the values of the other columns in each row.
SELECT c3 FROM t1;

-- Analytic queries are always counting, summing, averaging and so on
-- columns for sales figures, web site visitors, sensor readings, and so on.
-- Those computations are nice and fast when no unnecessary data is read.
-- In this example, the query only needs to read C1 and C5, skipping all
-- other columns.
SELECT count(DISTINCT c1), sum(c1), max(c1), min(c1), avg(c1)
  FROM t1 WHERE c5 = 0;

-- Here we cross-reference columns from two different tables, along
-- with an ID column that is common to both. Again, the query only reads
-- values from the exact columns that are needed, making join queries
-- practical for tables in the terabyte and petabyte range.
SELECT attr01, attr15, attr37, name, email FROM
  visitor_details JOIN contact_info USING (visitor_id)
  WHERE year_joined BETWEEN 2005 AND 2010;

Column-oriented data is a popular feature in specialized data warehouse systems. For Impala, the column-oriented file format is just a small piece of the value proposition. The file format itself is open, so you can always get data in or out of it. Parquet files are readable and writable by many Hadoop components, so you can set up an ETL pipeline to use Parquet all the way through rather than starting in one format and converting to another at the end.

People commonly assume that the Parquet column-oriented format means each column is stored in a different data file. Not so! Each Parquet data file contains all the columns for a group of rows, but the values from each column are laid out next to each other within that file. When Impala needs to read all the values from a particular Parquet column, it seeks to a designated spot in the file and reads forward from there. The performance benefits of Parquet increase as you add more columns; for example, with 100 columns, a query only needs to read roughly 1% of each data file for each column referenced in the query.

Getting File Format Information

The SHOW TABLE STATS statement provides the basic information about the file format of the table, and each individual partition where appropriate:

[localhost:21000] > show table stats csv;
+-------+--------+------+--------------+--------+
| #Rows | #Files | Size | Bytes Cached | Format |
+-------+--------+------+--------------+--------+
| -1    | 1      | 58B  | NOT CACHED   | TEXT   |
+-------+--------+------+--------------+--------+

The DESCRIBE FORMATTED statement dumps a lot of information about each table, including any delimiter and escape characters specified for text tables:

[localhost:21000] > describe formatted csv;
...
| Storage Desc Params: | NULL                  | NULL |
|                      | field.delim           | ,    |
|                      | serialization.format  | ,    |
+----------------------+-----------------------+------+

Switching File Formats

Your preferred file format might change over time, after you conduct benchmarking experiments, or because of changes earlier in your ETL pipeline. Impala preserves the flexibility to change a table’s file format at any time: simply replace the data with a new set of data files and run an ALTER TABLE ... SET FILEFORMAT statement. Or, for a partitioned table, you can leave older partitions in the previous file format, and use the new file format only for newer partitions.

This example demonstrates how to clone the structure of an existing table, switch the file format of the new table, and then copy data from the old to the new table. The data is converted to the new format during the copy operation.

CREATE TABLE t2 LIKE t1;
-- Copy the data, preserving the original file format.
INSERT INTO t2 SELECT * FROM t1;
ALTER TABLE t2 SET FILEFORMAT PARQUET;
-- Now reload the data, this time converting to Parquet.
INSERT OVERWRITE t2 SELECT * FROM t1;

The following example demonstrates how a partitioned table could start off with one file format, but newly added partitions are switched to a different file format. Queries that access more than one partition automatically accommodate the file format for each partition.

CREATE TABLE t3 (c1 INT, c2 STRING, c3 TIMESTAMP)
  PARTITIONED BY (state STRING, city STRING);
ALTER TABLE t3 ADD PARTITION
  (state = 'CA', city = 'San Francisco');
-- Load some text data into this partition...
ALTER TABLE t3 ADD PARTITION
  (state = 'CA', city = 'Palo Alto');
-- Load some text data into this partition...
ALTER TABLE t3 ADD PARTITION
  (state = 'CA', city = 'Berkeley');
ALTER TABLE t3 PARTITION
  (state = 'CA', city = 'Berkeley')
  SET FILEFORMAT PARQUET;
-- Load some Parquet data into this partition...

Aggregation

Aggregation is a general term meaning to take many small items and combine them into fewer larger ones. In the Impala context, aggregation is generally a positive thing. It comes up in several different contexts: you aggregate table data through aggregation functions and GROUP BY clauses; you deal with cluster resources such as memory and disk capacity by considering the total (aggregate) capacity, not the capacity of a single machine; and for performance reasons, sometimes you aggregate small files into larger ones.
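
In the SQL sense, aggregation is the familiar pattern of collapsing many detail rows into a few summary rows, as in this sketch (the amount column is invented):

-- Collapse billions of detail rows into one summary row per store.
SELECT store_id, count(*) AS num_sales, sum(amount) AS total_sales
  FROM sales_data GROUP BY store_id;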

When Impala queries are distributed to run across all the data nodes in a Hadoop cluster, in effect, the cluster acts like a single giant computer. For example, on a 100-node cluster, the memory and CPU power available for the query are 100 times as much as on a single server. We refer to this capacity using terms such as aggregate memory. That is why when you see that a resource-intensive workload requires many gigabytes of memory, that is not cause for alarm if you have a reasonably sized cluster of reasonably powerful servers.

For information about the memory consequences of aggregation queries, see Optimizing Memory Usage.

For discussion of aggregating (or coalescing) small files, see Anti-Pattern: A Million Little Pieces.
