Interacting with data sources

A major challenge in data science and data engineering is dealing with the wealth of input and output formats used to persist data. We might receive or send data as CSV files, JSON files, or through a SQL database, to name just a few.

Spark provides a unified API for serializing and deserializing DataFrames to and from different data sources.

JSON files

Spark supports loading data from JSON files, provided that each line in the JSON file corresponds to a single JSON object. Each object will be mapped to a DataFrame row. JSON arrays are mapped to arrays, and embedded objects are mapped to structs.
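
For example, a file containing the two lines below (a made-up snippet, not the GitHub data we use in the rest of this section) would load as a two-row DataFrame with a string column name, an array column topics, and a struct column owner holding a single field login:

{"name": "example", "topics": ["scala", "spark"], "owner": {"login": "odersky"}}
{"name": "another-example", "topics": ["json"], "owner": {"login": "odersky"}}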

This section would be a little dry without some data, so let's generate some from the GitHub API. Unfortunately, the GitHub API does not return JSON formatted as a single object per line. The code repository for this chapter contains a script, FetchData.scala, which will download and format JSON entries for Martin Odersky's repositories, saving the objects to a file named odersky_repos.json (go ahead and change the GitHub user in FetchData.scala if you want). You can also download a pre-constructed data file from data.scala4datascience.com/odersky_repos.json.

Let's dive into the Spark shell and load this data into a DataFrame. Reading from a JSON file is as simple as passing the file name to the sqlContext.read.json method:

scala> val df = sqlContext.read.json("odersky_repos.json")
df: DataFrame = [archive_url: string, assignees_url: ...]
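
To see exactly what Spark inferred, we can print the schema. The output below is abridged; the full list of columns depends on what the GitHub API returns:

scala> df.printSchema
root
 |-- archive_url: string (nullable = true)
 |-- assignees_url: string (nullable = true)
 ...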

Spark infers the schema directly from the JSON documents, and the resulting DataFrame has many columns. Let's sub-select a few to get something more manageable:

scala> val reposDF = df.select("name", "language", "fork", "owner")
reposDF: DataFrame = [name: string, language: string, ...]    

scala> reposDF.show
+----------------+----------+-----+--------------------+
|            name|  language| fork|               owner|
+----------------+----------+-----+--------------------+
|           dotty|     Scala| true|[https://avatars....|
|        frontend|JavaScript| true|[https://avatars....|
|           scala|     Scala| true|[https://avatars....|
|      scala-dist|     Scala| true|[https://avatars....|
|scala.github.com|JavaScript| true|[https://avatars....|
|          scalax|     Scala|false|[https://avatars....|
|            sips|       CSS|false|[https://avatars....|
+----------------+----------+-----+--------------------+
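
The owner column is a struct, built from the nested owner object in each GitHub record. We can reach into it with dot notation; this assumes, as the GitHub API documents, that the owner object contains a login field:

scala> reposDF.select("name", "owner.login").show

This should print a two-column table of repository names and owner logins (odersky for every row in this data set).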

Let's save the DataFrame back to JSON:

scala> reposDF.write.json("repos_short.json")

If you look at the files present in the directory in which you are running the Spark shell, you will notice a repos_short.json directory. Inside it, you will see files named part-00000, part-00001, and so on (the exact names depend on your Spark version). When serializing to JSON, each partition of the DataFrame is written independently; if you are running this on several machines, you will find parts of the serialized output on each computer.
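
If you would rather end up with a single output file, you can coalesce the DataFrame to a single partition before writing. This is only a sketch, and only sensible for small DataFrames, since it funnels all the data through a single task; repos_single.json is just a name chosen for this example:

scala> reposDF.coalesce(1).write.json("repos_single.json")

The output is still a directory, but it now contains a single part file.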

You may, optionally, pass a mode argument to control how Spark deals with an existing repos_short.json directory:

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> reposDF.write.mode(
  SaveMode.Overwrite).json("repos_short.json")

Available save modes are ErrorIfExists (the default), Append, Overwrite, and Ignore (do not save if the output exists already).
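
Ignore is handy when re-running a script: if the output directory already exists, the write is silently skipped instead of raising an error. A minimal sketch, reusing the output path from above:

scala> reposDF.write.mode(SaveMode.Ignore).json("repos_short.json")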

Parquet files

Apache Parquet is a popular columnar file format well-suited to storing tabular data. It is widely used for serialization in the Hadoop ecosystem, since it allows specific columns and rows to be extracted efficiently without reading the entire file.

Serialization and deserialization of Parquet files is identical to JSON, substituting parquet for json:

scala> reposDF.write.parquet("repos_short.parquet")

scala> val newDF = sqlContext.read.parquet("repos_short.parquet")
newDF: DataFrame = [name: string, language: string, fo...]

scala> newDF.show
+----------------+----------+-----+--------------------+
|            name|  language| fork|               owner|
+----------------+----------+-----+--------------------+
|           dotty|     Scala| true|[https://avatars....|
|        frontend|JavaScript| true|[https://avatars....|
|           scala|     Scala| true|[https://avatars....|
|      scala-dist|     Scala| true|[https://avatars....|
|scala.github.com|JavaScript| true|[https://avatars....|
|          scalax|     Scala|false|[https://avatars....|
|            sips|       CSS|false|[https://avatars....|
+----------------+----------+-----+--------------------+

In general, Parquet is more space-efficient than JSON for storing large collections of objects. It is also much more efficient at retrieving specific columns, and it can often skip rows that fall outside a filter, since it stores statistics about each chunk of data alongside the data itself. Parquet is thus preferable to JSON unless you need the output to be human-readable, or readable by tools that do not understand Parquet.
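
As a sketch of what this buys you, we can read back only the columns we need and filter as we go; with Parquet, Spark only has to read the name and language columns from disk, rather than the whole file. The filter expression here is an assumption about what you might want, not part of the original example:

scala> sqlContext.read.parquet("repos_short.parquet").filter(
  "language = 'Scala'").select("name", "language").show

This should show just the Scala repositories from the table above.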
