We have seen how easy it can be to ingest data and use Spark to analyze it without the need for traditional ETL tools. While it is very useful to work in an environment where schemas are all but ignored, this is not realistic in the commercial world. There is, however, a good middle ground that gives us some great advantages over both ETL and unbounded data processing: Avro.
Apache Avro is a serialization technology, similar in purpose to Google's Protocol Buffers. Like many other serialization technologies, Avro uses a schema to describe data, but the key to its usefulness is that it provides the following features:
As Avro stores the schema alongside the enclosed data, it is self-describing. So instead of struggling to read the data because we have no classes, trying to guess which version of a schema applies, or, in the worst case, having to throw the data away altogether, we can simply interrogate the Avro file for the schema the data was written with.
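To illustrate this self-describing property, the sketch below writes a single record to an Avro container file and then reopens it with a completely generic reader, recovering the writer's schema from the file header alone. The toy Event schema here is an illustrative assumption standing in for the GKG schema:

```scala
import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}

// A toy schema, standing in for the GKG schema.
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"long"}]}""")

// Write one record to an Avro container file; the schema
// travels in the file header alongside the data.
val file = File.createTempFile("event", ".avro")
val writer = new DataFileWriter[GenericRecord](
  new GenericDatumWriter[GenericRecord](schema))
writer.create(schema, file)
val record = new GenericData.Record(schema)
record.put("id", 1L)
writer.append(record)
writer.close()

// Reopen the file knowing nothing about its contents and
// recover the writer's schema directly from the header.
val reader = new DataFileReader[GenericRecord](
  file, new GenericDatumReader[GenericRecord]())
println(reader.getSchema.toString(true))
reader.close()
```

No classes or prior schema knowledge are involved on the read side; the `GenericDatumReader` resolves everything from the header.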
Avro also allows amendments to a schema in the form of additive changes, or appends, which can be accommodated, thus making a given implementation backwards compatible with older data.
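As a sketch of how this evolution works, consider serializing a record with an old schema and deserializing it with a newer one that appends a field carrying a default value. The record and field names below are illustrative assumptions, not part of the GKG model:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Writer schema: the "old" version of the record.
val v1 = new Schema.Parser().parse(
  """{"type":"record","name":"Event","fields":[
    |  {"name":"id","type":"long"}
    |]}""".stripMargin)

// Reader schema: a newer version with an additive change.
// The default value is what makes old data readable by new code.
val v2 = new Schema.Parser().parse(
  """{"type":"record","name":"Event","fields":[
    |  {"name":"id","type":"long"},
    |  {"name":"source","type":"string","default":"unknown"}
    |]}""".stripMargin)

// Serialize a record with the old schema...
val record = new GenericData.Record(v1)
record.put("id", 42L)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](v1).write(record, encoder)
encoder.flush()

// ...and deserialize it with the new schema: the field missing
// from the old data is filled in from the reader schema's default.
val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
val evolved = new GenericDatumReader[GenericRecord](v1, v2).read(null, decoder)
```

Passing both the writer and reader schemas to `GenericDatumReader` triggers Avro's schema resolution, so `evolved` carries the default `"unknown"` for the appended field.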
As Avro represents data in a binary form, it can be transferred and manipulated more efficiently. Also, it takes up less space on disk due to its inherent compression.
For the reasons stated above, Avro is an incredibly popular serialization format, used by a wide variety of technologies and end systems, and you will no doubt have cause to use it at some point. Therefore, in the next sections we will demonstrate two different ways to read and write Avro-formatted data. The first is an elegant and simple method using a third-party, purpose-built library called spark-avro, and the second is an under-the-covers method, useful for understanding the mechanics of Avro.
To address the complexities of implementing Avro, the spark-avro library has been developed. This can be imported in the usual way, using Maven:
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>3.1.0</version>
</dependency>
For this implementation, we will create the Avro schema using a StructType object, transform the input data using an RDD, and create a DataFrame from the two. Finally, the result can be written to file, in Avro format, using the spark-avro library.
The StructType object is a variation on the GkgCoreSchema used above and in Chapter 4, Exploratory Data Analysis, and is constructed as follows:
val GkgSchema = StructType(Array(
  StructField("GkgRecordId", GkgRecordIdStruct, true),
  StructField("V21Date", LongType, true),
  StructField("V2SrcCollectionId", StringType, true),
  StructField("V2SrcCmnName", StringType, true),
  StructField("V2DocId", StringType, true),
  StructField("V1Counts", ArrayType(V1CountStruct), true),
  StructField("V21Counts", ArrayType(V21CountStruct), true),
  StructField("V1Themes", ArrayType(StringType), true),
  StructField("V2EnhancedThemes", ArrayType(EnhancedThemes), true),
  StructField("V1Locations", ArrayType(V1LocationStruct), true),
  StructField("V2Locations", ArrayType(EnhancedLocations), true),
  StructField("V1Persons", ArrayType(StringType), true),
  StructField("V2Persons", ArrayType(EnhancedPersonStruct), true),
  StructField("V1Orgs", ArrayType(StringType), true),
  StructField("V2Orgs", ArrayType(EnhancedOrgStruct), true),
  StructField("V1Stone", V1StoneStruct, true),
  StructField("V21Dates", ArrayType(V21EnhancedDateStruct), true),
  StructField("V2GCAM", ArrayType(V2GcamStruct), true),
  StructField("V21ShareImg", StringType, true),
  StructField("V21RelImg", ArrayType(StringType), true),
  StructField("V21SocImage", ArrayType(StringType), true),
  StructField("V21SocVideo", ArrayType(StringType), true),
  StructField("V21Quotations", ArrayType(QuotationStruct), true),
  StructField("V21AllNames", ArrayType(V21NameStruct), true),
  StructField("V21Amounts", ArrayType(V21AmountStruct), true),
  StructField("V21TransInfo", V21TranslationInfoStruct, true),
  StructField("V2ExtrasXML", StringType, true)
))
We have used a number of custom StructTypes, which could be specified inline in GkgSchema, but which we have broken out for ease of reading. For example, GkgRecordIdStruct is:
val GkgRecordIdStruct = StructType(Array(
  StructField("Date", LongType),
  StructField("TransLingual", BooleanType),
  StructField("NumberInBatch", IntegerType)
))
Before we use this schema, we must first produce an RDD by parsing the input GDELT data into Rows:
val gdeltRDD = sparkContext.textFile("20160101020000.gkg.csv")

val gdeltRowOfRowsRDD = gdeltRDD
  .map(_.split("\t"))
  .map(attributes =>
    Row(
      createGkgRecordID(attributes(0)),
      attributes(1).toLong,
      createSourceCollectionIdentifier(attributes(2)),
      attributes(3),
      attributes(4),
      createV1Counts(attributes(5)),
      createV21Counts(attributes(6)),
      . . .
    ))
Here you can see a number of custom parsing functions, for instance createGkgRecordID, that take raw data and contain the logic for reading and interpreting each field. As GKG fields are complex and often contain nested data structures, we need a way to embed them into the Row. To help us out, Spark allows us to treat them as Rows inside Rows. Therefore, we simply write parsing functions that return Row objects, like so:
def createGkgRecordID(str: String): Row = {
  if (str != "") {
    val split = str.split("-")
    if (split(1).startsWith("T")) {
      Row(split(0).toLong, true, split(1).substring(1).toInt)
    } else {
      Row(split(0).toLong, false, split(1).toInt)
    }
  } else {
    Row(0L, false, 0)
  }
}
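To see the nesting in action, the parser can be exercised on its own. The sketch below repeats the function so it is self-contained, using the GDELT convention of a T prefix on the batch number to mark translingual records; the sample identifiers are illustrative:

```scala
import org.apache.spark.sql.Row

// Repeated here so the sketch is self-contained; translingual
// records are detected by the "T" prefix on the batch number.
def createGkgRecordID(str: String): Row = {
  if (str != "") {
    val split = str.split("-")
    if (split(1).startsWith("T")) {
      Row(split(0).toLong, true, split(1).substring(1).toInt)
    } else {
      Row(split(0).toLong, false, split(1).toInt)
    }
  } else {
    Row(0L, false, 0)
  }
}

createGkgRecordID("20150218230000-T999")  // Row(20150218230000L, true, 999)
createGkgRecordID("20150218230000-999")   // Row(20150218230000L, false, 999)
```

The returned Row later slots into the outer Row built for GkgSchema, which is how the nested GkgRecordIdStruct is populated.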
Putting the code together, we see the entire solution in just a few lines:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import com.databricks.spark.avro._

val df = spark.createDataFrame(gdeltRowOfRowsRDD, GkgSchema)
df.write.avro("/path/to/avro/output")
Reading the Avro files back into a DataFrame is similarly simple:
val avroDF = spark
  .read
  .format("com.databricks.spark.avro")
  .load("/path/to/avro/output")
This gives a neat solution for dealing with Avro files, but what's going on under the covers?
In order to explain how Avro works, let's take a look at a roll-your-own solution. In this case, the first thing we need to do is create an Avro schema for the version, or versions, of data that we intend to ingest.
There are Avro implementations for several languages, including Java. These implementations allow you to generate bindings for Avro so that you can serialize and deserialize your data objects efficiently. We are going to use a Maven plugin to help us automatically compile these bindings from an Avro IDL representation of the GKG schema. The bindings take the form of a Java class that we can use later on to help us build Avro objects. Add the following dependency and plugin to your project:
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
</dependency>

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.7.7</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
We can now take a look at our Avro IDL schema, which is created from a subset of the available Avro types:
+-----------+---------+
| primitive | complex |
+-----------+---------+
| null      | record  |
| boolean   | enum    |
| int       | array   |
| long      | map     |
| float     | union   |
| double    | fixed   |
| bytes     |         |
| string    |         |
+-----------+---------+
The full Avro IDL schema for GDELT 2.1 can be found in our code repo, but here's a snippet:
@namespace("org.io.gzet.gdelt.gkg")
protocol Gkg21 {

  @namespace("org.io.gzet.gdelt.gkg.v1")
  record Location {
    int locationType = 0;
    union { null, string } fullName = null;
    union { null, string } countryCode = null;
    union { null, string } aDM1Code = null;
    float locationLatitude = 0.0;
    float locationLongitude = 0.0;
    union { null, string } featureId = null;
  }

  @namespace("org.io.gzet.gdelt.gkg.v1")
  record Count {
    union { null, string } countType = null;
    int count = 0;
    union { null, string } objectType = null;
    union { null, org.io.gzet.gdelt.gkg.v1.Location } v1Location = null;
  }

  @namespace("org.io.gzet.gdelt.gkg.v21")
  record Specification {
    GkgRecordId gkgRecordId;
    union { null, long } v21Date = null;
    union { null, org.io.gzet.gdelt.gkg.v2.SourceCollectionIdentifier } v2SourceCollectionIdentifier = null;
    union { null, string } v21SourceCommonName = null;
    union { null, string } v2DocumentIdentifier = null;
    union { null, array<org.io.gzet.gdelt.gkg.v1.Count> } v1Counts = null;
    union { null, array<org.io.gzet.gdelt.gkg.v21.Count> } v21Counts = null;
    union { null, array<string> } v1Themes = null;
  }
Avro provides an extensible type system that supports custom types. It's also modular and offers namespaces, so that we can add new types and reuse custom types as the schema evolves. In the preceding example, we can see primitive types used extensively, but also custom objects such as org.io.gzet.gdelt.gkg.v1.Location.
To create Avro files, we can use the following code (full example in our code repository):
val inputFile = new File("gkg.csv")
val outputFile = new File("gkg.avro")

val userDatumWriter =
  new SpecificDatumWriter[Specification](classOf[Specification])
val dataFileWriter =
  new DataFileWriter[Specification](userDatumWriter)

dataFileWriter.create(Specification.getClassSchema, outputFile)
for (line <- Source.fromFile(inputFile).getLines())
  dataFileWriter.append(generateAvro(line))
dataFileWriter.close()

def generateAvro(line: String): Specification = {
  val values = line.split("\t", -1)
  if (values.length == 27) {
    val specification = Specification.newBuilder()
      .setGkgRecordId(createGkgRecordId(values(0)))
      .setV21Date(values(1).toLong)
      .setV2SourceCollectionIdentifier(
        createSourceCollectionIdentifier(values(2)))
      .setV21SourceCommonName(values(3))
      .setV2DocumentIdentifier(values(4))
      .setV1Counts(createV1CountArray(values(5)))
      .setV21Counts(createV21CountArray(values(6)))
      .setV1Themes(createV1Themes(values(7)))
      .setV2EnhancedThemes(createV2EnhancedThemes(values(8)))
      .setV1Locations(createV1LocationsArray(values(9)))
      . . .
  }
}
The Specification object is created for us once we compile our IDL (using the Maven plugin). It contains all of the methods required to access the Avro model, for example, setV2EnhancedLocations. We are then left with creating the functions to parse our GKG data; two examples are shown as follows:
def createSourceCollectionIdentifier(str: String): SourceCollectionIdentifier = {
  str.toInt match {
    case 1 => SourceCollectionIdentifier.WEB
    case 2 => SourceCollectionIdentifier.CITATIONONLY
    case 3 => SourceCollectionIdentifier.CORE
    case 4 => SourceCollectionIdentifier.DTIC
    case 5 => SourceCollectionIdentifier.JSTOR
    case 6 => SourceCollectionIdentifier.NONTEXTUALSOURCE
    case _ => SourceCollectionIdentifier.WEB
  }
}

def createV1LocationsArray(str: String): Array[Location] = {
  val counts = str.split(";")
  counts.map(createV1Location(_))
}
This approach creates the required Avro files, but it is shown here mainly to demonstrate how Avro works. As it stands, this code does not operate in parallel and, therefore, should not be used on big data. If we wanted to parallelize it, we could create a custom InputFormat, wrap the raw data into an RDD, and perform the processing on that basis. Fortunately, we don't have to, as spark-avro has already done this for us.
In order to make the best use of Avro, we next need to decide when it is best to transform the data. Converting to Avro is a relatively expensive operation, so it should be done at the point where it makes the most sense. Once again, it's a tradeoff: this time, between a flexible data model supporting unstructured processing, exploratory data analysis, and ad hoc querying on the one hand, and a structured type system on the other. There are two main options to consider:
Now, let's look at a related technology that is used extensively throughout Spark, that is, Apache Parquet.