Using keyBy() operations to reduce shuffle

In this section, we will use keyBy() operations to reduce shuffle. We will cover the following topics:

  • Loading randomly partitioned data
  • Trying to pre-partition data in a meaningful way
  • Leveraging the keyBy() function

We will load randomly partitioned data, but this time using the RDD API. We will repartition the data in a meaningful way and extract the information that is going on underneath, similar to DataFrame and the Dataset API. We will learn how to leverage the keyBy() function to give our data some structure and to cause the pre-partitioning in the RDD API.

Here is the test we will be using in this section. We are creating two random input records. The first record has a random user ID, user_1, the second one has a random user ID, user_1, and the third one has a random user ID, user_2:

test("Should use keyBy to distribute traffic properly"){
//given
val rdd = spark.sparkContext.makeRDD(List(
InputRecord("1234-3456-1235-1234", "user_1"),
InputRecord("1123-3456-1235-1234", "user_1"),
InputRecord("1123-3456-1235-9999", "user_2")
))

We will extract what is happening underneath Spark using rdd.toDebugString:

println(rdd.toDebugString)

At this point, our data is spread randomly and the records for the user ID field could be on different executors because the Spark execution engine cannot guess whether user_1 is a meaningful key for us or whether 1234-3456-1235-1234 is. We know that 1234-3456-1235-1234 is not a meaningful key, and that it is a unique identifier. Using that field as a partition key will give us a random distribution and a lot of shuffling because there is no data locality when you use the unique field as a partition key.

There is no possibility for Spark to know that data for the same user ID will land on the same executor, and that's why we need to use the user ID field, either user_1, user_1, or user_2, when partitioning the data. To achieve that in the RDD API, we can use keyBy(_.userId) in our data, but this time it will change the RDD type:

val res = rdd.keyBy(_.userId)

If we check the RDD type, we'll see that this time, an RDD is not an input record, but it is an RDD of the string and input record. The string is a type of the field that we expected here, and it is userId. We will also extract information about the keyBy() function by  using toDebugString on the result:

println(res.toDebugString)

Once we use keyBy(), all the records for the same user ID will land on the same executor. As we have discussed, this can be dangerous because if we have a skew key, it means that we have a key that has very high cardinality, and we can run out of memory. Also, all operations on a result will be key-wise, so we'll be on the pre-partitioned data:

res.collect()

Let's start this test. The output will be as follows:

We can see that our first debug string is a very simple one, and we have only the collection on the RDD, but the second one is a bit different. We have a keyBy() method and we make an RDD underneath it. We have our child RDD and parent RDD from the first section, Testing operations that cause a shuffle in Apache Spark, from when we extended the RDD. This a parent-child chain that's issued by the keyBy() method.

In the next section, we'll use a custom partitioner to reduce shuffle even further.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.21.248.162