Available partitioners on key/value data

We know that partitioning and partitioners are the key components of Apache Spark. They influence how our data is partitioned, which means they influence where the data actually resides on which executors. If we have a good partitioner, then we will have good data locality, which will reduce shuffle. We know that shuffle is not desirable for processing, so reducing shuffle is crucial, and, therefore, choosing a proper partitioner is also crucial for our systems.

In this section, we will cover the following topics:

  • Examining HashPartitioner
  • Examining RangePartitioner
  • Testing

We will first examine our HashPartitioner and RangePartitioner. We will then compare them and test the code using both the partitioners.

First we will create a UserTransaction array, as per the following example:

 val keysWithValuesList =
Array(
UserTransaction("A", 100),
UserTransaction("B", 4),
UserTransaction("A", 100001),
UserTransaction("B", 10),
UserTransaction("C", 10)
)

We will then use keyBy (as shown in the following example) because the partitioner will automatically work on the key for our data:

 val keyed = data.keyBy(_.userId)

We will then take a partitioner of key data, as shown in the following example:

 val partitioner = keyed.partitioner

The code shows partitioner.isEmpty, because we have not defined any partitioner and thus it is empty at this point, as can be seen in the following example:

 assert(partitioner.isEmpty)

We can specify a partitioner by using the partitionBy method, as shown in the following example:

val hashPartitioner = keyed.partitionBy(new HashPartitioner(100))

The method is expecting a partitioner abstract class implementation. We will have a couple of implementations, but first, let's focus on HashPartitioner.

HashPartitioner takes a number of partitions and has a number of partitions. numPartition returns our argument, but getPartition gets a bit more involved, as shown in the following example:

    def numPartitions: Int = partitions
def getPartition(key: Any): int = key match {
case null => 0
case_ => Utils.nonNegativeMode(key.hashCode, numPartitions)
}

It first checks if our key is null. If it is null, it will land in partition number 0. If we have data with null keys, they will all land in the same executors, and, as we know, this is not a good situation because the executors will have a lot of memory overhead and they can fail without memory exceptions.

If the key is not null, then it does a nonNegativeMod from hashCode and the number of partitions. It has to be the modulus of the number of partitions so that it can be assigned to the proper partition. Thus, the hashCode method is very important for our key.

If we are supplying a custom key and not a primitive type like an integer or string, which has a well-known hashCode, we need to supply and implement a proper hashCode as well. But the best practice is to use the case class from Scala as they have hashCode and equals implemented for you.

We have defined partitioner now, but partitioner is something that could be changed dynamically. We can change our partitioner to be rangePartitioner. rangePartitioner takes the partitions in an RDD.

rangePartitioner is more complex as it tries to divide our data into ranges, which is not as simple as HashPartitioner is in getting partition. The method is really complex as it is trying to spread our data evenly and has complex logic for spreading that into ranges.

Let's start our test to check if we were able to assign partitioner properly, as shown in the following output:

Our tests have passed. This means that, at the initial point, the partitioner was empty and then we had to shuffle RDD at partitionBy, and also a branchPartitioner. But it shows us only the number line where we created an instance of the partitioner interface.

In the next section, we'll try to improve it or try to tweak and play with the partitioner by implementing a custom partitioner.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.135.183.138