Testing operations that cause a shuffle in Apache Spark

In this section, we will test the operations that cause a shuffle in Apache Spark. We will cover the following topics:

  • Using join for two DataFrames
  • Using two DataFrames that are partitioned differently
  • Testing a join that causes a shuffle

A join is an operation that can cause a shuffle, and we will use it to join our two DataFrames. We will first check whether it causes a shuffle, and then we will look at how to avoid it. To do so, we will join two DataFrames that are partitioned differently, or not partitioned on the join key at all. Such a join causes a shuffle because records that share a key cannot be joined while they sit on different physical machines.

Before we join the datasets, the matching records need to be sent to the same physical machine. We will be using the following test.

We need to create a dataset of UserData, which is a case class that we have seen already. It has a user ID and data. We have three user IDs: user_1, user_2, and user_4:

test("example of operation that is causing shuffle") {
  import spark.sqlContext.implicits._
  val userData =
    spark.sparkContext.makeRDD(List(
      UserData("user_1", "1"),
      UserData("user_2", "2"),
      UserData("user_4", "200")
    )).toDS()

We then create some transaction data that is also keyed by user ID (user_1, user_2, and user_3):

  val transactionData =
    spark.sparkContext.makeRDD(List(
      UserTransaction("user_1", 100),
      UserTransaction("user_2", 300),
      UserTransaction("user_3", 1300)
    )).toDS()

We call joinWith on userData, passing transactionData and a join condition on the userId column of both datasets. Since we have issued an inner join, the result has two elements, each pairing a UserData record with its matching UserTransaction. The other records are dropped: user_4 has no transaction, and the transaction for user_3 has no user data:

  //shuffle: userData can stay on the current executors, but data from
  //transactionData needs to be sent to those executors according to joinColumn
  //causing shuffle
  //when
  val res: Dataset[(UserData, UserTransaction)] =
    userData.joinWith(transactionData, userData("userId") === transactionData("userId"), "inner")
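To see why the inner join keeps exactly two pairs, the same matching logic can be sketched with plain Scala collections, without Spark. This is only an illustration: the case classes below are hypothetical stand-ins for the ones used in the test, and the field names data and amount are assumptions.

```scala
// Hypothetical stand-ins for the case classes used in the test;
// only userId is confirmed by the text, the other field names are assumed.
case class UserData(userId: String, data: String)
case class UserTransaction(userId: String, amount: Int)

object InnerJoinSketch {
  val users = List(
    UserData("user_1", "1"),
    UserData("user_2", "2"),
    UserData("user_4", "200"))

  val transactions = List(
    UserTransaction("user_1", 100),
    UserTransaction("user_2", 300),
    UserTransaction("user_3", 1300))

  // Inner join: keep only the pairs whose userId appears on both sides.
  val joined: List[(UserData, UserTransaction)] =
    for {
      u <- users
      t <- transactions
      if u.userId == t.userId
    } yield (u, t)

  def main(args: Array[String]): Unit =
    joined.foreach(println)
}
```

Here user_4 and user_3 never satisfy the condition, so they do not appear in joined.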

When we joined the data, it was not partitioned on the join key; to Spark, this is just arbitrary data. Spark cannot know that the user ID column should be the partition key, as it cannot guess this. Since the data is not pre-partitioned, joining the two datasets requires sending all records with the same user ID to the same executor. Hence, a lot of data is shuffled between executors, precisely because the data is not partitioned.
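The idea of a partition key can be sketched in plain Scala. The model below is deliberately simplified and is not Spark's actual hash function: it assumes a hypothetical partitionFor helper that derives the partition index from the key alone. The point is that once both datasets are partitioned by the same function of the user ID, records with the same ID always land in the same partition.

```scala
object HashPartitionSketch {
  // Simplified model (an assumption, not Spark's real hashing):
  // the partition index is a pure function of the key.
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    val userKeys = List("user_1", "user_2", "user_4")
    val transactionKeys = List("user_1", "user_2", "user_3")

    // Keys present in both datasets map to the same partition on both
    // sides, so their records can be joined locally.
    for (key <- userKeys.intersect(transactionKeys)) {
      val p = partitionFor(key, numPartitions)
      println(s"$key -> partition $p in both datasets")
    }
  }
}
```

Without such a shared rule, a record for user_1 could be in any partition of either dataset, which is why the data has to be moved before the join.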

Let's explain the query, show the results, and perform an assert by running the test:

  //then
  res.explain() // prints the physical plan of the join
  res.show()
  assert(res.count() == 2)
}
} // closes the enclosing test suite

We can see our result as follows:

+------------+-------------+
| _1         | _2          |
+------------+-------------+
| [user_1,1] | [user_1,100]|
| [user_2,2] | [user_2,300]|
+------------+-------------+

We have [user_1,1] and [user_1,100], which are the UserData and UserTransaction records for user_1. It appears that the join worked properly, but let's look at the physical plan. Spark used a SortMergeJoin on the userId of the first and the second dataset, and each side of the join was preceded by a Sort and hashPartitioning.

In the previous section, Detecting a shuffle in a process, we used the repartition method, which uses hashPartitioning underneath. Although we are using a join here, Spark still needs hash partitioning because our data is not partitioned on the join key. So, the first dataset has to be partitioned, which involves a lot of shuffling, and then exactly the same has to be done for the second dataset. The shuffle therefore happens twice, and once both datasets are partitioned on the join field, the join can be local to each executor.

Once the partitioning step of the physical plan has run, Spark has a guarantee that the UserData record for a given user ID, say user_1, is on the same executor as the UserTransaction records for that user ID. Without hashPartitioning, there is no such guarantee, and hence the partitioning is needed.
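The plan described above can be inspected outside the test suite as well. The following is a minimal sketch, assuming a local SparkSession and hypothetical stand-ins for the two case classes (the field names data and amount are assumptions). It also turns off broadcast joins through spark.sql.autoBroadcastJoinThreshold, which is an assumption made here so that small in-memory datasets are still joined with a shuffle; the exact plan text depends on the Spark version.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical stand-ins for the case classes used in the test.
case class UserData(userId: String, data: String)
case class UserTransaction(userId: String, amount: Int)

object JoinPlanSketch {
  // Builds the same inner join as the test and returns its physical plan as text.
  def joinPlan(spark: SparkSession): String = {
    import spark.implicits._

    val userData = Seq(
      UserData("user_1", "1"),
      UserData("user_2", "2"),
      UserData("user_4", "200")).toDS()

    val transactionData = Seq(
      UserTransaction("user_1", 100),
      UserTransaction("user_2", 300),
      UserTransaction("user_3", 1300)).toDS()

    val res = userData.joinWith(
      transactionData,
      userData("userId") === transactionData("userId"),
      "inner")

    res.queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("join-plan-sketch")
      // Assumption: disable broadcast joins so both sides have to be shuffled.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    try {
      val plan = joinPlan(spark)
      println(plan)
      // Look for the exchange that partitions each side on the join key.
      println(s"mentions hashpartitioning: ${plan.contains("hashpartitioning")}")
    } finally {
      spark.stop()
    }
  }
}
```

Searching the plan text for hashpartitioning is a crude check, but it is enough to confirm whether a join triggers an exchange on each side.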

In the next section, we'll learn how to change the design of jobs with wide dependencies, so we will see how to avoid unnecessary shuffling when performing a join on two datasets.
