One of the foundational concepts in The Pragmatic Programmer by David Thomas and Andrew Hunt (Addison-Wesley) is that code should be Easy To Change (ETC),1 a concept I’ll expand on throughout this chapter. I find this advice to be especially relevant when working with data pipelines, where change is a way of life. When developing data pipelines you need to support changes in a multitude of areas: data size, format, and shape; data acquisition and storage; and evolving needs for data transformation and validation, not to mention changes in cloud services, providers, and data processing engines.
With all these vectors for change, even the best-intentioned codebases can turn into spaghetti, making it difficult to modify, extend, and test functionality. This in turn will negatively impact performance, reliability, and cost as more time and resources are required to debug and evolve the pipeline.
This chapter is about helping you design codebases that will be resilient to the shifting sands of data pipeline design, with a focus on developing code that is ETC.
To start, I’ll discuss some common coding environments you encounter in data pipelines and show you how to effectively manage code in each situation based on my experience developing across all these tools.
Then I’ll show you techniques for creating modular codebases, using best practices from software engineering applied to common scenarios when working with data pipelines. To set the stage, I’ll share an experience of working with a codebase that became difficult to change as a product evolved.
Building on these earlier topics, the last technique is configurable design, which consists of a modular codebase that can be dynamically configured. This is a powerful technique that can help you deploy new features quickly without the risk of adding bugs to your codebase.
The code examples in this chapter are primarily to show you how to structure code. Unless otherwise specified, these are not intended to be examples that you would run. You can find many of these code examples in the GitHub repo.
Data pipeline code can be developed in a variety of ways. While software developers may be more accustomed to thinking about code developed in an IDE, data analysts, data scientists, and machine learning (ML) scientists may be more familiar with notebook interfaces, such as Jupyter. There are also web UIs available for some cloud services, such as with AWS Lambda and AWS Glue, which give you the ability to write code and manage connections with other services in the browser.
It can be challenging to manage code across these different environments. From a reliability perspective, code should be source controlled and tested following software development best practices. That can be tricky or impossible for notebooks and web UIs, which can open the door for mistakes and bugs.
To give you a sense of some of these challenges and remediation strategies, I’ll walk you through a real-world example in which I’ve dealt with multiple coding environments in the past. You’ll get a sense of the pros and cons of each, and I’ll share some advice for how to use them effectively.
As it turns out, one of the data pipeline projects I worked on (detailed in “A Lifeline for Medicaid Beneficiaries”) used all three of the environments I just mentioned: notebooks, an IDE, and the AWS Lambda web UI. The pipeline architecture is depicted in Figure 6-1.
A team of analysts developed the transformation logic for the pipeline, which ran against a data warehouse (not pictured). This portion of the pipeline was written in SQL using a Databricks notebook.
The notebook ran on a schedule and saved the transformation results to S3, which triggered a Lambda function on the creation of new objects. The Lambda function, written in Python, launched an EMR cluster that performed a second data transformation step before loading the data into DynamoDB.2
The following sections detail opportunities, challenges, and strategies for working with notebooks and web UIs, using the ETL process in Figure 6-1 as an example.
This example is a nice use case for notebooks, where we were collaborating with another team with the expertise to develop the transformation logic. The analysts could express the transformation code in a language they were familiar with (SQL) in an environment that allowed them to get immediate feedback while testing queries. This also helped us quickly test updates the analysts made without the overhead of a more traditional build and deploy process.
Notebooks are also great for prototyping and sharing ideas early in the design process. The ability to combine code, visualization, and notes is fantastic for conducting exploratory data analysis and figuring out data transformation logic, while also providing context to others for collaboration.
On the drawbacks side, there wasn’t a great way to test the notebook code, which is a downside of using notebooks in general. If you’re working with notebooks, factoring out critical code into a package that can be tested is preferable. Source control for notebooks is evolving, but there are still some shortcomings in this space. This is yet another reason to favor packaging code to import into a notebook.
While packaging code is a good option for languages that support it, you also need a way to deploy packages for this method to work. If packaging isn’t an option, you can consider separating code into another notebook. This gives you some insulation from accidentally introducing a bug when editing the notebook, and you can run some tests like this as well. Jupyter notebooks support importing code from other notebooks, and if you’re working with Databricks notebooks, you can achieve this same behavior with workflows.
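As a minimal sketch of factoring notebook logic into an importable module, the following assumes a hypothetical module named transforms.py with a function normalize_species; neither name comes from the project described above. The notebook would import the function, while a test exercises it without a notebook kernel.

```python
# transforms.py (hypothetical module name): logic factored out of a notebook cell
def normalize_species(records):
    """Return a copy of each record with a trimmed, lowercased 'species' value."""
    return [
        {**record, "species": record.get("species", "").strip().lower()}
        for record in records
    ]


# test_transforms.py (hypothetical): runs under pytest, no notebook required
def test_normalize_species():
    records = [{"species": "  Night Heron "}, {"count": 2}]
    result = normalize_species(records)
    assert result[0]["species"] == "night heron"
    # Records without a species value get an empty string
    assert result[1] == {"count": 2, "species": ""}
```

The notebook cell then shrinks to an import and a call, so edits to the notebook are less likely to disturb the tested logic.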
Another way you can guard against introducing bugs in notebooks is by using widgets, which are UI elements that you can add to the notebook to parameterize values, similar to how you have parameters for a function. These UI elements, such as text boxes and radio buttons, provide a place to specify values without touching the code within the notebook and enable you to run a notebook for different inputs. Widgets are supported in both Jupyter and Databricks notebooks.
The widgets notebook shows a simple widget example, illustrated in Figure 6-2. Instead of changing the name in the notebook code, you can type a different name in the Text widget.
One place I’ve used widgets is in a notebook that ran query performance analysis, which you’ll see in Chapter 11. The widgets allowed users to specify a time frame and query name to inspect, which enabled the notebook to be used across various use cases without modifying the code.
Services with web UIs, such as AWS Lambda, give you the ability to write code directly in the browser, which can be very powerful for getting things up and running quickly. You can also see the impact of code changes immediately as opposed to waiting on a release cycle, which can be both good and bad. In the best case, you immediately update your system with a fix. In the worst case, you immediately deploy a bug.
Web UIs sometimes keep a short history of the last few code edits, allowing you to roll back to a prior version, but if you overrun this history, you’ll find yourself trying to re-create what was without the benefit of source control. I can tell you from experience that this is not how you want to spend your time.
In the case of AWS Lambda, you have the option to specify test events to validate the Lambda code. While convenient, keep in mind that using this mechanism executes the Lambda function, incurring costs. A better approach is to create a test event and download it for local development, including it as test collateral with your codebase. This gives you the benefit of having the event structure to test code against, without incurring the cost of running the Lambda.
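To illustrate testing against a saved event, here is a minimal sketch. The event is a trimmed S3 object-created notification with made-up bucket and key names, and extract_s3_objects is a hypothetical helper, not code from the project described above. Keeping the event parsing in a plain function lets a test exercise it without invoking Lambda.

```python
import json

# Trimmed stand-in for an S3 "ObjectCreated" test event saved from the Lambda
# console; in a codebase this would live in a file such as tests/s3_event.json
# (hypothetical path). Bucket and key names are made up.
SAMPLE_EVENT = json.loads("""
{
  "Records": [
    {
      "eventSource": "aws:s3",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": {"name": "example-bucket"},
        "object": {"key": "results/part-0000.json"}
      }
    }
  ]
}
""")


def extract_s3_objects(event):
    """Return (bucket, key) pairs for each record in an S3 notification event."""
    return [
        (record["s3"]["bucket"]["name"], record["s3"]["object"]["key"])
        for record in event.get("Records", [])
    ]


def lambda_handler(event, context):
    # The real handler would act on these objects (for example, launch a job);
    # this sketch only returns them so the parsing can be tested locally.
    return {"objects": extract_s3_objects(event)}
```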
Web UIs also let you specify relationships between different services. For example, in the Lambda UI, you can click an SNS trigger and write the name of the SNS topic that should trigger the Lambda function.
Maybe I’m just anxious, but I see every point-and-click connection in a web UI as a dependency that I won’t remember and my teammates won’t be aware of, creating an opportunity for bugs that are especially hard to track down. If you must use a web UI, my advice is to extract as much configuration and code as you can and put it into source control for better testing, versioning, and visibility.
You may need to put in additional effort to set up deployment processes to make this a reality, as you saw in the example I shared earlier. Use management tools such as AWS Serverless Application Model (SAM) where you can. Keep in mind that when you’re looking for information on how to do this, you will likely come across documentation from your CSP suggesting that you pay for more of their services to do it; the AWS article “How to unit test and deploy AWS Glue jobs using AWS CodePipeline” is a case in point. Another option for Lambda specifically is to use GitHub Actions to deploy code. This is a lighter-weight solution than using SAM, but it requires that you give GitHub permission to interact with your AWS account.
Along these lines, rather than using the Lambda UI to host our code, we created a Python module. Using SAM, we were able to source-control, test, and deploy the Lambda code as part of our regular build and deploy process. SAM uploaded our code artifacts to S3, which is a supported option for deploying AWS Lambda code.
The last piece of code is the EMR-based data transformation and load into DynamoDB. This code was developed in an IDE. To deploy this code, we zipped it into the same file as the Lambda function so that both the EMR transformation code and the Lambda function were deployed to S3 by SAM. Our team had a strong IaC practice,3 so when a new version of the Lambda Python code was deployed, the corresponding template was updated with the link to the latest artifacts.
The remaining sections focus on code development strategies you can use in any environment where you are using languages such as Python, Scala, or Java. To begin, let’s look at an example of how code can become difficult to change.
To motivate the development strategies in the following sections, let’s look at an example scenario of how code becomes difficult to change. This example is based on experiences I’ve had in the field, which I’ve fictionalized to relate to the HoD example running throughout this book.
With all the data the HoD social media platform has collected, Lou and Sylvia have been able to design a very clever ML engine, kind of like a Magic 8 Ball for birds, where you ask a question and get an answer, though a lot more sophisticated.
At first, HoD users could submit their questions directly to the ML engine. Over time, the HoD team got a sense of the questions users typically asked, such as “How long is a night heron’s neck?” and “What species of heron are present in Fort Lauderdale, Florida?” In response, the team created a handful of stock questions and bolted on some logic to execute the questions to the ML engine. This piece of code became known as the execution engine. Once the questions were processed, the execution engine stored the results in a database, as depicted in Figure 6-3.
You’ll notice that the execution engine directly abuts the ML engine in Figure 6-3. I’ve done this to communicate that these are tightly coupled at the code level. The stock questions were an experimental feature, offered as a sideline to the main HoD product, so it was preferable to get something up and running just to try it out versus spending a lot of time designing a separate system.
The stock questions feature turned out to be very popular, so the HoD team decided to do a fast-pivot to making stock questions the main feature of their platform. They developed an automation that created thousands of questions that were stored in a database. They had some other data that users wanted to incorporate into the ML results, so they added another data source as well. With more questions and more users, it became necessary to diversify how the results were stored, so an additional data sink to the cloud was added. This resulted in the system depicted in Figure 6-4.
You can see that the execution engine is still tightly coupled to the ML engine, but it has grown due to the architectural changes. There are also many more dependencies; instead of a single connection to a database, there are now three database connections to deal with, and cloud storage as well. These dependencies represent more potential points of failure and performance impacts in production, and more interfaces to manage when testing.
In addition, the exponential increase in the number of stock questions put more load on the system, resulting in performance and reliability issues. The tight coupling between the execution and ML engines meant debugging and testing became time consuming and laborious. Anytime changes were made on the ML side, there was the potential to break the execution engine. When bugs arose, the HoD team had to replicate failures by rerunning the failed questions, adding monitoring and logging after the fact.
If you’ve worked on data pipelines, there’s likely something in this story that you can relate to. In the following sections, I’ll show you some techniques for handling these kinds of changes in your codebases.
One way to develop code that is easy to change is by thinking of code as reusable building blocks. Consider the different aspects of a LEGO brick. LEGOs have a standard connection interface that lets you build stable, interlocking structures. LEGO bricks also come in a variety of shapes, sizes, and colors that you can swap in as you like because of this standardized interface. Many software development principles are fundamentally about designing code to be more modular, like a LEGO brick.
Data processing tends to involve a variety of distinct operations: data acquisition, storage, and transformation. The single-responsibility principle encourages keeping the operations of a given bit of code within a well-defined scope. This helps you isolate different functions to improve modularity, testability, and readability. It also protects code from bugs and reduces your maintenance burden.
Think about this as the different shapes and sizes of LEGO bricks. If you want to build a car out of LEGOs, you can create whatever size, shape, and color of car you like by using individual bricks. If instead of these discrete pieces, LEGO supplied a single car-shaped LEGO, you would be much more limited in how you could customize it.
To get a sense of how to use single responsibility, let’s consider one of the stock questions in the example: “What species of heron are present in Fort Lauderdale, Florida?” The ML engine would provide a set of data that is pretty likely to contain herons near Fort Lauderdale, leaving the execution engine to filter and aggregate the results with the create_aggregate_data method, which you can find in storage_example.py. Don’t worry about the internals of aggregate_by_species for now; just know that it returns a dictionary representation of the aggregated data:
    def create_aggregate_data(data):
        bird_agg = aggregate_by_species(data)
        key = f"aggregate_data_{datetime.utcnow().isoformat()}.json"
        ...
        # json.dumps returns a str, so encode it to get bytes for the object body
        s3.put_object(Bucket="bucket_name", Key=key,
                      Body=json.dumps(bird_agg).encode("utf-8"))
In the preceding code, create_aggregate_data performs two functions: aggregating data based on species and storing the result in an S3 bucket. An important thing to note is the different interfaces present in this method. You have the data passed in as a variable, which minimizes the dependency on how the data is created, but you also have the interface to S3. There is also a dependency on Boto3, which is the AWS SDK for Python.
Every time you touch a piece of code, there is a potential for creating bugs. With these different interfaces and dependencies, there are a lot of reasons why the create_aggregate_data method as it is currently written would need to be modified. If you need to change the region the data is stored in, you will have to update this method. There’s also the possibility that you may want to put the aggregated data in another place, such as a database, which will again lead you to modify this code. Finally, when testing, you’ll be writing data to S3, which is undesirable; it incurs unnecessary costs and requires authenticating to create a client in your test environment.
One step you can take to loosen the coupling between the data aggregation and the storage code is to refactor them into separate functions:
    import json
    from datetime import datetime

    import boto3

    def write_to_s3(data, key):
        s3 = boto3.client('s3', region_name='us-east-1')
        s3.put_object(Bucket="bucket_name", Key=key, Body=data)

    def create_species_agg(data):
        bird_agg = aggregate_by_species(data)
        key = f"aggregate_data_{datetime.utcnow().isoformat()}.json"
        # Encode the JSON string to bytes before handing it to the storage function
        write_to_s3(json.dumps(bird_agg).encode("utf-8"), key)
When trying to understand whether your code is doing a good job of following single responsibility, try to state in a sentence what the code is doing. For example, before refactoring create_aggregate_data, you would have stated “create_aggregate_data aggregates data and writes it to S3.” That “and” in the sentence is a sign that you might consider factoring out those different actions into separate methods. Compare this with write_to_s3, which has the sole action of writing to S3.
By encapsulating the Boto3 code in the write_to_s3 method, create_aggregate_data is now insulated from changes in the implementation details of how the data is uploaded to S3. For example, as more bird data becomes available, it could be desirable to compress the aggregated data to reduce its size, which you can now do without modifying create_aggregate_data:
    import gzip

    def write_to_s3(data, key):
        s3 = boto3.client('s3', region_name='us-east-1')
        # Compress the payload before uploading; the calling code is unchanged
        s3.put_object(Bucket="bucket_name", Key=key, Body=gzip.compress(data))
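A quick way to convince yourself that compression changes only the stored bytes, not the data, is a local round trip. This sketch uses made-up aggregate values and no S3 access.

```python
import gzip
import json

# Made-up aggregate data standing in for the output of aggregate_by_species
bird_agg = {"night heron": 3, "blue heron": 1}

payload = json.dumps(bird_agg).encode("utf-8")
compressed = gzip.compress(payload)

# Whoever reads the object back must decompress before parsing the JSON
restored = json.loads(gzip.decompress(compressed).decode("utf-8"))
```

Note that any consumer of the stored object has to decompress it before parsing, so readers of the data are affected by this change even though the aggregation code is not.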
In addition, you can use this generic function to store other results in S3. For example, in the HoD survey data pipeline in Chapter 1, the bird survey data is enriched with the social media data and stored to S3. You can reuse write_to_s3 to perform this operation as well:
    with_social = enrich_with_social(data)
    write_to_s3(with_social, "with_social")
Something you should be careful about is overengineering your code, where you try to make code overly generic and flexible in anticipation of future changes. Modular, extensible code is something you should strive for, but there are trade-offs with trying to do too much too early.
Understanding the priorities of the moment and the project phase (from prototype to MVP to production) will help you determine this. For example, if you’re working on a proof of concept or trying to get an idea out in the world, you might be prioritizing getting the system up and running quickly. I’ve worked on pipelines that started as a notebook that enabled us to quickly test out different ideas, in which case we weren’t focused on code design because we didn’t know what we needed yet.
In this same vein, keep in mind that the software principles I refer to in this chapter are guidelines, not rules to be blindly followed. It’s important for you to determine when it makes sense to put these into practice and to what extent.
Implementation details in data pipelines can change considerably in terms of how data is being transformed, how it is being acquired, and how it is being stored. You may have to incorporate new data sources and sinks, or modify or remove existing ones as you saw in the example earlier in this chapter.
If you’re familiar with object-oriented design, you may have heard of dependency inversion, which can be summarized as “depend on abstractions, not concretions.” This means that rather than writing your code depending on a specific implementation, you should write it so that you can depend on an abstract version of what you need. This allows the implementation details to fluctuate without impacting the surrounding logic.
Think about dependency inversion as keeping your options open. To use an analogy, let’s say you’re in the mood for pizza. If you will only accept pizza from Pizza Shack, that’s a concrete dependency. You have more options if pizza from a variety of purveyors is acceptable, be they Pizza Shack, Short King’s Pizza, or Pepperoni Palace. This is an abstract dependency, where your pizza desires can be fulfilled by multiple restaurants, so long as pizza is on the menu.
In Java this abstraction is an interface, and in Scala this would be a trait. Python has abstract base classes (ABCs). An ABC has no implementation details, but it can be used to enforce that specific entities exist in subclasses. Continuing with the storage example, I’ve defined an abstract base class, AbstractStorage, in storage_example.py. In Python, an abstract base class subclasses ABC from the abc module and marks the methods that subclasses must provide with the @abstractmethod decorator:
    from abc import ABC, abstractmethod

    class AbstractStorage(ABC):
        @abstractmethod
        def add(self, data, id):
            raise NotImplementedError
This defines an expected interface, namely that an add method is available, which you can then customize based on the storage medium. You can create an implementation of AbstractStorage for S3 by subclassing AbstractStorage, as shown in S3Storage in cloud_storage.py:
    class S3Storage(AbstractStorage):
        def __init__(self, bucket):
            self.s3 = boto3.client('s3', region_name='us-east-1')
            self.bucket = bucket

        def add(self, data, id):
            self.s3.put_object(Bucket=self.bucket, Key=id, Body=data)
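To see what an abstract base class enforces, consider this self-contained sketch. It restates AbstractStorage using the standard abc module; InMemoryStorage and IncompleteStorage are hypothetical classes added for illustration.

```python
from abc import ABC, abstractmethod


class AbstractStorage(ABC):
    @abstractmethod
    def add(self, data, id):
        """Store data under the given id."""


class InMemoryStorage(AbstractStorage):
    """Hypothetical implementation that keeps objects in a dictionary."""

    def __init__(self):
        self.objects = {}

    def add(self, data, id):
        self.objects[id] = data


class IncompleteStorage(AbstractStorage):
    """Hypothetical subclass that forgets to implement add."""


store = InMemoryStorage()
store.add(b"{}", "species_data")

try:
    IncompleteStorage()
    instantiated = True
except TypeError:
    # Python refuses to instantiate a subclass that leaves add abstract
    instantiated = False
```

The failure happens when the object is created rather than when add is first called, which surfaces a missing method early.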
Going back to the create_aggregate_data example, you can encapsulate this method in a class where you can also specify the storage method:
    class ProcessBirdData:
        def __init__(self, storage: AbstractStorage):
            self.storage = storage

        def create_aggregate_data(self, data):
            bird_agg = aggregate_by_species(data)
            # Encode the JSON string to bytes and return whatever the storage layer returns
            return self.storage.add(json.dumps(bird_agg).encode("utf-8"), "species_data")
You may be wondering what’s the point of creating this extra class scaffolding around create_aggregate_data, as it just seems like more lines of code to do the same thing. At this point, you would be correct! Let’s take a look at what this class structure and dependency inversion can give you.
You can use this approach to deploy a single codebase across multiple cloud providers. This kind of flexibility is nice to have if you aren’t sure what provider you plan to choose long term, or if you want to keep your codebase cloud agnostic.
Consider a pipeline deployment that can run on both AWS and Google Cloud. You can add another implementation of AbstractStorage specifically for Google Cloud Storage (GCS):
    from google.cloud import storage

    class GCSStorage(AbstractStorage):
        def __init__(self, bucket):
            self.gcs = storage.Client()
            self.bucket = self.gcs.bucket(bucket)

        def add(self, data, id):
            blob = self.bucket.blob(id)
            # data arrives already serialized, as it does for S3Storage
            blob.upload_from_string(data)
Notice that the interface is the same as S3Storage, but the implementation is different.
With classes implementing the GCS and AWS storage, you can create another implementation of AbstractStorage that chooses the correct storage mechanism based on where the code is deployed:
    class CloudStorage(AbstractStorage):
        def __init__(self, deploy, bucket="bucket_name"):
            # Both storage classes need a bucket; "bucket_name" is a placeholder default
            if deploy == 'gcs':
                self.cloud_storage = GCSStorage(bucket)
            elif deploy == 'aws':
                self.cloud_storage = S3Storage(bucket)
            else:
                raise ValueError(f"Unsupported deployment: {deploy}")

        def add(self, data, id):
            # Delegate to the selected storage implementation
            return self.cloud_storage.add(data, id)
Let’s put all these pieces together. Assuming you use an environment variable DEPLOYMENT to denote what CSP you’ve deployed the code to, you can run the bird data processing with:
    storage_platform = CloudStorage(os.getenv('DEPLOYMENT'))
    data = acquire_data()
    bird_data_process = ProcessBirdData(storage_platform)
    bird_data_process.create_aggregate_data(data)
In the preceding code, storage_platform will be chosen based on where the code is deployed.
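One variation on the if/elif selection is to look up the storage class in a dictionary. This sketch uses hypothetical stand-in classes so that it runs without cloud credentials. FakeS3Storage, FakeGCSStorage, STORAGE_BY_DEPLOYMENT, storage_for, and storage_from_env are illustrative names, not part of the example code.

```python
import os


class FakeS3Storage:
    """Stand-in for S3Storage so the sketch runs without AWS access."""

    def __init__(self, bucket):
        self.bucket = bucket


class FakeGCSStorage:
    """Stand-in for GCSStorage so the sketch runs without Google Cloud access."""

    def __init__(self, bucket):
        self.bucket = bucket


# Map each supported DEPLOYMENT value to the class that handles it
STORAGE_BY_DEPLOYMENT = {"aws": FakeS3Storage, "gcs": FakeGCSStorage}


def storage_for(deploy, bucket):
    """Return a storage object for the deployment, failing loudly if unknown."""
    try:
        storage_cls = STORAGE_BY_DEPLOYMENT[deploy]
    except KeyError:
        raise ValueError(f"Unsupported deployment: {deploy!r}") from None
    return storage_cls(bucket)


def storage_from_env(bucket, environ=os.environ):
    """Pick the storage class from the DEPLOYMENT variable in the given mapping."""
    return storage_for(environ.get("DEPLOYMENT"), bucket)
```

With a lookup table, supporting another provider means adding one dictionary entry, and an unset or misspelled DEPLOYMENT value raises an error instead of leaving the storage attribute undefined.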
AbstractStorage provides a predictable interface for the rest of your codebase to work with, which you can use to plug in new data sinks. For example, you can add another subclass that stores data to a database:
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session

    class DatabaseStorage(AbstractStorage):
        def __init__(self, connection_string, model_cls):
            self.engine = create_engine(connection_string)
            self.model_cls = model_cls

        def add(self, data, id):
            # data is expected to be a dictionary whose keys match the model's attributes
            data['id'] = id
            with Session(self.engine) as session:
                model_inst = self.model_cls(**data)
                session.add(model_inst)
                session.commit()
In this case, model_cls would be the object model that translates the dictionary to a given table in the database:
    from sqlalchemy import Column, Integer, String
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class AggData(Base):
        __tablename__ = "aggregate_data"
        id = Column(Integer, primary_key=True)
        description = Column(String)
        species = Column(String)
In this implementation of AbstractStorage, the __init__ method creates a database engine and the add method inserts the data using the supplied model. This is a very simple example where the keys in the data dictionary match the attribute names in the model, allowing you to use ** to unpack the values. For more on database mapping, see the SQLAlchemy ORM Quick Start guide.
If you wanted to store the aggregate data to a database instead of the cloud, you would pass DatabaseStorage to the ProcessBirdData constructor:
    bird_data_process = ProcessBirdData(DatabaseStorage(connection_string, AggData))
    bird_data_process.create_aggregate_data(data)
You can see how, with a different connection_string and implementation of the add method, you could create different implementations of AbstractStorage for different types of relational databases.
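As a runnable illustration of adding a database sink behind the same interface, this sketch uses Python’s built-in sqlite3 module with an in-memory database instead of SQLAlchemy. SQLiteStorage, its get method, and its table layout are assumptions made for the example; AbstractStorage is restated so the block is self-contained.

```python
import json
import sqlite3
from abc import ABC, abstractmethod


class AbstractStorage(ABC):
    @abstractmethod
    def add(self, data, id):
        """Store data under the given id."""


class SQLiteStorage(AbstractStorage):
    """Hypothetical sink that stores each payload as a JSON string in SQLite."""

    def __init__(self, database=":memory:"):
        self.conn = sqlite3.connect(database)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS aggregate_data "
            "(id TEXT PRIMARY KEY, payload TEXT)"
        )

    def add(self, data, id):
        # Parameterized query: values are bound rather than formatted into SQL.
        # Using the connection as a context manager commits on success.
        with self.conn:
            self.conn.execute(
                "INSERT INTO aggregate_data (id, payload) VALUES (?, ?)",
                (id, json.dumps(data)),
            )

    def get(self, id):
        row = self.conn.execute(
            "SELECT payload FROM aggregate_data WHERE id = ?", (id,)
        ).fetchone()
        return json.loads(row[0]) if row else None


storage = SQLiteStorage()
storage.add({"night heron": 3, "blue heron": 1}, "species_data")
```

Code that depends only on the add method of AbstractStorage can use this class without modification.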
I’ll get into testing in depth in Chapter 7, but another benefit of dependency inversion I want to discuss before leaving this topic is testing. In addition to using the AbstractStorage abstract base class as an interface to create different storage solutions across CSPs and data sinks, you can create a subclass for testing as with the MockStorage class in cloud_storage.py:
    class MockStorage(AbstractStorage):
        def __init__(self, bucket):
            self.bucket = bucket

        def add(self, data, id):
            print(f"Would have written {data} to {self.bucket} at {id}")
            return (data, f"{self.bucket}/{id}")
With a mock for the storage layer, you don’t have to set up credentials to create an AWS or GCS client locally. Instead, you can implement MockStorage however you need to support your tests. In this example, MockStorage prints some information and returns the data and the path, giving you the opportunity to write tests like the following:
    def test_bird_data_process(test_data, expected_data, expected_path):
        storage = MockStorage("bucket_name")
        bird_data_process = ProcessBirdData(storage)
        result, object_path = bird_data_process.create_aggregate_data(test_data)
        assert result == expected_data
        assert object_path == expected_path
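Here is a self-contained version of this kind of test that runs without any cloud access. The aggregate_by_species stub and the test data are made up for illustration, since the real implementation is outside the scope of this section. The classes are restated here, with create_aggregate_data returning the storage result so the test can inspect it.

```python
import json
from abc import ABC, abstractmethod
from collections import Counter


class AbstractStorage(ABC):
    @abstractmethod
    def add(self, data, id):
        """Store data under the given id."""


class MockStorage(AbstractStorage):
    def __init__(self, bucket):
        self.bucket = bucket

    def add(self, data, id):
        print(f"Would have written {data} to {self.bucket} at {id}")
        return (data, f"{self.bucket}/{id}")


def aggregate_by_species(data):
    """Hypothetical stub: sum the count of each species in a list of records."""
    totals = Counter()
    for record in data:
        totals[record["species"]] += record["count"]
    return dict(totals)


class ProcessBirdData:
    def __init__(self, storage: AbstractStorage):
        self.storage = storage

    def create_aggregate_data(self, data):
        bird_agg = aggregate_by_species(data)
        # Return the storage result so a test can inspect what was written
        return self.storage.add(json.dumps(bird_agg).encode("utf-8"), "species_data")


def test_bird_data_process():
    test_data = [
        {"species": "night heron", "count": 1},
        {"species": "night heron", "count": 2},
        {"species": "blue heron", "count": 1},
    ]
    process = ProcessBirdData(MockStorage("bucket_name"))
    result, object_path = process.create_aggregate_data(test_data)
    assert json.loads(result) == {"night heron": 3, "blue heron": 1}
    assert object_path == "bucket_name/species_data"
```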
DataFrames both illustrate some of the modular principles I’ve discussed so far and present some challenges. The functional interface of a DataFrame allows you to chain discrete operations, which provides a lot of nice flexibility. You can add, remove, or modify transformations easily, and the read and write functions allow for a lot of customization that you can do without modifying the transformation methods. The code in this section can be found in dataframes.py.
This example is a possible implementation of create_aggregate_data, where a lowercase copy of the description field is searched with a regular expression that will match any of the names in the species_list. The matched species, or empty if no match, will be represented in a new column, species. This data is grouped by the species column with the count field summed, resulting in a count of each species found in input data:
    from pyspark.sql import functions as f

    r_species = f".*({'|'.join(species_list)}).*"
    df = (spark
          .read.json('s3://bird_bucket/bird_data.json')
          .withColumn("description_lower", f.lower('description'))
          .withColumn("species", f.regexp_extract('description_lower', r_species, 1))
          .drop("description_lower")
          .groupBy("species")
          .agg({"count": "sum"})
          .write.mode("overwrite").json("s3://bird_bucket/result.json")
    )
In this example, discrete read, write, and transformation functions are applied to some JSON data. The ability to chain these methods mimics the logical process you walk through when working with data, but the result is not modular. You’ll have to interact with S3 to test this code, and the chained data transformations can make it difficult to isolate bugs. In addition, if you add more transformation logic, you’ll have to touch all the code, from reading the data through writing it out.
Something you can do to improve modularity is to factor out the data transformation code into discrete methods that operate on a DataFrame:
    def apply_species_label(species_list, df):
        r_species = f".*({'|'.join(species_list)}).*"
        return (df
            .withColumn("description_lower", f.lower('description'))
            .withColumn("species", f.regexp_extract('description_lower', r_species, 1))
            .drop("description_lower")
        )

    def create_aggregate_data(df):
        return (df
            .groupBy("species")
            .agg({"count": "sum"})
        )
These methods take a DataFrame as input and return the updated DataFrame, which is the same interface the PySpark DataFrame provides. You can use the transform method to chain the discrete methods:
    from functools import partial

    df = (spark
          .read.json('s3://bird_bucket/bird_data.json')
          .transform(partial(apply_species_label, species_list))
          .transform(create_aggregate_data)
          .write.mode("overwrite").json("s3://bird_bucket/result.json"))
You’ll notice that I’ve used partial here, as the apply_species_label method takes both the DataFrame and the species_list as inputs. partial binds species_list as the first argument, leaving a function of the DataFrame alone, which is what transform expects.
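If partial is unfamiliar, this plain-Python sketch shows the same idea without Spark. The label_rows function and its list-of-dictionaries input are hypothetical stand-ins for apply_species_label and a DataFrame.

```python
from functools import partial


def label_rows(species_list, rows):
    """Stand-in for apply_species_label: tag each row with the first species found."""
    labeled = []
    for row in rows:
        text = row["description"].lower()
        match = next((name for name in species_list if name in text), "")
        labeled.append({**row, "species": match})
    return labeled


# partial fixes the first argument, leaving a function that takes only the rows,
# which is the one-argument shape that a transform-style call expects
label_herons = partial(label_rows, ["night heron", "blue heron"])

rows = [{"description": "Saw a night heron"}, {"description": "Many hummingbirds"}]
labeled = label_herons(rows)
```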
One of the biggest improvements that comes from this refactoring is the ability to test each part of a complex data transformation by breaking it into smaller methods. Consider if you had to test the original version of create_aggregate_data prior to factoring out apply_species_label and create_aggregate_data. In this case, the test cases would have to cover both species extraction and aggregation, as shown in Table 6-1.
Test case row | Description value | Count value |
---|---|---|
0 | "" | 2 |
1 | "Nothing to see here" | 4 |
2 | "Night hEron is very cool" | 1 |
3 | "Saw a night heron" | 2 |
4 | "Many hummingbirds" | 0 |
5 | "A blue heron on the lake" | 1 |
You then have to think about two vectors simultaneously: test cases for extracting the species and test cases for aggregating the results. You also have to create expected results, as shown in Table 6-2. How do you know which rows in Table 6-1 contribute to expected result row 0, where species is an empty string? It’s difficult to know with the aggregation.
Expected result row | Species | Sum |
---|---|---|
0 | "" | 6 |
1 | "night heron" | 3 |
2 | "blue heron" | 1 |
On the other hand, factoring out apply_species_label and create_aggregate_data enables you to focus on these test cases independently. Without the aggregation, you can define test cases and expected results in the same table for apply_species_label, as shown in Table 6-3. It’s now abundantly clear which cases match a given species name and which do not.
Description value | Expected value for species |
---|---|
"" | "" |
"Nothing to see here" | "" |
"Night hEron is very cool" | "night heron" |
"Saw a night heron" | "night heron" |
"Many hummingbirds" | "" |
"A blue heron on the lake" | "blue heron" |
Here’s an example of testing apply_species_label with one of the test cases shown in Table 6-3:
    def test_species_label(spark_context):
        data = [{'description': "Saw a night heron"}]
        data_df = spark_context.parallelize(data).toDF()
        species_list = ['night heron']
        result_df = apply_species_label(species_list, data_df)
        result = result_df.toPandas().to_dict('list')
        assert result['species'][0] == 'night heron'
While this example is trivial, consider that production data transformations often include multiple transformations and aggregations, which can quickly balloon into complex testing needs where it’s easy to miss important cases.
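One way to cover every row of Table 6-3, and to test aggregation separately, is a table-driven test. The sketch below is Spark-free: label_species and sum_counts_by_species are hypothetical plain-Python stand-ins for apply_species_label and create_aggregate_data, and the case-insensitive regular expression is an assumption about the matching behavior implied by the table.

```python
import re
from collections import defaultdict

SPECIES = ["night heron", "blue heron"]

# (description, expected species), taken from Table 6-3.
LABEL_CASES = [
    ("", ""),
    ("Nothing to see here", ""),
    ("Night hEron is very cool", "night heron"),
    ("Saw a night heron", "night heron"),
    ("Many hummingbirds", ""),
    ("A blue heron on the lake", "blue heron"),
]


def label_species(description, species_list):
    """Return the first species found in description (case-insensitive), else ""."""
    for species in species_list:
        if re.search(re.escape(species), description, flags=re.IGNORECASE):
            return species
    return ""


def sum_counts_by_species(rows):
    """Sum the count field of already-labeled rows, grouped by species."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["species"]] += row["count"]
    return dict(totals)


def test_label_species():
    for description, expected in LABEL_CASES:
        assert label_species(description, SPECIES) == expected, description


def test_sum_counts_by_species():
    # Aggregation is tested on pre-labeled rows, so no text matching is involved.
    rows = [
        {"species": "night heron", "count": 1},
        {"species": "night heron", "count": 2},
        {"species": "", "count": 4},
    ]
    assert sum_counts_by_species(rows) == {"night heron": 3, "": 4}


test_label_species()
test_sum_counts_by_species()
```

With pytest, the loop in test_label_species could instead be expressed with pytest.mark.parametrize so that each row of the table reports as its own test case.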
Another advantage of modularization is being able to change the transformation logic on its own, without impacting the top-level code that calls apply_species_label.
Let’s say the ML engine is updated to export a species identification guess in the column ml_species. A confidence rating is provided in the species_conf column. If the confidence rating is greater than 0.8, you want to use the identification from the ML engine. Otherwise, you will use the previous implementation with the regular expression search:

    from pyspark.sql import functions as f

    def apply_species_label(species_list, df):
        # Prefer the ML engine's guess when its confidence exceeds 0.8;
        # otherwise fall back to the regular expression search.
        return df.withColumn(
            "species",
            f.when(f.col("species_conf") > 0.8, f.col("ml_species"))
            .otherwise(<previous implementation>)
        )
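The selection rule itself can be checked without Spark. The following is a minimal row-level sketch; choose_species, its fallback argument, no_match_fallback, and the dict-based rows are all hypothetical stand-ins, with fallback playing the role of the previous regular expression implementation.

```python
CONFIDENCE_THRESHOLD = 0.8


def choose_species(row, fallback):
    """Use the ML guess when confidence is strictly above the threshold."""
    confidence = row.get("species_conf")
    if confidence is not None and confidence > CONFIDENCE_THRESHOLD:
        return row["ml_species"]
    # Low or missing confidence: defer to the earlier text-based logic.
    return fallback(row["description"])


def no_match_fallback(description):
    # Placeholder for the previous implementation; always reports no match here.
    return ""


confident = {"description": "bird", "ml_species": "blue heron", "species_conf": 0.93}
borderline = {"description": "bird", "ml_species": "blue heron", "species_conf": 0.8}

confident_result = choose_species(confident, no_match_fallback)
borderline_result = choose_species(borderline, no_match_fallback)
```

A confidence of exactly 0.8 falls back to the text-based logic, since the comparison is strictly greater than. Code that calls the labeling step does not need to know which branch produced the value.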
At this point, you’ve seen techniques to help you create modular codebases. While these practices on their own will improve your design, you can take this further by leveraging modularity to create dynamic codebases. By using configurations to direct how the pipeline operates, you can add functionality without touching or deploying any code. This shows up in The Pragmatic Programmer as Tip 55: Parameterize Your App Using External Configurations (make sure to also check out the end-of-chapter tip on not overdoing it).4
One of the projects I worked on had the “configuration as a service” concept really nailed down.5 To add a new data source for the pipeline, we just had to add a new configuration. No code updates for the pipeline code were required at all. We could bring a new source online in under an hour.
Configurable code can really supercharge your development process if you get the abstractions right. You can use this technique to dynamically generate Airflow DAGs, which can include dynamically generating tasks at runtime. The Airflow documentation includes examples of using configurations to create DAGs and using dynamic task mapping to create tasks at runtime.
There are two conditions you need to satisfy to use this technique successfully:
The target process you want to parameterize can be expressed as a configuration.
The code can support the level of configuration required.
I’ll take you through an example to show you how to evaluate these conditions and create a configurable pipeline.
Recall the system in Figure 6-4, where thousands of automatically generated stock questions are run using the execution and ML engine. The success of this offering has resulted in numerous groups coming to HoD asking for help with bird identification. Since there seems to be fairly healthy demand, the HoD team decides to start offering heron identification as a service (HIaaS) in which companies bring their own data to be processed by the HoD species extraction and then stored in the customer’s database of choice. The pipeline for this process is depicted in Figure 6-5.
When I’m looking for opportunities to use configuration, I find it helpful to ask whether there is something in the pipeline that could be modeled as a function. The parameters to the function are the elements I can configure.
In the case of the HIaaS pipeline in Figure 6-5, you can see this potential with the “Extract species” step, where the source data and output database are configured based on the customer. The first thing to notice here is that we have some quantities that vary from customer to customer: the source bucket and the destination database. The following is not a real method, but rather a high-level thought experiment of how you could factor out these quantities:

    def run_extract_species(bucket, database):
        source_data = read_from_bucket(bucket)
        extracted = extract_species(source_data)
        store_data(extracted, database)
The next step is to determine whether you can build a codebase that can be configured based on the bucket and database information. You’ve seen from “Modular Design with DataFrames” how you can create different implementations of AbstractStorage for different cloud storage providers and different database types, so it seems pretty likely that, given a bucket name and a database connection string, you can configure these elements.
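As a rough illustration of that last point, a bucket path can select a storage implementation by its URL scheme. Everything in this sketch is hypothetical: the read method on AbstractStorage, the GCSStorage and S3Storage classes, and the storage_for helper are placeholders, since the actual interface is defined in the earlier section.

```python
from abc import ABC, abstractmethod
from urllib.parse import urlparse


class AbstractStorage(ABC):
    """Placeholder interface; the real one may expose different methods."""

    def __init__(self, bucket_path):
        self.bucket_path = bucket_path

    @abstractmethod
    def read(self):
        """Return the raw data stored at bucket_path."""


class GCSStorage(AbstractStorage):
    def read(self):
        raise NotImplementedError("call the Google Cloud Storage client here")


class S3Storage(AbstractStorage):
    def read(self):
        raise NotImplementedError("call the AWS S3 client here")


# Map URL schemes to implementations; supporting a new provider means adding a row.
STORAGE_BY_SCHEME = {"gs": GCSStorage, "s3": S3Storage}


def storage_for(bucket_path):
    """Pick a storage implementation from the scheme of the bucket path."""
    scheme = urlparse(bucket_path).scheme
    try:
        return STORAGE_BY_SCHEME[scheme](bucket_path)
    except KeyError:
        raise ValueError(f"Unsupported storage scheme: {scheme!r}") from None


storage = storage_for("gs://bestco/bird_data")
```

Because callers depend only on the abstract interface, a customer on a different cloud provider changes the configuration value and nothing else.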
You don’t want to keep any secret information, such as database connection parameters, in the config, so you’ll need a way to create the database connection string at runtime. One option would be to create a method that generates a database connection string given a database type and customer_id, where the customer_id is used to fetch the database credentials from a secret store. Here’s a possible configuration that would give you the parameters you need:

    {
        "customer_id": "1235",
        "bucket": "gs://bestco/bird_data",
        "db": "postgres"
    }
You can now rewrite run_extract_species to directly consume the config, with a new method, get_connection, that creates the connection string:

    def run_extract_species(bucket, db, customer_id):
        raw = get_data(bucket)
        extracted = extract_species(raw)
        connection_string = get_connection(customer_id, db)
        store_data(extracted, connection_string)
This is starting to look more like a configurable pipeline process. You are retrieving the raw data using the path to the bucket, running the extract_species step, and handing off the result to store_data.
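The text leaves get_connection unspecified. One possible shape is sketched here under loud assumptions: the secret store is modeled as an in-memory dict (a real deployment would query a secrets manager), the credential field names are invented, and the URL layout follows the common scheme://user:password@host:port/database convention. The secrets parameter is an addition for testability; the call shown earlier passes only customer_id and db.

```python
from urllib.parse import quote

# URL scheme for each supported "db" value in the config (assumed mapping).
SCHEMES = {"postgres": "postgresql", "mysql": "mysql"}

# Stand-in for a secret store, keyed by customer_id. Never hardcode real credentials.
FAKE_SECRET_STORE = {
    "1235": {
        "user": "hod_writer",
        "password": "p@ss/word",
        "host": "db.example.com",
        "port": 5432,
        "database": "birds",
    },
}


def get_connection(customer_id, db, secrets=FAKE_SECRET_STORE):
    """Build a connection string at runtime so credentials stay out of the config."""
    if db not in SCHEMES:
        raise ValueError(f"Unsupported database type: {db!r}")
    creds = secrets[customer_id]
    # Percent-encode the credentials so characters like "@" or "/" stay unambiguous.
    user = quote(creds["user"], safe="")
    password = quote(creds["password"], safe="")
    return (
        f"{SCHEMES[db]}://{user}:{password}"
        f"@{creds['host']}:{creds['port']}/{creds['database']}"
    )


connection_string = get_connection("1235", "postgres")
```

Keeping the lookup behind one function means the config only ever carries the customer_id and database type, and the secret store can be swapped for a mock in tests.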
Now, to bring new customers on board, HoD simply has to add a new configuration:

    configs = [
        {"customer_id": "1235", "bucket": "gs://bestco/bird_data", "db": "postgres"},
        {"customer_id": "3423", "bucket": "gs://for_the_birds", "db": "mysql"},
        {"customer_id": "0953", "bucket": "s3://dtop324", "db": "postgres"},
    ]
To run the pipeline for these customers, you can simply call run_extract_species for each line in the config:

    for config in configs:
        run_extract_species(**config)
An important note about this code if you’re not familiar with Airflow or other orchestration tools: each call to run_extract_species generates a new DAG run. This means each customer extraction runs independently, as opposed to a batch job that runs all customers at once.
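Because **config unpacks each entry into keyword arguments, a missing or misspelled key only surfaces as a TypeError when run_extract_species is called. A minimal sketch of checking configurations up front follows; validate_config, the required keys, and the supported database set are assumptions based on the example configs, and run_extract_species is stubbed out.

```python
REQUIRED_KEYS = {"customer_id", "bucket", "db"}
SUPPORTED_DBS = {"postgres", "mysql"}


def validate_config(config):
    """Return a list of problems; an empty list means the config is usable."""
    problems = []
    missing = REQUIRED_KEYS - set(config)
    unexpected = set(config) - REQUIRED_KEYS
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    if unexpected:
        problems.append(f"unexpected keys: {sorted(unexpected)}")
    if "db" in config and config["db"] not in SUPPORTED_DBS:
        problems.append(f"unsupported db: {config['db']!r}")
    return problems


def run_extract_species(bucket, db, customer_id):
    # Stub standing in for the real pipeline step.
    return f"customer {customer_id}: {bucket} -> {db}"


configs = [
    {"customer_id": "1235", "bucket": "gs://bestco/bird_data", "db": "postgres"},
    {"customer_id": "3423", "bucket": "gs://for_the_birds", "db": "oracle"},
]

results = []
for config in configs:
    problems = validate_config(config)
    if problems:
        # Skip bad entries rather than failing the customers that are valid.
        results.append((config.get("customer_id"), problems))
        continue
    results.append((config["customer_id"], run_extract_species(**config)))
```

Whether a bad entry should be skipped or should stop the whole run is a design choice; skipping keeps one customer's typo from blocking the others.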
Make sure your configuration isn’t making promises your infrastructure can’t deliver on. When you’ve got a configurable setup, it can be easy to forget that each new configuration, while more trivial to add from the development side, will have impacts on the resources in your system. It might be a good idea for the HoD team to do some evaluation of the size of the customer source data before adding a configuration. You can also use the scalable techniques you saw in Chapter 3 to increase capacity as needed.
This example illustrates how to run a method based on a configuration, but this approach is supported in orchestration tools as well. I’ve implemented DAG factories in Airflow using a similar approach. An Astronomer article about dynamic DAG generation in Airflow, while a bit dated, will give you a general idea of different ways you can create dynamic DAGs based on a configuration.
In this chapter, you saw how to best work with different coding environments you encounter when building data pipelines, and how to create modular, configurable codebases to bring features online quickly. This knowledge will help you design pipelines that adapt quickly to changes in data, transformation needs, and external services.
Starting out by looking at different coding environments, you saw how to improve notebook testability by packaging code. This enables you to test and modify important logic using software development best practices, and it insulates this code from being accidentally modified. Even if you’re working with code that is not packageable, like SQL, you can use widgets and multiple notebooks to isolate code from undesirable changes.
When working with code in the browser, such as the web interface provided by AWS Lambda, you can save cloud service costs by packaging code and infrastructure configurations. Rather than using these services in the process of testing and development, you can develop and test code locally, as well as integrate with your CI/CD pipelines. Keeping this code in source control will help you roll back to prior states that can be lost in the short history kept by these interfaces. In addition, your present and future teammates will know exactly where this code is and what it does.
When it comes to how you code, favoring design that creates discrete building blocks will help you build code that is extensible, easy to test, and easy to debug. While you’re coding, think about the single-responsibility principle. If you find code that is trying to do several different things, consider refactoring to narrow the scope. Another advantage of this approach is that it minimizes bugs that can be introduced when modifying code. If you’ve structured code so that each entity has a single responsibility, you need only touch that part of the codebase when making changes, without exposing unrelated code to modification.
This same approach can be applied to DataFrames by factoring out transformation logic into separate methods. You don’t lose the ability to chain these transformations, and you gain more modular and testable code.
Apart from the Magic 8 Ball, no one can predict the future. While modular code is an ideal to strive for, it’s important to gauge when to spend time building reusable code versus when you need to get something glued together quickly.
Developing abstract representations of data sources and sinks results in nimble codebases that can support a variety of implementations. You saw how to use dependency inversion to provide a consistent interface while insulating dependent code from the details. This technique gives you the opportunity to swap in different storage and retrieval mechanisms, support multicloud with a single codebase, and create mock interfaces for testing.
With these modular building blocks, you now have a codebase that can be configured, enabling you to create a variety of topologies without making any code modifications. This can greatly reduce your code maintenance burden and increase the speed with which you can bring new customers, features, and data online.
When evaluating how to parameterize a design, you need to make sure the underlying codebase can be configured as needed and that this configurability can be expressed in a form your code can understand. From here, you saw how configurability can be used to create new pipeline instances and layouts using a single codebase.
Throughout this chapter, I’ve mentioned testability as an essential consideration when developing data pipeline code. The unit-testing examples in this chapter were a warmup for the main event in Chapter 7, where you’ll see how the code structure advice from this chapter enables you to use low-cost, high-coverage unit tests instead of depending on costly system-level tests.
The Medium article “Bring your Jupyter Notebook to life with interactive widgets”
The Python.org proposal “PEP 3119—Introducing Abstract Base Classes”
Astronomer’s “Dynamically generate DAGs in Airflow”
1 Topic 8, The Pragmatic Programmer, 20th Anniversary Edition. If you don’t have this book, order it now from your local bookstore.
2 See the lifeline ingestion architecture image in the Medium article “A zero-downtime serverless ingestion architecture for Medicaid’s first cross-agency API”.
3 IaC is beyond the scope of this book, but if you’re interested, check out Infrastructure as Code, 2nd Edition, by Kief Morris (O’Reilly).
4 The Pragmatic Programmer, 20th Anniversary Edition: Chapter 5, Section 33
5 The Pragmatic Programmer, 20th Anniversary Edition: p. 167