CoGroupedRDD

CoGroupedRDD is an RDD that cogroups its parents. Both parents must be pair RDDs, because a cogroup produces a pair RDD in which each key is paired with the collections of values for that key from each parent. A key that appears in only one parent is still included, with an empty collection on the other side. Take a look at the following (abbreviated) class declaration:

class CoGroupedRDD[K] extends RDD[(K, Array[Iterable[_]])] 

The following example cogroups two pair RDDs, one holding (State, Population) pairs and the other holding (State, Year) pairs:

scala> val pairRDD = statesPopulationRDD.map(record => (record.split(",")(0), record.split(",")(2)))
pairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[60] at map at <console>:27

scala> val pairRDD2 = statesPopulationRDD.map(record => (record.split(",")(0), record.split(",")(1)))
pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[66] at map at <console>:27

scala> val cogroupRDD = pairRDD.cogroup(pairRDD2)
cogroupRDD: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[68] at cogroup at <console>:31

scala> cogroupRDD.take(10)
res82: Array[(String, (Iterable[String], Iterable[String]))] = Array((Montana,(CompactBuffer(990641, 997821, 1005196, 1014314, 1022867, 1032073, 1042520),CompactBuffer(2010, 2011, 2012, 2013, 2014, 2015, 2016))), (California,(CompactBuffer(37332685, 37676861, 38011074, 38335203, 38680810, 38993940, 39250017),CompactBuffer(2010, 2011, 2012, 2013, 2014, 2015, 2016))),
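Because both pair RDDs in this session are derived from the same statesPopulationRDD, every key shows up on both sides, so the output does not show what happens to a key that exists in only one parent. The following is a minimal sketch of the grouping rule using plain Scala collections rather than Spark. The helper cogroupLocal and the key "Nowhere" are hypothetical names introduced only for illustration, and the sketch uses ordered Seq values where Spark returns Iterable values with no ordering guarantee:

```scala
// A plain-Scala model of cogroup's grouping rule; it does not use Spark.
// cogroupLocal is a hypothetical helper, not part of the Spark API.
object CogroupModel {
  def cogroupLocal[K, V, W](left: Seq[(K, V)],
                            right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    // Collect every key that occurs in either input, not only the shared ones.
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      // Values for this key from each side; empty if the key is absent there.
      val vs = left.filter(_._1 == k).map(_._2)
      val ws = right.filter(_._1 == k).map(_._2)
      (k, (vs, ws))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Montana values are taken from the session above.
    val populations = Seq(("Montana", "990641"), ("Montana", "997821"))
    // "Nowhere" is a made-up key that exists only on the year side.
    val years = Seq(("Montana", "2010"), ("Montana", "2011"), ("Nowhere", "2010"))

    cogroupLocal(populations, years).foreach { case (state, (pops, yrs)) =>
      println(s"$state -> populations=$pops, years=$yrs")
    }
  }
}
```

In this model, "Nowhere" is kept in the result with an empty population list, which mirrors how a cogroup behaves like a full outer grouping rather than an inner join.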

The following diagram shows the cogroup of pairRDD and pairRDD2, which pairs up the collections of values for each key:
