Using a custom partitioner to reduce shuffle

In this section, we will use a custom partitioner to reduce shuffle. We will cover the following topics:

  • Implementing a custom partitioner
  • Using the partitioner with the partitionBy method on Spark
  • Validating that our data was partitioned properly

We will implement a custom partitioner with our custom logic, which will partition the data. It will inform Spark where each record should land and on which executor. We will be using the partitionBy method on Spark. In the end, we will validate that our data was partitioned properly. For the purposes of this test, we are assuming that we have two executors:

import com.tomekl007.UserTransaction
import org.apache.spark.sql.SparkSession
import org.apache.spark.{Partitioner, SparkContext}
import org.scalatest.FunSuite
import org.scalatest.Matchers._

class CustomPartitioner extends FunSuite {
val spark: SparkContext = SparkSession.builder().master("local[2]").getOrCreate().sparkContext

test("should use custom partitioner") {
//given
val numberOfExecutors = 2

Let's assume that we want to split our data evenly into 2 executors and that the instances of data with the same key will land on the same executor. So, our input data is a list of UserTransactions"a", "b", "a", "b", and "c". The values are not so important, but we need to keep them in mind to test the behavior later. The amount is 100, 101, 202, 1, and 55 for the given UserTransactions:

val data = spark
.parallelize(List(
UserTransaction("a", 100),
UserTransaction("b", 101),
UserTransaction("a", 202),
UserTransaction("b", 1),
UserTransaction("c", 55)

When we do a keyBy(_.userId) is passed to our partitioner and so when we issue partitionBy, we need to extend override methods:

).keyBy(_.userId)
.partitionBy(new Partitioner {
override def numPartitions: Int = numberOfExecutors

The getPartition method takes a key, which will be the userId. The key will be passed here and the type will be a string:

override def getPartition(key: Any): Int = {
key.hashCode % numberOfExecutors
}
})

The signature of these methods is Any, so we need to override it, and also override the number of partitions.

We then print our two partitions, and numPartitions returns the value of 2:

println(data.partitions.length)

getPartition is very simple as it takes the hashCode and numberOfExecutors modules. It ensures that the same key will land on the same executor.

We will then map every partition for the respective partition as we get an iterator. Here, we are taking amount for a test purpose:

//when
val res = data.mapPartitions[Long](iter =>
iter.map(_._2).map(_.amount)
).collect().toList

In the end, we assert 55, 100, 202, 101, and 1; the order is random, so there is no need to take care of the order:

//then
res should contain theSameElementsAs List(55, 100, 202, 101, 1)
}
}

If we still want to, we should use a sortBy method. Let's start this test and see whether our custom partitioner works as expected. Now, we can start. We have 2 partitions, so it works as expected, as shown in the following screenshot:

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.12.136.119