Chapter 11. Spark SQL and DataFrames

In the previous chapter, we learned how to build a simple distributed application using Spark. The data that we used took the form of a set of e-mails stored as text files.

We learned that Spark was built around the concept of resilient distributed datasets (RDDs). We explored several types of RDDs: simple RDDs of strings, key-value RDDs, and RDDs of doubles. In the case of key-value RDDs and RDDs of doubles, Spark added functionality beyond that of the simple RDDs through implicit conversions. There is one important type of RDD that we have not explored yet: DataFrames (previously called SchemaRDD). DataFrames allow the manipulation of objects significantly more complex than those we have explored to date.

A DataFrame is a distributed tabular data structure, and is therefore very useful for representing and manipulating structured data. In this chapter, we will first investigate DataFrames through the Spark shell, and then use the Ling-spam e-mail dataset, presented in the previous chapter, to see how DataFrames can be integrated in a machine learning pipeline.

DataFrames – a whirlwind introduction

Let's start by opening a Spark shell:

$ spark-shell

Let's imagine that we are interested in running analytics on a set of patients to estimate their overall health level. We have measured, for each patient, their height, weight, age, and whether they smoke.

We might represent the readings for each patient as a case class (you might wish to write some of this in a text editor and paste it into the Scala shell using :paste):

scala> case class PatientReadings(
  val patientId: Int,
  val heightCm: Int,
  val weightKg: Int,
  val age: Int,
  val isSmoker: Boolean
)
defined class PatientReadings

We would, typically, have many thousands of patients, possibly stored in a database or a CSV file. We will worry about how to interact with external sources later in this chapter. For now, let's just hard-code a few readings directly in the shell:

scala> val readings = List(
  PatientReadings(1, 175, 72, 43, false),
  PatientReadings(2, 182, 78, 28, true),
  PatientReadings(3, 164, 61, 41, false),
  PatientReadings(4, 161, 62, 43, true)
)
List[PatientReadings] = List(...

We can convert readings to an RDD by using sc.parallelize:

scala> val readingsRDD = sc.parallelize(readings)
readingsRDD: RDD[PatientReadings] = ParallelCollectionRDD[0] at parallelize at <console>:25

Note that the type parameter of our RDD is PatientReadings. Let's convert the RDD to a DataFrame using the .toDF method:

scala> val readingsDF = readingsRDD.toDF
readingsDF: sql.DataFrame = [patientId: int, heightCm: int, weightKg: int, age: int, isSmoker: boolean]

We have created a DataFrame where each row corresponds to the readings for a specific patient, and the columns correspond to the different features:

scala> readingsDF.show
+---------+--------+--------+---+--------+
|patientId|heightCm|weightKg|age|isSmoker|
+---------+--------+--------+---+--------+
|        1|     175|      72| 43|   false|
|        2|     182|      78| 28|    true|
|        3|     164|      61| 41|   false|
|        4|     161|      62| 43|    true|
+---------+--------+--------+---+--------+
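
We can also inspect the schema that Spark inferred, including the column types, using the printSchema method. The output should resemble the following (exact nullability flags may vary with your Spark version):

scala> readingsDF.printSchema
root
 |-- patientId: integer (nullable = false)
 |-- heightCm: integer (nullable = false)
 |-- weightKg: integer (nullable = false)
 |-- age: integer (nullable = false)
 |-- isSmoker: boolean (nullable = false)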

The easiest way to create a DataFrame is to use the toDF method on an RDD. We can convert any RDD[T], where T is a case class or a tuple, to a DataFrame. Spark will map each attribute of the case class to a column of the appropriate type in the DataFrame. It uses reflection to discover the names and types of the attributes. There are several other ways of constructing DataFrames, both from RDDs and from external sources, which we will explore later in this chapter.
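
For instance, since tuples carry no field names, toDF accepts the column names as arguments when converting an RDD of tuples. A minimal sketch, run in the same shell session:

scala> val pairsDF = sc.parallelize(List((1, "spam"), (2, "ham"))).toDF("id", "label")
pairsDF: sql.DataFrame = [id: int, label: string]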

DataFrames support many operations for manipulating the rows and columns. For instance, let's add a column for the Body Mass Index (BMI). The BMI is a common way of aggregating height and weight to decide if someone is overweight or underweight. The formula for the BMI is:

BMI = weight (kg) / height (m)²

Let's start by creating a column of the height in meters:

scala> val heightM = readingsDF("heightCm") / 100.0     
heightM: sql.Column = (heightCm / 100.0)

heightM has data type Column, representing a column of data in a DataFrame. Columns support many arithmetic and comparison operators that apply element-wise across the column (similarly to Breeze vectors encountered in Chapter 2, Manipulating Data with Breeze). Operations on columns are lazy: the heightM column is not actually computed when defined. Let's now define a BMI column:

scala> val bmi = readingsDF("weightKg") / (heightM*heightM)
bmi: sql.Column = (weightKg / ((heightCm / 100.0) * (heightCm / 100.0)))

It would be useful to add the bmi column to our readings DataFrame. Since DataFrames, like RDDs, are immutable, we must define a new DataFrame that is identical to readingsDF, but with an additional column for the BMI. We can do this using the withColumn method, which takes, as its arguments, the name of the new column and a Column instance:

scala> val readingsWithBmiDF = readingsDF.withColumn("BMI", bmi)
readingsWithBmiDF: sql.DataFrame = [patientId: int, heightCm: int, weightKg: int, age: int, isSmoker: boolean, BMI: double]

All the operations we have seen so far are transformations: they define a pipeline of operations that create new DataFrames. These transformations are executed when we call an action, such as show:

scala> readingsWithBmiDF.show
+---------+--------+--------+---+--------+------------------+
|patientId|heightCm|weightKg|age|isSmoker|               BMI|
+---------+--------+--------+---+--------+------------------+
|        1|     175|      72| 43|   false|23.510204081632654|
|        2|     182|      78| 28|    true| 23.54788069073783|
|        3|     164|      61| 41|   false|22.679952409280194|
|        4|     161|      62| 43|    true|  23.9188302920412|
+---------+--------+--------+---+--------+------------------+
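
The show method is just one of several actions. For instance, count returns the number of rows, forcing the pipeline, including the BMI computation, to be executed (a sketch; the res variable name assigned by your shell will differ):

scala> readingsWithBmiDF.count
res2: Long = 4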

Besides creating additional columns, DataFrames also support filtering rows that satisfy a certain predicate. For instance, we can select all smokers:

scala> readingsWithBmiDF.filter {
  readingsWithBmiDF("isSmoker") 
}.show
+---------+--------+--------+---+--------+-----------------+
|patientId|heightCm|weightKg|age|isSmoker|              BMI|
+---------+--------+--------+---+--------+-----------------+
|        2|     182|      78| 28|    true|23.54788069073783|
|        4|     161|      62| 43|    true| 23.9188302920412|
+---------+--------+--------+---+--------+-----------------+

Or, to select everyone who weighs more than 70 kg:

scala> readingsWithBmiDF.filter { 
  readingsWithBmiDF("weightKg") > 70 
}.show
+---------+--------+--------+---+--------+------------------+
|patientId|heightCm|weightKg|age|isSmoker|               BMI|
+---------+--------+--------+---+--------+------------------+
|        1|     175|      72| 43|   false|23.510204081632654|
|        2|     182|      78| 28|    true| 23.54788069073783|
+---------+--------+--------+---+--------+------------------+

It can become cumbersome to keep repeating the DataFrame name in an expression. Spark defines the operator $ to refer to a column in the current DataFrame. Thus, the filter expression above could have been written more succinctly using:

scala> readingsWithBmiDF.filter { $"weightKg" > 70 }.show
+---------+--------+--------+---+--------+------------------+
|patientId|heightCm|weightKg|age|isSmoker|               BMI|
+---------+--------+--------+---+--------+------------------+
|        1|     175|      72| 43|   false|23.510204081632654|
|        2|     182|      78| 28|    true| 23.54788069073783|
+---------+--------+--------+---+--------+------------------+
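
Boolean columns can be combined with the && and || operators, which lets us filter on several predicates at once. For instance, the following sketch, run in the same session, selects smokers who weigh more than 70 kg:

scala> readingsWithBmiDF.filter { $"isSmoker" && $"weightKg" > 70 }.show
+---------+--------+--------+---+--------+-----------------+
|patientId|heightCm|weightKg|age|isSmoker|              BMI|
+---------+--------+--------+---+--------+-----------------+
|        2|     182|      78| 28|    true|23.54788069073783|
+---------+--------+--------+---+--------+-----------------+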

The .filter method is overloaded. It accepts either a column of Boolean values, as above, or a string identifying a Boolean column in the current DataFrame. Thus, to filter our readingsWithBmiDF DataFrame to sub-select smokers, we could also have used the following:

scala> readingsWithBmiDF.filter("isSmoker").show
+---------+--------+--------+---+--------+-----------------+
|patientId|heightCm|weightKg|age|isSmoker|              BMI|
+---------+--------+--------+---+--------+-----------------+
|        2|     182|      78| 28|    true|23.54788069073783|
|        4|     161|      62| 43|    true| 23.9188302920412|
+---------+--------+--------+---+--------+-----------------+
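
In fact, the string overload accepts any SQL expression, not just a column name, so we could equally have written the weight filter as a string (a sketch in the same session):

scala> readingsWithBmiDF.filter("weightKg > 70").show

This produces the same output as the column-based weight filter above.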

When comparing for equality, you must compare columns with the special triple-equals operator:

scala> readingsWithBmiDF.filter { $"age" === 28 }.show
+---------+--------+--------+---+--------+-----------------+
|patientId|heightCm|weightKg|age|isSmoker|              BMI|
+---------+--------+--------+---+--------+-----------------+
|        2|     182|      78| 28|    true|23.54788069073783|
+---------+--------+--------+---+--------+-----------------+

Similarly, you must use !== to select rows that are not equal to a value:

scala> readingsWithBmiDF.filter { $"age" !== 28 }.show
+---------+--------+--------+---+--------+------------------+
|patientId|heightCm|weightKg|age|isSmoker|               BMI|
+---------+--------+--------+---+--------+------------------+
|        1|     175|      72| 43|   false|23.510204081632654|
|        3|     164|      61| 41|   false|22.679952409280194|
|        4|     161|      62| 43|    true|  23.9188302920412|
+---------+--------+--------+---+--------+------------------+
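
Note that, in Spark 2.0 and later, !== is deprecated in favor of the =!= operator, which behaves identically:

scala> readingsWithBmiDF.filter { $"age" =!= 28 }.show

This produces the same output as above.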