Using the Avro data model in Parquet

Parquet is a highly efficient columnar storage format, but it is also relatively new. Avro (https://avro.apache.org) is a widely used row-based storage format. This recipe shows how we can keep the older, more flexible Avro data model in our code and still use the Parquet format for storage.

The parquet-mr project (yes, the same one that hosts the Parquet tools we used in the previous recipe) has converters for almost all the popular data models. These converters take objects in your model and translate them into the Parquet format before persisting them.

How to do it…

In this recipe, we'll use the Avro data model and serialize the data in a Parquet file. The recipe involves the following steps:

  1. Create the Avro model.
  2. Generate Avro objects using the sbt-avro plugin.
  3. Construct an RDD of the generated object (StudentAvro) from StudentData.csv.
  4. Save the RDD[StudentAvro] in a Parquet file.
  5. Read the file back for verification.
  6. Use Parquet tools to verify.

Creation of the Avro model

An Avro schema is defined using JSON. In our case, we'll use the same StudentData.csv as the input file, so let's describe its four fields (id, name, phone, and email) in the schema:

{"namespace": "studentavro.avro",
 "type": "record",
 "name": "StudentAvro",
 "fields": [
     {"name": "id", "type": ["string", "null"]},
     {"name": "name",  "type": ["string", "null"]},
     {"name": "phone", "type": ["string", "null"]},
     {"name": "email", "type": ["string", "null"]}
 ]
}

You are probably already familiar with Avro, or you can understand the schema just by looking at it, but let me bore you with a short explanation anyway.

The namespace and name attributes in the JSON translate into the package name and the class name of the generated class, respectively. So, our generated class will have the fully qualified name studentavro.avro.StudentAvro. The "record" (the value of the type attribute) is one of the complex types in Avro (http://avro.apache.org/docs/1.7.6/spec.html#schema_complex). A record roughly translates to a class in Java/Scala and sits at the topmost level of the schema hierarchy. A record can encapsulate multiple fields, and these fields can be primitives (https://avro.apache.org/docs/1.7.7/spec.html#schema_primitive) or other complex types. The last bit, where a field's type is an array of types ("type": ["string", "null"]), is interesting: it means the field can be of more than one type. In Avro terms, this is called a union.

Now that we are done with the schema, let's save it in a file with the .avsc extension. I have saved it as student.avsc in the src/main/avro directory.
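
If you'd like to sanity-check the schema file before generating any code, Avro's Schema.Parser can load and validate it. This is an optional, standalone check and not part of the recipe:

    import java.io.File
    import org.apache.avro.Schema

    // An invalid .avsc file makes parse() throw a SchemaParseException here
    val schema = new Schema.Parser().parse(new File("src/main/avro/student.avsc"))

    println(schema.getFullName)  // studentavro.avro.StudentAvro
    println(schema.getFields)    // the four union-typed fields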

Generation of Avro objects using the sbt-avro plugin

The next step is to generate a class from the schema. The reason we stored the Avro schema file in the src/main/avro folder is that we'll be using the sbt-avro plugin (https://github.com/cavorite/sbt-avro) to generate a Java class from the schema. Configuring the plugin is as easy as configuring any other SBT plugin:

  • Let's add the plugin to project/plugins.sbt:
    addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")
    
  • Add the default settings of the plugin to our build.sbt:
    seq( sbtavro.SbtAvro.avroSettings : _*)
    
  • Let's generate the Java class now. We can do this by calling sbt avro:generate. You can see the generated Java file at target/scala-2.10/src_managed/main/compiled_avro/studentavro/avro/StudentAvro.java.
  • We also need the following library dependencies in our build.sbt:
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion,
      "org.apache.spark" %% "spark-mllib" % sparkVersion,
      "org.apache.spark" %% "spark-hive" % sparkVersion,
      "org.apache.avro" % "avro" % "1.7.7",
      "org.apache.parquet" % "parquet-avro" % "1.8.1",
      "com.twitter" %% "chill-avro" % "0.6.0"
    )
  • Finally, let's run an SBT compile so that the rest of the project picks up the generated Java file (a quick smoke test of the generated class follows this list):
    sbt compile
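
With the class generated and compiled, a quick smoke test in the Scala REPL (or a throwaway main) confirms that the builder behaves as expected. The values below are made up, and this snippet is not part of the recipe:

    import studentavro.avro.StudentAvro

    // Build one record through the generated fluent builder
    val s = StudentAvro.newBuilder()
      .setId("1")
      .setName("Jane Doe")
      .setPhone("555-0100")
      .setEmail("jane@example.com")
      .build()

    println(s.getName)    // Jane Doe
    println(s.getSchema)  // the Avro schema embedded in the generated class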

Constructing an RDD of our generated object from StudentData.csv

This step is very similar to the previous recipe in that we use a convertCSVToStudents function to generate an RDD of StudentAvro objects. Since this isn't a Scala class but a generated Java class that comes with a builder, we use the builder to construct instances fluently (http://en.wikipedia.org/wiki/Fluent_interface):

  val conf = new SparkConf().setAppName("AvroModelToParquet").setMaster("local[2]")

  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
  val rddOfStudents = convertCSVToStudents("StudentData.csv", sc)

  //The |-delimited file has a header row; skip the line whose first field is "id"
  def convertCSVToStudents(filePath: String, sc: SparkContext): RDD[StudentAvro] = {
    val rddOfStudents: RDD[StudentAvro] = sc.textFile(filePath).flatMap(eachLine => {
      val data = eachLine.split("\\|")
      if (data(0) == "id") None
      else Some(StudentAvro.newBuilder()
        .setId(data(0))
        .setName(data(1))
        .setPhone(data(2))
        .setEmail(data(3)).build())
    })
    rddOfStudents
  }

Saving RDD[StudentAvro] in a Parquet file

This is a tricky step that involves multiple substeps, so let's work through them one by one. We fall back to RDD[StudentAvro] in this example instead of a DataFrame because DataFrames can be constructed only from an RDD of case classes (or classes that extend Product, as we saw earlier in this chapter) or from an RDD[org.apache.spark.sql.Row]. If you prefer to use DataFrames, you can read the CSV as arrays of values, turn each array into a Row with RowFactory.create, and then convert the resulting RDD[Row] into a DataFrame with sqlContext.createDataFrame; a rough sketch of that alternative follows.
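Here is a minimal, hypothetical sketch of that DataFrame route. The StructType field names mirror the Avro schema, and the output path studentDfPq is made up; none of this is the recipe's own code:

    import org.apache.spark.sql.RowFactory
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val dfSchema = StructType(Seq(
      StructField("id", StringType), StructField("name", StringType),
      StructField("phone", StringType), StructField("email", StringType)))

    val rowRDD = sc.textFile("StudentData.csv")
      .map(_.split("\\|"))
      .filter(fields => fields(0) != "id")  // drop the header row
      .map(fields => RowFactory.create(fields(0), fields(1), fields(2), fields(3)))

    val studentDf = sqlContext.createDataFrame(rowRDD, dfSchema)
    studentDf.write.parquet("studentDfPq")  // plain Parquet, no Avro model involved

With that aside out of the way, let's walk through the substeps of saving RDD[StudentAvro] as a Parquet file: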

  • In order to save the RDD through the Hadoop output API, we can use saveAsNewAPIHadoopFile. Hadoop's new-API output formats write key-value pairs. We could have chosen one of the Student attributes as the key, but for the purposes of this example, let's simply use Void as the key type.

    To represent a pair (key-value) in Spark, we use PairRDD (https://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions). Not surprisingly, saveAsNewAPIHadoopFile is available only for PairRDDs. To convert the existing RDD[StudentAvro] to a PairRDD[Void,StudentAvro], we use the map function:

    val pairRddOfStudentsWithNullKey = rddOfStudents.map(each => (null, each))
  • Spark uses Java serialization by default when it ships RDD elements across the cluster. However, the generated Avro class doesn't implement java.io.Serializable, so it can't use Java serialization. That's no reason to worry, because Spark also supports a much faster serialization mechanism called Kryo (often cited as roughly 10x faster). The only catch is that we need to explicitly register our serialization candidates:
      val conf = new SparkConf().setAppName("AvroModelToParquet").setMaster("local[2]")
      conf.set("spark.kryo.registrator", classOf[StudentAvroRegistrator].getName)
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    So, through the spark.serializer configuration we say that we intend to use the KryoSerializer, and through spark.kryo.registrator that our registrator is StudentAvroRegistrator. As you might expect, the registrator's job is to register our StudentAvro class as a candidate for Kryo serialization. The twitter-chill project (https://github.com/twitter/chill) provides a nice extension that lets the Kryo serializer delegate to Avro's own serialization:

    class StudentAvroRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[StudentAvro], AvroSerializer.SpecificRecordBinarySerializer[StudentAvro])
      }
    }
  • The intent of this recipe is to write a Parquet file while keeping the Avro data model (schema). Since we are writing the file through Hadoop's new output API, we'll be using a bunch of Hadoop classes. An org.apache.hadoop.mapreduce.OutputFormat specifies the format of the file we are going to write, and as expected, we use ParquetOutputFormat (available in the parquet-hadoop subproject of the parquet-mr project). There are two things that this OutputFormat requires:
    • The WriteSupport class, which knows how to convert the Avro data model to the actual format. This is achieved with the following line:
        ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
      
    • The schema needs to be written to the footer of the Parquet file too. The schema of StudentAvro is accessible by using the getClassSchema function. This line of code achieves that:
        AvroParquetOutputFormat.setSchema(job, StudentAvro.getClassSchema)
      

    Now, what's that job parameter doing here in these two lines of code? The job object is just an instance of org.apache.hadoop.mapreduce.Job:

      val job = new Job()
    

    When we call the setWriteSupportClass and setSchema methods of ParquetOutputFormat and AvroParquetOutputFormat, the resulting settings are captured in the Hadoop Configuration encapsulated inside the Job object. We'll pass this job configuration along when we save the file.
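
    One small aside (an assumption about your Hadoop version, not something the recipe requires): the new Job() constructor is deprecated on Hadoop 2.x, and the factory method is a drop-in replacement if your build warns about it:

      import org.apache.hadoop.mapreduce.Job

      // Equivalent to `new Job()`, minus the deprecation warning on Hadoop 2.x
      val job = Job.getInstance()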

  • Finally, we save the file by calling saveAsNewAPIHadoopFile. The method requires a bunch of parameters, each of which we have already discussed: the first is the output path, followed by the key and value classes, then the OutputFormat of the file, and finally the job configuration itself:
    pairRddOfStudentsWithNullKey.saveAsNewAPIHadoopFile("studentAvroPq",
        classOf[Void],
        classOf[StudentAvro],
        classOf[AvroParquetOutputFormat],
        job.getConfiguration())

    We saw the entire program in bits and pieces, so for the sake of completeness, here is the full listing:

    import com.esotericsoftware.kryo.Kryo
    import com.twitter.chill.avro.AvroSerializer
    import org.apache.hadoop.mapreduce.Job
    import org.apache.parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
    import org.apache.parquet.hadoop.ParquetOutputFormat
    import org.apache.spark.rdd.RDD
    import org.apache.spark.serializer.KryoRegistrator
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    import studentavro.avro.StudentAvro

    object ParquetAvroSchemaMain extends App {

      val conf = new SparkConf().setAppName("AvroModelToParquet").setMaster("local[2]")
      conf.set("spark.kryo.registrator", classOf[StudentAvroRegistrator].getName)
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

      val job = new Job()

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
      val rddOfStudents = convertCSVToStudents("StudentData.csv", sc)

      ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
      AvroParquetOutputFormat.setSchema(job, StudentAvro.getClassSchema)

      val pairRddOfStudentsWithNullKey = rddOfStudents.map(each => (null, each))

      pairRddOfStudentsWithNullKey.saveAsNewAPIHadoopFile("studentAvroPq",
        classOf[Void],
        classOf[StudentAvro],
        classOf[AvroParquetOutputFormat],
        job.getConfiguration())

      //The |-delimited file has a header row; skip the line whose first field is "id"
      def convertCSVToStudents(filePath: String, sc: SparkContext): RDD[StudentAvro] = {
        val rddOfStudents: RDD[StudentAvro] = sc.textFile(filePath).flatMap(eachLine => {
          val data = eachLine.split("\\|")
          if (data(0) == "id") None
          else Some(StudentAvro.newBuilder()
            .setId(data(0))
            .setName(data(1))
            .setPhone(data(2))
            .setEmail(data(3)).build())
        })
        rddOfStudents
      }

    }

    class StudentAvroRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[StudentAvro], AvroSerializer.SpecificRecordBinarySerializer[StudentAvro])
      }
    }

Reading the file back for verification

As always, let's read the file back for confirmation. The function to call for this is newAPIHadoopFile, which accepts a similar set of parameters as saveAsNewAPIHadoopFile: the path of the file, the InputFormat, the key class, the value class, and finally the job configuration. Note that we are using newAPIHadoopFile instead of the parquetFile method we used previously, because this time we are reading through the Hadoop input API with Avro read support:

//Reading the file back for confirmation
  ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[StudentAvro]])
  val readStudentsPair = sc.newAPIHadoopFile("studentAvroPq", classOf[AvroParquetInputFormat[StudentAvro]], classOf[Void], classOf[StudentAvro], job.getConfiguration())
  val justStudentRDD: RDD[StudentAvro] = readStudentsPair.map(_._2)
  val studentsAsString = justStudentRDD.take(5).mkString("\n")
  println(studentsAsString)

The console output shows the first five StudentAvro records, confirming that the data was written and read back correctly.
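
As an aside, because what we wrote is a regular Parquet file, you can also read it straight back as a DataFrame (the schema comes from the Parquet footer). This is just an alternative check, assuming Spark 1.4 or later, and not the recipe's verification path:

  // Read the same output directory with Spark SQL's Parquet support
  val studentsDf = sqlContext.read.parquet("studentAvroPq")
  studentsDf.show(5)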

Using Parquet tools for verification

We'll also use the Parquet tools to confirm that the schema stored in the Parquet file is indeed an Avro schema:

/Users/Gabriel/Dropbox/arun/ScalaDataAnalysis/git/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc3/parquet-tools meta /Users/Gabriel/Dropbox/arun/ScalaDataAnalysis/Code/scaladataanalysisCB-tower/chapter3-data-loading-parquet/studentAvroPq

Yup! Looks like it is! The extra section in the meta output confirms that the Avro schema is stored:

creator:     parquet-mr
extra:       parquet.avro.schema = {"type":"record","name":"StudentAvro","namespace":"studentavro.avro","fields":[{"name":"id","type":[{"type":"string","avro.java.string":"Stri [more]...