This chapter covers
In the big data and enterprise context, relational databases are often the source of the data on which you will perform analytics. It makes sense to understand how to extract data from those databases, either by ingesting a whole table or by issuing SQL SELECT statements.
In this chapter, you’ll learn several ways to ingest data from those relational databases, ingesting either the full table at once or asking the database to perform some operations before the ingestion. Those operations could be filtering, joining, or aggregating data at the database level to minimize data transfer.
You will see in this chapter which databases are supported by Spark. When you work with a database not supported by Spark, a custom dialect is required. The dialect is a way to inform Spark of how to communicate with the database. Spark comes with a few dialects and, in most cases, you won’t need to even think about them. However, for those special situations, you’ll learn how to build one.
Figure 8.1 This chapter focuses on ingestion from databases, whether the database is supported by Spark, or is not supported and requires a custom dialect to be used.
Finally, many enterprises use document stores and NoSQL databases. In this chapter, you’ll also learn how to connect to Elasticsearch and go through a complete ingestion scenario. Elasticsearch is the only NoSQL database that you will study.
In this chapter, you will work with MySQL, IBM Informix, and Elasticsearch.
Figure 8.1 illustrates where you stand on your road to ingestion.
Lab Examples from this chapter are available in GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. Appendix F provides links, tips, and help with installing relational databases. Appendix L is a reference for ingestion.
As you probably know, relational databases are a cornerstone of the transactional data stores you’ll find in any enterprise. In most cases, as soon as a transaction happens, it involves an existing relational database somewhere.
Let’s imagine you have a relational database containing film actors and want to display them alphabetically. To accomplish that, you’ll learn about the elements Spark needs to establish a database connection (and, spoiler alert, if you are familiar with JDBC, it’s the same). You’ll then learn a bit about the sample database, its data, and its schema; play with the sample database; look at the output; and, finally, dig into the code.
Spark needs a small set of information to connect to a database. Spark connects directly to relational databases by using Java Database Connectivity (JDBC) drivers. To connect to a database, Spark needs the following:

- The JDBC connection URL of the database
- The JDBC driver itself, available on the classpath
- Connection properties, such as the username and password

The driver may need other information that is driver-specific. For example, Informix needs DELIMIDENT=Y, and MySQL server expects SSL by default, so you may want to specify useSSL=false.
Naturally, the JDBC driver needs to be provided to the application. It can be in your pom.xml file:
<dependency>
  <groupId>mysql</groupId>                         ❶
  <artifactId>mysql-connector-java</artifactId>    ❷
  <version>8.0.8-dmr</version>                     ❸
  ...
</dependency>
Listing 8.3 describes the pom.xml file in more detail.
For your first ingestion from a database, you are going to use the Sakila database in MySQL. Sakila is a standard sample database that comes with MySQL; a lot of tutorials are available for it, and you may even have learned MySQL with it. This section describes what the database is about--its purpose, structure, and data. Figure 8.2 summarizes the operation.
The Sakila sample database is designed to represent a DVD rental store. Okay, I get it--some of you are probably wondering what a DVD is and why you would even rent such a thing. DVD stands for digital video (or versatile) disc. A DVD is a shiny disk, 12 cm (about 5 inches) in diameter, which is used to store digital information. In the early days (1995), it was used to store movies in digital format. People could buy or rent those objects and watch the movie on their TV by using a device called a DVD player.
If you did not want to buy the DVD, you could rent it from small shops or larger chains like Blockbuster in the United States. Those stores needed people to check in and out the disks and other artifacts like VHS tapes (as much as I recognize that this is even more obscure than a DVD, it is really beyond the scope of this book). In 1997, an innovative firm, Netflix, started to rent DVDs via mail.
You will use the demo database, which includes about 15 tables and a few views (figure 8.2) for this scenario.
Figure 8.2 Spark ingests the database, stored in a MySQL instance. Spark needs a JDBC driver, like any Java application. MySQL stores the sample database, called Sakila, which contains 23 tables and views. Some are represented here, such as actor, actor_info, category, and so on. You'll focus on actor.
As you can see in figure 8.2, you're going to use the actor table for this project, which is described in more detail in table 8.1. You'll notice that last_update implements change data capture (CDC).

Let's look at the output you'll render. The following listing shows the result you will achieve programmatically: five actors from the actor table, as well as the metadata.
+--------+----------+---------+-------------------+ ❶
|actor_id|first_name|last_name| last_update |
+--------+----------+---------+-------------------+
| 92 | KIRSTEN | AKROYD |2006-02-14 22:34:33|
| 58 | CHRISTIAN| AKROYD |2006-02-14 22:34:33|
| 182 | DEBBIE | AKROYD |2006-02-14 22:34:33|
| 118 | CUBA | ALLEN |2006-02-14 22:34:33|
| 145 | KIM | ALLEN |2006-02-14 22:34:33|
+--------+----------+---------+-------------------+
only showing top 5 rows
root ❷
|-- actor_id: integer (nullable = false) ❸
|-- first_name: string (nullable = false) ❸
|-- last_name: string (nullable = false) ❸
|-- last_update: timestamp (nullable = false) ❸
The dataframe contains 200 record(s). ❹
❸ Datatypes are directly converted from the table’s type to Spark’s datatype.
❹ Counting the number of records in the dataframe
Let’s look at the code to produce the output you want. You’ll see three options for doing that, and you’ll get to choose your favorite. You’ll also learn how to modify your pom.xml file to load the JDBC driver, by simply adding it in the dependencies section.
The first thing to do is to identify the column by using the dataframe's col() method; then you can use its orderBy() method. An introduction to the dataframe is in chapter 3. (You'll study more transformation and manipulation in chapters 11 to 13.)
Finally, you can sort the output by using a single line:
df = df.orderBy(df.col("last_name"));
Lab This is lab #100. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires a MySQL or MariaDB connection.
The following listing shows you the first option.
package net.jgp.books.spark.ch08.lab100_mysql_ingestion;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MySQLToDatasetApp {

  public static void main(String[] args) {
    MySQLToDatasetApp app = new MySQLToDatasetApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()                  ❶
        .appName("MySQL to Dataframe using a JDBC Connection")
        .master("local")
        .getOrCreate();

    Properties props = new Properties();                         ❷
    props.put("user", "root");                                   ❸
    props.put("password", "Spark<3Java");                        ❹
    props.put("useSSL", "false");                                ❺

    Dataset<Row> df = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST", ❻
        "actor",                                                 ❼
        props);                                                  ❽

    df = df.orderBy(df.col("last_name"));                        ❾

    df.show(5);
    df.printSchema();

    System.out.println("The dataframe contains " + df.count()
        + " record(s).");
  }
}
❷ Creates a Properties object, which is going to be used to collect the properties needed
❸ Username property--you may not want to use root on a production system
❺ Custom property--here, MySQL needs to be told that it won’t use SSL in its communication
Note If you read chapter 7, you’ll notice that the ingestion mechanism is similar, whether you ingest files or databases.
PASSWORDS I know you know, and you know I know you know, right? A small reminder is always useful: do not hardcode a password in your code; anyone can extract it from the JAR file in a matter of seconds (especially if your variable is called password!). In the repository, lab #101 uses the same code as listing 8.2 (lab #100) but gets the password from an environment variable.
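As a sketch of that environment-variable approach, the connection properties can be assembled from a value supplied at runtime rather than a literal in the source. The DB_PASSWORD variable name below is my assumption, not necessarily the one used in the lab:

```java
import java.util.Properties;

public class PasswordFromEnv {

  // Builds the JDBC connection properties; the password is injected
  // by the caller instead of being hardcoded in the class.
  static Properties buildProps(String user, String password) {
    Properties props = new Properties();
    props.put("user", user);
    props.put("password", password);
    props.put("useSSL", "false");
    return props;
  }

  public static void main(String[] args) {
    // DB_PASSWORD is a hypothetical variable name; pick your own convention.
    String password = System.getenv("DB_PASSWORD");
    Properties props = buildProps("root", password == null ? "" : password);
    System.out.println(props.getProperty("user"));
  }
}
```

With this shape, the password never appears in the JAR file, and the same buildProps() helper can be reused by tests that pass a dummy value.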
You’ll also need to ensure that Spark has access to your JDBC driver, as shown in listing 8.3. One of the easiest ways to do that is to list the database drivers you need in your project’s pom.xml file.
Note We’ll use MySQL, Informix, and Elasticsearch in this chapter: MySQL because it is a fully supported database, Informix because it will require a dialect, and Elasticsearch because it is a NoSQL database.
...
<properties>
  ...
  <mysql.version>8.0.8-dmr</mysql.version>                           ❶
  <informix-jdbc.version>4.10.8.1</informix-jdbc.version>            ❷
  <elasticsearch-hadoop.version>6.2.1</elasticsearch-hadoop.version> ❸
</properties>

<dependencies>
  ...
  <dependency>
    <groupId>mysql</groupId>                                         ❹
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>                              ❺
  </dependency>
  <dependency>
    <groupId>com.ibm.informix</groupId>                              ❻
    <artifactId>jdbc</artifactId>
    <version>${informix-jdbc.version}</version>                      ❼
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>                             ❽
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>${elasticsearch-hadoop.version}</version>               ❾
  </dependency>
  ...
❶ You are using MySQL JDBC driver v8.0.8-dmr.
❷ You are going to use Informix JDBC driver v4.10.8.1.
❸ You are using Elasticsearch v6.2.1.
❹ In this example, you are using MySQL.
❺ You are using MySQL JDBC driver v8.0.8-dmr.
❻ Later in this chapter, you’ll use Informix to build a custom dialect.
❼ You are going to use Informix JDBC driver v4.10.8.1.
❽ Toward the end of this chapter, you’ll connect to Elasticsearch.
❾ You are using Elasticsearch v6.2.1.
Note that the version numbers are different in the repository, as I keep up with the latest versions as much as possible.
As is often the case, there are various ways to write the same operations. Let’s look at how you can tweak the parameters and URL to connect to the database. In listing 8.2, you used the following:
Properties props = new Properties();
props.put("user", "root");
props.put("password", "Spark<3Java");
props.put("useSSL", "false");

Dataset<Row> df = spark.read().jdbc(
    "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST",
    "actor",
    props);
You can replace this code with one of two options (you can find the full code in MySQLToDatasetWithLongUrlApp.java in this chapter's repository). The first option is to build a longer URL. You may already have a library in your application or platform with a JDBC URL generator, so you can reuse it easily. Note that you need to give the Spark reader object an empty list of properties, materialized by new Properties():
String jdbcUrl = "jdbc:mysql://localhost:3306/sakila" ❶
    + "?user=root"
    + "&password=Spark<3Java"
    + "&useSSL=false"
    + "&serverTimezone=EST";

Dataset<Row> df = spark.read()
    .jdbc(jdbcUrl, "actor", new Properties());        ❷
❷ Empty properties are still required.
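If you do not already have a URL generator at hand, a minimal one is easy to sketch. The helper below is hypothetical and deliberately naive (it performs no escaping of special characters, which a real URL builder should handle):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JdbcUrlBuilder {

  // Appends each parameter to the base URL, using '?' before the first
  // one and '&' before the others. No character escaping is performed.
  public static String build(String base, Map<String, String> params) {
    StringBuilder sb = new StringBuilder(base);
    char separator = '?';
    for (Map.Entry<String, String> e : params.entrySet()) {
      sb.append(separator).append(e.getKey()).append('=').append(e.getValue());
      separator = '&';
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
    params.put("user", "root");
    params.put("useSSL", "false");
    params.put("serverTimezone", "EST");
    System.out.println(build("jdbc:mysql://localhost:3306/sakila", params));
  }
}
```

The LinkedHashMap keeps the parameters in the order you added them, which makes the generated URLs predictable and easy to diff in logs.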
The second option is to use only the options; this may be useful if you read the properties from a configuration file. The following code snippet shows you how to do that:
Dataset<Row> df = spark.read()
    .option("url", "jdbc:mysql://localhost:3306/sakila")
    .option("dbtable", "actor")
    .option("user", "root")
    .option("password", "Spark<3Java")
    .option("useSSL", "false")
    .option("serverTimezone", "EST")
    .format("jdbc")
    .load();
You can find the full code in MySQLToDatasetWithOptionsApp.java in this chapter’s repository.
Note that in this version, you are not using the jdbc() method of the object returned by read()--an instance of DataFrameReader. You are using the format() and load() methods. As you can see, the table is only a property called dbtable. There is no preferred syntax; however, you may encounter all of them, and it can appear confusing. If you are working on a project with a team, I advise you to set a standard for the team, knowing that, most likely, those parameters will be read from a configuration file.
As a reminder, properties are case-insensitive. Note that the values are interpreted by the driver, and they might be case-sensitive.
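Since those parameters often come from a configuration file, here is a sketch of turning properties-file content into a plain map; such a map could then be handed to the reader's options(Map) method. The file content shown is hypothetical:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class OptionsFromConfig {

  // Parses Java properties-file syntax into a String-to-String map.
  static Map<String, String> toMap(String config) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(config));
    } catch (IOException e) { // cannot happen when reading from a string
      throw new UncheckedIOException(e);
    }
    Map<String, String> options = new HashMap<>();
    for (String name : props.stringPropertyNames()) {
      options.put(name, props.getProperty(name));
    }
    return options;
  }

  public static void main(String[] args) {
    // In a real application, this content would come from a file.
    String config = "url=jdbc:mysql://localhost:3306/sakila\n"
        + "dbtable=actor\n"
        + "user=root\n";
    Map<String, String> options = toMap(config);
    // The map could be passed to spark.read().format("jdbc").options(options).
    System.out.println(options.get("dbtable"));
  }
}
```

This keeps the connection details out of the code entirely and makes switching environments a matter of swapping configuration files.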
Let’s have a look at how Spark can handle nonsupported databases with custom dialects.
The dialect is the translation block between Spark and the database. In this section, you’ll examine the role of a dialect, the dialects that are provided with Spark, and in which situation you’ll have to write your own dialect.
The dialect is a small software component, often implemented in a single class, that bridges Apache Spark and the database; see figure 8.3.
When Spark imports the data and stores it, Spark needs to know how database types map to Spark types. For example, Informix and PostgreSQL have a SERIAL type, which is an integer type that automatically increments when you insert new rows. It is convenient for defining unique identifiers. However, Spark needs to be told that this datatype is an integer when it hits the Tungsten storage. Tungsten is the storage manager that optimizes memory usage for objects, bypassing the JVM's storage mechanism for more efficiency.
The dialect defines Spark’s behavior when communicating with the database.
Figure 8.3 Within Spark, the dataframe communicates with the database via a dialect acting as a complementary driver.
Spark comes with a few database dialects as part of the standard distribution. This means you can connect to those databases directly out of the box. As of Spark v3.0.0, those dialects are the following:

- IBM Db2
- Apache Derby
- Microsoft SQL Server
- MySQL
- Oracle
- PostgreSQL
- Teradata Database

If your database is not in this list, you can have a look at the Spark Packages website at https://spark-packages.org/?q=tags%3A%22Data%20Sources%22 or write/build your own.
If you are using a relational database that is not in the list, you may want to contact your database vendor before implementing your own dialect. If the vendor does not offer a dialect, don't panic. Implementing your own is relatively easy; you'll learn how to do it here. The example you are going to see is based on IBM Informix, but you can easily adapt the code to your own database. No knowledge of Informix is required here, although it is a wonderful RDBMS.

I picked IBM Informix because, unlike the databases in the previous list, it does not have a dialect of its own. Nevertheless, it remains a vibrant database, especially in the IoT sphere.
Lab This is lab #200. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires an Informix database; you can download and use the Developer Edition for free from https://www.ibm.com/products/informix.
The dialect is one class, extending JdbcDialect, as shown in listing 8.4. The canHandle() method is the entry point to the dialect; this method acts as a filter so Spark knows whether it should use this dialect. It receives the JDBC URL as input, and you can filter based on the URL. In this situation, you'll filter on the beginning of the URL, looking for the distinctive pattern of the Informix JDBC driver: informix-sqli. Each JDBC driver has a unique signature in its URL.

The getCatalystType() method converts a JDBC type to a Catalyst type.
package net.jgp.books.spark.ch08.lab_200_informix_dialect;

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;

import scala.Option;

public class InformixJdbcDialect extends JdbcDialect {
  private static final long serialVersionUID = -672901;        ❶

  @Override
  public boolean canHandle(String url) {                       ❷
    return url.startsWith("jdbc:informix-sqli");               ❸
  }

  @Override
  public Option<DataType> getCatalystType(int sqlType,         ❹
      String typeName, int size, MetadataBuilder md) {
    if (typeName.toLowerCase().compareTo("serial") == 0) {     ❺
      return Option.apply(DataTypes.IntegerType);              ❻
    }
    if (typeName.toLowerCase().compareTo("calendar") == 0) {
      return Option.apply(DataTypes.BinaryType);               ❻
    }
    ...
    return Option.empty();                                     ❻
  }
}
❶ JdbcDialect is serializable, hence a unique ID for this class.
❷ The filter method that will allow Spark to know which driver to use in which context
❸ The signature of the Informix JDBC driver is Informix-sqli.
❹ Converts a SQL type to a Spark type
❺ In this case, Spark does not know what a SERIAL datatype is, but, for Spark, it is simply an integer. Do not worry about testing typeName for null.
❻ Returning the value, Scala-style
I could have used equalsIgnoreCase() or many other ways to compare strings, but over my many years of dealing with Java developers, I've seen that there is never a consensus. Defining the proper methods to use (or settling the eternal debate on naming conventions) is not my mission here.

Make sure your driver's signature is sufficiently precise. If I had used something like return url.contains("informix");, it could have been too broad. You could also add the colon after sqli to tighten the match. I could have written this example using a switch/case statement, but I wanted to leave a little room for improvement, right?
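To see how tightening the signature changes the match, here is a standalone sketch of the filter logic, extracted from any Spark class (the colon after sqli is the stricter variant suggested above):

```java
public class DialectFilterSketch {

  // Same test as in the dialect's canHandle(), with the colon added
  // after "sqli" to make the signature stricter.
  public static boolean canHandle(String url) {
    return url.startsWith("jdbc:informix-sqli:");
  }

  public static void main(String[] args) {
    System.out.println(canHandle("jdbc:informix-sqli://localhost:9088/db"));
    System.out.println(canHandle("jdbc:mysql://localhost:3306/sakila"));
  }
}
```

A quick check like this is worth running against every JDBC URL shape you expect to meet, so your dialect never hijacks another driver's connections.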
The return type is what is expected by Spark. As you may know, Spark is written in Scala, so, at some touchpoints, you will need to return Scala types. In this situation, the return type corresponds to an empty option.
As a reminder, Catalyst is the optimization engine built into Spark (see chapter 4). The getCatalystType() method receives four arguments:

- The SQL type, as defined in java.sql.Types (see http://mng.bz/6wxR). However, note that complex datatypes, for which you may be writing this conversion, will probably not be in that list; you will have to see how they appear in this method.
- The type name, such as the SERIAL datatype, an integer with automatic incrementing you'll find in Informix and PostgreSQL.
- The size of the type.
- A metadata builder.

The getCatalystType() method's return type is one of the following:

- An option around a simple Spark datatype: BinaryType, BooleanType, ByteType, CalendarIntervalType, DateType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, or TimestampType
- An option around a more complex datatype; these inherit from org.apache.spark.sql.types.DataType and can be ArrayType, HiveStringType, MapType, NullType, NumericType, ObjectType, or StructType
Appendix L covers the datatypes in more detail. It also describes additional methods you may need in order to implement a full dialect (such as a reverse conversion, from a Spark type to a SQL type), how to truncate a table, and so on.
Sometimes you don’t want to copy all the data from a table to a dataframe. You know that you won’t use some data, and you don’t want to copy the rows you won’t use, because transfer is an expensive operation. A typical use case would be to run analytics on yesterday’s sales, compare month-to-month sales, and so on.
In this section, you’ll learn about ingesting data from relational databases by using SQL queries, avoiding superfluous data transfer. This operation is called filtering the data. You’ll also learn about partitioning while ingesting.
In SQL, one way to filter data is to use a WHERE clause as part of your SELECT statement. Let’s see how to integrate such a clause in your ingestion mechanism. As with full table ingestion, Spark still uses JDBC underneath.
Lab This is lab #300. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires a MySQL or MariaDB database.
The syntax is similar to what we have been using for ingesting a full table. However, the dbtable option (if you use the load() method) or the table parameter (if you use the jdbc() method) must use the following syntax:
(<SQL select statement>) <table alias>
(SELECT * FROM film WHERE (title LIKE "%ALIEN%"
    OR title LIKE "%victory%" OR title LIKE "%agent%"
    OR description LIKE "%action%") AND rental_rate>1
    AND (rating="G" OR rating="PG")) film_alias
In this query, you’re looking for movies with a title containing alien, victory, or agent, or with a description containing action; with a rental rate of more than $1; and with a rating of either G (General audience) or PG (Parental Guidance suggested). Note that, because MySQL’s default collation is case-insensitive, LIKE matches regardless of case: you will get all movies with alien, Alien, and ALIEN. The last keyword (film_alias) is simply an alias. Figure 8.4 illustrates the process.
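The wrapping rule can be captured in a one-line helper, sketched below; the method name is mine, not part of any Spark API:

```java
public class TableAliasSketch {

  // Wraps a SELECT statement in parentheses and appends an alias, so the
  // result can be passed wherever Spark expects a table name.
  public static String asTable(String sqlQuery, String alias) {
    return "(" + sqlQuery + ") " + alias;
  }

  public static void main(String[] args) {
    String dbtable =
        asTable("SELECT * FROM film WHERE rental_rate > 1", "film_alias");
    System.out.println(dbtable);
  }
}
```

The resulting string is what you would hand to the dbtable option or the table parameter of jdbc().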
The following listing shows the desired output, which is the records that are highlighted in figure 8.4.
+-------+--------------+--------------------+------------+-----------+...
|film_id| title | description |release_year|language_id|...
+-------+--------------+--------------------+------------+-----------+...
| 6 | AGENT TRUMAN |A Intrepid Panora...| 2005-12-31 | 1 |...
| 13 | ALI FOREVER |A Action-Packed D...| 2005-12-31 | 1 |...
...
+-------+--------------+--------------------+------------+-----------+...
only showing top 5 rows
root
|-- film_id: integer (nullable = true)
|-- title: string (nullable = true)
|-- description: string (nullable = true)
|-- release_year: date (nullable = true)
|-- language_id: integer (nullable = true)
|-- original_language_id: integer (nullable = true)
|-- rental_duration: integer (nullable = true)
|-- rental_rate: decimal(4,2) (nullable = true)
|-- length: integer (nullable = true)
|-- replacement_cost: decimal(5,2) (nullable = true)
|-- rating: string (nullable = true)
|-- special_features: string (nullable = true)
|-- last_update: timestamp (nullable = true)
The dataframe contains 16 record(s).
Figure 8.4 The ingestion process from MySQL, using the MySQL JDBC driver. In this scenario, you are interested in only some films matching the SQL query--the ones highlighted in bold.
The following listing shows the code that enables the ingestion through an SQL query.
package net.jgp.books.spark.ch08.lab300_advanced_queries;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MySQLWithWhereClauseToDatasetApp {

  public static void main(String[] args) {
    MySQLWithWhereClauseToDatasetApp app =
        new MySQLWithWhereClauseToDatasetApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("MySQL with where clause to Dataframe using a JDBC Connection")
        .master("local")
        .getOrCreate();

    Properties props = new Properties();
    props.put("user", "root");
    props.put("password", "Spark<3Java");
    props.put("useSSL", "false");
    props.put("serverTimezone", "EST");

    String sqlQuery = "select * from film where "                        ❶
        + "(title like \"%ALIEN%\" or title like \"%victory%\" "
        + "or title like \"%agent%\" or description like \"%action%\") "
        + "and rental_rate>1 "
        + "and (rating=\"G\" or rating=\"PG\")";                         ❷

    Dataset<Row> df = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/sakila",
        "(" + sqlQuery + ") film_alias",                                 ❸
        props);

    df.show(5);
    df.printSchema();

    System.out.println("The dataframe contains " + df.count()
        + " record(s).");
  }
}
❶ Your SQL query can be built like any other string or can come from a configuration file, a generator, and so on.
❸ The syntax is respected; the SQL query is between parentheses, and the table alias is at the end.
The most important thing to remember is that what you send to the database is a plain SQL SELECT statement, wrapped in parentheses and followed by a table alias.

Using a similar technique, you can join data in the database prior to ingesting it in Spark. Spark can join data between dataframes (for more information, see chapter 12 and appendix M), but for performance and optimization reasons, you may want to ask your database to perform the operation. In this section, you’ll learn how to perform a join at the database level and ingest the data from the join.
Lab This is lab #310. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires a MySQL or MariaDB database.
The SQL statement you will run is as follows:
SELECT actor.first_name, actor.last_name, film.title, film.description
FROM actor, film_actor, film
WHERE actor.actor_id = film_actor.actor_id
AND film_actor.film_id = film.film_id
Figure 8.5 illustrates the operation.
Figure 8.5 Spark ingests the data stored in the MySQL database after the database server performs the joins between the three tables.
The following listing illustrates the result. Note that the dataframe contains more records than the actor table, as joining tables multiplies the rows (one per actor-film pair).
+----------+---------+--------------------+--------------------+
|first_name|last_name| title | description |
+----------+---------+--------------------+--------------------+
| PENELOPE | GUINESS | ACADEMY DINOSAUR |A Epic Drama of a...|
| PENELOPE | GUINESS |ANACONDA CONFESSIONS|A Lacklusture Dis...|
| PENELOPE | GUINESS | ANGELS LIFE |A Thoughtful Disp...|
| PENELOPE | GUINESS |BULWORTH COMMANDM...|A Amazing Display...|
| PENELOPE | GUINESS | CHEAPER CLYDE |A Emotional Chara...|
+----------+---------+--------------------+--------------------+
only showing top 5 rows
root
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- title: string (nullable = true)
|-- description: string (nullable = true)
The dataframe contains 5462 record(s).
The next listing shows the code. As you would expect, it is similar to the simple filtering in listing 8.6. I have removed the superfluous lines of code.
package net.jgp.books.spark.ch08.lab310_sql_joins;
...
public class MySQLWithJoinToDatasetApp {
...
  private void start() {
...
    String sqlQuery =
        "select actor.first_name, actor.last_name, film.title, "
        + "film.description "
        + "from actor, film_actor, film "
        + "where actor.actor_id = film_actor.actor_id "
        + "and film_actor.film_id = film.film_id";                       ❶

    Dataset<Row> df = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/sakila",
        "(" + sqlQuery + ") actor_film_alias",                           ❷
        props);
...
  }
}
❶ A basic SQL query joining three tables
❷ Except for the alias name, this is the same call as in listing 8.6.
If we had used SELECT * FROM actor, film_actor ... in the query in listing 8.7, Spark would have been confused by columns having the same name and would have returned the following error: Duplicate column name 'actor_id'. Spark will not build a fully qualified name (<table>.<column>) for you; you will have to explicitly name the columns and alias them. The code producing this exception is available in GitHub at http://mng.bz/KEOO.
Note The SQL you are writing here is directly sent to MySQL. It will not be interpreted by Spark, and, as a consequence, if you write specific Oracle SQL, it will not work with PostgreSQL (although it should work with IBM Db2, as it understands Oracle syntax).
In this section, you’ll have a quick look at an advanced feature of Spark: ingesting from a database and automatically assigning to the partitions. Figure 8.6 shows the dataframe after you ingest the film table, as in listing 8.9.
Figure 8.6 Looking at the dataframe, after ingesting 1,000 films from the film table. They are all in one partition.
Figure 8.7 illustrates the partitions in the resilient distributed dataset (RDD) in the dataframe after using partitions. You may recall that the RDD is the data storage part of the dataframe (see chapter 3). The process of ingesting by partition is detailed in listing 8.11.
Figure 8.7 In this scenario, you asked Spark to split the data into 10 partitions. You still have one dataframe and one RDD. Physical nodes are not represented, but this diagram could split over several nodes.
Lab This is lab #320. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires a MySQL or MariaDB database.
The following listing is similar to listing 8.2, but you are going to ingest movies from the film table.
package net.jgp.books.spark.ch08.lab320_ingestion_partinioning;
...
public class MySQLToDatasetWithoutPartitionApp {
...
  private void start() {
...
    Properties props = new Properties();
    props.put("user", "root");
    props.put("password", "Spark<3Java");
    props.put("useSSL", "false");
    props.put("serverTimezone", "EST");

    Dataset<Row> df = spark.read().jdbc(                                 ❶
        "jdbc:mysql://localhost:3306/sakila",
        "film",
        props);

    df.show(5);
    df.printSchema();

    System.out.println("The dataframe contains " + df.count()
        + " record(s).");
    System.out.println("The dataframe is split over "
        + df.rdd().getPartitions().length + " partition(s).");
  }
}
The output is shown in the following listing.
+-------+----------------+--------------------+------------+-----------+--...
|film_id| title | description |release_year|language_id|or...
+-------+----------------+--------------------+------------+-----------+--...
| 1|ACADEMY DINOSAUR |A Epic Drama of a...| 2005-12-31 | 1 | ...
| 2| ACE GOLDFINGER |A Astounding Epis...| 2005-12-31 | 1 | ...
| 3|ADAPTATION HOLES |A Astounding Refl...| 2005-12-31 | 1 | ...
| 4|AFFAIR PREJUDICE |A Fanciful Docume...| 2005-12-31 | 1 | ...
| 5| AFRICAN EGG |A Fast-Paced Docu...| 2005-12-31 | 1 | ...
+-------+----------------+--------------------+------------+-----------+--...
only showing top 5 rows
root
|-- film_id: integer (nullable = true)
|-- title: string (nullable = true)
|-- description: string (nullable = true)
|-- release_year: date (nullable = true)
|-- language_id: integer (nullable = true)
|-- original_language_id: integer (nullable = true)
|-- rental_duration: integer (nullable = true)
|-- rental_rate: decimal(4,2) (nullable = true)
|-- length: integer (nullable = true)
|-- replacement_cost: decimal(5,2) (nullable = true)
|-- rating: string (nullable = true)
|-- special_features: string (nullable = true)
|-- last_update: timestamp (nullable = true)
The dataframe contains 1000 record(s).
The dataframe is split over 1 partition(s).
Focus on the last line of the output of listing 8.10: the data is in one partition. The following listing adds the partitioning code. Chapter 17 talks more about partitioning.
package net.jgp.books.spark.ch08.lab320_ingestion_partinioning;
...
public class MySQLToDatasetWithPartitionApp {
...
    Properties props = new Properties();                                 ❶
    props.put("user", "root");
    props.put("password", "Spark<3Java");
    props.put("useSSL", "false");
    props.put("serverTimezone", "EST");
    props.put("partitionColumn", "film_id");                             ❷
    props.put("lowerBound", "1");                                        ❸
    props.put("upperBound", "1000");                                     ❹
    props.put("numPartitions", "10");                                    ❺

    Dataset<Row> df = spark.read().jdbc(                                 ❻
        "jdbc:mysql://localhost:3306/sakila",
        "film",
        props);
...
❶ Properties to set up the database connection
In this scenario, the data is split into 10 partitions, and the following is the last line of the output:
...
The dataframe is split over 10 partition(s).
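To build intuition about how Spark splits the work, here is a simplified sketch of the range predicates it derives from partitionColumn, lowerBound, upperBound, and numPartitions. The real logic lives inside Spark's JDBC data source and handles more edge cases; treat this as an approximation, not Spark's actual code:

```java
public class PartitionBoundsSketch {

  // Derives one WHERE predicate per partition from the column bounds.
  // Rows below the lower bound or above the upper bound are still read,
  // caught by the open-ended first and last predicates.
  public static String[] wherePredicates(String column, long lower,
      long upper, int numPartitions) {
    long stride = (upper - lower) / numPartitions;
    String[] predicates = new String[numPartitions];
    long current = lower;
    for (int i = 0; i < numPartitions; i++) {
      if (i == 0) {
        predicates[i] = column + " < " + (current + stride)
            + " or " + column + " is null";
      } else if (i == numPartitions - 1) {
        predicates[i] = column + " >= " + current;
      } else {
        predicates[i] = column + " >= " + current
            + " and " + column + " < " + (current + stride);
      }
      current += stride;
    }
    return predicates;
  }

  public static void main(String[] args) {
    // The bounds match listing 8.11: film_id from 1 to 1000, 10 partitions.
    for (String predicate : wherePredicates("film_id", 1, 1000, 10)) {
      System.out.println(predicate);
    }
  }
}
```

Each predicate becomes a separate query to MySQL, so ten partitions means ten concurrent reads, each fetching roughly a tenth of the film table.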
In these sections, you have learned how to better ingest data from your RDBMS. Because you probably won’t perform those operations all the time, appendix L provides reference tables to help your ingestion operations.
In this section, you’ll learn how to ingest data directly from Elasticsearch. Elasticsearch has been growing in popularity since 2010 (the year I started using it) as a scalable document store and search engine. A bidirectional communication with Elasticsearch helps Spark store and retrieve complex documents.
I know that some purists will argue that Elasticsearch is not a database, but it is an incredible data store, which makes it a prime candidate for this chapter on ingestion from data stores.
Note See appendix N for help installing Elasticsearch and adding a sample dataset for the examples we are using. If you want to know more about this search engine, you can read Elasticsearch in Action by Radu Gheorghe et al. (Manning, 2015), available at https://www.manning.com/books/elasticsearch-in-action .
You’ll first have a look at the architecture and then run your first Elasticsearch ingestion.
Figure 8.8 illustrates the flows between Spark and Elasticsearch. For Spark, Elasticsearch is like a database, and it needs a driver, like a JDBC driver.
Figure 8.8 Elasticsearch communicates with Spark via a driver provided by Elastic.
As you may remember from listing 8.3, we modified the pom.xml file. This is the dependency entry required for Elasticsearch:
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>6.2.1</version>
</dependency>
This entry defines the driver needed by Elasticsearch and provided by Elastic (the company behind Elasticsearch), allowing a bidirectional communication between Elasticsearch and Spark, as well as Hadoop.
Let’s look at the result of the code you’re going to create. Elasticsearch stores documents in JSON format; therefore, it should not be a surprise to see nested constructs such as those in listing 8.12. In this output, I added timings, so you can visualize where time is spent. The timings are summarized in table 8.2.
Lab This is lab #400. It is available on GitHub at https://github.com/jgperrin/ net.jgp.books.spark.ch08 . It requires Elasticsearch.
Table 8.2 Timings of each operation during ingestion from Elasticsearch

Operation                                                         Time
Getting a session                                                 1,524 ms
Initializing communication and starting to get some results       1,694 ms
Getting some records, enough to display 10 and infer the schema   10,450 ms
Displaying the schema                                             1 ms
Counting the number of records                                    33,710 ms
Counting the number of partitions where the records are stored    118 ms
Note The timings used in table 8.2 are based on a local laptop (this is also one of the beauties of Spark and Elasticsearch: everything can run on a laptop). Those timings will probably differ markedly on your system, but the proportions should remain similar.
Getting a session took: 1524 ms
Init communication and starting to get some results took: 1694 ms
+--------------------+--------------------+---------+--------+--------+---...
| Action | Address | Boro |Building| Camis |...
+--------------------+--------------------+---------+--------+--------+...
|Violations were c...|10405 METROPOLITA...| QUEENS | 10405 |40704305|[-7...
|Violations were c...|10405 METROPOLITA...| QUEENS | 10405 |40704305|[-7...
|Violations were c...|10405 METROPOLITA...| QUEENS | 10405 |40704305|[-7...
|Violations were c...|10405 METROPOLITA...| QUEENS | 10405 |40704305|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN| 181 |40704315|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN| 181 |40704315|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN| 181 |40704315|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN| 181 |40704315|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN| 181 |40704315|[-7...
|Violations were c...|1007 LEXINGTON AV...|MANHATTAN| 1007 |40704453|[-7...
+--------------------+--------------------+---------+--------+--------+---...
only showing top 10 rows
Showing a few records took: 10450 ms
root
|-- Action: string (nullable = true)
|-- Address: string (nullable = true)
|-- Boro: string (nullable = true)
|-- Building: string (nullable = true)
|-- Camis: long (nullable = true)
|-- Coord: array (nullable = true)
| |-- element: double (containsNull = true)
|-- Critical_Flag: string (nullable = true)
|-- Cuisine_Description: string (nullable = true)
|-- Dba: string (nullable = true)
|-- Grade: string (nullable = true)
|-- Grade_Date: timestamp (nullable = true)
|-- Inspection_Date: array (nullable = true)
| |-- element: timestamp (containsNull = true)
|-- Inspection_Type: string (nullable = true)
|-- Phone: string (nullable = true)
|-- Record_Date: timestamp (nullable = true)
|-- Score: double (nullable = true)
|-- Street: string (nullable = true)
|-- Violation_Code: string (nullable = true)
|-- Violation_Description: string (nullable = true)
|-- Zipcode: long (nullable = true)
Displaying the schema took: 1 ms
The dataframe contains 473039 record(s).
Counting the number of records took: 33710 ms
The dataframe is split over 5 partition(s).
Counting the # of partitions took: 118 ms
As you can conclude from the timings, Spark does not require the whole dataset to be in memory before it can infer the schema and display a few rows. However, when you ask Spark to count the number of records, it needs to have everything in memory, hence the 33 seconds to download the rest of the data.
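This lazy-then-eager pattern can be mimicked with plain Java streams (an analogy only; the class and method names below are hypothetical, and Spark's evaluation engine works differently): displaying a few elements touches only a few, while counting forces a full pass over the source.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

public class LazyEvaluationSketch {

  // Returns how many elements of a 1,000-element source are actually
  // consumed: a "show a few" operation versus a full count.
  public static long consumedBy(boolean fullCount) {
    AtomicLong touched = new AtomicLong();
    LongStream source = LongStream.range(0, 1000)
        .peek(i -> touched.incrementAndGet())
        // The filter defeats the sized-count shortcut so count() really
        // traverses the stream instead of returning the known size.
        .filter(i -> i >= 0);
    if (fullCount) {
      source.count();                      // full pass, like df.count()
    } else {
      source.limit(10).forEach(i -> { });  // touches only 10, like df.show(10)
    }
    return touched.get();
  }

  public static void main(String[] args) {
    System.out.println("Showing 10 touched " + consumedBy(false) + " elements");
    System.out.println("Counting touched " + consumedBy(true) + " elements");
  }
}
```

The proportion mirrors the timings above: the cheap operations pull only what they need, while the count pays for the whole dataset.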
Let’s walk through the following code to ingest the New York City restaurants dataset from Elasticsearch into Spark.
package net.jgp.books.spark.ch08.lab400_es_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ElasticsearchToDatasetApp {

  public static void main(String[] args) {
    ElasticsearchToDatasetApp app = new ElasticsearchToDatasetApp();
    app.start();
  }

  private void start() {
    long t0 = System.currentTimeMillis();

    SparkSession spark = SparkSession.builder()
        .appName("Elasticsearch to Dataframe")
        .master("local")
        .getOrCreate();

    long t1 = System.currentTimeMillis();
    System.out.println("Getting a session took: " + (t1 - t0) + " ms");  ❶

    Dataset<Row> df = spark
        .read()                                                      ❷
        .format("org.elasticsearch.spark.sql")                       ❸
        .option("es.nodes", "localhost")                             ❹
        .option("es.port", "9200")                                   ❺
        .option("es.query", "?q=*")                                  ❻
        .option("es.read.field.as.array.include", "Inspection_Date") ❼
        .load("nyc_restaurants");                                    ❽

    long t2 = System.currentTimeMillis();
    System.out.println(
        "Init communication and starting to get some results took: "
        + (t2 - t1) + " ms");

    df.show(10);
    long t3 = System.currentTimeMillis();
    System.out.println("Showing a few records took: " + (t3 - t2) + " ms");

    df.printSchema();
    long t4 = System.currentTimeMillis();
    System.out.println("Displaying the schema took: " + (t4 - t3) + " ms");

    System.out.println("The dataframe contains " + df.count()
        + " record(s).");
    long t5 = System.currentTimeMillis();
    System.out.println("Counting the number of records took: "
        + (t5 - t4) + " ms");

    System.out.println("The dataframe is split over "
        + df.rdd().getPartitions().length + " partition(s).");
    long t6 = System.currentTimeMillis();
    System.out.println("Counting the # of partitions took: "
        + (t6 - t5) + " ms");
  }
}
❶ Introducing timing after each major step to understand where time is spent
❷ As with any ingestion, you start with read().
❸ Name of the format, which can be a short name like csv or jdbc, or a full class name (more in chapter 9)
❹ Elasticsearch node to connect to
❺ Elasticsearch port (optional, as you use the default, 9200)
❻ Query (here, you want everything; can be omitted)
❼ You need to convert the Inspection_Date field to an array.
❽ The Elasticsearch index to load: nyc_restaurants
As you can see, data ingestion from Elasticsearch follows the same principles as ingestion from files (chapter 7) and databases. You can find the list of options for importing data in appendix L.
(<select statement>) <table alias>
syntax instead of the table name.
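This refers to passing a subquery in place of a table name to a JDBC read. As a minimal illustration of the shape of that expression (the helper class, method, and query below are hypothetical, not part of the lab code):

```java
public class TableExpressionSketch {

  // Wraps a SELECT statement so it can be passed to spark.read().jdbc()
  // in place of a plain table name: "(<select statement>) <table alias>".
  public static String asTableExpression(String selectStatement, String alias) {
    return "(" + selectStatement + ") " + alias;
  }

  public static void main(String[] args) {
    // The resulting string could then be used as the second argument of
    // spark.read().jdbc(url, tableExpression, props), letting the database
    // filter or join before any data is transferred to Spark.
    String expr = asTableExpression(
        "SELECT film_id, title FROM film WHERE rental_rate > 1", "film_alias");
    System.out.println(expr);
  }
}
```

Pushing the SELECT to the database this way is what lets you filter, join, or aggregate at the database level to minimize data transfer, as discussed earlier in this chapter.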