Avro

We have seen how easy it can be to ingest some data and use Spark to analyze it without the need for any traditional ETL tools. While it is very useful to work in an environment where schemas are all but ignored, this is not realistic in the commercial world. There is, however, a good middle ground that gives us some great advantages over both ETL and unbounded data processing: Avro.

Apache Avro is a serialization technology, similar in purpose to Google's Protocol Buffers. Like many other serialization technologies, Avro uses a schema to describe data, but the key to its usefulness is that it provides the following features:

  • It stores the schema alongside the data. This allows for efficient storage because the schema is only stored once, at the top of the file. It also means that data can be read even if the original class files are no longer available.
  • It supports schema-on-read and schema evolution. This means it can implement different schemas for the reading and writing of data, providing the advantages of schema versioning without the disadvantages of large administrative overhead every time we wish to make data amendments.
  • It is language agnostic. Therefore, it can be used with any tool or technology that allows a custom serialization framework. It is particularly useful for writing directly to Hive, for example.

As Avro stores the schema with the enclosed data, it is self-describing. So instead of struggling to read the data because you have no classes, or trying to guess which version of a schema applies, or in the worst case having to throw away the data altogether, we can simply interrogate the Avro file for the schema that the data was written with.
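
For instance, the core Avro Java API lets us pull the writer's schema straight back out of a file. The following is a minimal sketch that assumes an existing Avro container file named gkg.avro; no generated classes are required:

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Open the container file and ask it for the schema it was written with
val reader = new DataFileReader[GenericRecord](
  new File("gkg.avro"), new GenericDatumReader[GenericRecord]())

println(reader.getSchema.toString(true)) // pretty-printed JSON schema
reader.close()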

Avro also allows amendments to a schema in the form of additive changes, or appends, which can be accommodated so that a given implementation remains backwards compatible with older data.
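
As a concrete illustration, here is a minimal sketch using the core Avro Java API and a hypothetical Event record (not part of the GKG model): older files were written without a source field, but a newer reader schema declares it with a default, and Avro's schema resolution supplies that default when the old data is read:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Hypothetical newer reader schema: the original Event record plus a
// "source" field with a default, so files written before the change
// still resolve against it
val readerSchema = new Schema.Parser().parse("""
  {"type": "record", "name": "Event", "namespace": "org.example",
   "fields": [
     {"name": "id",     "type": "long"},
     {"name": "name",   "type": ["null", "string"], "default": null},
     {"name": "source", "type": "string",           "default": "unknown"}
   ]}""")

// The writer's schema is taken from the file itself; the datum reader
// resolves it against our newer reader schema
val datumReader = new GenericDatumReader[GenericRecord](readerSchema)
val fileReader = new DataFileReader[GenericRecord](new File("events.avro"), datumReader)

while (fileReader.hasNext) {
  val record = fileReader.next()
  println(record.get("source")) // "unknown" for records written with the old schema
}
fileReader.close()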

As Avro represents data in a binary form, it can be transferred and manipulated more efficiently. It also takes up less space on disk, thanks to its compact encoding and built-in support for compression.

For the reasons stated above, Avro is an incredibly popular serialization format, used by a wide variety of technologies and end systems, and you will no doubt have cause to use it at some point. Therefore, in the next sections we will demonstrate two different ways to read and write Avro-formatted data. The first is an elegant and simple method that uses a third-party, purpose-built library called spark-avro, and the second is an under-the-covers method, useful for understanding how the mechanics of Avro work.

Spark-Avro method

To address the complexities of implementing Avro, the spark-avro library has been developed. This can be imported in the usual way, using Maven:

 
<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-avro_2.11</artifactId> 
    <version>3.1.0</version> 
</dependency> 
 

For this implementation, we will create the Avro schema using a StructType object, transform the input data using an RDD, and create a DataFrame from the two. Finally, the result can be written to file, in Avro format, using the spark-avro library.

The StructType object is a variation on the GkgCoreSchema used above and in Chapter 4, Exploratory Data Analysis, and is constructed as follows:

val GkgSchema = StructType(Array(
   StructField("GkgRecordId", GkgRecordIdStruct, true), 
   StructField("V21Date", LongType, true), 
   StructField("V2SrcCollectionId", StringType, true), 
   StructField("V2SrcCmnName", StringType, true), 
   StructField("V2DocId", StringType, true), 
   StructField("V1Counts", ArrayType(V1CountStruct), true),            
   StructField("V21Counts", ArrayType(V21CountStruct), true),           
   StructField("V1Themes", ArrayType(StringType), true),
   StructField("V2EnhancedThemes",ArrayType(EnhancedThemes),true),    
   StructField("V1Locations", ArrayType(V1LocationStruct), true),         
   StructField("V2Locations", ArrayType(EnhancedLocations), true), 
   StructField("V1Persons", ArrayType(StringType), true), 
   StructField("V2Persons", ArrayType(EnhancedPersonStruct), true),   
   StructField("V1Orgs", ArrayType(StringType), true), 
   StructField("V2Orgs", ArrayType(EnhancedOrgStruct), true),      
   StructField("V1Stone", V1StoneStruct, true), 
   StructField("V21Dates", ArrayType(V21EnhancedDateStruct), true),    
   StructField("V2GCAM", ArrayType(V2GcamStruct), true), 
   StructField("V21ShareImg", StringType, true), 
   StructField("V21RelImg", ArrayType(StringType), true), 
   StructField("V21SocImage", ArrayType(StringType), true), 
   StructField("V21SocVideo", ArrayType(StringType), true), 
   StructField("V21Quotations", ArrayType(QuotationStruct), true), 
   StructField("V21AllNames", ArrayType(V21NameStruct), true), 
   StructField("V21Amounts", ArrayType(V21AmountStruct), true), 
   StructField("V21TransInfo", V21TranslationInfoStruct, true), 
   StructField("V2ExtrasXML", StringType, true) 
 ))

We have used a number of custom StructTypes, which could be specified inline for GkgSchema, but which we have broken out for ease of reading.

For example, GkgRecordIdStruct is:

val GkgRecordIdStruct = StructType(Array(
  StructField("Date", LongType),
  StructField("TransLingual", BooleanType),     
  StructField("NumberInBatch";, IntegerType)
))

Before we use this schema, we must first produce an RDD by parsing the input GDELT data into a Row:

val gdeltRDD = sparkContext.textFile("20160101020000.gkg.csv")

val gdeltRowOfRowsRDD = gdeltRDD.map(_.split("\t"))
   .map(attributes =>
      Row(
       createGkgRecordID(attributes(0)),
       attributes(1).toLong,
       createSourceCollectionIdentifier(attributes(2)),
       attributes(3),
       attributes(4),
       createV1Counts(attributes(5)),
       createV21Counts(attributes(6)),
       .
       .
       .
      )
   )

Here you see a number of custom parsing functions, for instance createGkgRecordID, which take the raw data and contain the logic for reading and interpreting each field. As GKG fields are complex and often contain nested data structures, we need a way to embed them into the Row. To help us out, Spark allows us to treat them as Rows inside Rows. Therefore, we simply write parsing functions that return Row objects, like so:

def createGkgRecordID(str: String): Row = {
   if (str != "") {
     val split = str.split("-")
     if (split(1).length > 1) {
       Row(split(0).toLong, true, split(1).substring(1).toInt)
     }
     else {
       Row(split(0).toLong, false, split(1).toInt)
     }
   }
   else {
     Row(0L, false, 0)
   }
 }

Putting the code together, we see the entire solution in just a few lines:

import org.apache.spark.sql.types._
import com.databricks.spark.avro._
import org.apache.spark.sql.Row
 
val df = spark.createDataFrame(gdeltRowOfRowsRDD, GkgSchema)
 
df.write.avro("/path/to/avro/output")
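
The compression codec used for the output files can also be tuned. According to the spark-avro documentation, the following Spark SQL settings control it (treat the exact property names as an assumption and check the README for your library version):

// Assumed spark-avro configuration: pick the container compression codec
// ("uncompressed", "snappy", or "deflate") before writing
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

df.write.avro("/path/to/avro/output-deflate")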

Reading the Avro files into a DataFrame is similarly simple:

val avroDF = spark
  .read
  .format("com.databricks.spark.avro")
  .load("/path/to/avro/output")

This gives a neat solution for dealing with Avro files, but what's going on under the covers?

Pedagogical method

In order to explain how Avro works, let's take a look at a roll-your-own solution. In this case, the first thing we need to do is to create an Avro schema for the version or versions of data that we intend to ingest.

There are Avro implementations for several languages, including Java. These implementations allow you to generate bindings for Avro so that you can serialize and deserialize your data objects efficiently. We are going to use a Maven plugin to help us automatically compile these bindings from an Avro IDL representation of the GKG schema. The bindings take the form of Java classes that we can use later on to help us build Avro objects. Add the following dependency and plugin to your project's pom.xml:

<dependency>  
   <groupId>org.apache.avro</groupId>  
   <artifactId>avro</artifactId>  
   <version>1.7.7</version>
</dependency>

<plugin>  
   <groupId>org.apache.avro</groupId>  
   <artifactId>avro-maven-plugin</artifactId>  
   <version>1.7.7</version>  
   <executions>    
      <execution>      
         <phase>generate-sources</phase>      
         <goals>        
            <goal>schema</goal>      
         </goals>      
         <configuration>           
           <sourceDirectory>
           ${project.basedir}/src/main/avro/
           </sourceDirectory>          
           <outputDirectory>
              ${project.basedir}/src/main/java/
           </outputDirectory>         
         </configuration>    
      </execution>  
   </executions>
</plugin>

We can now take a look at our Avro IDL schema, which is built from a subset of the available Avro types, listed here:

+----------------+-------------+
|       primitive|      complex|
+----------------+-------------+
|null            |       record|
|boolean         |         enum|
|int             |        array|
|long            |          map|
|float           |        union|
|double          |        fixed|
|bytes           |             |
|string          |             |
+----------------+-------------+

The full Avro IDL schema for GDELT 2.1 can be found in our code repo, but here's a snippet:

@namespace("org.io.gzet.gdelt.gkg")
 protocol Gkg21
 {
 
    @namespace("org.io.gzet.gdelt.gkg.v1")
    record Location
    {
       int locationType = 0;
       union { null , string } fullName = null;
       union { null , string } countryCode = null;
       union { null , string } aDM1Code = null;
       float locationLatitude = 0.0;
       float locationLongitude = 0.0;
       union { null , string } featureId = null;
    }
 
    @namespace("org.io.gzet.gdelt.gkg.v1")
    record Count
    {
       union { null , string } countType = null;
       int count = 0;
       union { null , string } objectType = null;
       union { null , org.io.gzet.gdelt.gkg.v1.Location } v1Location = null;
    }

@namespace("org.io.gzet.gdelt.gkg.v21")
 record Specification
 {
    GkgRecordId gkgRecordId;
    union { null , long } v21Date = null;
    union { null , org.io.gzet.gdelt.gkg.v2.SourceCollectionIdentifier } v2SourceCollectionIdentifier = null;
    union { null , string } v21SourceCommonName = null;
    union { null , string } v2DocumentIdentifier = null;
    union { null , array<org.io.gzet.gdelt.gkg.v1.Count> } v1Counts = null;
    union { null , array<org.io.gzet.gdelt.gkg.v21.Count> } v21Counts = null;
    union { null , array<string> } v1Themes = null;
 }

Avro provides an extensible type system that supports custom types. It's also modular and offers namespaces, so that we can add new types and reuse custom types as the schema evolves. In the preceding example, we can see primitive types extensively used, but also custom objects such as org.io.gzet.gdelt.gkg.v1.Location.

To create Avro files, we can use the following code (full example in our code repository):

  import java.io.File
  import scala.io.Source
  import org.apache.avro.file.DataFileWriter
  import org.apache.avro.specific.SpecificDatumWriter
  import org.io.gzet.gdelt.gkg.v21.Specification

  val inputFile = new File("gkg.csv")
  val outputFile = new File("gkg.avro")

  val userDatumWriter =
       new SpecificDatumWriter[Specification](classOf[Specification])

  val dataFileWriter =
       new DataFileWriter[Specification](userDatumWriter)

  dataFileWriter.create(Specification.getClassSchema, outputFile)

  for (line <- Source.fromFile(inputFile).getLines())
      dataFileWriter.append(generateAvro(line))

  dataFileWriter.close()


  def generateAvro(line: String): Specification = {

    val values = line.split("\t", -1)
    if (values.length == 27) {
      val specification = Specification.newBuilder()
        .setGkgRecordId(createGkgRecordId(values(0)))
        .setV21Date(values(1).toLong)
        .setV2SourceCollectionIdentifier(
          createSourceCollectionIdentifier(values(2)))
        .setV21SourceCommonName(values(3))
        .setV2DocumentIdentifier(values(4))
        .setV1Counts(createV1CountArray(values(5)))
        .setV21Counts(createV21CountArray(values(6)))
        .setV1Themes(createV1Themes(values(7)))
        .setV2EnhancedThemes(createV2EnhancedThemes(values(8)))
        .setV1Locations(createV1LocationsArray(values(9)))
  .
  .
  .

    }
  }

The Specification object is created for us once we compile our IDL (using the Maven plugin). It contains all of the methods required to access the Avro model, for example setV2EnhancedLocations. We are then left with creating the functions to parse our GKG data; two examples are shown, as follows:

def createSourceCollectionIdentifier(str: String): SourceCollectionIdentifier = {
   str.toInt match {
     case 1 => SourceCollectionIdentifier.WEB
     case 2 => SourceCollectionIdentifier.CITATIONONLY
     case 3 => SourceCollectionIdentifier.CORE
     case 4 => SourceCollectionIdentifier.DTIC
     case 5 => SourceCollectionIdentifier.JSTOR
     case 6 => SourceCollectionIdentifier.NONTEXTUALSOURCE
     case _ => SourceCollectionIdentifier.WEB
   }
}

def createV1LocationsArray(str: String): Array[Location] = {
   val counts = str.split(";")
   counts.map(createV1Location(_))
}

This approach creates the required Avro files, but it is shown here to demonstrate how Avro works. As it stands, this code does not operate in parallel and, therefore, should not be used on big data. If we wanted to parallelize it, we could create a custom InputFormat, wrap the raw data into an RDD, and perform the processing on that basis. Fortunately, we don't have to, as spark-avro has already done it for us.
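
For completeness, here is a sketch (an assumption, not code from our repository) of how the parsing could be distributed with Spark and written out through Avro's Hadoop output format, assuming the generateAvro function above is serializable and available on every executor:

// requires the avro-mapred dependency on the classpath
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.io.gzet.gdelt.gkg.v21.Specification

// Register the Avro schema of the output records with the Hadoop job
val job = Job.getInstance(sparkContext.hadoopConfiguration)
AvroJob.setOutputKeySchema(job, Specification.getClassSchema)

// Parse each GKG line in parallel and write the records as Avro keys
sparkContext.textFile("20160101020000.gkg.csv")
  .map(line => (new AvroKey[Specification](generateAvro(line)), NullWritable.get))
  .saveAsNewAPIHadoopFile(
    "/path/to/avro/output",
    classOf[AvroKey[Specification]],
    classOf[NullWritable],
    classOf[AvroKeyOutputFormat[Specification]],
    job.getConfiguration)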

When to perform Avro transformation

In order to make the best use of Avro, we next need to decide when it is best to transform the data. Converting to Avro is a relatively expensive operation, so it should be done at the point where it makes the most sense. Once again, it's a tradeoff: this time between a flexible data model that supports unstructured processing, exploratory data analysis, and ad hoc querying, and a structured type system. There are two main options to consider:

  1. Convert as late as possible: it is possible to perform the Avro conversion in each and every run of a job. There are some obvious drawbacks here, so it's best to consider persisting the Avro files at some point to avoid the recalculation. You could do this lazily, the first time the data is read, but chances are this would get confusing quite quickly. The easier option is to periodically run a batch job over the data at rest. This job's only task would be to create the Avro data and write it back to disk. This approach gives us full control over when the conversion jobs are executed. In busy environments, jobs can be scheduled for quiet periods and priority can be allocated on an ad hoc basis. The downside is that we need to know how long the processing is going to take in order to ensure there is enough time for completion. If processing is not completed before the next batch of data arrives, then a backlog builds up and it can be difficult to catch up.
  2. Convert as early as possible: the alternative approach is to create an ingest pipeline, whereby the incoming data is converted to Avro on the fly (particularly useful in streaming scenarios). By doing this, we are in danger of approaching an ETL-style scenario, so it is really a judgment call as to which approach best suits the specific environment in use at the time.

Now, let's look at a related technology that is used extensively throughout Spark: Apache Parquet.
