8. Ingestion from databases

This chapter covers

  • Ingesting data from relational databases
  • Understanding the role of dialects in communication between Spark and databases
  • Building advanced queries in Spark to address the database prior to ingestion
  • Understanding advanced communication with databases
  • Ingesting from Elasticsearch

In the big data and enterprise context, relational databases are often the source of the data on which you will perform analytics. It makes sense to understand how to extract data from those databases, whether by ingesting an entire table or by running a SQL SELECT statement.

In this chapter, you’ll learn several ways to ingest data from those relational databases, ingesting either the full table at once or asking the database to perform some operations before the ingestion. Those operations could be filtering, joining, or aggregating data at the database level to minimize data transfer.

You will see in this chapter which databases are supported by Spark. When you work with a database not supported by Spark, a custom dialect is required. The dialect is a way to inform Spark of how to communicate with the database. Spark comes with a few dialects and, in most cases, you won’t need to even think about them. However, for those special situations, you’ll learn how to build one.

Figure 8.1 This chapter focuses on ingestion from databases, whether the database is supported by Spark, or is not supported and requires a custom dialect to be used.

Finally, many enterprises use document stores and NoSQL databases. In this chapter, you’ll also learn how to connect to Elasticsearch and go through a complete ingestion scenario. Elasticsearch is the only NoSQL database that you will study.

In this chapter, you will work with MySQL, IBM Informix, and Elasticsearch.

Figure 8.1 illustrates where you stand on your road to ingestion.

Lab Examples from this chapter are available in GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. Appendix F provides links, tips, and help with installing relational databases. Appendix L is a reference for ingestion.

8.1 Ingestion from relational databases

As you probably know, relational databases are a cornerstone of the transactional data stores you’ll find in any enterprise. In most cases, as soon as a transaction happens, it involves an existing relational database somewhere.

Let’s imagine you have a relational database containing film actors and want to display them alphabetically. To accomplish that, you’ll learn about the elements Spark needs to establish a database connection (and, spoiler alert, if you are familiar with JDBC, it’s the same). You’ll then learn a bit about the sample database, its data, and its schema; play with the sample database; look at the output; and, finally, dig into the code.

8.1.1 Database connection checklist

Spark needs a small set of information to connect to a database. Spark connects directly to relational databases by using Java Database Connectivity (JDBC) drivers. To connect to a database, Spark needs the following:

  • The JDBC driver, available in your application’s classpath
  • The JDBC connection URL, which identifies the server, the port, and the database
  • The name of the table to ingest (or, as you will see in section 8.3, a query)
  • The username and password

The driver may need other information that is driver-specific. For example, Informix needs DELIMIDENT=Y, and MySQL server expects SSL by default, so you may want to specify useSSL=false.
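As an illustration, those driver-specific options typically end up either in the JDBC URL itself or in the connection properties. The following sketch shows what such URLs can look like; the host names, ports, database names, and the Informix server name are placeholders, so adapt them to your environment.

// Hypothetical connection URLs: adjust hosts, ports, and database names.
// MySQL: disable SSL and set the server time zone directly in the URL.
String mysqlUrl =
    "jdbc:mysql://localhost:3306/sakila?useSSL=false&serverTimezone=EST";

// Informix: driver-specific options are appended after the database name,
// separated by semicolons (DELIMIDENT=Y, as mentioned above).
String informixUrl =
    "jdbc:informix-sqli://localhost:9088/stores_demo:"
        + "INFORMIXSERVER=informixserver;DELIMIDENT=Y";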

Naturally, the JDBC driver needs to be provided to the application. It can be in your pom.xml file:

<dependency>                                          <!-- Driver definition for MySQL -->
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>       <!-- Artifact ID -->
  <version>8.0.8-dmr</version>                        <!-- Version -->
  ...
</dependency>

Listing 8.3 describes the pom.xml file in more detail.

8.1.2 Understanding the data used in the examples

For your first ingestion from a database, you are going to use the Sakila database in MySQL. Sakila is a standard sample database that comes with MySQL; a lot of tutorials are available, and you may even have learned MySQL with it. This section describes what the database is about--its purpose, structure, and data. Figure 8.2 summarizes the operation.

The Sakila sample database is designed to represent a DVD rental store. Okay, I get it--some of you are probably wondering what a DVD is and why you would even rent such a thing. DVD stands for digital video (or versatile) disc. A DVD is a shiny disk, 12 cm (about 5 inches) in diameter, which is used to store digital information. In the early days (1995), it was used to store movies in digital format. People could buy or rent those objects and watch the movie on their TV by using a device called a DVD player.

If you did not want to buy the DVD, you could rent it from small shops or larger chains like Blockbuster in the United States. Those stores needed people to check in and out the disks and other artifacts like VHS tapes (as much as I recognize that this is even more obscure than a DVD, it is really beyond the scope of this book). In 1997, an innovative firm, Netflix, started to rent DVDs via mail.

For this scenario, you will use the demo database, which includes about 15 tables and a few views (figure 8.2).

Figure 8.2 Spark ingests the database, stored in a MySQL instance. Spark needs a JDBC driver, like any Java application. MySQL stores the sample database, called Sakila, which contains 23 tables and views. Some are represented here, such as actor , actor_info , category , and so on. You’ll focus on actor .

As you can see in figure 8.2, you’re going to use the actor table for this project, which is described in more detail in table 8.1. You’ll notice that the last_update column is what makes change data capture (CDC) possible; a short sketch of that idea follows table 8.1.

Table 8.1 The actor table as it is defined in the Sakila database in MySQL

  • actor_id -- SMALLINT. Primary key, not null, unique, auto-increment. In MySQL, an integer with auto-increment automatically gets a new value when you insert a new row. Informix and PostgreSQL use the SERIAL and SERIAL8 datatypes; other databases use stored procedures or sequences. This is the actor’s unique identifier.
  • first_name -- VARCHAR(45). Not null. The actor’s first name.
  • last_name -- VARCHAR(45). Not null. The actor’s last name. As you can see, the database modeler did not think of movies starring Cher or Madonna.
  • last_update -- TIMESTAMP. Not null, CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP. A timestamp that is automatically updated each time you update the row. This is a good practice if you are using and designing RDBMSs, as it allows you to implement CDC.
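As a side note, a column like last_update is what makes simple incremental (CDC-style) ingestion possible: you pull only the rows that changed since your last run. The snippet below is only a sketch of the idea; it reuses a Spark session and the connection properties you will set up in listing 8.2, the cutoff timestamp is a made-up value, and the WHERE-clause syntax it relies on is covered in section 8.3.1.

// Sketch: ingest only rows modified after a given cutoff by pushing a
// WHERE clause on last_update down to the database.
String incrementalQuery =
    "(SELECT * FROM actor WHERE last_update > '2019-01-01 00:00:00') actor_delta";
Dataset<Row> delta = spark.read().jdbc(
    "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST",
    incrementalQuery,
    props);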

8.1.3 Desired output

Let’s look at the output you’ll render. The following listing shows the result you will achieve programmatically: five actors from the actor table, as well as the metadata.

Listing 8.1 List of actors and metadata

+--------+----------+---------+-------------------+
|actor_id|first_name|last_name|        last_update|
+--------+----------+---------+-------------------+
|      92|   KIRSTEN|   AKROYD|2006-02-14 22:34:33|
|      58| CHRISTIAN|   AKROYD|2006-02-14 22:34:33|
|     182|    DEBBIE|   AKROYD|2006-02-14 22:34:33|
|     118|      CUBA|    ALLEN|2006-02-14 22:34:33|
|     145|       KIM|    ALLEN|2006-02-14 22:34:33|
+--------+----------+---------+-------------------+
only showing top 5 rows

root
 |-- actor_id: integer (nullable = false)
 |-- first_name: string (nullable = false)
 |-- last_name: string (nullable = false)
 |-- last_update: timestamp (nullable = false)

The dataframe contains 200 record(s).

The listing shows a sample of the data, then the schema (datatypes are converted directly from the table’s types to Spark’s datatypes), and finally a count of the records in the dataframe.

8.1.4 Code

Let’s look at the code to produce the output you want. You’ll see three options for doing that, and you’ll get to choose your favorite. You’ll also learn how to modify your pom.xml file to load the JDBC driver, by simply adding it in the dependencies section.

The first thing to do is to identify the column to sort on by using the dataframe’s col() method; you then pass that column to the dataframe’s orderBy() method. An introduction to the dataframe is in chapter 3. (You’ll study more transformation and manipulation in chapters 11 to 13.)

Finally, you can sort the output by using a single line:

df = df.orderBy(df.col("last_name"));

Lab This is lab #100. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires a MySQL or MariaDB connection.

The following listing shows you the first option.

Listing 8.2 MySQLToDatasetApp.java

package net.jgp.books.spark.ch08.lab100_mysql_ingestion;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MySQLToDatasetApp {

  public static void main(String[] args) {
    MySQLToDatasetApp app = new MySQLToDatasetApp();
    app.start();
  }

  private void start() {
    // Gets a session
    SparkSession spark = SparkSession.builder()
        .appName("MySQL to Dataframe using a JDBC Connection")
        .master("local")
        .getOrCreate();

    // Creates a Properties object, which is going to be used to collect
    // the properties needed
    Properties props = new Properties();
    // Username property--you may not want to use root on a production system
    props.put("user", "root");
    // Password property
    props.put("password", "Spark<3Java");
    // Custom property--here, MySQL needs to be told that it won't use SSL
    // in its communication
    props.put("useSSL", "false");

    Dataset<Row> df = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST", // JDBC URL
        "actor",                                                 // Table you are interested in
        props);                                                  // Properties you just defined
    df = df.orderBy(df.col("last_name"));                        // Sorts by last name

    df.show(5);
    df.printSchema();
    System.out.println("The dataframe contains "
        + df.count() + " record(s).");
  }
}

Note If you read chapter 7, you’ll notice that the ingestion mechanism is similar, whether you ingest files or databases.

PASSWORDS I know you know, and you know I know you know, right? A small reminder is always useful: do not hardcode a password in your code; anyone can extract it from the JAR file in a matter of seconds (especially if your variable is called password !). In the repository, lab #101 uses the same code as listing 8.2 (lab #100) but gets the password from an environment variable.
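The following is a minimal sketch of that approach: read the password from an environment variable instead of hardcoding it. The variable name DB_PASSWORD is an assumption; check lab #101 in the repository for the exact name it uses.

// Read the password from the environment rather than from the source code.
// DB_PASSWORD is an illustrative variable name.
String password = System.getenv("DB_PASSWORD");
if (password == null) {
  throw new IllegalStateException(
      "Set the DB_PASSWORD environment variable before running this application.");
}
Properties props = new Properties();
props.put("user", "root");
props.put("password", password);
props.put("useSSL", "false");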

You’ll also need to ensure that Spark has access to your JDBC driver, as shown in listing 8.3. One of the easiest ways to do that is to list the database drivers you need in your project’s pom.xml file.

Note We’ll use MySQL, Informix, and Elasticsearch in this chapter: MySQL because it is a fully supported database, Informix because it will require a dialect, and Elasticsearch because it is a NoSQL database.

Listing 8.3 Your modified pom.xml for database access

...
<properties>
...
  <mysql.version>8.0.8-dmr</mysql.version>                            <!-- You are using MySQL JDBC driver v8.0.8-dmr. -->
  <informix-jdbc.version>4.10.8.1</informix-jdbc.version>             <!-- You are going to use Informix JDBC driver v4.10.8.1. -->
  <elasticsearch-hadoop.version>6.2.1</elasticsearch-hadoop.version>  <!-- You are using Elasticsearch v6.2.1. -->
</properties>

<dependencies>
...
  <!-- In this example, you are using MySQL. -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
  </dependency>

  <!-- Later in this chapter, you'll use Informix to build a custom dialect. -->
  <dependency>
    <groupId>com.ibm.informix</groupId>
    <artifactId>jdbc</artifactId>
    <version>${informix-jdbc.version}</version>
  </dependency>

  <!-- Toward the end of this chapter, you'll connect to Elasticsearch. -->
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>${elasticsearch-hadoop.version}</version>
  </dependency>
...

Note that the version numbers in the repository may differ, as I try to keep up with the latest versions as much as possible.

8.1.5 Alternative code

As is often the case, there are various ways to write the same operations. Let’s look at how you can tweak the parameters and URL to connect to the database. In listing 8.2, you used the following:

Properties props = new Properties();
props.put("user", "root");
props.put("password", "Spark<3Java");
props.put("useSSL", "false");

Dataset<Row> df = spark.read().jdbc(
    "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST",
    "actor",
    props);

You can replace this code with one of two options (you can find the full code in MySQLToDatasetWithLongUrlApp.java in this chapter’s repository). The first option is to build a longer URL. You may already have a library in your application or platform with a JDBC URL generator, so you can reuse it easily. Note that you need to give the Spark reader object an empty list of properties, materialized by new Properties() :

// Longer URL
String jdbcUrl = "jdbc:mysql://localhost:3306/sakila"
    + "?user=root"
    + "&password=Spark<3Java"
    + "&useSSL=false"
    + "&serverTimezone=EST";
Dataset<Row> df = spark.read()
    .jdbc(jdbcUrl, "actor", new Properties());   // Empty properties are still required.

The second option is to use only the options; this may be useful if you read the properties from a configuration file. The following code snippet shows you how to do that:

Dataset<Row> df = spark.read()
    .option("url", "jdbc:mysql://localhost:3306/sakila")
    .option("dbtable", "actor")
    .option("user", "root")
    .option("password", "Spark<3Java")
    .option("useSSL", "false")
    .option("serverTimezone", "EST")
    .format("jdbc")
    .load();

You can find the full code in MySQLToDatasetWithOptionsApp.java in this chapter’s repository.

Note that in this version, you are not using the jdbc() method of the object returned by read()--an instance of DataFrameReader. You are using its format() and load() methods, and, as you can see, the table is simply an option called dbtable. No one syntax is preferred over the others; however, you may encounter all of them, which can be confusing. If you are working on a project with a team, I advise you to agree on a standard, knowing that, most likely, those parameters will be read from a configuration file.
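To illustrate that last point, here is one way you might load those options from a configuration file instead of hardcoding them. This is a sketch, not the book’s code: the file name db.properties and its keys are assumptions, and the enclosing method is expected to declare throws IOException.

// Sketch: read connection settings from a properties file and pass them
// to the JDBC reader as options. Requires java.io.FileInputStream,
// java.io.InputStream, and java.util.Properties.
Properties config = new Properties();
try (InputStream in = new FileInputStream("db.properties")) {
  config.load(in);
}

Dataset<Row> df = spark.read()
    .format("jdbc")
    .option("url", config.getProperty("url"))
    .option("dbtable", config.getProperty("dbtable"))
    .option("user", config.getProperty("user"))
    .option("password", config.getProperty("password"))
    .load();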

As a reminder, the option names are case-insensitive. The values, however, are interpreted by the driver and might be case-sensitive.

Let’s have a look at how Spark can handle nonsupported databases with custom dialects.

8.2 The role of the dialect

The dialect is the translation block between Spark and the database. In this section, you’ll examine the role of a dialect, the dialects that are provided with Spark, and in which situation you’ll have to write your own dialect.

8.2.1 What is a dialect, anyway?

The dialect is a small software component, often implemented in a single class, that bridges Apache Spark and the database; see figure 8.3.

When Spark imports the data and stores it, it needs to know how each database type maps to a Spark type. For example, Informix and PostgreSQL have a SERIAL type, an integer that automatically increments when you insert new rows. It is convenient for defining unique identifiers. However, Spark needs to be told that this datatype is an integer when it hits the Tungsten storage. Tungsten is the storage manager that optimizes memory usage for objects, bypassing the JVM’s storage mechanism for more efficiency.

The dialect defines Spark’s behavior when communicating with the database.

Figure 8.3 Within Spark, the dataframe communicates with the database via a dialect acting as a complementary driver.

8.2.2 JDBC dialects provided with Spark

Spark comes with a few database dialects as part of the standard distribution. This means you can connect to those databases directly out of the box. As of Spark v3.0.0, those dialects are the following:

  • IBM Db2
  • Apache Derby
  • MySQL
  • Microsoft SQL Server
  • Oracle
  • PostgreSQL
  • Teradata Database

If your database is not in this list, you can have a look at the Spark Packages website at https://spark-packages.org/?q=tags%3A%22Data%20Sources%22 or write/build your own.

8.2.3 Building your own dialect

If you are using a relational database that is not in the list, you may want to contact your database vendor before implementing your own dialect. If the vendor does not offer a dialect, don’t panic. Implementing your own is relatively easy; you’ll learn how to do it here. The example you are going to see is based on IBM Informix, but you can easily adapt the code to your own database. No knowledge of Informix is required here, although it is a wonderful RDBMS.

The reason I picked IBM Informix is that, unlike the databases in the previous list, it does not have a dialect of its own. Nevertheless, it remains a vibrant database, especially in the IoT sphere.

Lab This is lab #200. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires an Informix database; you can download and use the Developer Edition for free from https://www.ibm.com/products/informix.

The dialect is one class, extending JdbcDialect , as shown in listing 8.4. The canHandle() method is the entry point to the dialect; this method acts as a filter to know whether Spark should use this dialect. It receives the JDBC URL as input, and you can filter based on the URL. In this situation, you’ll filter on the beginning of the URL, looking for the distinctive pattern of the Informix JDBC driver: informix-sqli . Each JDBC driver has a unique signature through its URL.

The getCatalystType() method will convert a JDBC type to a Catalyst type.

Listing 8.4 A minimalist dialect allowing Spark to communicate with your database

package net.jgp.books.spark.ch08.lab_200_informix_dialect;

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;

import scala.Option;

public class InformixJdbcDialect extends JdbcDialect {
  // JdbcDialect is serializable, hence a unique ID for this class.
  private static final long serialVersionUID = -672901;

  // The filter method that will allow Spark to know which driver to use
  // in which context. The signature of the Informix JDBC driver is
  // informix-sqli.
  @Override
  public boolean canHandle(String url) {
    return url.startsWith("jdbc:informix-sqli");
  }

  // Converts a SQL type to a Spark type
  @Override
  public Option<DataType> getCatalystType(int sqlType,
      String typeName, int size, MetadataBuilder md) {
    // Spark does not know what a SERIAL datatype is, but, for Spark, it is
    // simply an integer. Do not worry about testing typeName for null.
    if (typeName.toLowerCase().compareTo("serial") == 0) {
      return Option.apply(DataTypes.IntegerType);
    }
    if (typeName.toLowerCase().compareTo("calendar") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
...
    return Option.empty();   // Returning the value, Scala-style
  }
}

I could have used equalsIgnoreCase() or many other ways to compare strings, but over my many years of dealing with Java developers, I’ve seen that there is never a consensus. Defining the proper methods to use (or settling the eternal debate on naming conventions) is not my mission here.

Make sure the signature you filter on is specific enough. If I had used something like return url.contains("informix");, it could have been too broad. You could even include the colon after sqli. I could have written this example using a switch/case statement, but I wanted to leave a little room for improvement, right?

The return type is what is expected by Spark. As you may know, Spark is written in Scala, so, at some touchpoints, you will need to return Scala types. In this situation, the return type corresponds to an empty option.
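One more practical point about this listing: writing the class is only half of the job, because Spark has to be told to use it. You can register the dialect through the JdbcDialects registry before reading, as in the following sketch; the Informix URL, server name, table name, and props contents are placeholders.

// Register the dialect once, before any read on an Informix URL.
// Requires org.apache.spark.sql.jdbc.JdbcDialects.
JdbcDialects.registerDialect(new InformixJdbcDialect());

// props holds the Informix connection properties (placeholder values).
Dataset<Row> df = spark.read().jdbc(
    "jdbc:informix-sqli://localhost:9088/stores_demo:"
        + "INFORMIXSERVER=informixserver;DELIMIDENT=Y",
    "customer",          // Placeholder table name
    props);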

As a reminder, Catalyst is the optimization engine built in Spark (see chapter 4). The getCatalystType() method receives four arguments:

  • The SQL type as an integer, as defined in java.sql.Types (see http://mng.bz/6wxR). However, the complex datatypes for which you may be writing this conversion will probably not be in that list; you will have to check how they show up in this method.
  • The type name as a string. This is the “real name” you would have used to create the table in SQL. In this example, I used the SERIAL datatype, an integer with automatic incrementing you’ll find in Informix and PostgreSQL.
  • The size for numeric, string, and binary types. This argument is useful for avoiding side effects in conversions.
  • The last argument is a metadata builder, which you can use to augment the information about the conversion (more on that in chapter 17).

The getCatalystType() method return type is one of the following:

  • A datatype, as listed at http://mng.bz/omgD : BinaryType , BooleanType , ByteType , CalendarIntervalType , DateType , DoubleType , FloatType , IntegerType , LongType , NullType , ShortType , StringType , or TimestampType
  • A subtype of org.apache.spark.sql.types.DataType, such as ArrayType, HiveStringType, MapType, NullType, NumericType, ObjectType, or StructType

Appendix L covers the datatypes in more detail. It also describes additional methods you may need in order to implement a full dialect (such as a reverse conversion, from a Spark type to a SQL type), how to truncate a table, and so on.
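As a quick preview of those additional methods, the reverse conversion is done by overriding getJDBCType(), which maps a Spark type back to a database type when Spark writes data. The following is only a sketch; the target type name LVARCHAR is an illustrative choice, not a recommendation.

// Sketch of the reverse mapping (Spark type -> database type), used when
// Spark writes to the database. Requires org.apache.spark.sql.jdbc.JdbcType
// and java.sql.Types.
@Override
public Option<JdbcType> getJDBCType(DataType dt) {
  if (DataTypes.StringType.sameType(dt)) {
    return Option.apply(new JdbcType("LVARCHAR", java.sql.Types.LONGVARCHAR));
  }
  return Option.empty();
}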

8.3 Advanced queries and ingestion

Sometimes you don’t want to copy all the data from a table to a dataframe. You know that you won’t use some data, and you don’t want to copy the rows you won’t use, because transfer is an expensive operation. A typical use case would be to run analytics on yesterday’s sales, compare month-to-month sales, and so on.

In this section, you’ll learn about ingesting data from relational databases by using SQL queries, avoiding superfluous data transfer. This operation is called filtering the data . You’ll also learn about partitioning while ingesting.

8.3.1 Filtering by using a WHERE clause

In SQL, one way to filter data is to use a WHERE clause as part of your SELECT statement. Let’s see how to integrate such a clause in your ingestion mechanism. As with full table ingestion, Spark still uses JDBC underneath.

Lab This is lab #300. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires a MySQL or MariaDB database.

The syntax is similar to what we have been using for ingesting a full table. However, the dbtable option (if you use the load() method) or table parameter (if you use the jdbc() method) must use the following syntax:

(<SQL select statement>) <table alias>

Let’s look at a few examples:

  1. Cheap movies:

    (SELECT * FROM film WHERE rental_rate = 0.99) film_alias

    This returns all films with a rental rate of 99 cents.

  2. Specific movies:

    (SELECT * FROM film
     WHERE (title LIKE "%ALIEN%" OR title LIKE "%victory%"
        OR title LIKE "%agent%" OR description LIKE "%action%")
       AND rental_rate > 1
       AND (rating = "G" OR rating = "PG")) film_alias

In this query, you’re looking for movies with a title containing alien, victory, or agent, or with a description containing action; with a rental rate of more than $1; and with a rating of either G (General audience) or PG (Parental Guidance suggested). Note that, with MySQL’s default collation, these LIKE comparisons are not case-sensitive: you will get all movies with alien, Alien, and ALIEN. The last keyword (film_alias) is simply an alias. Figure 8.4 illustrates the process.

The following listing shows the desired output, which is the records that are highlighted in figure 8.4.

Listing 8.5 Desired output of the query with a WHERE clause

+-------+----------------+--------------------+------------+-----------+...
|film_id|           title|         description|release_year|language_id|...
+-------+----------------+--------------------+------------+-----------+...
|      6|    AGENT TRUMAN|A Intrepid Panora...|  2005-12-31|          1|...
|     13|     ALI FOREVER|A Action-Packed D...|  2005-12-31|          1|...
...
+-------+----------------+--------------------+------------+-----------+...
only showing top 5 rows

root
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: date (nullable = true)
 |-- language_id: integer (nullable = true)
 |-- original_language_id: integer (nullable = true)
 |-- rental_duration: integer (nullable = true)
 |-- rental_rate: decimal(4,2) (nullable = true)
 |-- length: integer (nullable = true)
 |-- replacement_cost: decimal(5,2) (nullable = true)
 |-- rating: string (nullable = true)
 |-- special_features: string (nullable = true)
 |-- last_update: timestamp (nullable = true)

The dataframe contains 16 record(s).

Figure 8.4 The ingestion process from MySQL, using the MySQL JDBC driver. In this scenario, you are interested in only some films matching the SQL query--the ones highlighted in bold.

The following listing shows the code that enables the ingestion through an SQL query.

Listing 8.6 MySQLWithWhereClauseToDatasetApp.java

package net.jgp.books.spark.ch08.lab300_advanced_queries;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MySQLWithWhereClauseToDatasetApp {

  public static void main(String[] args) {
    MySQLWithWhereClauseToDatasetApp app =
        new MySQLWithWhereClauseToDatasetApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("MySQL with where clause to Dataframe using a JDBC Connection")
        .master("local")
        .getOrCreate();

    Properties props = new Properties();
    props.put("user", "root");
    props.put("password", "Spark<3Java");
    props.put("useSSL", "false");
    props.put("serverTimezone", "EST");

    // The SQL query. It can be built like any other string or can come
    // from a configuration file, a generator, and so on.
    String sqlQuery = "select * from film where "
        + "(title like \"%ALIEN%\" or title like \"%victory%\" "
        + "or title like \"%agent%\" or description like \"%action%\") "
        + "and rental_rate>1 "
        + "and (rating=\"G\" or rating=\"PG\")";

    // The syntax is respected: the SQL query is between parentheses,
    // and the table alias is at the end.
    Dataset<Row> df = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/sakila",
        "(" + sqlQuery + ") film_alias",
        props);

    df.show(5);
    df.printSchema();
    System.out.println("The dataframe contains " + df
        .count() + " record(s).");
  }
}

The following are important things to remember:

  • You can nest parentheses in the select statement.
  • The table alias must not be an existing table in your database.

8.3.2 Joining data in the database

Using a similar technique, you can join data in the database prior to ingesting it in Spark. Spark can join data between dataframes (for more information, see chapter 12 and appendix M), but for performance and optimization reasons, you may want to ask your database to perform the operation. In this section, you’ll learn how to perform a join at the database level and ingest the data from the join.
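For comparison, here is roughly what the same join looks like when Spark performs it after ingesting the three tables separately. This is a sketch: dfActor, dfFilmActor, and dfFilm are assumed to be dataframes obtained via jdbc(), as in listing 8.2.

// Sketch: the same three-way join expressed with Spark's join() API.
Dataset<Row> df = dfActor
    .join(dfFilmActor,
        dfActor.col("actor_id").equalTo(dfFilmActor.col("actor_id")))
    .join(dfFilm,
        dfFilmActor.col("film_id").equalTo(dfFilm.col("film_id")))
    .select("first_name", "last_name", "title", "description");

Pushing the join to the database transfers only the joined result over the network; doing it in Spark transfers the three tables but keeps the load off the database server.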

Lab This is lab #310. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires a MySQL or MariaDB database.

The SQL statement you will run is as follows:

SELECT actor.first_name, actor.last_name, film.title, film.description
FROM actor, film_actor, film
WHERE actor.actor_id = film_actor.actor_id
  AND film_actor.film_id = film.film_id

Figure 8.5 illustrates the operation.

Figure 8.5 Spark ingests the data stored in the MySQL database after the database server performs the joins between the three tables.

The following listing illustrates the result. Note that the dataframe contains more records than the actor table, because the join produces one row per actor/film combination.

Listing 8.7 Output of join performed at database level, ingested by Spark

+----------+---------+--------------------+--------------------+
|first_name|last_name|               title|         description|
+----------+---------+--------------------+--------------------+
|  PENELOPE|  GUINESS|    ACADEMY DINOSAUR|A Epic Drama of a...|
|  PENELOPE|  GUINESS|ANACONDA CONFESSIONS|A Lacklusture Dis...|
|  PENELOPE|  GUINESS|         ANGELS LIFE|A Thoughtful Disp...|
|  PENELOPE|  GUINESS|BULWORTH COMMANDM...|A Amazing Display...|
|  PENELOPE|  GUINESS|       CHEAPER CLYDE|A Emotional Chara...|
+----------+---------+--------------------+--------------------+
only showing top 5 rows

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)

The dataframe contains 5462 record(s).

The next listing shows the code. As you would expect, it is similar to the simple filtering in listing 8.6. I have removed the superfluous lines of code.

Listing 8.8 MySQLWithJoinToDatasetApp.java (excerpt)

package net.jgp.books.spark.ch08.lab310_sql_joins;
...
public class MySQLWithJoinToDatasetApp {
...
  private void start() {
...
    // A basic SQL query joining three tables
    String sqlQuery =
        "select actor.first_name, actor.last_name, film.title, "
            + "film.description "
            + "from actor, film_actor, film "
            + "where actor.actor_id = film_actor.actor_id "
            + "and film_actor.film_id = film.film_id";

    // Except for the alias name, this is the same call as in listing 8.6.
    Dataset<Row> df = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/sakila",
        "(" + sqlQuery + ") actor_film_alias",
        props);
...
  }
}

If we had used SELECT * FROM actor, film_actor ... in the query in listing 8.8, Spark would have been confused by columns having the same name and would have returned the following error: Duplicate column name 'actor_id'. Spark will not build a fully qualified name (<table>.<column>) for you; you will have to explicitly name the columns and alias them, as in the sketch after this paragraph. The code producing this exception is available in GitHub at http://mng.bz/KEOO.
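If you do need both identifier columns, a minimal way to sidestep the collision is to alias them in the SQL you send to the database; the alias names below are illustrative.

// Sketch: alias the ambiguous columns in the query sent to the database.
String sqlQuery =
    "select actor.actor_id as actor_actor_id, "
        + "film_actor.film_id as film_actor_film_id, "
        + "actor.first_name, actor.last_name, film.title "
        + "from actor, film_actor, film "
        + "where actor.actor_id = film_actor.actor_id "
        + "and film_actor.film_id = film.film_id";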

Note The SQL you are writing here is directly sent to MySQL. It will not be interpreted by Spark, and, as a consequence, if you write specific Oracle SQL, it will not work with PostgreSQL (although it should work with IBM Db2, as it understands Oracle syntax).

8.3.3 Performing ingestion and partitioning

In this section, you’ll have a quick look at an advanced feature of Spark: ingesting from a database while spreading the data across several partitions. Figure 8.6 shows the dataframe after you ingest the film table, as in listing 8.9.

Figure 8.6 Looking at the dataframe, after ingesting 1,000 films from the film table. They are all in one partition.

Figure 8.7 illustrates the partitions in the resilient distributed dataset (RDD) in the dataframe after using partitions. You may recall that the RDD is the data storage part of the dataframe (see chapter 3). The process of ingesting by partition is detailed in listing 8.11.

Figure 8.7 In this scenario, you asked Spark to split the data into 10 partitions. You still have one dataframe and one RDD. Physical nodes are not represented, but this diagram could split over several nodes.

Lab This is lab #320. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires a MySQL or MariaDB database.

The following listing is similar to listing 8.2, but you are going to ingest movies from the film table.

Listing 8.9 MySQLToDatasetWithoutPartitionApp.java

package net.jgp.books.spark.ch08.lab320_ingestion_partinioning;
...
public class MySQLToDatasetWithoutPartitionApp {
...
  private void start() {
...
    Properties props = new Properties();
    props.put("user", "root");
    props.put("password", "Spark<3Java");
    props.put("useSSL", "false");
    props.put("serverTimezone", "EST");

    // Ingesting the film table
    Dataset<Row> df = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/sakila",
        "film",
        props);

    df.show(5);
    df.printSchema();
    System.out.println("The dataframe contains " + df
        .count() + " record(s).");
    System.out.println("The dataframe is split over " + df.rdd()
        .getPartitions().length + " partition(s).");
  }
}

The output is shown in the following listing.

Listing 8.10 Output of MySQLToDatasetWithoutPartitionApp.java

+-------+----------------+--------------------+------------+-----------+--...
|film_id|           title|         description|release_year|language_id|or...
+-------+----------------+--------------------+------------+-----------+--...
|      1|ACADEMY DINOSAUR|A Epic Drama of a...|  2005-12-31|          1|  ...
|      2|  ACE GOLDFINGER|A Astounding Epis...|  2005-12-31|          1|  ...
|      3|ADAPTATION HOLES|A Astounding Refl...|  2005-12-31|          1|  ...
|      4|AFFAIR PREJUDICE|A Fanciful Docume...|  2005-12-31|          1|  ...
|      5|     AFRICAN EGG|A Fast-Paced Docu...|  2005-12-31|          1|  ...
+-------+----------------+--------------------+------------+-----------+--...
only showing top 5 rows

root
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- release_year: date (nullable = true)
 |-- language_id: integer (nullable = true)
 |-- original_language_id: integer (nullable = true)
 |-- rental_duration: integer (nullable = true)
 |-- rental_rate: decimal(4,2) (nullable = true)
 |-- length: integer (nullable = true)
 |-- replacement_cost: decimal(5,2) (nullable = true)
 |-- rating: string (nullable = true)
 |-- special_features: string (nullable = true)
 |-- last_update: timestamp (nullable = true)

The dataframe contains 1000 record(s).

The dataframe is split over 1 partition(s).

Focus on the last line of the output in listing 8.10: all the data sits in a single partition. The following listing adds the partitioning code. Chapter 17 covers partitioning in more detail.

Listing 8.11 MySQLToDatasetWithPartitionApp.java

package net.jgp.books.spark.ch08.lab320_ingestion_partinioning;
...
public class MySQLToDatasetWithPartitionApp {
...
    // Properties to set up the database connection
    Properties props = new Properties();
    props.put("user", "root");
    props.put("password", "Spark<3Java");
    props.put("useSSL", "false");
    props.put("serverTimezone", "EST");

    props.put("partitionColumn", "film_id");  // Column to partition on
    props.put("lowerBound", "1");             // Lower bound of the stride
    props.put("upperBound", "1000");          // Upper bound of the stride
    props.put("numPartitions", "10");         // Number of partitions

    // Ingesting the film table
    Dataset<Row> df = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/sakila",
        "film",
        props);
...

In this scenario, the data is split into 10 partitions, and the following is the last line of the output:

...  The dataframe is split over 10 partition(s).
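The same partitioned read can also be expressed with the option() syntax you saw in section 8.1.5; the following sketch uses the standard JDBC source options partitionColumn, lowerBound, upperBound, and numPartitions.

// Sketch: partitioned ingestion using options instead of a Properties object.
Dataset<Row> df = spark.read()
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/sakila")
    .option("dbtable", "film")
    .option("user", "root")
    .option("password", "Spark<3Java")
    .option("useSSL", "false")
    .option("serverTimezone", "EST")
    .option("partitionColumn", "film_id")
    .option("lowerBound", 1)
    .option("upperBound", 1000)
    .option("numPartitions", 10)
    .load();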

8.3.4 Summary of advanced features

In these sections, you have learned how to better ingest data from your RDBMS. Because you probably won’t perform those operations all the time, appendix L provides reference tables to help your ingestion operations.

8.4 Ingestion from Elasticsearch

In this section, you’ll learn how to ingest data directly from Elasticsearch. Elasticsearch has been growing in popularity since 2010 (the year I started using it) as a scalable document store and search engine. A bidirectional communication with Elasticsearch helps Spark store and retrieve complex documents.

I know that some purists will argue that Elasticsearch is not a database, but it is an incredible data store, which makes it a prime candidate for this chapter on ingestion from data stores.

Note See appendix N for help installing Elasticsearch and adding a sample dataset for the examples we are using. If you want to know more about this search engine, you can read Elasticsearch in Action by Radu Gheorghe et al. (Manning, 2015), available at https://www.manning.com/books/elasticsearch-in-action .

You’ll first have a look at the architecture and then run your first Elasticsearch ingestion.

8.4.1 Data flow

Figure 8.8 illustrates the flows between Spark and Elasticsearch. For Spark, Elasticsearch is like a database, and it needs a driver, like a JDBC driver.

Figure 8.8 Elasticsearch communicates with Spark via a driver provided by Elastic.

As you may remember from listing 8.3, we modified the pom.xml file. This is the dependency required for Elasticsearch:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>6.2.1</version>
</dependency>

This entry defines the driver needed by Elasticsearch and provided by Elastic (the company behind Elasticsearch), allowing a bidirectional communication between Elasticsearch and Spark, as well as Hadoop.

8.4.2 The New York restaurants dataset digested by Spark

Let’s look at the result of the code you’re going to create. Elasticsearch stores documents in JSON format; therefore, it should not be a surprise to see nested constructs such as those in listing 8.12. In this output, I added timings, so you can visualize where time is spent. The timings are summarized in table 8.2.

Lab This is lab #400. It is available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch08. It requires Elasticsearch.

Table 8.2 Timings in ingesting Elasticsearch data

Step   Time (ms)   Sum (ms)   Description
1      1524        1524       Getting a session
2      1694        3218       Connecting to Elasticsearch
3      10450       13668      Getting some records, enough to display 10 and infer the schema
4      1           13669      Displaying the schema
5      33710       47379      Getting the rest of the records
6      118         47497      Counting the number of partitions where the records are stored

Note The timings used in table 8.2 are based on a local laptop (this is also one of the beauties of Spark and Elasticsearch: everything can run on a laptop). Those timings will probably differ markedly on your system, but the proportions should stay roughly the same.

Listing 8.12 Result of ingesting New York restaurants in Spark from Elasticsearch

Getting a session took: 1524 ms
Init communication and starting to get some results took: 1694 ms
+--------------------+--------------------+---------+--------+--------+---...
|              Action|             Address|     Boro|Building|   Camis|...
+--------------------+--------------------+---------+--------+--------+...
|Violations were c...|10405 METROPOLITA...|   QUEENS|   10405|40704305|[-7...
|Violations were c...|10405 METROPOLITA...|   QUEENS|   10405|40704305|[-7...
|Violations were c...|10405 METROPOLITA...|   QUEENS|   10405|40704305|[-7...
|Violations were c...|10405 METROPOLITA...|   QUEENS|   10405|40704305|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN|     181|40704315|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN|     181|40704315|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN|     181|40704315|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN|     181|40704315|[-7...
|Violations were c...|181 WEST 4 STREET...|MANHATTAN|     181|40704315|[-7...
|Violations were c...|1007 LEXINGTON AV...|MANHATTAN|    1007|40704453|[-7...
+--------------------+--------------------+---------+--------+--------+---...
only showing top 10 rows

Showing a few records took: 10450 ms
root
 |-- Action: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Boro: string (nullable = true)
 |-- Building: string (nullable = true)
 |-- Camis: long (nullable = true)
 |-- Coord: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- Critical_Flag: string (nullable = true)
 |-- Cuisine_Description: string (nullable = true)
 |-- Dba: string (nullable = true)
 |-- Grade: string (nullable = true)
 |-- Grade_Date: timestamp (nullable = true)
 |-- Inspection_Date: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)
 |-- Inspection_Type: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Record_Date: timestamp (nullable = true)
 |-- Score: double (nullable = true)
 |-- Street: string (nullable = true)
 |-- Violation_Code: string (nullable = true)
 |-- Violation_Description: string (nullable = true)
 |-- Zipcode: long (nullable = true)

Displaying the schema took: 1 ms
The dataframe contains 473039 record(s).
Counting the number of records took: 33710 ms
The dataframe is split over 5 partition(s).
Counting the # of partitions took: 118 ms

As you can conclude from the timings, Spark does not need the whole dataset before it can infer the schema and display a few rows. However, when you ask Spark to count the number of records, it has to read the entire dataset, hence the 33 seconds spent downloading the rest of the data.
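If you plan to run several actions on the same ingested data, you may want to cache the dataframe so that Elasticsearch is not queried again for every action. This is a general Spark technique, not something the lab’s code does; a minimal sketch:

// Sketch: cache after the load so later actions (count, show, and so on)
// reuse the data already pulled from Elasticsearch.
df = df.cache();
System.out.println("The dataframe contains " + df.count() + " record(s).");
df.show(10);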

8.4.3 Code to ingest the restaurant dataset from Elasticsearch

Let’s walk through the following code to ingest the New York City restaurants dataset from Elasticsearch into Spark.

Listing 8.13 ElasticsearchToDatasetApp.java

package net.jgp.books.spark.ch08.lab400_es_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ElasticsearchToDatasetApp {

  public static void main(String[] args) {
    ElasticsearchToDatasetApp app =
        new ElasticsearchToDatasetApp();
    app.start();
  }

  private void start() {
    long t0 = System.currentTimeMillis();

    SparkSession spark = SparkSession.builder()
        .appName("Elasticsearch to Dataframe")
        .master("local")
        .getOrCreate();
    long t1 = System.currentTimeMillis();
    // Introducing timing after each major step to understand where time is spent
    System.out.println("Getting a session took: " + (t1 - t0) + " ms");

    Dataset<Row> df = spark
        .read()                                 // As with any ingestion, you start with read().
        .format("org.elasticsearch.spark.sql")  // Name of the format, which can be a short name like csv or jdbc, or a full class name (more in chapter 9)
        .option("es.nodes", "localhost")
        .option("es.port", "9200")              // Elasticsearch port (optional, as you use 9200)
        .option("es.query", "?q=*")             // Query (here, you want all; can be omitted)
        .option("es.read.field.as.array.include", "Inspection_Date")  // You need to convert the Inspection_Date field.
        .load("nyc_restaurants");               // Name of the dataset

    long t2 = System.currentTimeMillis();
    System.out.println(
        "Init communication and starting to get some results took: "
            + (t2 - t1) + " ms");

    df.show(10);
    long t3 = System.currentTimeMillis();
    System.out.println("Showing a few records took: " + (t3 - t2) + " ms");

    df.printSchema();
    long t4 = System.currentTimeMillis();
    System.out.println("Displaying the schema took: " + (t4 - t3) + " ms");

    System.out.println("The dataframe contains " +
        df.count() + " record(s).");
    long t5 = System.currentTimeMillis();
    System.out.println("Counting the number of records took: " + (t5 - t4)
        + " ms");

    System.out.println("The dataframe is split over " + df.rdd()
        .getPartitions().length + " partition(s).");
    long t6 = System.currentTimeMillis();
    System.out.println("Counting the # of partitions took: " + (t6 - t5)
        + " ms");
  }
}

As you can see, data ingestion from Elasticsearch follows the same principles as files (chapter 7) and databases. You can find the list of options for importing data in appendix L.

Summary

  • To connect to a database from Spark, you’ll need its JDBC drivers.
  • You can use properties or long URLs to connect to the database, as with JDBC.
  • You can build a dedicated dialect to connect to databases that are not supported out of the box, and it’s not very difficult.
  • Spark comes with out-of-the-box support for IBM Db2, Apache Derby, MySQL, Microsoft SQL Server, Oracle, PostgreSQL, and Teradata Database.
  • You can filter the data you are ingesting by using the (<select statement>) <table alias> syntax instead of the table name.
  • You can perform joins at the database level, prior to ingesting in Spark, but you can also do joins in Spark!
  • You can automatically assign data from the database in multiple partitions.
  • Connecting to Elasticsearch is as easy as connecting to a database.
  • Ingesting data from Elasticsearch follows the same principle as any other ingestion.
  • Elasticsearch contains JSON documents, which are ingested as is in Spark.