Aggregations

Aggregation techniques allow you to combine the elements of an RDD in arbitrary ways to perform a computation. In fact, aggregation is the most important part of big data analytics: without it, we would have no way to generate reports and analyses such as Top States by Population, a natural question to ask of a dataset containing all state populations for the past 200 years. A simpler example is counting the number of elements in an RDD: each executor counts the elements in its partitions and sends its result to the Driver, which adds the partial counts to compute the total number of elements in the RDD.
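The count mechanics just described can be previewed with a small sketch in plain Scala, where each local Seq stands in for the elements held by one partition (this is an illustration of the idea, not Spark's actual implementation):

```scala
// Hypothetical sketch: each inner Seq stands in for one partition's elements.
val partitions = Seq(
  Seq("Alabama", "Alaska"),       // partition 0
  Seq("Arizona"),                 // partition 1
  Seq("Arkansas", "California")   // partition 2
)

// Each executor counts the elements in its own partition...
val perPartitionCounts = partitions.map(_.size)

// ...and the Driver sums the partial counts into the total.
val total = perPartitionCounts.sum

println(total) // 5
```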

In this section, our primary focus is on the aggregation functions used to collect and combine data by key. As seen earlier in this chapter, a PairRDD is an RDD of (key, value) pairs, where the key and value are arbitrary and can be customized to suit the use case.

In our example of state populations, a PairRDD could consist of pairs of <State, <Population, Year>>, which means State is taken as the key and the tuple <Population, Year> as the value. Breaking down the key and value this way supports aggregations such as Top Years by Population per State. Conversely, if our aggregations revolve around Year, say Top States by Population per Year, we can use a PairRDD of pairs of <Year, <State, Population>>.

The following sample code generates a PairRDD from the StatePopulation dataset, first with State as the key and then with Year as the key:

scala> val statesPopulationRDD = sc.textFile("statesPopulation.csv")
statesPopulationRDD: org.apache.spark.rdd.RDD[String] = statesPopulation.csv MapPartitionsRDD[157] at textFile at <console>:26

scala> statesPopulationRDD.take(5)
res226: Array[String] = Array(State,Year,Population, Alabama,2010,4785492, Alaska,2010,714031, Arizona,2010,6408312, Arkansas,2010,2921995)

Next, we can generate a PairRDD using State as the key and a tuple of <Year, Population> as the value, as shown in the following code snippet:

scala> val pairRDD = statesPopulationRDD.map(record => record.split(",")).map(t => (t(0), (t(1), t(2))))
pairRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[160] at map at <console>:28

scala> pairRDD.take(5)
res228: Array[(String, (String, String))] = Array((State,(Year,Population)), (Alabama,(2010,4785492)), (Alaska,(2010,714031)), (Arizona,(2010,6408312)), (Arkansas,(2010,2921995)))

As mentioned earlier, we can also generate a PairRDD using Year as the key and a tuple of <State, Population> as the value as shown in the following code snippet:

scala> val pairRDD = statesPopulationRDD.map(record => record.split(",")).map(t => (t(1), (t(0), t(2))))
pairRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[162] at map at <console>:28

scala> pairRDD.take(5)
res229: Array[(String, (String, String))] = Array((Year,(State,Population)), (2010,(Alabama,4785492)), (2010,(Alaska,714031)), (2010,(Arizona,6408312)), (2010,(Arkansas,2921995)))
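Notice that the first element of both PairRDDs above is the CSV header row, (State,(Year,Population)), parsed as if it were data. The header can be dropped with a filter before splitting; the following sketch uses a local Array to stand in for the RDD's lines, but the same filter call works unchanged on statesPopulationRDD:

```scala
// Local stand-in for the first few lines of statesPopulation.csv
// (the first line is the header row).
val lines = Array(
  "State,Year,Population",
  "Alabama,2010,4785492",
  "Alaska,2010,714031"
)

// Drop the header row before building the (key, value) pairs;
// on the real RDD this would be statesPopulationRDD.filter(...).
val pairs = lines
  .filter(line => !line.startsWith("State,"))
  .map(_.split(","))
  .map(t => (t(0), (t(1), t(2))))

println(pairs.toList)
// List((Alabama,(2010,4785492)), (Alaska,(2010,714031)))
```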

We will now look at how to use the common aggregation functions on the PairRDD of <State, <Year, Population>>:

  • groupByKey
  • reduceByKey
  • aggregateByKey
  • combineByKey
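Before looking at each function in turn, the per-key behavior they all share can be previewed with plain Scala collections: grouping (Year, Population) values under their State key and then combining the values for each key. This is only a local illustration of the semantics, not the Spark API, and the 2011 figure is a hypothetical value added for the example:

```scala
// Each pair mirrors the PairRDD layout: (State, (Year, Population)).
val pairs = List(
  ("Alabama", ("2010", 4785492L)),
  ("Alabama", ("2011", 4799918L)), // hypothetical 2011 figure, for illustration
  ("Alaska",  ("2010", 714031L))
)

// groupByKey-style: collect all the values for each key.
val grouped = pairs.groupBy(_._1).map { case (k, v) => (k, v.map(_._2)) }

// reduceByKey-style: combine the values per key, here keeping
// the largest population seen for each state.
val maxPopByState = pairs
  .groupBy(_._1)
  .map { case (k, v) => (k, v.map(_._2._2).max) }

println(maxPopByState)
```

The key difference previewed here carries over to Spark: a groupByKey-style operation materializes every value for a key, while a reduceByKey-style operation folds values together as it goes, which is why the latter is usually preferred at scale.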