Hash Partitioner

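HashPartitioner partitions records based on the hashCode of their keys. It determines the target partition using the following formula:

(hashCode of the key) % (total number of partitions)

where hashCode is the value returned by the key's Java hashCode() method. If the remainder is negative (which happens when hashCode() is negative), Spark adds the number of partitions to it, so the result always falls between 0 and numPartitions - 1. A null key always goes to partition 0.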
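The rule can be sketched in plain Java, which is handy for reasoning about where a key will land without starting Spark. This is a minimal illustration, not Spark's own code: the class and method names (HashPartitionSketch, partitionFor) are hypothetical. The logic is written to mirror HashPartitioner's behavior, namely a null key goes to partition 0, and a negative remainder is shifted up by numPartitions so the result is never negative.

```java
public class HashPartitionSketch {

    // Hypothetical helper mirroring HashPartitioner's rule:
    // null keys go to partition 0; otherwise hashCode() % numPartitions,
    // shifted into [0, numPartitions) when the remainder is negative.
    public static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        int rawMod = key.hashCode() % numPartitions;
        return rawMod < 0 ? rawMod + numPartitions : rawMod;
    }

    public static void main(String[] args) {
        // Integer.hashCode() returns the int value itself
        System.out.println(partitionFor(7, 2));    // 1
        System.out.println(partitionFor(8, 2));    // 0
        // Negative hashCode: plain -7 % 3 is -1 in Java, adjusted to 2
        System.out.println(partitionFor(-7, 3));   // 2
        System.out.println(partitionFor(null, 3)); // 0
    }
}
```

The negative-remainder adjustment matters because Java's % operator keeps the sign of the dividend, so a raw remainder alone could produce an invalid partition index.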

Let's partition an RDD of integers using HashPartitioner:

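import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

SparkConf conf = new SparkConf().setMaster("local").setAppName("Partitioning");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Eight (Integer, String) pairs spread over 3 initial partitions
JavaPairRDD<Integer, String> pairRdd = jsc.parallelizePairs(Arrays.asList(
        new Tuple2<Integer, String>(1, "A"), new Tuple2<Integer, String>(2, "B"),
        new Tuple2<Integer, String>(3, "C"), new Tuple2<Integer, String>(4, "D"),
        new Tuple2<Integer, String>(5, "E"), new Tuple2<Integer, String>(6, "F"),
        new Tuple2<Integer, String>(7, "G"), new Tuple2<Integer, String>(8, "H")), 3);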

This initializes a JavaSparkContext and creates pairRdd, a pair RDD with Integer keys and String values, split into three partitions (the second argument to parallelizePairs).

If you print the number of partitions of this RDD, the result should be 3. Now let's repartition it using HashPartitioner:

JavaPairRDD<Integer, String> hashPartitioned = pairRdd.partitionBy(new HashPartitioner(2));

org.apache.spark.HashPartitioner has a constructor that takes a single argument, the number of partitions:

HashPartitioner(int partitions)

Since we passed 2 as the partition count, the resulting RDD has two partitions, which can be verified as follows:

System.out.println(hashPartitioned.getNumPartitions());

According to the HashPartitioner formula, (hashCode of the key) % (total number of partitions), and because an Integer's hashCode is the integer value itself, the tuples with keys 2, 4, 6 and 8 should be in partition 0, and the tuples with keys 1, 3, 5 and 7 should be in partition 1.
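This predicted split can be checked without a cluster. The plain-Java sketch below (class and method names are hypothetical) applies the same non-negative modulo rule to keys 1 through 8 with two partitions and groups the keys by partition index. It only models the key-to-partition assignment; it does not run Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PredictDistribution {

    // Same rule as HashPartitioner: non-negative hashCode() modulo numPartitions
    static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        int rawMod = key.hashCode() % numPartitions;
        return rawMod < 0 ? rawMod + numPartitions : rawMod;
    }

    // Groups keys by the partition index they would be assigned to
    public static Map<Integer, List<Integer>> distribute(List<Integer> keys, int numPartitions) {
        Map<Integer, List<Integer>> byPartition = new TreeMap<>();
        for (Integer key : keys) {
            byPartition
                .computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
                .add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(distribute(keys, 2)); // {0=[2, 4, 6, 8], 1=[1, 3, 5, 7]}
    }
}
```

In a real Spark job the order of records inside a partition after the shuffle is not guaranteed, so only the set of keys per partition should be compared with what mapPartitionsWithIndex reports.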

We can check the distribution of the keys among partitions as follows:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

// Tag each key with the index of the partition that holds it
JavaRDD<String> partitionedKeys = hashPartitioned.mapPartitionsWithIndex((index, tupleIterator) -> {
    List<String> list = new ArrayList<>();
    while (tupleIterator.hasNext()) {
        list.add("Partition number:" + index + ",key:" + tupleIterator.next()._1());
    }
    return list.iterator();
}, true);
System.out.println(partitionedKeys.collect());

We will discuss the mapPartitionsWithIndex transformation in the next section (Advanced transformations).