Avoiding transformations

In this section, we will look at the transformations that should be avoided. Here, we will focus on one particular transformation.

We will start by understanding the groupBy API. Then, we will investigate data partitioning when using groupBy, and finally we will look at what a skewed partition is and why we should avoid skewed partitions.

Here, we are creating a list of transactions. UserTransaction is a model class that includes a userId and an amount. The following code block shows a typical test in which we create a list of five transactions:

test("should trigger computations using actions") {
//given
val input = spark.makeRDD(
List(
UserTransaction(userId = "A", amount = 1001),
UserTransaction(userId = "A", amount = 100),
UserTransaction(userId = "A", amount = 102),
UserTransaction(userId = "A", amount = 1),
UserTransaction(userId = "B", amount = 13)))

We have created four transactions for userId = "A", and one for userId = "B".

Now, let's consider that we want to combine the transactions for each userId into a list of transactions. We take the input and group it by userId, as shown in the following example:

//when apply transformation
val rdd = input
  .groupBy(_.userId)
  .map(x => (x._1, x._2.toList))
  .collect()
  .toList

For every element x, we create a tuple. The first element of the tuple is the ID, while the second element is an iterable of every transaction for that specific ID, which we turn into a list using toList. Then, we collect everything and convert it to a list to get our result. Let's assert the result: rdd should contain the same elements as a list with the key "B" paired with its single transaction and the key "A" paired with its four transactions, as shown in the following code:

//then
rdd should contain theSameElementsAs List(
  ("B", List(UserTransaction("B", 13))),
  ("A", List(
    UserTransaction("A", 1001),
    UserTransaction("A", 100),
    UserTransaction("A", 102),
    UserTransaction("A", 1))
  )
)
}
}

Let's run this test and check whether it behaves as expected. We get the following output:

"C:Program FilesJavajdk-12injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2018.3.5libidea_rt.jar=50822:C:Program FilesJetBrainsIntelliJ IDEA 2018.3.5in" -Dfile.encoding=UTF-8 -classpath C:UsersSnehaIdeaProjectsChapter07outproductionChapter07 com.company.Main

Process finished with exit code 0

At first glance, the test passes and works as expected. But the question arises as to why we want to group the data at all. We usually group it in order to save it to the filesystem or to perform some further operation, such as summing all the amounts per user.
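For instance, if summing the amounts is the goal, a minimal sketch on top of the grouped result (reusing the UserTransaction model and the input RDD from the test above) could look like this:

//a minimal sketch, assuming the same UserTransaction(userId, amount) model
//and the input RDD created in the test above
val totalsPerUser = input
  .groupBy(_.userId)
  .map { case (id, txs) => (id, txs.map(_.amount).sum) } //sum the amounts of each group
  .collect()
  .toList
//for the sample data this yields ("A", 1204) and ("B", 13)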

We can see that our input is not evenly distributed, since almost all the transactions are for userId = "A". Because of that, we have a skewed key: one key holds the majority of the data, while the other keys hold far less. When we use groupBy in Spark, it takes all the elements that share the same grouping key, which in this example is userId, and sends those values to exactly the same executor.

For example, if each of our executors has 5 GB of memory, our dataset is hundreds of gigabytes in size, and one key holds 90 percent of the data, then almost everything will go to a single executor while the remaining executors receive only a small fraction of the data. So, the data will not be evenly distributed and, because of this non-uniform distribution, processing will not be as efficient as it could be.
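To make this skew visible in practice, one option (shown as a sketch only, not part of the original test) is to count how many transactions end up in each partition after the groupBy shuffle:

//a minimal sketch for inspecting skew after the shuffle (not part of the original test)
val grouped = input.groupBy(_.userId)
val transactionsPerPartition = grouped
  .mapPartitionsWithIndex { (partitionIndex, iterator) =>
    //sum the number of transactions held by every key in this partition
    Iterator((partitionIndex, iterator.map(_._2.size).sum))
  }
  .collect()
//with a skewed key, one partition holds far more records than the others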

So, when we use groupBy, we must first answer the question of why we want to group the data. Maybe we can filter it or aggregate it at a lower level before the groupBy so that we only group the reduced results, or maybe we don't need to group at all. We will investigate how to solve this problem with the Spark API in the following sections.
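For example, if the total amount per user is all we need, one alternative (sketched below under the same assumptions as the earlier snippets) is to map to key-value pairs and use reduceByKey, which combines the values on each executor before the shuffle:

//a minimal sketch, assuming the same UserTransaction model and input RDD
val totals = input
  .map(tx => (tx.userId, tx.amount)) //key-value pairs: (userId, amount)
  .reduceByKey(_ + _)                //partial sums are computed on each executor before the shuffle
  .collect()
  .toList
//only the per-executor partial sums travel over the network, so a skewed key hurts far less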
