Changing the design of jobs with wide dependencies

In this section, we will redesign the job that performed a join on non-partitioned data, so that the wide dependency no longer triggers an unnecessary shuffle.

In this section, we will cover the following topics:

  • Repartitioning DataFrames using a common partition key
  • Understanding a join with pre-partitioned data
  • Understanding how we avoided the shuffle

We will use the repartition method on the DataFrame with a common partition key. As we saw earlier, when a join is issued, repartitioning happens underneath. However, when using Spark, we often want to execute multiple operations on the same DataFrame, so every join with another dataset would execute hash partitioning once again. If we partition the data once, at the point where it is loaded, we avoid partitioning it again for every subsequent operation.

Here is our example test case, using the data from the Testing operations that cause a shuffle in Apache Spark section. We have UserData with three records for the user IDs user_1, user_2, and user_4, and UserTransaction data with the user IDs user_1, user_2, and user_3:

test("example of operation that is causing shuffle") {
  import spark.sqlContext.implicits._
  val userData =
    spark.sparkContext.makeRDD(List(
      UserData("user_1", "1"),
      UserData("user_2", "2"),
      UserData("user_4", "200")
    )).toDS()

  val transactionData =
    spark.sparkContext.makeRDD(List(
      // userIds match the text above; the transaction amounts are illustrative
      UserTransaction("user_1", 100),
      UserTransaction("user_2", 300),
      UserTransaction("user_3", 1300)
    )).toDS()

Then, we need to repartition the data; this is the first and most important step. We repartition our userData by the userId column:

val repartitionedUserData = userData.repartition(userData("userId"))

Next, we repartition transactionData by the same userId column:

 val repartitionedTransactionData = transactionData.repartition(transactionData("userId"))

Once we have repartitioned our data, we have the assurance that any records sharing the same partition key (in this example, userId) will land on the same executor. Because of that, the join over the repartitioned data will not require a shuffle and will be faster. Finally, we perform the join, but this time on the pre-partitioned data:

//when
//data is already partitioned using join-column. Don't need to shuffle
val res: Dataset[(UserData, UserTransaction)] =
  repartitionedUserData.joinWith(repartitionedTransactionData,
    repartitionedUserData("userId") === repartitionedTransactionData("userId"), "inner")

We can show our results using the following code:

 //then
res.show()
assert(res.count() == 2)
}
}
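Beyond show(), you can print the physical plan yourself using the Dataset's standard explain() method, appended inside the test after res is computed. On these pre-partitioned inputs, the only exchanges in the plan come from our explicit repartition calls; no additional shuffle is inserted for the join itself:

```scala
res.explain()
```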

The output is shown in the following screenshot:

In the preceding screenshot, we can see the physical plans for the user data and the transaction data. Hash partitioning is performed on the userId column of both datasets. After joining the data, we can see that the results are correct and that there is a physical plan for the join.

This time, the physical plan is a bit different.

We have a SortMergeJoin operation, which sorts data that was already pre-partitioned in the previous step of the execution. The Spark engine performs a sort-merge join rather than a hash join: it sorts the data within each partition and then merges the two sides, so the join is faster.
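The reason the join on pre-partitioned data needs no shuffle is that the partition is a pure function of the join key. The following self-contained sketch (plain Scala, not Spark code; the object name is ours) mirrors the non-negative-modulo rule used by Spark's HashPartitioner, and shows that matching keys always land in matching partitions:

```scala
// Plain-Scala sketch of Spark's hash-partitioning rule:
// partition = nonNegativeMod(key.hashCode, numPartitions)
object HashPartitioningSketch {
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val rawMod = key.hashCode % numPartitions
    // Java's % can return a negative value, so shift negatives into range
    rawMod + (if (rawMod < 0) numPartitions else 0)
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 200 // Spark's default shuffle partition count
    for (id <- List("user_1", "user_2", "user_3", "user_4")) {
      // Any dataset partitioned by this rule maps the same userId to the
      // same partition, so matching rows are co-located before the join
      println(s"$id -> partition ${partitionFor(id, numPartitions)}")
    }
  }
}
```

Because both sides of the join use the same key, the same hash function, and the same number of partitions, every pair of matching rows ends up on the same executor, which is exactly why the sort-merge join can run without another exchange.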

In the next section, we'll be using keyBy() operations to reduce shuffle even further.
