In Chapter 1, we explored the idea of deciding whether to cancel a meeting in a data-driven way. We decided on a probabilistic decision criterion: to cancel the meeting with a client if the probability of the flight arriving within 15 minutes of the scheduled arrival time was less than 70%. To model the arrival delay given a variety of attributes about the flight, we need historical data that covers a large number of flights. Historical data that includes this information from 1987 onward is available from the US Bureau of Transportation Statistics (BTS). One of the reasons that the government captures this data is to monitor the fraction of flights by a carrier that are on-time (defined as flights that arrive less than 15 minutes late), so as to be able to hold airlines accountable.1 Because the key use case is to compute on-time performance, the dataset that captures flight delays is called Airline On-time Performance Data. That’s the dataset we will use in this book.
All of the code snippets in this chapter are available in the GitHub repository at https://github.com/GoogleCloudPlatform/data-science-on-gcp/ in the folder 02_ingest. See the last section of Chapter 1 for instructions on how to clone the repository and the README.md file in the 02_ingest directory for instructions on how to do the steps described in this chapter.
For the past 35 years, all major US air carriers have been required to file statistics about each of their domestic flights with the BTS. The data they are required to file includes the scheduled departure and arrival times as well as the actual departure and arrival times. From the scheduled and actual arrival times, the arrival delay associated with each flight can be calculated. Therefore, this dataset can give us the true value or “label” for building a model to predict arrival delay.
The actual departure and arrival times are defined rather precisely, based on when the parking brake of the aircraft is released and when it is later reactivated at the destination. The rules even go as far as to define what happens if the pilot forgets to engage the parking brake—in that case, the time that the passenger door is closed or opened is used instead. Because of the precise nature of the rules, and the fact that they are enforced, we can treat arrival and departure times from all carriers uniformly. Had this not been the case, we would have to dig deeper into the quirks of how each carrier defines “departure” and “arrival,” and do the appropriate translations.2 Good data science begins with such standardized, repeatable, trustable data collection rules; you should use the BTS’s very well-defined data collection rules as a model when creating standards for your own data collection, whether it is log files, web impressions, or sensor data that you are collecting. The airlines report this particular data monthly, and it is collated by the BTS across all carriers and published as a free dataset on the web.
In addition to the scheduled and actual departure and arrival times, the data includes information such as the origin and destination airports, flight numbers, and nonstop distance between the two airports. It is unclear from the documentation whether this distance is the distance taken by the flight in question or whether it is simply a precomputed distance—if the flight needs to go around a thunderstorm, is the distance in the dataset the actual distance traveled by the flight or the great-circle3 distance between the airports? This is something that we will need to examine—it should be easy to ascertain whether the distance between a pair of airports remains the same or changes. In addition, a flight is broken into three parts (Figure 2-1)—taxi-out duration, air time, and taxi-in duration—and all three time intervals are reported.
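As a preview of what this check will look like: once the data has been loaded into BigQuery later in this chapter (into a table we will call dsongcp.flights_raw, with Distance stored as a string), a query along the following lines would list airport pairs whose reported distance varies. This is a sketch to illustrate the idea, not something you can run yet:

bq query --nouse_legacy_sql '
  SELECT
    Origin, Dest,
    COUNT(DISTINCT Distance) AS num_distinct_distances  -- 1 everywhere if the distance is precomputed
  FROM dsongcp.flights_raw
  GROUP BY Origin, Dest
  HAVING COUNT(DISTINCT Distance) > 1'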
Before we get started with ingesting data, we need to decide what it is that we have to ingest into our model. There are two potential traps—causality and training-serving skew (I’ll define them shortly). We should take care to avoid these problems during the ingest phase, in order to save us a lot of heartburn later.
The key question boils down to this: what data will we be able to provide to the model at the time that we need to make predictions?
Some of the fields in the dataset could form the inputs to our model to help us predict the arrival delay as a function of these variables. Some, but not all. Why? It should be clear that we cannot use taxi-in duration or actual flight distance because at the time the aircraft is taking off, which is when we want to make our decision on whether to cancel the meeting, we will not know either of these things. The in-air flight time between two airports is not known a priori given that pilots have the ability to speed up or slow down. Thus, even though we have these fields in our historical dataset, we should not use them in our prediction model. This is called a causality constraint.
The causality constraint is one instance of a more general principle. Before using any field as input to a model, we should consider whether the data will be known at the time we want to make the decision. It is not always a matter of logic as with the taxi-in duration. Sometimes, practical considerations such as security (is the decision maker allowed to know this data?), the latency between the time the data is collected and the time it is available to the model, and cost of obtaining the information also play a part in making some data unusable. At the same time, it is possible that approximations might be available for fields that we cannot use because of causality—even though, for example, we cannot use the actual flight distance, we should be able to use the great-circle distance between the airports in our model.
Similarly, we might be able to use the data itself to create approximations for fields that are obviated by the causality constraint. Even though we cannot use the actual taxi-in duration, we can use the mean taxi-in duration of this flight at this airport on previous days, or the mean taxi-in duration of all flights at this airport over the past hour to approximate what the taxi-in duration might be. Over the historical data, this could be a simple batch operation after grouping the data by airport and hour. When predicting in real time, though, this will need to be a moving average on streaming data. Indeed, approximations to unknowable data will be an important part of our models.
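As a preview, here is a minimal sketch of the batch version of such an approximation, assuming the historical data has already been loaded into the BigQuery table dsongcp.flights_raw that we create later in this chapter (where every column except the flight date is stored as a string). A production version would also group by hour of day, or compute a moving average over streaming data:

bq query --nouse_legacy_sql '
  SELECT
    Dest,                                           -- arrival airport
    AVG(SAFE_CAST(TaxiIn AS FLOAT64)) AS avg_taxi_in_minutes
  FROM dsongcp.flights_raw
  WHERE TaxiIn IS NOT NULL AND TaxiIn != ""         -- skip flights with no reported taxi-in
  GROUP BY Dest'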
A training–serving skew is the condition in which you use a variable that’s computed differently in your training dataset than in the production model. For example, suppose that you train a model with the distance between cities in miles, but when you predict, the distance that you receive as input is actually in kilometers. That mismatch will produce poor predictions, because the model will be treating the distances as if they were 1.6 times their actual value. Although the problem is obvious in clear-cut cases such as unit mismatches, the same principle (that the training dataset has to reflect what is done to inputs at prediction time) applies to more subtle scenarios as well.
For example, it is important to realize that even though we have the actual taxi-in duration in the data, we cannot use it directly in our modeling. Instead, we must approximate the taxi-in duration using time aggregates and use those same aggregates in our training; otherwise, we will introduce a training–serving skew. If our model uses taxi-in duration as an input, and that input will be computed in real-time prediction as an average of taxi-in durations over the previous hour, we need to compute the average in the same way during training. We cannot use the recorded taxi-in duration as it exists in the historical dataset. If we did, the model would treat our time averages (which tend to have the extrema averaged out) as if they were actual taxi-in durations (which in the historical data contain extreme values). If the model learns during training that such extreme values of taxi-in duration are significant, the training–serving skew caused by computing the taxi-in duration in different ways could be as bad as treating kilometers as miles.
As our models become increasingly sophisticated—and more and more of a black box—it will become extremely difficult to troubleshoot errors that are caused by a training–serving skew. This is especially true if the code bases for computing inputs for training and during prediction are different and begin to diverge over time. We will always attempt to design our systems in such a way that the possibilities of a training–serving skew are minimized. In particular, we will gravitate toward solutions in which we can use the same code in training (building a model) as in prediction.
The dataset includes codes for the airports (such as ATL for Atlanta) from which and to which the flight is scheduled to depart and land. Planes might land at an airport other than the one they are scheduled to land at if there are in-flight emergencies or if weather conditions cause a deviation. In addition, the flight might be canceled. It is important for us to ascertain how these circumstances are reflected in the dataset—although they are relatively rare occurrences, our analysis could be adversely affected if we don’t deal with them in a reasonable way. The way we deal with these out-of-the-ordinary situations also must be consistent between training and prediction.
The dataset also includes airline codes (such as AA for American Airlines), but it should be noted that airline codes can change over time (for example, United Airlines and Continental Airlines merged and the combined entity began reporting as United Airlines in 2012). If we use airline codes in our prediction, we will need to cope with these changes in a consistent way, too.
As of August 2021, there were nearly 200 million records in the on-time performance dataset, with records starting in 1987. The last available data was June 2021, indicating that there is more than a month’s delay in updating the dataset – this is going to be important when we automate the process of getting new data.
In this book, our model will use input fields drawn mostly from this dataset, but where feasible and necessary, we will include other datasets such as airport locations and weather. We can download the on-time performance data from the BTS website as comma-separated value (CSV) files. The web interface requires you to filter by geography and period, as illustrated in Figure 2-2. The data itself is offered in two ways: one with all the data in a zipped file, and the other with just the fields that we select in the form.
This is not the most helpful way to provide data for download. For one thing, the data can be downloaded only one month at a time. For another, going through a web form is pretty error-prone. Imagine that you want to download all of the data for 2015. In that scenario, you’d painstakingly select the fields you want for January 2015, submit the form, and then have to repeat the process for February 2015. If you forgot to select a field in February, that field would be missing, and you wouldn’t know until you began analyzing the data!
Obviously, we can script the download to make it less tiresome and ensure consistency.4 However, it is better to download all the raw data, not just a few selected fields. Why? Won’t the files be larger if we ask for all the fields? Won’t larger files take longer to download?
Yes, the files will be larger if we download all the fields using the static link. But there is a significant drawback to doing preselection. In order to support the interactive capability of selecting fields, the BTS does server-side processing – it extracts the fields we want, creates a custom zip file and makes the zip file available for download. This would make our code reliant on the BTS servers having the necessary uptime and reliability.5 Avoiding the server-side processing should help reduce this dependency.6
An even more salient reason is that best practice in data engineering now is to build ELT (Extract-Load-Transform) pipelines, rather than ETL pipelines. What this means is that we will extract the data from BTS and immediately load the data into a data warehouse rather than rely on the BTS server to do transformation for us before loading it into Google Cloud. This point is important. The recommended modern data architecture is to minimize the preprocessing of data – instead, land all available data as-is into the Enterprise Data Warehouse (EDW) and then carry out whatever transformations are necessary for different use cases (see Figure 2-3). This is called a hub-and-spoke architecture, with the EDW functioning as the hub.
Even though I’m going to download all the fields, it’s worthwhile reading through the column descriptions provided by BTS to learn more about the dataset and get a preliminary idea about what fields are relevant to our problem and whether there are any caveats. For example, Table 2-1 shows three ways in which the airline is recorded. Which of these should we use?
Column Name | Description (copied from BTS website) |
FlightDate | Flight Date (yyyymmdd). |
Reporting_Airline | Unique Carrier Code. When the same code has been used by multiple carriers, a numeric suffix is used for earlier users; for example, PA, PA(1), PA(2). Use this field for analysis across a range of years. |
DOT_ID_Reporting_Airline | An identification number assigned by the US Department of Transportation (DOT) to identify a unique airline (carrier). A unique airline (carrier) is defined as one holding and reporting under the same DOT certificate regardless of its Code, Name, or holding company/corporation. |
IATA_CODE_Reporting_Airline | Assigned by International Air Transport Association (IATA) and commonly used to identify a carrier. Because the same code might have been assigned to different carriers over time, the code is not always unique. For analysis, use the Unique Carrier Code. |
It’s clear that we could use either the Reporting_Airline or the DOT_ID_Reporting_Airline since they are both unique. Ideally, we’d use whichever one of these corresponds to the common nomenclature (for example, UA or United Airlines). Fortunately, the BTS provides an “analysis” link for the columns (see Figure 2-4), so we don’t have to wait until we explore the data to make this decision. It turns out that the Reporting_Airline is what we want – the DOT_ID_Reporting_Airline is the number 19977 for United Airlines, whereas the Reporting_Airline is UA, as we would like.
The first thing to do in any real-world problem where we are fortunate enough to be provided documentation is to read it! After reading through the descriptions of the 100-plus fields in the dataset, there are a few fields that appear relevant to the problem of training, predicting, or evaluating flight arrival delay. Table 2-2 presents the fields I shortlisted.
Column Name | Description (copied from BTS website) |
FlightDate | Flight Date (yyyymmdd). |
Reporting_Airline | Unique Carrier Code. When the same code has been used by multiple carriers, a numeric suffix is used for earlier users; for example, PA, PA(1), PA(2). Use this field for analysis across a range of years. |
Origin | Origin Airport. |
Dest | Destination Airport. |
CRSDepTime | Computerized Reservation System (CRS) Departure Time (local time: hhmm). |
DepTime | Actual Departure Time (local time: hhmm). |
DepDelay | Difference in minutes between scheduled and actual departure time. Early departures show negative numbers. |
TaxiOut | taxi-out duration, in minutes. |
WheelsOff | Wheels-Off time (local time: hhmm). |
WheelsOn | Wheels-On Time (local time: hhmm). |
TaxiIn | taxi-in duration, in minutes. |
CRSArrTime | CRS Arrival Time (local time: hhmm). |
ArrTime | Actual Arrival Time (local time: hhmm). |
ArrDelay | Difference in minutes between scheduled and actual arrival time. Early arrivals show negative numbers. |
Cancelled | Cancelled flight indicator (1 = Yes). |
CancellationCode | Specifies the reason for cancellation. |
Diverted | Diverted flight indicator (1 = Yes). |
Distance | Distance between airports (miles). |
There are essentially three options when it comes to processing large datasets (see Table 2-3), and all three are possible on GCP. Which one you use depends on the problem – in this book, we’ll use the third option because it is the most flexible. However, this option requires a bit of preplanning on our part – we will have to store our data in Google Cloud Storage and in Google BigQuery. Let’s see why we choose this.
Option | Performance and cost | Required platform capabilities | How to implement on Google Cloud Platform | Example use case |
Scaling up | Expensive on both compute and storage; fast, but limited to capability of most powerful machine | Very powerful machines; ability to rent machines by the minute; attachable persistent SSDs | Compute Engine with persistent SSDs; Vertex AI Notebooks | Job that requires rereading of data (e.g. training an ML model) |
Scaling out with sharding | High storage costs; inexpensive compute; add machines to achieve desired speed, but limited to ability to preshard the data on a cluster of desired size | Data local to the compute nodes; attachable persistent SSDs | Cloud Dataproc (with Spark) and HDFS | Light compute on a splittable dataset (e.g. creating a search index from thousands of documents). Many data analytics use cases used to be in this segment. |
Scaling out with data in situ | Inexpensive storage, compute; add machines to achieve desired speed | Extremely fast networking, cluster-wide filesystem | Cloud Dataproc + Spark on Cloud Storage, BigQuery, Cloud Dataflow, Vertex AI Training, etc. | Any use case where we can configure datasets so that I/O keeps up with computation. Data analytics use cases are increasingly falling into this segment. |
Even if you are used to downloading data to your laptop for data analysis and development, you should realize that this is a suboptimal solution. Wouldn’t it be great to directly ingest the BTS files into our data analysis programs without having to go through a step of downloading them? Having a single source of truth has many advantages, ranging from security (providing and denying access) to error correction (no need to worry about stale copies of the data lying around). Of course, the reason we don’t do this is that we’d have to read the BTS data over the internet, and the public internet typically has speeds of 3 to 10 MBps. If you are carrying out analysis on your laptop, accessing data via the internet every time you need it will become a serious bottleneck.
Downloading the data has the benefit that subsequent reads happen on the local drive and this is both inexpensive and fast (see Figure 2-5). For small datasets and short, quick computation, it’s perfectly acceptable to download data to your laptop and do the work there. This doesn’t scale, though. What if our data analysis is very complex or the data is so large that a single laptop is no longer enough? We have two options: scale up or scale out.
One option to deal with larger datasets or more difficult computation jobs is to use a larger, more powerful machine with many CPUs/GPUs, lots of RAM, and many terabytes of drive space. This is called scaling up, and it is a perfectly valid solution. However, such a computer is likely to be quite expensive. Because we are unlikely to be using it 24 hours a day, we might choose to rent an appropriately large computer from a public cloud provider. In addition, the public cloud offers persistent drives that can be shared between multiple instances and whose data is replicated to guard against data loss. In short, then, if you want to do your analysis on one large machine but keep your data permanently in the cloud, a good solution would be to marry a powerful, high-memory Compute Engine instance with a persistent drive, download the data from the external datacenter (BTS’s computer in our case) onto the persistent drive, and start up compute instances on demand, as depicted in Figure 2-6 (cloud prices in Figure 2-6 are estimated monthly charges; actual costs may be higher or lower than the estimate).
When you are done with the analysis, you can delete the Compute Engine instance.7 Provision the smallest persistent drive that adequately holds your data — temporary storage (or caches) needed during analysis can be written to an attached SSD that is deleted along with the instance, and persistent drives can always be resized if your initial size proves too small. This gives you all the benefits of doing local analysis but with the ability to use a much more powerful machine at a lower cost. I will note here that this recommendation assumes several things: the ability to rent powerful machines by the minute, to attach resizeable persistent drives to compute instances, and to achieve good-enough performance by using solid-state persistent drives. These are true of Google Cloud and other public cloud providers, but are unlikely to be true on-premises.
Scaling up is a common approach whenever you have a job that needs to read the data multiple times. This is quite common when training machine learning models, and so scaling up is a common approach in machine learning, especially machine learning on images and video. Indeed, Google Cloud offers special Compute Engine instances, called Deep Learning VMs, that come preinstalled with the GPUs and libraries that are needed for machine learning. Jupyter Notebook instances are also frequently scaled up as necessary to fit the job. You’d create a Deep Learning VM and attach to it a persistent disk containing the training data.
The solution of using a high-memory Compute Engine instance along with persistent drives and caches might be reasonable for jobs that can be done on a single machine, but it doesn’t work for jobs that are bigger than that. Configuring a job into smaller parts so that processing can be carried out on multiple machines is called scaling out. One way to scale out a data processing job is to shard8 the data and store the pieces on the drives attached to multiple compute instances or persistent drives that will be attached to multiple instances. Then, each compute instance can carry out analysis on a small chunk of data at high speeds—these operations are called the map operations. The results of the analysis on the small chunks can be combined, after some suitable collation, on a different set of compute nodes—these combination operations are called the reduce operations. Together, this model is known as MapReduce. This approach also requires an initial download of the data from the external datacenter to the cloud. In addition, we also need to split the data onto preassigned drives or nodes.
Whenever we need to carry out analysis, we will need to spin up the entire cluster of nodes, reattach the persistent drives and carry out the computation. Fortunately, we don’t need to build the infrastructure to do the sharding or cluster creation ourselves. We could store the data on the Hadoop Distributed File System (HDFS), which will do the sharding for us, spin up a Cloud Dataproc cluster (which has Hadoop, Presto, Spark, etc. preinstalled on a cluster of Compute Engine VMs), and run our analysis job on that cluster. Figure 2-7 presents an overview of this approach.
A MapReduce framework like the Hadoop ecosystem requires data to be presharded. Because the presharded data must be stored on drives that are attached to compute instances, the scheme can be highly wasteful unless all the data happens to get used all the time by those compute instances. In essence, whenever you need to run a job, you ship the code to whichever nodes happen to be storing the data. What you should be doing instead is trying to find a machine that has free capacity. Shipping the analysis code to run on storage nodes regardless of their computational load leads to poor efficiency, because it is likely that there are long periods during which a node might have nothing to do, and other periods when it is subject to resource contention.
In summary, we have two options to work with large datasets: keep the data as is and scale up by using a large-enough computer, or scale out by sharding the data and shipping code to the nodes that store the data. Both of these options have some drawbacks. Scaling up is subject to the limitations of whatever the most powerful machine available to you can do. Scaling out is subject to inefficiencies of resource allocation. Is there a way to keep data in situ and scale out?
Recall that much of the economics of our case for downloading the data onto nodes on which we can do the compute relied on the slowness of an internet connection as compared to drive speeds—it is because the public internet operates at only 3 to 10 MBps, whereas drives offer two orders of magnitude faster access, that we moved the data to a large Compute Engine instance (scaling up) or sharded it onto persistent drives attached to Compute Engine instances (scaling out).
What if, though, you are operating in an environment in which networking speeds are higher, and files are available to all compute instances at those high speeds? For example, what if you had a job that uses 100,000 servers and those servers could communicate with one another at 1 GBps? This is seriously fast—it is twice the speed of SSD, 10 times the speed of a local hard drive, and 100 times faster than the public internet. What if, in addition, you have a cluster-level filesystem (not node-by-node9) whose metadata is sharded across the datacenter and replicated on-write for durability? Because the total bisection bandwidth of the Jupiter networks in Google’s datacenters is 125,000 GBps10 and because Google’s next-generation Colossus filesystem operates at the cluster level, this is the scenario that operates if your data is available in Google Cloud Storage and your jobs are running on Compute Engine instances in the same datacenter as the file. At that point, it becomes worthwhile to treat the entire datacenter as a single computer. The speed of the network and the design of the storage make both compute and data fungible resources that can be allocated to whichever part of the datacenter is most free. Scheduling a set of jobs over a single large datacenter provides much higher utilization than scheduling the same set of jobs over many smaller clusters. This resource allocation can be automatic—there is no need to preshard the data, and if we use an appropriate computation framework (such as BigQuery, Cloud Dataflow, or Vertex AI), we don’t even need to instantiate a Compute Engine instance ourselves. Figure 2-8 presents this framework, where compute and storage are separate.
Therefore, choose between scaling up, scaling out with data sharding, or scaling out with data in situ depending on the problem that you are solving (see Table 2-3).
To carry out our data analysis on the on-time performance dataset, we will need to download the monthly data from the BTS website and then upload it to Google Cloud Storage. Doing this manually will be tedious and error-prone, so let’s script this operation.
How would you script filling out the BTS web form shown in Figure 2-2? First, verify that the website’s terms of use do not bar you from automated downloads! Then, use the Chrome browser’s developer tools to find what web calls the form makes. Once you know that, you can repeat those web calls in a script.
The BTS web form is a simple HTML form with no dynamic behavior. This type of form collects all the user selections into a single GET or POST request. If we can create that same request from a script, we will be able to obtain the data without going through the web form.
We can find out the exact HTTP command sent by the browser after we make our selections on the BTS website. You can do this while on the BTS download website in the Chrome web browser – in the upper-right menu bar of the browser, navigate to the Developer Tools menu, as shown in Figure 2-9.
Now, on the BTS website, select the Prezipped File option, select 2015 and January in the dropdowns, and click the Download button. The Developer Tools window shows us that the browser is now making a POST request for the file: https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2015_1.zip
The pattern here is pretty obvious. If we issue an HTTP POST for a file with the pattern:
${BASEURL}_${YEAR}_${MONTH}.zip
we should get the data corresponding to a single month. Let’s try it from the command-line of Cloud Shell:
BTS=https://transtats.bts.gov/PREZIP
BASEURL="${BTS}/On_Time_Reporting_Carrier_On_Time_Performance_1987_present"
YEAR=2015
MONTH=3
curl -o temp.zip ${BASEURL}_${YEAR}_${MONTH}.zip
We see the data for March 2015 starting to get downloaded. Once the file is downloaded, we can unzip it:
unzip temp.zip
We then notice that the zip file contains a comma-separated values (CSV) file with the flight data for the month of March 2015.
In the data exploration phase, I’ll do most of my processing interactively with Linux command-line tools. I will assume that this is what you are using as well. Adapt the commands as necessary if you are working locally in some other environment (e.g., where I ask you to do a sudo apt-get install, you might use the appropriate install command for your Linux-like environment). When we have settled on the processing to be done, we’ll look at how to make this more automated.
Instead of calling the downloaded file temp.zip, let’s name it after the year and month it contains (201503.zip) and place it into a temporary directory. To pad the month 3 to 03, we can use the printf command in bash:13
MONTH2=$(printf "%02d" $MONTH)
To create a temporary directory, we can use the Linux command mktemp:
TMPDIR=$(mktemp -d)
Then, to download the file to the temporary directory, we can do:
ZIPFILE=${TMPDIR}/${YEAR}_${MONTH2}.zip
curl -o $ZIPFILE ${BASEURL}_${YEAR}_${MONTH}.zip
Now, we can unzip the file, extract the CSV file to the current directory (./), and remove the temporary directory along with the rest of the zip file’s contents:
unzip -d $TMPDIR $ZIPFILE
mv $TMPDIR/*.csv ./${YEAR}${MONTH2}.csv
rm -rf $TMPDIR
I put the above commands into a file called download.sh, and then, in the script ingest.sh, I call it from within a for loop:
for MONTH in `seq 1 12`; do
  bash download.sh $YEAR $MONTH
done
On running this, we get a set of CSV files, one for each month in 2015 (see Figure 2-10).
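For reference, a minimal download.sh assembled from the snippets above might look like the following sketch (the script in the GitHub repository adds error checking and may differ in its details):

#!/bin/bash
# download.sh YEAR MONTH -- fetches one month of BTS on-time data as YYYYMM.csv
YEAR=$1
MONTH=$2
BTS=https://transtats.bts.gov/PREZIP
BASEURL="${BTS}/On_Time_Reporting_Carrier_On_Time_Performance_1987_present"
MONTH2=$(printf "%02d" $MONTH)        # zero-pad the month
TMPDIR=$(mktemp -d)                   # work in a temporary directory
ZIPFILE=${TMPDIR}/${YEAR}_${MONTH2}.zip
curl -o $ZIPFILE ${BASEURL}_${YEAR}_${MONTH}.zip
unzip -d $TMPDIR $ZIPFILE             # extract the CSV from the zip
mv $TMPDIR/*.csv ./${YEAR}${MONTH2}.csv
rm -rf $TMPDIR                        # clean up the zip and the temporary directory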
The complete download script is on GitHub at https://github.com/GoogleCloudPlatform/data-science-on-gcp/tree/master/02_ingest —if you want to follow along with me, perform these steps:
Go to https://console.cloud.google.com
On the top strip, activate Cloud Shell using the button shown in Figure 2-10.
In Cloud Shell, type the following:
git clone https://github.com/GoogleCloudPlatform/data-science-on-gcp/
This downloads the GitHub code to your Cloud Shell home directory.
Navigate into the repository folder:
cd data-science-on-gcp
Make a new directory to hold the data, and then change into that directory:
mkdir data
cd data
Run the code to download the files:
for MONTH in `seq 1 12`; do
  bash download.sh 2015 $MONTH
done
When the script completes, look at the downloaded CSV files, shown in Figure 2-11:
ls -lrt
This looks quite reasonable—all the files have different sizes, and the sizes are large enough that we can assume they are not just error messages.
At this point, I have 12 CSV files. Let’s look at the first two lines of one of them to ensure the data matches what we think it ought to be:
head -2 201503.csv
The result is shown in Figure 2-12.
There is a header in each CSV file, and the second line looks like data. Some of the fields are enclosed by quotes (perhaps in case the strings themselves have commas), and there are some fields that are missing (there is nothing between successive commas towards the end of the line). There seems to be a pesky extra comma at the end as well.
How many fields are there? Because the second line doesn’t have any commas between the quotes, we can check using:
head -2 201503.csv | tail -1 | sed 's/,/ /g' | wc -w
The number of words is 81, so there are 81 columns (remember there’s a comma at the end of the line). Here’s how the command works. It first gets the first two lines of the data file (with head -2), and the last line of that (with tail -1), so that we are looking at the second line of 201503.csv. Then, we replace all the commas by spaces and count the number of words with wc -w.
How much data is there? A quick shell command (wc for word count, with an -l [lowercase letter L] to display only the line count) informs us that there are between 430,000 and 520,000 flights per month:
$ wc -l *.csv
   469969 201501.csv
   429192 201502.csv
   504313 201503.csv
   485152 201504.csv
   496994 201505.csv
   503898 201506.csv
   520719 201507.csv
   510537 201508.csv
   464947 201509.csv
   486166 201510.csv
   467973 201511.csv
   479231 201512.csv
  5819091 total
This adds up to nearly six million flights in 2015! The slowness of this command should tell us that any kind of analysis that involves reading all the data is going to be quite cumbersome. You can repeat this for other years (2016-2019) but let’s wait until we have the whole process complete for one year before we add more years.
You might have realized by now that knowing a little Unix scripting can come in very handy at this initial stage of data analysis.
For durability of this raw dataset, let’s upload it to Google Cloud Storage. To do that, you first need to create a bucket, essentially a namespace for Binary Large Objects (blobs) stored in Cloud Storage that you typically want to treat similarly from a permissions perspective. You can create a bucket from the Google Cloud Platform Console. For reasons that we will talk about shortly, make the bucket a single-region bucket.
Bucket names must be globally unique (i.e., unique not just within your project or organization, but across Google Cloud Platform). This means that bucket names are globally knowable even if the contents of the bucket are not accessible. This can be problematic. For example, if you created a bucket named acme_gizmo, a competitor might later try to create a bucket also named acme_gizmo, but fail because the name already exists. This failure can alert your competitor to the possibility that Acme Corp. is developing a new Gizmo. It might seem like it would take Sherlock Holmes–like powers of deduction to arrive at this conclusion, but it’s simply best that you avoid revealing sensitive information in bucket names. A common pattern to create unique bucket names is to use suffixes on the project ID. Project IDs are globally unique,14 and thus a bucket name such as <projectid>-dsongcp will also tend to be unique. In my case, my project ID is cloud-training-demos and my bucket name is cloud-training-demos-ml.
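If you prefer the command line, you can create the bucket with gsutil instead of the console. Here is a sketch that derives a bucket name from your project ID; the region us-central1 is illustrative, so pick one that is convenient for you:

PROJECT=$(gcloud config get-value project)   # current project ID
BUCKET=${PROJECT}-dsongcp                    # bucket name derived from the project ID
gsutil mb -l us-central1 gs://${BUCKET}      # mb = make bucket, in a single region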
Cloud Storage will also serve as the staging ground to many of the GCP tools and enable collaborative sharing of the data with our colleagues. In my case, to upload the files to Cloud Storage, I type the following:
gsutil -m cp *.csv gs://cloud-training-demos-ml/flights/raw/
This uploads the files to my bucket cloud-training-demos-ml on Cloud Storage in a multithreaded manner (-m) and makes me the owner. If you are working locally, another way to upload the files would be to use the Cloud Platform Console.
It is better to keep these as separate files instead of concatenating them into a single large file because Cloud Storage is a blob store, and not a regular filesystem. In particular, it is not possible to append to a file on Cloud Storage; you can only replace it. Therefore, although concatenating all 12 files into a single file containing the entire year of data will work for this batch dataset, it won’t work as well if we want to later add to the dataset one month at a time, as new data become available. Secondly, because Cloud Storage is blob storage, storing the files separately will permit us to more easily process parts of the entire archive (for example, only the summer months) without having to build in slicing into our data processing pipeline. Thirdly, it is generally a good idea to keep ingested data in as raw a form as possible.
It is preferable that the bucket to which we upload the data be a single-region bucket, for three reasons. First, we will create Compute Engine instances in the same region as the bucket and access it only from that one region; a multi-region bucket would be overkill because we don’t need global availability. Second, a single-region bucket is less expensive than a multi-region one. Third, single-region buckets are optimized for high throughput, whereas multi-region buckets are optimized for edge latency (think data analytics versus web traffic). All three factors point to using single-region buckets for data analytics and machine learning.
Note that both single-region and multi-region buckets in Google Cloud Platform offer strong consistency, so this might not seem like a consideration in choosing one over the other. However, the speed compromises inherent in offering strong consistency on global buckets point to using single-region buckets if you can. What exactly is strong versus eventual consistency, and why does it matter? Suppose that a worker in a distributed application updates a piece of data and another worker views that piece of data immediately afterward. If the second worker always sees the updated value, what you have is strong consistency. If, on the other hand, there could be a potential lag between an update and availability (i.e., if different viewers can see potentially different values of the data at the same instant in time), what you have is eventual consistency. Eventually, all viewers of the data will see the updated value, but that lag will be different for different viewers. Strong consistency is an implicit assumption that is made in a number of programming paradigms. However, to achieve strong consistency, we have to make compromises on scalability and performance (this is called Brewer’s theorem). For example, we might need to lock readers out of the data while it is being updated so that simultaneous readers always see a consistent and correct value.
Brewer’s theorem, also called the CAP theorem, states that no computer system can simultaneously guarantee consistency, availability, and partition resilience. Consistency is the guarantee that every reader sees the latest written information. Availability is the guarantee that a response is sent to every request (regardless of whether it is the most current information or not). Partition resilience is the guarantee that the system continues to operate even if the network connecting readers, writers, and storage drops an arbitrary number of messages. Since network failures are a fact of life in distributed systems, the CAP theorem essentially says that you need to choose between consistency and availability. Cloud Spanner doesn’t change this, and neither do multi-regional buckets: Cloud Spanner essentially makes choices during partitioning—it is always consistent and achieves five-9s (but not perfect) availability despite operating over a wide area. For more details, see http://bit.ly/2Abs7D8.
If you need the performance of a regional bucket but need to be tolerant of failure (for example, you want to be able to carry out your workload even if a region goes down), there are two options: eventual consistency and dual-region buckets. As an example of eventual consistency, consider how internet Domain Name System (DNS) servers cache values and have their values replicated across many DNS servers all over the internet. If a DNS value is updated, it takes some time for this modified value to become replicated at every DNS server. Eventually, this does happen, though. Having a centralized DNS server that is locked whenever any DNS value is modified would have led to an extremely brittle system. Because the DNS system is based on eventual consistency, it is highly available and extremely scalable, enabling name lookups for millions of internet-capable devices. The other option is to have a dual-region bucket in a multi-region location, so that the metadata remains the same. If, for whatever reason, one region is not available for analytics, computation fails over to the other region in a multi-region location (US, EU, Asia). Dual-region buckets are more expensive than either single-region buckets or multi-region buckets, but offer both high performance and reliability.
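If you want to try this, a dual-region bucket is created the same way as any other bucket, just with a dual-region location code. The location nam4 (a predefined US dual-region) and the bucket name suffix below are illustrative:

gsutil mb -l nam4 gs://${PROJECT}-dsongcp-dr   # dual-region bucket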
This being public data, I will ensure that my colleagues can use this data without having to wait on me:
gsutil acl ch -R -g google.com:R gs://cloud-training-demos-ml/flights/raw/
That changes the access control list (acl) recursively (-R), applying read permission for the group google.com (:R) to everything starting from the Cloud Storage URL supplied. Had there been sensitive information in the dataset, I would have to be more careful, of course. We’ll discuss fine-grained security, by providing views with different columns to different roles in my organization, when we talk about putting the data in BigQuery. We’ll also discuss how information about the flights that people take can leak from the predictions we make on their behalf, and how to guard against this, when we talk about machine learning predictions.
On Google Cloud, the best place for structured and semi-structured data is BigQuery, a serverless data warehouse and SQL engine.
Most relational database systems, whether commercial or open source, are row oriented in that the data is stored row by row. This makes it easy to append new rows of data to the database and allows for features such as row-level locking when updating the value of a row. The drawback is that queries that involve table scans (i.e., any aggregation that requires reading every row) can be expensive. Indexing counteracts this expense by creating a lookup table to map rows to column values, so that SELECTs that involve indexed columns do not have to load unnecessary rows from storage into memory. If you can rely on indexes for fast lookup of your data, a traditional Relational Database Management System (RDBMS) works well. For example, if your queries tend to come from software applications, you will know the queries that will come in and can create the appropriate indexes beforehand. This is not an option for use cases like business intelligence for which human users are writing ad hoc queries; therefore, a different architecture is needed.
BigQuery, on the other hand, is a columnar database—data is stored column by column and each column’s data is stored in a highly efficient compressed format that enables fast querying. Because of the way data is stored, many common queries can be carried out such that the query processing time is linear in the size of the relevant data. For applications such as data warehousing and business intelligence for which the predominant operations are read-only SELECT queries requiring full table scans, columnar databases are a better fit. BigQuery, for example, can scan terabytes of data in a matter of seconds. The trade-off is that INSERT, UPDATE, and DELETE statements, although possible in BigQuery, are significantly more expensive to process than SELECT statements. BigQuery is tuned toward analytics use cases.
BigQuery is serverless, so you don’t actually spin up a BigQuery server in your project. Instead, you simply submit a SQL query, and it is executed on the cloud. Queries that you submit to BigQuery are executed on a large number of compute nodes (called slots) in parallel. These slots do not need to be statically allocated beforehand—instead, they are “always on” available on demand, and scale to the size of your job. Because data is in situ and not sharded (not broken into small chunks that are attached to individual compute instances), the total power of the datacenter can be brought to bear on the problem. Because these resources are elastic and used only for the duration of the query, BigQuery is more powerful and less expensive than a statically preallocated cluster because preallocated clusters will typically be provisioned for the average use case—BigQuery can bring more resources to bear on the above-average computational jobs and utilize fewer resources for below-average ones.
In addition, because you don’t need to reserve any compute resources for your data when you are not querying your data, it is extremely cost effective to just keep your data in BigQuery (you’ll pay for storage, but storage is inexpensive). Whenever you do need to query the data, the data is immediately available—you can query it without the need to start project-specific compute resources. This on-demand, autoscaling of compute resources is incredibly liberating.
In summary, BigQuery is a columnar database, making it particularly effective for read-only queries that process all of the data. Because it is serverless, can autoscale to thousands of compute nodes, and doesn’t require clusters to be preallocated, it is also very powerful and quite inexpensive.
Although it is possible to ingest files from on-premises hardware directly into BigQuery using the bq command-line tool that comes with the Google Cloud SDK (gcloud), you should use that capability only for small datasets. To retrieve data from outside Google Cloud Platform to BigQuery, it is preferable to first load it into Cloud Storage and use Cloud Storage as the staging ground for BigQuery, as demonstrated in Figure 2-13.
For larger files, it is better to ingest the files into Cloud Storage using gsutil first, because gsutil takes advantage of multithreaded, resumable uploads and is better suited to the public internet. Fortunately, we already have the flights CSV files in Cloud Storage!
When should you save your data in Cloud Storage, and when should you store it in BigQuery? First, if the data is not tabular-like (that is, images, videos, and other arbitrary file types), then Google Cloud Storage (GCS) is the right choice. For tabular-like data, the answer boils down to what you want to do with the data and the kinds of analyses you want to perform. If you’ll mostly be running custom code that expects to read plain files, or your analysis involves reading the entire dataset, use Cloud Storage. On the other hand, if your desired access pattern is to run interactive SQL queries on the data, store your data in BigQuery. In pre-cloud terms: if you would have used flat files, use Cloud Storage; if you would have put the data in a database, put it in BigQuery.
The first step to ingest data into BigQuery is to create a BigQuery dataset – a dataset is a container for tables. You can have multiple datasets within a project. Go to the web console at https://console.cloud.google.com/bigquery and choose the Create Dataset option. Then, create a dataset called dsongcp.
You can also do this from the command-line:
bq mk dsongcp
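By default, bq mk creates the dataset in the US multi-region. If you want the dataset in a specific location (for example, to keep it near the region of your Cloud Storage bucket), you can pass a location flag when creating it; the location shown here is illustrative:

bq --location=us-central1 mk --dataset dsongcp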
Datasets in BigQuery are mostly just an organizational convenience—tables are where data resides and it is the columns of the table that dictate the queries we write. Besides providing a way to organize tables, though, datasets also serve as the access control point. You can provide view or edit access only at the project or dataset level, not at the table level. Cloud Identity Access Management (Cloud IAM) on Google Cloud Platform provides a mechanism to control who can carry out what actions on which resource (Figure 2-14).
The “who” can be specified in terms of an individual user (identified by his Google account such as a gmail.com address or company email address if the company is a GSuite customer), a Google Group (i.e., all current members of the group), or a GSuite domain (anyone with a Google account in that domain). Google groups and GSuite domains provide a convenient mechanism for aggregating a number of users and providing similar access to all of them.
In addition, different logical parts of an application can be assigned separate identities (linked to email addresses) called service accounts. Service accounts are a very powerful concept because they allow different parts of a codebase to have permissions that are independent of the access level of the person running that application. For example, you might want an application to be able to query a table but not delete it even if the developer who created the application and the person running the application have that authority.
You should use service accounts with care for scenarios in which audit records are mandatory. Providing access at the Google Groups level provides more of an audit trail: because Google groups don’t have login credentials (only individual users do), the user who made a request or action is always recorded, even if their access is provided at the level of a Google Group or GSuite domain. However, service accounts are themselves login credentials, and so the audit trail turns cold if you provide access to service accounts—you will no longer know which user initiated the application request unless that application in turn logs this information. This is something to keep in mind when granting access to service accounts. Try to avoid providing service account access to resources that require auditability. If you do provide service account access, you should ensure that the application to which you have provided access itself provides the necessary audit trail by keeping track of the user on behalf of whom it is executing the request. The same considerations apply to service accounts that are part of Google Groups or domains. Because audit trails go cold with service accounts,16 you should restrict Google Groups and GSuite domains to only human users and service accounts that belong to applications that provide any necessary legal auditability guarantees.
Creating single-user projects is another way to ensure that service accounts map cleanly to users, but this can lead to significant administrative overhead associated with shared resources and departing personnel. Essentially, you would create a project that is billed to the same company billing account, but each individual user would have her own project in which she works. You can use the gcloud command to script the creation of such single-user projects in which the user in question is an editor (not an owner).17
In addition to specific users, groups, domains, and service accounts, there are two wildcard options available. Access can be provided to allAuthenticatedUsers, in which case anyone authenticated with either a Google account or a service account is provided access. Because allAuthenticatedUsers includes service accounts, it should not be used for resources for which a clear audit trail is required. The other wildcard option is to provide access to allUsers, in which case anyone on the internet has access—a common use case for this is to provide highly available static web resources by storing them on Cloud Storage. Be careful about doing this indiscriminately—egress of data from Google Cloud Platform is not free, so you will pay for the bandwidth consumed by the download of your cloud-hosted datasets.
The “what” actions depend on the resource whose access is being controlled. The resources themselves fall into a policy hierarchy.
Policies can be specified at an organization level (i.e., to all projects in the organization), at the project level (i.e., to all resources in the project), or at the resource level (i.e., to a Compute Engine instance or a BigQuery dataset). As Figure 2-15 shows, policies specified at higher levels are inherited at lower levels, and the policy in effect is the union of all the permissions granted—there is no way to restrict some access to a dataset to a user who has that access inherited from the project level. Moving a project from one organization to another automatically updates that project’s Cloud IAM policy and ends up affecting all the resources owned by that project.
What type of actions can be carried out depends on the resource in question. Before Cloud IAM was introduced on the Google Cloud Platform, there were only three roles: owner, editor, and viewer/reader for all resources. Cloud IAM brought with it much finer-grained roles, but the original three roles were grandfathered in as primitive roles. Table 2-4 lists some of the roles that are possible for BigQuery datasets.
Role | Capabilities | Inherits from |
Project Viewer | Execute a query; List datasets | |
Project Editor | Create a new dataset | Project Viewer |
Project Owner | List/delete datasets; View jobs run by other project users | Project Editor |
bigquery.user | Execute a query; List datasets | |
bigquery.dataViewer | Read, query, copy, export tables in the dataset | |
bigquery.dataEditor | Append, load data into tables in the dataset | Project Editor; bigquery.dataViewer |
bigquery.dataOwner | Update, delete on tables in dataset | Project Owner; bigquery.dataEditor |
bigquery.admin | All | |
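As an example of granting one of these roles, the following gcloud command binds the bigquery.dataViewer role at the project level, giving a Google Group read access to all BigQuery datasets in the project (the group address here is hypothetical):

PROJECT=$(gcloud config get-value project)
gcloud projects add-iam-policy-binding ${PROJECT} \
    --member="group:flight-analysts@example.com" \
    --role="roles/bigquery.dataViewer"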
We can load the data directly into BigQuery’s native storage using the command-line utility bq that comes with the gcloud SDK:
bq load --autodetect --source_format=CSV --skip_leading_rows=1 \
   dsongcp.flights_auto \
   gs://${BUCKET}/flights/raw/201501.csv
Here, we are asking BigQuery to autodetect the schema from the CSV file and to load the January data into a table named flights_auto (if you are following along with me, make sure to change the bucket to reflect the bucket that your files are in).
If you now go to the BigQuery web console ( https://console.cloud.google.com/bigquery ) and examine the dataset dsongcp, you will see that there is a table named flights_auto in it. You can examine the autodetected schema and preview the contents of the table.
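You can do the same inspection from Cloud Shell with the bq tool; both commands below operate on the table we just loaded:

bq show --schema --format=prettyjson dsongcp.flights_auto   # print the autodetected schema
bq head -n 5 dsongcp.flights_auto                           # preview the first few rows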
We can try querying the data to find the average departure and arrival delays at the busiest airports:
SELECT
  ORIGIN,
  AVG(DEPDELAY) AS dep_delay,
  AVG(ARRDELAY) AS arr_delay,
  COUNT(ARRDELAY) AS num_flights
FROM dsongcp.flights_auto
GROUP BY ORIGIN
ORDER BY num_flights DESC
LIMIT 10
The result (see Table 2-5) starts with Atlanta (ATL), Dallas (DFW), and Chicago (ORD) which is what we would expect.
Row | ORIGIN | dep_delay | arr_delay | num_flights | |
1 | ATL | 7.265885087329549 | 1.0802479706819135 | 29197 | |
2 | DFW | 11.761812240572308 | 9.37162730937924 | 22571 | |
3 | ORD | 19.96205128205128 | 17.016131923283645 | 22316 | |
4 | LAX | 7.476340878516738 | 5.542057719380547 | 17048 | |
5 | DEN | 15.506798076352176 | 11.842324888226543 | 16775 | |
6 | IAH | 9.07378596782721 | 5.353498597528596 | 13191 | |
7 | PHX | 8.066722908198505 | 6.197786998616902 | 13014 | |
8 | SFO | 10.328127477406069 | 9.038424821002382 | 12570 | |
9 | LAS | 8.566096692995435 | 5.0543525523958595 | 11499 | |
10 | MCO | 9.887440638577354 | 5.820512820512793 | 9867 |
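You can run the same query from Cloud Shell rather than the BigQuery web console; the --nouse_legacy_sql flag tells bq to use standard SQL:

bq query --nouse_legacy_sql '
  SELECT ORIGIN, AVG(DEPDELAY) AS dep_delay, AVG(ARRDELAY) AS arr_delay,
         COUNT(ARRDELAY) AS num_flights
  FROM dsongcp.flights_auto
  GROUP BY ORIGIN
  ORDER BY num_flights DESC
  LIMIT 10'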
Autodetection is hit-and-miss, though, because BigQuery samples only about a hundred rows of data to determine what each column’s data type needs to be. If the arrival delay was an integer for all 100 rows that it saw, but turns out to be a string (NA) somewhere else in the file, the loading will fail. Autodetection may also fail if many of the fields are empty.
Because of this, autodetection is okay during initial exploration, but we should quickly pivot to actually specifying the schema. At that time, it may be worthwhile to also consider whether this table should be partitioned by date – if most of our queries touch only a few days of data rather than the full table, partitioning will lead to cost savings. If that were the case, we would create the table first, specifying that it should be partitioned by date:
bq mk --time_partitioning_type=DAY dsongcp.flights_auto
When loading the data, we’d need to load each partition separately (partitions are named flights_auto$20150101, for example). We can also partition by a column in the data (FlightDate, for example).
Currently, we don’t know much about the fields, so we can ask BigQuery to treat all the columns except the FlightDate as a string:
SCHEMA=Year:STRING,...,FlightDate:DATE,Reporting_Airline:STRING,...
Putting all these together, the loading becomes (see bqload.sh in the course repository):
for MONTH in `seq -w 1 12`; do
  CSVFILE=gs://${BUCKET}/flights/raw/${YEAR}${MONTH}.csv
  bq --project_id $PROJECT load \
     --time_partitioning_field=FlightDate \
     --time_partitioning_type=MONTH \
     --source_format=CSV --ignore_unknown_values \
     --skip_leading_rows=1 --schema=$SCHEMA \
     ${PROJECT}:dsongcp.flights_raw\$${YEAR}${MONTH} $CSVFILE
done
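Once the loads complete, a quick sanity check is to count rows per month and compare against the wc -l numbers from earlier (each CSV file includes a header line, so expect the counts to be one less per month). This is a sketch:

bq query --nouse_legacy_sql '
  SELECT EXTRACT(MONTH FROM FlightDate) AS month, COUNT(*) AS num_flights
  FROM dsongcp.flights_raw
  GROUP BY month
  ORDER BY month'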
At this point, we have the CSV files in Cloud Storage and the raw data in BigQuery. We have successfully ingested the 2015 flights data into GCP! If you want, you can repeat this for the years 2016–2019 18 by changing the for loop in ingest.sh to:
for YEAR in `seq 2016 2019`; do
However, it is not necessary to do so; all the code in this book will work fine with just the 2015 data. In Chapter 3, we will start to look at the 2015 data and do useful things with it.
But before we move on, let’s digress a little and consider automation.
Now that we have some historical flight data in our Cloud Storage bucket, it is natural to wonder how to keep the bucket current. After all, airlines didn’t stop flying in 2021, and the BTS continues to refresh its website on a monthly basis. It would be good if we could schedule monthly downloads to keep ourselves synchronized with the BTS.
There are two scenarios to consider here. The BTS could let us know when it has new data, and we could then proceed to ingest the data. The other option is that we periodically monitor the BTS website and ingest new data as it becomes available. The BTS doesn’t offer a mechanism by which we can be notified about data updates, so we will need to resort to polling. We can, of course, be smart about how we do the polling. For example, if the BTS tends to update its website around the 5th of every month, we could poll at that time.
Where should this ingest program be executed? This is a program that will be invoked only once a month (more often only if an ingest fails and needs to be retried), so it is not a long-running job; instead, it is something that should be scheduled to run periodically. The traditional way to do this is to schedule a cron 19 job in Unix/Linux. To schedule a cron job, you add a line to a crontab 20 file and then register it with a Unix daemon that takes care of the scheduling. For example, adding this line
1 2 10 * * /etc/bin/ingest_flights.py
to crontab will cause the Python program /etc/bin/ingest_flights.py (which would carry out the same steps to ingest the flights data that we did on the command line in the previous section) to be run by the system at 02:01 on the 10th of every month.
Although cron jobs are a straightforward solution, there are several problems that all come down to resilience and repeatability:
The cron job is scheduled on a particular server. If that server happens to be rebooted around 2 AM on April 10, the ingest might never take place that month.
The environment that cron executes in is very restricted. Our task will need to download data from the BTS, uncompress it, clean it up, and upload it to the cloud. These steps impose a variety of requirements in terms of memory, space, and permissions, and it can be difficult to configure cron appropriately. In practice, system administrators configure cron jobs on particular machines and find it difficult to port them to other machines that do not have the same system paths.
If the ingest job fails (if, for example, the network is down), there is no way to retry it. Retries and other such failure-recovery efforts will have to be explicitly coded in our Python program.
Remote monitoring and one-time, ad hoc executions are not part of the cron interface. If you need to monitor, troubleshoot, and restart the ingest from a mobile device, good luck.
These drawbacks are not unique to cron; they are implicit in any solution that is tied to specific servers. So, how would you do it on the cloud? What you should not do is create a Compute Engine VM and schedule a cron job on it—that will be subject to some of the same problems!
For resilience and reliability, we need a serverless way to schedule ingest jobs. Obviously, the ingest job will need to be run on some machine somewhere. However, we shouldn’t need to manage that machine at all. This is a job that needs perhaps two minutes of compute resources a month. We should be looking for a way to write the ingest code and let the cloud infrastructure take care of provisioning resources, making retries, and providing for remote monitoring and ad hoc execution.
On Google Cloud Platform, Cloud Scheduler provides a way to schedule periodic jobs in a serverless manner. These jobs can involve hitting an HTTP endpoint (which is what we will do), but they can also send a message via Cloud Pub/Sub or trigger a Google Kubernetes Engine or Cloud Dataflow job. Figure 2-16 presents our architecture for the monthly ingest job.
First, we will write a standalone ingest_flights.py application that is capable of downloading the data for a specific year/month and uploading the data to Cloud Storage. We will invoke the ingest code from a Flask web application, making sure to explicitly capture our dependencies in a Dockerfile. Cloud Run will run our container.21
The way scheduling works in Cloud Scheduler is that we must specify a URL that will be invoked or a Cloud Pub/Sub topic that must be monitored. Whereas in the previous Linux cron example we specified a script on the server that was running the cron daemon, the Cloud Scheduler endpoint will be a URL that will be visited according to the schedule that we specify (this can be any URL; it doesn’t need to be a service that we write). Because our ingest code is a standalone Python program, we will wrap that ingest code into a Python Flask application (main.py) so that we can invoke it by using a URL (Flask is a web application framework).
While exploring the data, we carried out the ingest on the command line in Bash. We saved our commands as we went along in the form of Bash scripts. We created our ingest program by simply making a Bash script 22 that invokes those intermediate steps:
#!/bin/bash

for MONTH in `seq 1 12`; do
  bash download.sh $YEAR $MONTH
done

# upload the raw CSV files to our GCS bucket
bash upload.sh

# load the CSV files into BigQuery as string columns
bash bqload.sh
This is the sort of decision that leads to spaghetti-like code that is difficult to unravel and to maintain. There are many assumptions made by this set of Bash scripts in terms of what to download, where the temporary storage resides, and where to upload it. Changing any of these will involve changing multiple scripts. Using Bash to quickly get a handle on the data is a good idea, as is the idea of saving these scripts so as to continue the exploration. But when it comes to making the ingest more systematic and routine, you do not want to use a shell scripting language; a more formal programming language is better.
In this book, we will use Python wherever we can because of its ability to span a wide range of computing tasks, from systems programming to statistics and machine learning. Python is currently the best choice if you need to pick a single language in which to do most of your work. Java is type-safe and performant, and its object orientation and packaging architecture are suitable for large, multideveloper programs, but it makes code quite verbose. Moreover, the lack of a Read-Eval-Print Loop (REPL) interpreter makes Java unwieldy for quick experimentation. C++ is numerically very efficient, but standard libraries for non-numerical computing are often nonexistent. Given the choice of a data package available in Python, Scala, R, and Java, significant majorities choose Python and Scala over R and Java.23 Scala combines the benefits of Python (easy scriptability, conciseness) with the benefits of Java (type-safety, speed), but the tooling for Scala (such as for statistics and visualization) is not as pervasive as it is for Python. Today, therefore, the best choice of programming language is Python. For certain use cases for which speed is important and Python is not performant enough, it might be necessary to use Java.
The ingest program in Python 24 goes through the same four steps as before, when we did it manually on the command line:
Download data from the BTS website to a local file.
Unzip the downloaded ZIP file and extract the CSV file it contains.
Upload the CSV file to Google Cloud Storage.
Load the CSV data into a BigQuery partitioned table.
Whereas our download Bash script got all 12 months of a hardcoded year (2015), our download subroutine in Python will take as input the year and month:
def download(YEAR, MONTH, destdir):
    '''
    Downloads on-time performance data and returns local filename
       YEAR  e.g. '2015'
       MONTH e.g. '01' for January
    '''
    url = os.path.join("https://transtats.bts.gov/PREZIP",
                       "_{}_{}.zip".format(YEAR, int(MONTH)))
    filename = os.path.join(destdir, "{}{}.zip".format(YEAR, MONTH))
    with open(filename, "wb") as fp:
        response = urlopen(url)
        fp.write(response.read())
    return filename
Another thing to note is that our Bash script simply downloaded the ZIP file from BTS to the current working directory of the user. However, since our Python script is meant to be executed on demand by the scheduler service, we cannot make assumptions about the directory in which the script will be run. In particular, we don’t know whether that directory will be writeable and have enough space. Hence, we ask the caller of the function to provide an appropriate destination directory in which to store the downloaded ZIP file.
Here’s how to unzip the file and extract the CSV contents:
def zip_to_csv(filename, destdir):
    zip_ref = zipfile.ZipFile(filename, 'r')
    cwd = os.getcwd()
    os.chdir(destdir)
    zip_ref.extractall()
    os.chdir(cwd)
    csvfile = os.path.join(destdir, zip_ref.namelist()[0])
    zip_ref.close()
Unzipping explodes the size of the file, so we can optimize things slightly: rather than upload the plain-text file, we can gzip it, since BigQuery knows how to load gzipped CSV files:
    gzipped = csvfile + ".gz"
    with open(csvfile, 'rb') as ifp:
        with gzip.open(gzipped, 'wb') as ofp:
            shutil.copyfileobj(ifp, ofp)
    return gzipped
Annoyingly, the zipfile module in Python doesn't provide a way to extract contents to a specific directory—it insists on extracting the contents in the current working directory. So, we make sure to change to the destination directory before doing the extraction, and then we change back.
Here’s the code to upload the CSV file for a given month to Cloud Storage:
def upload(csvfile, bucketname, blobname):
    client = storage.Client()
    bucket = client.get_bucket(bucketname)
    blob = Blob(blobname, bucket)
    blob.upload_from_filename(csvfile)
    gcslocation = 'gs://{}/{}'.format(bucketname, blobname)
    print('Uploaded {} ...'.format(gcslocation))
    return gcslocation
The code asks for the bucketname (the single-region bucket that was created during our exploration) and a blobname (e.g., flights/201501.csv) and carries out the upload using the Cloud Storage Python library. Although it can be tempting to simply use the subprocess module in Python to invoke gsutil operations, it is better not to do so. If you go the subprocess route, you will then need to ensure that the Cloud SDK (which gsutil comes with) is installed on whichever machine this is going to run on. This won't be a problem in Cloud Run, but it might pose problems if you switch the way you provide URL access later (to, say, Google App Engine or Cloud Functions). It is preferable to use pure Python modules when possible and add those modules to requirements.txt, as follows:
Flask
google-cloud-storage
google-cloud-bigquery
gunicorn==20.1.0
The Flask library will help us handle HTTP requests (covered shortly), and the Google Cloud Storage library is needed to invoke the get_bucket() and upload_from_filename() operations. While using the latest version of each library is okay, it poses the problem that an upgrade to those dependencies might break our code. For production code, it is better to pin the library versions to the ones with which the code has been tested:
Flask==2.0.1
google-cloud-storage==1.42.0
google-cloud-bigquery==2.25.1
gunicorn==20.1.0
If you do pin libraries, though, you will have to have a process in place to periodically test and upgrade to the latest stable versions of your dependencies. Otherwise, your code might go stale or, worse, end up using library versions with known vulnerabilities.
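As a sketch of what such a periodic check might look like (this is not from the book's repository), the following snippet compares each pinned version in requirements.txt against the version that is actually installed; you could run something like it as part of a scheduled test:

# Hypothetical helper, not part of the repo: report drift between pinned and installed versions.
# Assumes pins use the == form; lines without == (unpinned) are skipped.
import pkg_resources

with open('requirements.txt') as f:
    pins = [line.strip() for line in f if '==' in line]

for pin in pins:
    name, pinned = pin.split('==')
    installed = pkg_resources.get_distribution(name).version
    status = 'OK' if installed == pinned else 'DIFFERS'
    print('{}: pinned {}, installed {} [{}]'.format(name, pinned, installed, status))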
We can now write an ingest() method that calls the four major steps, plus the verification, in order:
def ingest(year, month, bucket):
    '''
    ingest flights data from BTS website to Google Cloud Storage
    return cloud-storage-blob-name on success.
    raises DataUnavailable if this data is not on BTS website
    '''
    tempdir = tempfile.mkdtemp(prefix='ingest_flights')
    try:
        zipfile = download(year, month, tempdir)
        bts_csv = zip_to_csv(zipfile, tempdir)
        gcsloc = 'flights/raw/{}{}.csv.gz'.format(year, month)
        gcsloc = upload(bts_csv, bucket, gcsloc)
        return bqload(gcsloc, year, month)
    finally:
        print('Cleaning up by removing {}'.format(tempdir))
        shutil.rmtree(tempdir)
The destination directory that we use to stage the downloaded data before uploading to Cloud Storage is obtained using the tempfile package in Python. This ensures that if, for whatever reason, there are two instances of this program running at the same time, they will not cause contention issues.
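The bqload() function called by ingest() is part of the full ingest_flights.py on GitHub and is not reproduced in the text. As a rough sketch of what such a helper might look like (the repository's version may differ in its details), it would use the google-cloud-bigquery client to load the gzipped CSV into that month's partition, reusing the explicit string schema and monthly partitioning from bqload.sh:

# Sketch only: load one month's gzipped CSV from GCS into its BigQuery partition.
# schema_str is assumed to be the same 'col:TYPE,col:TYPE,...' string as in bqload.sh,
# listing every CSV column in order.
from google.cloud import bigquery

def bqload(gcsfile, year, month, schema_str):
    client = bigquery.Client()
    # partition decorator, e.g. ...flights_raw$201501
    table_id = '{}.dsongcp.flights_raw${}{}'.format(client.project, year, month)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        ignore_unknown_values=True,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.MONTH, field='FlightDate'),
        schema=[bigquery.SchemaField(*col.split(':')) for col in schema_str.split(',')],
    )
    job = client.load_table_from_uri(gcsfile, table_id, job_config=job_config)
    job.result()   # wait for the load job to complete
    return table_id, job.output_rows

Note that the actual bqload() in the repository is called with only the file, year, and month, so it presumably builds the schema internally; the sketch takes the schema string as a parameter simply to avoid reproducing the long column list here.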
We can try out the code by writing a main() that is executed if this program 25 is run on the command line:
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
        description='ingest flights data from BTS website to GCS')
    parser.add_argument('--bucket', help='GCS bucket to upload data to', required=True)
    parser.add_argument('--year', help='Example: 2015.', required=True)
    parser.add_argument('--month', help='01 for Jan.', required=True)

    try:
        args = parser.parse_args()
        gcsfile = ingest(args.year, args.month, args.bucket)
        print('Success ... ingested to {}'.format(gcsfile))
    except DataUnavailable as e:
        print('Try again later: {}'.format(e.message))
Specifying a valid month ends with a new (or replaced) file on Cloud Storage:
$ ./ingest_flights.py --bucket cloud-training-demos-ml --year 2015 --month 01
...
Success ... ingested to gs://cloud-training-demos-ml/flights/201501.csv
Trying to download a month that is not yet available results in an error message:
$ ./ingest_flights.py --bucket cloud-training-demos-ml --year 2029 --month 01
...
HTTP Error 403: Forbidden
On Cloud Scheduler, this will result in the call failing and being retried subject to a maximum number of retries. Retries will also happen if the BTS web server cannot be reached.
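The DataUnavailable exception that ingest() documents and main() catches is defined in the full ingest_flights.py on GitHub. As an illustration only (not the repository's exact code), here is a minimal sketch of how the HTTP error raised by urlopen could be translated into it:

# Illustrative sketch: map the 403/404 that BTS returns for unpublished months
# into the DataUnavailable exception that ingest() and main() expect.
from urllib.error import HTTPError
from urllib.request import urlopen

class DataUnavailable(Exception):
    def __init__(self, message):
        self.message = message

def fetch_or_raise(url, filename):
    try:
        with open(filename, 'wb') as fp:
            response = urlopen(url)
            fp.write(response.read())
    except HTTPError as e:
        raise DataUnavailable('{} is not available yet: {}'.format(url, e))
    return filename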
At this point, we have the equivalent of our exploratory Bash scripts, but with some additional resilience, repeatability, and fault tolerance built in. Our Python program expects us to provide a year, month, and bucket. However, if we are doing monthly ingests, we already know which year and month we need to ingest. No, not the current month—recall that there is a time lag between the flight events and the data being reported by the carriers to the BTS. Instead, it is simply the month after whatever files we already have on Cloud Storage! So, we can automate this, too:
def next_month(bucketname):
    '''
    Finds which months are on GCS, and returns next year, month to download
    '''
    client = storage.Client()
    bucket = client.get_bucket(bucketname)
    blobs = list(bucket.list_blobs(prefix='flights/raw/'))
    files = [blob.name for blob in blobs if 'csv' in blob.name]  # csv files only
    lastfile = os.path.basename(files[-1])             # e.g. 201503.csv
    year = lastfile[:4]
    month = lastfile[4:6]
    dt = datetime.datetime(int(year), int(month), 15)  # 15th of month
    dt = dt + datetime.timedelta(30)                   # will always go to next month
    return '{}'.format(dt.year), '{:02d}'.format(dt.month)
To get the next month given that there is a file, say 201503.csv, on Cloud Storage, we add 30 days to the Ides of March—this gets around the fact that there can be anywhere from 28 to 31 days in a month, and that timedelta requires a precise number of days to add to a date.
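You can convince yourself of the arithmetic with a quick check: 15 + 30 = 45, which is past the end of even a 31-day month but cannot reach past the end of the following month, so the result always lands in the next month.

# Quick check of the next_month() date arithmetic (not part of the repo's code).
import datetime

dt = datetime.datetime(2015, 3, 15) + datetime.timedelta(30)   # 2015-04-14
print('{}'.format(dt.year), '{:02d}'.format(dt.month))         # prints: 2015 04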
By changing the year and month to be optional parameters, we can try out the ingest program’s ability to find the next month and ingest it to Cloud Storage. We simply add:
if args.year is None or args.month is None:
    year, month = next_month(args.bucket)
else:
    year = args.year
    month = args.month
gcsfile = ingest(year, month, args.bucket)
Now that we have an ingest program that is capable of updating our Cloud Storage bucket one month at a time, we can move on to building the scaffolding to have it be executed in a serverless way.
Cloud Run is a serverless framework that provides an autoscaling, resilient runtime for containerized code. The container (see Figure 2-16) will consist of code that listens for requests or events. Cloud Run abstracts away all the infrastructure management that would otherwise be needed.
Now that we have a Python function that will do the ingest, we will wrap it inside a web application. To write the web application, we will use Flask, a lightweight Python web application framework, and as the web server, we will use gunicorn. Flask provides the ability to invoke Python code in response to an HTTP request, while gunicorn listens for HTTP requests and sends them to the Flask app. Our container will consist of the gunicorn server, the Flask application, and their dependencies. This is expressed in the form of a Dockerfile:
FROM python:3.6-slim

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup.
# Timeout is set to 0 to disable the timeouts of
# the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
In our main.py, we have a function that gets invoked in response to the URL trigger:
import logging
from flask import Flask, request, escape
from ingest_flights import *

app = Flask(__name__)

@app.route("/", methods=['POST'])
def ingest_flights():
    try:
        json = request.get_json()
        year = escape(json['year']) if 'year' in json else None
        month = escape(json['month']) if 'month' in json else None
        bucket = escape(json['bucket'])  # required
        if year is None or month is None or len(year) == 0 or len(month) == 0:
            year, month = next_month(bucket)
        tableref, numrows = ingest(year, month, bucket)
        ok = 'Success ... ingested {} rows to {}'.format(numrows, tableref)
        return ok
    except Exception as e:
        logging.exception('Try again later')
        return 'Try again later', 500   # a non-2xx response tells the caller the request failed
Essentially, main.py has a single function that handles the Flask request object, from which we can extract the JSON payload of the HTTP POST with which the Cloud Run service will be triggered. We get the next month by looking to see which months are already in the bucket and then ingest the necessary data using the existing code in the ingest_flights module. We can deploy our code base as a container to Cloud Run using:
NAME=ingest-flights-monthly
REGION=us-central1

gcloud run deploy $NAME --region $REGION --source=$(pwd) \
    --platform=managed --timeout 12m
But there are a couple of serious security and governance problems if we do this.
What are the security problems?
Anyone can invoke the URL and cause our dataset to get updated. We have to disallow unauthenticated users.
Allowing this code to run with our user account’s permissions will pollute any audit logs since we are not actually running the ingest interactively. We need to create a separate account so that the Cloud Run service can run with that identity.
Allowing this code to run with our user account’s permissions is also quite dangerous because our user account will typically have very broad permissions. We’d like to restrict the tasks that this automated service can do: we want it to be able to write only to specific Cloud Storage buckets and BigQuery tables.
The way to address the first point is to disallow unauthenticated users. The way to address the second and third is to have the Cloud Run service run as a service account. A service account is an account whose identity is meant to be taken on by automated services; like any identity, it can be configured to have specific and limited permissions. Therefore, before we can deploy the Cloud Run service, we need to create a service account. Service accounts have email addresses of the form service-account-name@project-id.iam.gserviceaccount.com.
You can create a service account by going to the web console in the IAM area, but as usual, I prefer to script things:26
SVC_ACCT=svc-monthly-ingest
PROJECT_ID=$(gcloud config get-value project)
BUCKET=${PROJECT_ID}-cf-staging
REGION=us-central1
SVC_PRINCIPAL=serviceAccount:${SVC_ACCT}@${PROJECT_ID}.iam.gserviceaccount.com

gcloud iam service-accounts create $SVC_ACCT --display-name "flights monthly ingest"
Then, we make the service account the admin of the staging GCS bucket so that it can read, write, list, and delete objects in this bucket (and only this bucket):
gsutil mb -l $REGION gs://$BUCKET
gsutil uniformbucketlevelaccess set on gs://$BUCKET
gsutil iam ch ${SVC_PRINCIPAL}:roles/storage.admin gs://$BUCKET
We will also allow the service account to create and delete partitions on tables in just the BigQuery dataset dsongcp (and no other datasets):
bq --project_id=${PROJECT_ID} query --nouse_legacy_sql \
    "GRANT \`roles/bigquery.dataOwner\` ON SCHEMA dsongcp TO '$SVC_PRINCIPAL' "

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member ${SVC_PRINCIPAL} \
    --role roles/bigquery.jobUser
Are these permissions enough? One way to check is to try to ingest a month of data while running as this service account. To do so, we will have to impersonate the service account: 27
Visit the Service Accounts section of the GCP Console
Select the newly created service account svc-monthly-ingest and click Manage Keys
Add key (Create a new JSON key) and download it to a file named tempkey.json. Transfer this key file to your Cloud Shell instance.
Run gcloud auth activate-service-account --key-file tempkey.json
Try ingesting one month: ./ingest_flights.py --bucket $BUCKET --year 2015 --month 03 --debug
Once you have ensured that the service account has all the necessary permissions, go back to running commands as yourself using gcloud auth login.
Now that we have the code for the Flask application and a service account with the right permissions, we can deploy the code to Cloud Run to run as this service account: 28
NAME=ingest-flights-monthly
SVC_ACCT=svc-monthly-ingest
PROJECT_ID=$(gcloud config get-value project)
REGION=us-central1
SVC_EMAIL=${SVC_ACCT}@${PROJECT_ID}.iam.gserviceaccount.com

gcloud run deploy $NAME --region $REGION --source=$(pwd) \
    --platform=managed --service-account ${SVC_EMAIL} \
    --no-allow-unauthenticated --timeout 12m
Recall that we started the discussion on securing the Cloud Run instance by saying that we would disallow unauthenticated users and have the Cloud Run service run as a service account. Note how we are turning on both these options when we deploy to Cloud Run.
Once the application has been deployed to Cloud Run, we can try accessing the URL of the service with our authentication details in the header of the web request and a JSON message as its POST body: 29
# Feb 2015
echo "{\"year\":\"2015\",\"month\":\"02\",\"bucket\":\"${BUCKET}\"}" > /tmp/message

curl -k -X POST $URL \
    -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
    -H "Content-Type:application/json" --data-binary @/tmp/message
But what is the URL? Cloud Run generates the URL when we deploy the container, and we can obtain it using:
gcloud run services describe ingest-flights-monthly --format 'value(status.url)'

Changing the message to provide only the bucket (no year or month) will make the service get the next month:

echo "{\"bucket\":\"${BUCKET}\"}" > /tmp/message

curl -k -X POST $URL \
    -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
    -H "Content-Type:application/json" --data-binary @/tmp/message
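If you'd rather invoke the service from Python than from curl (for example, from a notebook), a sketch along the following lines works. It is not part of the book's repository; it assumes the requests and google-auth libraries are installed and that you are running with service account credentials (for instance, with GOOGLE_APPLICATION_CREDENTIALS set), because fetch_id_token() cannot mint an identity token from ordinary end-user credentials:

# Sketch: call the authenticated Cloud Run service from Python instead of curl.
# The service URL below is a placeholder; use the one printed by gcloud run services describe.
import google.auth.transport.requests
import google.oauth2.id_token
import requests

service_url = 'https://ingest-flights-monthly-xxxxxxxx-uc.a.run.app'   # placeholder

auth_req = google.auth.transport.requests.Request()
token = google.oauth2.id_token.fetch_id_token(auth_req, service_url)   # audience = service URL

resp = requests.post(service_url,
                     json={'bucket': 'your-bucket-name'},   # bucket only: ingest the next month
                     headers={'Authorization': 'Bearer ' + token})
print(resp.status_code, resp.text)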
Our intent is to invoke the Cloud Run service automatically once a month. We can do that using Cloud Scheduler, which is also serverless and doesn't require us to manage any infrastructure. We simply specify the schedule and the URL to hit; this URL is the one that came from the output of the Cloud Run deployment command in the previous section.
echo {"bucket":"${BUCKET}"} > /tmp/message cat /tmp/message gcloud scheduler jobs create http monthlyupdate --description "Ingest flights using Cloud Run" --schedule="8 of month 10:00" --time-zone "America/New_York" --uri=$SVC_URL --http-method POST --oidc-service-account-email $SVC_EMAIL --oidc-token-audience=$SVC_URL --max-backoff=7d --max-retry-attempts=5 --max-retry-duration=2d --min-backoff=12h --headers="Content-Type=application/json" --message-body-from-file=/tmp/message
The preceding parameters would make the first retry happen after 12 hours. Subsequent retries are increasingly farther apart, up to a maximum of 2 days between attempts. We fail the task permanently if it fails five times within a defined time period and the task is more than 7 days old (both limits must be passed for the task to fail).
To try out the Cloud Scheduler, we could wait for the 8th of the month to roll around. Or we could go to the GCP web console and click on “Run Now”. Unfortunately, it won’t work because Cloud Scheduler wants to run as the service account while you are logged in as yourself. So, give yourself the ability to impersonate the service account by going to the Service Accounts part of the web console. Once you’ve done that, you will be able to get “Run Now” to work.
The monthly update mechanism works if you have the previous month's data on Cloud Storage. If you start out with only 2015 data, updating it monthly means that you will inevitably be many months behind. So, you will need to run the ingest ad hoc until your data is up to date and then let Cloud Scheduler take care of things after that. Alternatively, you can take advantage of the fact that the ingest task is cheap and nonintrusive when there is no new data, and simply change the schedule to be every day instead of every month. A better solution is to change the ingest task so that, if it succeeds in ingesting a new month of data, it immediately tries to ingest the next month. This way, your program will crawl month by month to the latest available month and then keep itself always up to date.
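Here is a sketch of that "crawl forward" idea, building on the next_month() and ingest() functions we already have and on the DataUnavailable exception raised when a month has not been published yet (this is an illustration, not code from the repository):

# Keep ingesting successive months until the BTS has no more published data.
def ingest_latest(bucket):
    while True:
        year, month = next_month(bucket)
        try:
            tableref, numrows = ingest(year, month, bucket)
            print('Ingested {} rows for {}-{}'.format(numrows, year, month))
        except DataUnavailable as e:
            print('{}-{} not available yet; we are up to date ({})'.format(year, month, e.message))
            break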
At this point, it is worth reflecting a bit on what we have accomplished. We are able to ingest data and keep it up-to-date by doing just these steps:
Write some Python code.
Deploy that Python code to the Google Cloud Platform.
We did not need to manage any infrastructure in order to do this. We didn't have to install an OS, manage accounts on machines, keep them up to date with security patches, maintain failover systems, and so on—a serverless solution that consists simply of deploying code to the cloud is incredibly liberating. Not only is our ingest convenient, it is also very inexpensive: everything scales down to zero when it is not being used. All of this falls comfortably within the free tier or might cost less than 5¢ a month.
The US BTS collects, and makes publicly available, a dataset of flight information. It includes nearly a hundred fields, including scheduled and actual departure and arrival times, origin and destination airports, and flight numbers of every domestic flight scheduled by every major carrier. We will use this dataset to estimate the likelihood of an arrival delay of more than 15 minutes for the specific flight whose outcome we need to know in order to decide whether to cancel the meeting.
There are three possible data processing architectures on the cloud for large datasets: scaling up, scaling out with sharded data, and scaling out with data in situ. Scaling up is very efficient, but it is limited by the size of the largest machine you can get hold of. Scaling out is very popular but requires that you preshard your data by splitting it among compute nodes, which leads to maintaining expensive clusters unless you can hit sustained high utilization. Keeping data in situ is possible only if your datacenter supports petabits per second of bisectional bandwidth, so that any file can be moved to any compute node in the datacenter on demand. Because Google Cloud Platform has this capability, we will upload our data to Google Cloud Storage, a blob storage that is not presharded, and to BigQuery, which will allow us to carry out interactive exploration of large datasets.
To automate the ingest of the files, we reverse engineered the BTS’s web form and obtained the format of the POST request that we need to make. With that request in hand, we were able to write a Bash script to pull down 12 months of data, uncompress the ZIP file, and load the data into BigQuery. It is quite straightforward to change this script to loop through multiple years.
We discussed the difference between strong consistency and eventual consistency and how to make the appropriate trade-off imposed by Brewer’s CAP theorem. In this case, we wanted strong consistency and did not need global availability. Hence, we chose to use a single-region bucket. We then uploaded the downloaded, unzipped, and cleaned CSV files to Google Cloud Storage.
To schedule monthly downloads of the BTS dataset, we wrapped our download-and-cleanup Python program in a Flask application and made it callable from Cloud Run so that it is completely serverless. We used Cloud Scheduler to periodically request that the Cloud Run application download BTS data, unzip it, and load it into both Cloud Storage and BigQuery.
This is the point at which you should put this book off to the side and attempt to repeat all the things I’ve talked about. All the code snippets in this book have corresponding code in the GitHub repository.
I strongly encourage you to play around with the code in 02_ingest with the goal of understanding why it is organized that way and being able to write similar code yourself. At minimum, though, you should do the following:
Open Cloud Shell and git clone the book’s code repository as explained in Chapter 1.
Go to the 02_ingest folder of the repository.
Go to the Storage section of the GCP web console and create a new bucket.
Change the BUCKET variable in upload.sh to reflect the bucket that you created.
Run ./ingest.sh. This will populate your bucket and BigQuery dataset with data from 2015. Optionally, you can change the year loop in this file to download all the data corresponding to 2015–2019.
Because software changes, an up-to-date list of the preceding steps is available in the course repository in 02_ingest/README.md. This is true for all the following chapters.
1 See, for example, https://www.congress.gov/congressional-report/107th-congress/senate-report/13/1. The bill referenced in the report was not enacted into law, but it illustrates Congress’s monitoring function based on the statistics collected by the Department of Transportation.
2 For example, weather radar data from before 2000 had timestamps assigned by a radar engineer. Essentially, the engineers would look at their wristwatches and enter a time into the radar products generator. Naturally, this was subject to all kinds of human errors—dates could be many hours off. The underlying problem was fixed by the introduction of network clocks to ensure consistent times between all the radars on the US weather radar network. When using historical weather data, though, time correction is an important preprocessing step.
3 The shortest path between two points on the globe is an arc that passes through the two points and whose focus point is the center of the globe.
4 Indeed, scripting the field selection and download is what I did in the first edition of the book. If interested, see the select-and-download code in GitHub in the branch edition1_tf2 – the key thing is that the web request sends the selected fields inside the POST request and handles the resulting client-side redirect to obtain the zip file that is created on demand.
5 Over the last 5 years, I have observed that the BTS server that does this zip file creation is frequently down. Yes, they would ideally use a public Cloud to host their website, but you try telling the US government what to do.
6 Another thing I am doing is to host the zip files on Google Cloud, and have my code hit the Google Cloud server. The code by default will not hit the BTS server anymore. The original BTS URL is still present in the code, just commented out, so change it back if you want to try it out.
7 You could also just stop (and not delete) the Google Cloud Platform Compute Engine instance. Stopping the instance stops the bill associated with the compute machine, but you will continue to pay for storage. In particular, you will continue to pay for the SSD associated with the Compute Engine instance. The key advantage of a stopped instance is that you get to resume exactly where you left off, but this might not be important if you always start from a clean (known) state each time.
8 To shard a large database is to partition it into smaller, more easily managed parts. Whereas normalization of a database table places the columns of a database into different tables, sharding splits the rows of the database and uses different database server instances to handle each part. For more information, go to https://en.wikipedia.org/wiki/Shard_(database_architecture).
9 In other words, rather than a filesystem that is local to a single machine, it is common to the entire cluster of machines that form the datacenter.
10 The blog on Google’s networking infrastructure is worth a read. One petabit is 1 million gigabits, so the 1 Pbps quoted in the article works out to 125,000 GBps. Networking has only gotten better since 2015, of course.
11 A condition in which network packets need to be delivered in order; thus, a slow packet holds up delivery of later packets.
12 Microsoft Azure seems to involve a centralized host layer, for example, while AWS S3 seems to prioritize network latency. You’d design your software for such infrastructure differently.
13 See the script 02_ingest/download.sh in the course repository.
14 You can get your unique project ID from the Cloud Platform Console dashboard; it could be different from the common name that you assigned to your project. By default, Google Cloud Platform tries to give you a project ID that is the same as your project name, but if that name is already taken, you will get an autogenerated, unique project ID. Because of this default, you should be similarly careful about giving projects sensitive names.
15 See . As of this writing, switching to the flat-rate pricing model requires you to contact your Google Cloud Platform account representative.
16 A service account is tied to a project, but project membership evolves over time. So, even the subset of users who could have invoked the action might not be known unless you have strict governance over who is allowed to be an owner/editor/viewer of a project.
17 See for a gcloud script that will create single-user projects. The users will be editors on the project, but project ownership will reside with the person who has billing administrator rights.
18 Let’s ignore 2020 and 2021 because those were the years of the COVID pandemic.
19 A shortened form of a misspelling of chronos, Greek for time, cron is the name of the Unix daemon process that executes scheduled jobs at specific times.
20 Shortened form of cron table.
21 Docker containers are lightweight wrappers around a piece of software (here, the Flask endpoint main.py) that contain everything needed to run that software—code (e.g., ingest_flights.py), runtime (Python dependencies, etc.), configuration files and system libraries (here, a specific Linux distribution). Unlike a virtual machine, different containers running on the same machine can share layers of operating system dependencies.
22 02_ingest/ingest.sh
23 In 2016, Databricks found that 65% of survey respondents used Spark in Scala versus 62% in Python, 44% in SQL, 29% in Java, and 20% in R. See the infographic and survey linked from https://databricks.com/blog/2016/09/27/spark-survey-2016-released.html.
24 See 02_ingest/monthlyupdate/ingest_flights.py
25 The full program is available as ingest_flights.py on GitHub at http://bit.ly/2BPhya4 —try it out.
26 See 02_ingest/monthlyupdate/01_setup_svc_acct.sh
27 See the instructions in README.md in 02_ingest.
28 See 02_ingest/monthlyupdate/02_deploy_cr.sh
29 See 02_ingest/monthlyupdate/03_call_cr.sh