Storing data as Parquet files

Parquet (https://parquet.apache.org/) is rapidly becoming the go-to data storage format in the world of big data because of the distinct advantages it offers:

  • It has a column-based representation of data. This is best illustrated with a picture, as follows:
    [Figure: Parquet's columnar layout, showing row groups and the column chunks inside them]

    As you can see in the preceding diagram, Parquet stores data in chunks of rows, say 100 rows. In Parquet terms, these are called row groups. Each row group holds chunks of columns inside it (column chunks). A column chunk can hold more than a single unit of data for a particular column (as represented by the blue box in the first column). For example, Jai, Suri, and Dhina form a single chunk even though they are three separate values of the Name column.

    Another unique feature is that these column chunks (groups of a single column's information) can be read independently. Let's consider the following image:

    [Figure: values of each column stored contiguously within their column chunk]

    We can see that the items of column data are stored next to each other in a sequence. Since our queries, most of the time, focus on just a few columns (a projection) rather than the entire table, this storage layout lets us retrieve data much faster than reading entire rows and then filtering for the columns we need. It also reduces the memory needed for Spark's in-memory computations, since only the projected columns have to be held in memory (see the short sketch after this list).

  • The second advantage is that very little is needed to transition from the data models we already use to represent our data. While Parquet has its own native object model, we are free to keep using Avro, Protocol Buffers (ProtoBuf), Thrift, or a variety of other existing object models, and use an intermediate converter to serialize our data to Parquet. Most of these converters are readily available in the Parquet-MR project (https://github.com/Parquet/parquet-mr).
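
To make the projection benefit concrete, here is a minimal sketch (using the sqlContext and the Parquet output produced later in this recipe) of a query that touches only two columns; Parquet reads just the corresponding column chunks instead of whole rows:

//Read the Parquet output produced later in this recipe
val students = sqlContext.parquetFile("studentPq.parquet")

//Only the name and email column chunks are read for this projection;
//the id and phone chunks are skipped entirely
students.select("name", "email").show()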

In this recipe, we'll cover the following steps:

  1. Load a simple CSV file and convert it into a DataFrame.
  2. Save it as a Parquet file.
  3. Install Parquet tools.
  4. Use the tools to inspect the Parquet file.
  5. Enable compression for the Parquet file.

How to do it…

Before we dive into the steps, let's briefly look at our build.sbt file, specifically the library dependencies and Avro settings (which we'll talk about in the following sections):

organization := "com.packt"

name := "chapter3-data-loading-parquet"

scalaVersion := "2.10.4"

val sparkVersion="1.4.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "com.twitter" %% "chill-avro" % "0.6.0"
)

resolvers ++= Seq(
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Twitter" at "http://maven.twttr.com/"
)

fork := true

seq( sbtavro.SbtAvro.avroSettings : _*)

(stringType in avroConfig) := "String"

javaSource in sbtavro.SbtAvro.avroConfig <<= (sourceDirectory in Compile)(_ / "java")
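
The sbtavro.SbtAvro settings in the last three lines come from the sbt-avro plugin, so it also needs to be declared in project/plugins.sbt. A minimal sketch, assuming the com.cavorite build of the plugin (the exact coordinates and version may differ in your setup):

//project/plugins.sbt (assumed plugin coordinates; adjust to match your setup)
addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")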

Now that we have build.sbt out of the way, let's go ahead and look at the code behind each of the listed steps.

Load a simple CSV file, convert it to case classes, and create a DataFrame from it

We can actually create a DataFrame directly from the CSV using the com.databricks:spark-csv package, as we saw in Chapter 2, Getting Started with Apache Spark DataFrames, but for this recipe, we'll just tokenize the CSV and create case classes from it. The input CSV has a header row, so the conversion process involves skipping the first line.
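
The Student case class itself is not shown in the listing; a minimal sketch that matches the four string columns of StudentData.csv (id, name, phone, and email, as confirmed by the Parquet schema later in this recipe) would be:

//Minimal sketch of the case class used in this recipe; the fields mirror
//the header of StudentData.csv
case class Student(id: String, name: String, phone: String, email: String)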

There are just two interesting things that you might notice in the code.

First, some Parquet-producing systems, such as Impala, binary-encode their strings. To work around this, we set the following configuration, which says that if Spark sees binary data, it should treat it as a string:

sqlContext.setConf("spark.sql.parquet.binaryAsString","true")

Second, instead of using sqlContext.createDataFrame, we just call toDF() on the RDD[Student]. The sqlContext.implicits object has a number of implicit conversions that help us convert an RDD[T] to a DataFrame directly. The only requirement, as expected, is to import the implicits:

  import sqlContext.implicits._

The rest of the code is the same as we saw earlier:

val conf = new SparkConf().setAppName("CaseClassToParquet").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//Treat binary-encoded values as Strings
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

import sqlContext.implicits._

//Convert each line into a Student
val rddOfStudents = convertCSVToStudents("StudentData.csv", sc)

//Convert RDD[Student] to a DataFrame using sqlContext.implicits
val studentDFrame = rddOfStudents.toDF()

The convertCSVToStudents method, which converts each line into a Student object, looks like this:

def convertCSVToStudents(filePath: String, sc: SparkContext): RDD[Student] = {
  val rddOfStudents: RDD[Student] = sc.textFile(filePath).flatMap(line => {
    //Split on the pipe delimiter; skip the header line, whose first field is "id"
    val data = line.split("\\|")
    if (data(0) == "id") None else Some(Student(data(0), data(1), data(2), data(3)))
  })
  rddOfStudents
}
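
As an aside, the header could also be skipped by position rather than by inspecting every line; a sketch using mapPartitionsWithIndex, which drops the first line of the first partition only, would look like this:

//Alternative sketch: drop the header line positionally instead of checking data(0)
def convertCSVToStudentsByIndex(filePath: String, sc: SparkContext): RDD[Student] =
  sc.textFile(filePath)
    .mapPartitionsWithIndex((index, lines) => if (index == 0) lines.drop(1) else lines)
    .map { line =>
      val data = line.split("\\|")
      Student(data(0), data(1), data(2), data(3))
    }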

Save it as a Parquet file

This is just a one-liner once we have the DataFrame. This can be done using either the saveAsParquetFile or the save method. If you wish to save it in a Hive table (https://hive.apache.org/), then there is also a saveAsTable method for you:

//Save DataFrame as Parquet using saveAsParquetFile
studentDFrame.saveAsParquetFile("studentPq.parquet")

//OR

//Save DataFrame as Parquet using the save method
studentDFrame.save("studentPq.parquet", "parquet", SaveMode.Overwrite)

Note

The save method allows the usage of SaveMode, which has the following alternatives: Append, ErrorIfExists, Ignore, or Overwrite.

The save methods create a directory in the location that you specify (here, we simply store it in our project directory). The directory holds the files that represent the serialized data. It is not entirely human readable, but you may notice that the data of a single column is stored together.
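
Spark 1.4 also introduced the DataFrameWriter API, which expresses the same save a little more fluently; a brief sketch of an equivalent call:

//Equivalent sketch using the DataFrameWriter API (df.write) introduced in Spark 1.4
studentDFrame.write
  .mode(SaveMode.Overwrite)
  .parquet("studentPq.parquet")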

Just as we do for the rest of the recipes, let's read the file and sample the data for confirmation:

//Read data for confirmation
val pqDFrame = sqlContext.parquetFile("studentPq.parquet")
pqDFrame.show()

The following is the output:

[Figure: output of pqDFrame.show(), listing the student records]

Install Parquet tools

Other than using the printSchema method of the DataFrame to inspect the schema, we can use the parquet-tools utilities, provided as part of the Parquet project, to get a variety of other information.
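
For comparison, the DataFrame route mentioned above is a one-liner (its output is shown as comments below), whereas parquet-tools also exposes the file's physical layout:

//Inspecting the schema from within Spark, for comparison
pqDFrame.printSchema()
//root
// |-- id: string (nullable = true)
// |-- name: string (nullable = true)
// |-- phone: string (nullable = true)
// |-- email: string (nullable = true)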

Note

parquet-tools is a subproject of Parquet and is available at https://github.com/Parquet/parquet-mr/tree/master/parquet-tools.

Since Spark 1.4.1 uses Parquet 1.6.0rc3, we'll need to download that version of the tools from the Maven repository. The executables and the JARs can be downloaded as one bundle from https://repo1.maven.org/maven2/com/twitter/parquet-tools/1.6.0rc3/parquet-tools-1.6.0rc3-bin.tar.gz.

Using the tools to inspect the Parquet file

Let's put the tools into action. Specifically, we'll do three things in this step:

  • Display the schema in Parquet format
  • Display the meta information that is stored in Parquet's footer
  • Sample the data using head and cat
  • Displaying the schema: This can be achieved by calling parquet-tools with the schema command and the Parquet file as parameters. As an example, let's print the schema using one of the part files:
    bash-3.2$ parquet-tools-1.6.0rc3/parquet-tools schema part-r-00000-20a8b58c-fe1d-43e7-b148-f874b78eb5ec.gz.parquet
    
    message root {
      optional binary id (UTF8);
      optional binary name (UTF8);
      optional binary phone (UTF8);
      optional binary email (UTF8);
    }
    

    We see that the schema is indeed available in Parquet format and is derived from our Student case class.

  • Displaying the meta information of a particular Parquet file: As we saw earlier, meta information is stored in the file's footer, and the meta command of parquet-tools prints it.

    Notice the extra entry in the output: it holds the schema specific to the data model we used, and this information is used when the data is deserialized:

    bash-3.2$  parquet-tools-1.6.0rc3/parquet-tools meta part-r-00000-20a8b58c-fe1d-43e7-b148-f874b78eb5ec.gz.parquet
    creator:     parquet-mr version 1.6.0rc3 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c)
    extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true, [more]...
    
    file schema: root
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    id:          OPTIONAL BINARY O:UTF8 R:0 D:1
    name:        OPTIONAL BINARY O:UTF8 R:0 D:1
    phone:       OPTIONAL BINARY O:UTF8 R:0 D:1
    email:       OPTIONAL BINARY O:UTF8 R:0 D:1
    
    row group 1: RC:50 TS:3516
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    id:           BINARY GZIP DO:0 FPO:4 SZ:140/326/2.33 VC:50 ENC:RLE,BIT_PACKED,PLAIN
    name:         BINARY GZIP DO:0 FPO:144 SZ:313/483/1.54 VC:50 ENC:RLE,BIT_PACKED,PLAIN
    phone:        BINARY GZIP DO:0 FPO:457 SZ:454/961/2.12 VC:50 ENC:RLE,BIT_PACKED,PLAIN
    email:        BINARY GZIP DO:0 FPO:911 SZ:929/1746/1.88 VC:50 ENC:RLE,BIT_PACKED,PLAIN
    
  • Sampling data using head and cat: Let's now have a sneak peek at the first few rows of the data. The head function will help us do that. It accepts an additional -n parameter, where you can specify the number of records to be displayed:
    bash-3.2$ parquet-tools-1.6.0rc3/parquet-tools head -n 2  part-r-00001.parquet
    

The preceding command will display only two rows because of the additional -n 2 parameter.

The following is the output of this command:

id = 1
name = Burke
phone = 1-300-746-8446
email = [email protected]

id = 2
name = Kamal
phone = 1-668-571-5046
email = [email protected]

Optionally, if you wish to display all the records in the file, you can use the cat parameter with the parquet-tools command:

parquet-tools cat part-r-00001.parquet

Enable compression for the Parquet file

As you can see from the meta information, the data is gzipped by default. In order to use Snappy compression, all we need to do is set a configuration on our SQLContext (more precisely, on the SQLConf of the SQLContext). There is just one catch with regard to enabling Lempel-Ziv-Oberhumer (LZO) compression: we are required to install native-lzo on all the machines where this data is stored; otherwise, we get a "native-lzo library not available" error message.

Let's enable Snappy (http://google.github.io/snappy/) compression by setting the Parquet compression codec configuration parameter to snappy:

sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

After running the program, let's use the parquet-tools meta command to verify it:

parquet-tools meta part-r-00000-aee54b77-288e-44b2-8f36-53b38a489e8d.snappy.parquet
[Figure: parquet-tools meta output for the Snappy-compressed Parquet file]