RangePartitioner

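RangePartitioner works by partitioning the RDD into roughly equal ranges of keys. Because each partition is defined by the starting and ending keys of its range, the keys must be sortable, which means an implicit Ordering must exist for the key type. The RDD itself does not have to be sorted beforehand: RangePartitioner samples it to estimate where the range boundaries should fall.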

Range partitioning first determines reasonable partition boundaries from the contents of the RDD and then builds a function that maps a key K to the index of the partition where the element belongs. Finally, the RDD is repartitioned with the RangePartitioner so that its elements are distributed according to the ranges that were determined.
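To make these two steps concrete, here is a minimal Spark-free sketch in Scala. It assumes the keys have an Ordering and that a small in-memory sample stands in for the RDD; the names RangeSketch, computeBounds, and partitionFor are hypothetical. Spark's own RangePartitioner samples every partition and weights the samples, so this only approximates its boundary selection, while the key-to-partition rule (a key goes to the first range whose upper bound is greater than or equal to it) is intended to mirror the same idea.

```scala
// Hypothetical, Spark-free illustration of range partitioning.
// Assumption: a single in-memory sample replaces Spark's per-partition sampling.
object RangeSketch {

  // Step 1: choose (numPartitions - 1) upper bounds from a sample of keys so that
  // each range receives roughly the same number of sampled keys.
  def computeBounds[K](sample: Seq[K], numPartitions: Int)(implicit ord: Ordering[K]): IndexedSeq[K] = {
    val sorted = sample.sorted.toIndexedSeq
    if (sorted.isEmpty || numPartitions <= 1) IndexedSeq.empty
    else {
      val step = sorted.length.toDouble / numPartitions
      (1 until numPartitions)
        .map(i => sorted(math.min(sorted.length - 1, (i * step).toInt)))
        .distinct // duplicate bounds would create empty ranges
    }
  }

  // Step 2: map a key to the index of the first range whose upper bound is >= key.
  // Keys larger than every bound land in the last partition (index bounds.length).
  def partitionFor[K](key: K, bounds: IndexedSeq[K])(implicit ord: Ordering[K]): Int = {
    var lo = 0
    var hi = bounds.length
    while (lo < hi) { // binary search over the sorted bounds
      val mid = (lo + hi) >>> 1
      if (ord.lteq(key, bounds(mid))) hi = mid else lo = mid + 1
    }
    lo
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq("NY", "CA", "TX", "AL", "WA", "FL", "AK", "GA", "CO", "AZ")
    val bounds = computeBounds(sample, 5)
    println(s"bounds = $bounds")
    Seq("AK", "CA", "NY", "WA").foreach(k => println(s"$k -> partition ${partitionFor(k, bounds)}"))
  }
}
```

Using a binary search keeps the lookup at O(log n) in the number of bounds, which matters when the partitioner is consulted once per record during the shuffle.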

The following example shows how to apply range partitioning to a PairRDD. It also shows how the partitions change after we repartition the RDD using a RangePartitioner:

scala> import org.apache.spark.RangePartitioner
scala> val statesPopulationRDD = sc.textFile("statesPopulation.csv")
statesPopulationRDD: org.apache.spark.rdd.RDD[String] = statesPopulation.csv MapPartitionsRDD[135] at textFile at <console>:26

scala> val pairRDD = statesPopulationRDD.map(record => (record.split(",")(0), 1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[136] at map at <console>:28

scala> val rangePartitioner = new RangePartitioner(5, pairRDD)
rangePartitioner: org.apache.spark.RangePartitioner[String,Int] = org.apache.spark.RangePartitioner@c0839f25

scala> val rangePartitionedRDD = pairRDD.partitionBy(rangePartitioner)
rangePartitionedRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[130] at partitionBy at <console>:32

scala> pairRDD.mapPartitionsWithIndex((i,x) => Iterator(""+i + ":"+x.length)).take(10)
res215: Array[String] = Array(0:177, 1:174)

scala> rangePartitionedRDD.mapPartitionsWithIndex((i,x) => Iterator(""+i + ":"+x.length)).take(10)
res216: Array[String] = Array(0:70, 1:77, 2:70, 3:63, 4:71)
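The record counts above show that the five partitions are balanced, but not that each one holds a contiguous range of keys. The following sketch checks that property by recording the smallest and largest key in every partition. It assumes Spark is on the classpath and that a local master is acceptable; the object name RangeCheck and the in-memory list of states are hypothetical stand-ins for statesPopulation.csv.

```scala
import org.apache.spark.RangePartitioner
import org.apache.spark.sql.SparkSession

// Hypothetical check that a RangePartitioner yields non-overlapping key ranges.
// Assumption: Spark runs in local mode and the data is generated in memory.
object RangeCheck {

  // Returns (partitionIndex, minKey, maxKey, recordCount) for every non-empty partition.
  def keyRanges(numPartitions: Int): Array[(Int, String, String, Int)] = {
    val spark = SparkSession.builder().master("local[2]").appName("RangeCheck").getOrCreate()
    try {
      val sc = spark.sparkContext
      val states = Seq("Alabama", "Alaska", "Arizona", "California", "Colorado",
                       "Florida", "Georgia", "New York", "Texas", "Washington")
      // 20 records per state, spread over 2 input partitions like the textFile RDD above.
      val pairRDD = sc.parallelize(for (s <- states; _ <- 1 to 20) yield (s, 1), 2)
      val partitioned = pairRDD.partitionBy(new RangePartitioner(numPartitions, pairRDD))
      partitioned.mapPartitionsWithIndex { (i, it) =>
        val keys = it.map(_._1).toVector
        if (keys.isEmpty) Iterator.empty else Iterator((i, keys.min, keys.max, keys.size))
      }.collect()
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit =
    keyRanges(5).sortBy(_._1).foreach { case (i, lo, hi, n) =>
      println(s"$i: $lo .. $hi ($n records)")
    }
}
```

If the partitioner behaves as described, each partition's largest key sorts before the next partition's smallest key. With only ten distinct keys, Spark may also create fewer than five partitions, because the number of partitions follows the number of distinct range boundaries it finds.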

The following diagram illustrates the RangePartitioner used in the preceding REPL session:
