Data aggregation in SQL is very flexible, and the same is true of Spark SQL. The difference is that Spark SQL runs such statements against distributed data sources rather than a single data source on a single machine. The previous chapter discussed a MapReduce use case for data aggregation, and this section reuses it to demonstrate the aggregation capabilities of Spark SQL. Here, too, the use case is approached both through SQL queries and through the DataFrame API.
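For comparison, the kind of single-machine SQL aggregation the paragraph contrasts Spark SQL with can be sketched using Python's built-in sqlite3 module. This is only an illustration: the in-memory database stands in for "a single data source on a single machine", and the table name trans and its columns simply mirror the ones used later in this section.

```python
import sqlite3

# Sample transactions matching the ones used in this section: (accNo, tranAmount)
rows = [("SB10001", 1000), ("SB10002", 1200), ("SB10001", 8000),
        ("SB10002", 400), ("SB10003", 300), ("SB10001", 10000),
        ("SB10004", 500), ("SB10005", 56), ("SB10003", 30),
        ("SB10002", 7000), ("SB10001", -100), ("SB10002", -10)]

# An in-memory database: all data lives in one process on one machine
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trans (accNo TEXT, tranAmount REAL)")
conn.executemany("INSERT INTO trans VALUES (?, ?)", rows)

# The same GROUP BY aggregation that Spark SQL runs on distributed data
summary = dict(conn.execute(
    "SELECT accNo, sum(tranAmount) AS TransTotal FROM trans GROUP BY accNo"
).fetchall())
conn.close()

print(summary)
```

The query text is the same standard SQL that is later passed to spark.sql; what changes with Spark is where the data lives and how the work is spread across machines.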
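The use case selected to illustrate this MapReduce kind of data processing is as follows: the transaction records consist of an account number and a transaction amount in comma-separated strings; find the account-level summary of all the transactions, that is, the total transaction amount for each account.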
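Before looking at the Spark versions, the map and reduce steps of this use case can be sketched in plain Python, with no Spark involved. The function names parse and account_totals are hypothetical helpers chosen for this illustration; the input list is the same one used in the REPL sessions that follow.

```python
from collections import defaultdict

ac_trans_list = ["SB10001,1000", "SB10002,1200", "SB10001,8000", "SB10002,400",
                 "SB10003,300", "SB10001,10000", "SB10004,500", "SB10005,56",
                 "SB10003,30", "SB10002,7000", "SB10001,-100", "SB10002,-10"]

def parse(record):
    """Map step: turn an 'accNo,amount' string into an (accNo, amount) pair."""
    acc_no, amount = record.split(",")
    return acc_no, float(amount.strip())

def account_totals(records):
    """Reduce step: add up all amounts that share the same account number."""
    totals = defaultdict(float)
    for acc_no, amount in map(parse, records):
        totals[acc_no] += amount
    return dict(totals)

print(account_totals(ac_trans_list))
# {'SB10001': 18900.0, 'SB10002': 8590.0, 'SB10003': 330.0, 'SB10004': 500.0, 'SB10005': 56.0}
```

These totals are the figures the Spark SQL and DataFrame API versions are expected to produce.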
At the Scala REPL prompt, try the following statements:
scala> // Define the case classes for using in conjunction with DataFrames
scala> case class Trans(accNo: String, tranAmount: Double)
defined class Trans
scala> // Functions to convert the sequence of strings to objects defined by the case classes
scala> def toTrans = (trans: Seq[String]) => Trans(trans(0), trans(1).trim.toDouble)
toTrans: Seq[String] => Trans
scala> // Creation of the list from where the RDD is going to be created
scala> val acTransList = Array("SB10001,1000", "SB10002,1200","SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000","SB10004,500","SB10005,56", "SB10003,30","SB10002,7000","SB10001,-100", "SB10002,-10")
acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10001,8000, SB10002,400, SB10003,300, SB10001,10000, SB10004,500, SB10005,56, SB10003,30, SB10002,7000, SB10001,-100, SB10002,-10)
scala> // Create the DataFrame
scala> val acTransDF = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_)).toDF()
acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> // Show the first few records of the DataFrame
scala> acTransDF.show
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10001|    1000.0|
|SB10002|    1200.0|
|SB10001|    8000.0|
|SB10002|     400.0|
|SB10003|     300.0|
|SB10001|   10000.0|
|SB10004|     500.0|
|SB10005|      56.0|
|SB10003|      30.0|
|SB10002|    7000.0|
|SB10001|    -100.0|
|SB10002|     -10.0|
+-------+----------+
scala> // Register temporary view in the DataFrame for using it in SQL
scala> acTransDF.createOrReplaceTempView("trans")
scala> // Use SQL to create another DataFrame containing the account summary records
scala> val acSummary = spark.sql("SELECT accNo, sum(tranAmount) as TransTotal FROM trans GROUP BY accNo")
acSummary: org.apache.spark.sql.DataFrame = [accNo: string, TransTotal: double]
scala> // Show the first few records of the DataFrame
scala> acSummary.show
+-------+----------+
|  accNo|TransTotal|
+-------+----------+
|SB10005|      56.0|
|SB10004|     500.0|
|SB10003|     330.0|
|SB10002|    8590.0|
|SB10001|   18900.0|
+-------+----------+
scala> // Create the DataFrame using API for the account summary records
scala> val acSummaryViaDFAPI = acTransDF.groupBy("accNo").agg(sum("tranAmount") as "TransTotal")
acSummaryViaDFAPI: org.apache.spark.sql.DataFrame = [accNo: string, TransTotal: double]
scala> // Show the first few records of the DataFrame
scala> acSummaryViaDFAPI.show
+-------+----------+
|  accNo|TransTotal|
+-------+----------+
|SB10005|      56.0|
|SB10004|     500.0|
|SB10003|     330.0|
|SB10002|    8590.0|
|SB10001|   18900.0|
+-------+----------+
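This code is very similar to the code in the preceding section. The only difference is that aggregations are used here, both in the SQL query (sum(tranAmount) with GROUP BY accNo) and in the DataFrame API (groupBy("accNo").agg(sum("tranAmount"))). Both approaches produce the same account summary. Spark does not guarantee the order of rows in a grouped result, so the rows may appear in a different order in another run.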
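Whether the aggregation is written as SQL or through the DataFrame API, on distributed data Spark typically evaluates a sum in two phases: a partial aggregation inside each partition, then a shuffle by key and a final merge of the partial results. The following plain-Python sketch models only the idea, not Spark's implementation: the round-robin split_into_partitions helper is a hypothetical stand-in for data spread across a cluster, and the shuffle is collapsed into a single reduce over all partial results.

```python
from functools import reduce

transactions = [("SB10001", 1000.0), ("SB10002", 1200.0), ("SB10001", 8000.0),
                ("SB10002", 400.0), ("SB10003", 300.0), ("SB10001", 10000.0),
                ("SB10004", 500.0), ("SB10005", 56.0), ("SB10003", 30.0),
                ("SB10002", 7000.0), ("SB10001", -100.0), ("SB10002", -10.0)]

def split_into_partitions(rows, n):
    """Hypothetical stand-in for data spread over n partitions (round-robin)."""
    return [rows[i::n] for i in range(n)]

def partial_sums(partition):
    """Phase 1: sum tranAmount per accNo within a single partition."""
    sums = {}
    for acc_no, amount in partition:
        sums[acc_no] = sums.get(acc_no, 0.0) + amount
    return sums

def merge(left, right):
    """Phase 2: combine two partial results by adding subtotals for matching keys."""
    merged = dict(left)
    for acc_no, subtotal in right.items():
        merged[acc_no] = merged.get(acc_no, 0.0) + subtotal
    return merged

partitions = split_into_partitions(transactions, 3)
result = reduce(merge, (partial_sums(p) for p in partitions), {})
print(result)
```

Because addition is associative and commutative, the final totals do not depend on how the rows are split across partitions, which is what makes this two-phase strategy safe for sum.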
At the Python REPL prompt, try the following statements:
>>> from pyspark.sql import Row
>>> # Creation of the list from where the RDD is going to be created
>>> acTransList = ["SB10001,1000", "SB10002,1200", "SB10001,8000","SB10002,400", "SB10003,300", "SB10001,10000","SB10004,500","SB10005,56","SB10003,30","SB10002,7000", "SB10001,-100","SB10002,-10"]
>>> # Create the DataFrame
>>> acTransDF = sc.parallelize(acTransList).map(lambda trans: trans.split(",")).map(lambda p: Row(accNo=p[0], tranAmount=float(p[1]))).toDF()
>>> # Register temporary view in the DataFrame for using it in SQL
>>> acTransDF.createOrReplaceTempView("trans")
>>> # Use SQL to create another DataFrame containing the account summary records
>>> acSummary = spark.sql("SELECT accNo, sum(tranAmount) as transTotal FROM trans GROUP BY accNo")
>>> # Show the first few records of the DataFrame
>>> acSummary.show()
+-------+----------+
|  accNo|transTotal|
+-------+----------+
|SB10005|      56.0|
|SB10004|     500.0|
|SB10003|     330.0|
|SB10002|    8590.0|
|SB10001|   18900.0|
+-------+----------+
>>> # Create the DataFrame using API for the account summary records
>>> acSummaryViaDFAPI = acTransDF.groupBy("accNo").agg({"tranAmount": "sum"}).selectExpr("accNo", "`sum(tranAmount)` as transTotal")
>>> # Show the first few records of the DataFrame
>>> acSummaryViaDFAPI.show()
+-------+----------+
|  accNo|transTotal|
+-------+----------+
|SB10005|      56.0|
|SB10004|     500.0|
|SB10003|     330.0|
|SB10002|    8590.0|
|SB10001|   18900.0|
+-------+----------+
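The Python DataFrame API has some minor syntax differences compared with its Scala counterpart. The DataFrame is built from Row objects instead of a case class, and the aggregation is specified as a dictionary, {"tranAmount": "sum"}. That form names the resulting column sum(tranAmount), which is why selectExpr is used with a backtick-quoted column name to rename it to transTotal. Alternatively, the sum function from pyspark.sql.functions can be combined with alias, which reads closer to the Scala version.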
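The column-naming behaviour of the dictionary form of agg can be modelled with a small Spark-free toy. Both helpers below are hypothetical and only mimic the naming convention confirmed by the example above (an aggregate column called function(column)); they are not part of PySpark and support only sum, min, and max.

```python
def apply_agg_spec(rows, group_col, spec):
    """Toy model of groupBy(group_col).agg(spec), with spec like {"tranAmount": "sum"}.

    Each output column is named "function(column)", mirroring the
    sum(tranAmount) column that the Python example renames with selectExpr.
    """
    funcs = {"sum": sum, "min": min, "max": max}
    groups = {}
    for row in rows:
        groups.setdefault(row[group_col], []).append(row)
    result = []
    for key, members in groups.items():
        out = {group_col: key}
        for col, func_name in spec.items():
            out[f"{func_name}({col})"] = funcs[func_name](r[col] for r in members)
        result.append(out)
    return result

def rename(rows, old, new):
    """Rough analogue of selectExpr("...", "`old` as new"): rename one column."""
    return [{(new if k == old else k): v for k, v in row.items()} for row in rows]

rows = [{"accNo": "SB10003", "tranAmount": 300.0},
        {"accNo": "SB10003", "tranAmount": 30.0},
        {"accNo": "SB10005", "tranAmount": 56.0}]
agg = apply_agg_spec(rows, "accNo", {"tranAmount": "sum"})
print(agg)
# [{'accNo': 'SB10003', 'sum(tranAmount)': 330.0}, {'accNo': 'SB10005', 'sum(tranAmount)': 56.0}]
print(rename(agg, "sum(tranAmount)", "transTotal"))
# [{'accNo': 'SB10003', 'transTotal': 330.0}, {'accNo': 'SB10005', 'transTotal': 56.0}]
```

The parentheses in the generated name are why the real selectExpr call needs backticks: without them, sum(tranAmount) would be parsed as a new aggregate expression rather than a reference to an existing column.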