After using the previous Scala example to create a data frame from a CSV-based input file on HDFS, I can now define a temporary table based on the data frame and run SQL against it. The following example shows the temporary table called adult being defined, and a row count being produced using COUNT(*):
adultDataFrame.registerTempTable("adult")
val resRDD = sqlContext.sql("SELECT COUNT(*) FROM adult")
resRDD.map(t => "Count - " + t(0)).collect().foreach(println)
This gives a row count of over 32,000 rows:
Count - 32561
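As a quick cross-check, the same row count can be obtained directly from the data frame API, without going through SQL at all. This is a minimal sketch, assuming the adultDataFrame value from the previous section is still in scope:

// count() returns the total number of rows in the data frame as a Long
val directCount = adultDataFrame.count()
println("Count - " + directCount)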
It is also possible to limit the volume of data selected from the table using the LIMIT SQL option, which is shown in the following example. The first 10 rows have been selected from the data, which is useful if I just want to check data types and quality:
val resRDD = sqlContext.sql("SELECT * FROM adult LIMIT 10")
resRDD.map(t => t(0)  + " " + t(1)  + " " + t(2)  + " " + t(3)  + " " +
                t(4)  + " " + t(5)  + " " + t(6)  + " " + t(7)  + " " +
                t(8)  + " " + t(9)  + " " + t(10) + " " + t(11) + " " +
                t(12) + " " + t(13) + " " + t(14)
          )
  .collect().foreach(println)
A sample of the data looks like the following:
50 Private 283676 Some-college 10 Married-civ-spouse Craft-repair Husband White Male 0 0 40 United-States >50K
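If the aim is simply to check column types and sample values, the data frame API also offers printSchema and show, sketched here against the same adultDataFrame:

// Print the column names and data types defined for the data frame
adultDataFrame.printSchema()
// Display the first 10 rows in tabular form
adultDataFrame.show(10)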
When the schema for this data was created in the Scala-based data frame example in the last section, all the columns were created as strings. However, if I want to filter the data in SQL using WHERE clauses, it would be useful to have proper data types. For instance, if an age column stores integer values, it should be stored as an integer so that I can execute numeric comparisons against it. I have changed my Scala code to import all the available column types:
import org.apache.spark.sql.types._
I have also now defined my schema using different types to better match the data, and I have defined the row data in terms of the actual data types, converting the raw string values into integers where necessary:
val schema =
  StructType(
    StructField("age",             IntegerType, false) ::
    StructField("workclass",       StringType,  false) ::
    StructField("fnlwgt",          IntegerType, false) ::
    StructField("education",       StringType,  false) ::
    StructField("educational-num", IntegerType, false) ::
    StructField("marital-status",  StringType,  false) ::
    StructField("occupation",      StringType,  false) ::
    StructField("relationship",    StringType,  false) ::
    StructField("race",            StringType,  false) ::
    StructField("gender",          StringType,  false) ::
    StructField("capital-gain",    IntegerType, false) ::
    StructField("capital-loss",    IntegerType, false) ::
    StructField("hours-per-week",  IntegerType, false) ::
    StructField("native-country",  StringType,  false) ::
    StructField("income",          StringType,  false) ::
    Nil)

val rowRDD = rawRdd.map(_.split(","))
  .map(p => Row(
    p(0).trim.toInt, p(1), p(2).trim.toInt, p(3),
    p(4).trim.toInt, p(5), p(6), p(7), p(8),
    p(9), p(10).trim.toInt, p(11).trim.toInt,
    p(12).trim.toInt, p(13), p(14)
  ))
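The remaining steps are the same as before: the typed row RDD and the schema are combined into a data frame, which is re-registered as the temporary table adult. A minimal sketch, assuming the same sqlContext and rawRdd values as earlier:

// Build a data frame from the typed rows and explicit schema, then
// re-register it so that SQL queries see the new column types
val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)
adultDataFrame.registerTempTable("adult")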
The SQL can now use numeric filters in the WHERE clause correctly. If the age column were a string, this would not work. You can now see that the data has been filtered to give age values below 60 years:
val resRDD = sqlContext.sql("SELECT COUNT(*) FROM adult WHERE age < 60")
resRDD.map(t => "Count - " + t(0)).collect().foreach(println)
This gives a row count of around 30,000 rows:
Count - 29917
It is possible to use Boolean logic in WHERE-based filter clauses. The following example specifies an age range for the data. Note that I have used variables to describe the select and filter components of the SQL statement. This allows me to break down statements into different parts as they become larger:
val selectClause = "SELECT COUNT(*) FROM adult "
val filterClause = "WHERE age > 25 AND age < 60"
val resRDD = sqlContext.sql( selectClause + filterClause )
resRDD.map(t => "Count - " + t(0)).collect().foreach(println)
This gives a data count of around 23,000 rows:
Count - 23506
I can create compound filter clauses using the Boolean terms AND and OR, as well as parentheses:
val selectClause = "SELECT COUNT(*) FROM adult "
val filterClause = "WHERE ( age > 15 AND age < 25 ) OR ( age > 30 AND age < 45 ) "
val resRDD = sqlContext.sql( selectClause + filterClause )
resRDD.map(t => "Count - " + t(0)).collect().foreach(println)
This gives me a row count of around 17,000 rows, representing a count of the two age ranges in the data:
Count - 17198
It is also possible to use subqueries in Apache Spark SQL. You can see in the following example that I have created a subquery called t1 by selecting three columns, age, education, and occupation, from the table adult. I have then used the table called t1 to create a row count, adding a filter clause that acts on the age column from t1. Notice also that I have added group by and order by clauses to my SQL, even though they are currently empty:
val selectClause = "SELECT COUNT(*) FROM "
val tableClause  = " ( SELECT age,education,occupation from adult) t1 "
val filterClause = "WHERE ( t1.age > 25 ) "
val groupClause  = ""
val orderClause  = ""
val resRDD = sqlContext.sql( selectClause + tableClause +
                             filterClause +
                             groupClause + orderClause )
resRDD.map(t => "Count - " + t(0)).collect().foreach(println)
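To show how those empty clauses might later be populated, here is an illustrative sketch that groups the subquery rows by occupation and orders the counts; the grouping column and the total alias are my own choices, not part of the original example:

val selectClause = "SELECT t1.occupation, COUNT(*) AS total FROM "
val tableClause  = " ( SELECT age,education,occupation from adult) t1 "
val filterClause = "WHERE ( t1.age > 25 ) "
// The previously empty clauses, now filled in with a grouping
// column and a descending sort on the aggregated count
val groupClause  = " GROUP BY t1.occupation "
val orderClause  = " ORDER BY total DESC "
val resRDD = sqlContext.sql( selectClause + tableClause + filterClause +
                             groupClause + orderClause )
resRDD.map(t => t(0) + " - " + t(1)).collect().foreach(println)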
In order to examine table joins, I have created a version of the adult CSV data file called adult.train.data2, which differs from the original only in that it has an added first column called idx, which is a unique index. The Hadoop file system's cat command here shows a sample of the data; the output from the file has been limited using the Linux head command:
[hadoop@hc2nn sql]$ hdfs dfs -cat /data/spark/sql/adult.train.data2 | head -2

1,39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
2,50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K
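As an aside, a similar unique index could also be derived within Spark itself by using zipWithIndex on the raw RDD. This is just a sketch of the idea, not how the file above was actually produced:

// Attach a unique, 1-based index to each raw CSV line;
// zipWithIndex pairs each element with its position in the RDD
val indexedRdd = rawRdd.zipWithIndex.map {
  case (line, idx) => (idx + 1) + "," + line
}
indexedRdd.take(2).foreach(println)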
The schema has now been redefined to have an integer-based first column called idx for the index, as shown here:
val schema =
  StructType(
    StructField("idx",             IntegerType, false) ::
    StructField("age",             IntegerType, false) ::
    StructField("workclass",       StringType,  false) ::
    StructField("fnlwgt",          IntegerType, false) ::
    StructField("education",       StringType,  false) ::
    StructField("educational-num", IntegerType, false) ::
    StructField("marital-status",  StringType,  false) ::
    StructField("occupation",      StringType,  false) ::
    StructField("relationship",    StringType,  false) ::
    StructField("race",            StringType,  false) ::
    StructField("gender",          StringType,  false) ::
    StructField("capital-gain",    IntegerType, false) ::
    StructField("capital-loss",    IntegerType, false) ::
    StructField("hours-per-week",  IntegerType, false) ::
    StructField("native-country",  StringType,  false) ::
    StructField("income",          StringType,  false) ::
    Nil)
The raw row RDD in the Scala example now processes the new initial column, converting its string value into an integer:
val rowRDD = rawRdd.map(_.split(","))
  .map(p => Row(
    p(0).trim.toInt,
    p(1).trim.toInt,
    p(2),
    p(3).trim.toInt,
    p(4),
    p(5).trim.toInt,
    p(6), p(7), p(8), p(9), p(10),
    p(11).trim.toInt, p(12).trim.toInt, p(13).trim.toInt,
    p(14), p(15)
  ))

val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)
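As before, the new data frame must be registered as a temporary table before the join SQL that follows can run against it. A one-line sketch, assuming the table keeps the name adult:

// Replace the earlier temporary table so SQL now sees the idx column
adultDataFrame.registerTempTable("adult")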
We have looked at subqueries; now I would like to consider table joins. The next example uses the index that was just created to join two derived tables. The example is somewhat contrived, given that it joins two datasets derived from the same underlying table, but you get the idea. Two derived tables are created as subqueries and are joined on a common index column.
The SQL for a table join now looks like this. Two derived tables, called t1 and t2, have been created from the temporary table adult as subqueries. The new row index column called idx has been used to join the data in tables t1 and t2. The main SELECT statement outputs all seven columns from the compound dataset. I have added a LIMIT clause to minimize the data output:
val selectClause = "SELECT t1.idx,age,education,occupation,workclass,race,gender FROM "
val tableClause1 = " ( SELECT idx,age,education,occupation FROM adult) t1 JOIN "
val tableClause2 = " ( SELECT idx,workclass,race,gender FROM adult) t2 "
val joinClause   = " ON (t1.idx=t2.idx) "
val limitClause  = " LIMIT 10"
val resRDD = sqlContext.sql( selectClause + tableClause1 + tableClause2 +
                             joinClause + limitClause )
resRDD.map(t => t(0) + " " + t(1) + " " + t(2) + " " + t(3) + " " +
                t(4) + " " + t(5) + " " + t(6)
          )
  .collect().foreach(println)
Note that in the main SELECT statement, I have to define where the index column comes from, so I use t1.idx. All the other columns are unique to either the t1 or t2 dataset, so I don't need to use an alias to refer to them (that is, t1.age). The data that is output now looks like the following:
33 45 Bachelors Exec-managerial Private White Male
233 25 Some-college Adm-clerical Private White Male
433 40 Bachelors Prof-specialty Self-emp-not-inc White Female
633 43 Some-college Craft-repair Private White Male
833 26 Some-college Handlers-cleaners Private White Male
1033 27 Some-college Sales Private White Male
1233 27 Bachelors Adm-clerical Private White Female
1433 32 Assoc-voc Sales Private White Male
1633 40 Assoc-acdm Adm-clerical State-gov White Male
1833 46 Some-college Prof-specialty Local-gov White Male
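For comparison, the same join could also be sketched with the data frame API rather than SQL. The column choices mirror the SQL example, but treat this as an illustrative alternative, not the method used above:

// Derive the two column subsets, then join them on the idx column
val t1 = adultDataFrame.select("idx", "age", "education", "occupation")
val t2 = adultDataFrame.select("idx", "workclass", "race", "gender")
val joined = t1.join(t2, t1("idx") === t2("idx"))
// Display the first 10 joined rows
joined.show(10)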
This gives some idea of the SQL-based functionality within Apache Spark, but what if I find that the method I need is not available? Perhaps I need a new function. This is where user-defined functions (UDFs) are useful. I will cover them in the next section.