Custom functions on DataFrames

So far, we have only used built-in functions to operate on DataFrame columns. While these are often sufficient, we sometimes need greater flexibility. Spark lets us apply custom transformations to every row through user-defined functions (UDFs). Let's assume that we want to use the equation that we derived in Chapter 2, Manipulating Data with Breeze, for the probability of a person being male, given their height and weight. We calculated that the decision boundary was given by:

f = -0.75 + 2.48 × rescaledHeight + 2.23 × rescaledWeight

Any person with f > 0 is more likely to be male than female, given their height and weight, according to the training set used in Chapter 2, Manipulating Data with Breeze (which was based on students, so is unlikely to be representative of the population as a whole). To convert a height in centimeters to the normalized height, rescaledHeight, we can use this formula:

rescaledHeight = (height - 171.0) / 8.95

Similarly, to convert a weight (in kilograms) to the normalized weight, rescaledWeight, we can use:

rescaledWeight = (weight - 65.7) / 13.4

The average and standard deviation of the height and weight are calculated from the training set. Let's write a Scala function that returns whether a person is more likely to be male, given their height and weight:

scala> def likelyMale(height: Int, weight: Int): Boolean = {
  // Rescale using the mean and standard deviation of the training set
  val rescaledHeight = (height - 171.0)/8.95
  val rescaledWeight = (weight - 65.7)/13.4
  // Decision boundary from Chapter 2: positive values mean "likely male"
  -0.75 + 2.48*rescaledHeight + 2.23*rescaledWeight > 0
}
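
Before registering this with Spark, we can sanity-check it in the shell on a couple of the height and weight values that appear in the DataFrame we use below (the res variable names will depend on your session):

scala> likelyMale(175, 72)
res0: Boolean = true

scala> likelyMale(164, 61)
res1: Boolean = false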

To use this function on Spark DataFrames, we need to register it as a user-defined function (UDF). This transforms our function, which accepts integer arguments, into one that accepts column arguments:

scala> val likelyMaleUdf = sqlContext.udf.register(
  "likelyMaleUdf", likelyMale _)
likelyMaleUdf: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function2>,BooleanType,List())

To register a UDF, we must have access to a sqlContext instance. The SQL context provides the entry point for DataFrame operations. The Spark shell creates a SQL context at startup, bound to the variable sqlContext, and destroys it when the shell session is closed.
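
Outside the shell, for instance in a standalone application, we have to create this context ourselves. The following is a minimal sketch for Spark 1.x; the application name and master URL are placeholders to adapt to your own setup:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Placeholder configuration: adjust the application name and master URL
// to match your deployment.
val conf = new SparkConf().setAppName("udf-example").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)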

The first argument passed to the register function is the name of the UDF (we will use the UDF name later when we write SQL statements on the DataFrame, but you can ignore it for now). We can then use the UDF just like the built-in transformations included in Spark:

scala> val likelyMaleColumn = likelyMaleUdf(
  readingsDF("heightCm"), readingsDF("weightKg"))
likelyMaleColumn: org.apache.spark.sql.Column = UDF(heightCm,weightKg)

scala> readingsDF.withColumn("likelyMale", likelyMaleColumn).show
+---------+--------+--------+---+--------+----------+
|patientId|heightCm|weightKg|age|isSmoker|likelyMale|
+---------+--------+--------+---+--------+----------+
|        1|     175|      72| 43|   false|      true|
|        2|     182|      78| 28|    true|      true|
|        3|     164|      61| 41|   false|     false|
|        4|     161|      62| 43|    true|     false|
+---------+--------+--------+---+--------+----------+

As you can see, Spark applies the function underlying the UDF to every row in the DataFrame. We are not limited to using UDFs to create new columns. We can also use them in filter expressions. For instance, to select rows likely to correspond to women:

scala> readingsDF.filter(
  ! likelyMaleUdf($"heightCm", $"weightKg")
).show
+---------+--------+--------+---+--------+
|patientId|heightCm|weightKg|age|isSmoker|
+---------+--------+--------+---+--------+
|        3|     164|      61| 41|   false|
|        4|     161|      62| 43|    true|
+---------+--------+--------+---+--------+
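
Because we registered the UDF under the name likelyMaleUdf, we can also call it by name from Spark SQL. As a brief preview of what we cover later, here is a sketch that assumes we first register readingsDF as a temporary table named readings:

scala> readingsDF.registerTempTable("readings")

scala> sqlContext.sql("""
  SELECT patientId, likelyMaleUdf(heightCm, weightKg) AS likelyMale
  FROM readings
""").show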

UDFs let us apply arbitrary Scala functions to every row of a DataFrame, greatly extending what we can express beyond Spark's built-in column transformations.
