Chapter 8. Data Engineering with Drill

Drill is a SQL engine that reads large data files stored in a distributed filesystem such as HDFS, MapR-FS, or Amazon S3. Drill works best with data stored in Parquet, but data seldom arrives in Parquet, and it is often handy to work with data in its original format. In this chapter, you will see that with Drill you can read data in many formats, and use specialized tricks to overcome schema-related issues. However, for production, Parquet is the preferred file format.

Although some of the material in this chapter has been covered in previous chapters, this chapter will go into much greater detail on how Drill actually processes data, which is vital to understand if you are developing extensions for Drill or if you encounter files with an ambiguous schema.

Schema-on-Read

Apache Drill is designed for the modern data lake, which consists of a very large number of files, organized into directories and stored in a wide variety of file formats. Although Drill is optimized for Parquet files, it can read data from many different file formats using extensible storage plug-ins.

Unlike Hive, which requires a schema to define a file, Drill uses the structure within the file itself. This strategy, known as schema-on-read, works very well for file formats such as Parquet, which carry a clear, unambiguous schema within the file itself. But as you will see in this chapter, you must provide Drill a bit of help to read other file formats, such as CSV or JSON, for which the schema can be ambiguous in subtle ways.

The SQL Relational Model

Drill takes on a daunting challenge: to infer schemas from a variety of file formats and map those schemas into the SQL relational model. We use the term schema inference for this process. (Drill itself does not use this term; it is borrowed from Apache Spark.) Making Drill work for you entails understanding the strengths (and limitations) of Drill’s schema inference mechanism. The relational model describes a set of relations (tables) comprising a fixed set of domains (columns). Each column is described by a name and a type. (Columns often also have a defined position, as we will see shortly for CSV files.)

Drill uses JSON as its reference model. Although JSON can represent relational data, it also can represent additional data structures such as arrays and maps. Drill extends the SQL relational model to include these types. Because SQL itself does not support these extended types, Drill provides a number of techniques to bridge the gap between JSON and SQL.

Relational databases use a schema-on-write approach to store data in a predefined format. The big data world uses schema-on-read, but in two distinct ways.

Hive and Impala define a schema separately from the data, with the schema stored in the Hive metastore. When data is ambiguous, the Hive schema instructs each tool how to interpret the data. The cost, however, is that the user must maintain this schema, which becomes a complex task for large deployments and is awkward during data exploration. Drill can work with Hive data and the Hive metastore when available, but it can also work without them.

To work without a predefined schema, Drill takes a pragmatic approach: the schema is defined as a negotiation between the query and the file. For example, the same JSON file can be thought of as having typed fields or text fields. The same CSV file can be thought of as containing a single column (an array of VARCHAR) or as a set of columns. Yesterday’s version of a JSON file might have had four fields per record; today’s has six after an enhancement to the source of the data.

Data Life Cycle: Data Exploration to Production

When you start a new data project, you are presented with a collection of files, perhaps in a variety of formats. Drill’s schema inference rules allow you to do data exploration quickly and easily, directly on the source data. Because that data might have an ambiguous structure, part of the exploration is to identify the issues and workarounds.

As you work, you can capture what you learn in SQL views. With views, you can rename columns, clean up formats, convert data types, and more.
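
For example, a view can capture the casts and renames you discover during exploration. Here is a minimal sketch; the local.data workspace and the csvh/cust.csvh file are set up later in this chapter, and dfs.tmp is a writable workspace in the default configuration (adjust names to your setup):

-- Capture exploration fixes (casts, renames) in a view:
CREATE OR REPLACE VIEW dfs.tmp.`cust_v` AS
SELECT CAST(custId AS INT)    AS custId,
       `name`,
       CAST(balance AS FLOAT) AS balance
FROM `local`.`data`.`csvh/cust.csvh`;

-- Query the view as if it were a typed table:
SELECT * FROM dfs.tmp.`cust_v`;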

Then, as a project moves into production, Drill encourages you to extract, transform, and load (ETL) your data into partitioned Parquet files. Parquet carries a clear schema, avoiding schema ambiguities. Parquet is also fast to read, especially if a query reads just a subset of columns. Partitioning data reduces the number of files that must be read to produce a result, further speeding up the query. (Think of partitioning as big data’s answer to a table index.)
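
As a minimal sketch of that ETL step (the source path, output table, and column names here are hypothetical), a CTAS statement with PARTITION BY writes partitioned Parquet files:

ALTER SESSION SET `store.format` = 'parquet';

-- Hypothetical source and columns; the partition column must appear in the SELECT list.
CREATE TABLE dfs.tmp.`sales_parquet`
PARTITION BY (`sale_year`)
AS SELECT sale_year, sale_date, cust_id, amount
FROM dfs.`/data/raw/sales`;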

Sample Data

When learning Drill, it is often handy to have some sample data available. Drill provides the FoodMart sample dataset through the cp (classpath) schema. (The FoodMart dataset is originally from Microsoft and was made available to Drill by Julian Hyde of the Calcite project.) The data is available without the “test.” prefix shown in the dataset’s ER diagram. In fact, it is the FoodMart data that you query with the sample query in the Drill Web Console:

SELECT * FROM cp.`employee.json` LIMIT 20

The FoodMart dataset is very rich and allows you to try out many advanced features on known good data before trying them on your own data. The data is available in JSON format.
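
For example, you can run aggregate queries against the employee file. The education_level column used here is our recollection of the sample data; adjust the column name if your copy differs:

SELECT education_level, COUNT(*) AS num_employees
FROM cp.`employee.json`
GROUP BY education_level;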

Drill also includes some TPC-H sample data in Parquet format in the tpch directory:

SELECT * FROM cp.`tpch/partsupp.parquet` LIMIT 20

The schema is described in the TPC-H specification available from the TPC website.
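
For example, the partsupp file follows the TPC-H column naming convention (ps_partkey, ps_suppkey, and so on); the column names below are assumptions to verify against your copy of the data and the specification:

SELECT ps_partkey, ps_suppkey, ps_availqty, ps_supplycost
FROM cp.`tpch/partsupp.parquet`
ORDER BY ps_supplycost DESC
LIMIT 5;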

See also the “Sample Datasets” and “Analyzing the Yelp Academic Dataset” sections in the online Drill documentation for more datasets.

Schema Inference

Whether during data exploration or in a production system, Drill uses the same schema inference steps. Inference starts with the type of storage system, then proceeds to the type of file and to how to split up a file for reading. As the file is read, Drill must infer the type of each column, then determine how to combine schemas from different files and blocks within a file. Much of this is automatic, especially with Parquet files.

When doing data exploration with other kinds of files, proper inference often requires that you either set things up correctly within Drill, or write your query to resolve ambiguities. The next few sections walk through the overall process so that you know how the pieces work and how to resolve issues when things don’t go as expected.

Further, because Drill is distributed, each fragment of a query (parallel execution path) makes its own independent decision about the schema. That is, fragments do not communicate to negotiate a shared view of the schema. Instead, each fragment infers the schema from the first record that it reads (or from metadata stored in the particular file that it reads for CSV or Parquet files).

The key issue to keep in mind when using Drill is this: Drill cannot predict the future. That is, Drill cannot choose types based on what might appear in the file in the future. Further, when running a distributed query, one scan operator cannot predict what another scan will infer about the schema.

Data Source Inference

Drill supports a wide range of file formats. Its first step in scanning any file is to infer the type of the file from the filename. Drill does this using a file type inference process that uses a storage configuration and an associated format configuration.

Storage Plug-ins

Drill is extensible. It accesses your data via a storage plug-in. By now you are familiar with storage plug-ins for distributed filesystems, HBase, Hive, and so on. Here, we focus on the dfs plug-in, which provides access to files on your local system, in HDFS, in MapR-FS, in Amazon S3, and so on.

Elsewhere in this book, for convenience, we refer to data sources such as HDFS, the local filesystem, Amazon S3, and MapR as different storage plug-ins. Truth be told, they are all just different configurations of the same dfs plug-in. Internally, dfs uses the HDFS client to work with these other storage systems. This means that Drill can work with any system for which an HDFS client implementation is available, including Isilon, Alluxio, Microsoft ADLS, and many more. Apache Drill does not test these add-on systems, but if they adhere to the HDFS standard, they have a good chance of working with Drill.

Storage Configurations

A storage plug-in is a piece of Java code, built into Drill or delivered as a JAR. To use a storage plug-in, you must define a storage configuration for the plug-in.

Confusingly, the Drill UI uses the terms “storage” and “storage plug-in” for the storage configuration. Some people use the term “storage plug-in configuration” for the same idea. Just remember that if you edit it in the Drill Web Console, it is a “configuration.” If you write code in Java, it is a “plug-in.”

Each storage plug-in can have multiple configurations. For example, you might have different storage configurations for the dfs plug-in depending on your needs. You create the storage configuration in the Drill web console. When run as a daemon, Drill stores the configurations in ZooKeeper so that they are visible to all Drillbits in your cluster. When run embedded in SQLLine (or in unit tests), the configurations are saved to local disk and are generally discarded at the end of the run.

Each storage configuration is given a name when you create the configuration. This is the name you use in queries. Each configuration also has a type, which connects the configuration to the plug-in implementation in code. For example, the dfs storage configuration that Drill ships with maps to the file storage plug-in, as shown here:

{
 "type": "file",

You can use the Web Console to create a second storage configuration for the file plug-in using the Storage tab. Drill provides no default content for a new configuration, so it is handy to copy and paste an existing JSON definition.

The default dfs configuration points to your local filesystem and is handy for learning Drill. Change this to point to HDFS or Amazon S3 for a production system.

There is nothing magic about the dfs name, by the way. Feel free to instead create a local configuration for your local filesystem, and an hdfs or s3 configuration for your distributed storage.

Here is how to create the example local storage configuration we’ll use in this section:

  1. Start Drill, as explained earlier.

  2. Start the Drill web console, as explained earlier.

  3. Click the Storage tab.

  4. Find the default dfs plug-in, and then click Update.

  5. Select all the JSON text and copy it to the clipboard.

  6. Click Back.

  7. In the New Storage Plugin area, enter the name local, and then click Create.

  8. Paste the JSON into the editor.

  9. Replace the contents of the workspaces object with the following:

    "workspaces": {},
  10. Click Update.
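
After clicking Update, you can confirm that the new configuration is registered by listing schemas from SQLLine or the Web Console's Query page (the exact workspace suffixes shown, such as local.default, depend on your configuration):

SHOW SCHEMAS;

-- Or filter using the information schema:
SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.`SCHEMATA`
WHERE SCHEMA_NAME LIKE 'local%';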

Workspaces

File storage configurations provide several properties to customize behavior. The storage configuration by itself refers to the root of the target filesystem. The local configuration you just defined points to the root of your local filesystem: that’s what the file:// in the configuration means.

The GitHub repository for this book contains a data directory with the files used in this section. Suppose you download these files to your home directory as /Users/arina/drillbook. You can query the files using the following:

SELECT * FROM `local`.`/Users/arina/drillbook/data/drill/cust.csv`

The backticks are sometimes optional, but they are required if a name contains a character that is not valid in a SQL identifier or if the name is the same as a SQL keyword. The simplest rule is to always enclose schema, table, and column names in backticks.

Using an absolute path is often unwieldy, however. You might have test and production filesets. If you share local queries with others, they will need to edit the file paths to work for their machines before running the queries. And data might move around in HDFS, for various reasons.

To avoid this issue, Drill defines the idea of a workspace, which is just a named subdirectory. So, you might define a data workspace for the book’s data directory:

  1. In the Drill Web Console, click the Storage tab.

  2. Find the local configuration that you created earlier, and click Update.

  3. Edit the JSON to revise the workspaces element as shown here:

    "workspaces": {
       "data": {
           "location": "/Users/arina/drillbook",
           "writable": true,
           "defaultInputFormat": null,
           "allowAccessOutsideWorkspace": false
        }
    }
    

    Replace the path in the preceding example with the location where you stored the book files.

  4. Click Update.

Then, you can reference the workspace in a query:

SELECT * FROM `local`.`data`.`cust.csv`
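
You can also ask Drill to list what it sees in the workspace, which is a handy sanity check before querying:

SHOW FILES IN `local`.`data`;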

In SQL terms, the configuration (with optional workspace) is like a database (MySQL) or schema (Oracle). The end result is that, either way, Drill now knows the location of the file that you want to query.

Querying Directories

If you have multiple files in a directory, you can use the directory name as a table name in a query. Drill will query all files within that directory. Suppose that you have a directory cust containing multiple customer CSV files. Here’s how to query all of the files within that directory:

SELECT * FROM `local`.`data`.`cust`

For this to work, the files must be of the same type (all CSV, for example). However, you cannot query the workspace directly. That is, both of the following are invalid:

SELECT * FROM `local`.`data`  -- Not legal
SELECT * FROM `local`.`data`.`/` -- Not legal

Thus, you need to define the workspace to point one level above your data directory.

Workspaces are particularly useful when working with partitioned data, as explained in “Partitioning Data Directories”.
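
As a preview, Drill exposes each level of subdirectory through implicit columns named dir0, dir1, and so on, which you can use to limit the scan. The year-named subdirectory layout below is hypothetical:

-- Hypothetical layout: cust/2018/*.csv, cust/2019/*.csv, ...
SELECT dir0 AS `year`, COUNT(*) AS num_rows
FROM `local`.`data`.`cust`
GROUP BY dir0;

-- A filter on dir0 prunes the scan to matching subdirectories:
SELECT COUNT(*) FROM `local`.`data`.`cust` WHERE dir0 = '2018';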

Special Files in Directory Scans

When scanning a directory, Drill ignores files whose names begin with either a period or an underscore. Indeed, Drill creates such files for its cached Parquet metadata. If, for some odd reason, you are presented with such a file, you can query it by spelling out the filename. For example: SELECT * FROM `_oddFile.csv`.

Default Schema

Drill supports the SQL USE command to declare a default schema. To try this, start a local Drill:

cd $DRILL_HOME
bin/sqlline -u jdbc:drill:drillbit=localhost

Now you can use local as your default schema:

USE `local`;
SELECT * FROM `/Users/arina/drillbook/data/cust.csv`;

Both storage configurations and workspaces can become the default schema:

USE `local`.`data`;
SELECT * FROM `cust.csv`;

The default workspace is a session property: it remains in effect as long as you are connected to Drill and must be defined on each new connection. If you have authentication enabled, the Web Console is also stateful and USE (along with ALTER SESSION) will work. Without authentication, the Web Console is stateless and the USE statement is in effect only for the single request.

File Type Inference

The storage configuration and optional workspace instruct Drill where to look for files. Next, Drill must figure out the type of each file that it will scan. Drill uses the format plug-in mechanism to support an extensible set of formats.

Format Plug-ins and Format Configuration

Storage plug-ins have two parts: the plug-in (code) and configuration (JSON). Format plug-ins follow this same pattern. In Drill, file formats are extensible; we describe how to write your own format plug-in in Chapter 12.

Storage configurations appear in the Storage section of the UI. Format configurations, however, appear within a dfs-based storage configuration, be it local, hdfs, s3, or similar. (Format plug-ins are supported only by the dfs storage plug-in; other storage plug-ins do not support them.)

Format configurations allow us to fine-tune file formats for different types of data files, as we discuss in a few moments. You will see the default set of format configurations when you use the Drill Web Console to update a storage configuration, as we did previously. If you delete a format configuration from a storage configuration, that file type won't be available within that configuration. Similarly, if you write or obtain a new format plug-in, you must add a format configuration to your storage configuration in order to use the new format.

Drill ships with a default set of format configurations in the default dfs storage configuration. Here is one example:

"csv": {
   "type": "text",
   "extensions": [
      "csv"
    ],
    "delimiter": ","
},

The name csv is purely for documentation; Drill does not use it. The important bits are what follows. The type names the underlying format plug-in—text, in this case. The text format plug-in is actually quite flexible and is used to define the psv and tsv formats as well. What makes this a CSV format are the property values. The extensions field identifies the file suffix (extension) used for this format. The delimiter field specifies that this is a comma-separated file.

You can customize these as needed, often by creating a custom storage configuration to hold your custom format configurations. Just be sure to give each a unique name and to map each to a distinct file suffix (more on this shortly).

Drill uses the file suffix to associate the file with a format configuration. It then uses the format configuration to locate the underlying format plug-in.

Format Inference

We can now assemble all of this information to describe how Drill picks a format configuration for each file. Suppose that you have this query:

SELECT * FROM `local`.`data`.`cust.csv`

Drill proceeds according to the following steps:

  1. Look up the schema local to find the storage configuration.

  2. Use the type field in the storage configuration to find the storage plug-in implementation.

  3. Use the next name, data, to find a workspace within the configuration.

  4. Use the workspace to find the working directory for the query.

  5. Use the working directory to find the target file(s).

  6. Use the file suffix (in the query or by scanning a directory of files) to find the associated format configuration.

  7. Use the type of the format configuration to find the format plug-in implementation. The format plug-in provides the reader to use to deserialize the data, as discussed in a few moments.

Drill now has all the information it needs to perform the file scan: the filesystem, the directory that contains the file, the name of the file to scan, and the format of the file.

Knowing this format inference process can be handy when things go wrong. (Drill’s error messages are, to put it kindly, cryptic, so you must often figure out the problem yourself.)

File Format Variations

Files come in many varied formats. CSV files are notorious for having many standards (with or without headers, quoted or nonquoted strings, allowing multiline field values or not, etc.). As we saw, CSV files themselves are one variation of a more general “text” format with other variations such as pipe-separated values (PSV), tab-separated values (TSV), and so on.

The format configurations allow you to fine-tune the scan to your exact file format. This works well as long as each format variation has its own distinct file suffix: .csv for CSV files with a header, .psv for pipe-separated values, .tsv for tab-separated values, and so on.

However, because CSV has so much variation, you might find a situation in which some of your CSV files have headers whereas others do not, yet both variations end with .csv. To handle this, store the different file formats in different directories and define a storage configuration (not just workspace) for each. So, you might pull your store sales files into a stores directory and define a stores storage configuration for that directory. Then, in the stores configuration, you can configure the .csv extension to refer to a CSV format configuration that includes headers.

The general rule is that if files have distinct formats that require distinct format configurations, you must give them distinct suffixes. If you cannot do that, you must store them in separate directories, referenced by distinct storage configurations.

Schema Inference Overview

Internally, Drill uses a unique late-binding schema model. Unlike traditional schema-driven engines, the Drill query planner makes very few assumptions about the columns in your tables. This is why Drill (or at least its query planner) is sometimes called “schema-less.”

As discussed in Chapter 9, the planner converts your SQL query into a directed acyclic graph (DAG), which consists of operators grouped into fragments. The leaf operators are file scans. You might at first think that Drill discovers the schema when it reads the data in the scan operators. However, this is only partly true. The implementation is actually much more subtle—and it is this subtlety that we discuss in this chapter so that you can get the most out of Drill.

The scan operator itself discovers the schema of each file (or file block) as the scan operator proceeds. But because Drill runs many scan operators in parallel, it is possible for some scan operators to read one schema (perhaps the current version of the schema), whereas others read a different schema (perhaps an older version of the schema).

Allowing a single query to read two distinct schemas is not actually a bug, as it might first seem; instead, it is an important feature because it allows Drill to work with a collection of files even as the schemas of those files evolve (often called schema evolution).

Recall that Drill divides a query into major fragments. Perhaps the leaf fragment scans a table and applies a filter. Drill then runs many copies of this fragment in parallel; these instances are called minor fragments.

Let’s focus on minor fragment 1. The scan operator in that fragment begins to read a file and discovers the schema for that file (schema-on-read). Drill gathers records into batches. The scan operator sends the batches downstream to the filter operator. How does the filter operator learn the schema? By looking at the schema of the first batch of data. For all operators except scans, Drill uses code generation to create custom code that does the work for the operator. In this case, the filter operator in fragment 1 uses the schema of the first batch to generate its code and then processes the batch.

Suppose that, for some reason, the scan operator in fragment 1 encounters a later record with a new schema (as can happen for, say, a JSON file). The scan operator passes the new batch of records along with a flag that says, “Hey, the schema for this batch is different than for the previous ones.” This is called a schema change. Although some operators in Drill can handle a schema change, others cannot. We discuss these cases shortly.

Meanwhile, over on another node, in fragment 2, the exact same process occurs for a different file or block in the same file. Because Drill uses a shared-nothing distribution architecture, the two fragments do not communicate directly; they make their own independent schema decisions. In a pathological case, fragment 1 will happily read a file with the schema (a, b), whereas fragment 2 reads a file with schema (c, d).

Eventually, the two schemas must be merged, which happens after a network exchange. Fragments 1 and 2 both send their rows (actually, batches of rows called record batches) to a common fragment, maybe the root or an intermediate fragment. Let’s assume that they send their rows to a sort operator. The sort operator receives the schemas from the two leaf fragments. If the schemas are identical (the typical case), all is fine. But, if they differ (as in our pathological example), the sort will fail with a schema change exception because it does not know how to combine mismatched schemas.

On the other hand, if there is no sort (or other schema-sensitive operator), the record batches will be sent all the way back to the client. The native Drill client handles these cases: it tells you the schema for each batch of data as it arrives. You can then apply your own rules to work with the data. The native Drill client is thus very powerful, having been designed to handle varying schemas.

However, you will most likely use the JDBC or ODBC clients, which cannot handle schema changes. In this case, the driver itself will detect the schema change and throw an error if it cannot handle the change.

The conclusion is that Drill as a whole is very flexible in dealing with multiple schemas per table, but various operators (such as the sort operator discussed earlier) and clients (such as JDBC and ODBC) have inherent restrictions that limit your ability to exploit divergent schemas. If your tables have a single, consistent schema, everything just works—but if your schema varies or is ambiguous, you need to know some tricks to make things work. We explore those tricks a bit later.

Note

At the time of this writing, the Drill community was beginning to discuss adding a schema mechanism to Drill. If such a mechanism becomes available, you might be able to work around some of the schema-on-read issues by declaring a schema up front so that Drill knows how to resolve schema conflicts when they occur. Drill also supports Hive metastore, which does much the same thing.

Distributed File Scans

Let’s take a deeper look at how Drill scans files and decodes records. As we said earlier, Drill is a distributed query engine, meaning that it tries to spread file scanning work as widely as possible across your cluster. If you have a single machine, such as when running Drill on your laptop, your “cluster” is just one node. Regardless of cluster size, Drill runs multiple scans per Drillbit; by default, the number of scans is about 70% of the number of CPU cores on the node. It is helpful to know how Drill divides up the scanning work.
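
The 70% figure corresponds to the planner.width.max_per_node system option. As a quick sketch, you can inspect it, or lower it for a session (for example, to observe scan behavior on a laptop); defaults vary slightly by Drill version:

SELECT * FROM sys.options WHERE name LIKE 'planner.width%';

ALTER SESSION SET `planner.width.max_per_node` = 2;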

If your query references a directory, Drill creates a separate scan task for each file. If you use the local filesystem (the file:// protocol), this is the maximum parallelism Drill can achieve.

In Drill, leaf minor fragments are those that directly scan data files. Each leaf fragment contains one or more readers, each of which scans a single block of the file for splittable files on HDFS, or the entire file for nonsplittable or local files.

As discussed in the previous section, Drill divides a query into a set of major fragments, each of which is parallelized into a set of minor fragments. Each minor fragment contains one or more operators and an exchange, and Drill distributes minor fragments symmetrically across Drillbits. Drill creates leaf fragments on each node that holds data; the number of leaf fragments per node depends on the number of CPUs on that node. (Drill assumes nodes are symmetric; all nodes have the same number of CPUs.) For example, if each node has eight CPUs, Drill will run something like six leaf minor fragments per node; if the file being scanned has 32 blocks on each node, each minor fragment will read five or six of them.

The Drill Web Console provides a tool to visualize these concepts: it displays the major fragments (stages) graphically. Then, in the associated tables, you see the worker tasks, or minor fragments, for each stage. At least one set of minor fragments will be for the scan. What might not be apparent from the UI is that each scan minor fragment actually runs one or more individual readers, each of which scans a single block of an HDFS file.

Consider the following query, which sorts a large file and then throws away the results (providing a nontrivial plan without overloading our query tool):

SELECT t.a, t.b FROM (
  SELECT a, b
  FROM `dfs`.`/user/test/sample.csvh`
  ORDER BY b) t
WHERE t.a < 0

The resulting visualized plan is shown in Figure 8-1.

Figure 8-1. Visualized plan for the sample query

Because data is replicated, Drill has a great deal of flexibility in how to assign scan operators to nodes. In general, Drill tries to run a scan operator on a node that contains the block to be scanned, but also tries to avoid skew by ensuring that all nodes have the same number of scan operators ±1. In extreme cases, Drill will perform remote reads (running the scan on a node remote from the data and transferring the data over the network) when necessary to avoid skew.

From this description, we can learn two things:

  • We get better performance from Drill when data is stored in a splittable file format such as Parquet, most versions of CSV, and so on. By contrast, JSON is not splittable: even if we have a 1 GB JSON file, Drill must read that entire file in a single scan operator, which causes skew and very likely requires remote reads.

  • Much of Drill’s internal work entails moving data across the network from scan nodes, to internal worker tasks, to the Foreman node (the one that runs the query), and finally back to the client (such as JDBC, ODBC, SQLLine, etc.).

Schema Inference for Delimited Data

Schema inference is the process of creating a relational schema for each file, or file block, as Drill reads it. Schema inference is only as good as the data itself. If the data contains sufficient hints, Drill can do an excellent job of inferring the schema. But certain data patterns that we will encounter later in the chapter can run into the limitations of schema inference. The general rule of thumb is that schema inference cannot predict the future; it can only react to data as it is read.

As noted earlier, Drill infers schemas in two main ways:

  • From the data itself (specifically, from the first record of the file or file block)

  • From metadata stored in the same file as the data, as in Parquet

We will explore schema inference from the simplest to the most complex cases using some of the file formats most commonly used with Drill.

All of the sample files used here reside in the book’s GitHub repository. We begin by setting the GitHub repository’s data folder as the default schema:

USE `local`.`data`;

CSV with header

The simplest file format for Drill is CSV. Drill supports a wide variety of text file formats, but CSV is the most widely used. A CSV file can have a header, or it can exclude the header. In Drill, this option is configured in the format plug-in configuration, as described in “Format Plug-ins and Format Configuration”.

You indicate to Drill that your CSV file has a header via the format configuration. The default dfs storage configuration defines a csvh (CSV with header) format:

"csvh": {
    "type": "text",
    "extensions": [
    "csvh"
    ],
    "extractHeader": true,
    "delimiter": ","
}

You can copy this into the local storage configuration you created earlier. Because Drill considers CSV files with headers to be a different format than CSV files without headers, we must use a distinct file suffix for our examples with headers: .csvh. By default Drill uses .csv for files without headers and .csvh for files with headers, but you are free to assign suffixes as you want as long as each format has a distinct suffix or they are declared in distinct storage configurations.

For files with a header, every reader starts by reading the header line, which must be the first line of the file. Even if the CSV file is large enough to be split into blocks, each of which is read by a different reader, all readers start by reading the header line.

For example, suppose we have the file csvh/cust.csvh:

custId,name,balance,status
123,Fred,456.78
125,Betty,98.76,VIP
128,Barney,1.23,PAST DUE,30

What is the schema of this file? As humans, we might know that this is a simple customer list with customer ID, name, balance, and status. We might know that custId is an INT and balance is a FLOAT.

But all Drill knows is that this is a CSV file: a file consisting of comma-separated values. We use the query to impose types beyond those inferred from the file. For example, run this query:

SELECT * FROM `csvh/cust.csvh`;
+---------+---------+----------+-----------+
| custId  |  name   | balance  |  status   |
+---------+---------+----------+-----------+
| 123     | Fred    | 456.78   |           |
| 125     | Betty   | 98.76    | VIP       |
| 128     | Barney  | 1.23     | PAST DUE  |
+---------+---------+----------+-----------+

Drill’s CSV schema inference has determined two things from our file:

  • The file contains four columns

  • The names of each column

The format plug-in creates each column as type (non-nullable) VARCHAR. The fields are all VARCHAR because Drill does not attempt to guess column types. This might be surprising, because some tools, such as Excel, will sample the data to infer column type. However, Drill does not do type inference for CSV files.

The first record of our file omitted the status field. Drill knows the type of the missing field must be VARCHAR and that CSV columns are required; that is, they are not nullable. So, recent versions of Drill simply fill the missing field with an empty string, as if the line were the following:

123,Fred,456.78,

The third record had an extra fifth field at the end (the number of days past due). Drill ignored this field because the header declared only four fields.

Explicit projection

Explicit projection describes the case in which the query specifies the columns to read from a table. By contrast, implicit projection, or wildcard projection, occurs when we use SELECT *.

Suppose we use the same CSV file as in the previous example but with the following query:

SELECT CustId, Name, Address FROM `csvh/cust.csvh`;
+---------+---------+----------+
| CustId  |  Name   | Address  |
+---------+---------+----------+
| 123     | Fred    |          |
| 125     | Betty   |          |
| 128     | Barney  |          |
+---------+---------+----------+

Note that the Address column does not actually appear in the file. Because Drill uses schema-on-read, it cannot know that Address is missing until it actually reads the file. After Drill realizes that the field is missing, it will create a dummy column. Often Drill creates the column as nullable INT. But because Drill knows that all CSV columns are always of type VARCHAR, it creates Address as non-nullable VARCHAR and fills the column with empty strings.

Notice that we have subtly renamed the columns. The customer ID column is spelled custId in the CSV file header, and that is how it appears in a wildcard query. But because we referenced the name as the capitalized CustId in the explicit projection list, that is how Drill returned the column. Here is the general rule:

  • Drill uses the name as it appears in the projection list, if listed explicitly.

  • Drill uses the name as it appears in the table for wildcard queries.

TypeOf functions

We can determine part of the field type using Drill’s typeof() function, as demonstrated here:

SELECT typeof(custId) AS custId_type FROM `csvh/cust.csvh` LIMIT 1;
+--------------+
| custId_type  |
+--------------+
| VARCHAR      |
+--------------+

This function has a couple of limitations, however. First, it won't show whether the column is nullable, non-nullable, or repeated (an array). Second, if the column contains a NULL value, typeof() reports the type as NULL even though Drill does not actually have a NULL type. This makes it difficult to illustrate some of the issues described here using the current Drill version.

Further, typeof() usually but not always reports Drill’s internal type names, not the type names you use in SQL statements.

We can use typeof() to spy on column types to see what Drill thinks the type is within the aforementioned limitations:

SELECT typeof(custId) as custId_type, typeof(address) AS addr_type
FROM `csvh/cust.csvh` LIMIT 1;
+--------------+------------+
| custId_type  | addr_type  |
+--------------+------------+
| VARCHAR      | VARCHAR    |
+--------------+------------+

To work around these limitations, we contributed two new functions to Drill for inclusion in Drill 1.14. If you are using an earlier version of Drill, only typeof() is available.

The first function, sqlTypeOf(), returns the actual SQL type name for a column, whether that column is NULL or not. The SQL type name is the name that you can use in a CAST (discussed later) to force a column to that type.

The second, modeOf(), returns the cardinality of the column, which Drill calls the mode: NULLABLE, NOT NULL, or ARRAY.

We use these new functions next to get a clearer view of Drill’s internal data types.

Casts to specify types

We as humans can use our extra knowledge of the columns to see that custId is really an INT and balance is a FLOAT. Or, said another way, we might choose to represent those columns as those types for some purposes. In Drill, we impose the types by forcing a data conversion using a CAST. We can apply a CAST only with explicit projection; that is, when we name columns, as demonstrated in the following example:

SELECT CAST(custID AS INT) AS custId, name,
    CAST(balance AS FLOAT) AS balance
FROM `csvh/cust.csvh`;
+---------+---------+----------+
| custId  |  name   | balance  |
+---------+---------+----------+
| 123     | Fred    | 456.78   |
| 125     | Betty   | 98.76    |
| 128     | Barney  | 1.23     |
+---------+---------+----------+

We can use the type functions to ensure that the types were converted correctly:

SELECT sqlTypeOf(custId) As custId_type,
       modeOf(custId) AS custId_mode,
       sqlTypeOf(balance) AS bal_type,
       modeOf(balance) AS bal_mode
FROM (
  SELECT CAST(custID AS INT) AS custId,
         name, CAST(balance AS FLOAT) AS balance
  FROM `csvh/cust.csvh`)
LIMIT 1;
+--------------+--------------+-----------+-----------+
| custId_type  | custId_mode  | bal_type  | bal_mode  |
+--------------+--------------+-----------+-----------+
| INTEGER      | NOT NULL     | FLOAT     | NOT NULL  |
+--------------+--------------+-----------+-----------+

The columns retain the NOT NULL cardinality from the original VARCHAR column.

We can now apply numeric operations to the numeric values. It is often handy to express this as a nested query, here using nonsensical operations:

SELECT AVG(custId) AS avgId, SUM(balance) totalBal
FROM (
  SELECT CAST(custID AS INT) AS custId,
         name, CAST(balance AS FLOAT) AS balance
  FROM `csvh/cust.csvh`)
+---------------------+--------------------+
|        avgId        |      totalBal      |
+---------------------+--------------------+
| 125.33333333333333  | 556.7700009346008  |
+---------------------+--------------------+

The key thing to remember, however, is that we treated the two fields as numbers only because we wanted to for this query: we could also keep them as VARCHARs if that were more convenient for some other operation.

CSV Summary

In summary, for a CSV file with headers:

  • The number of columns and the column names are specified by the headers in the first line of the file. If any name repeats, Drill will create a unique field name by appending a number. If a name is blank, Drill uses EXPR$0 and so on.

  • The type of every column is non-nullable VARCHAR.

  • Extra columns past those described in the header are ignored.

  • Columns declared in the file header but missing from a row are set to a blank VARCHAR.

  • Columns referenced in the SELECT clause but missing from the file are set to a blank VARCHAR and filled with empty (not NULL) values.

  • Columns are read as VARCHAR. Use a CAST to convert a column to a different type.

CSV without a header row

When a CSV file does not provide a header row, Drill uses a different rule to infer the schema, as you saw in Chapter 4. Consider a simple example (csv/fred.csv):

123,Fred,456.78

We discussed CSV files with headers, for which Drill can determine the column count and names. Without headers, Drill does not know the names of the columns. It cannot even be sure of the column count, because the next record could well have a different number of fields. Maybe some customer records have an additional loyalty level; Betty turns out to be very loyal:

125,Betty,98.76,VIP

To handle this case, Drill supports array (so-called repeated) types. Drill represents each row of a headerless CSV file using the special columns column:

SELECT * FROM `csv/cust.csv`;
+--------------------------------+
|            columns             |
+--------------------------------+
| ["123","Fred","456.78"]        |
| ["125","Betty","98.76","VIP"]  |
+--------------------------------+

Although the records in this file have three or four fields, Drill stores each record in a single columns column as an array of VARCHAR. As it turns out, CSV (text files read without headers) is the only file format that uses the columns approach.

Again, you can use the type functions to see this in action:

SELECT sqlTypeOf(columns) AS cols_type,
       modeOf(columns) AS cols_mode
FROM `csv/cust.csv` LIMIT 1;
+--------------------+------------+
|     cols_type      | cols_mode  |
+--------------------+------------+
| CHARACTER VARYING  | ARRAY      |
+--------------------+------------+

Explicit projection

Sometimes, the array of values is all you need. But neither JDBC nor ODBC handles arrays very well, so most often you’ll want to “parse” the array into named columns. Notice that indexing starts at 0:

SELECT columns[0] AS custId,
    columns[1] AS custName, 
    columns[2] AS balance,
    columns[3] AS loyalty
FROM `csv/cust.csv`;
+---------+-----------+----------+----------+
| custId  | custName  | balance  | loyalty  |
+---------+-----------+----------+----------+
| 123     | Fred      | 456.78   | null     |
| 125     | Betty     | 98.76    | VIP      |
+---------+-----------+----------+----------+

Notice also that Fred’s missing loyalty value is shown as NULL. This is different from the CSV-with-headers case, in which missing values are returned as empty VARCHARs. Although Drill provides no way to visualize the fact, the columns we created here are nullable VARCHAR, unlike the CSV-with-headers case in which the columns are non-nullable VARCHAR.

If you want a blank value to be consistent with the CSV-with-headers case, you can manipulate the data:

SELECT custId, custName, balance,
    CASE WHEN loyalty IS NULL THEN '' ELSE loyalty END
FROM (
  SELECT columns[0] AS custId,
      columns[1] AS custName, 
      columns[2] AS balance,
      columns[3] AS loyalty
  FROM `csv/cust.csv`);
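
Drill also supports the standard COALESCE function, which expresses the same substitution more compactly; here is an equivalent sketch:

SELECT columns[0] AS custId,
       columns[1] AS custName,
       columns[2] AS balance,
       COALESCE(columns[3], '') AS loyalty
FROM `csv/cust.csv`;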

And of course, as in the previous section, you can use CAST statements to convert the text data to some other type when needed.

In summary, for CSV files without headers:

  • All columns are represented by the special columns array of type repeated VARCHAR.

  • The length of the array depends on the number of fields on a given line and will differ across records if the number of fields differs across lines in the CSV file.

  • If a value is not available in the columns array, you can still request it, but the value returned is NULL.

Schema Inference for JSON

The Drill project website notes that Drill uses JSON as its native data model. That is, Drill can support (a subset of) the same structures that JSON does:

  • Scalar types

  • Null values (unlike JSON, Drill’s nulls are typed)

  • Lists

  • Maps

In practice, JSON is much more expressive than Drill's model, for the simple reason that JSON can express arbitrary tree-structured data, whereas Drill must coerce all input into an extended relational structure.

Limitations of JSON Data

An inherent limitation of JSON is that, no matter how large your JSON file is, Drill cannot parallelize the scan of a single file. Splittable formats require a clear end-of-record marker (such as a newline), and JSON provides no such marker. Still, if your query reads multiple JSON files, Drill reads the files in parallel.

JSON column names

Drill infers column names from the names of JSON map elements:

{"column1": 10, "column2": "foo"}

Here’s a simple query:

SELECT * from `json/quoted.json`;
+----------+----------+
| column1  | column2  |
+----------+----------+
| 10       | foo      |
+----------+----------+

As a harmless extension to JSON, Drill does not require column names to be enclosed in quotes, so the following is also valid:

{column1: 10, column2: "foo"}

SELECT * from `json/unquoted.json`;
+----------+----------+
| column1  | column2  |
+----------+----------+
| 10       | foo      |
+----------+----------+

In JSON, column names are case sensitive. So, the following has two columns:

{a: 10, A: 20}

Drill, however, follows the SQL standard in which column names are case insensitive. Drill treats the preceding example as a single column; the first field provides the name, but the second value overwrites the first:

SELECT * FROM `json/ambig.json`;
+-----+
|  a  |
+-----+
| 20  |
+-----+

JSON scalar types

JSON supports a small set of scalar types: number, string, null, true, and false. Drill maps the JSON types to Drill types, which you can see listed in Table 8-1.

Table 8-1. Drill versus JSON data types

JSON type      Drill data type
number         Nullable DOUBLE if the number contains a decimal point; nullable BIGINT otherwise
string         Nullable VARCHAR
true, false    Nullable BOOLEAN (a single-byte, unsigned integer with true = 1 and false = 0)
null           A NULL value for the column type previously inferred

Limitations on Schema Inference

Recall the key rule about Drill’s schema inference: Drill cannot predict the future. Drill infers column type using the first row only. (Actually, Drill infers column type from the first appearance of each column, which is often, but not necessarily, in the first row.)

We use the term schema ambiguity to refer to the case in which information in the first row is insufficient for Drill to select the proper column type. You can often resolve the ambiguity using a set of session options and with the clever use of query features.

Of course, if your JSON files are well structured, you may not run into the schema ambiguity issues. On the other hand, in a data lake, you often don’t have control over the format of JSON files created by others. If a file contains nulls or missing values, is sloppily written, or has a schema that has changed over time, you might run into schema ambiguity issues and can resolve them as described here. Converting such files to Parquet is your best production solution; use the solutions here to explore the files prior to conversion.

Ambiguous Numeric Schemas

JSON has only one numeric type: number. But Drill has multiple numeric types, and chooses a type for the field based on the first record. In general, if the first value contains a decimal point, Drill chooses DOUBLE; otherwise, it chooses BIGINT.

Because Drill is more strict than JSON, the “can’t predict the future” rule can surprise the unwary. Suppose we have the following JSON file (json/int-float.json):

{a: 10}
{a: 10.1}

A query against this table fails:

SELECT * FROM `json/int-float.json`;
Error: INTERNAL_ERROR ERROR: You tried to write a Float8 type when
  you are using a ValueWriter of type NullableBigIntWriterImpl.

The problem is that Drill infers the type BIGINT from the first value and then fails because the second value is not a valid BIGINT. You cannot fix this problem in the query because the problem occurs during read, long before your query statements take effect.

Drill provides a solution, albeit one that’s a bit awkward. Like many SQL engines, Drill lets you customize various settings at runtime. You can make a change for just the current login session using ALTER SESSION. Or, if you have admin rights, you can change the setting for all users in all sessions using ALTER SYSTEM. Because changing a system setting might cause queries to behave differently, be very cautious before making such system-level changes. Here, we’ll make all the changes for just the one session; the options will go back to their default values in the next session.

If your file has pathological cases like this, you can read all numbers as DOUBLE:

ALTER SESSION SET `store.json.read_numbers_as_double` = true;

The query will now work fine because Drill is given a hint to use the DOUBLE type for numbers, so there is no conflict. The awkward bit is that you must remember to set the option before each query of the troublesome file, and you must remember to reset it afterward:

ALTER SESSION RESET `store.json.read_numbers_as_double`;

Some query tools don’t provide a convenient way to issue such statements. Further, you cannot encapsulate such statements in a view. As a result, it is much better to avoid creating such ambiguous files in the first place.

Note

The session options must be set in each connection session before querying the file. You must reset them before querying the next file. This means that the users of Drill must know which options to set for each file. This is a good motivation to convert troublesome files to Parquet, as explained later in the chapter.
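
For example, here is a minimal sketch of converting the troublesome int-float.json file to Parquet once, after which later queries need no session options (dfs.tmp is a writable workspace by default; adjust the output location to taste):

ALTER SESSION SET `store.json.read_numbers_as_double` = true;
ALTER SESSION SET `store.format` = 'parquet';
CREATE TABLE dfs.tmp.`int_float_parquet` AS
  SELECT a FROM `json/int-float.json`;
ALTER SESSION RESET `store.json.read_numbers_as_double`;

-- Later queries read the typed Parquet copy with no special options:
SELECT a FROM dfs.tmp.`int_float_parquet`;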

Mixed string and number types

A similar issue occurs if you mix string and number types that are distinct JSON types. Mixing of types can occur if the JSON schema evolves. Perhaps a product number starts as a number but is later changed to a string to allow codes such as 104A. Or perhaps a sloppy script writes numbers as strings and is later corrected, as in this case (json/int-str.json):

{a: 10}
{a: "20"}

Try to query this file:

SELECT * FROM `json/int-str.json`;
Error: INTERNAL_ERROR ERROR: You tried to write a VARCHAR type when you are using 
a ValueWriter of type NullableFloat8WriterImpl.

Here you can use an even more general solution, all-text mode, which instructs the JSON reader to read all scalar fields as nullable VARCHAR:

ALTER SESSION SET `store.json.all_text_mode` = true;
SELECT * FROM `json/int-str.json`;
+-----+
|  a  |
+-----+
| 10  |
| 20  |
+-----+

It is not clear from the SQLLine output, but the type of the preceding fields is VARCHAR, which you can verify as follows:

SELECT typeof(a) AS a_type FROM `json/int-str.json` LIMIT 1;
+----------+
|  a_type  |
+----------+
| VARCHAR  |
+----------+

If you meant for the field to be a number, you can add a CAST:

SELECT CAST(a AS BIGINT) AS a FROM `json/int-str.json`;
+-----+
|  a  |
+-----+
| 10  |
| 20  |
+-----+

Missing values

JSON is very flexible, allowing you to simply omit values if they are not needed. That is, the following is perfectly valid (json/missing2.json):

{custId: 123, name: "Fred", balance: 123.45}
{custId: 125, name: "Barney"}

When Drill encounters this, it simply fills in a NULL for the missing value:

SELECT * FROM `json/missing2.json`;
+---------+---------+----------+
| custId  |  name   | balance  |
+---------+---------+----------+
| 123     | Fred    | 123.45   |
| 125     | Barney  | null     |
+---------+---------+----------+

The “can’t predict the future” rule can trip you up if the file contains too many missing fields at the beginning. Consider the following file (json/missing3.json):

{a: 0}
{a: 1}
{a: 2, b: "hello there!"}

Now consider this query:

SELECT a, b FROM `json/missing3.json` WHERE b IS NOT NULL ORDER BY a;
+----+---------------+
| a  |       b       |
+----+---------------+
| 2  | hello there!  |
+----+---------------+

Do the same but with 70,000 of the a-only records (gen/70kmissing.json). This time you get:

SELECT a, b FROM `gen/70kmissing.json`
WHERE b IS NOT NULL ORDER BY a;

Error: UNSUPPORTED_OPERATION ERROR:
  Schema changes not supported in External Sort.
  Please enable Union type.
Previous schema BatchSchema [fields=[[`a` (BIGINT:OPTIONAL)],
  [`b` (INT:OPTIONAL)]], selectionVector=NONE]
Incoming schema BatchSchema [fields=[[`a` (BIGINT:OPTIONAL)],
  [`b` (FLOAT8:OPTIONAL)]], selectionVector=NONE]

The reason is that Drill breaks data into batches of 65,536 or fewer records. The 70,000 records are enough to cause Drill to split the data into at least two batches. The first contains only column a, but the second contains both a and b. Because you requested column b, Drill invents a dummy column of type nullable INT for batches in which b does not exist. This change in schema confuses the sort operator. In Drill terminology, the preceding situation is a schema change.

Sometimes schema changes are benign, as you can see by running this query in SQLLine:

SELECT a, b,
       sqlTypeOf(b) AS b_type, modeof(b) AS b_mode
FROM `gen/70kmissing.json`
WHERE mod(a, 70000) = 1;
+--------+-------+----------+-----------+
|   a    |   b   |  b_type  |  b_mode   |
+--------+-------+----------+-----------+
| 1      | null  | INTEGER  | NULLABLE  |
| 70001  | 10.5  | DOUBLE   | NULLABLE  |
+--------+-------+----------+-----------+

The schema change is present, but because of the way SQLLine works, the change is harmless. Specifically, SQLLine converts all columns to string, regardless of type. Because both DOUBLE and INT columns can be converted to a string, SQLLine does not complain.

You can be surprised by an error when using a tool or Drill operator that is stricter about types. For example, the sort operation failed because it is such a strict operator: the error message says that the sort operator does not know how to combine a DOUBLE (called FLOAT8 internally) and an INT into the same column. Use the sqlTypeOf() function, as shown in the prior example, to see the type conflict if you run into this kind of error.

All-text mode should work to force the missing fields to VARCHAR, but a bug in Drill 1.13 (DRILL-6359) prevents that from happening. You can work around this by doing the conversion yourself:

SELECT a, CAST(b AS DOUBLE) AS b
FROM `gen/70kmissing.json`
WHERE b IS NOT NULL ORDER BY a;
+--------+-------+
|   a    |   b   |
+--------+-------+
| 70001  | 10.5  |
+--------+-------+

The general rule is that, with JSON, leading nulls can cause issues because Drill cannot predict the type that will eventually appear. Similarly, if a column is missing at the beginning of the file, Drill has no way to predict that the column will appear (or its eventual type). As a result, JSON works best with Drill when all columns are present with non-null values or if you add query logic to resolve the ambiguity.

Leading null values

We noted earlier that Drill treats missing JSON values as if the column were present, but contains nulls. As a result, the discussion about missing values applies to null values, as well.

JSON treats null as a type separate from all other types. However, in Drill (as in SQL), NULL is a state of a value, and that value must be of some type. That is, despite what the typeof() function seems to say, Drill has no NULL type, only null values.

Drill's behavior with JSON nulls differs from its handling of missing values in one way: when Drill gets to the end of the first batch and a column has contained only nulls, it must guess a type for that column, and it guesses nullable INT. This is an odd guess, given that no JSON value is ever read as INT, so this behavior might change in future releases. For example, given this file (json/all-null.json):

{a: null}
{a: null}

we get these query results:

SELECT a FROM `json/all-null.json`;
+-------+
|   a   |
+-------+
| null  |
| null  |
+-------+

You can see the type using new functions added in Drill 1.14:

SELECT sqlTypeOf(a) AS a_type, modeOf(a) AS a_mode 
FROM `json/all-null.json` LIMIT 1;
+----------+-----------+
|  a_type  |  a_mode   |
+----------+-----------+
| INTEGER  | NULLABLE  |
+----------+-----------+

The general rule is that you should include a non-null value somewhere in the first hundred records or so.

Null versus missing values in JSON output

As just noted, Drill internally represents JSON null and missing values in the same way: as a NULL value of some type. If you were to use a CREATE TABLE statement in Drill to copy your input JSON to an output JSON file, you would see that Drill omits columns with NULL values:

ALTER SESSION SET `store.format` = 'json';
CREATE TABLE `out/json-null` AS SELECT * FROM `json/null2.json`;

The output is:

{
  "custId" : 123,
  "name" : "Fred",
  "balance" : 123.45
} {
  "custId" : 125,
  "name" : "Barney"
}

The general rule is this: although a null and a missing value are different in JSON, Drill’s mapping of JSON into SQL equates them.

Aligning Schemas Across Files

Drill can query multiple files, and these are read in distinct threads or on distinct nodes, as explained earlier. As a result, the reader on one node cannot predict what another node will read. Because the JSON reader decides on a schema based on the first record in each file, the readers can settle on different schemas when the ambiguities described earlier are present. In this case, Drill detects the problem only when the query attempts to combine the data from the various readers.

This behavior can lead to issues similar to those described earlier if you query multiple files and column b occurs in some files but not others:

json/missing/
  file1.json:
    {a: 1}
  file2.json:
    {a: 2, b: "foo"}

If Drill reads both files in the same minor fragment, this example might work when Drill happens to read file2 before file1 (the order in which Drill reads files is effectively random), but the query will fail if Drill reads file1 first. How it fails depends on the query.

For example, in one test, the following query works:

SELECT a, b FROM `json/missing` ORDER BY a;

But if you rename file1.json to file3.json, the same query now produces an error:

Error: UNSUPPORTED_OPERATION ERROR:
  Schema changes not supported in External Sort.
  Please enable Union type.

Previous schema BatchSchema [fields=[[`a` (BIGINT:OPTIONAL)]],
  selectionVector=NONE]
Incoming schema BatchSchema [fields=[[`a` (BIGINT:OPTIONAL)],
  [`b` (VARCHAR:OPTIONAL)]], selectionVector=NONE]

You can use the sqlTypeOf() function to visualize the problem:

SELECT sqlTypeOf(b) AS b_type FROM `json/missing`;
+--------------------+
|       b_type       |
+--------------------+
| INTEGER            |
| CHARACTER VARYING  |
+--------------------+

You can apply the same workaround as earlier, including an explicit cast:

SELECT a, CAST(b AS VARCHAR) AS b FROM `json/missing` ORDER BY a;
+----+-------+
| a  |   b   |
+----+-------+
| 1  | null  |
| 2  | foo   |
+----+-------+

Again, the general lesson is this: ensure identical schemas across all of the JSON files to be read by a query. This means that if your schema evolves, you must go back and update any existing files so that they have a matching schema.

JSON Objects

JSON allows tree-structured data such as the following for a customer (json/nested.json):

{custId: 101, name: {first: "John", last: "Smith"}}

Try querying the file:

SELECT * FROM `json/nested.json`;
+---------+----------------------------------+
| custId  |               name               |
+---------+----------------------------------+
| 101     | {"first":"John","last":"Smith"}  |
+---------+----------------------------------+

In JSON, both the outer and inner groupings are objects. Drill converts the outer object to a record, and it converts the inner structure to a Drill MAP. Although Drill uses the name MAP for this data type, it is more like a Hive STRUCT: it is actually a nested record with its own set of named columns. It is not, as you might suspect from the name, a collection of name/value pairs. Chapter 5 discusses how to work with MAP columns.

Maps in JDBC

The MAP type is very useful, but it should be used only within the query itself; convert the map to a set of top-level columns before returning results to JDBC or ODBC clients. JDBC has no concept of a nested tuple, although it does allow a column to be returned as a generic Java object. SQLLine, which connects to Drill over JDBC, uses this to make it look as though JDBC supports the MAP type: it retrieves the Java object for each column and displays it by calling the object’s toString() method. To ensure that toString() produces the nice formatting shown in the previous example, the Drill JDBC driver returns the MAP as a Java implementation of a JSON object. You can use the same trick in your own Java code that uses the Drill JDBC driver.

You can use the sqlTypeOf() function to verify that the column is actually a Drill MAP:

SELECT sqlTypeOf(`name`) AS name_type FROM `json/nested.json`;
+------------+
| name_type  |
+------------+
| MAP        |
+------------+

You can also peek inside the MAP to determine the type of the map’s members:

SELECT sqlTypeOf(`t`.`name`.`first`) AS name_type
FROM `json/nested.json` AS t;
+--------------------+
|     name_type      |
+--------------------+
| CHARACTER VARYING  |
+--------------------+

Because neither JDBC nor ODBC supports MAP columns, you usually don’t want to return a MAP to the client. Instead, flatten MAP columns using the Drill functions provided, or manually:

SELECT custId,
       `t`.`name`.`first` AS `first`,
       `t`.`name`.`last` AS `last`
FROM `json/nested.json` AS `t`;
+---------+--------+--------+
| custId  | first  |  last  |
+---------+--------+--------+
| 101     | John   | Smith  |
+---------+--------+--------+

Note the format required:

  • When referencing a MAP, you must prefix the entire name by a table name, or the SQL parser will think that the map name refers to a table.

  • Because Drill table names are complex, it is handy to give the table an alias as we did with `t`.

JSON allows field names to contain dots. To indicate that you mean a map name followed by the name of one of its members (rather than a single field whose name contains a dot), enclose each name part in backticks and separate the parts with dots outside the backticks.
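For example, assuming a hypothetical file json/dotted.json in which a top-level field is literally named “name.first”, the backticks distinguish the two cases:

-- Member `first` of the MAP column `name`:
SELECT `t`.`name`.`first` FROM `json/nested.json` AS `t`;

-- A single field whose name contains a dot (hypothetical file):
SELECT `t`.`name.first` FROM `json/dotted.json` AS `t`;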

Because the inner map is a separate structure, names in one map (including the top-level record) can be the same as those in another:

{a: {c: 10}, b: {c: 20}}

The names are unique in their full path names, however: `a`.`c` and `b`.`c`.

JSON itself places no restrictions on the contents of objects. Two records can have completely different object schemas (json/disjoint-nest.json):

{a: {b: 10}}
{a: {c: "foo"}}

If Drill encounters objects such as those in the preceding example, it simply adds each column to a compound map. That is, Drill treats the previous example as if it were the following:

{a: {b: 10, c: null}}
{a: {b: null, c: "foo"}}

This is, in fact, really just the same rule that we saw applied to the top-level record itself. So, the same caveats apply.

However, when displayed in SQLLine, it appears as though Drill had kept the two map schemas separate, because of the “omit null columns” rule applied when SQLLine renders the MAP as JSON:

SELECT * FROM `json/disjoint-nest.json`;
+--------------+
|      a       |
+--------------+
| {"b":10}     |
| {"c":"foo"}  |
+--------------+

Despite this convenient formatting, both columns are present in both maps. As we saw earlier, when Drill renders a row (or MAP) as JSON it omits null values, giving the output shown in the preceding example.

JSON Lists in Drill

JSON provides a list type (json/int-list.json):

{a: [10, 20, 30]}

Querying this data produces the following results:

SELECT * FROM `json/int-list.json`;
+-------------+
|      a      |
+-------------+
| [10,20,30]  |
+-------------+
SELECT sqlTypeOf(a) AS a_type, modeOf(a) AS a_mode
FROM `json/int-list.json`;
+---------+---------+
| a_type  | a_mode  |
+---------+---------+
| BIGINT  | ARRAY   |
+---------+---------+

In JSON, a list is not a list of numbers or a list of strings; it is simply a list of values, and the values can be of any type. That is, the following is perfectly valid JSON (json/mixed-list.json):

{a: [10, "foo", {b: 30}, ["fred", "barney"]]}

Again, Drill must pull a relational structure out of the JSON data, and it cannot do so for heterogeneous lists.1

Sometimes, clever programmers use JSON arrays as a record format: just list the values in order, without the overhead of the name/value pairs required by objects. However, Drill cannot read a JSON array that mixes scalar types (json/scalar-list.json):

{a: [ 123, "Fred", 123.45 ] }

Here’s what happens when you query the file:

SELECT * FROM `json/scalar-list.json`;
Error: UNSUPPORTED_OPERATION ERROR: In a list of type BIGINT,
 encountered a value of type VARCHAR. Drill does not support lists of different types.

The workaround is to use all_text_mode:

ALTER SESSION SET `store.json.all_text_mode` = true;
SELECT * FROM `json/scalar-list.json`;
+--------------------------+
|            a             |
+--------------------------+
| ["123","Fred","123.45"]  |
+--------------------------+

Now you can pull out the fields and cast them to the proper type:

SELECT * FROM (
  SELECT CAST(a[0] AS INT) AS custId,
         a[1] AS name,
         CAST(a[2] AS DOUBLE) AS balance
  FROM `json/scalar-list.json`);
+---------+-------+----------+
| custId  | name  | balance  |
+---------+-------+----------+
| 123     | Fred  | 123.45   |
+---------+-------+----------+
ALTER SESSION RESET `store.json.all_text_mode`;

The general rule in Drill is that if a column is represented by a JSON list, all the values in the list must be of the same type. This allows Drill to infer the type of the list as one of the types discussed earlier, but with repeated cardinality. Using all_text_mode is a workaround, but it has the fiddly issues discussed earlier.

JSON lists can include null values (json/int-null-list.json):

{a: [10, null, 30]}

However, Drill does not allow null values in lists. JSON data with nulls in lists will cause the query to fail:

SELECT * FROM `json/int-null-list.json`;
Error: UNSUPPORTED_OPERATION ERROR:
  Null values are not supported in lists by default.
  Please set `store.json.all_text_mode` to true
  to read lists containing nulls. Be advised that this will treat
  JSON null values as a string containing the word 'null'.

Here, the message helpfully explains the workaround:

ALTER SESSION SET `store.json.all_text_mode` = true;
SELECT * FROM `json/int-null-list.json`;
+---------------------+
|          a          |
+---------------------+
| ["10","null","30"]  |
+---------------------+
ALTER SESSION RESET `store.json.all_text_mode`;

In this case, there is no good way to convert the text fields to numbers short of turning them into top-level fields.
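If you do need the numbers, one option is to turn the list elements into top-level values by flattening the array into rows and converting each element, treating the literal string 'null' specially. This is only a sketch, reusing the file shown above:

ALTER SESSION SET `store.json.all_text_mode` = true;
SELECT CASE WHEN v = 'null' THEN NULL
            ELSE CAST(v AS INT)
       END AS v
FROM (SELECT FLATTEN(a) AS v FROM `json/int-null-list.json`);
ALTER SESSION RESET `store.json.all_text_mode`;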

JSON allows a list to be null. Just as Drill does not differentiate between missing and null scalar values in JSON, it does not differentiate between missing, empty, and null lists. Consider this file (json/null-list.json):

{a: 1, b: [10, 20]}
{a: 2, b: null}
{a: 3, b: []}
{a: 4}

If you query the file, you get these results:

SELECT * FROM `json/null-list.json`;
+----+----------+
| a  |    b     |
+----+----------+
| 1  | [10,20]  |
| 2  | []       |
| 3  | []       |
| 4  | []       |
+----+----------+

The “can’t predict the future” rule applies to lists too. Here’s another example (json/empty-str-list.json):

{a: []}
{a: ["Fred", "Barney"]}

A query on this file will work if all of the data fits into one batch. But if an entire batch contains only empty arrays, Drill must guess a type for that batch. It guesses an array of INT and then fails when a later batch tries to put VARCHAR values into that INT array. For example:

SELECT * FROM `gen/70Kempty.json`;
Error: UNSUPPORTED_OPERATION ERROR: In a list of type INT, 
encountered a value of type VARCHAR. 
Drill does not support lists of different types.

The workaround is our old friend, all text mode:

ALTER SESSION SET `store.json.all_text_mode` = true;
SELECT * FROM `gen/70Kempty.json` WHERE a=70001;
+--------+--------------------+
|   a    |         b          |
+--------+--------------------+
| 70001  | ["Fred","Barney"]  |
+--------+--------------------+

Drill also allows lists to contain other lists (a so-called repeated list) or maps (a repeated map). In these cases, all-text mode will not help with long runs of empty lists: Drill guesses text, and when the first non-null value turns out to be an array or a map, Drill reports an error. The rule is that if an array is nonscalar, it must begin with a non-null value, or Drill will be unable to resolve the ambiguity. Another alternative is to ETL the data into Parquet, which provides metadata to resolve the ambiguity.
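For reference, a repeated list (a list of lists) and a repeated map (a list of objects) look like the following; the data here is hypothetical:

{a: [[10, 20], [30]]}
{b: [{x: 1}, {x: 2, y: 3}]}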

JSON Summary

In summary, the following rules apply to JSON:

  • JSON keys become Drill column names.

  • JSON types map to specific Drill types.

  • Drill can handle columns with list or object values, with certain restrictions.

  • JSON object keys are case sensitive, but Drill column names are case insensitive.

To get the best results with Drill, take care to produce JSON that maps cleanly to a relational representation by doing the following:

  • Avoid null values in the first record.

  • For each column, avoid missing values early in the file.

  • Keep lists homogeneous, with no null values.

  • Avoid empty lists in the first record.

  • Ensure that floating-point numbers always include a decimal point.

  • Avoid including some column in one set of files, but excluding it from another set.

A workaround for some, but not all, of these issues is to enable all-text mode, either per session or in the JSON format plug-in configuration, which reads all scalar columns as nullable VARCHAR and all scalar arrays as repeated VARCHAR. The query can then perform its own type conversions, as demonstrated earlier.
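If you prefer to make all-text mode the default rather than a per-session option, you can set it on the JSON entry in the storage plug-in’s format configuration. The following is a minimal sketch; the allTextMode and readNumbersAsDouble keys are the ones used by recent Drill versions, so check the documentation for your release:

"formats": {
  "json": {
    "type": "json",
    "extensions": ["json"],
    "allTextMode": true,
    "readNumbersAsDouble": false
  }
}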

Note

As mentioned in Chapter 5, Drill provides a Union type that might help to resolve these issues. However, this feature is still considered experimental.

Using Drill with the Parquet File Format

Apache Parquet has become Drill’s preferred file format, for many reasons. The Drill developers have heavily optimized the Parquet reader, and Parquet offers the best read performance of all the file formats Drill supports. Because each file carries its own schema, Parquet is also the ultimate answer to the schema issues described earlier for text and JSON files.

Parquet provides optimal performance because of the following:

  • Parquet is a columnar file format that allows Drill to read only those columns that are needed for a query. You see the maximum benefit when you project a subset of the available columns.

  • Parquet is compressed, minimizing the amount of input/output (I/O) needed to read data from disk at the cost of extra CPU to decode the data.

  • Parquet files contain schema metadata, which avoids the need for Drill to infer the schema from the data.

  • Parquet files often contain additional metadata, such as data ranges, which Drill can use to skip unneeded blocks of data.

Parquet has many of the benefits of a database’s own storage format, but in the form of a file that is usable by many tools, including Drill. Parquet is not, however, a source file format: data often arrives in text or JSON formats. You then use an ETL process to create the Parquet files.

Some people use Drill for ETL, which works fine as long as the source files are well structured to ensure that Drill’s schema inference rules pick the right types, or you can apply the data manipulation techniques described earlier. If you encounter a file that requires Drill to predict the future in order to correctly infer types, Drill will be a poor choice for ETL for that file. Instead, you can use some other tool for ETL: Hive and Spark are popular options. Many commercial options are also available. ETL is also a good option when converting from file formats that Drill does not support.

The ETL process often writes the converted Parquet files into a partitioned structure, as discussed shortly.

Parquet is an evolving format, as is Drill’s support for Parquet. Although Drill should read most Parquet files written by other tools, and other tools can usually read Parquet written by Drill, there are exceptions. See the Drill documentation, or you can discuss it with the Drill community, to understand limitations in current Drill versions.

Schema Evolution in Parquet

Suppose that we have a directory, parquetDir, that contains two Parquet files: file1 and file2. file1 has a single column a of type Integer. file2 has two columns: the same column a, but also column b of type String.

As explained for JSON, if we do an explicit SELECT:

SELECT a, b FROM `parquetDir` ORDER BY b

Drill will fail for the same reasons explained earlier. From this we can see that schema evolution must be planned carefully, even for Parquet. Once we have a collection of Parquet files, we must decide how to handle columns that are added to or removed from the source data over time. When we add or remove a column whose type is anything other than nullable INT, we should rerun the ETL process to re-create (or otherwise reprocess) the older Parquet files so that all files share the same schema.

Partitioning Data Directories

Suppose that your data lake contains Parquet files that record the details of sales for each retail store per day, with data accumulated over the past three years. How might you store this data?

You could write the data for each store for each day into a separate file. Although this strategy is fine for small datasets, in this case it would create roughly 100,000 files (100 stores times about 1,095 days). If all the files reside in one large sales directory, Drill must scan all of them to answer each query. For example, suppose that you want only one month of data; Drill must still scan every file to read the year column needed in the query:

SELECT ... FROM `sales` WHERE `year` = 2017

If you stored the data in a relational database, you would define indexes on the table, perhaps for date, store, product, and so on. With a date index, the database could quickly ignore all years except 2017.

However, indexes don’t work very well in the big data world. Instead, we use a related concept: partitioning. With partitioning, you divide data into subdirectories based on some key. Although a database can index on multiple columns (called “dimensions” in a data warehouse), partitioning works along only a single dimension. We then give Drill the information it needs to scan a subset of our partition subdirectories.

Typically, if your data is large enough to be considered “big data,” you have time series data: few other datasets grow to such large scales. In this example case, you have a time series of sales. Most queries will include a date range: “sales for stores in the Western region for product X over the last four quarters” or “sales by store for the last week.” Because most queries include dates, the date column is the natural one to use for your partitions:

sales
|- ...
|- 2017
    |- 01
    |- ...
    |- 12
        |- 01
        |- ...
        |- 31
            |- store001.parquet
            |- ...
            |- store100.parquet

Suppose that you want to query the sales over a month period that spans from December 15, 2017 to January 14, 2018. Let’s see how to do that.

Because Drill uses the directory structure as its own schema, we write the query against the sales folder itself, using Drill’s implicit columns: dir0 is the first level of subdirectory (the year), dir1 is the second (the month), and dir2 is the third (the day). Although we can include these implicit columns in the SELECT clause, they are most useful in the WHERE clause. We can now write the query like this:

SELECT ... FROM `sales`
WHERE (dir0 = '2017' AND dir1 = '12' AND dir2 >= '15')
   OR (dir0 = '2018' AND dir1 = '01' AND dir2 < '15')

This works, but it is not pleasant to use. A better strategy is a single level of directories, each named with an ISO-encoded date. For example, directories might be named 2016-04-21, 2016-04-22, … 2018-04-22. We would have about 1,000 directories, each containing 100 files, one for each store on that date. For example:

sales
|- ...
|- 2017-12-31
   |- store001.parquet
   |- ...
   |- store100.parquet

With this structure, the preceding query becomes much easier to write:

SELECT ... FROM `sales`
WHERE dir0 BETWEEN '2017-12-15' AND '2018-01-14'

Although you could parse the date out from the dir0 value, it is more convenient to repeat the date within the file itself. For something like sales, the file will probably actually contain the full timestamp of the sale.
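If you do want a proper date from the directory name, something like the following should work; TO_DATE takes a Joda-style format pattern, and the other column names here are hypothetical:

SELECT TO_DATE(dir0, 'yyyy-MM-dd') AS sale_date, storeId, amount
FROM `sales`
WHERE dir0 BETWEEN '2017-12-15' AND '2018-01-14';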

If the history were much larger, say 10 years of data, you might be forced to choose the multilevel structure. Unfortunately, if you switch structures later, you must rewrite all of your queries, because the queries encode the directory structure. This is a case in which you want to get the structure right the first time.

Sometimes you can use the directory structure directly as part of the table name rather than in the WHERE clause. For example, if you want only sales for 2017:

SELECT ... FROM `sales/2017`

You can also use wildcards in directory or filenames. Suppose that you want only sales for the month of July using the one-level structure:

SELECT ... FROM `sales/*-07-*`

You can also use the same trick to pick out specific files across directories. For example, if you want only 2017 sales data for store 123:

SELECT ... FROM `sales/2017-*/store123.parquet`

Choose a structure that works for your data. The goal is to look at typical queries and group data so that most queries hit just a single partition at the leaf or higher level, and the query does not scan unnecessary data. Further, choose a partition structure that reflects the dimensions (columns) most likely to appear in the WHERE clause.

Although this section focused on Parquet files, partitioning works for all file formats and can be useful if your data arrives from a data source that partitions data as it arrives (typically by time).

In this example, if you know that most analysis is done by date across all stores, the partition-by-day format might be best. If, on the other hand, most analysis were done by store and then by date, a better structure might be to partition first by store and then by date.

Although we’ve focused on using partitions with Drill, partitions are a common big data structure used by many tools. For example, partitions in Drill are similar to those in Hive and serve the same purpose. The main difference is that Hive explicitly attaches a schema to a directory hierarchy, whereas Drill does not. Also, Hive encodes partition information in directory names, whereas Drill does not. Drill handles manual partitions as well as partitions created by Hive in identical fashion with no loss of functionality or performance.

If you use Drill for your ETL tasks, Drill itself can create the partitions using the PARTITION BY clause of the CREATE TABLE AS (CTAS) statement.
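Here is a minimal sketch of such a statement; the source path, column names, and output table name are hypothetical, the partition column must appear in the SELECT list, the default schema must be a writable workspace, and the output format must be Parquet:

ALTER SESSION SET `store.format` = 'parquet';
CREATE TABLE `sales_parquet`
PARTITION BY (`sale_date`)
AS SELECT CAST(saleDate AS DATE) AS sale_date, storeId, amount
FROM `raw/sales`;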

Defining a Table Workspace

The partition directories and the files within them are essentially a table. Hive, in fact, defines the entire structure as a table and automatically works out how to scan the data. Because Drill uses no external schema, it has no explicit table concept; the closest we can get is to simulate tables using directory structures. Suppose that you want to define multiple partitioned tables: sales, returns, promotions, and so on. You begin by defining a top-level directory with subdirectories for each table:

retail
|- sales
|- returns
|- promotions

Each of the child directories is partitioned, say by date. Next, you declare a workspace, and then you can reference your tables within that schema:

SELECT * FROM `retail`.`sales` LIMIT 10
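One way to wire this up is to add a workspace to a storage plug-in configuration that points at the retail directory. The following is a minimal sketch of the workspace entry only; the path and the writable and format settings are assumptions to adjust for your environment:

"workspaces": {
  "retail": {
    "location": "/data/retail",
    "writable": false,
    "defaultInputFormat": "parquet"
  }
}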

When you use a directory name (only) in a query, Drill assumes that you want to scan all files in that directory and all its children. If you use a directory pattern (with wildcards), Drill assumes that you want all files in the selected directories.

Drill requires that all files in a single FROM element be of the same type and have the same suffix. In this case, perhaps all the sales files are in Parquet format.

If the directories contain multiple file types, you can use wildcards to pick out a single type.

Working with Queries in Production

In the previous section, you saw many ways that Drill can work with schema-free source data, and the inherent limitations and ambiguities that you might encounter in these situations. We also discussed how to avoid most issues by converting your data from its source format into Parquet.

Finally, in this section, we discuss how to apply these rules to production queries.

Capturing Schema Mapping in Views

You have seen various ways to use session options and query constructs to work around ambiguities in different kinds of data files. After you work out the proper solutions, you will likely want to reuse them in future queries. Views are Drill’s solution for such reuse.

You will have noticed that several of the examples are written in such a way that the grunt work is handled by a subquery. This is not an accident: the subquery can be reused as a view.
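For example, the CAST workaround for the mismatched json/missing files shown earlier could be captured once as a view. This is a sketch; it assumes a writable workspace for storing views, such as the `local`.`data` workspace used later in this section, and you may need to qualify the file path against your default schema:

CREATE VIEW `local`.`data`.`missing_v` AS
SELECT a, CAST(b AS VARCHAR) AS b
FROM `json/missing`;

Users of the view can then simply SELECT from missing_v without repeating the cast.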

Unfortunately, Drill cannot also encapsulate session options in a view. This means that if you must use all text mode, say, to handle schema ambiguities, the users of your view must also set all text mode before using the view and remember to reset the mode afterward.

This is important if you want to run such queries in a business intelligence tool such as Tableau. In such a tool, you might not have the luxury of turning session options on and off around each query, especially if the queries are automated as part of a dashboard or other structure. Your best bet in such cases is to convert the data to Parquet and then query the converted data.

Running Challenging Queries in Scripts

Another handy approach is to run challenging queries (those that require specific session options) in a script. You can then use SQLLine to run the script, which sets up the required session options and then runs the query.

SQLLine has a number of command-line options that you can see using the -h option:

$DRILL_HOME/bin/sqlline -h

Here’s the partial output:

Usage: java sqlline.SqlLine
...
 --showHeader=[true/false]   show column names in query results
 --numberFormat=[pattern]    format numbers using DecimalFormat
                             pattern
 --maxWidth=MAXWIDTH         the maximum width of the terminal
 --silent=[true/false]       be more silent
 --outputformat=[table/vertical/csv/tsv] 
                             format mode for result display
 --run=/path/to/file         run one script and then exit

What if you wanted to run a single query to sum the sales for 2017? You might create a script file, say countSales.sql, such as the following:

USE `local`.`data`;
SELECT SUM(CAST(amount AS DOUBLE)) FROM `sales/2017-*`;

You would do so like this (assuming a single Drillbit on localhost):

$DRILL_HOME/bin/sqlline \
   -u jdbc:drill:drillbit=localhost \
   --run=/full/path/to/countSales.sql \
   --silent=true \
   --outputformat=csv \
   --numberFormat=##,###.00

Here’s what the results look like:

'ok','summary'
'true','Default schema changed to [local.data]'
'EXPR$0'
'10,914.00'

Although the output is not perfect, a bit of text processing can pull out the lines that you want, ignoring the unwanted output from the USE command. You can extend this technique to add ALTER SESSION commands and to use scripting to pass parameters into the query, as sketched below. You can do something similar in Java using the Drill JDBC driver, or in Python as shown in Chapter 7.
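For instance, a small wrapper script might substitute a year parameter into the SQL and keep only the result line. This is a sketch; the output filtering will likely need adjusting for your setup:

#!/usr/bin/env bash
# Hypothetical wrapper: sum sales for the year passed as the first argument.
YEAR=${1:-2017}
SCRIPT=$(mktemp)
cat > "$SCRIPT" <<EOF
USE \`local\`.\`data\`;
SELECT SUM(CAST(amount AS DOUBLE)) FROM \`sales/${YEAR}-*\`;
EOF
$DRILL_HOME/bin/sqlline \
    -u jdbc:drill:drillbit=localhost \
    --run="$SCRIPT" \
    --silent=true \
    --outputformat=csv | tail -n 1    # keep only the final result line
rm -f "$SCRIPT"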

Conclusion

You now have a deep understanding of how to apply Drill to your most challenging data file formats as well as how to diagnose problems when they occur. Drill is very powerful, but it cannot predict the future, meaning that certain data patterns are difficult for Drill to understand. You now have the tools to identify such situations and to work around them. You also now have a deeper appreciation for the many benefits of Parquet as the preferred data format for Drill.

1 Drill does have a LIST data type originally designed to represent heterogeneous lists. However, at the time of this writing, the type is still experimental and not fully implemented.
