Using aggregateByKey instead of groupBy()

In this section, we will explore the reason why we use aggregateByKey instead of groupBy.

We will cover the following topics:

  • Why we should avoid the use of groupByKey
  • What aggregateByKey gives us
  • Implementing logic using aggregateByKey

First, we will create our array of user transactions, as shown in the following example:

 val keysWithValuesList =
Array(
UserTransaction("A", 100),
UserTransaction("B", 4),
UserTransaction("A", 100001),
UserTransaction("B", 10),
UserTransaction("C", 10)
)

We will then use parallelize to create an RDD, as we want our data to be key-wise. This is shown in the following example:

 val data = spark.parallelize(keysWithValuesList)
val keyed = data.keyBy(_.userId)

In the preceding code, we invoked keyBy for userId to have the data of payers, key, and user transaction.

Let's consider that we want to aggregate, where we want to execute some specific logic for the same key, as shown in the following example:

 val aggregatedTransactionsForUserId = keyed
.aggregateByKey(amountForUser)(addAmount, mergeAmounts)

The reasoning for this can be for choosing a maximum element, minimum element, or to calculate average. aggregateByKey needs to take three parameters, as shown in the following example:

aggregateByKey(amountForUser)(addAmount, mergeAmounts)

The first parameter is an initial parameter of type T, and defining amountForUser is an initial parameter that has a type of ArrayBuffer. This is very important because the Scala compiler will infer that type, and argument numbers 1 and 2 need to have exactly the same type T in this example: ArrayBuffer.empty[long].

The next argument is a method that takes the current element that we are processing. In this example, transaction: UserTransaction) => is a current transaction and also needs to take the state that we were initializing our function with, and, hence, it will be an array buffer here.

It needs to be of the same type that's as shown in the following code block, so this is our type T:

mutable.ArrayBuffer.empty[Long]

At this point, we are able to take any transaction and add it to the specific state. This is done in a distributed way. For one key, execution is done on one executor and, for exactly the same key, on different executors. This happens in parallel, so multiple trades will be added for the same key.

Now, Spark knows that, for exactly the same key, it has multiple states of type T ArrayBuffer that it needs to merge. So, we need to mergeAmounts for our transactions for the same key.

The mergeArgument is a method that takes two states, both of which are intermediate states of type T, as shown in the following code block:

 val mergeAmounts = (p1: mutable.ArrayBuffer[Long], p2: mutable.ArrayBuffer[Long]) => p1 ++= p2

In this example, we want to merge the release buffers into one array buffer. Therefore, we issue p1 ++= p2. This will merge two array buffers into one.

Now, we have all arguments ready and we are able to execute aggregateByKey and see what the results look like. The result is an RDD of string and type T, the ArrayBuffer[long], which is our state. We will not be keeping UserTransaction in our RDD anymore, which helps in reducing the amount of memory. UserTransaction is a heavy object because it can have multiple fields and, in this example, we are only interested in the amount field. So, this way, we can reduce the memory that is used.

The following example shows what our result should look like:

 aggregatedTransactionsForUserId.collect().toList should contain theSameElementsAs List(
("A", ArrayBuffer(100, 100001)),
("B", ArrayBuffer(4,10)),
("C", ArrayBuffer(10)))

We should have a key, A, and an ArrayBuffer of 100 and 10001, since it is our input data. B should be 4 and 10, and lastly, C should be 10.

Let's start the test to check if we have implemented aggregateByKey properly, as shown in the following example:

From the preceding output, we can see that it worked as expected.

In the next section, we'll be looking at the actions that are available on key/value pairs.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.14.132