Columnar formats – Parquet

In this section, we'll be looking at the second schema-based format, Parquet. The following topics will be covered:

  • Saving data in Parquet format
  • Loading Parquet data
  • Testing

Parquet is a columnar format: the data is stored column-wise rather than row-wise, unlike the JSON, CSV, plain text, and Avro files we saw earlier.

This is an important format for big data processing because it can make queries significantly faster: when only a few columns are needed, Spark can read just those columns rather than whole rows. In this section, we will focus on adding Parquet support to Spark, saving data to the filesystem, reloading it, and then testing. Parquet is similar to Avro in that it gives you a parquet method on the writer and reader, but the implementation differs slightly.
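As a quick, hypothetical illustration of why that matters (the file name and the spark session here are assumed to exist already and are not part of the test we build later), selecting a single column from a Parquet file lets Spark scan only that column's data on disk instead of entire rows:

// Hypothetical sketch: Parquet's columnar layout enables column pruning
val transactions = spark.read.parquet("some-transactions.parquet")
// Only the amount column is read from disk; the other columns are skipped
transactions.select("amount").show()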

In the build.sbt file, we needed to add an external dependency for the Avro format, but Parquet support is already bundled with Spark. So, Parquet is a natural choice for Spark because it works out of the box with the standard packages.
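As a rough sketch of the difference (the artifact names and versions below are illustrative assumptions, not taken from the book's build file), Avro required an extra library in build.sbt, whereas Parquet comes with the spark-sql artifact we already depend on:

// build.sbt (illustrative sketch; adjust versions to your Spark distribution)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.4.0", // Parquet support is built in
  "com.databricks"   %% "spark-avro" % "4.0.0"  // Avro needed this extra package
)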

Let's have a look at the logic that's used in the SaveParquet.scala file for saving and loading Parquet files.

First, we coalesce the data into two partitions and then call the writer's parquet method to save it:

package com.tomekl007.chapter_4

import com.tomekl007.UserTransaction
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, FunSuite}

import scala.reflect.io.Path

class SaveParquet extends FunSuite with BeforeAndAfterEach {
  val spark = SparkSession.builder().master("local[2]").getOrCreate()

  private val FileName = "transactions.parquet"

  override def afterEach() {
    val path = Path(FileName)
    path.deleteRecursively()
  }

  test("should save and load parquet") {
    //given
    import spark.sqlContext.implicits._
    val rdd = spark.sparkContext
      .makeRDD(List(UserTransaction("a", 100), UserTransaction("b", 200)))
      .toDF()

    //when
    rdd.coalesce(2)
      .write
      .parquet(FileName)

The read side is symmetrical: we call the parquet method on spark.read to load the data back, and then assert on the result:

    val fromFile = spark.read.parquet(FileName)

    fromFile.show()
    assert(fromFile.count() == 2)
  }

}

Let's run this test but, before that, we will comment out the following code within our SaveParquet.scala file so that the saved files are not cleaned up and we can inspect their structure:

//  override def afterEach() {
//    val path = Path(FileName)
//    path.deleteRecursively()
//  }

A new transactions.parquet folder gets created, and it contains two part files: part-r-00000 and part-r-00001. This time, however, the content is entirely binary and the metadata is embedded alongside the data.

The schema is embedded in each part file together with the data: the userId field of string type and the amount field of integer type. part-r-00000 carries exactly the same embedded schema as part-r-00001. Hence, Parquet is also a schema-based format. When we read the data back, the userId and amount columns are available.
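As a minimal sketch (assuming the test above has already written transactions.parquet and the same spark session is still in scope), we can confirm that the schema travels with the data by reloading the folder and printing the schema Spark reads from the Parquet file metadata:

val reloaded = spark.read.parquet("transactions.parquet")
// The schema comes from the file metadata, not from scanning the rows
reloaded.printSchema()
reloaded.show()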
