User-defined functions

To create user-defined functions in Scala, I need to examine the data in the adult dataset used previously. I plan to create a UDF that enumerates the education column, so that the column can be converted into an integer value. This will be useful if I need to use the data for machine learning, and so create a LabeledPoint structure; the vector that represents each record must be numeric. I will first determine what unique education values exist, then I will create a function to enumerate them, and finally use it in SQL.

I have created some Scala code to display a sorted list of the education values. The DISTINCT keyword ensures that there is only one instance of each value. I have selected the data as a subquery (aliased as t1), using the alias edu_dist for the education column to ensure that the ORDER BY clause works:

    val selectClause = "SELECT t1.edu_dist FROM "
    val tableClause  = " ( SELECT DISTINCT education AS edu_dist FROM adult ) t1 "
    val orderClause  = " ORDER BY t1.edu_dist "

    val resRDD = sqlContext.sql( selectClause + tableClause  + orderClause )

    resRDD.map(t => t(0)).collect().foreach(println)

The data looks like the following. I have removed some values to save space, but you get the idea:

 10th
 11th
 12th
 1st-4th
 ...
 Preschool
 Prof-school
 Some-college
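
For comparison, the same sorted, distinct list can be produced with the DataFrame API rather than an SQL string. This is only a sketch, assuming the adult table was registered from a DataFrame that I will call adultDF here (a hypothetical name):

    // DataFrame API sketch (Spark 1.x), assuming adultDF is the
    // DataFrame behind the "adult" temporary table
    val eduDistDF = adultDF
      .select( adultDF("education").as("edu_dist") )
      .distinct
      .orderBy("edu_dist")

    eduDistDF.map(t => t(0)).collect().foreach(println)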

I have defined a method in Scala that accepts the string-based education value, and returns an enumerated integer value that represents it. If the value is not recognized, a default value of 9999 is returned:

  def enumEdu( education:String ) : Int =
    education match {
      case "10th"         => 0
      case "11th"         => 1
      case "12th"         => 2
      case "1st-4th"      => 3
      case "5th-6th"      => 4
      case "7th-8th"      => 5
      case "9th"          => 6
      case "Assoc-acdm"   => 7
      case "Assoc-voc"    => 8
      case "Bachelors"    => 9
      case "Doctorate"    => 10
      case "HS-grad"      => 11
      case "Masters"      => 12
      case "Preschool"    => 13
      case "Prof-school"  => 14
      case "Some-college" => 15
      case _              => 9999  // default for unrecognized values
    }
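
The same mapping can also be expressed as a data-driven Map lookup, which is easier to extend as categories change. This is only an alternative sketch; the rest of this section uses the enumEdu method above:

    // Alternative sketch: an immutable Map, with getOrElse supplying
    // the 9999 default for unrecognized education values
    val eduMap = Map(
      "10th" -> 0, "11th" -> 1, "12th" -> 2, "1st-4th" -> 3,
      "5th-6th" -> 4, "7th-8th" -> 5, "9th" -> 6, "Assoc-acdm" -> 7,
      "Assoc-voc" -> 8, "Bachelors" -> 9, "Doctorate" -> 10,
      "HS-grad" -> 11, "Masters" -> 12, "Preschool" -> 13,
      "Prof-school" -> 14, "Some-college" -> 15 )

    def enumEduMap( education:String ) : Int =
      eduMap.getOrElse( education, 9999 )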

I can now register this function using the SQL context in Scala, so that it can be used in an SQL statement:

    sqlContext.udf.register( "enumEdu", enumEdu _ )

The SQL and Scala code to enumerate the data then look like this. The newly registered function, enumEdu, is used in the SELECT statement. It takes the education type as a parameter, and returns the integer enumeration. The column formed from this value is aliased to the name idx:

    val selectClause = "SELECT enumEdu(t1.edu_dist) as idx,t1.edu_dist FROM "
    val tableClause  = " ( SELECT DISTINCT education AS edu_dist FROM adult ) t1 "
    val orderClause  = " ORDER BY t1.edu_dist "

    val resRDD = sqlContext.sql( selectClause + tableClause  + orderClause )

    resRDD.map(t => t(0) + " " + t(1) ).collect().foreach(println)

The resulting data output, as a list of education values and their enumerations, looks like the following:

0  10th
1  11th
2  12th
3  1st-4th
4  5th-6th
5  7th-8th
6  9th
7  Assoc-acdm
8  Assoc-voc
9  Bachelors
10  Doctorate
11  HS-grad
12  Masters
13  Preschool
14  Prof-school
15  Some-college
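
Returning to the machine-learning motivation above, the enumerated value can now feed a numeric feature vector. The following is only a minimal sketch: the label of 1.0 and the single-element vector are placeholders, not the real adult schema:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Sketch only: wrap the enumerated education value in a LabeledPoint;
    // a real feature vector would hold all of the numeric adult columns
    val lp = LabeledPoint( 1.0, Vectors.dense( enumEdu("Bachelors").toDouble ) )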

Another example function, called ageBracket, takes the integer age value from the adult dataset, and returns an enumerated age bracket:

  def ageBracket( age:Int ) : Int =
    if      ( age >= 0  && age < 20  ) 0
    else if ( age >= 20 && age < 40  ) 1
    else if ( age >= 40 && age < 60  ) 2
    else if ( age >= 60 && age < 80  ) 3
    else if ( age >= 80 && age < 100 ) 4
    else if ( age >= 100 )             5
    else                               9999  // negative ages are unrecognized
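
Since the brackets are uniform 20-year spans, the same result can also be computed arithmetically. This alternative, which I will call ageBracketDiv, is only a sketch and is not used in the examples that follow:

    // Arithmetic sketch: integer division by the 20-year bracket width;
    // ages of 100 or more map to bracket 5, negative ages to 9999
    def ageBracketDiv( age:Int ) : Int =
      if      ( age <  0   ) 9999
      else if ( age >= 100 ) 5
      else                   age / 20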

Again, the function is registered using the SQL context so that it can be used in an SQL statement:

    sqlContext.udf.register( "ageBracket", ageBracket _ )

Then, the Scala-based SQL uses it to select the age, age bracket, and education value from the adult dataset:

    val selectClause = "SELECT age, ageBracket(age) as bracket,education FROM "
    val tableClause  = " adult "
    val limitClause  = " LIMIT 10 "

    val resRDD = sqlContext.sql( selectClause + tableClause  +
                                 limitClause )

    resRDD.map(t => t(0) + " " + t(1) + " " + t(2) ).collect().foreach(println)

The resulting data then looks like this, with the LIMIT clause restricting the output to 10 rows:

39 1  Bachelors
50 2  Bachelors
38 1  HS-grad
53 2  11th
28 1  Bachelors
37 1  Masters
49 2  9th
52 2  HS-grad
31 1  Masters
42 2  Bachelors

It is also possible to define functions for use in SQL inline, during UDF registration with the SQL context. The following example defines a function called dblAge, which simply multiplies the adult's age by two. The registration looks like this; it takes an integer parameter (the age), and returns twice its value:

    sqlContext.udf.register( "dblAge", (a:Int) => 2*a )

The SQL that uses it selects the age, and the doubled age value as dblAge(age):

    val selectClause = "SELECT age,dblAge(age) FROM "
    val tableClause  = " adult "
    val limitClause  = " LIMIT 10 "

    val resRDD = sqlContext.sql( selectClause + tableClause  + limitClause )

    resRDD.map(t => t(0) + " " + t(1) ).collect().foreach(println)

The two columns of the output data, containing the age and its doubled value, look like this:

39 78
50 100
38 76
53 106
28 56
37 74
49 98
52 104
31 62
42 84
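
Inline registration also works for functions of more than one parameter. As a sketch only, a hypothetical two-argument UDF called eduLabel, which joins an index and an education string into a single value, could be registered like this:

    // Hypothetical two-parameter inline UDF; not used elsewhere in this chapter
    sqlContext.udf.register( "eduLabel", (idx:Int, edu:String) => s"$idx:$edu" )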

So far, DataFrames, SQL, and user-defined functions have been examined, but what if, as in my case, you are using a Hadoop-based cluster with Apache Hive available? The adult table that I have defined so far is a temporary table, but if I access Hive using Apache Spark SQL, I can access static database tables. The next section will examine the steps needed to do this.
