
3. Spark SQL: Foundation


As Spark evolves and matures as a unified data processing engine, with more features in each new release, its programming abstraction also evolves. The resilient distributed dataset (RDD) was the initial core programming abstraction when Spark was introduced to the world in 2012. In Spark 1.6, a new programming abstraction called the Structured APIs was introduced, and it is now the preferred way to handle data engineering tasks such as performing data processing or building data pipelines. The Structured APIs were designed to enhance developer productivity with easy-to-use, intuitive, and expressive APIs. They require the data to be available in a structured format and the data computation logic to follow a certain structure. Armed with these two pieces of information, Spark can perform the sophisticated optimizations needed to speed up data processing applications.

Figure 3-1 shows how the Spark SQL component is built on top of the good old reliable Spark Core component. This layered architecture enables it to easily take advantage of any new improvements introduced in the Spark Core component.
Figure 3-1 Spark SQL components

This chapter covers the Spark SQL module, which is designed for structured data processing. It provides an easy-to-use abstraction for expressing data processing logic with a minimal amount of code, and under the covers, it intelligently performs the necessary optimizations.

The Spark SQL module consists of two main parts. The first is the representation of the Structured APIs, called DataFrames and Datasets, which define the high-level APIs for working with structured data. The DataFrame concept was inspired by the Python pandas DataFrame; the main difference is that a DataFrame in Spark can handle a large volume of data spread across many machines. The second part of the Spark SQL module is the Catalyst optimizer, which is responsible for all the complex machinery that works behind the scenes to make your life easier and ultimately speed up your data processing logic. One of the cool things the Spark SQL module offers is the ability to execute SQL queries to perform data processing. With this capability, Spark gains a new group of users, business analysts, who are very familiar with SQL because it is one of the main tools they use regularly.

One main concept that differentiates structured data from unstructured data is the schema, which defines the data structure in the form of column names and associated data types. The schema concept is an integral part of Spark Structured APIs.

Structured data is often captured in a certain format. Some of the formats are text-based, and some of them are binary-based. Common formats for text data are CSV, XML, and JSON, and the common formats for binary data are Avro, Parquet, and ORC. Out of the box, the Spark SQL module makes it very easy to read data and write data from and to any of those formats. One unanticipated consequence of this versatility is that Spark can be used as a data format conversion tool.
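As a quick, hedged illustration of that idea, the following sketch reads a CSV file and writes the same data back out as Parquet using the reader and writer APIs covered later in this chapter; the paths are placeholders rather than files shipped with the book.

// a minimal sketch of using Spark as a data format conversion tool:
// read a CSV file with a header and write it out in Parquet format
spark.read
     .option("header", "true")
     .option("inferSchema", "true")
     .csv("<path>/movies.csv")
     .write
     .parquet("<path>/movies_parquet")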

Before going into Structured APIs, let’s discuss the initial programming abstraction to better understand the motivations behind the new one.

Understanding RDD

To truly understand how Spark works, you must understand the essence of the RDD. It provides a solid foundation and is the abstraction that the Structured APIs are built upon. In short, an RDD represents a fault-tolerant collection of elements partitioned across the nodes of a cluster that can be operated on in parallel. An RDD consists of the following characteristics.
  • A set of dependencies on parent RDDs

  • A set of partitions, which are the chunks that make up the entire dataset

  • A function for computing all the rows in the dataset

  • The metadata about the partitioning scheme (optional)

  • The location of where the data resides on the cluster (optional)

These five pieces of information are used by the Spark runtime to schedule and execute the data processing logic expressed as RDD operations.

The first three pieces of information make up the lineage information, which Spark uses for two purposes. The first is to determine the order of execution of RDDs and the second is for failure recovery.

The set of dependencies is essentially the input data to an RDD. This information is needed to reproduce the RDD in failure scenarios, and therefore it provides the resiliency characteristic.

The set of partitions enables Spark to execute the computation logic in parallel to speed up the computation time.

The last part that Spark needs to produce the RDD output is the compute function, which is provided by Spark users. The compute function is sent to each executor in the cluster to execute against each row in each partition.

The RDD abstraction is both simple and flexible. That flexibility has a drawback, however: Spark has no insight into the user's intentions. It has no idea whether the computation logic is performing data filtering, joining, or aggregation. Therefore, Spark can't perform optimizations such as pushing predicates down to reduce the amount of data read from the input sources, recommending a more efficient join type to speed up the computation, or pruning the columns that are not needed by the output.
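To make these ideas concrete, here is a minimal, illustrative sketch intended for the Spark shell (where the spark variable already exists). It is not part of the book's dataset; it simply shows the partitions, a lazy transformation whose lambda is opaque to Spark, and the lineage Spark keeps for scheduling and failure recovery.

// create an RDD with 4 partitions
val numbersRDD = spark.sparkContext.parallelize(1 to 1000, 4)
// a lazy transformation; the lambda below is a black box to Spark
val evensRDD = numbersRDD.filter(_ % 2 == 0)
evensRDD.getNumPartitions   // 4 -- the partitioning scheme is preserved
evensRDD.count()            // an action triggers the actual computation: 500
evensRDD.toDebugString      // prints the lineage used for scheduling and failure recovery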

Introduction to the DataFrame API

A DataFrame is an immutable, distributed collection of data organized into rows. Each row consists of a set of columns, and each column has a name and an associated type. In other words, this distributed collection of data has a structure defined by a schema. If you are familiar with the table concept in a relational database management system (RDBMS), you will realize that a DataFrame is essentially equivalent. A generic Row object represents each row in the DataFrame. Unlike the RDD APIs, the DataFrame APIs offer a set of domain-specific operations that are relational and have rich semantics. You learn more about these APIs in upcoming sections. Like the RDD APIs, the DataFrame APIs are classified into two types: transformations and actions. The evaluation semantics are identical to those of RDDs: transformations are lazily evaluated, and actions are eagerly evaluated.

A DataFrame can be created by reading data from many structured data sources and by reading data from tables in Hive or other databases. In addition, the Spark SQL module provides APIs to easily convert an RDD to a DataFrame by providing the schema information about the data in the RDD. The DataFrame API is available in Scala, Java, Python, and R.

Creating a DataFrame

There are many ways to create a DataFrame; one common thing among them is providing a schema, either implicitly or explicitly.

Creating a DataFrame from RDD

Let's start with creating a DataFrame from an RDD. Listing 3-1 first creates an RDD with two columns of integers. Then it calls the toDF implicit function, which converts an RDD to a DataFrame using the specified column names; the column types are inferred from the data values in the RDD. Listing 3-2 shows two commonly used DataFrame functions: printSchema and show. The printSchema function prints out the column names and their associated types to the console. The show function prints out the data in a DataFrame in a tabular format; by default, it displays 20 rows. To change the number of rows to display, pass a number to the show function. Listing 3-3 is an example of specifying the number of rows to display.
import scala.util.Random
val rdd = spark.sparkContext.parallelize(1 to 10).map(x => (x, Random.nextInt(100)* x))
val kvDF = rdd.toDF("key","value")
Listing 3-1

Creating DataFrame from an RDD of Numbers

kvDF.printSchema
|-- key: integer (nullable = false)
|-- value: integer (nullable = false)
kvDF.show
+----+-------+
| key|  value|
+----+-------+
|   1|     58|
|   2|     18|
|   3|    237|
|   4|     32|
|   5|     80|
|   6|    210|
|   7|    567|
|   8|    360|
|   9|    288|
|  10|    260|
+----+-------+
Listing 3-2

Print Schema and Show the Data of a DataFrame

kvDF.show(5)
+----+------+
| key| value|
+----+------+
|   1|    59|
|   2|    60|
|   3|    66|
|   4|   280|
|   5|    40|
+----+------+
Listing 3-3

Call show Function to Display 5 Rows in Tabular Format

Note

The actual numbers in the value column may look different for you because they are generated randomly by calling the Random.nextInt() function.

Another way of creating a DataFrame is by specifying an RDD and a schema, which can be created programmatically. Listing 3-4 first creates an RDD using an array of Row objects, where each Row object contains three columns. Then it creates a schema programmatically and finally passes both the RDD and the schema to the createDataFrame function to convert them into a DataFrame. Listing 3-5 shows the schema and the data in the peopleDF DataFrame.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val peopleRDD = spark.sparkContext.parallelize(Array(Row(1L, "John Doe",  30L),Row(2L, "Mary Jane", 25L)))
val schema = StructType(Array(
        StructField("id", LongType, true),
        StructField("name", StringType, true),
        StructField("age", LongType, true)
))
val peopleDF = spark.createDataFrame(peopleRDD, schema)
Listing 3-4

Create a DataFrame from an RDD with a Schema Created Programmatically

peopleDF.printSchema
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
peopleDF.show
+----+-------------+----+
| id |        name | age|
+----+-------------+----+
|   1|     John Doe|  30|
|   2|    Mary Jane|  25|
+----+-------------+----+
Listing 3-5

Display Schema of peopleDF and Its Data

The ability to programmatically create a schema gives Spark applications the flexibility to adjust the schema based on some external configuration.
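For instance, a minimal sketch of that idea might read the column definitions from a configuration value expressed in SQL DDL syntax and build the StructType with StructType.fromDDL; the configuration string below is hypothetical, and the sketch reuses the peopleRDD from Listing 3-4.

import org.apache.spark.sql.types.StructType
// the DDL string could come from a properties file, a job argument, or a database
val schemaFromConfig = "id BIGINT, name STRING, age BIGINT"
val configuredSchema = StructType.fromDDL(schemaFromConfig)
val peopleDF2 = spark.createDataFrame(peopleRDD, configuredSchema)
peopleDF2.printSchema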

Each StructField object has three pieces of information: name, type, and whether the value is nullable.

Each column type in a DataFrame is mapped to an internal Spark type, which can be a simple scalar type or a complex type. Table 3-1 lists the Spark data types and their corresponding Scala types, with scalar types first and complex types last.
Table 3-1  Spark Scala Type Reference

Data Type        Scala Type
BooleanType      Boolean
ByteType         Byte
ShortType        Short
IntegerType      Int
LongType         Long
FloatType        Float
DoubleType       Double
DecimalType      java.math.BigDecimal
StringType       String
BinaryType       Array[Byte]
TimestampType    java.sql.Timestamp
DateType         java.sql.Date
ArrayType        scala.collection.Seq
MapType          scala.collection.Map
StructType       org.apache.spark.sql.Row

Creating a DataFrame from a Range of Numbers

Spark 2.0 introduced a new entry point for Spark applications that primarily use the DataFrame and Dataset APIs. This new entry point is represented by the SparkSession class, which has a convenient function called range that you can use to easily create a dataset with a single column named id of type LongType. This function has a few variations that take additional parameters to specify the start, end, and step values. Listing 3-6 provides examples of using this function to create a DataFrame.
spark.range(5).toDF("num").show
+-----+
|  num|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
+-----+
spark.range(5,10).toDF("num").show
+-----+
|  num|
+-----+
|    5|
|    6|
|    7|
|    8|
|    9|
+-----+
spark.range(5,15,2).toDF("num").show
+------+
|   num|
+------+
|     5|
|     7|
|     9|
|    11|
|    13|
+------+
Listing 3-6

Examples Using SparkSession.range Function to Create a DataFrame

The last variation of the range function takes three parameters. The first represents the starting value, the second represents the end value (exclusive), and the last represents the step size. Notice the range function can create only a single-column DataFrame. Do you have any ideas about how to create a two-column DataFrame?

One option for creating a multicolumn DataFrame is to use Spark's implicit toDF function, which converts a Scala Seq of tuples into a DataFrame. Listing 3-7 is an example of Spark's toDF implicit.
val movies = Seq(("Damon, Matt", "The Bourne Ultimatum", 2007L),
                 ("Damon, Matt", "Good Will Hunting", 1997L))
val moviesDF = movies.toDF("actor", "title", "year")
moviesDF.printSchema
|-- actor: string (nullable = true)
|-- title: string (nullable = true)
|-- year: long (nullable = false)
moviesDF.show
+-----------+--------------------+------+
|      actor|               title|  year|
+-----------+--------------------+------+
|Damon, Matt|The Bourne Ultimatum|  2007|
|Damon, Matt|   Good Will Hunting|  1997|
+-----------+--------------------+------+
Listing 3-7

Converting a Collection of Tuples to a DataFrame Using Spark's toDF Implicit

These fun ways to create a DataFrame make it easy to learn and work with DataFrame APIs without loading the data from some external files. However, when you start performing serious data analysis with large datasets, it is imperative to know how to load data from external data sources, which is covered next.

Creating a DataFrame from Data Sources

Out of the box, Spark SQL supports a set of built-in data sources, where each one is mapped to a data format. The data source layer in the Spark SQL module is designed to be extensible, so custom data sources can be easily integrated into the DataFrame APIs. The Spark community has written hundreds of custom data sources, and it is not too difficult to implement one.

The two main classes in Spark for reading and writing data are DataFrameReader and DataFrameWriter, respectively. This section covers working with the APIs in the DataFrameReader class and the various available options when reading data from a specific data source.

An instance of the DataFrameReader class is available as the read variable of the SparkSession class. You can refer to it from a Spark shell or in a Spark application, as shown in Listing 3-8.
spark.read
Listing 3-8

Using read Variable from SparkSession

The common pattern for interacting with DataFrameReader is described in Listing 3-9.
spark.read.format(...).option("key", "value").schema(...).load()
Listing 3-9

Common Pattern for Interacting with DataFrameReader

Table 3-2 describes the three main pieces of information used when reading data: format, option, and schema. These three pieces of information are discussed in more detail later in the chapter.
Table 3-2  Main Information on DataFrameReader

Name     Optional   Comments
format   No         One of the built-in data sources or a custom data source. For a built-in format, you can use a short name (json, parquet, jdbc, orc, csv, text). For a custom data source, you must provide the fully qualified name. See Listing 3-10 for examples.
option   Yes        DataFrameReader has a set of default options for each data source format. You can override a default value by passing a key and value to the option function.
schema   Yes        Some data sources have the schema embedded in the data files, especially Parquet and ORC. In those cases, the schema is automatically inferred. For other cases, you may need to provide a schema.

spark.read.json("<path>")
spark.read.format("json")
spark.read.parquet("<path>")
spark.read.format("parquet")
spark.read.jdbc
spark.read.format("jdbc")
spark.read.orc("<path>")
spark.read.format("orc")
spark.read.csv("<path>")
spark.read.format("csv")
spark.read.text("<path>")
spark.read.format("text")
// custom data source – fully qualified package name
spark.read.format("org.example.mysource")
Listing 3-10

Specifying Data Source Format

Table 3-3 describes Spark’s six built-in data sources and provides comments for each of them.
Table 3-3  Spark's Built-in Data Sources

Name        Data Format   Comments
Text file   Text          No structure.
CSV         Text          Comma-separated values. Another delimiter can be specified. Column names can be taken from the header.
JSON        Text          Popular semistructured format. Column names and data types are inferred automatically.
Parquet     Binary        The default format. A popular binary format in the Hadoop community.
ORC         Binary        Another popular binary format in the Hadoop community.
JDBC        Binary        The common format for reading from and writing to an RDBMS.

Creating a DataFrame by Reading Text Files

Text files contain unstructured data. As it is read into Spark, each line becomes a row in the DataFrame. There are a lot of free books available for download in plain text format at www.gutenberg.org. For plain text files, one common way to parse the words is by splitting each line with a space delimiter, similar to how a typical word count example works. Listing 3-11 is an example of reading the README.md file as a text file.
val textFile = spark.read.text("README.md")
textFile.printSchema
|-- value: string (nullable = true)
// show 5 lines and don't truncate
textFile.show(5, false)
+-------------------------------------------------------------------------+
|value                                                                    |
+-------------------------------------------------------------------------+
|# Apache Spark                                                           |
|                                                                         |
|Spark is a fast and general cluster computing system for Big Data. It provides |
|high-level APIs in Scala, Java, Python, and R, and an optimized engine that    |
|supports general computation graphs for data analysis. It also supports a      |
+-------------------------------------------------------------------------+
Listing 3-11

Read README.md File as a Text File from Spark Shell

If a text file contains a delimiter that you can use to parse the columns in each line, then it is better to read it using the CSV format, which is covered in the following section.

Creating a DataFrame by Reading CSV Files

One of the popular text file formats is CSV, which stands for comma-separated values. Popular tools like Microsoft Excel can easily import and export data in CSV format. The CSV parser in Spark is designed to be flexible such that it can parse a text file using a user-provided delimiter; the comma just happens to be the default. This means you can use the CSV format to read tab-separated value text files or text files with any arbitrary delimiter.

Some CSV files have a header, and some don’t. Since a column value may contain a comma, it is a common and good practice to escape it using a special character. Table 3-4 describes commonly used options when working with CSV format. For a complete list of options, please see the CSVOptions class at https://github.com/apache/spark.
Table 3-4  CSV Common Options

Key           Value(s)           Default   Description
sep           Single character   ,         The single-character value used as the delimiter for each column.
header        true, false        false     If true, the first line in the file represents the column names.
escape        Any character      \         The character used to escape a character in a column value that is the same as sep.
inferSchema   true, false        false     Whether Spark should try to infer the column types based on the column values.

Specifying the header and inferSchema options as true won’t require you to specify the schema. Otherwise, you need to define a schema by hand or programmatically and pass it into the schema function. If the inferSchema option is false and no schema is provided, Spark assumes the data type for all the columns to be the string type.

The data file you are using as an example is called movies.csv in the data/chapter4 folder. This file contains a header for each column: actor, title, year. Listing 3-12 provides a few examples of reading CSV files .
val movies = spark.read.option("header","true").csv("<path>/book/chapter4/data/movies/movies.csv")
movies.printSchema
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)
// now try to infer the schema
val movies2 = spark.read.option("header","true").option("inferSchema","true")
                          .csv("<path>/book/chapter4/data/movies/movies.csv")
movies2.printSchema
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
// now try to manually provide a schema
import org.apache.spark.sql.types._
val movieSchema = StructType(Array(StructField("actor_name", StringType, true),
                                              StructField("movie_title", StringType, true),
                                              StructField("produced_year", LongType, true)))
val movies3 = spark.read.option("header","true").schema(movieSchema)
                                .csv("<path>/book/chapter4/data/movies/movies.csv")
movies3.printSchema
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)
movies3.show(5)
+-----------------+--------------+--------------+
|       actor_name|   movie_title| produced_year|
+-----------------+--------------+--------------+
|McClure, Marc (I)| Freaky Friday|          2003|
|McClure, Marc (I)|  Coach Carter|          2005|
|McClure, Marc (I)|   Superman II|          1980|
|McClure, Marc (I)|     Apollo 13|          1995|
|McClure, Marc (I)|      Superman|          1978|
+-----------------+--------------+--------------+
Listing 3-12

Read CSV Files with Various Options

The first example reads the movies.csv file, specifying that the first line is the header, so Spark can recognize the column names. However, since the inferSchema option was not set to true, all the columns have string as the type. The second example adds the inferSchema option, and Spark is able to identify the column types. The third example provides a schema with column names different from those in the header, so Spark uses the provided column names.

Now let's read a text file with a delimiter other than a comma. In this case, you specify a value for the sep option for Spark to use. Listing 3-13 reads a file called movies.tsv in the data/chapter4 folder.
val movies4 = spark.read.option("header","true").option("sep", "\t")
                                        .schema(movieSchema).csv("<path>/book/chapter4/data/movies/movies.tsv")
movies4.printSchema
|-- actor_name: string (nullable = true)
|-- movie_title: string (nullable = true)
|-- produced_year: long (nullable = true)
Listing 3-13

Read a TSV File with CSV Format

As you can see, it is quite easy to work with text files containing comma-separated values or values separated by other delimiters.

Creating a DataFrame by Reading JSON Files

JSON is a very well-known format in the JavaScript community. It is considered a semistructured format because each object (aka row) has a structure, and each column has a name. In the web application development space, JSON is widely used as a data format for transferring data between the backend server and the browser. One of the strengths of JSON is that it provides a flexible format that can model any use case, and it supports nested structures. JSON's one disadvantage is verbosity: the column names are repeated in every row of the data file (imagine your data file has 1 million rows).

Spark makes it easy to read data in a JSON file. However, there is one thing that you need to pay attention to. A JSON object can be expressed on a single line or across multiple lines, and this is something you need to let Spark know. Given that the JSON data file contains only column names and no data type, how can Spark come up with a schema? Spark tries its best to infer the schema by parsing a set of sample records. The number of records to sample is determined by the samplingRatio option, which has a default value of 1.0. Therefore, it is quite expensive to load a very large JSON file. In this case, you can lower the samplingRatio value to speed the data loading process. Table 3-5 describes a list of common options for the JSON format.
Table 3-5  JSON Common Options

Key             Value(s)                Default   Description
allowComments   true, false             false     Ignore comments in the JSON file.
multiLine       true, false             false     Treat the entire file as one large JSON object that spans multiple lines.
samplingRatio   A fraction, e.g., 0.3   1.0       The fraction of the data to read to infer the schema.
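As a small, illustrative sketch of how these options are passed to the reader (reusing the book's movies.json path), lowering samplingRatio trades schema-inference accuracy for loading speed, and multiLine needs to be true only when a single JSON object spans several lines.

// sample 30% of the records for schema inference; each JSON object sits on one line
val moviesSampled = spark.read
                         .option("samplingRatio", "0.3")
                         .option("multiLine", "false")
                         .json("<path>/book/chapter4/data/movies/movies.json")
moviesSampled.printSchema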

Listing 3-14 shows two examples of reading JSON files. The first one simply reads a JSON file without overriding any option value. Notice Spark automatically detects the column name and data type based on the information in the JSON file. The second example specifies a schema.
val movies5 = spark.read.json("<path>/book/chapter4/data/movies/movies.json")
movies5.printSchema
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)
// specify a schema to override the Spark's inferring schema.
// produced_year is specified as integer type
import org.apache.spark.sql.types._
val movieSchema2 = StructType(Array(StructField("actor_name", StringType, true),
                                              StructField("movie_title", StringType, true),
                                              StructField("produced_year", IntegerType, true)))
val movies6 = spark.read.option("inferSchema","true").schema(movieSchema2)
                                        .json("<path>/book/chapter4/data/movies/movies.json")
movies6.printSchema
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: integer (nullable = true)
Listing 3-14

Various Example of Reading a JSON File

What happens when a column data type specified in the schema doesn't match the value in the JSON file? By default, when Spark encounters a corrupted record or runs into a parsing error, it sets the values of all the columns in that row to null. Instead of getting null values, you can tell Spark to fail fast. Listing 3-15 tells Spark's parsing logic to fail fast by setting the mode option to failFast.
// set data type for actor_name as BooleanType
import org.apache.spark.sql.types._
val badMovieSchema = StructType(Array(StructField("actor_name", BooleanType, true),
                                               StructField("movie_title", StringType, true),
                                               StructField("produced_year", IntegerType, true)))
val movies7 = spark.read.schema(badMovieSchema)
                                        .json("<path>/book/chapter4/data/movies/movies.json")
movies7.printSchema
 |-- actor_name: boolean (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: integer (nullable = true)
movies7.show(5)
+----------+-----------+-------------+
|actor_name|movie_title|produced_year|
+----------+-----------+-------------+
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
+----------+-----------+-------------+
// tell Spark to fail fast when facing a parsing error
val movies8 = spark.read.option("mode","failFast").schema(badMovieSchema)
                                        .json("<path>/book/chapter4/data/movies/movies.json")
movies8.printSchema
 |-- actor_name: boolean (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: integer (nullable = true)
// Spark will throw a RuntimeException when executing an action
movies8.show(5)
ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.RuntimeException : Failed to parse a value for data type BooleanType (current token: VALUE_STRING).
Listing 3-15

Parsing Error and How to Tell Spark to Fail Fast

Creating a DataFrame by Reading Parquet Files

Parquet is one of the most popular open source columnar storage formats in the Hadoop ecosystem. It was created at Twitter. Its popularity is due to its self-describing data format and the fact that it stores data in a highly compact structure by leveraging compression. The columnar storage format is designed to work well with data analytics workloads where only a small subset of the columns is used during the analysis. Parquet stores the data of each column in contiguous column chunks; therefore, columns that are not needed in the analysis don't have to be read unnecessarily. It is quite flexible when it comes to supporting complex data types with nested structures. Text file formats like CSV and JSON are good for small files, and they are human-readable. Parquet is a much better file format for working with large datasets to reduce storage cost and speed up the reading step. If you peek at the movies.parquet file in the chapter4/data/movies folder, you see that its size is about one-sixth of the size of movies.csv.

Spark works extremely well with Parquet file format, and in fact, Parquet is the default file format for reading and writing data in Spark. Listing 3-16 shows an example of reading a Parquet file. Notice you don’t need to provide a schema or ask Spark to infer the schema. Spark can retrieve the schema from the Parquet file.

A cool optimization that Spark does when reading data from Parquet is decompression and decoding in column batches, which considerably speeds up the reading.
// Parquet is the default format, so we don't need to specify the format when reading
val movies9 = spark.read.load("<path>/book/chapter4/data/movies/movies.parquet")
movies9.printSchema
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)
// If we want to be more explicit, we can call the parquet function with the path
val movies10 = spark.read.parquet("<path>/book/chapter4/data/movies/movies.parquet")
movies10.printSchema
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)
Listing 3-16

Reading a Parquet File in Spark

Creating a DataFrame by Reading ORC Files

Optimized Row Columnar (ORC) is another popular open source, self-describing columnar storage format in the Hadoop ecosystem. It was created at Hortonworks as part of the Stinger initiative to massively speed up Hive. It is quite similar to Parquet in terms of efficiency and speed and was designed for analytics workloads. Working with ORC files is just as easy as working with Parquet files. Listing 3-17 shows an example of creating a DataFrame by reading from an ORC file.
val movies11 = spark.read.orc("<path>/book/chapter4/data/movies/movies.orc")
movies11.printSchema
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)
movies11.show(5)
+--------------------------+-------------------+--------------+
|                actor_name|        movie_title| produced_year|
+--------------------------+-------------------+--------------+
|         McClure, Marc (I)|       Coach Carter|          2005|
|         McClure, Marc (I)|        Superman II|          1980|
|         McClure, Marc (I)|          Apollo 13|          1995|
|         McClure, Marc (I)|           Superman|          1978|
|         McClure, Marc (I)| Back to the Future|          1985|
+--------------------------+-------------------+--------------+
Listing 3-17

Reading ORC File in Spark

Creating a DataFrame from JDBC

JDBC is a standard Java API for reading data from and writing data to a relational database management system (RDBMS). Spark supports the JDBC data source, which means you can use Spark to read data from and write data to any existing RDBMS, such as MySQL, PostgreSQL, Oracle, or SQLite. You need to provide a few important pieces of information when working with a JDBC data source: a JDBC driver for your RDBMS, a connection URL, authentication information, and a table name.

For Spark to connect to an RDBMS, it must have access to the JDBC driver JAR file at runtime. Therefore, you need to add the location of a JDBC driver to the Spark classpath. Listing 3-18 shows how to connect to MySQL from the Spark Shell.
./bin/spark-shell --driver-class-path ../jdbc/mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar --jars ../jdbc/mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar
Listing 3-18

Specifying a JDBC Driver When Starting the Spark Shell

Once the Spark shell successfully starts, you can quickly verify if Spark can connect to your RDBMS by using java.sql.DriverManager, as shown in Listing 3-19. This example is trying to test a connection to MySQL. The URL format is a bit different if your RDBMS is not MySQL, so consult the documentation of the JDBC driver you are using.
import java.sql.DriverManager
val connectionURL = "jdbc:mysql://localhost:3306/<database>?user=<username>&password=<password>"
val connection = DriverManager.getConnection(connectionURL)
connection.isClosed()
connection.close()
Listing 3-19

Testing Connection to MySQL in Spark Shell

If you didn’t get any exception about the connection, the Spark shell could successfully connect to your RDBMS.

Table 3-6 describes the main options that you need to specify when using a JDBC driver. For a complete list of options, please consult https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases.
Table 3-6  Main Options for a JDBC Data Source

Key       Description
url       The JDBC URL for Spark to connect to. At a minimum, it should contain the host, port, and database name. For MySQL, it may look something like jdbc:mysql://localhost:3306/sakila.
dbtable   The name of the database table Spark reads data from or writes data to.
driver    The class name of the JDBC driver that Spark instantiates to connect to the preceding URL. Consult the documentation of the JDBC driver you are using. For the MySQL Connector/J driver, the class name is com.mysql.jdbc.Driver.

Listing 3-20 shows an example of reading data from a film table of the Sakila database in a MySQL server.
val mysqlURL= "jdbc:mysql://localhost:3306/sakila"
val filmDF = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver")
                                                        .option("url", mysqlURL)
                                                        .option("dbtable", "film")
                                                        .option("user", "<username>")
                                                        .option("password","<pasword>")
                                                        .load()
filmDF.printSchema
 |-- film_id: integer (nullable = false)
 |-- title: string (nullable = false)
 |-- description: string (nullable = true)
 |-- release_year: date (nullable = true)
 |-- language_id: integer (nullable = false)
 |-- original_language_id: integer (nullable = true)
 |-- rental_duration: integer (nullable = false)
 |-- rental_rate: decimal(4,2) (nullable = false)
 |-- length: integer (nullable = true)
 |-- replacement_cost: decimal(5,2) (nullable = false)
 |-- rating: string (nullable = true)
 |-- special_features: string (nullable = true)
 |-- last_update: timestamp (nullable = false)
filmDF.select("film_id","title").show(5)
+-------+---------------------+
|film_id|                title|
+-------+---------------------+
|      1|     ACADEMY DINOSAUR|
|      2|       ACE GOLDFINGER|
|      3|     ADAPTATION HOLES|
|      4|     AFFAIR PREJUDICE|
|      5|          AFRICAN EGG|
+-------+---------------------+
Listing 3-20

Reading Data from a Table in MySQL Server

When working with a JDBC data source, Spark pushes filter conditions down to the RDBMS as much as possible. By doing this, much of the data is filtered out at the RDBMS level, which speeds up the filtering logic and dramatically reduces the amount of data Spark needs to read. This optimization is known as predicate pushdown, and Spark often applies it when it knows the data source supports the filtering capability. Parquet is another data source that has this capability. The "Catalyst Optimizer" section in Chapter 4 provides an example of what this looks like.
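One way to observe this behavior yourself is to filter the filmDF DataFrame from Listing 3-20 and inspect the physical plan with the explain function; when the predicate is pushed down, the JDBC scan node typically lists the condition under PushedFilters. This is only an illustrative sketch, not output reproduced from the book.

// filter on a column and inspect the physical plan for pushed-down predicates
filmDF.filter('rental_rate > 2.99)
      .select("film_id", "title", "rental_rate")
      .explain()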

Working with Structured Operations

Now that you know how to create a DataFrame, the next part is to learn how to manipulate or transform them using structured operations. Unlike the RDD operations, the structured operations are designed to be more relational, meaning the operations mirror the kind of expressions you can do with SQL, such as projection, filtering, transforming, joining, and so on. Similar to RDD operations, the structured operations are divided into two categories: transformation and action. The semantics of the structured transformations and actions are identical to the ones in RDDs. In other words, structured transformations are lazily evaluated, and structured actions are eagerly evaluated.

Structured operations are sometimes described as a domain-specific language (DSL) for distributed data manipulation. DSL is a computer language specialized for a particular application domain. In this case, the application domain is the distributed data manipulation. If you have ever worked with SQL, then it is easy to learn the structured operations.

Table 3-7 describes the commonly used DataFrame structured transformations. As a reminder, a DataFrame is immutable, and its transformation operation always returns a new DataFrame.
Table 3-7  Commonly Used DataFrame Structured Transformations

Operation                  Description
select                     Selects one or more columns from the existing set of columns in the DataFrame. A more technical term for select is projection. During projection, columns can be transformed and manipulated.
selectExpr                 Similar to select, but column transformations can be expressed as powerful SQL expressions.
filter, where              Both have the same semantics; where is more relational, like the WHERE clause in SQL. Both filter rows based on the given boolean condition(s).
distinct, dropDuplicates   Remove duplicate rows from the DataFrame.
sort, orderBy              Sort the DataFrame by the provided column(s).
limit                      Returns a new DataFrame containing the first n rows.
union                      Combines the rows of two DataFrames and returns the result as a new DataFrame.
withColumn                 Adds a new column or replaces an existing column in the DataFrame.
withColumnRenamed          Renames an existing column. If the given column name doesn't exist in the schema, it is a no-op.
drop                       Drops one or more columns from the DataFrame. The operation does nothing if the schema doesn't contain the given column name(s).
sample                     Randomly selects a set of rows based on a given fraction, an optional seed, and an optional with-replacement flag.
randomSplit                Splits the DataFrame into one or more DataFrames based on the given weights. Commonly used to split a dataset into training and test sets in machine learning.
join                       Joins two DataFrames. Spark supports many types of joins. More information is covered in the next chapter.
groupBy                    Groups the DataFrame by one or more columns. A common pattern is to perform an aggregation after the groupBy. More information is covered in the next chapter.

Working with Columns

Most of the DataFrame structured operations in Table 3-7 require you to specify one or more columns. For some, the columns are specified in a string; for others, the columns need to be specified as instances of the Column class. It is completely fair to question why there are two options and when to use what. To answer those questions, you need to understand the functionality the Column class provides. At a high level, the Column class’s functionality can be broken down into the following categories.
  • Mathematical operations, like addition, multiplication, and so forth

  • Logical comparisons between a column value and a literal or another column value, such as equality, greater than, and less than

  • String pattern matching, such as starting with, ending with, and so on.

For a complete list of available functions in the Column class, refer to the Scala documentation at https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column.

With an understanding of the functionality that the Column class provides, you can conclude that whenever there is a need to specify a column expression, it is necessary to specify the column as an instance of Column class rather than a string. The upcoming examples make this clear.

There are different ways to refer to a column, which has created confusion in the Spark user community. A common question is when to use which one, and the answer is—it depends. Table 3-8 describes the available function options.
Table 3-8  Ways to Refer to a Column

Function   Example                Description
""         "columnName"           Refers to the column as a string.
col        col("columnName")      The col function returns an instance of the Column class.
column     column("columnName")   Similar to col, this function returns an instance of the Column class.
$          $"columnName"          A syntactic sugar way of constructing a Column class, available in Scala only.
' (tick)   'columnName            A syntactic sugar way of constructing a Column class in Scala by leveraging Scala's symbol literal feature.

The col and column functions are synonymous, and both are available in the Scala and Python Spark APIs. If you often switch between the Spark Scala and Python APIs, it makes sense to use the col function so there is consistency in your code. If you mostly or exclusively use the Spark Scala APIs, then my recommendation is to use ' (the apostrophe symbol) because there is only a single character to type. The DataFrame class also has its own col function, which you can use to disambiguate between columns with the same name from two or more DataFrames when performing a join. Listing 3-21 provides examples of the different ways to refer to a column.
import org.apache.spark.sql.functions._
val kvDF = Seq((1,2),(2,3)).toDF("key","value")
// to display column names in a DataFrame, we can call the columns function
kvDF.columns
Array[String] = Array(key, value)
kvDF.select("key")
kvDF.select(col("key"))
kvDF.select(column("key"))
kvDF.select($"key")
kvDF.select('key)
// using the col function of DataFrame
kvDF.select(kvDF.col("key"))
kvDF.select('key, 'key > 1).show
+---+----------+
|key| (key > 1)|
+---+----------+
|  1|     false|
|  2|      true|
+---+----------+
Listing 3-21

Different Ways of Referring to Columns

The last line of this example uses a column expression ('key > 1), and therefore the column must be specified as an instance of the Column class. If the column were specified as a string, it would result in a type mismatch error. More examples of column expressions appear in the examples of the various DataFrame structured operations.

Working with Structured Transformations

This section provides usage examples of the structured transformations listed in Table 3-7. For consistency, all the examples use a ' (apostrophe) to refer to columns in a DataFrame. To reduce redundancy, most of the examples refer to the movies DataFrame created by reading from a Parquet file (see Listing 3-22).
val movies = spark.read.parquet("<path>/chapter4/data/movies/movies.parquet")
Listing 3-22

Creating the movies DataFrame from a Parquet File

select(columns)
This transformation commonly performs projection: selecting all or a subset of the columns from a DataFrame. During the selection, each column can be transformed via a column expression. There are two variations of this transformation. One takes columns as strings, and the other takes columns as instances of the Column class. You cannot mix the two column types in a single invocation of this transformation. Listing 3-23 shows an example of each variation.
movies.select("movie_title","produced_year").show(5)
+------------------------+--------------+
|             movie_title| produced_year|
+------------------------+--------------+
|            Coach Carter|          2005|
|             Superman II|          1980|
|               Apollo 13|          1995|
|                Superman|          1978|
|      Back to the Future|          1985|
+------------------------+--------------+
// using a column expression to transform year to decade
movies.select('movie_title,('produced_year - ('produced_year % 10)).as("produced_decade")).show(5)
+------------------------+----------------+
|             movie_title| produced_decade|
+------------------------+----------------+
|            Coach Carter|            2000|
|             Superman II|            1980|
|               Apollo 13|            1990|
|                Superman|            1970|
|      Back to the Future|            1980|
+------------------------+----------------+
Listing 3-23

Two Variations of Select Transformation

The second example uses two column expressions: modulo and subtraction. Both are implemented by the modulo (%) and subtraction (-) functions in the Column class (see the Scala documentation). By default, Spark uses the column expression as the name of the result column; to make it more readable, the as function renames it to a more human-readable column name. As an astute reader, you can probably figure out that the select transformation can also be used to add one or more columns to a DataFrame.

selectExpr(expressions)

This transformation is a variant of the select transformation. The one big difference is that it accepts one or more SQL expressions rather than columns. However, both are essentially performing the same projection task. SQL expressions are powerful and flexible constructs that allow you to express column transformation logic naturally, just like the way you think. You can express SQL expressions in a string format, and Spark parses them into a logical tree to evaluate them in the right order.

If you want to create a new DataFrame with all the columns in the movies DataFrame and introduce a new column to represent the decade a movie was produced in, do something like what’s shown in Listing 3-24.
movies.selectExpr("*","(produced_year - (produced_year % 10)) as decade").show(5)
+-----------------+--------------------+-------------------+----------+
|       actor_name|         movie_title|      produced_year|    decade|
+-----------------+--------------------+-------------------+----------+
|McClure, Marc (I)|        Coach Carter|               2005|      2000|
|McClure, Marc (I)|         Superman II|               1980|      1980|
|McClure, Marc (I)|           Apollo 13|               1995|      1990|
|McClure, Marc (I)|            Superman|               1978|      1970|
|McClure, Marc (I)|  Back to the Future|               1985|      1980|
+-----------------+--------------------+-------------------+----------+
Listing 3-24

Adding the Decade Column to Movies DataFrame using SQL Expression

The combination of SQL expressions and built-in functions makes it easy to perform data analysis that would otherwise take multiple steps. Listing 3-25 shows how easy it is to determine the number of unique movie titles and unique actors in the movies dataset in a single statement. The count function performs an aggregation over the entire DataFrame.
movies.selectExpr("count(distinct(movie_title)) as movies","count(distinct(actor_name)) as actors").show
+---------+--------+
|   movies| actors |
+---------+--------+
|     1409|   6527 |
+---------+--------+
Listing 3-25

Using SQL Expression and Built-in Functions

filter(condition), where(condition)
This transformation is straightforward. It filters out the rows that don’t meet the given condition, in other words, when the condition evaluates to false. A different way of looking at the behavior of the filter transformation is that it returns only the rows that meet the specified condition. The given condition can be simple or as complex as it needs to be. Using this transformation requires knowing how to leverage a few logical comparison functions in the Column class, like equality, less than, greater than, and inequality. Both the filter and where transformations have the same behavior, so pick the one that you are most comfortable with. The latter one is just a bit more relational than the former. Listing 3-26 shows a few examples of filtering.
movies.filter('produced_year < 2000)
movies.where('produced_year > 2000)
movies.filter('produced_year >= 2000)
movies.where('produced_year >= 2000)
// equality comparison requires 3 equal signs
movies.filter('produced_year === 2000).show(5)
+-------------------+---------------------------+--------------+
|         actor_name|                movie_title| produced_year|
+-------------------+---------------------------+--------------+
|  Cooper, Chris (I)|         Me, Myself & Irene|          2000|
|  Cooper, Chris (I)|                The Patriot|          2000|
|    Jolie, Angelina|       Gone in Sixty Sec...|          2000|
|     Yip, Françoise|             Romeo Must Die|          2000|
|     Danner, Blythe|           Meet the Parents|          2000|
+-------------------+---------------------------+--------------+
// inequality comparison uses an interesting looking operator =!=
movies.select("movie_title","produced_year").filter('produced_year =!= 2000).show(5)
+-------------------+--------------+
|        movie_title| produced_year|
+-------------------+--------------+
|       Coach Carter|          2005|
|        Superman II|          1980|
|          Apollo 13|          1995|
|           Superman|          1978|
| Back to the Future|          1985|
+-------------------+--------------+
// to combine comparison expressions, use either the OR (||) or AND (&&) operator
movies.filter('produced_year >= 2000 && length('movie_title) < 5).show(5)
+----------------+------------+--------------+
|      actor_name| movie_title| produced_year|
+----------------+------------+--------------+
| Jolie, Angelina|        Salt|          2010|
|  Cueto, Esteban|         xXx|          2002|
|   Butters, Mike|         Saw|          2004|
|  Franko, Victor|          21|          2008|
|   Ogbonna, Chuk|        Salt|          2010|
+----------------+------------+--------------+
// the other way of accomplishing the result is by calling the filter function two times
movies.filter('produced_year >= 2000).filter(length('movie_title) < 5).show(5)
Listing 3-26

Filter Rows with Logical Comparison Functions in Column Class

distinct, dropDuplicates
These two transformations have identical behavior. However, dropDuplicates allows you to control which columns should be used in the deduplication logic. If none is specified, the deduplication logic uses all the columns in the DataFrame. Listing 3-27 shows different ways of counting how many movies are in the movies dataset.
movies.select("movie_title").distinct.selectExpr("count(movie_title) as movies").show
movies.dropDuplicates("movie_title").selectExpr("count(movie_title) as movies").show
+--------+
|  movies|
+--------+
|    1409|
+--------+
Listing 3-27

Using distinct and dropDuplicates to Achieve the Same Goal

In terms of performance, there is no difference between these two approaches because Spark transforms them into the same logical plan.
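If you want to verify that claim yourself, a quick sketch is to print the physical plans of both versions with the explain function and compare them; both typically resolve to the same aggregate-based deduplication.

// compare the physical plans of the two deduplication approaches
movies.select("movie_title").distinct.explain()
movies.dropDuplicates("movie_title").select("movie_title").explain()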

sort(columns), orderBy(columns)
Both transformations have the same semantics. The orderBy transformation is more relational than the other one. By default, the sorting is in ascending order, and it is easy to change it to descending. When specifying more than one column, it is possible to have a different order for each of those columns. Listing 3-28 has some examples.
val movieTitles = movies.dropDuplicates("movie_title")
                                       .selectExpr("movie_title", "length(movie_title) as title_length", , "produced_year")
movieTitles.sort('title_length).show(5)
+-----------+-------------+--------------+
|movie_title| title_length| produced_year|
+-----------+-------------+--------------+
|         RV|            2|          2006|
|         12|            2|          2007|
|         Up|            2|          2009|
|         X2|            2|          2003|
|         21|            2|          2008|
+-----------+-------------+--------------+
// sorting in descending order
movieTitles.orderBy('title_length.desc).show(5)
+---------------------+-------------+--------------+
|          movie_title| title_length| produced_year|
+---------------------+-------------+--------------+
| Borat: Cultural L...|           83|          2006|
| The Chronicles of...|           62|          2005|
| Hannah Montana & ...|           57|          2008|
| The Chronicles of...|           56|          2010|
| Istoriya pro Rich...|           56|          1997|
+---------------------+-------------+--------------+
// sorting by two columns in different orders
movieTitles.orderBy('title_length.desc, 'produced_year).show(5)
+---------------------+-------------+--------------+
|          movie_title| title_length| produced_year|
+---------------------+-------------+--------------+
| Borat: Cultural L...|           83|          2006|
| The Chronicles of...|           62|          2005|
| Hannah Montana & ...|           57|          2008|
| Istoriya pro Rich...|           56|          1997|
| The Chronicles of...|           56|          2010|
+---------------------+-------------+--------------+
Listing 3-28

Sorting the DataFrame in Ascending and Descending Order

Notice the titles of the last two movies have the same length, but their years are sorted in the correct ascending order.

limit(n)
This transformation returns a new DataFrame by taking the first n rows. It is commonly used after sorting to figure out the top n or bottom n rows based on the sorting order. Listing 3-29 shows an example of using the limit transformation to find the top ten actors with the longest names.
// first create a DataFrame with their name and associated length
val actorNameDF = movies.select("actor_name").distinct.selectExpr("*", "length(actor_name) as length")
// order names by length and retrieve the top 10
actorNameDF.orderBy('length.desc).limit(10).show
+--------------------------------+-------+
|              actor_name        | length|
+--------------------------------+-------+
|    Driscoll, Timothy 'TJ' James|     28|
|    Badalamenti II, Peter Donald|     28|
|    Shepard, Maridean Mansfield |     27|
|    Martino, Nicholas Alexander |     27|
|    Marshall-Fricker, Charlotte |     27|
|    Phillips, Christopher (III) |     27|
|    Pahlavi, Shah Mohammad Reza |     27|
|    Juan, The Bishop Don Magic  |     26|
|    Van de Kamp Buchanan, Ryan  |     26|
|     Lough Haggquist, Catherine |     26|
+--------------------------------+-------+
Listing 3-29

Using the limit Transformation to Figure Top Ten Actors with the Longest Name

union(otherDataFrame)
You learned that a DataFrame is immutable. If you need to add more rows to an existing DataFrame, the union transformation is useful for that purpose; it combines the rows of two DataFrames and returns a new DataFrame. This transformation requires that both DataFrames have the same schema, meaning the column names and their order must match exactly. Let's say one of the movies in the DataFrame is missing an actor, and you want to fix that issue. Listing 3-30 shows how to do that using the union transformation.
// the movie we want to add missing actor is "12"
val shortNameMovieDF = movies.where('movie_title === "12")
shortNameMovieDF.show
+---------------------+------------+---------------+
|           actor_name| movie_title| produced_year |
+---------------------+------------+---------------+
|     Efremov, Mikhail|          12|           2007|
|      Stoyanov, Yuriy|          12|           2007|
|      Gazarov, Sergey|          12|           2007|
| Verzhbitskiy, Viktor|          12|           2007|
+---------------------+------------+---------------+
// create a DataFrame with one row
import org.apache.spark.sql.Row
val forgottenActor = Seq(Row("Brychta, Edita", "12", 2007L))
val forgottenActorRDD = spark.sparkContext.parallelize(forgottenActor)
val forgottenActorDF = spark.createDataFrame(forgottenActorRDD, shortNameMovieDF.schema)
// now add the missing actor
val completeShortNameMovieDF = shortNameMovieDF.union(forgottenActorDF)
completeShortNameMovieDF.show
+----------------------+------------+---------------+
|            actor_name| movie_title|  produced_year|
+----------------------+------------+---------------+
|      Efremov, Mikhail|          12|           2007|
|       Stoyanov, Yuriy|          12|           2007|
|       Gazarov, Sergey|          12|           2007|
|  Verzhbitskiy, Viktor|          12|           2007|
|        Brychta, Edita|          12|           2007|
+----------------------+------------+---------------+
Listing 3-30

Add a Missing Actor to the movies DataFrame
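As a side note, Spark also provides the unionByName transformation, which resolves columns by name rather than by position; it is handy when two DataFrames have the same columns but in a different order. The following sketch is an addition and is not part of the original listing.
// a minimal sketch: unionByName matches columns by name instead of position,
// so a DataFrame with the same columns in a different order still unions correctly
val reorderedActorDF = forgottenActorDF.select("produced_year", "actor_name", "movie_title")
shortNameMovieDF.unionByName(reorderedActorDF).show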

withColumn(colName, column)
This transformation adds a new column to a DataFrame. It requires two input parameters: a column name and a value in the form of a column expression. If the given column name matches one of the existing ones, that column is replaced with the given column expression. You can accomplish pretty much the same goal by using the selectExpr transformation. Listing 3-31 provides examples of adding a new column as well as replacing an existing one.
// adding a new column based on a certain column expression
movies.withColumn("decade", ('produced_year - 'produced_year % 10)).show(5)
+------------------+------------------------+--------------+-----------+
|        actor_name|             movie_title| produced_year|     decade|
+------------------+------------------------+--------------+-----------+
| McClure, Marc (I)|            Coach Carter|          2005|       2000|
| McClure, Marc (I)|             Superman II|          1980|       1980|
| McClure, Marc (I)|               Apollo 13|          1995|       1990|
| McClure, Marc (I)|                Superman|          1978|       1970|
| McClure, Marc (I)|      Back to the Future|          1985|       1980|
+------------------+------------------------+--------------+-----------+
// now replace the produced_year with new values
movies.withColumn("produced_year", ('produced_year - 'produced_year % 10)).show(5)
+------------------+-------------------+--------------+
|        actor_name|        movie_title| produced_year|
+------------------+-------------------+--------------+
| McClure, Marc (I)|       Coach Carter|          2000|
| McClure, Marc (I)|        Superman II|          1980|
| McClure, Marc (I)|          Apollo 13|          1990|
| McClure, Marc (I)|           Superman|          1970|
| McClure, Marc (I)| Back to the Future|          1980|
+------------------+-------------------+--------------+
Listing 3-31

Adding a New Column as Well as Replacing an Existing One Using the withColumn Transformation
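The selectExpr equivalent mentioned above might look like the following; this sketch is an addition and is not part of the original listing.
// a rough selectExpr equivalent of adding the decade column with withColumn
movies.selectExpr("*", "(produced_year - produced_year % 10) as decade").show(5)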

withColumnRenamed(existingColName, newColName)
This transformation is strictly about renaming an existing column name in a DataFrame. It is fair to ask why in the world Spark provides this transformation. As it turns out, this transformation is useful in the following situations.
  • To rename a cryptic column name to a more human-friendly name. The cryptic column name can come from an existing schema that you don’t control, such as when the Parquet file containing the column you need was produced by your company’s partner.

  • Before joining two DataFrames that happen to have one or more columns with the same name. This transformation can rename those columns in one of the two DataFrames so that you can refer to them easily after the join.

Notice that if the provided existingColName doesn’t exist in the schema, Spark doesn’t throw an error; it silently does nothing. Listing 3-32 renames some of the columns in the movies DataFrame to shorter names. By the way, this can be accomplished by using the select or selectExpr transformations as well. I leave that as an exercise for you.
movies.withColumnRenamed("actor_name", "actor")
           .withColumnRenamed("movie_title", "title")
           .withColumnRenamed("produced_year", "year").show(5)
+------------------+--------------------+------+
|             actor|               title|  year|
+------------------+--------------------+------+
| McClure, Marc (I)|        Coach Carter|  2005|
| McClure, Marc (I)|         Superman II|  1980|
| McClure, Marc (I)|           Apollo 13|  1995|
| McClure, Marc (I)|            Superman|  1978|
| McClure, Marc (I)|  Back to the Future|  1985|
+------------------+--------------------+------+
Listing 3-32

Using withColumnRenamed Transformation to Rename Some of the Column Names

drop(columnName1, columnName2)
This transformation simply drops the specified columns from the DataFrame. You can specify one or more column names to drop, but only the ones that exist in the schema are dropped; the ones that don’t are silently ignored. You can use the select transformation to drop columns by projecting out only the columns you want to keep. However, if a DataFrame has 100 columns and you want to drop only a few, this transformation is more convenient to use than select. Listing 3-33 provides an example of dropping columns.
movies.drop("actor_name", "me").printSchema
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)
Listing 3-33

Drop Two Columns, One Exists and the Other One Doesn’t

As you can see, the second column, "me", doesn’t exist in the schema, and the drop transformation simply ignores it.
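For comparison, the same result can be achieved with select by projecting only the columns to keep; this sketch is an addition and is not part of the original listing.
// keep only the columns you want, which effectively drops actor_name
movies.select("movie_title", "produced_year").printSchema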

sample(fraction), sample(fraction, seed), sample(withReplacement, fraction, seed)
This transformation returns a randomly selected set of rows from the DataFrame. The number of returned rows is approximately equal to the specified fraction of the total row count, and the fraction value must be between 0 and 1. The seed seeds the random number generator used to decide which rows to include in the result. If a seed is not specified, a randomly generated value is used. The withReplacement option determines whether a randomly selected row is placed back into the selection pool. In other words, when withReplacement is true, a particular row may be selected more than once. So, when would you need this transformation? It is useful when the original dataset is large and you need to reduce it to a smaller size so you can quickly iterate on the data analysis logic. Listing 3-34 provides examples of using the sample transformation.
// sample with no replacement and a ratio
movies.sample(false, 0.0003).show(3)
+--------------------+----------------------+--------------+
|          actor_name|           movie_title| produced_year|
+--------------------+----------------------+--------------+
|     Lewis, Clea (I)|  Ice Age: The Melt...|          2006|
|      Lohan, Lindsay|   Herbie Fully Loaded|          2005|
|Tagawa, Cary-Hiro...|       Licence to Kill|          1989|
+--------------------+----------------------+--------------+
// sample with replacement, a ratio and a seed
movies.sample(true, 0.0003, 123456).show(3)
+---------------------+-----------------+--------------+
|           actor_name|      movie_title| produced_year|
+---------------------+-----------------+--------------+
| Panzarella, Russ (V)|   Public Enemies|          2009|
|         Reed, Tanoai|        Daredevil|          2003|
|         Moyo, Masasa|     Spider-Man 3|          2007|
+---------------------+-----------------+--------------+
Listing 3-34

Different Ways of Using the sample Transformation

As you can see, the returned movies are pretty random.

randomSplit(weights)
This transformation is commonly used when preparing data to train machine learning models. Unlike the previous transformations, it returns one or more DataFrames. The number of DataFrames it returns corresponds to the number of weights you specify. If the weights don’t add up to 1, they are normalized to add up to 1. Listing 3-35 provides an example of splitting the movies DataFrame into three smaller ones.
// the weights need to be an Array
val smallerMovieDFs = movies.randomSplit(Array(0.6, 0.3, 0.1))
// let's see if the counts are added up to the count of movies DataFrame
movies.count
Long = 31393
smallerMovieDFs(0).count
Long = 18881
smallerMovieDFs(0).count + smallerMovieDFs(1).count + smallerMovieDFs(2).count
Long = 31393
Listing 3-35

Use randomSplit to Split movies DataFrame into Three Parts
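If reproducible splits matter, for example, getting the same train/test split on every run, randomSplit also accepts a seed. The following sketch is an addition and is not part of the original listing.
// a minimal sketch: a seeded 80/20 split that is reproducible across runs
val Array(trainDF, testDF) = movies.randomSplit(Array(0.8, 0.2), seed = 42L)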

Working with Missing or Bad Data

In reality, the data you work with is often not as clean as you would like. Maybe the data evolves over time, so some columns have values and some don’t. It is important to deal with these issues at the beginning of your data manipulation logic to prevent unpleasant surprises that cause your long-running data processing job to fail.

The Spark community recognizes that dealing with missing data is a fact of life. Therefore, Spark provides a dedicated class called DataFrameNaFunctions to help deal with this inconvenient issue. An instance of DataFrameNaFunctions is available as the na member variable in the DataFrame class. There are three common ways of dealing with missing or bad data. The first way is to drop the rows that have missing values in one or more columns. The second way is to fill those missing values with user-provided values. The third way is to replace the bad data with values that you know how to deal with.

Let’s start with dropping rows with missing data. You can tell Spark to drop rows where any column or only specific columns have missing data. Listing 3-36 shows a few different ways of dropping rows with missing data.
// first create a DataFrame with missing values in one or more columns
import org.apache.spark.sql.Row
val badMovies = Seq(Row(null, null, null),
                    Row(null, null, 2018L),
                    Row("John Doe", "Awesome Movie", null),
                    Row(null, "Awesome Movie", 2018L),
                    Row("Mary Jane", null, 2018L))
val badMoviesRDD = spark.sparkContext.parallelize(badMovies)
val badMoviesDF = spark.createDataFrame(badMoviesRDD, movies.schema)
badMoviesDF.show
+-----------+-----------------+--------------+
| actor_name|      movie_title| produced_year|
+-----------+-----------------+--------------+
|       null|             null|          null|
|       null|             null|          2018|
|   John Doe|    Awesome Movie|          null|
|       null|    Awesome Movie|          2018|
|  Mary Jane|             null|          2018|
+-----------+-----------------+--------------+
// dropping rows that have missing data in any column
// both of the lines below achieve the same output
badMoviesDF.na.drop().show
badMoviesDF.na.drop("any").show
+----------+------------+--------------+
|actor_name| movie_title| produced_year|
+----------+------------+--------------+
+----------+------------+--------------+
// drop rows that have missing data in every single column
badMoviesDF.na.drop("all").show
+-----------+--------------+--------------+
| actor_name|   movie_title| produced_year|
+-----------+--------------+--------------+
|       null|          null|          2018|
|   John Doe| Awesome Movie|          null|
|       null| Awesome Movie|          2018|
|  Mary Jane|          null|          2018|
+-----------+--------------+--------------+
// drops rows that column actor_name has missing data
badMoviesDF.na.drop(Array("actor_name")).show
+------------+---------------+--------------+
|  actor_name|    movie_title| produced_year|
+------------+---------------+--------------+
|    John Doe|  Awesome Movie|          null|
|   Mary Jane|           null|          2018|
+------------+---------------+--------------+
Listing 3-36

Dropping Rows with Missing Data
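The other two approaches mentioned earlier, filling and replacing, are exposed through the same na variable. The following sketch is an addition to the original listing, and the replacement values are purely illustrative.
// fill missing values with user-provided defaults, per column
badMoviesDF.na.fill(Map("actor_name" -> "Unknown Actor", "movie_title" -> "Unknown Title")).show
// replace specific bad values in a given column with corrected ones
badMoviesDF.na.replace("movie_title", Map("Awesome Movie" -> "An Awesome Movie")).show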

Working with Structured Actions

This section covers the structured actions. They have the same eager evaluation semantics as the RDD actions, so they trigger the computation of all the transformations that lead up to a particular action. Table 3-9 describes a list of structured actions.
Table 3-9

Commonly Used Structured Actions

Operation

Description

show()

show(numRows)

show(truncate)

show(numRows, truncate)

Display the rows in a tabular format. If numRows is not specified, it shows the top 20 rows. The truncate option controls whether to truncate string columns that are longer than 20 characters.

head()

first()

head(n)

take(n)

Return the first row. If n is specified, it returns the first n rows. first is an alias for head, and take(n) is an alias for head(n).

takeAsList(n)

Return the first n rows as a Java list. Be careful not to take too many rows; otherwise, it may cause an out-of-memory error on the application’s driver process.

collect

collectAsList

Return all the rows as an array or a Java list. Apply the same caution as the one described in takeAsList action.

count

Return the number of rows in the DataFrame.

describe

Compute common statistics about numeric and string columns in the DataFrame. Available statistics are count, mean, stddev, min, and max (the related summary action additionally supports arbitrary approximate percentiles).

Most of these are self-explanatory. The show action has been used in many examples in the structured transformation section.
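For completeness, here is a quick sketch, an addition rather than one of the book's listings, of a few of the row-returning actions applied to the movies DataFrame.
// head/first/take return Row objects to the driver; count returns a Long
val firstRow = movies.first()      // same as movies.head()
val firstFive = movies.take(5)     // Array[Row] containing the first five rows
val totalRows = movies.count()     // total number of rows in the DataFrame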

Another interesting action is called describe, which is discussed next.

describe(columnNames)
Sometimes it is useful to have a general sense of the basic statistics of the data you are working with. This action can compute the basic statistics of string and numeric columns, such as count, mean, standard deviation, minimum, and maximum. You have the option to choose which string or numeric column(s) to compute the statistics for. Listing 3-37 is an example.
movies.describe("produced_year").show
+-----------+-------------------------+
|    summary|            produced_year|
+-----------+-------------------------+
|      count|                    31392|
|       mean|       2002.7964449541284|
|     stddev|        6.377236851493877|
|        min|                     1961|
|        max|                     2012|
+-----------+-------------------------+
Listing 3-37

Use describe Action to Show the Statistics of produced_year Column

Introduction to Datasets

At one point, there was a lot of confusion about the differences between the DataFrame and Dataset APIs. Given these options, it is fair to ask what the differences are between them, the advantages and disadvantages of each, and when to use which one. Recognizing this confusion in the Spark user community, the Spark designers decided to unify the DataFrame and Dataset APIs in the Spark 2.0 release to give users one fewer abstraction to learn and remember.

Starting with the Spark 2.0 release, there is only one high-level abstraction called Dataset, which has two flavors: a strongly-typed API and an untyped API. The term DataFrame doesn’t go away; instead, it has been redefined as an alias for a collection of generic objects in Dataset. From the code perspective, a DataFrame is essentially a type alias for Dataset[Row], where a Row is a generic untyped JVM object. A Dataset is a collection of strongly-typed JVM objects, represented by either a case class in Scala or a class in Java. Table 3-10 describes the Dataset API flavors available in each of the programming languages that Spark supports.
Table 3-10

Dataset Flavors

Language

Flavor

Scala

Dataset[T] and DataFrame

Java

Dataset<T>

Python

DataFrame

R

DataFrame

The Python and R languages have no compile-time type-safety; therefore, only the untyped Dataset APIs (a.k.a. DataFrame) are supported.

Consider the Dataset a younger brother of the DataFrame. Its unique properties are type safety and an object-oriented programming interface. A Dataset is a strongly typed, immutable collection of data. Like a DataFrame, the data is mapped to a defined schema. However, there are a few important differences between a DataFrame and a Dataset.
  • Each row in a Dataset is represented by a user-defined object, so you can refer to an individual column as a member variable of that object. This provides you with compile-time type safety.

  • The Dataset has helpers called encoders, which are smart and efficient encoding utilities that convert the data in each user-defined object into a compact binary format. This translates into a reduction in memory usage when a Dataset is cached in memory and a reduction in the number of bytes Spark needs to transfer over the network during the shuffling process.

In terms of limitations, the Dataset APIs are available only in strongly typed languages such as Scala and Java. There is a conversion cost associated with converting a Row object into a domain-specific object, and this cost can be a factor when a Dataset has millions of rows. At this point, a question should pop into your mind regarding when to use the DataFrame APIs and when to use the Dataset APIs. The Dataset APIs are good for production jobs that need to run regularly and are written and maintained by a team of data engineers. For most interactive and exploratory analysis use cases, the DataFrame APIs are sufficient.

Note

A case class in the Scala language is like a JavaBean class in Java language; however, it has a few built-in interesting properties. An instance of a case class is immutable, and therefore it is commonly used to model domain-specific objects. In addition, it is easy to reason about the internal states of the instances of a case class because they are immutable. The toString and equals methods are automatically generated to make it easier to print out the case class's content and compare between case class instances. Scala case classes work well with the Scala pattern matching feature.
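A tiny sketch, added here as an illustration of the note above, of those generated conveniences:
// instances are immutable; equals and toString are generated automatically
case class Point(x: Int, y: Int)
val p1 = Point(1, 2)
val p2 = Point(1, 2)
p1 == p2        // true, structural equality
p1.toString     // "Point(1,2)"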

Creating Datasets

Before creating a Dataset, you need to define a domain-specific object to represent each row. There are a few ways to create a Dataset. The first way is to convert a DataFrame to a Dataset using the as[T] function of the Dataset class. The second way is to use the SparkSession.createDataset() function to create a Dataset from a local collection of objects. The third way is to use the toDS implicit conversion utility. Listing 3-38 provides different examples of creating Datasets.
// define Movie case class
case class Movie(actor_name:String, movie_title:String, produced_year:Long)
// convert DataFrame to strongly typed Dataset
val moviesDS = movies.as[Movie]
// create a Dataset using SparkSession.createDataset() and the toDS implicit function
val localMovies = Seq(Movie("John Doe", "Awesome Movie", 2018L),
                                    Movie("Mary Jane", "Awesome Movie", 2018L))
val localMoviesDS1 = spark.createDataset(localMovies)
val localMoviesDS2 = localMovies.toDS()
localMoviesDS1.show
+------------+---------------+-------------+
|  actor_name|    movie_title|produced_year|
+------------+---------------+-------------+
|    John Doe|  Awesome Movie|         2018|
|   Mary Jane|  Awesome Movie|         2018|
+------------+---------------+-------------+
Listing 3-38

Different Ways of Creating Datasets
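One caveat worth noting, as an addition to the listing above: the toDS() conversion works in the Spark shell because the session's implicits are imported automatically; in a standalone application, you need to import them yourself, as in the following sketch.
// required in a standalone application before calling toDS() on a local collection
import spark.implicits._
val localMoviesDS3 = localMovies.toDS()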

Among the different ways of creating Datasets, the first way is the most popular one. When transforming a DataFrame to a Dataset using a Scala case class, Spark performs a validation to ensure the member variable names in the Scala case class match up with the column names in the schema of the DataFrame. If there is a mismatch, Spark lets you know.

Working with Datasets

Now that you have a Dataset, you can manipulate it using transformations and actions. Earlier in the chapter, you referred to the columns in a DataFrame using one of the column reference options. With a Dataset, each row is represented by a strongly typed object; therefore, you can refer to the columns using the member variable names of that object, which gives you type safety and compile-time validation. If a name is misspelled, the compiler flags it immediately during the development phase. Listing 3-39 shows examples of manipulating a Dataset.
// filter movies that were produced in 2010 using a typed lambda
moviesDS.filter(movie => movie.produced_year == 2010).show(5)
+---------------------+---------------------+-------------+
|           actor_name|          movie_title|produced_year|
+---------------------+---------------------+-------------+
|    Cooper, Chris (I)|             The Town|         2010|
|      Jolie, Angelina|                 Salt|         2010|
|      Jolie, Angelina|          The Tourist|         2010|
|       Danner, Blythe|       Little Fockers|         2010|
|   Byrne, Michael (I)| Harry Potter and ...|         2010|
+---------------------+---------------------+-------------+
// displaying the title of the first movie in the moviesDS
moviesDS.first.movie_title
String = Coach Carter
// try with misspelling the movie_title and get compilation error
moviesDS.first.movie_tile
error: value movie_tile is not a member of Movie
// perform projection using map transformation
val titleYearDS = moviesDS.map(m => ( m.movie_title, m.produced_year))
titleYearDS.printSchema
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = false)
// demonstrating a type-safe transformation that fails at compile time, performing subtraction on a column with string type
// a problem is not detected for DataFrame until runtime
movies.select('movie_title - 'movie_title)
// a problem is detected at compile time
moviesDS.map(m => m.movie_title - m.movie_title)
error: value - is not a member of String
// take action returns rows as Movie objects to the driver
moviesDS.take(5)
Array[Movie] = Array(Movie(McClure, Marc (I),Coach Carter,2005), Movie(McClure, Marc (I),Superman II,1980), Movie(McClure, Marc (I),Apollo 13,1995))
Listing 3-39

Manipulating a Dataset in a Type-Safe Manner

For those who use the Scala programming language regularly, working with Dataset strongly-typed APIs feels natural and gives you the impression that those objects in the Dataset reside locally.

When you use the Dataset strongly-typed APIs, Spark implicitly converts each Row instance to the domain-specific object that you provide. This conversion has some cost in terms of performance; however, it provides more flexibility.

One general guideline to help decide when to use a Dataset over a DataFrame is the desire for a higher degree of type safety at compile time, which is important for complex ETL Spark jobs developed and maintained by multiple data engineers.

Using SQL in Spark SQL

In the big data era, SQL has been described as the lingua franca of big data analysis. One of the coolest features in Spark is the ability to use SQL to perform distributed data manipulation at scale. Data analysts who are proficient in SQL can now use Spark to perform data analysis on large datasets. One important note to remember is that SQL in Spark is designed for online analytical processing (OLAP) use cases, not online transaction processing (OLTP) use cases. In other words, it is not suitable for low-latency transactional workloads.

SQL has evolved and improved over time. Spark implements a subset of ANSI SQL:2003 revision, which most popular RDBMS servers support. Being compliant with this revision means Spark SQL data processing engine can be benchmarked using a widely used industry-standard decision support benchmark called TPC-DS.

In late 2016, Facebook began migrating some of its largest Hive workloads to Spark to take advantage of the power of the Spark SQL engine (see https://code.facebook.com/posts/1671373793181703/apache-spark-scale-a-60-tb-production-use-case/).

Note

Structured Query Language (SQL) is a domain-specific language for analyzing and manipulating structured data organized in a table format. The concepts in SQL are based on relational algebra; however, it is an easy language to learn. One key difference between SQL and other programming languages like Scala or Python is that SQL is a declarative programming language, which means you express what you want to do with the data and let the SQL execution engine figure out how to perform the data manipulations as well as the necessary optimizations to speed up execution time. If you are new to SQL, there is a free course at www.datacamp.com/courses/intro-to-sql-for-data-science.

Running SQL in Spark

Spark provides a few different options for running SQL in Spark.
  • Spark SQL CLI (./bin/spark-sql)

  • JDBC/ODBC server

  • Programmatically in Spark applications

The first two options integrate with Apache Hive to leverage its metastore, a repository that contains the metadata and schema information about the various system and user-defined tables. This section covers only the last option.

A DataFrame and a Dataset are essentially like tables in a database. Before you can issue SQL queries to manipulate them, you need to register them as temporary views. Each view has a name, which is used as the table name in SQL queries. Spark provides two levels of scoping for views. One is at the Spark session level. When a DataFrame is registered at this level, only queries issued in the same session can refer to it, and the view disappears when the associated Spark session is closed. The second scoping level is global, which means these views are available to SQL statements in all Spark sessions of the same Spark application. All the registered views are maintained in the Spark metadata catalog, which can be accessed via SparkSession. Listing 3-40 is an example of registering views and using the Spark catalog to inspect the metadata of the views.
// display tables in the catalog, expecting an empty list
spark.catalog.listTables.show
+-------+------------+---------------+------------+------------+
|   name|    database|    description|   tableType| isTemporary|
+-------+------------+---------------+------------+------------+
+-------+------------+---------------+------------+------------+
// now register movies DataFrame as a temporary view
movies.createOrReplaceTempView("movies")
// should see the movies view in the catalog
spark.catalog.listTables.show
+-------+---------+------------+-----------+--------------+
|   name| database| description|  tableType|   isTemporary|
+-------+---------+------------+-----------+--------------+
| movies|     null|        null|  TEMPORARY|          true|
+-------+---------+------------+-----------+--------------+
// show the list of columns of movies view in catalog
spark.catalog.listColumns("movies").show
+--------------+------------+---------+---------+------------+------------+
|          name| description| dataType| nullable| isPartition|    isBucket|
+--------------+------------+---------+---------+------------+------------+
|    actor_name|        null|   string|     true|       false|       false|
|   movie_title|        null|   string|     true|       false|       false|
| produced_year|        null|   bigint|     true|       false|       false|
+--------------+------------+---------+---------+------------+------------+
// register movies as global temporary view called movies_g
movies.createOrReplaceGlobalTempView("movies_g")
Listing 3-40

Register the movies DataFrame as a Temporary View and Inspecting Metadata Catalog

Listing 3-40 gives you a couple of views to select from. The programmatic way of issuing SQL queries is to use the sql function of SparkSession class. In the SQL statement, you have access to all SQL expressions and built-in functions. The SparkSession.sql function executes the given SQL query; it returns a DataFrame. The ability to issue SQL statements and use DataFrame transformations and actions provides you a lot of flexibility in how you choose to perform distributed data processing in Spark.

Listing 3-41 provides examples of issuing simple and complex SQL statements.
// simple example of executing a SQL statement without a registered view
val infoDF = spark.sql("select current_date() as today , 1 + 100 as value")
infoDF.show
+----------+--------+
|     today|   value|
+----------+--------+
|2017-12-27|     101|
+----------+--------+
// select from a view
spark.sql("select * from movies where actor_name like '%Jolie%' and produced_year > 2009").show
+---------------+----------------+--------------+
|     actor_name|     movie_title| produced_year|
+---------------+----------------+--------------+
|Jolie, Angelina|            Salt|          2010|
|Jolie, Angelina| Kung Fu Panda 2|          2011|
|Jolie, Angelina|     The Tourist|          2010|
+---------------+----------------+--------------+
// mixing SQL statement and DataFrame transformation
spark.sql("select actor_name, count(*) as count from movies group by actor_name")
         .where('count > 30)
         .orderBy('count.desc)
         .show
+----------------------+--------+
|            actor_name|   count|
+----------------------+--------+
|      Tatasciore, Fred|      38|
|         Welker, Frank|      38|
|    Jackson, Samuel L.|      32|
|         Harnell, Jess|      31|
+----------------------+--------+
// using a subquery to figure out the number of movies produced each year
// leverage """ to format multi-line SQL statement
spark.sql("""select produced_year, count(*) as count
                   from (select distinct movie_title, produced_year from movies)
                   group by produced_year""")
         .orderBy('count.desc).show(5)
+------------------+--------+
|     produced_year|   count|
+------------------+--------+
|              2006|      86|
|              2004|      86|
|              2011|      86|
|              2005|      85|
|              2008|      82|
+------------------+--------+
// select from a global view requires prefixing the view name with key word 'global_temp'
spark.sql("select count(*) from global_temp.movies_g").show
+--------+
|   count|
+--------+
|   31393|
+--------+
Listing 3-41

Executing SQL Statements in Spark

Instead of reading the data file through DataFrameReader class and registering the newly created DataFrame as a temporary view, there is a short and convenient way to issue SQL queries against a data file. Listing 3-42 is an example.
spark.sql("SELECT * FROM parquet.`<path>/chapter4/data/movies/movies.parquet`").show(5)
Listing 3-42

Issue SQL Query Against a Data File

Writing Data Out to Storage Systems

At this point, you know how to read data from various file formats or a database server using DataFrameReader, and you know how to use SQL or the transformations and actions of the structured APIs to manipulate the data. At some point, you need to write the result of the data processing logic in a DataFrame to an external storage system (e.g., a local file system, HDFS, or Amazon S3). In a typical ETL data processing job, the results most likely need to be written out to a persistent storage system.

In Spark SQL, the DataFrameWriter class is responsible for the logic and complexity of writing out the data in a DataFrame to an external storage system. An instance of the DataFrameWriter class is available to you as the write variable in the DataFrame class. The pattern for interacting with DataFrameWriter is similar to that of DataFrameReader. You can refer to it from a Spark shell or in a Spark application, as shown in Listing 3-43.
movies.write
Listing 3-43

Using write Variable from DataFrame Class

Listing 3-44 describes the common pattern for interacting with DataFrameWriter.
movies.write.format(...).mode(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save(path)
Listing 3-44

Common Interacting Pattern with DataFrameWriter

Similar to DataFrameReader, the default format is Parquet; therefore, it is unnecessary to specify a format if the desired output format is Parquet. The partitionBy, bucketBy, and sortBy functions control the directory structure of the output files in the file-based data sources. Structuring the directory layout based on reading patterns dramatically reduces the amount of data that needs to be read for analysis. You learn more about this later in the chapter. The input to the save function is a directory name, not a file name.

One of the important options in the DataFrameWriter class is the save mode, which controls how Spark handles the situation when the specified output location exists. Table 3-11 lists the various supported save modes.
Table 3-11

Save Modes

Mode

Description

append

This appends the DataFrame data to the list of files that already exist at the specified destination location.

overwrite

This completely overwrites any data files that already exist at the specified destination location with the data in the DataFrame.

error

errorIfExists

default

This is the default mode. If the specified destination location exists, then DataFrameWriter throws an error.

ignore

If the specified destination location exists, then simply do nothing. In other words, silently don’t write out the data in the DataFrame.

Listing 3-45 shows a few examples of using various combinations of formats and modes.
// write data out in CSV format, using '#' as the delimiter
movies.write.format("csv").option("sep", "#").save("/tmp/output/csv")
// write data out using overwrite save mode
movies.write.format("csv").mode("overwrite").option("sep", "#").save("/tmp/output/csv")
Listing 3-45

Using DataFrameWriter to Write Out Data to File-based Sources

The number of files written out to the output directory corresponds to the number of partitions your DataFrame has. Listing 3-46 shows how to find out the number of partitions of a DataFrame.
movies.rdd.getNumPartitions
Int = 1
Listing 3-46

Display the Number of DataFrame Partitions

When the number of rows in a DataFrame is not large, it is often convenient to produce a single output file to make it easier to share. A small trick to achieve this is to reduce the number of partitions in your DataFrame to one and then write it out. Listing 3-47 shows an example of how to do that.
val singlePartitionDF = movies.coalesce(1)
Listing 3-47

Reduce the Number of Partitions in a DataFrame to 1
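To complete the picture, the single-partition DataFrame can then be written out in the usual way. The following is a sketch added here for illustration; the output path is illustrative.
// writes a single CSV part file (plus Spark's metadata files) under the given directory
singlePartitionDF.write.format("csv").option("header", "true").mode("overwrite").save("/tmp/output/single-csv")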

The idea of writing data out using partitioning and bucketing is borrowed from the Apache Hive user community. As a rule of thumb, the partition-by column should have low cardinality. In the movies DataFrame, the produced_year column is a good candidate for the partition-by column. Let’s say you want to write out the movies DataFrame partitioned by the produced_year column. The DataFrameWriter writes all the movies with the same produced_year into a single directory. The number of directories in the output folder corresponds to the number of years in the movies DataFrame. Listing 3-48 is an example of using the partitionBy function.
movies.write.partitionBy("produced_year").save("/tmp/output/movies")
// the /tmp/output/movies directory will contain the following subdirectories
produced_year=1961 to produced_year=2012
Listing 3-48

Write the movies DataFrame Using Partition By produced_year Column

The directory names generated by the partitionBy option may seem strange because each one consists of the partitioning column name and the associated value. These two pieces of information are used at read time to choose which directories to read based on the data access pattern, so Spark ends up reading much less data than it otherwise would.
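The bucketBy and sortBy functions mentioned earlier work a bit differently; they require writing to a table with saveAsTable rather than to a plain path. The following is a sketch under that assumption; the table name and bucket count are illustrative.
// bucket the data by actor_name into 4 buckets and sort within each bucket;
// bucketBy/sortBy require saveAsTable instead of save
movies.write
  .partitionBy("produced_year")
  .bucketBy(4, "actor_name")
  .sortBy("actor_name")
  .saveAsTable("movies_bucketed")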

The Trio: DataFrame, Dataset, and SQL

Now you know there are three different ways of manipulating structured data in the Spark SQL module. Table 3-12 shows where each option falls in the syntax and analysis spectrum.
Table 3-12

Syntax and Analysis Errors Spectrum

 

SQL

DataFrame

Dataset

Syntax Errors

Runtime

Compile time

Compile time

Analysis Errors

Runtime

Runtime

Compile time

The earlier you catch errors, the more productive you are and the more stable your data processing applications will be.

DataFrame Persistence

A DataFrame can be persisted/cached in memory just like an RDD. The same familiar persistence APIs (persist and unpersist) are available in the DataFrame class. However, there is one big difference when caching a DataFrame. Since Spark SQL knows the schema of the data in a DataFrame, it can organize the data in a columnar format and apply any applicable compression to minimize space usage. The net result is that it requires much less space to store a DataFrame in memory than to store an RDD when both are backed by the same data file. All the storage options described in Table 3-5 are applicable to persisting a DataFrame. Listing 3-49 demonstrates persisting a DataFrame with a human-readable name, which is easy to identify in the Spark UI.
val numDF = spark.range(1000).toDF("id")
// register as a view
numDF.createOrReplaceTempView("num_df")
// use Spark catalog to cache the numDF using name "num_df"
spark.catalog.cacheTable("num_df")
// force the persistence to happen by taking the count action
numDF.count
Listing 3-49

Persisting a DataFrame with a Human Readable Name

Next, point your browser to the Spark UI (http://localhost:4040 when running Spark shell) and click the Storage tab. Figure 3-2 shows an example.
../images/419951_2_En_3_Chapter/419951_2_En_3_Fig2_HTML.png
Figure 3-2

Storage tab
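As mentioned above, the persist and unpersist APIs are also available directly on the DataFrame. The following sketch is an addition, not one of the book's listings, showing how to use them and how to release the table-level cache created in Listing 3-49.
// persist via the DataFrame API with an explicit storage level
import org.apache.spark.storage.StorageLevel
numDF.persist(StorageLevel.MEMORY_ONLY)
numDF.count          // an action forces the data to be materialized in the cache
numDF.unpersist()    // release the cached data when it is no longer needed
// release the cache created via the catalog in Listing 3-49
spark.catalog.uncacheTable("num_df")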

Summary

In this chapter, you learned the following.
  • The Spark SQL module provides a new and powerful abstraction for structured distributed data manipulation. Structured data has a defined schema, which consists of column names and a column data type.

  • The main programming abstraction in Spark SQL is the Dataset, and it has two flavors of APIs: a strongly typed API and an untyped API. For the strongly typed APIs, each row is represented by a domain-specific object. For the untyped APIs, each row is represented by a Row object. A DataFrame is now just an alias for Dataset[Row]. The strongly typed APIs give you static typing and compile-time checking; therefore, they are only available in strongly typed languages, such as Scala or Java.

  • Spark SQL supports reading data from a variety of popular data sources in different formats. The DataFrameReader class is responsible for creating a DataFrame by reading data from any of these data sources.

  • Like an RDD, a Dataset has two types of structured operations: transformations and actions. The former are lazily evaluated, and the latter are eagerly evaluated.

  • Spark SQL makes it very easy to use SQL to perform data processing against large datasets. This opens up Spark to data analysts and nonprogrammers.

  • Writing out data from either a Dataset or DataFrame is done via a class called DataFrameWriter.

Spark SQL Exercises

The following exercises are based on the movies.tsv and movie-ratings.tsv files in the chapter3/data/movies directory. The column delimiter in these files is a tab, so make sure to use it when splitting each line.

Each line in the movies.tsv file represents an actor who played in a movie. If a movie has ten actors in it, there are ten rows for that movie.
  1. Compute the number of movies produced each year. The output should have two columns: year and count. The output should be ordered by the count in descending order.

  2. Compute the number of movies each actor was in. The output should have two columns: actor and count. The output should be ordered by the count in descending order.

  3. Compute the highest-rated movie per year and include all the actors who played in that movie. The output should have only one movie per year, and it should contain four columns: year, movie title, rating, and a semicolon-separated list of actor names. This question requires a join between the movies.tsv and movie-ratings.tsv files. There are two approaches to this problem. The first is to figure out the highest-rated movies per year and then join with the list of actors. The second is to perform the join first and then figure out the highest-rated movies per year along with the list of actors. The result of each approach is different from the other. Why do you think that is?

  4. Determine which pair of actors worked together most. Working together is defined as appearing in the same movie. The output should have three columns: actor1, actor2, and count. The output should be sorted by the count in descending order. The solution to this question requires a self-join.