Loading your data

As we have outlined in previous chapters, traditional system engineering commonly adopts a particular pattern for moving data from its source to its destination, namely ETL, whereas Spark tends to rely on schema-on-read. Since it's important to understand how these concepts relate to schemas and input formats, let's describe this aspect in more detail:

[Figure: Loading your data]

On the face of it, the ETL approach seems sensible, and indeed has been implemented by just about every organization that stores and handles data. There are some very popular, feature-rich products out there that perform the ETL task very well, not to mention Apache's open source offering, Apache Camel (http://camel.apache.org/etl-example.html).

However, this apparently straightforward approach belies the true effort required to implement even a simple data pipeline. This is because we must ensure that all data complies with a fixed schema before we can use it. For example, if we wanted to ingest some data from a starting directory, the minimal work is as follows (a simple sketch of such a pipeline follows the list):

  1. Ensure we are always looking at the pickup directory.
  2. When data arrives, collect it.
  3. Ensure the data is not missing anything and validate according to a predefined ruleset.
  4. Extract the parts of the data that we are interested in, according to a predefined ruleset.
  5. Transform these selected parts according to a predefined schema.
  6. Load the data to a repository (for example, a database) using the correct versioned schema.
  7. Deal with any failed records.
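
To make the brittleness concrete, here is a minimal sketch of what such a pipeline might look like in plain Scala; the directory path, the validation rule, and the InboundRecord schema are all hypothetical placeholders for illustration, not code from this book's repository:

import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical fixed schema that every inbound record must match
case class InboundRecord(id: String, date: String, tone: Double)

object NaiveEtl {

  // Hypothetical rule: a record is valid only if it has at least three tab-delimited fields
  def validate(line: String): Boolean = line.split("\t", -1).length >= 3

  // Extract and transform the parts we care about into the fixed schema
  def transform(line: String): InboundRecord = {
    val t = line.split("\t", -1)
    InboundRecord(t(0), t(1), Try(t(2).toDouble).getOrElse(0.0))
  }

  def main(args: Array[String]): Unit = {
    val pickupDir = Paths.get("/data/pickup")                     // hypothetical pickup directory
    val files = Files.list(pickupDir).iterator().asScala.toList   // steps 1-2: watch and collect

    files.foreach { file =>
      val lines = Files.readAllLines(file).asScala
      val (good, bad) = lines.partition(validate)                 // step 3: validate against the ruleset
      val records = good.map(transform)                           // steps 4-5: extract and transform
      // load(records)    // step 6: load into the repository using the versioned schema
      // quarantine(bad)  // step 7: deal with any failed records
    }
  }
}

Every change to the inbound format, the ruleset, or the target schema means editing, re-versioning, and potentially rerunning code like this, which is exactly the brittleness described in the issues that follow.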

We can immediately see a number of issues here that must be addressed:

  1. We have a predefined ruleset and, therefore, this must be version controlled. Any mistakes will mean bad data in the end database and a re-ingest of that data through the ETL process to correct it (very time and resource expensive). Any change to the format of the inbound dataset, and this ruleset must be changed.
  2. Any change to the target schema will require very careful management. At the very least, a version control change in the ETL, and possibly even a reprocessing of some or all of the previous data (which could be a very time consuming and expensive backhaul).
  3. Any change to the end repository will result in at least a version control schema change, and perhaps even a new ETL module (again, very time and resource intensive).
  4. Inevitably, there will be some bad data that makes it through to the database. Therefore, an administrator will need to set up rules to monitor the referential integrity of tables to ensure damage is kept to a minimum, and arrange for the re-ingestion of any corrupted data.

If we now consider these issues and massively increase the volume, velocity, variety, and veracity of the data, it is easy to see that our straightforward ETL system has quickly grown into a near unmanageable system. Any formatting, schema, and business rule changes will have a negative impact. In some cases, there may not be enough processor and memory resources to even keep pace, due to all the processing steps required. Data cannot be ingested until all of the ETL steps have been agreed and are in place. In large corporations it can take months to agree schema transforms before any implementation even commences, thus resulting in a large backlog, or even loss of data. All this results in a brittle system that is difficult to change.

Schema agility

To overcome this, schema-on-read encourages us to shift to a very simple principle: apply schema to the data at runtime, as opposed to applying it on load (that is, at ingest). In other words, a schema is applied to the data when it is read in for processing. This simplifies the ETL process somewhat:

[Figure: Schema agility]

Of course, it does not mean you eliminate the transform step entirely. You are simply deferring the acts of validation, applying business rules, error handling, ensuring referential integrity, enriching, aggregating, and otherwise inflating the model until the point at which you are ready to use it. The idea is that, by this point, you should know more about the data and certainly about the way you wish to use it. Therefore, you can use this increased knowledge of the data to make the loading method more efficient. Again, this is a trade-off: what you save in upfront processing costs, you may lose in duplicate processing and potential inconsistency. However, techniques such as persistence, indexing, memoization, and caching can all help here. As mentioned in the previous chapter, this process is commonly known as ELT due to the reversal in the order of the processing steps.
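
As a simple illustration of the caching point, once a raw dataset has been parsed for one analytic, the result can be persisted so subsequent analytics do not pay the parsing cost again. This sketch assumes a SparkSession named spark (as in the Spark shell) and a hypothetical input path:

import org.apache.spark.storage.StorageLevel

// Parse the raw, tab-delimited records once (hypothetical path)
val parsed = spark.read
  .option("delimiter", "\t")
  .csv("/data/gdelt/gkg")                 // hypothetical location of the raw files
  .persist(StorageLevel.MEMORY_AND_DISK)

// Both of these reuse the cached parse rather than re-reading the raw files
parsed.count()
parsed.select("_c0").distinct().count()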

One benefit of this approach is that it allows greater freedom to make appropriate decisions about the way you represent and model data for any given use case. For example, there are a variety of ways that data can be structured, formatted, stored, compressed, or serialized, and it makes sense to choose the most appropriate method given the set of specific requirements related to the particular problem you are trying to solve.

One of the most important opportunities that this approach provides is that you can choose how to physically lay out the data, that is, decide on the directory structure where data is kept. It is generally not advisable to store all your data in a single directory because, as the number of files grows, it takes the underlying filesystem prohibitively long to address them. Ideally, we want to be able to specify the smallest possible data split that fulfills the required functionality while still storing and retrieving data efficiently at the volumes required. Therefore, data should be logically grouped depending upon the analysis that is required and the amount of data that you expect to receive. In other words, data may be divided across directories based upon type, subtype, date, time, or some other relevant property, ensuring that no single directory bears an undue burden. Another important point to realize here is that, once the data has landed, it can always be reformatted or reorganized at a later date, whereas, in an ETL paradigm, this is usually far more difficult.
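
For instance, a date-based layout can be produced directly from Spark at write time. In this sketch, df stands for a hypothetical DataFrame of parsed records, and the column name and output path are placeholders, but the pattern is general:

// Produces one directory per day, for example:
//   /data/gdelt/gkg/date=20160101/part-...
df.write
  .partitionBy("date")              // hypothetical date column
  .parquet("/data/gdelt/gkg")       // hypothetical output path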

In addition to this, ELT can also have a surprising benefit on change management and version control. For example, if external factors cause the data schema to change, you can simply load different data to a new directory in your data store and use a flexible schema tolerant serialization library, such as Avro or Parquet, which both support schema evolution (we will look at these later in this chapter); or, if the results of a particular job are unsatisfactory, we need only change the internals of that one job before rerunning it. This means that schema changes become something that can be managed on a per analytic basis, rather than on a per feed basis, and the impact of change is better isolated and managed.
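
As a quick sketch of the Parquet case, data written at different times with slightly different schemas can still be read back as a single DataFrame using the standard mergeSchema option; the directory names here are hypothetical and spark is assumed to be a SparkSession:

// v1 and v2 were written at different times with slightly different schemas
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/data/gdelt/gkg/v1", "/data/gdelt/gkg/v2")   // hypothetical directories

merged.printSchema()   // shows the union of the two schemas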

Incidentally, it's worth considering a hybrid approach, particularly useful in streaming use cases, whereby some processing is done during collection and ingest, and the rest at runtime. The decision around whether to use ETL or ELT is not necessarily a binary one. Spark provides features that give you control over your data pipelines. In turn, this affords you the flexibility to transform or persist data when it makes sense to do so, rather than adopting a one-size-fits-all approach.

The best way to determine which approach to take is to learn from the actual day-to-day use of a particular dataset and adjust its processing accordingly, identifying bottlenecks and fragility as more experience is gained. There may also be corporate rules levied, such as virus scanning or data security, which will determine a particular route. We'll look more into this at the end of the chapter.

Reality check

As with most things in computing, there's no silver bullet. ELT and schema-on-read will not fix all your data formatting problems, but they are useful tools in the toolbox and, generally speaking, the pros usually outweigh the cons. It is worth noting, however, that there are situations where you can actually introduce difficulties if you're not careful.

In particular, it can be more involved to perform ad hoc analysis on complex data models (as opposed to in databases). For example, in the simple case of extracting a list of all of the names of the cities mentioned in news articles, in a SQL database you could essentially run SELECT CITY FROM GKG, whereas, in Spark, you first need to understand the data model, parse and validate the data, and then create the relevant table and handle any errors on the fly, potentially every time you run the query.
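
As a rough sketch of what that extra work looks like in Spark: here the position of the V1Locations field (tenth column, matching the schema listed later in this chapter) and its internal #-delimited layout are assumptions to be checked against the GKG documentation, and the input path is hypothetical:

import org.apache.spark.sql.functions._

// Read the raw, tab-delimited GKG records without any schema
val raw = spark.read
  .option("delimiter", "\t")
  .csv("/data/gdelt/gkg")                           // hypothetical path

// Assume the locations block is the 10th field (_c9): a semicolon-delimited
// list of locations, each a #-delimited tuple whose second element is the name
val cities = raw
  .select(explode(split(col("_c9"), ";")).as("loc"))
  .select(split(col("loc"), "#").getItem(1).as("city"))
  .where(col("city").isNotNull && length(col("city")) > 0)

cities.distinct().show()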

Again, this is a trade-off. With schema-on-read you lose the built-in data representation and inherent knowledge of a fixed schema, but you gain the flexibility to apply different models or views as required. As usual, Spark provides features designed to assist in exploiting this approach, such as transformations, DataFrames, SparkSQL, and the REPL, and when used properly they allow you to maximize the benefits of schema-on-read. We'll learn more about this as we go further.

GKG ELT

As our NiFi pipeline writes data as-is to HDFS, we can take full advantage of schema-on-read and start to use it immediately, without having to wait for it to be processed. If you would like to be a bit more advanced, you could load the data in a splittable, compressed format such as bzip2 (supported natively by Spark). Let's take a look at a simple example.
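
For example, bzip2-compressed files can be read just like plain text because the codec is handled transparently; the path below is a hypothetical placeholder:

// Spark/Hadoop decompress .bz2 files transparently, and bzip2 is splittable,
// so large files can still be processed in parallel
val lines = spark.read.textFile("/data/gdelt/gkg/*.csv.bz2")   // hypothetical path
lines.take(1).foreach(println)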

Note

HDFS uses a block system to store data. In order to store and leverage data in the most efficient way, HDFS files should be splittable where possible. If the CSV GDELT files are written as plain text using the TextOutputFormat class, for example, then files larger than the block size will be split across filesize/blocksize blocks; for instance, a 1 GB file with a 128 MB block size occupies eight blocks. A partial final block does not occupy a full block size on disk.

By using DataFrames we can write SQL statements to explore the data, or by using Datasets we can chain fluent methods, but in either case some initial preparation is required.

The good news is that usually this can be done almost entirely by Spark, as it supports the transparent loading of data into Datasets via case classes, using Encoders, so most of the time you won't need to worry too much about the inner workings. Indeed, when you have a relatively simple data model, it's usually enough to define a case class, map your data onto it, and convert it to a Dataset using the toDS method. However, in most real-world scenarios, where data models are more complex, you will need to write your own custom parser. Custom parsers are nothing new in data engineering, but in a schema-on-read setting they are often written and used by data scientists, since the interpretation of the data is done at runtime rather than at load time. Here's an example of the use of the custom GKG parser found in our repository:

 
import org.apache.spark.sql.functions._
import spark.implicits._   // needed for toDS() and the Dataset encoders

// rawDS is assumed to be an RDD[String] of raw, tab-delimited GKG records
val rdd = rawDS map GdeltParser.toCaseClass
val ds = rdd.toDS()

// DataFrame-style API
ds.agg(avg("goldstein").as("goldstein")).show()

// Dataset-style API
ds.groupByKey(_.eventCode).count().show()

You can see from the preceding code that, once the data is parsed, it can be used across the full variety of Spark APIs.
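
If you are writing such a parser yourself, a minimal sketch might look like the following; the GkgRow case class, the field positions, and the error handling are all illustrative assumptions rather than the parser from our repository:

import scala.util.Try

// Illustrative case class holding just the two fields used above
case class GkgRow(eventCode: String, goldstein: Double)

object SimpleGkgParser {
  // Split a raw, tab-delimited record, keeping empty trailing fields;
  // the field positions used here are assumptions for illustration only
  def toCaseClass(line: String): GkgRow = {
    val fields = line.split("\t", -1)
    GkgRow(
      eventCode = fields(0),
      goldstein = Try(fields(1).toDouble).getOrElse(0.0)
    )
  }
}

The important point is that the parser, not the ingest pipeline, owns the interpretation of the raw record, so it can be changed and rerun on a per analytic basis.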

If you're more comfortable using SQL, you can define your own schema, register a table, and use SparkSQL. In either approach, you can choose how to load the data based on how it will be used, allowing for more flexibility over which aspects you spend time parsing. For example, the most basic schema for loading GKG is to treat every field as a String, like so:

import org.apache.spark.sql.types._ 
 
val schema = StructType(Array( 
    StructField("GkgRecordId"           , StringType, true), 
    StructField("V21Date"               , StringType, true), 
    StructField("V2SrcCollectionId"     , StringType, true),        
    StructField("V2SrcCmnName"          , StringType, true),  
    StructField("V2DocId"               , StringType, true),  
    StructField("V1Counts"              , StringType, true),  
    StructField("V21Counts"             , StringType, true),  
    StructField("V1Themes"              , StringType, true),  
    StructField("V2Themes"              , StringType, true),  
    StructField("V1Locations"           , StringType, true),  
    StructField("V2Locations"           , StringType, true),  
    StructField("V1Persons"             , StringType, true),  
    StructField("V2Persons"             , StringType, true),  
    StructField("V1Orgs"                , StringType, true),  
    StructField("V2Orgs"                , StringType, true),  
    StructField("V15Tone"               , StringType, true),  
    StructField("V21Dates"              , StringType, true),  
    StructField("V2GCAM"                , StringType, true),  
    StructField("V21ShareImg"           , StringType, true),  
    StructField("V21RelImg"             , StringType, true),  
    StructField("V21SocImage"           , StringType, true), 
    StructField("V21SocVideo"           , StringType, true),  
    StructField("V21Quotations"         , StringType, true),  
    StructField("V21AllNames"           , StringType, true),  
    StructField("V21Amounts"            , StringType, true), 
    StructField("V21TransInfo"          , StringType, true),  
    StructField("V2ExtrasXML"           , StringType, true)   
)) 
 
val filename="path_to_your_gkg_files"  
 
val df = spark 
   .read 
   .option("header", "false") 
   .schema(schema) 
   .option("delimiter", "t") 
   .csv(filename) 
 
df.createOrReplaceTempView("GKG") 

And now you can execute SQL queries, like so:

spark.sql("SELECT V2GCAM FROM GKG LIMIT 5").show 
spark.sql("SELECT AVG(GOLDSTEIN) AS GOLDSTEIN FROM GKG WHERE GOLDSTEIN IS NOT NULL").show() 

With this approach, you can start profiling the data straight away and it's useful for many data engineering tasks. When you're ready, you can choose other elements of the GKG record to expand. We'll see more about this in the next chapter.
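
As a small taste of expanding a field, the V2Themes block can be split and exploded into one row per theme (assuming, per the GKG specification, that it is a semicolon-delimited list):

import org.apache.spark.sql.functions._

// One row per (record, theme) pair, then a simple frequency count
df.select(explode(split(col("V2Themes"), ";")).as("theme"))
  .where(length(col("theme")) > 0)
  .groupBy("theme")
  .count()
  .orderBy(desc("count"))
  .show(10)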

Once you have a DataFrame, you can convert it into a Dataset by defining a case class and casting, like so:

val ds = df.as[GdeltEntity] 
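
For this to compile you need a matching case class in scope (and import spark.implicits._). A minimal sketch, assuming you only care about a few of the columns defined in the schema above, might look like this:

import spark.implicits._

// Field names must match the column names in the schema (case-insensitive);
// columns not listed here are simply ignored by the encoder
case class GdeltEntity(
  GkgRecordId: String,
  V21Date:     String,
  V2DocId:     String,
  V2Themes:    String
)

val ds = df.as[GdeltEntity]          // the cast from the previous snippet
ds.map(_.V2DocId).show(5, truncate = false)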

Position matters

It's worth noting here that, when loading data from CSV, Spark's schema matching is entirely positional. This means that, when Spark tokenizes a record based on the given separator, it assigns each token to a field in the schema using its position, even if a header is present. Therefore, if a column is omitted in the schema definition, or your dataset changes over time due to data drift or data versioning, you may get a misalignment that Spark will not necessarily warn you about!

Therefore, we recommend doing basic data profiling and data quality checks on a routine basis to mitigate these situations. You can use the built-in functions in DataFrameStatFunctions to assist with this. Some examples are shown as follows:

 
// Summary statistics (count, mean, stddev, min, max) for a column
df.describe("V1Themes").show()

// Values that occur frequently in a column
df.stat.freqItems(Array("V2Persons")).show()

// Contingency table of two columns (beware of very high-cardinality columns)
df.stat.crosstab("V2Persons", "V2Locations").show()

Next, let's explore a great way to put some structure around our data, while also reducing the amount of code written, by using Avro or Parquet.
