RangePartitioner partitions keys based on ranges: all keys that fall in the same range land in the same partition. It is recommended that the key type have a natural ordering, for example, Integer, Long, and so on.
org.apache.spark.RangePartitioner provides a parameterized constructor that accepts five parameters, as follows:
RangePartitioner(int partitions, RDD<? extends scala.Product2<K, V>> rdd, boolean ascending, scala.math.Ordering<K> evidence$1, scala.reflect.ClassTag<K> evidence$2)
where:
Parameter 1: Number of partitions
Parameter 2: RDD to be partitioned
Parameter 3: Order of the ranges (ascending or descending)
Parameter 4: Ordering method for the key
Parameter 5: Class tag for the key
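To illustrate the idea behind range partitioning independently of Spark, the following plain-Java sketch assigns keys to partitions by binary-searching a sorted array of upper range bounds. This is a simplified illustration, not Spark's actual implementation: the class, method, and the bounds {2, 4, 6} are hypothetical (Spark's RangePartitioner samples the RDD to compute its own bounds).

```java
import java.util.Arrays;

// Simplified, Spark-free sketch of range partitioning.
// The bounds {2, 4, 6} are hypothetical; Spark's RangePartitioner
// samples the input RDD to determine its own range bounds.
public class RangeLookup {

    // Upper bound of each partition except the last; keys greater than
    // the last bound fall into the final partition.
    static final int[] BOUNDS = {2, 4, 6};

    // Returns the partition index for a key: the index of the first
    // bound >= key, or the last partition if the key exceeds every bound.
    static int getPartition(int key) {
        int idx = Arrays.binarySearch(BOUNDS, key);
        if (idx < 0) {
            idx = -idx - 1; // insertion point for keys between bounds
        }
        return idx;
    }

    public static void main(String[] args) {
        for (int key = 1; key <= 8; key++) {
            System.out.println("key " + key + " -> partition " + getPartition(key));
        }
    }
}
```

With these bounds, keys 1 and 2 map to partition 0, keys 3 and 4 to partition 1, and so on, which mirrors the distribution shown later in this section.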
Let's partition the pairRdd created previously using RangePartitioner, as follows:
RangePartitioner rangePartitioner = new RangePartitioner(4, org.apache.spark.api.java.JavaRDD.toRDD(pairRdd), true, scala.math.Ordering.Int$.MODULE$, scala.reflect.ClassTag$.MODULE$.apply(Integer.class));
JavaPairRDD<Integer, String> rangePartitioned = pairRdd.partitionBy(rangePartitioner);
Here, we have used Scala objects for the ordering and class tag parameters. Also, the Java RDD needs to be converted to org.apache.spark.rdd.RDD before it can be used with RangePartitioner.
Since we have provided 4 as the number of output partitions, the resulting RDD will contain four partitions, and as per the range partitioner's logic, the keys should be distributed as follows:
Keys (1, 2) - Partition 0
Keys (3, 4) - Partition 1
Keys (5, 6) - Partition 2
Keys (7, 8) - Partition 3
The following code can be used to check the distribution of the keys:
JavaRDD<String> mapPartitionsWithIndex = rangePartitioned.mapPartitionsWithIndex((index, tupleIterator) -> {
    List<String> list = new ArrayList<>();
    while (tupleIterator.hasNext()) {
        list.add("Partition number:" + index + ",key:" + tupleIterator.next()._1());
    }
    return list.iterator();
}, true);
System.out.println(mapPartitionsWithIndex.collect());