Using SQL

After using the previous Scala example to create a data frame from a CSV-based data input file on HDFS, I can now define a temporary table based on the data frame, and run SQL against it. The following example shows the temporary table called adult being defined, and a row count being created using COUNT(*):

    adultDataFrame.registerTempTable("adult")

    val resRDD = sqlContext.sql("SELECT COUNT(*) FROM adult")

    resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

This gives a row count of over 32,000 rows:

Count - 32561
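
As a quick cross-check, the same figure can be obtained directly from the data frame API; this is a minimal sketch, assuming the count() method available on data frames in this Spark release:

    // Count the rows via the data frame API rather than via SQL
    println( "Count - " + adultDataFrame.count() )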

It is also possible to limit the volume of the data selected from the table using the LIMIT SQL option, as shown in the following example. The first 10 rows have been selected from the data, which is useful if I just want to check data types and quality:

    val resRDD = sqlContext.sql("SELECT * FROM adult LIMIT 10")

    resRDD.map(t => t(0)  + " " + t(1)  + " " + t(2)  + " " + t(3)  + " " +
                    t(4)  + " " + t(5)  + " " + t(6)  + " " + t(7)  + " " +
                    t(8)  + " " + t(9)  + " " + t(10) + " " + t(11) + " " +
                    t(12) + " " + t(13) + " " + t(14)
              )
      .collect().foreach(println)
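
A more compact way to format each row, rather than concatenating every column by hand, is to convert each Row into a sequence of values and join them; a minimal sketch, assuming the Row toSeq method is available in this Spark release:

    // Convert each Row to a Seq of column values and join them with spaces
    resRDD.map(t => t.toSeq.mkString(" ")).collect().foreach(println)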

A sample of the data looks like the following:

50  Private  283676  Some-college  10  Married-civ-spouse  Craft-repair  Husband  White  Male  0  0  40  United-States  >50K

When the schema for this data was created in the Scala-based data frame example in the last section, all the columns were created as strings. However, if I want to filter the data in SQL using WHERE clauses, it would be useful to have proper data types. For instance, if the age column stores integer values, it should be stored as an integer so that I can execute numeric comparisons against it. I have changed my Scala code to import all of the possible Spark SQL types:

import org.apache.spark.sql.types._

I have also redefined my schema using these types to better match the data, and I have defined the row data in terms of the actual data types, converting the raw string values into integers where necessary:

    val schema =
      StructType(
        StructField("age",                IntegerType, false) ::
        StructField("workclass",          StringType,  false) ::
        StructField("fnlwgt",             IntegerType, false) ::
        StructField("education",          StringType,  false) ::
        StructField("educational-num",    IntegerType, false) ::
        StructField("marital-status",     StringType,  false) ::
        StructField("occupation",         StringType,  false) ::
        StructField("relationship",       StringType,  false) ::
        StructField("race",               StringType,  false) ::
        StructField("gender",             StringType,  false) ::
        StructField("capital-gain",       IntegerType, false) ::
        StructField("capital-loss",       IntegerType, false) ::
        StructField("hours-per-week",     IntegerType, false) ::
        StructField("native-country",     StringType,  false) ::
        StructField("income",             StringType,  false) ::
        Nil)

    val rowRDD = rawRdd.map(_.split(","))
      .map(p => Row( p(0).trim.toInt,p(1),p(2).trim.toInt,p(3),
                     p(4).trim.toInt,p(5),p(6),p(7),p(8),
                     p(9),p(10).trim.toInt,p(11).trim.toInt,
                     p(12).trim.toInt,p(13),p(14) ))
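
For the typed schema to take effect in the SQL that follows, the data frame has to be rebuilt from this row RDD and the temporary table refreshed; a minimal sketch of that step, assuming the same rawRdd and table name as before:

    // Rebuild the data frame with the typed schema and re-register the temporary table
    val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    adultDataFrame.registerTempTable("adult")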

The SQL can now use numeric filters in the WHERE clause correctly. If the age column were a string, this would not work. You can now see that the data has been filtered to give age values below 60 years:

    val resRDD = sqlContext.sql("SELECT COUNT(*) FROM adult WHERE age < 60")
    resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

This gives a row count of around 30,000 rows:

Count - 29917

It is possible to use Boolean logic in the WHERE-based filter clauses. The following example specifies an age range for the data. Note that I have used variables to describe the select and filter components of the SQL statement. This allows me to break the statement down into separate parts as it becomes larger:

    val selectClause = "SELECT COUNT(*) FROM adult "
    val filterClause = "WHERE age > 25 AND age < 60"
    val resRDD = sqlContext.sql( selectClause + filterClause )
    resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

This gives a row count of around 23,500 rows:

Count - 23506

I can create compound filter clauses using Boolean operators such as AND and OR, as well as parentheses:

    val selectClause = "SELECT COUNT(*) FROM adult "
    val filterClause =
      "WHERE ( age > 15 AND age < 25 ) OR ( age > 30 AND age < 45 ) "

    val resRDD = sqlContext.sql( selectClause + filterClause )
    resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

This gives a row count of around 17,200 rows, representing the total across the two age ranges in the data:

Count - 17198

It is also possible to use subqueries in Apache Spark SQL. In the following example, I have created a subquery called t1 by selecting three columns: age, education, and occupation from the table adult. I have then used this t1 table to create a row count, with a filter clause acting on the age column from t1. Notice also that I have added group by and order by clauses to my SQL, even though they are currently empty:

    val selectClause = "SELECT COUNT(*) FROM "
    val tableClause = " ( SELECT age,education,occupation from adult) t1 "
    val filterClause = "WHERE ( t1.age > 25 ) "
    val groupClause = ""
    val orderClause = ""

    val resRDD = sqlContext.sql( selectClause + tableClause +
                                 filterClause +
                                 groupClause + orderClause
                               )

    resRDD.map(t => "Count - " + t(0)).collect().foreach(println)
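
The group by and order by variables above are empty placeholders. As an illustration of how they can be populated, the following sketch counts rows per occupation in the filtered subquery; the column choices here are assumptions made for the example:

    val selectClause = "SELECT t1.occupation, COUNT(*) AS total FROM "
    val tableClause  = " ( SELECT age,education,occupation from adult) t1 "
    val filterClause = "WHERE ( t1.age > 25 ) "
    val groupClause  = " GROUP BY t1.occupation "
    val orderClause  = " ORDER BY t1.occupation "

    val resRDD = sqlContext.sql( selectClause + tableClause +
                                 filterClause +
                                 groupClause + orderClause
                               )

    // Print each occupation with its row count
    resRDD.map(t => t(0) + " : " + t(1)).collect().foreach(println)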

In order to examine table joins, I have created a version of the adult CSV data file called adult.train.data2, which differs from the original only in that it has an added first column called idx, containing a unique row index. The Hadoop file system cat command shows a sample of the data here; the output has been limited using the Linux head command:

[hadoop@hc2nn sql]$ hdfs dfs -cat /data/spark/sql/adult.train.data2 | head -2

1,39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
2,50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K
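
One way to produce such an indexed copy of the data is sketched below; this is an illustration rather than a record of how the file was generated, and it assumes that sc is the active SparkContext and that the input and output paths shown are correct:

    // Read the original CSV, prefix each line with a 1-based index, and save the result.
    // Note that saveAsTextFile writes a directory of part files rather than a single file.
    val origRdd    = sc.textFile("/data/spark/sql/adult.train.data")
    val indexedRdd = origRdd.zipWithIndex().map { case (line, i) => s"${i + 1},$line" }
    indexedRdd.saveAsTextFile("/data/spark/sql/adult.train.data2.dir")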

The schema has now been redefined to have an integer-based first column called idx for an index, as shown here:

    val schema =
      StructType(
        StructField("idx",                IntegerType, false) ::
        StructField("age",                IntegerType, false) ::
        StructField("workclass",          StringType,  false) ::
        StructField("fnlwgt",             IntegerType, false) ::
        StructField("education",          StringType,  false) ::
        StructField("educational-num",    IntegerType, false) ::
        StructField("marital-status",     StringType,  false) ::
        StructField("occupation",         StringType,  false) ::
        StructField("relationship",       StringType,  false) ::
        StructField("race",               StringType,  false) ::
        StructField("gender",             StringType,  false) ::
        StructField("capital-gain",       IntegerType, false) ::
        StructField("capital-loss",       IntegerType, false) ::
        StructField("hours-per-week",     IntegerType, false) ::
        StructField("native-country",     StringType,  false) ::
        StructField("income",             StringType,  false) ::
        Nil)

The raw row RDD in the Scala example now also processes the new initial column, converting its string value into an integer:

    val rowRDD = rawRdd.map(_.split(","))
      .map(p => Row( p(0).trim.toInt,
                     p(1).trim.toInt,
                     p(2),
                     p(3).trim.toInt,
                     p(4),
                     p(5).trim.toInt,
                     p(6),
                     p(7),
                     p(8),
                     p(9),
                     p(10),
                     p(11).trim.toInt,
                     p(12).trim.toInt,
                     p(13).trim.toInt,
                     p(14),
                     p(15)
                   ))

    val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)
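
As before, for the join SQL that follows to see the new idx column, the recreated data frame would need to be registered as the temporary table again; a minimal sketch of that assumed step:

    // Refresh the temporary table so that SQL queries see the idx column
    adultDataFrame.registerTempTable("adult")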

We have looked at subqueries; now, I would like to consider table joins. The next example uses the index that was just created to join two derived tables. The example is somewhat contrived, given that it joins two data sets derived from the same underlying table, but you get the idea: two derived tables are created as subqueries, and are joined on a common index column.

The SQL for a table join now looks like this. Two derived tables, called t1 and t2, have been created from the temporary table adult as subqueries. The new row index column called idx has been used to join the data in tables t1 and t2. The outer SELECT statement outputs all seven columns from the combined data set, and I have added a LIMIT clause to minimize the data output:

    val selectClause = "SELECT t1.idx,age,education,occupation,workclass,race,gender FROM "
    val tableClause1 = " ( SELECT idx,age,education,occupation FROM adult) t1 JOIN "
    val tableClause2 = " ( SELECT idx,workclass,race,gender FROM adult) t2 "
    val joinClause   = " ON (t1.idx=t2.idx) "
    val limitClause  = " LIMIT 10"

    val resRDD = sqlContext.sql( selectClause +
                                 tableClause1 + tableClause2 +
                                 joinClause   + limitClause
                               )

    resRDD.map(t => t(0) + " " + t(1) + " " + t(2) + " " +
                    t(3) + " " + t(4) + " " + t(5) + " " + t(6)
              )
              .collect().foreach(println)

Note that in the outer SELECT statement, I have to specify which table the index column comes from, so I use t1.idx. All the other column names are unique to either the t1 or t2 data set, so I don't need to use an alias to refer to them (for instance, t1.age). The data that is output now looks like the following:

33 45  Bachelors  Exec-managerial  Private  White  Male
233 25  Some-college  Adm-clerical  Private  White  Male
433 40  Bachelors  Prof-specialty  Self-emp-not-inc  White  Female
633 43  Some-college  Craft-repair  Private  White  Male
833 26  Some-college  Handlers-cleaners  Private  White  Male
1033 27  Some-college  Sales  Private  White  Male
1233 27  Bachelors  Adm-clerical  Private  White  Female
1433 32  Assoc-voc  Sales  Private  White  Male
1633 40  Assoc-acdm  Adm-clerical  State-gov  White  Male
1833 46  Some-college  Prof-specialty  Local-gov  White  Male

This gives some idea of the SQL-based functionality within Apache Spark, but what if I find that the method that I need is not available? Perhaps I need a new function. This is where user-defined functions (UDFs) are useful. I will cover them in the next section.
