In order to create some user-defined functions in Scala, I need to examine my data in the previous adult dataset. I plan to create a UDF that will enumerate the education column, so that I can convert the column into an integer value. This will be useful if I need to use the data for machine learning, and so create a LabeledPoint structure. The vector used, which represents each record, will need to be numeric. I will first determine what kind of unique education values exist, then I will create a function to enumerate them, and finally use it in SQL.
I have created some Scala code to display a sorted list of the education values. The DISTINCT keyword ensures that there is only one instance of each value. I have selected the data as a subtable, using an alias called edu_dist for the data column, to ensure that the ORDER BY clause works:
val selectClause = "SELECT t1.edu_dist FROM "
val tableClause  = " ( SELECT DISTINCT education AS edu_dist FROM adult ) t1 "
val orderClause  = " ORDER BY t1.edu_dist "

val resRDD = sqlContext.sql( selectClause + tableClause + orderClause )

resRDD.map(t => t(0)).collect().foreach(println)
The data looks like the following. I have removed some values to save space, but you get the idea:
10th
11th
12th
1st-4th
………..
Preschool
Prof-school
Some-college
I have defined a method in Scala to accept the string-based education value, and return an enumerated integer value that represents it. If no value is recognized, then a special value of 9999 is returned:
def enumEdu( education:String ) : Int =
{
  var enumval = 9999

  if ( education == "10th" )              { enumval = 0 }
  else if ( education == "11th" )         { enumval = 1 }
  else if ( education == "12th" )         { enumval = 2 }
  else if ( education == "1st-4th" )      { enumval = 3 }
  else if ( education == "5th-6th" )      { enumval = 4 }
  else if ( education == "7th-8th" )      { enumval = 5 }
  else if ( education == "9th" )          { enumval = 6 }
  else if ( education == "Assoc-acdm" )   { enumval = 7 }
  else if ( education == "Assoc-voc" )    { enumval = 8 }
  else if ( education == "Bachelors" )    { enumval = 9 }
  else if ( education == "Doctorate" )    { enumval = 10 }
  else if ( education == "HS-grad" )      { enumval = 11 }
  else if ( education == "Masters" )      { enumval = 12 }
  else if ( education == "Preschool" )    { enumval = 13 }
  else if ( education == "Prof-school" )  { enumval = 14 }
  else if ( education == "Some-college" ) { enumval = 15 }

  return enumval
}
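As an aside, a long if/else chain like this can also be expressed as an immutable Map lookup, which is shorter and easier to extend. This is just an alternative sketch of the same enumeration, using a hypothetical name enumEduMap, not the code used in the rest of this section:

```scala
// Hypothetical alternative to enumEdu: a Map from education string to index.
// getOrElse supplies the same 9999 default for unrecognized values.
val eduIndex: Map[String, Int] = Map(
  "10th" -> 0, "11th" -> 1, "12th" -> 2, "1st-4th" -> 3,
  "5th-6th" -> 4, "7th-8th" -> 5, "9th" -> 6, "Assoc-acdm" -> 7,
  "Assoc-voc" -> 8, "Bachelors" -> 9, "Doctorate" -> 10, "HS-grad" -> 11,
  "Masters" -> 12, "Preschool" -> 13, "Prof-school" -> 14, "Some-college" -> 15
)

def enumEduMap( education: String ): Int = eduIndex.getOrElse( education, 9999 )
```

A function defined this way could be registered with sqlContext.udf.register in exactly the same manner as the version above.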
I can now register this function using the SQL context in Scala, so that it can be used in an SQL statement:
sqlContext.udf.register( "enumEdu", enumEdu _ )
The SQL, and the Scala code to enumerate the data, then look like this. The newly registered function called enumEdu is used in the SELECT statement. It takes the education type as a parameter, and returns the integer enumeration. The column that this value forms is aliased to the name idx:
val selectClause = "SELECT enumEdu(t1.edu_dist) as idx,t1.edu_dist FROM "
val tableClause  = " ( SELECT DISTINCT education AS edu_dist FROM adult ) t1 "
val orderClause  = " ORDER BY t1.edu_dist "

val resRDD = sqlContext.sql( selectClause + tableClause + orderClause )

resRDD.map(t => t(0) + " " + t(1) ).collect().foreach(println)
The resulting data output, as a list of education values and their enumerations, looks like the following:
0 10th
1 11th
2 12th
3 1st-4th
4 5th-6th
5 7th-8th
6 9th
7 Assoc-acdm
8 Assoc-voc
9 Bachelors
10 Doctorate
11 HS-grad
12 Masters
13 Preschool
14 Prof-school
15 Some-college
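To show why this enumeration matters for machine learning, here is a sketch of how the enumerated education value could be packed into MLlib LabeledPoint records, as mentioned at the start of this section. The choice of age as the label is purely illustrative; a real job would select the column being predicted:

```scala
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Illustrative sketch: build LabeledPoint records from the adult table,
// using the enumerated education value as a numeric feature and the
// age column as an arbitrary example label.
val lpRDD = sqlContext.sql( "SELECT age, enumEdu(education) FROM adult" )
  .map( t => LabeledPoint(
      t(0).toString.toDouble,                      // label
      Vectors.dense( t(1).toString.toDouble ) ) )  // features
```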
Another example function, called ageBracket, takes the adult integer age value, and returns an enumerated age bracket:
def ageBracket( age:Int ) : Int =
{
  var bracket = 9999

  if ( age >= 0 && age < 20 )        { bracket = 0 }
  else if ( age >= 20 && age < 40 )  { bracket = 1 }
  else if ( age >= 40 && age < 60 )  { bracket = 2 }
  else if ( age >= 60 && age < 80 )  { bracket = 3 }
  else if ( age >= 80 && age < 100 ) { bracket = 4 }
  else if ( age >= 100 )             { bracket = 5 }

  return bracket
}
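Since the brackets are all 20 years wide, the same mapping could be sketched with integer division instead of a chain of comparisons. This is only an aside, using a hypothetical name ageBracketDiv; the explicit version makes the bracket boundaries easier to read and to change individually:

```scala
// Hypothetical arithmetic equivalent: ages 0-99 map to brackets 0-4
// via age / 20, ages of 100 and over map to bracket 5, and negative
// ages return the 9999 'unknown' marker.
def ageBracketDiv( age: Int ): Int =
  if ( age < 0 ) 9999
  else if ( age >= 100 ) 5
  else age / 20
```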
Again, the function is registered using the SQL context so that it can be used in an SQL statement:
sqlContext.udf.register( "ageBracket", ageBracket _ )
Then, the Scala-based SQL uses it to select the age, age bracket, and education value from the adult dataset:
val selectClause = "SELECT age, ageBracket(age) as bracket,education FROM "
val tableClause  = " adult "
val limitClause  = " LIMIT 10 "

val resRDD = sqlContext.sql( selectClause + tableClause + limitClause )

resRDD.map(t => t(0) + " " + t(1) + " " + t(2) ).collect().foreach(println)
The resulting data then looks like this, given that I have used the LIMIT clause to limit the output to 10 rows:
39 1 Bachelors
50 2 Bachelors
38 1 HS-grad
53 2 11th
28 1 Bachelors
37 1 Masters
49 2 9th
52 2 HS-grad
31 1 Masters
42 2 Bachelors
It is also possible to define functions for use in SQL inline, during the UDF registration using the SQL context. The following example defines a function called dblAge, which just multiplies the adult's age by two. The registration looks like this: it takes an integer parameter (age), and returns twice its value:
sqlContext.udf.register( "dblAge", (a:Int) => 2*a )
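Inline registration is not limited to single-argument functions; the same mechanism accepts multi-parameter lambdas. The following is a hypothetical example (the name ageEduKey and its logic are mine, shown only to illustrate the registration form):

```scala
// Hypothetical two-argument inline UDF: combines an age bracket number
// (20-year brackets via integer division) and an education string into
// a single key, for example "2-Bachelors".
sqlContext.udf.register( "ageEduKey",
  (age: Int, edu: String) => (age / 20) + "-" + edu )
```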
And the SQL that uses it now selects the age, and the doubled age value, called dblAge(age):
val selectClause = "SELECT age,dblAge(age) FROM "
val tableClause  = " adult "
val limitClause  = " LIMIT 10 "

val resRDD = sqlContext.sql( selectClause + tableClause + limitClause )

resRDD.map(t => t(0) + " " + t(1) ).collect().foreach(println)
The two columns of the output data, containing the age and its doubled value, look like this:
39 78
50 100
38 76
53 106
28 56
37 74
49 98
52 104
31 62
42 84
So far, DataFrames, SQL, and user-defined functions have been examined, but what if, as in my case, you are using a Hadoop stack cluster and have Apache Hive available? The adult table that I have defined so far is a temporary table, but if I access Hive using Apache Spark SQL, I can access static database tables. The next section will examine the steps needed to do this.