This chapter covers
- Common behaviors of parsers
- Ingesting data from CSV, JSON, XML, and text files
- Understanding why big data has its own file formats, such as Avro, ORC, and Parquet
Ingestion is the first step of your big data pipeline. You will have to onboard the data in your instance of Spark, whether it is in local mode or cluster mode. As you know by now, data in Spark is transient, meaning that when you shut down Spark, it’s all gone. You will learn how to import data from standard files including CSV, JSON, XML, and text.
In this chapter, after learning about common behaviors among various parsers, you’ll use made-up datasets to illustrate specific cases, as well as datasets coming from open data platforms. It will be tempting to start performing analytics with those datasets. As you see the data displayed onscreen, you will start thinking, “What happens if I join this dataset with this other one? What if I start aggregating this field . . . ?” You will learn how to do those actions in chapters 11 through 15 and chapter 17, but first you need to get all that data into Spark!
The examples in this chapter are based on Spark v3.0. Behaviors have evolved over time, especially when dealing with CSV files.
Appendix L accompanies this chapter as a reference for data ingestion options. After you learn how to ingest files in this chapter, you can turn to that appendix, which gathers all the formats and options in one convenient location, to develop more quickly.
For each format studied in this chapter, you will find, in this order, a description of the file you will ingest, the desired output, and a walkthrough of the code to achieve the result.
New file formats are used in the world of bigger data, as CSV or JSON could not make the cut anymore. Later in this chapter, you’ll learn about these new, popular file formats (such as Avro, ORC, Parquet, and Copybook).
Figure 7.1 illustrates where you are on your journey to data ingestion.
Figure 7.1 On your journey to file ingestion, this chapter focuses on files.
Lab All the examples of this chapter are available on GitHub at https://github.com/jgperrin/net.jgp.books.spark.ch07. Appendix L is a reference for ingestion.
Parsers are the tools that transfer data from an unstructured element, such as a file, to an internal structure; in Spark's case, the dataframe. All the parsers you are going to use have similar behaviors:

- The input can be a file or a pattern: if you specify a*, you will ingest all the files starting with an a.
- Options are not case-sensitive, so multiline and multiLine are the same.

These behaviors are implementation-dependent, so when you use third-party ingestion libraries (see chapter 9), you may not have the same behaviors. If you have a specific file format to ingest, you can build custom data sources (also explained in chapter 9); however, keep these generic behaviors in mind as you build the component.
Comma-separated values ( CSV ) is probably the most popular data-exchange format around.1 Because of its age and wide use, this format has many variations in its core structure: separators are not always commas, some records may span over multiple lines, there are various ways of escaping the separator, and many more creative considerations. So when your customer tells you, “I’ll just send you a CSV file,” you can certainly nod and slowly start to freak out.
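To see why those variations matter, consider what a naive split does to record 4 of the sample file in this chapter: its title is quoted with * and itself contains the separator. This throwaway sketch (not how Spark parses CSV internally) counts fields by splitting blindly on the semicolon:

```java
public class NaiveCsvSplit {
  // Count fields the way a naive parser would: split on every
  // semicolon, ignoring the * quote character entirely.
  static int naiveFieldCount(String record) {
    return record.split(";").length;
  }

  public static void main(String[] args) {
    // Record 4 from books.csv: the quoted title contains a semicolon
    // ("Harry Potter; Book 2").
    String record = "4;1;*Harry Potter and the Chamber of Secrets: "
        + "The Illustrated Edition (Harry Potter; Book 2)*;10/04/2016;"
        + "http://amzn.to/2kYhL5n";
    // 5 logical columns, but the naive split reports 6 fields.
    System.out.println(naiveFieldCount(record));
  }
}
```

A quote-aware parser, such as Spark's, sees 5 fields here; the naive split sees 6. This is exactly why the quote option matters in the listings that follow.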
Fortunately for you, Spark offers a variety of options for ingesting CSV files. Ingesting CSV is easy, and the schema inference is a powerful feature.
You already ingested CSV files in chapters 1 and 2, so here you’ll look at more-advanced examples with more options that illustrate the complexity of CSV files in the outside world. You will first look at the file you will ingest and understand its specifications. You will then look at the result and, finally, build a mini application to achieve the result. This pattern will repeat for each format.
LAB This is lab #200. The example you are going to study is net.jgp.books.spark.ch07.lab200_csv_ingestion.ComplexCsvToDataframeApp.
Figure 7.2 illustrates the process you are going to implement.
Figure 7.2 Spark ingests a complex CSV-like file with nondefault options. After ingesting the file, the data will be in a dataframe, from which you can display records and the schema--in this case, the schema is inferred by Spark.
In listing 7.1, you will find an excerpt of a CSV file with two records and a header row. Note that CSV has become a generic term: nowadays, the C means character more than comma. You will find files in which values are separated by semicolons, tabs, pipes (|), and more. For the purist, the acronym may matter, but for Spark, all these separators fall into the same category.
In the listing, I added an end-of-paragraph mark (¶) to show the end of each line. It is not in the file. Note that the title of the second record spans two lines: there is a line break after Language? and before An.
id;authorId;title;releaseDate;link ¶
4;1;*Harry Potter and the Chamber of Secrets: The Illustrated Edition (Harry Potter; Book 2)*;10/04/2016;http://amzn.to/2kYhL5n ¶
6;2;*Development Tools in 2006: any Room for a 4GL-style Language? ¶
An independent study by Jean Georges Perrin, IIUG Board Member*;12/28/2016;http://amzn.to/2vBxOe1 ¶
The following listing shows the possible output. I added the paragraph marks to illustrate the new lines, as long records are really not easy to read.
Excerpt of the dataframe content:
+---+--------+------------------------------------------------------------------------------------------+-----------+-----------------------+¶
| id|authorId| title|releaseDate| link|¶
+---+--------+------------------------------------------------------------------------------------------+-----------+-----------------------+¶
...
| 4 | 1 | Harry Potter and the Chamber of Secrets: The Illustrated Edition (Harry Potter; Book 2) | 10/14/16 |http://amzn.to/2kYhL5n |¶
...
| 6 | 2 |Development Tools in 2006: any Room for a 4GL-style Language? ¶
An independent study by...| 12/28/16|http://amzn.to/2vBxOe1|¶ ❶
...
+---+--------+------------------------------------------------------------------------------------------+-----------+-----------------------+¶
only showing top 7 rows
Dataframe's schema:
root
|-- id: integer (nullable = true) ❷
|-- authorId: integer (nullable = true) ❷
|-- title: string (nullable = true)
|-- releaseDate: string (nullable = true) ❸
|-- link: string (nullable = true)
❶ The line break that was in your CSV file is still here.
❷ The datatype is an integer. In CSV files, everything is a string, but you will ask Spark to make an educated guess!
❸ Note that the release date is seen as a string, not a date!
To achieve the result in listing 7.2, you will have to code something similar to listing 7.3. You will first get a session, and then configure and run the parsing operation in one call using method chaining. Finally, you will show some records and display the schema of the dataframe. If you are not familiar with schemas, you can read more about them in appendix E.
package net.jgp.books.spark.ch07.lab200_csv_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ComplexCsvToDataframeApp {

  public static void main(String[] args) {
    ComplexCsvToDataframeApp app = new ComplexCsvToDataframeApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("Complex CSV to Dataframe")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = spark.read().format("csv")   ❶
        .option("header", "true")                  ❷
        .option("multiline", true)                 ❸
        .option("sep", ";")                        ❹
        .option("quote", "*")                      ❺
        .option("dateFormat", "M/d/y")             ❻
        .option("inferSchema", true)               ❼
        .load("data/books.csv");

    System.out.println("Excerpt of the dataframe content:");
    df.show(7, 90);
    System.out.println("Dataframe's schema:");
    df.printSchema();
  }
}
❶ The format to ingest is CSV.
❷ The first line of your CSV file is a header line.
❸ Some of our records split over multiple lines; you can use either a string or a Boolean, making it easier to load values from a configuration file.
❹ Separator between values is a semicolon (;)
❺ Quote character is a star (*)
❻ Date format matches the month/day/year format, as commonly used in the US
❼ Spark will infer (guess) the schema.
As you probably guessed, you will need to know what your file looks like (separator character, escape character, and so on) before you can configure the parser. Spark will not infer those. This format is part of the contract that comes with your CSV files (though most of the time you never get a clear description of the format, so you have to guess).
The schema inference feature is a pretty neat one. However, as you can see here, it did not infer that the releaseDate
column was a date. One way to tell Spark that it is a date is to specify a schema.
As you just read, ingesting CSV is easy, and the schema inference is a powerful feature. However, when you know the structure (or schema) of the CSV file, it can be useful to specify the datatypes by telling Spark what schema to use. Inferring the schema is a costly operation, and specifying it enables you to have better control of the datatypes (see appendix L for a list of datatypes and more hints on ingestion).
LAB This is lab #300. The example you are going to study is net.jgp.books.spark.ch07.lab300_csv_ingestion_with_schema.ComplexCsvToDataframeWithSchemaApp.
In this example, similar to the previous one, you will start a session, define a schema, and parse the file with the help of the schema. Figure 7.3 illustrates this process.
Figure 7.3 Spark ingests a complex CSV-like file, with the help of a schema. Spark does not have to infer the schema. After the ingestion, Spark will display some records and the schema.
Unfortunately, you cannot specify whether the columns can be null
(nullability). The option exists, but it is ignored by the parser (listing 7.5 shows this option). You are going to ingest the same file as in listing 7.1.
The desired output is similar to listing 7.2. However, the schema will look like the following listing, as you are using your schema instead of the one that Spark could infer.
+---+---------+---------------+-----------+---------------+
| id|authordId| bookTitle|releaseDate| url| ❶
+---+---------+---------------+-----------+---------------+
| 1 | 1 |Fantastic Be...| 2016-11-18|http://amzn....|
| 2 | 1 |Harry Potter...| 2015-10-06|http://amzn....|
| 3 | 1 |The Tales of...| 2008-12-04|http://amzn....|
| 4 | 1 |Harry Potter...| 2016-10-04|http://amzn....|
| 5 | 2 |Informix 12....| 2017-04-23|http://amzn....|
+---+---------+---------------+-----------+---------------+
only showing top 5 rows
root
|-- id: integer (nullable = true)
|-- authordId: integer (nullable = true)
|-- bookTitle: string (nullable = true)
|-- releaseDate: date (nullable = true) ❷
|-- url: string (nullable = true)
❶ The column headers are different from the ones in the original CSV files.
❷ The release date is now a date, as specified in your schema.
Listing 7.5 shows you how to build a schema, ingest a CSV file with a specified schema, and display the dataframe.
To keep the example small, I removed some imports (they are similar to the ones in listing 7.3) and the main() method, whose only purpose is to create an instance and call the start() method (as in listing 7.3 and many other examples in this book). Blocks of code have been replaced by an ellipsis (...).
package net.jgp.books.spark.ch07.lab300_csv_ingestion_with_schema;
...
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ComplexCsvToDataframeWithSchemaApp {
...
  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("Complex CSV with a schema to Dataframe")
        .master("local")
        .getOrCreate();

    StructType schema = DataTypes.createStructType(new StructField[] {  ❶
        DataTypes.createStructField(
            "id",                     ❷
            DataTypes.IntegerType,    ❸
            false),                   ❹
        DataTypes.createStructField(
            "authordId",
            DataTypes.IntegerType,
            true),                    ❺
        DataTypes.createStructField(
            "bookTitle",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "releaseDate",
            DataTypes.DateType,
            true),                    ❺
        DataTypes.createStructField(
            "url",
            DataTypes.StringType,
            false) });

    Dataset<Row> df = spark.read().format("csv")
        .option("header", "true")
        .option("multiline", true)
        .option("sep", ";")
        .option("dateFormat", "MM/dd/yyyy")
        .option("quote", "*")
        .schema(schema)               ❻
        .load("data/books.csv");

    df.show(5, 15);
    df.printSchema();
  }
}
❶ This is one way to create a schema; in this example, our schema is an array of StructField.
❷ Name of the field; it will overwrite the column’s name in the file.
❸ The datatype; see appendix L for a list of datatypes with explanations
❹ Is this field nullable? Equivalent to: can this field accept a null value?
❺ This value is ignored by the parser.
❻ Tells the reader to use your schema
If you unchain the method chain, you can see that the read() method returns an instance of DataFrameReader. This is the object you configure using the option() methods, the schema() method, and finally the load() method.
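To make the chain concrete, here is an unchained version (a sketch assuming the same data/books.csv file as the earlier listings): each configuration call returns the same DataFrameReader instance, so chaining is only a stylistic shortcut.

```java
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UnchainedReaderApp {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("Unchained CSV reader")
        .master("local")
        .getOrCreate();

    // read() hands back a DataFrameReader; every option() call below
    // configures and returns that same instance.
    DataFrameReader reader = spark.read();
    reader = reader.format("csv");
    reader = reader.option("header", true);
    reader = reader.option("sep", ";");

    // load() is the terminal call that actually triggers the ingestion.
    Dataset<Row> df = reader.load("data/books.csv");
    df.printSchema();
  }
}
```

Because each call returns the reader itself, the reassignments above are redundant; they are kept only to make the flow of the object explicit.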
As I previously said, CSV has an impressive number of variants, so Spark has an equally impressive number of options--and they keep growing. Appendix L lists those options.
Over the last few years, JavaScript Object Notation ( JSON ) has become the new cool kid in town in terms of data exchange, mainly after representational state transfer (REST) supplanted Simple Object Access Protocol (SOAP) and Web Services Description Language (WSDL, written in XML) in web services-oriented architecture.
JSON is easier to read, less verbose, and brings fewer constraints than XML. It supports nested constructs such as arrays and objects. You can find out more about JSON at https://www.json.org . Nevertheless, JSON is still very verbose!
A subformat of JSON is called JSON Lines . JSON Lines ( http://jsonlines.org ) stores a record on one line, easing parsing and readability. Here is a small example copied from the JSON Lines website; as you can see, it supports Unicode:
{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
{"name": "Alexa", "wins": [["two pair", "4♠"], ["two pair", "9♠"]]}
{"name": "May", "wins": []}
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}
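The one-record-per-line rule is what makes the format easy to split and distribute; a minimal plain-Java sketch (not Spark's actual parser) shows that cutting the file into records never requires a JSON-aware scanner:

```java
import java.util.Arrays;
import java.util.List;

public class JsonLinesSplit {
  // With one record per line, a plain newline split is enough to
  // separate the records; parsing each one can then happen independently.
  static List<String> records(String content) {
    return Arrays.asList(content.split("\n"));
  }

  public static void main(String[] args) {
    String content =
        "{\"name\": \"May\", \"wins\": []}\n"
      + "{\"name\": \"Deloise\", \"wins\": [[\"three of a kind\", \"5♣\"]]}";
    System.out.println(records(content).size());  // 2
  }
}
```

This is also why JSON Lines splits cleanly across partitions in a distributed system: a worker can start at any line boundary and know it sits at a record boundary.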
Before Spark v2.2.0, JSON Lines was the only JSON format that Spark could read. Figure 7.4 illustrates JSON ingestion.
For your first JSON ingestion, you are going to use the foreclosure dataset from the city of Durham, NC, for 2006 to 2016. You can download this dataset for free from the city’s recently changed portal at https://live-durhamnc.opendata.arcgis.com/ .
Figure 7.4 Spark ingests a JSON Lines file. The records are in JSON but on one line each. After the ingestion, Spark will display some records and the schema.
LAB This is lab #400. The example you are going to study is net.jgp.books.spark.ch07.lab400_json_ingestion.JsonLinesToDataframeApp. The data comes from Open Durham, the open data portal of the city and county of Durham, NC. The data being used comes from the former open data portal using Opendatasoft’s solution, which provides data as JSON Lines.
As shown in figure 7.4, each record appears on one line. The following listing shows three records (the first two and the last).
[{"datasetid": "foreclosure-2006-2016", "recordid": "629979c85b1cc68c1d4ee8cc351050bfe3592c62", "fields": {"parcel_number": "110138", "geocode": [36.0013755, -78.8922549], "address": "217 E CORPORATION ST", "year": "2006"}, "geometry": {"type": "Point", "coordinates": [-78.8922549, 36.0013755]}, "record_timestamp": "2017-03-06T12:41:48-05:00"},
{"datasetid": "foreclosure-2006-2016", "recordid": "e3cce8bbc3c9b804cbd87e267a6ff121285274e0", "fields": {"parcel_number": "110535", "geocode": [35.995797, -78.895396], "address": "401 N QUEEN ST", "year": "2006"}, "geometry": {"type": "Point", "coordinates": [-78.895396, 35.995797]},...
{"datasetid": "foreclosure-2006-2016", "recordid": "1d57ed470d533985d5a3c3dfb37c294eaa775ccf", "fields": {"parcel_number": "194912", "geocode": [35.955832, -78.742107], "address": "2516 COLEY RD", "year": "2016"}, "geometry": {"type": "Point", "coordinates": [-78.742107, 35.955832]}, "record_timestamp": "2017-03-06T12:41:48-05:00"}]
The following listing shows an indented version of the first record (pretty printed via JSONLint [ https://jsonlint.com/ ] and Eclipse), so you can see the structure: field names, arrays, and nested structure.
[
  {
    "datasetid": "foreclosure-2006-2016",
    "recordid": "629979c85b1cc68c1d4ee8cc351050bfe3592c62",
    "fields": {
      "parcel_number": "110138",
      "geocode": [
        36.0013755,
        -78.8922549
      ],
      "address": "217 E CORPORATION ST",
      "year": "2006"
    },
    "geometry": {
      "type": "Point",
      "coordinates": [
        -78.8922549,
        36.0013755
      ]
    },
    "record_timestamp": "2017-03-06T12:41:48-05:00"
  }
  ...
]
The following listing shows the output of a dataframe’s data and schema after ingesting a JSON Lines document.
+-------------+-------------+-------------+----------------+-------------+
| datasetid | fields | geometry |record_timestamp| recordid |
+-------------+-------------+-------------+----------------+-------------+
|foreclosur...|[217 E COR...|[WrappedAr...| 2017-03-06... |629979c85b...|
|foreclosur...|[401 N QUE...|[WrappedAr...| 2017-03-06... |e3cce8bbc3...|
|foreclosur...|[403 N QUE...|[WrappedAr...| 2017-03-06... |311559ebfe...|
|foreclosur...|[918 GILBE...|[WrappedAr...| 2017-03-06... |7ec0761bd3...|
|foreclosur...|[721 LIBER...|[WrappedAr...| 2017-03-06... |c81ae2921f...|
+-------------+-------------+-------------+----------------+-------------+
only showing top 5 rows
root
|-- datasetid: string (nullable = true) ❶
|-- fields: struct (nullable = true) ❷
| |-- address: string (nullable = true)
| |-- geocode: array (nullable = true) ❸
| | |-- element: double (containsNull = true)
| |-- parcel_number: string (nullable = true) ❶
| |-- year: string (nullable = true) ❶
|-- geometry: struct (nullable = true)
| |-- coordinates: array (nullable = true) ❸
| | |-- element: double (containsNull = true)
| |-- type: string (nullable = true) ❶
|-- record_timestamp: string (nullable = true) ❶
|-- recordid: string (nullable = true) ❶
|-- year: string (nullable = true) ❶
❶ For every field that Spark cannot precisely identify the datatype, it will use a string.
❷ The “fields” field is a structure with nested fields.
❸ The dataframe can contain arrays.
When you see a piece of data like that, aren’t you tempted to group by the year to see the evolution of foreclosures or display each event on a map to see if there are areas more subject to foreclosures, and compare with average incomes in this area? This is good--let your inner data-scientist spirit come out! Data transformations are covered in part 2 of this book.
Reading JSON is not much more complex than ingesting a CSV file, as you can see in the following listing.
package net.jgp.books.spark.ch07.lab400_json_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonLinesToDataframeApp {

  public static void main(String[] args) {
    JsonLinesToDataframeApp app = new JsonLinesToDataframeApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("JSON Lines to Dataframe")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = spark.read().format("json")   ❶
        .load("data/durham-nc-foreclosure-2006-2016.json");

    df.show(5, 13);
    df.printSchema();
  }
}
❶ That’s it! This is the only change you have to make to ingest JSON.
Starting with v2.2, Spark can ingest more-complex JSON files and is not constrained to the JSON Lines format. This section will show you how to process these files.
For this JSON ingestion, you’ll use travel advisory data from the Bureau of Consular Affairs at the US Department of State.
LAB This is lab #500. The example you are going to study is net.jgp.books.spark.ch07.lab500_json_multiline_ingestion.MultilineJsonToDataframeApp. The Bureau of Consular Affairs runs an open data portal based on CKAN. CKAN is an open source open data portal; you can learn more about CKAN at https://ckan.org/ . You can access the bureau’s portal at https://cadatacatalog.state.gov/ . Click the Travel link and then the countrytravelinfo link; then click the Go to Resource button to download the file.
Figure 7.5 illustrates the process.
Figure 7.5 Spark ingests a JSON file, in which records are spread over multiple lines. After the ingestion, Spark will display some records and the schema.
The following listing shows an excerpt of the file. For readability purposes, I have shortened the long descriptions.
[ {
  "tag": "A1",
  "geopoliticalarea": "Bonaire, Sint Eustatius, and Saba (BES) (Dutch Caribbean)",
  "travel_transportation": "<p><b>Road Conditions ...",
  "health": "<p>Medical care on the BES islands ...",
  "local_laws_and_special_circumstances": "<p> </p><p><b>...",
  "entry_exit_requirements": "<p>All U.S. citizens must...",
  "destination_description": "<p>The three islands of Bonaire...",
  "iso_code": "",
  "travel_embassyAndConsulate": " <div class="content ...",
  "last_update_date": "Last Updated: September 21, 2016 "
}, {
  "tag": "A2",
  "geopoliticalarea": "French Guiana",
  "travel_transportation": "<p><b>Road Conditions and ...",
  "local_laws_and_special_circumstances": "<p><b>Criminal Penalties...",
  "safety_and_security": "<p>French Guiana is an overseas department...",
  "entry_exit_requirements": "<p>Visit the...",
  "destination_description": "<p>French Guiana is an overseas...",
  "iso_code": "GF",
  "travel_embassyAndConsulate": " <div class="content...",
  "last_update_date": "Last Updated: October 12, 2017 "
}, ... ]
As you can see, this is a rather basic JSON file with an array of objects. Each object has simple key/value pairs. The content of some fields contains rich text in HTML or dates in a nonstandard format (JSON dates should match RFC 3339), which makes extracting information a little more complex. But I am pretty sure you have seen similar examples in your day-to-day projects.
The following listing shows the output of the travel advisory as it is ingested. I removed some columns to fit the code on this page.
+-----------------------+-----------------------+--------------------+---...
|destination_description|entry_exit_requirements| geopoliticalarea | ...
+-----------------------+-----------------------+--------------------+---...
| <p>The three isla... | <p>All U.S. citiz... |Bonaire, Sint Eus...|<p>...
| <p>French Guiana ... | <p>Visit the ... | French Guiana|<p>...
| <p>See the Depart... | <p><b>Passports a... | St Barthelemy|<p>...
| <p>Read the Depar... | <p>Upon arrival i... | Aruba|<p>...
| <p>See the Depart... | <p><b>Passports a... | Antigua and Barbuda|<p>...
+-----------------------+-----------------------+--------------------+---...
only showing top 5 rows
root
|-- destination_description: string (nullable = true)
|-- entry_exit_requirements: string (nullable = true)
|-- geopoliticalarea: string (nullable = true)
|-- health: string (nullable = true)
|-- iso_code: string (nullable = true)
|-- last_update_date: string (nullable = true) ❶
|-- local_laws_and_special_circumstances: string (nullable = true)
|-- safety_and_security: string (nullable = true)
|-- tag: string (nullable = true)
|-- travel_embassyAndConsulate: string (nullable = true)
|-- travel_transportation: string (nullable = true)
❶ Your date is not really a date! You will learn how to turn this nonstandard field into a date as you work on data quality in chapters 12 and 14.
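The cleanup needed for that field is plain string-and-date work. Here is a hedged plain-Java sketch of the kind of transformation you would later wire into a dataframe operation; parse is a hypothetical helper name, not a Spark API:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class LastUpdateParser {
  // Hypothetical cleanup: drop the "Last Updated: " prefix, trim the
  // stray spaces, then parse the remaining "Month d, yyyy" text.
  static LocalDate parse(String raw) {
    String cleaned = raw.replace("Last Updated: ", "").trim();
    return LocalDate.parse(cleaned,
        DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.US));
  }

  public static void main(String[] args) {
    // Value copied from the first record of the dataset.
    System.out.println(parse("Last Updated: September 21, 2016 "));  // 2016-09-21
  }
}
```

The same logic, applied per row, is what a data-quality transformation on the dataframe would do.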
The following listing shows the Java code needed to process the Department of State’s travel advisory.
package net.jgp.books.spark.ch07.lab500_json_multiline_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MultilineJsonToDataframeApp {

  public static void main(String[] args) {
    MultilineJsonToDataframeApp app = new MultilineJsonToDataframeApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("Multiline JSON to Dataframe")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = spark.read()
        .format("json")
        .option("multiline", true)   ❶
        .load("data/countrytravelinfo.json");

    df.show(3);
    df.printSchema();
  }
}
❶ The key to processing multiline JSON!
If you forget the multiline option, your dataframe will be composed of a single column called _corrupt_record:
+--------------------+
| _corrupt_record |
+--------------------+
| [ { |
| "tag" : "A1", |
| "geopoliticalar... |
+--------------------+
only showing top 3 rows
In this section, you will ingest an Extensible Markup Language (XML) document containing National Aeronautics and Space Administration (NASA) patents and then display some patents and the dataframe’s schema. Note that, in this context, the schema is not an XML Schema (or XSD), but the dataframe schema. Quite a few years ago, when I discovered XML, I really thought it could become a unified lingua franca of data exchange. You can read more about XML at https://www.w3.org/XML/ . XML looks like HTML and any other markup language descended from SGML:
<rootElement>
<element attribute="attribute's value">
Some payload in a text element
</element>
<element type="without sub nodes"/>
</rootElement>
Unfortunately, XML is verbose and harder to read than JSON. Nevertheless, XML is still widely used, and Apache Spark ingests it nicely.
LAB This is lab #600. The example you are going to study is net.jgp.books.spark.ch07.lab600_xml_ingestion.XmlToDataframeApp.
Figure 7.6 shows a fragment of the XML file and illustrates the process.
For this XML example, you are going to ingest the NASA patents. NASA offers various open datasets at https://data.nasa.gov . Listing 7.13 shows a record of this file.
Figure 7.6 Spark ingests an XML file containing NASA patents. Spark uses an external plugin, provided by Databricks, to perform the ingestion. Spark will then display records and the dataframe schema (not to be confused with an XML Schema).
LAB You can download the NASA patents dataset from https://data.nasa.gov/Raw-Data/NASA-Patents/gquh-watm . For this example, I used Spark v2.2.0 on Mac OS X v10.12.6 with Java 8 as well as the Databricks XML parser v0.4.1. The dataset was downloaded in January 2018.
<response>                                                       ❶
  <row                                                           ❷
      _id="1"                                                    ❸
      _uuid="BAC69188-84A6-4D28-951E-FC687ACB6D4A"               ❸
      _position="1"                                              ❸
      _address="https://data.nasa.gov/resource/nasa-patents/1">  ❸
    <center>NASA Ames Research Center</center>
    <status>Issued</status>
    <case_number>ARC-14048-1</case_number>
    <patent_number>5694939</patent_number>
    <application_sn>08/543,093</application_sn>
    <title>Autogenic-Feedback Training Exercise Method &amp; System</title>
    <patent_expiration_date>2015-10-03T00:00:00</patent_expiration_date>
  </row>
  ...
</response>
❶ Root element of your list of patents
❷ Element (or tag) designating our record
❸ Attributes are prefixed by one underscore (_).
Listing 7.14 shows the output of a dataframe’s data and schema after ingesting the NASA patents as an XML document. You can see that the attributes are prefixed by an underscore (attributes already had an underscore as a prefix in the original document, so now they have two), and the element’s name is used as a column name.
+--------------------+----+----------+--------------------+--------------+...
| __address |__id|__position| __uuid |application_sn|...
+--------------------+----+----------+--------------------+--------------+...
|https://data.nasa...| 407| 407 |2311F785-C00F-422...| 13/033,085 |...
|https://data.nasa...| 1 | 1 |BAC69188-84A6-4D2...| 08/543,093 |...
|https://data.nasa...| 2 | 2 |23D6A5BD-26E2-42D...| 09/017,519 |...
|https://data.nasa...| 3 | 3 |F8052701-E520-43A...| 10/874,003 |...
|https://data.nasa...| 4 | 4 |20A4C4A9-EEB6-45D...| 09/652,299 |...
+--------------------+----+----------+--------------------+--------------+...
only showing top 5 rows
root
|-- __address: string (nullable = true)
|-- __id: long (nullable = true)
|-- __position: long (nullable = true)
|-- __uuid: string (nullable = true)
|-- application_sn: string (nullable = true)
|-- case_number: string (nullable = true)
|-- center: string (nullable = true)
|-- patent_expiration_date: string (nullable = true)
|-- patent_number: string (nullable = true)
|-- status: string (nullable = true)
|-- title: string (nullable = true)
As usual, our code will start with a main() method, which calls a start() method to create a Spark session. The following listing is the Java code needed to ingest the NASA XML file and then display five records and its schema.
package net.jgp.books.spark.ch07.lab600_xml_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class XmlToDataframeApp {

  public static void main(String[] args) {
    XmlToDataframeApp app = new XmlToDataframeApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("XML to Dataframe")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = spark.read().format("xml")   ❶
        .option("rowTag", "row")                   ❷
        .load("data/nasa-patents.xml");

    df.show(5);
    df.printSchema();
  }
}
❶ Specifies XML as the format. Case does not matter.
❷ Element or tag that indicates a record in the XML file
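The double underscore you saw in the column names comes from the plugin's attribute handling: spark-xml prepends its attributePrefix option (which defaults to _) to each attribute name. A sketch of overriding it, assuming spark-xml v0.7.0 and the same nasa-patents.xml file:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class XmlAttributePrefixApp {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("XML with a custom attribute prefix")
        .master("local")
        .getOrCreate();

    // NASA's attributes already start with "_", so the default prefix
    // produces "__id"; overriding it yields "x_id", "x_uuid", and so on.
    Dataset<Row> df = spark.read().format("xml")
        .option("rowTag", "row")
        .option("attributePrefix", "x")
        .load("data/nasa-patents.xml");

    df.printSchema();
  }
}
```

Element names, by contrast, are used verbatim as column names, as the schema in listing 7.14 shows.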
I had to modify the original NASA document because it contained an element with the same name as the record, wrapping the records. Unfortunately, as of now, Spark cannot change this element’s name for us. The original structure was as follows:
<response>
  <row>
    <row _id="1" ... >
      ...
    </row>
    ...
  </row>
</response>
If the first child of response had been rows, or anything else other than row, I wouldn’t have had to remove it (another option is to rename it).
Because the parser is not part of the standard Spark distribution, you will have to add it to the pom.xml file, as described in the following listing. To ingest XML, you will use spark-xml_2.12 (the artifact) from Databricks, in version 0.7.0.
...
<properties>
  ...
  <scala.version>2.12</scala.version>               ❶
  <spark-xml.version>0.7.0</spark-xml.version>      ❷
</properties>
<dependencies>
  ...
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_${scala.version}</artifactId>  ❸
    <version>${spark-xml.version}</version>              ❹
    <exclusions>                                         ❺
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  ...
</dependencies>
...
❶ Scala version on which the XML plugin is built
❷ Version of the spark-xml plugin
❸ Equivalent to spark-xml_2.12
❹ Equivalent to 0.7.0
❺ Optional: I excluded the logger from other packages to have better control over the one I use.
More details on Spark XML can be found at https://github.com/databricks/spark-xml .
Although text files are less popular in enterprise applications, they are still used, so you’ll see them from time to time. The growing popularity of deep learning and artificial intelligence also drives more natural language processing (NLP) activities. In this section, you will not do any NLP, but simply ingest text files. To learn more about NLP, see Natural Language Processing in Action by Hobson Lane, Cole Howard, and Hannes Max Hapke (Manning, 2019).
In lab #700, you are going to ingest Shakespeare’s Romeo and Juliet . Project Gutenberg ( http://www.gutenberg.org ) hosts numerous books and resources in digital format.
Each line of the book will become a record of our dataframe. There is no feature to cut by sentence or word. Listing 7.17 shows an excerpt of the file you’ll work on.
LAB This is lab #700. The example you are going to study is net.jgp.books.spark.ch07.lab700_text_ingestion.TextToDataframeApp. You can download Romeo and Juliet from www.gutenberg.org/cache/epub/1777/pg1777.txt .
This Etext file is presented by Project Gutenberg, in
cooperation with World Library, Inc., from their Library of the
Future and Shakespeare CDROMS. Project Gutenberg often releases
Etexts that are NOT placed in the Public Domain!!
...
ACT I. Scene I.
Verona. A public place.
Enter Sampson and Gregory (with swords and bucklers) of the house
of Capulet.
Samp. Gregory, on my word, we'll not carry coals.
Greg. No, for then we should be colliers.
Samp. I mean, an we be in choler, we'll draw.
Greg. Ay, while you live, draw your neck out of collar.
Samp. I strike quickly, being moved.
Greg. But thou art not quickly moved to strike.
Samp. A dog of the house of Montague moves me.
...
The following listing shows the first five rows of Romeo and Juliet after it has been ingested by Spark and transformed into a dataframe.
+--------------------+
| value |
+--------------------+
| |
|This Etext file i...|
|cooperation with ...|
|Future and Shakes...|
|Etexts that are N...|
...
root
|-- value: string (nullable = true)
The following listing is the Java code needed to turn Romeo and Juliet into a dataframe.
package net.jgp.books.spark.ch07.lab700_text_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TextToDataframeApp {

  public static void main(String[] args) {
    TextToDataframeApp app = new TextToDataframeApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("Text to Dataframe")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = spark.read().format("text")    ❶
        .load("data/romeo-juliet-pg1777.txt");
    df.show(10);
    df.printSchema();
  }
}
❶ Specify “text” when you want to ingest a text file.
Unlike with other formats, there are no options to set with text.
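Ingestion, then, is strictly line by line; if you need word-level granularity, you derive it yourself after ingestion. As a plain-Java sketch (standard JDK only, no Spark APIs; the regular expression is just one reasonable choice), here is the kind of splitting you could later apply to each value:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LineSplitter {
  public static void main(String[] args) {
    // One ingested record (a line of the play), as it would appear in the "value" column
    String line = "Samp. Gregory, on my word, we'll not carry coals.";

    // Split on any run of characters that is neither a letter nor an apostrophe
    List<String> words = Arrays.stream(line.split("[^\\p{L}']+"))
        .filter(w -> !w.isEmpty())
        .collect(Collectors.toList());

    System.out.println(words.size());                 // number of words in the line
    System.out.println(words.get(words.size() - 1));  // last word of the line
  }
}
```

In Spark itself, the same idea can be expressed with the split and explode functions of org.apache.spark.sql.functions.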
Big data brings its own set of file formats. If you haven’t seen an Avro, ORC, or Parquet file yet, you will definitely see one (if not all of them) on your journey with Spark. It is important to understand what those files are before you ingest them.
I hear you: “Why do I need more file formats?” In this section, I answer this question. Then I discuss these three newer formats. In section 7.9, I’ll show you how to ingest data in these newer formats.
In the context of big data, traditional file formats such as text, CSV, JSON, and XML have limitations you should be aware of.
In most big data projects, you will have to pull the data from somewhere (the sources) and you may have to put it back somewhere else (the destinations). Figure 7.7 describes this process.
The sources can be files (studied in this chapter), databases (chapter 8), complex systems or APIs (chapter 9), or even streams (chapter 10). Even if you can access all those resources in a more efficient way than files, for some odd reason you still have to deal with files and their annoying life cycles.
“So why can’t I just use JSON, XML, or CSV?” Here are some reasons:
“Okay, so why not use a binary format like the ones used by RDBMSs?” Because every vendor has its own format, you would end up with a myriad of formats. Other formats, such as COBOL Copybook, Programming Language One (PL/I), or IBM’s High Level Assembler (HLASM), would be too complex and too tightly linked to IBM’s mainframe to be used generically.
Therefore, new formats had to be developed, and as usual, the industry created a plethora. The most popular ones are Avro, ORC, and Parquet.
In most organizations, you probably will not have a choice of which format you will have to deal with. That choice has probably already been made. Some file formats may have been inherited from the Hadoop distribution your team started working with. But if your organization has not made a choice, the following quick definitions can help you make a more educated decision.
You may also encounter other file formats from time to time, but the ones I just mentioned are the most popular. Let’s have a look at each of them.
Figure 7.7 A classic big data scenario with ingestion from multiple sources, transformation, and export. This section is about understanding the limits of traditional file formats as well as the benefits of Avro, ORC, and Parquet in the context of big data.
Apache Avro is a data serialization system, which provides rich data structures in a compact, fast, and binary data format.
Avro was designed for remote procedure calls (RPCs), in a similar way to Protocol Buffers (Protobuf), a popular method for transferring serializable data developed and open sourced by Google; learn more at https://developers.google.com/protocol-buffers/. Avro supports dynamic modification of the schema; the schema itself is written in JSON. Because an Avro file is row-based, it is easier to split, like a CSV file.
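To give you an idea of what such a JSON schema looks like, here is a minimal sketch; the record and field names are illustrative, not taken from any specific project:

```json
{
  "type": "record",
  "name": "Weather",
  "doc": "A weather reading",
  "fields": [
    {"name": "station", "type": "string"},
    {"name": "time", "type": "long"},
    {"name": "temp", "type": "int"}
  ]
}
```

The schema travels with the data: an Avro file embeds its schema, so a reader does not need any external description to deserialize the records.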
Avro is available in Java, C, C++, C#, Go, Haskell, Perl, PHP, Python, Ruby, and Scala. The reference for Avro can be found at https://avro.apache.org/docs/current/ .
Apache Optimized Row Columnar (ORC), along with its predecessor RCFile, is a columnar storage format. ORC is ACID-compliant (atomicity, consistency, isolation, durability).
Beyond the standard datatypes, ORC supports compound types including structs, lists, maps, and unions. ORC supports compression, which reduces file size and network transfer time (always a bonus for big data).
Apache ORC is backed by Hortonworks, which means that Cloudera-based tools, such as Impala (a SQL query engine for data stored in a Hadoop cluster), may not fully support ORC. With the merger of Hortonworks and Cloudera, it is not yet known what to expect for the support of those file formats.
ORC is available in Java and C++. The reference for ORC can be found at https://orc.apache.org/ .
Like ORC, Apache Parquet is a columnar file format. Parquet supports compression, and you can add columns at the end of the schema. Parquet also supports compound types such as lists and maps.
Parquet seems to be the most popular format among big data practitioners. Apache Parquet is backed by Cloudera, in collaboration with Twitter. Once more, with the merger of Hortonworks and Cloudera, it is not yet known what to expect for the support of those file formats; nevertheless, Parquet remains the more popular of the two.
Parquet is available in Java. The reference for Parquet can be found at https://parquet.apache.org/.
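If the row-based versus column-based distinction feels abstract, the following plain-Java sketch (standard collections only, none of these libraries; the record fields are made up for illustration) contrasts the two layouts for the same three records:

```java
import java.util.Arrays;
import java.util.List;

public class LayoutDemo {
  public static void main(String[] args) {
    // Row-oriented layout (Avro-like): each record's values live together.
    List<List<Object>> rows = Arrays.asList(
        Arrays.<Object>asList(1, "alice", 10.0),
        Arrays.<Object>asList(2, "bob", 20.0),
        Arrays.<Object>asList(3, "carol", 30.0));

    // Column-oriented layout (ORC/Parquet-like): each column's values live
    // together, so a query needing only one column can skip the others.
    List<Double> amounts = Arrays.asList(10.0, 20.0, 30.0);

    // Summing one field from the row layout touches every record...
    double sumFromRows = rows.stream()
        .mapToDouble(r -> (Double) r.get(2))
        .sum();
    // ...while the column layout reads one contiguous list.
    double sumFromColumns = amounts.stream()
        .mapToDouble(Double::doubleValue)
        .sum();

    System.out.println(sumFromRows);
    System.out.println(sumFromColumns);
  }
}
```

This locality is also why columnar formats compress so well: values of the same type and domain sit next to each other.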
Big data file formats add a layer of complexity. Nevertheless, I hope you understand their necessity and the differences between them. The following are qualities that ORC, Parquet, and Avro share:
Based on popularity, if you have a choice to make, Parquet is probably the way to go. Remember that your organization may already have a standard for big data file formats, and even if it is not Parquet, it is probably a good idea to stick with that choice.
If you are interested in more technical details, you can read “Big Data File Formats Demystified” by Alex Woodie (http://mng.bz/2JBa) and “Hadoop File Formats: It’s Not Just CSV Anymore” by Kevin Hass (http://mng.bz/7zAQ). Both articles were written prior to the Hortonworks and Cloudera merger.
In this last section, I’ll show you how to ingest Avro, ORC, and Parquet files. Earlier in this chapter, you learned about traditional formats including CSV, JSON, XML, and text files. As you may recall, the constructs of those file formats are similar. As expected, the ingestion process is similar for the big data file formats.
In all the examples, I used sample datafiles from the Apache projects themselves. They are, unfortunately, not as inspiring for creative analytics as you might have expected, considering all the other datasets I used in this book.
To ingest an Avro file, you need to add a library to your project, as Avro is not natively supported by Spark. After that, the ingestion is straightforward, like any file ingestion.
LAB The example you are going to study is net.jgp.books.spark.ch07.lab910_avro_ingestion.AvroToDataframeApp. The sample file comes from the Apache Avro project itself.
The following listing shows the expected output of this example.
+------------+-------------+----+
| station | time |temp|
+------------+-------------+----+
|011990-99999|-619524000000| 0 |
|011990-99999|-619506000000| 22 |
|011990-99999|-619484400000| -11|
|012650-99999|-655531200000| 111|
|012650-99999|-655509600000| 78 |
+------------+-------------+----+
root
|-- station: string (nullable = true)
|-- time: long (nullable = true)
|-- temp: integer (nullable = true)
The dataframe has 5 rows.
Since Spark v2.4, Avro support has been part of the Apache Spark project itself. Prior to this version, it was maintained by Databricks (as the XML data source still is). You will still need to manually add the dependency to your pom.xml file. The additional library is available through Maven Central, and you can add the library definition in your pom.xml:
...
  <properties>
    <scala.version>2.12</scala.version>
    <spark.version>3.0.0</spark.version>
    ...
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-avro_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    ...
  </dependencies>

With the properties resolved, this dependency is equivalent to

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-avro_2.12</artifactId>
      <version>3.0.0</version>
    </dependency>
After you have added the library, you can write your code, as in the following listing.
package net.jgp.books.spark.ch07.lab910_avro_ingestion;

import org.apache.spark.sql.Dataset;          ❶
import org.apache.spark.sql.Row;              ❶
import org.apache.spark.sql.SparkSession;     ❶

public class AvroToDataframeApp {

  public static void main(String[] args) {
    AvroToDataframeApp app = new AvroToDataframeApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("Avro to Dataframe")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = spark.read()
        .format("avro")                        ❷
        .load("data/weather.avro");
    df.show(10);
    df.printSchema();
    System.out.println("The dataframe has " + df.count() + " rows.");
  }
}
❶ You do not need any special imports; the library will be dynamically loaded.
❷ Specifies the format; no short code is available.
To learn more about Avro’s support in Spark prior to v2.4, you can refer to the Databricks GitHub repository at https://github.com/databricks/spark-avro .
Ingesting ORC is a straightforward process. The format code needed by Spark is orc. In Spark versions prior to v2.4, you also need to configure your session by specifying the implementation if you do not already use Apache Hive.
Lab The example you are going to study is net.jgp.books.spark.ch07.lab920_orc_ingestion.OrcToDataframeApp. The sample file comes from the Apache ORC project itself.
The following listing shows the expected output of this example.
+-----+-----+-----+-------+-----+-----+-----+-----+-----+
|_col0|_col1|_col2| _col3 |_col4|_col5|_col6|_col7|_col8|
+-----+-----+-----+-------+-----+-----+-----+-----+-----+
| 1 | M | M |Primary| 500 | Good| 0 | 0 | 0 |
| 2 | F | M |Primary| 500 | Good| 0 | 0 | 0 |
...
| 10 | F | U |Primary| 500 | Good| 0 | 0 | 0 |
+-----+-----+-----+-------+-----+-----+-----+-----+-----+
only showing top 10 rows
root
|-- _col0: integer (nullable = true)
|-- _col1: string (nullable = true)
|-- _col2: string (nullable = true)
|-- _col3: string (nullable = true)
|-- _col4: integer (nullable = true)
|-- _col5: string (nullable = true)
|-- _col6: integer (nullable = true)
|-- _col7: integer (nullable = true)
|-- _col8: integer (nullable = true)
The dataframe has 1920800 rows.
The following listing provides the sample code to read an ORC file.
package net.jgp.books.spark.ch07.lab920_orc_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class OrcToDataframeApp {

  public static void main(String[] args) {
    OrcToDataframeApp app = new OrcToDataframeApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("ORC to Dataframe")
        .config("spark.sql.orc.impl", "native")    ❶
        .master("local")
        .getOrCreate();

    Dataset<Row> df = spark.read()
        .format("orc")                             ❷
        .load("data/demo-11-zlib.orc");            ❸
    df.show(10);
    df.printSchema();
    System.out.println("The dataframe has " + df.count() + " rows.");
  }
}
❶ Use the native implementation to access the ORC file, not the Hive implementation.
❷ The format code for ORC files
❸ A standard file that comes from the Apache ORC project
The implementation parameter can have either the native value or the hive value. The native implementation means that Spark uses its own built-in ORC reader rather than Hive’s. It is the default value starting with Spark v2.4.
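The same setting can also be made outside your code: like any Spark configuration property, spark.sql.orc.impl can be placed in conf/spark-defaults.conf so that it applies to every session. A sketch, using the value from this lab:

```properties
# Use Spark's built-in ORC reader instead of the Hive one
spark.sql.orc.impl    native
```

Setting it cluster-wide avoids repeating the .config() call in every application.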
In this section, you’ll look at how Spark ingests Parquet. Spark will easily ingest Parquet files natively: there is no need for an additional library or configuration. It is important to remember that Parquet is also the default format used by Spark and Delta Lake (in chapter 17).
Lab The example you are going to study is net.jgp.books.spark.ch07.lab930_parquet_ingestion.ParquetToDataframeApp. The sample file comes from the Apache Parquet Testing project itself, available at https://github.com/apache/parquet-testing .
The following listing shows the expected output of this example.
+---+--------+-----------+------------+-------+----------+---------+...
| id|bool_col|tinyint_col|smallint_col|int_col|bigint_col|float_col|...
+---+--------+-----------+------------+-------+----------+---------+...
| 4 | true | 0 | 0 | 0 | 0 | 0.0 |...
| 5 | false | 1 | 1 | 1 | 10 | 1.1 |...
| 6 | true | 0 | 0 | 0 | 0 | 0.0 |...
| 7 | false | 1 | 1 | 1 | 10 | 1.1 |...
| 2 | true | 0 | 0 | 0 | 0 | 0.0 |...
| 3 | false | 1 | 1 | 1 | 10 | 1.1 |...
| 0 | true | 0 | 0 | 0 | 0 | 0.0 |...
| 1 | false | 1 | 1 | 1 | 10 | 1.1 |...
+---+--------+-----------+------------+-------+----------+---------+...
root
|-- id: integer (nullable = true)
|-- bool_col: boolean (nullable = true)
|-- tinyint_col: integer (nullable = true)
|-- smallint_col: integer (nullable = true)
|-- int_col: integer (nullable = true)
|-- bigint_col: long (nullable = true)
|-- float_col: float (nullable = true)
|-- double_col: double (nullable = true)
|-- date_string_col: binary (nullable = true)
|-- string_col: binary (nullable = true)
|-- timestamp_col: timestamp (nullable = true)
The dataframe has 8 rows.
The following listing provides the sample code to read a Parquet file.
package net.jgp.books.spark.ch07.lab930_parquet_ingestion;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetToDataframeApp {

  public static void main(String[] args) {
    ParquetToDataframeApp app = new ParquetToDataframeApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("Parquet to Dataframe")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = spark.read()
        .format("parquet")                         ❶
        .load("data/alltypes_plain.parquet");
    df.show(10);
    df.printSchema();
    System.out.println("The dataframe has " + df.count() + " rows.");
  }
}
❶ Spark code for reading Parquet
Table 7.1 summarizes the Spark format code for each file type you want to ingest.
1. For more information, see Wikipedia’s page on CSV at https://en.wikipedia.org/wiki/Comma-separated_values. In the History section, you’ll learn that CSV has been around for quite some time.