Writing UDF on PySpark

Like Scala and Java, you can also work with User Defined Functions (aka. UDF) on PySpark. Let's see an example in the following. Suppose we want to see the grade distribution based on the score for some students who have taken courses at a university.

We can store them in two separate arrays as follows:

# Let's generate somerandom lists
students = ['Jason', 'John', 'Geroge', 'David']
courses = ['Math', 'Science', 'Geography', 'History', 'IT', 'Statistics']

Now let's declare an empty array for storing the data about courses and students so that later on both can be appended to this array as follows:

rawData = []
for (student, course) in itertools.product(students, courses):
rawData.append((student, course, random.randint(0, 200)))

Note that for the preceding code to work, please import the following at the beginning of the file:

import itertools
import random

Now let's create a DataFrame from these two objects toward converting corresponding grades against each one's score. For this, we need to define an explicit schema. Let's suppose that in your planned DataFrame, there would be three columns named Student, Course, and Score.

At first, let's import necessary modules:

from pyspark.sql.types
import StructType, StructField, IntegerType, StringType

Now the schema can be defined as follows:

schema = StructType([StructField("Student", StringType(), nullable=False),
StructField("Course", StringType(), nullable=False),
StructField("Score", IntegerType(), nullable=False)])

Now let's create an RDD from the Raw Data as follows:

courseRDD = spark.sparkContext.parallelize(rawData)

Now let's convert the RDD into the DataFrame as follows:

courseDF = spark.createDataFrame(courseRDD, schema) 
coursedDF.show()

The output is as follows:

Figure 12: Sample of the randomly generated score for students in subjects

Well, now we have three columns. However, we need to convert the score into grades. Say you have the following grading schema:

  • 90~100=> A
  • 80~89 => B
  • 60~79 => C
  • 0~59 => D

For this, we can create our own UDF such that this will convert the numeric score to grade. It can be done in several ways. Following is an example of doing so:

# Define udf
def scoreToCategory(grade):
if grade >= 90:
return 'A'
elif grade >= 80:
return 'B'
elif grade >= 60:
return 'C'
else:
return 'D'

Now we can have our own UDF as follows:

from pyspark.sql.functions
import udf
udfScoreToCategory = udf(scoreToCategory, StringType())

The second argument in the udf() method is the return type of the method (that is, scoreToCategory). Now you can call this UDF to convert the score into grade in a pretty straightforward way. Let's see an example of it:

courseDF.withColumn("Grade", udfScoreToCategory("Score")).show(100)

The preceding line will take score as input for all entries and convert the score to a grade. Additionally, a new DataFrame with a column named Grade will be added.

The output is as follows:

Figure 13: Assigned grades

Now we can use the UDF with the SQL statement as well. However, for that, we need to register this UDF as follows:

spark.udf.register("udfScoreToCategory", scoreToCategory, StringType()) 

The preceding line will register the UDF as a temporary function in the database by default. Now we need to create a team view to allow executing SQL queries:

courseDF.createOrReplaceTempView("score")

Now let's execute an SQL query on the view score as follows:

spark.sql("SELECT Student, Score, udfScoreToCategory(Score) as Grade FROM score").show() 

The output is as follows:

Figure 14: Query on the students score and corresponding grades

The complete source code for this example is as follows:

import os
import sys
import itertools
import random

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import udf

spark = SparkSession
.builder
.appName("PCAExample")
.getOrCreate()

# Generate Random RDD
students = ['Jason', 'John', 'Geroge', 'David']
courses = ['Math', 'Science', 'Geography', 'History', 'IT', 'Statistics']
rawData = []
for (student, course) in itertools.product(students, courses):
rawData.append((student, course, random.randint(0, 200)))

# Create Schema Object
schema = StructType([
StructField("Student", StringType(), nullable=False),
StructField("Course", StringType(), nullable=False),
StructField("Score", IntegerType(), nullable=False)
])

courseRDD = spark.sparkContext.parallelize(rawData)
courseDF = spark.createDataFrame(courseRDD, schema)
courseDF.show()

# Define udf
def scoreToCategory(grade):
if grade >= 90:
return 'A'
elif grade >= 80:
return 'B'
elif grade >= 60:
return 'C'
else:
return 'D'

udfScoreToCategory = udf(scoreToCategory, StringType())
courseDF.withColumn("Grade", udfScoreToCategory("Score")).show(100)

spark.udf.register("udfScoreToCategory", scoreToCategory, StringType())
courseDF.createOrReplaceTempView("score")
spark.sql("SELECT Student, Score, udfScoreToCategory(Score) as Grade FROM score").show()

spark.stop()

Now let's do some analytics tasks on PySpark. In the next section, we will show an example using the k-means algorithm for a clustering task using PySpark.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.118.252.87