Chapter 2. Introduction to Data Analysis with PySpark

Python is the most widely used language for data science tasks. The prospect of doing statistical computing and web programming in the same language contributed to its rise in popularity in the early 2010s. This has led to a thriving ecosystem of tools and a helpful community for data analysis, often referred to as the PyData ecosystem, which is a big reason for PySpark’s popularity. Being able to leverage distributed computing via Spark in Python helps data science practitioners be more productive because they already know the language and can draw on a wide community. For that same reason, we have opted to write our examples in PySpark.

It’s difficult to express how transformative it is to do all of your data munging and analysis in a single environment, regardless of where the data itself is stored and processed. It’s the sort of thing that you have to experience to understand, and we wanted to be sure that our examples captured some of that magic feeling we experienced when we first started using PySpark. For example, PySpark provides interoperability with Pandas, which is one of the most popular PyData tools. We will explore this feature later in this chapter.

In this chapter, we will explore PySpark’s powerful DataFrame API via a data cleansing exercise. In PySpark, the DataFrame is an abstraction for data sets that have a regular structure in which each record is a row made up of a set of columns, and each column has a well-defined data type. You can think of a DataFrame as the Spark analogue of a table in a relational database. Even though the naming convention might make you think of a data.frame object in R or a pandas.DataFrame object, Spark’s DataFrames are a different beast. This is because they represent distributed data sets on a cluster, not local data where every row is stored on the same machine. Although there are similarities in how you use DataFrames and the role they play inside the Spark ecosystem, some things you may be used to doing with data frames in Pandas or R do not apply to Spark, so it’s best to think of them as their own distinct entity and approach them with an open mind.

As for data cleansing, it is the first step in any data science project, and often the most important. Many clever analyses have been undone because the data analyzed had fundamental quality problems or underlying artifacts that biased the analysis or led the data scientist to see things that weren’t really there. Hence, what better way to introduce you to working with data using PySpark and DataFrames than a data cleansing exercise?

First, we will introduce PySpark’s fundamentals and practice them using a sample dataset from the UC Irvine Machine Learning Repository. We will reiterate why PySpark is a good choice for data science and introduce its programming model. We will then set up PySpark on our system or cluster and analyze our dataset using PySpark’s DataFrame API. Most of your time using PySpark for data analysis will center around the DataFrame API, so get ready to become intimately familiar with it. This will set us up for the following chapters, where we delve into various machine learning algorithms.

You don’t need to deeply understand how Spark works under the hood to perform data science tasks. However, understanding basic concepts of Spark’s architecture will make it easier to work with PySpark and to make better decisions when writing code. That is what we will cover in the next section.

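When you use the DataFrame API, your PySpark code should perform comparably to equivalent Scala code. If you use UDFs (user-defined functions) written in Python or work with RDDs, expect a performance penalty, because the data must be serialized and passed between the JVM and Python worker processes.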

Spark Architecture

Figure 2-1. Spark architecture diagram

Figure 2-1 depicts the Spark architecture through its high-level components. Spark applications run as independent sets of processes on a cluster or locally. At a high level, a Spark application consists of a driver process, a cluster manager, and a set of executor processes. The driver process is the central component and is responsible for distributing tasks across the executor processes. There is always exactly one driver process. When we talk about scaling, we mean increasing the number of executors. The cluster manager simply manages resources.

Spark is a distributed, data-parallel compute engine. In the data-parallel model, more data partitions mean more potential parallelism. Breaking the data up into chunks, or partitions, and distributing them allows Spark executors to process only the data that is close to them, minimizing network bandwidth. That is, each executor core is assigned its own data partition to work on. Remember this whenever a choice related to partitioning comes up.
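To make the data-parallel idea concrete, here is a minimal local sketch that uses only the Python standard library, not Spark itself. The helper names make_partitions and process_partition are hypothetical, and each worker thread merely stands in for an executor core that owns one partition.

```python
from concurrent.futures import ThreadPoolExecutor


def make_partitions(records, num_partitions):
    """Split records into roughly equal, contiguous chunks."""
    size = -(-len(records) // num_partitions)  # ceiling division
    return [records[i:i + size] for i in range(0, len(records), size)]


def process_partition(partition):
    """Work done independently on one partition: here, a partial sum."""
    return sum(partition)


records = list(range(1, 101))            # toy data set: the integers 1..100
partitions = make_partitions(records, 4)

# Each worker thread stands in for one executor core with its own partition.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(process_partition, partitions))

# The combine step plays the role of the driver gathering partial results.
total = sum(partial_sums)
print(partial_sums, total)
```

With four partitions, up to four workers can make progress at once. With a single partition, only one worker would have anything to do, no matter how many were available.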

Spark programming starts with a data set, usually residing in some form of distributed, persistent storage like HDFS and in a format like Parquet. Writing a Spark program typically consists of a few steps:

  1. Define a set of transformations on the input data set.

  2. Invoke actions that output the transformed data sets to persistent storage or return results to the driver’s local memory. These actions will ideally be performed by the worker nodes, as depicted on the right in Figure 2-1.

  3. Run local computations that operate on the results computed in a distributed fashion. These can help you decide what transformations and actions to undertake next.
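The three steps above can be mimicked with a toy model in plain Python. This is only an analogy under our own assumptions: LazyDataset is a hypothetical class, not part of Spark, and it runs on one machine. It shows the pattern of recording transformations first and running them only when an action is invoked.

```python
class LazyDataset:
    """Toy stand-in for a distributed data set (hypothetical, not a Spark class)."""

    def __init__(self, source, steps=()):
        self._source = source   # the input data
        self._steps = steps     # recorded transformations, not yet executed

    def map(self, fn):
        # Transformation: returns a new plan; nothing is computed here.
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: also only recorded.
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def _run(self):
        data = iter(self._source)
        for kind, fn in self._steps:
            data = map(fn, data) if kind == "map" else filter(fn, data)
        return data

    def collect(self):
        # Action: executes the recorded plan and returns the results locally.
        return list(self._run())

    def count(self):
        # Action: executes the plan and returns a single number.
        return sum(1 for _ in self._run())


scores = LazyDataset([0.2, 0.9, 1.0, 0.4, 0.95])

# Step 1: define transformations (no work happens yet).
high = scores.filter(lambda s: s >= 0.9).map(lambda s: round(s * 100))

# Step 2: invoke an action, which runs the plan and returns results.
result = high.collect()

# Step 3: run a local computation on the returned results.
print(result, max(result))
```

Separating the plan from its execution is what lets an engine such as Spark inspect the whole chain of transformations before deciding how to run it.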

It’s important to remember that all of PySpark’s higher-level abstractions still rely on the same philosophy that has been present in Spark since the very beginning: the interplay between storage and execution. Understanding these principles will help you make better use of Spark for data analysis.

Next, we will install and set up PySpark on our machine so that we can start performing data analysis. This is a one-time exercise that will help us run the code examples from this and following chapters.

Installing PySpark

The examples and code in this book assume you have Spark 3.0.1 available. To follow the code examples, install PySpark from the PyPI repository:

$ pip install pyspark

At the time of writing, PySpark is reported to be incompatible with Python 3.8. Please use Python 3.6 or 3.7 for following the code examples.

If you want SQL, ML, and/or MLlib as extra dependencies, that’s an option too. We will need these later.

$ pip install pyspark[sql,ml,mllib]

Installing from PyPI skips the libraries required to run Scala, Java, or R. Full releases can be obtained from the Spark project site. Refer to the Spark documentation for instructions on setting up a Spark environment, whether on a cluster or simply on your local machine.

Now we’re ready to launch the pyspark shell, which is a REPL for the Python language that also has some Spark-specific extensions. It is similar to the Python or IPython shell that you may have used. If you’re just running these examples on your personal computer, you can launch a local Spark cluster by specifying local[N], where N is the number of threads to run, or local[*] to match the number of cores available on your machine. For example, to launch a local cluster that uses eight threads on an eight-core machine:

$ pyspark --master local[*]

A Spark application itself is often referred to as a Spark cluster. That is a logical abstraction and is different from a physical cluster (multiple machines).

If you have a Hadoop cluster that runs a version of Hadoop that supports YARN, you can launch the Spark jobs on the cluster by using the value of yarn for the Spark master:

$ pyspark --master yarn --deploy-mode client

The rest of the examples in this book will not show a --master argument to pyspark, but you will typically need to specify this argument as appropriate for your environment.

You may need to specify additional arguments to make the Spark shell fully utilize your resources. A list of arguments can be found by executing pyspark --help. For example, when running Spark with a local master, you can use --driver-memory 2g to let the single local process use 2 GB of memory. YARN memory configuration is more complex, and relevant options like --executor-memory are explained in the Spark on YARN documentation.

The Spark framework officially supports four cluster deployment modes: standalone, YARN, Kubernetes, and Mesos. More details can be found in the Deploying Spark documentation.

After running one of these commands, you will see a lot of log messages from Spark as it initializes itself, but you should also see a bit of ASCII art, followed by some additional log messages and a prompt:

Python 3.6.12 |Anaconda, Inc.| (default, Sep  8 2020, 23:10:56)
[GCC 7.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Python version 3.6.12 (default, Sep  8 2020 23:10:56)
SparkSession available as 'spark'.

You can run the help command in the shell. This will prompt you to either start an interactive help mode or ask for help about specific Python objects. In addition to the note about help, the Spark log messages indicated “SparkSession available as 'spark'.” This is a reference to the SparkSession, which acts as an entry point to all Spark operations and data. Go ahead and type spark at the command line:

spark
...
<pyspark.sql.session.SparkSession object at DEADBEEF>

The REPL will print the string form of the object. For the SparkSession object, this is simply its name plus the hexadecimal address of the object in memory. (DEADBEEF is a placeholder; the exact value you see here will vary from run to run.) In an interactive Spark shell, the Spark driver instantiates a SparkSession for you, while in a Spark application, you create a SparkSession object yourself.

In Spark 2.0, the SparkSession became a unified channel to all Spark operations and data. Previously used entry points such as SparkContext, SQLContext, HiveContext, SparkConf, and StreamingContext can be accessed through it too.

What exactly do we do with the spark variable? SparkSession is an object, so it has methods associated with it. We can see what those methods are in the PySpark shell by typing the name of a variable, followed by a period, followed by tab:

 spark.[t]
...
spark.Builder(           spark.conf
spark.newSession(        spark.readStream
spark.stop(              spark.udf
spark.builder            spark.createDataFrame(
spark.range(             spark.sparkContext
spark.streams            spark.version
spark.catalog            spark.getActiveSession(
spark.read               spark.sql(
spark.table(

Out of all the methods provided by SparkSession, the ones that we’re going to use most often allow us to create DataFrames. Now that we have set up PySpark, we can set up our dataset of interest and start using PySpark’s DataFrame API to interact with it. That’s what we will do in the next section.

Setting up our data

The UC Irvine Machine Learning Repository is a fantastic source for interesting (and free) data sets for research and education. The data set we’ll analyze was curated from a record linkage study performed at a German hospital in 2010, and it contains several million pairs of patient records that were matched according to several different criteria, such as the patient’s name (first and last), address, and birthday. Each matching field was assigned a numerical score from 0.0 to 1.0 based on how similar the strings were, and the data was then hand-labeled to identify which pairs represented the same person and which did not. The underlying values of the fields that were used to create the data set were removed to protect the privacy of the patients. Numerical identifiers, the match scores for the fields, and the label for each pair (match versus nonmatch) were published for use in record linkage research.

From the shell, let’s pull the data from the repository:

$ mkdir linkage
$ cd linkage/
$ curl -L -o donation.zip https://bit.ly/1Aoywaq
$ unzip donation.zip
$ unzip 'block_*.zip'

If you have a Hadoop cluster handy, you can create a directory for the block data in HDFS and copy the files from the data set there:

$ hadoop fs -mkdir linkage
$ hadoop fs -put block_*.csv linkage

To create a data frame for our record linkage data set, we’re going to use the SparkSession object. Specifically, we will use the csv method on its Reader API:

prev = spark.read.csv("linkage")
...
prev
...
DataFrame[_c0: string, _c1: string, _c2: string, _c3: string,...

By default, every column in a CSV file is treated as a string type, and the column names default to _c0, _c1, _c2, …. We can look at the head of a data frame in the shell by calling its show method:

prev.show()

We can see that the first row of the DataFrame is the name of the header columns, as we expected, and that the CSV file has been cleanly split up into its individual columns. We can also see the presence of the ? strings in some of the columns; we will need to handle these as missing values. In addition to naming each column correctly, it would be ideal if Spark could properly infer the data type of each of the columns for us.

Fortunately, Spark’s CSV reader provides all of this functionality for us via options that we can set on the reader API. You can see the full list of options that the API takes in the pyspark documentation. For now, we’ll read and parse the linkage data like this:

parsed = spark.read.option("header", "true").option("nullValue", "?").option("inferSchema", "true").csv("linkage")

When we call show on the parsed data, we see that the column names are set correctly and the ? strings have been replaced by null values. To see the inferred type for each column, we can print the schema of the parsed DataFrame like this:

parsed.printSchema()
...
root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
...

Each StructField instance contains the name of the column, the most specific data type that could handle the type of data contained in each record, and a boolean field that indicates whether or not the column may contain null values, which is true by default. In order to perform the schema inference, Spark must do two passes over the data set: one pass to figure out the type of each column, and a second pass to do the actual parsing.
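The two-pass idea can be illustrated locally with a small standard-library sketch. This is a simplified model under our own assumptions, not Spark's actual inference logic: the infer_type helper is hypothetical, it only distinguishes integer, double, and string, and the three sample rows are made up for illustration.

```python
import csv
import io


def infer_type(values, null_value="?"):
    """Return the narrowest of 'integer', 'double', 'string' that fits all non-null values."""
    present = [v for v in values if v != null_value]
    for name, parse in [("integer", int), ("double", float)]:
        try:
            for v in present:
                parse(v)
            return name
        except ValueError:
            continue  # this type cannot hold every value; try the next, wider one
    return "string"


# Made-up sample in the same shape as the linkage data.
sample = io.StringIO(
    "id_1,cmp_fname_c1,cmp_fname_c2\n"
    "3148,1,?\n"
    "14055,0.5,?\n"
    "33948,1,0.75\n"
)
rows = list(csv.reader(sample))
header, body = rows[0], rows[1:]

# Pass 1: scan every column to decide its type.
schema = {name: infer_type([r[i] for r in body]) for i, name in enumerate(header)}

# Pass 2: parse the raw strings using the inferred types, mapping "?" to None.
parsers = {"integer": int, "double": float, "string": str}
parsed_rows = [
    [None if v == "?" else parsers[schema[h]](v) for h, v in zip(header, r)]
    for r in body
]

print(schema)
print(parsed_rows)
```

Note that cmp_fname_c1 ends up as a double even though most of its values look like integers, because a single 0.5 forces the wider type. Supplying the schema up front makes the first pass unnecessary.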

If you know the schema that you want to use for a file ahead of time, you can create an instance of the pyspark.sql.types.StructType class and pass it to the Reader API via the schema function, which can have a significant performance benefit when the data set is very large, since Spark will not need to perform an extra pass over the data to figure out the data type of each column.

Here is an example of defining a schema using StructType and StructField:

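from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Each StructField takes a column name, a data type, and a nullable flag.
schema = StructType([StructField("id_1", IntegerType(), False),
  StructField("id_2", IntegerType(), False),
  StructField("cmp_fname_c1", DoubleType(), False)])

spark.read.schema(schema).csv("...")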

Another way to define the schema is with a DDL (data definition language) string:

schema = "id_1 INT, id_2 INT, cmp_fname_c1 DOUBLE"

DataFrames have a number of methods that allow us to read data from the cluster into the PySpark REPL on our client machine. Perhaps the simplest of these is first, which returns the first element of the DataFrame into the client:

parsed.first()
...
Row(id_1=3148, id_2=8326, cmp_fname_c1=1.0, cmp_fname_c2=None,...

The first method can be useful for sanity checking a data set, but we’re generally interested in bringing back larger samples of a DataFrame into the client for analysis. When we know that a DataFrame contains only a small number of records, we can use the toPandas or collect method to return all of its contents to the client, as a Pandas DataFrame or a list of Row objects, respectively. For extremely large DataFrames, using these methods can be dangerous and cause an out-of-memory (OOM) exception. Because we don’t know how big the linkage data set is just yet, we’ll hold off on doing this right now.

In the next several sections, we’ll use a mix of local development and testing and cluster computation to perform more munging and analysis of the record linkage data, but if you need to take a moment to drink in the new world of awesome that you have just entered, we certainly understand.

Analyzing Data with the DataFrame API

The DataFrame API comes with a powerful set of tools that will likely be familiar to data scientists who are used to Python and SQL. In this section, we will begin to explore these tools and how to apply them to the record linkage data.

If we look at the schema of the parsed DataFrame and the first few rows of data, we see this:

parsed.printSchema()
...
root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)

...

parsed.show(5)
...
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
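|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+

  • The first two fields are integer IDs that represent the patients that were matched in the record.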

  • The next nine values are (possibly missing) numeric values (either doubles or ints) that represent match scores on different fields of the patient records, such as their names, birthdays, and locations. The fields are stored as integers when the only possible values are match (1) or no-match (0), and doubles whenever partial matches are possible.

  • The last field is a boolean value (true or false) indicating whether or not the pair of patient records represented by the line was a match.

Our goal is to come up with a simple classifier that allows us to predict whether a record will be a match based on the values of the match scores for the patient records. Let’s start by getting an idea of the number of records we’re dealing with via the count method:

parsed.count()
...
5749132

This is a relatively small data set—certainly small enough to fit in memory on one of the nodes in a cluster or even on your local machine if you don’t have a cluster available. Thus far, every time we’ve processed the data, Spark has re-opened the file, reparsed the rows, and then performed the action requested, like showing the first few rows of the data or counting the number of records. When we ask another question, Spark will do these same operations, again and again, even if we have filtered the data down to a small number of records or are working with an aggregated version of the original data set.

This isn’t an optimal use of our compute resources. After the data has been parsed once, we’d like to save the data in its parsed form on the cluster so that we don’t have to reparse it every time we want to ask a new question of the data. Spark supports this use case by allowing us to signal that a given DataFrame should be cached in memory after it is generated by calling the cache method on the instance. Let’s do that now for the parsed DataFrame:

parsed.cache()

Once our data has been cached, the next thing we want to know is the relative fraction of records that were matches versus those that were nonmatches.

from pyspark.sql.functions import col

parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()
...
+--------+-------+
|is_match|  count|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+

Instead of writing a function to extract the is_match column, we simply pass its name to the groupBy method on the DataFrame, call the count method to, well, count the number of records inside each grouping, sort the resulting data in descending order based on the count column, and then cleanly render the result of the computation in the REPL with show. Under the covers, the Spark engine determines the most efficient way to perform the aggregation and return the results. This represents the clean, fast, and expressive way to do data analysis that Spark provides.
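Since the question was about the relative fraction of matches, it can help to turn the two counts from the output above into proportions. The following is plain local arithmetic on numbers already returned to the driver. The variable names are our own.

```python
# Counts copied from the groupBy output above.
counts = {False: 5728201, True: 20931}

total = sum(counts.values())
fractions = {label: n / total for label, n in counts.items()}

for label, frac in fractions.items():
    print(f"is_match={label}: {frac:.4%}")
```

Matches make up well under 1% of the pairs, so the two classes are heavily imbalanced. That is worth keeping in mind when we later judge how good a classifier is.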

Note that there are two ways we can reference the names of the columns in the DataFrame: either as literal strings, like in groupBy("is_match"), or as Column objects by using the col function that we used on the count column. Either approach is valid in most cases, but we needed to use the col function to call the desc method on the resulting count column object.

You may have noticed that the functions on the DataFrame API are similar to the components of a SQL query. This isn’t a coincidence, and in fact we have the option of treating any DataFrame we create as if it were a database table and expressing our questions using familiar and powerful SQL syntax. First, we need to tell the Spark SQL execution engine the name it should associate with the parsed DataFrame, since the name of the variable itself (“parsed”) isn’t available to Spark:

parsed.createOrReplaceTempView("linkage")

Because the parsed DataFrame is only available during the length of this PySpark REPL session, it is a temporary table. Spark SQL may also be used to query persistent tables in HDFS if we configure Spark to connect to an Apache Hive metastore that tracks the schemas and locations of structured data sets.

Once our temporary table is registered with the Spark SQL engine, we can query it like this:

spark.sql("""
  SELECT is_match, COUNT(*) cnt
  FROM linkage
  GROUP BY is_match
  ORDER BY cnt DESC
""").show()
...
+--------+-------+
|is_match|    cnt|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+

You have the option of running Spark using either an ANSI 2003-compliant version of Spark SQL (the default) or in HiveQL mode by calling the enableHiveSupport method when you create a SparkSession instance via its Builder API.

Should you use Spark SQL or the DataFrame API to do your analysis in PySpark? There are pros and cons to each: SQL has the benefit of being broadly familiar and expressive for simple queries. It is also the best way to quickly read and filter data stored in commonly used columnar file formats like Parquet and ORC. It also lets you query data using JDBC/ODBC connectors from databases such as PostgreSQL or tools such as Tableau. The downside of SQL is that it can be difficult to express complex, multistage analyses in a dynamic, readable, and testable way, all areas where the DataFrame API shines. Throughout the rest of the book, we use both Spark SQL and the DataFrame API, and leave it as an exercise for the reader to examine the choices we made and translate our computations from one interface to the other.

We can apply functions one by one to our DataFrame to obtain statistics such as count and mean. However, PySpark offers a better way to obtain summary statistics for DataFrames, and that’s what we will cover in the next section.

Fast Summary Statistics for DataFrames

Although there are many kinds of analyses that may be expressed equally well in SQL or with the DataFrame API, there are certain common things that we want to be able to do with data frames that can be tedious to express in SQL. One such analysis that is especially helpful is computing the min, max, mean, and standard deviation of all the non-null values in the numerical columns of a data frame. In PySpark, this function has the same name that it does in Pandas, describe:

summary = parsed.describe()
...
summary.show()

The summary DataFrame has one column for each variable in the parsed DataFrame, along with another column (also named summary) that indicates which metric—count, mean, stddev, min, or max—is present in the rest of the columns in the row. We can use the select method to choose a subset of the columns in order to make the summary statistics easier to read and compare:

summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()
+-------+------------------+------------------+
|summary|      cmp_fname_c1|      cmp_fname_c2|
+-------+------------------+------------------+
|  count|           5748125|            103698|
|   mean|0.7129024704436274|0.9000176718903216|
| stddev|0.3887583596162788|0.2713176105782331|
|    min|               0.0|               0.0|
|    max|               1.0|               1.0|
+-------+------------------+------------------+

Note the difference in the value of the count variable between cmp_fname_c1 and cmp_fname_c2. While almost every record has a non-null value for cmp_fname_c1, less than 2% of the records have a non-null value for cmp_fname_c2. To create a useful classifier, we need to rely on variables that are almost always present in the data—unless their missingness indicates something meaningful about whether the record matches.
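The "less than 2%" figure can be checked with a few lines of local arithmetic. The numbers are taken from the count row of the summary table and from parsed.count() earlier in the chapter. The variable names are our own.

```python
total_records = 5749132  # from parsed.count()

# 'count' row of the summary: number of non-null values per column.
non_null = {"cmp_fname_c1": 5748125, "cmp_fname_c2": 103698}

coverage = {name: n / total_records for name, n in non_null.items()}

for name, frac in coverage.items():
    print(f"{name}: {frac:.2%} of records have a value")
```

The same calculation applied to every column of the summary gives a quick ranking of which variables are present often enough to be useful as features.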

Once we have an overall feel for the distribution of the variables in our data, we want to understand how the values of those variables are correlated with the value of the is_match column. Therefore, our next step is to compute those same summary statistics for just the subsets of the parsed DataFrame that correspond to matches and nonmatches. We can filter DataFrames using either SQL-style where syntax or with Column objects using the DataFrame API and then use describe on the resulting DataFrames:

matches = parsed.where("is_match = true")
matchSummary = matches.describe()

misses = parsed.filter(col("is_match") == False)
missSummary = misses.describe()

The logic inside the string we pass to the where function can include statements that would be valid inside a WHERE clause in Spark SQL. For the filtering condition that uses the DataFrame API, we use the == operator on the "is_match" column object to check for equality with the boolean object, False. Note that the where function is an alias for the filter function; we could have reversed the where and filter calls in the above snippet and everything would have worked the same way.

We can now start to compare our matchSummary and missSummary DataFrames to see how the distribution of the variables changes depending on whether the record is a match or a miss. Although this is a relatively small data set, doing this comparison is still somewhat tedious. What we really want is to transpose the matchSummary and missSummary DataFrames so that the rows and columns are swapped, which would allow us to join the transposed DataFrames together by variable and analyze the summary statistics. Most data scientists know this practice as “pivoting” or “reshaping” a data set. In the next section, we’ll show you how to perform these transforms.

Pivoting and Reshaping DataFrames

We can transpose the DataFrames entirely using functions provided by PySpark. However, there is another way to perform this task. PySpark allows conversion between Spark and Pandas DataFrames. We will convert the DataFrames in question into Pandas DataFrames, reshape them, and convert them back to Spark DataFrames. We can safely do this because the summary, matchSummary, and missSummary DataFrames are small, and Pandas DataFrames reside entirely in local memory. In upcoming chapters, we will rely on Spark operations for such transformations on larger datasets.

Conversion to and from Pandas DataFrames is made efficient by the Apache Arrow project, which allows fast data transfer between JVM and Python processes. The PyArrow library was installed as a dependency of the Spark SQL module when we installed pyspark[sql] using pip.

Let’s convert summary into a Pandas DataFrame:

summary_p = summary.toPandas()

We can now use Pandas functions on the summary_p DataFrame.

summary_p.head()
...
summary_p.shape
...
(5, 12)

We can now perform a transpose operation to swap rows and columns using familiar Pandas methods on the DataFrame.

summary_p = summary_p.set_index('summary').transpose().reset_index()
...
summary_p = summary_p.rename(columns={'index':'field'})
...
summary_p = summary_p.rename_axis(None, axis=1)
...
summary_p.shape
...
(11, 6)
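To see what each step of that chain does, it can help to run it on a frame small enough to read at a glance. The sketch below uses a made-up two-metric, two-variable miniature of a describe() result. The values are illustrative only and are kept as strings, as they would be after toPandas.

```python
import pandas as pd

# Hypothetical miniature of a describe() result: metrics as rows, variables as columns.
mini = pd.DataFrame({
    "summary": ["count", "mean"],
    "cmp_sex": ["10", "0.9"],
    "cmp_bd": ["8", "0.25"],
})

# set_index: use the metric names as row labels so they become column names after transposing.
# transpose: swap rows and columns.
# reset_index: move the variable names out of the index into a regular column named 'index'.
mini_t = mini.set_index("summary").transpose().reset_index()

# Give the new column a meaningful name.
mini_t = mini_t.rename(columns={"index": "field"})

# Drop the leftover 'summary' label that transpose left on the column axis.
mini_t = mini_t.rename_axis(None, axis=1)

print(mini_t)
print(mini_t.shape)
```

The result has one row per variable and one column per metric, which is the layout we want before converting back to a Spark DataFrame.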

We have successfully transposed the summary_p Pandas DataFrame. Convert it into a Spark DataFrame using SparkSession’s createDataFrame method:

summaryT = spark.createDataFrame(summary_p)
...
summaryT.show()
...
+------------+-------+-------------------+-------------------+---+------+
|       field|  count|               mean|             stddev|min|   max|
+------------+-------+-------------------+-------------------+---+------+
|        id_1|5749132|  33324.48559643438| 23659.859374488064|  1| 99980|
|        id_2|5749132|  66587.43558331935| 23620.487613269695|  6|100000|
|cmp_fname_c1|5748125| 0.7129024704437266|0.38875835961628014|0.0|   1.0|
|cmp_fname_c2| 103698| 0.9000176718903189| 0.2713176105782334|0.0|   1.0|
|cmp_lname_c1|5749132| 0.3156278193080383| 0.3342336339615828|0.0|   1.0|
|cmp_lname_c2|   2464| 0.3184128315317443|0.36856706620066537|0.0|   1.0|
|     cmp_sex|5749132|  0.955001381078048|0.20730111116897781|  0|     1|
|      cmp_bd|5748337|0.22446526708507172|0.41722972238462636|  0|     1|
|      cmp_bm|5748337|0.48885529849763504| 0.4998758236779031|  0|     1|
|      cmp_by|5748337| 0.2227485966810923| 0.4160909629831756|  0|     1|
|     cmp_plz|5736289|0.00552866147434343|0.07414914925420046|  0|     1|
+------------+-------+-------------------+-------------------+---+------+

We are not done yet. Print the schema of the summaryT DataFrame.

summaryT.printSchema()
...
root
 |-- field: string (nullable = true)
 |-- count: string (nullable = true)
 |-- mean: string (nullable = true)
 |-- stddev: string (nullable = true)
 |-- min: string (nullable = true)
 |-- max: string (nullable = true)

In the summary schema, as obtained from the describe() method, every field is treated as a string. Since we want to analyze the summary statistics as numbers, we’ll need to convert the values from strings to doubles.

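from pyspark.sql.types import DoubleType

# Cast every metric column (everything except 'field') from string to double.
for c in summaryT.columns:
  if c == 'field':
    continue
  summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))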
...
summaryT.printSchema()
...
root
 |-- field: string (nullable = true)
 |-- count: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- stddev: double (nullable = true)
 |-- min: double (nullable = true)
 |-- max: double (nullable = true)

Now that we have figured out how to transpose a summary DataFrame, let’s implement our logic into a function that we can reuse on the matchSummary and missSummary DataFrames.

from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType

def pivotSummary(desc: DataFrame) -> DataFrame:
  # convert to Pandas dataframe
  desc_p = desc.toPandas()
  # transpose
  desc_p = desc_p.set_index('summary').transpose().reset_index()
  desc_p = desc_p.rename(columns={'index':'field'})
  desc_p = desc_p.rename_axis(None, axis=1)
  # convert to Spark dataframe
  descT = spark.createDataFrame(desc_p)
  # convert metric columns to double from string
  for c in descT.columns:
    if c == 'field':
      continue
    else:
      descT = descT.withColumn(c, descT[c].cast(DoubleType()))
  return descT

Now in your Spark shell, use the pivotSummary function on the matchSummary and missSummary DataFrames:

matchSummaryT = pivotSummary(matchSummary)
missSummaryT = pivotSummary(missSummary)
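
To make the reshaping inside pivotSummary concrete, here is a minimal plain-Python sketch of the same idea on a tiny sample. It does not use Spark or Pandas. The name transpose_summary and the abbreviated summary_rows input are illustrative assumptions; the counts, minimums, and maximums are copied from the cmp_sex and cmp_plz rows shown earlier.

```python
# Abbreviated stand-in for describe() output: one dict per statistic,
# with every value stored as a string (as in the summary schema).
summary_rows = [
    {"summary": "count", "cmp_sex": "5749132", "cmp_plz": "5736289"},
    {"summary": "min", "cmp_sex": "0", "cmp_plz": "0"},
    {"summary": "max", "cmp_sex": "1", "cmp_plz": "1"},
]

def transpose_summary(rows):
    """Turn one-row-per-statistic into one-row-per-field, casting values to float."""
    fields = [name for name in rows[0] if name != "summary"]
    transposed = []
    for field in fields:
        record = {"field": field}
        for row in rows:
            # The statistic name becomes a column; the string value becomes a number.
            record[row["summary"]] = float(row[field])
        transposed.append(record)
    return transposed

for record in transpose_summary(summary_rows):
    print(record)
```

Each resulting record has a field key plus one numeric entry per statistic, which is the shape that the transposed DataFrames take.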

Now that we have successfully transposed the summary DataFrames, we can join and compare them. That’s what we will do in the next section. Further, we will also select desirable features for building our model.

Joining DataFrames and Selecting Features

So far, we have only used Spark SQL and the DataFrame API to filter and aggregate the records from a data set, but we can also use these tools to perform joins (inner, left outer, right outer, or full outer) on DataFrames. Although the DataFrame API includes a join function, it’s often easier to express these joins using Spark SQL, especially when the tables we are joining have a large number of column names in common and we want to be able to clearly indicate which column we are referring to in our select expressions. Let’s create temporary views for the matchSummaryT and missSummaryT DataFrames, join them on the field column, and compute some simple summary statistics on the resulting rows:

matchSummaryT.createOrReplaceTempView("match_desc")
missSummaryT.createOrReplaceTempView("miss_desc")
spark.sql("""
  SELECT a.field, a.count + b.count total, a.mean - b.mean delta
  FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
  WHERE a.field NOT IN ("id_1", "id_2")
  ORDER BY delta DESC, total DESC
""").show()
...
+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926264|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|  0.6838772482590526|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0|  0.2854529057460786|
|cmp_fname_c2| 103698.0| 0.09104268062280008|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+

A good feature has two properties: it tends to have significantly different values for matches and nonmatches (so the difference between the means will be large) and it occurs often enough in the data that we can rely on it to be regularly available for any pair of records. By this measure, cmp_fname_c2 isn’t very useful because it’s missing a lot of the time and the difference in the mean value for matches and nonmatches is relatively small—0.09, for a score that ranges from 0 to 1. The cmp_sex feature also isn’t particularly helpful because even though it’s available for any pair of records, the difference in means is just 0.03.

Features cmp_plz and cmp_by, on the other hand, are excellent. They almost always occur for any pair of records, and there is a very large difference in the mean values (more than 0.77 for both features). Features cmp_bd, cmp_lname_c1, and cmp_bm also seem beneficial: they are generally available in the data set, and the differences in mean values for matches and nonmatches are substantial.

Features cmp_fname_c1 and cmp_lname_c2 are more of a mixed bag: cmp_fname_c1 doesn’t discriminate all that well (the difference in the means is only 0.28) even though it’s usually available for a pair of records, whereas cmp_lname_c2 has a large difference in the means but it’s almost always missing. It’s not quite obvious under what circumstances we should include these features in our model based on this data.
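
The reasoning above can be sketched as a small plain-Python filter over the joined output. The rows are copied (with deltas rounded) from the table above. The two cutoffs, MIN_COVERAGE and MIN_DELTA, are assumptions chosen only to illustrate the "available often" and "large difference in means" criteria, not values from the analysis itself.

```python
# (field, total, delta) rows from the joined summary, deltas rounded to 4 places.
rows = [
    ("cmp_plz", 5736289, 0.9564),
    ("cmp_lname_c2", 2464, 0.8064),
    ("cmp_by", 5748337, 0.7762),
    ("cmp_bd", 5748337, 0.7754),
    ("cmp_lname_c1", 5749132, 0.6839),
    ("cmp_bm", 5748337, 0.5109),
    ("cmp_fname_c1", 5748125, 0.2855),
    ("cmp_fname_c2", 103698, 0.0910),
    ("cmp_sex", 5749132, 0.0324),
]

TOTAL_PAIRS = 5749132  # total for fields that are never missing (e.g. cmp_sex)
MIN_COVERAGE = 0.9     # assumed cutoff: present in at least 90% of pairs
MIN_DELTA = 0.5        # assumed cutoff: means differ by at least 0.5

selected = [
    field
    for field, total, delta in rows
    if total / TOTAL_PAIRS >= MIN_COVERAGE and delta >= MIN_DELTA
]
print(selected)
```

With these assumed cutoffs, the filter keeps cmp_plz, cmp_by, cmp_bd, cmp_lname_c1, and cmp_bm, and drops the mixed-bag and weak features. Different cutoffs would draw the line elsewhere.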

For now, we’re going to use a simple scoring model that ranks the similarity of pairs of records based on the sums of the values of the obviously good features: cmp_plz, cmp_by, cmp_bd, cmp_lname_c1, and cmp_bm. For the few records where the values of these features are missing, we’ll use 0 in place of the null value in our sum. We can get a rough feel for the performance of our simple model by creating a data frame of the computed scores and the value of the is_match column and evaluating how well the score discriminates between matches and nonmatches at various thresholds.

Scoring and Model Evaluation

For our scoring function, we are going to sum up the value of five fields (cmp_lname_c1, cmp_plz, cmp_by, cmp_bd, and cmp_bm). We will use expr from pyspark.sql.functions for doing this. The expr function parses an input expression string into the column that it represents. This string can even involve multiple columns.

Let’s create the required expression string:

good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]
...
sum_expression = " + ".join(good_features)
...
sum_expression
...
'cmp_lname_c1 + cmp_plz + cmp_by + cmp_bd + cmp_bm'

We can now use the sum_expression string for calculating the score. Before summing up the values, we will replace null values with 0 using DataFrame’s fillna method.

from pyspark.sql.functions import expr

scored = parsed.fillna(0, subset=good_features).withColumn('score', expr(sum_expression)).select('score', 'is_match')
...
scored.show()
...
+-----+--------+
|score|is_match|
+-----+--------+
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
...
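
The null handling matters because in Spark SQL adding null to any number yields null, so a single missing feature would otherwise wipe out the whole score. As a rough plain-Python analogue of the scoring step (no Spark involved), the sketch below treats None as the missing value. The score function and the sample record are hypothetical.

```python
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]

def score(record):
    """Sum the good features, counting a missing (None) value as 0."""
    total = 0.0
    for name in good_features:
        value = record.get(name)
        total += 0.0 if value is None else value
    return total

# Hypothetical pair of records whose postal-code comparison is missing.
sample = {"cmp_lname_c1": 1.0, "cmp_plz": None, "cmp_by": 1, "cmp_bd": 1, "cmp_bm": 1}
print(score(sample))
```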

The final step in creating our scoring function is to decide on what threshold the score must exceed in order for us to predict that the two records represent a match. If we set the threshold too high, then we will incorrectly mark matching records as misses (raising the false-negative rate), whereas if we set the threshold too low, we will incorrectly label misses as matches (raising the false-positive rate). For any nontrivial problem, we always have to trade some false positives for some false negatives, and the question of what the threshold value should be usually comes down to the relative cost of the two kinds of errors in the situation to which the model is being applied.
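
The trade-off can be seen on a toy example. The sketch below is plain Python, and the six (score, is_match) pairs are invented for illustration rather than drawn from the chapter's data. The helper name error_counts is likewise hypothetical.

```python
# Hypothetical (score, is_match) pairs; not taken from the real data set.
toy_scored = [
    (5.0, True), (4.0, True), (3.0, True),
    (3.5, False), (2.0, False), (1.0, False),
]

def error_counts(scored, threshold):
    """Return (false_negatives, false_positives) when score >= threshold predicts a match."""
    false_negatives = sum(1 for s, is_match in scored if is_match and s < threshold)
    false_positives = sum(1 for s, is_match in scored if not is_match and s >= threshold)
    return false_negatives, false_positives

for threshold in (2.0, 3.0, 4.0):
    fn, fp = error_counts(toy_scored, threshold)
    print(f"threshold={threshold}: false negatives={fn}, false positives={fp}")
```

On this toy data, raising the threshold removes false positives but eventually starts to miss true matches, which is the tension described above.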

To help us choose a threshold, it’s helpful to create a contingency table (which is sometimes called a cross tabulation, or crosstab) that counts the number of records whose scores fall above/below the threshold value crossed with the number of records in each of those categories that were/were not matches. Since we don’t know what threshold value we’re going to use yet, let’s write a function that takes the scored DataFrame and the choice of threshold as parameters and computes the crosstabs using the DataFrame API:

def crossTabs(scored: DataFrame, t: float) -> DataFrame:
  return scored.selectExpr(f"score >= {t} as above", "is_match").groupBy("above").pivot("is_match", ["true", "false"]).count()

Note that we use the selectExpr method of the DataFrame API to dynamically determine the value of the field named above based on the value of the t argument. Python’s f-string formatting syntax lets us substitute variables by name if we prefix the string literal with the letter f. Once the above field is defined, we create the crosstab with a standard combination of the groupBy, pivot, and count methods that we used before.
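
To see exactly what string selectExpr receives, it can help to build the expression on its own in plain Python. The variable name above_expression is just for illustration.

```python
t = 4.0  # example threshold

# The f-string substitutes the current value of t into the SQL expression text.
above_expression = f"score >= {t} as above"
print(above_expression)  # score >= 4.0 as above
```

Because the threshold is interpolated as text, the function should only be called with trusted numeric values for t.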

Applying a high threshold value of 4.0, meaning that the average of the five features must be at least 0.8, we can filter out almost all of the nonmatches while keeping over 99% of the matches:

crossTabs(scored, 4.0).show()
...
+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+

Applying the lower threshold of 2.0, we can ensure that we capture all of the known matching records, but at a substantial cost in terms of false positives (top-right cell):

crossTabs(scored, 2.0).show()
...
+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20931| 596414|
|false| null|5131787|
+-----+-----+-------+

Even though the number of false positives is higher than we want, this more generous filter still removes about 90% of the nonmatching records from consideration while including every known match. This is a good result, but it’s possible to do better: see if you can find a way to use some of the other columns of the parsed DataFrame (both missing and not) to come up with a scoring function that successfully identifies every true match at the cost of fewer than 100 false positives.
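
As a quick check on these figures, the rates can be worked out from the two crosstabs in plain Python. The counts are copied from the tables above, with the null cell at threshold 2.0 treated as 0. The rates helper and the dictionary layout are illustrative assumptions.

```python
# Cell counts from the crosstabs: tp/fp are the "above" row, fn/tn the "below" row.
crosstabs = {
    4.0: {"tp": 20871, "fp": 637, "fn": 60, "tn": 5727564},
    2.0: {"tp": 20931, "fp": 596414, "fn": 0, "tn": 5131787},
}

def rates(c):
    """Return (recall, precision, share of nonmatches removed) for one crosstab."""
    recall = c["tp"] / (c["tp"] + c["fn"])
    precision = c["tp"] / (c["tp"] + c["fp"])
    nonmatches_removed = c["tn"] / (c["tn"] + c["fp"])
    return recall, precision, nonmatches_removed

for threshold, counts in crosstabs.items():
    recall, precision, removed = rates(counts)
    print(f"t={threshold}: recall={recall:.4f}, "
          f"precision={precision:.4f}, nonmatches removed={removed:.4f}")
```

Recall here is the share of known matches that score at or above the threshold, and precision is the share of above-threshold pairs that are real matches.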

Where to Go from Here

If this chapter was your first time carrying out data preparation and analysis with PySpark, we hope that you got a feel for what a powerful foundation these tools provide. If you have been using Python and Spark for a while, we hope that you will pass this chapter along to your friends and colleagues as a way of introducing them to that power as well.

Our goal for this chapter was to provide you with enough knowledge to be able to understand and complete the rest of the examples in this book. If you are the kind of person who learns best through practical examples, your next step is to continue on to the next set of chapters, where we will introduce you to MLlib, the machine learning library designed for Spark.
