Using the reduce and reduceByKey methods to calculate the results

In this section, we will use the reduce and reduceByKey functions to calculate our results and understand the behavior of reduce. We will then compare the reduce and reduceByKey functions to see which one should be used in a particular use case.

We will first focus on the reduce API. First, we need to create an input of UserTransaction objects: a transaction for user A with amount 10, one for user B with amount 1, and another for user A with amount 101. Let's say that we want to find the global maximum. We are not interested in the data for a specific key, but in the data as a whole. We want to scan it, take the maximum, and return it, as shown in the following example:

test("should use reduce API") {
  //given
  val input = spark.makeRDD(List(
    UserTransaction("A", 10),
    UserTransaction("B", 1),
    UserTransaction("A", 101)
  ))

So, this is the reduce use case. Now, let's see how we can implement it, as per the following example:

  //when
  val result = input
    .map(_.amount)
    .reduce((a, b) => if (a > b) a else b)

  //then
  assert(result == 101)
}

For the input, we first need to map each element to the field that we're interested in, which in this case is amount. We take amount and then take the maximum value.

In the preceding code example, the Lambda that we pass to reduce has two parameters, a and b. One parameter is the maximum found so far, and the other is the value that we are examining now. If a is higher than b, we return a; if not, we return b. We go through all the elements and, at the end, the result is a single number.
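To make the role of a and b more concrete, here is a minimal sketch that applies the same Lambda to plain Scala collections standing in for an RDD's partitions. This is only an analogy: the ReduceTrace name, the Int amounts, and the split into two partitions are assumptions made for illustration. It relies on the fact that Spark's reduce expects a commutative and associative function, which is what allows partial results to be merged in any order.

```scala
// Illustration only: plain Scala lists stand in for the partitions of an RDD.
object ReduceTrace {
  // The same Lambda that is passed to reduce in the test.
  val max: (Int, Int) => Int = (a, b) => if (a > b) a else b

  // Hypothetical split of the three amounts across two partitions.
  val partitions: List[List[Int]] = List(List(10, 1), List(101))

  // Step 1: each partition is reduced on its own.
  val partialResults: List[Int] = partitions.map(_.reduce(max))

  // Step 2: the partial results are merged with the same function.
  val result: Int = partialResults.reduce(max)

  def main(args: Array[String]): Unit =
    println(s"partials=$partialResults, result=$result")
}
```

Because taking the maximum is both commutative and associative, the final value does not depend on how the amounts happen to be split.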

So, let's test this and check whether the result is indeed 101. The following output shows that our test passed:

"C:\Program Files\Java\jdk-12\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.3.5\lib\idea_rt.jar=50894:C:\Program Files\JetBrains\IntelliJ IDEA 2018.3.5\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\Sneha\IdeaProjects\Chapter07\out\production\Chapter07 com.company.Main

Process finished with exit code 0

Now, let's consider a different situation. We want to find the maximum transaction amount, but this time per user. We want the maximum transaction for user A and also for user B, and we want those results to be independent of each other. So, for every group of records that share the same key, we want to take only the maximum value, as shown in the following example:

test("should use reduceByKey API") {
  //given
  val input = spark.makeRDD(
    List(
      UserTransaction("A", 10),
      UserTransaction("B", 1),
      UserTransaction("A", 101)
    )
  )

To achieve this, reduce is not a good choice because it goes through all of the values and gives us the global maximum. Spark has key-based operations for this, but first we need to tell it how to group the elements. We use keyBy to tell Spark which field should be taken as the key, so that the reduce function is executed only within each key. So, we use keyBy(_.userId) and then call the reduceByKey function. The reduceByKey function is similar to reduce but works key-wise, so inside the Lambda we only get values for a specific key, as shown in the following example:

  //when
  val result = input
    .keyBy(_.userId)
    .reduceByKey((firstTransaction, secondTransaction) =>
      TransactionChecker.higherTransactionAmount(firstTransaction, secondTransaction))
    .collect()
    .toList
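To make the shape of the data concrete, the following is a minimal sketch of what keyBy produces before reduceByKey runs. The UserTransaction field types, the local SparkSession setup, and the KeyBySketch name are assumptions made for illustration; the tests rely on a spark context and a case class that are defined elsewhere.

```scala
import org.apache.spark.sql.SparkSession

// Assumed definition: the field names come from the tests, the types are a guess.
case class UserTransaction(userId: String, amount: Int)

// Hypothetical helper object, not part of the original test suite.
object KeyBySketch {
  def keyedPairs(): List[(String, UserTransaction)] = {
    // Local SparkContext for experimentation only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("keyBy-sketch")
      .getOrCreate()
      .sparkContext
    try {
      val input = spark.makeRDD(List(
        UserTransaction("A", 10),
        UserTransaction("B", 1),
        UserTransaction("A", 101)
      ))
      // keyBy(f) pairs every element with f(element),
      // giving an RDD[(String, UserTransaction)].
      input.keyBy(_.userId).collect().toList
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = keyedPairs().foreach(println)
}
```

Each element becomes a (userId, transaction) pair, and reduceByKey then combines only the values that share the same first component.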

By doing this, the Lambda receives a first transaction and a second one. The first is the current maximum and the second is the transaction that we are examining right now. We will create a helper function that takes those two transactions and call it higherTransactionAmount.

The higherTransactionAmount function takes firstTransaction and secondTransaction, both of the UserTransaction type. It also needs to return a UserTransaction; it cannot return a different type.

When we use the reduceByKey method from Spark, the function must return the same type as its input arguments. If firstTransaction.amount is higher than secondTransaction.amount, we return firstTransaction; otherwise, we return secondTransaction. So, we return transaction objects, not amounts. This is shown in the following example:

object TransactionChecker {
  def higherTransactionAmount(firstTransaction: UserTransaction, secondTransaction: UserTransaction): UserTransaction = {
    if (firstTransaction.amount > secondTransaction.amount) firstTransaction else secondTransaction
  }
}
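Because the function given to reduceByKey must return the same type as the values it receives, one possible way to end up with plain amounts instead of whole transactions is to map each transaction to a (userId, amount) pair first. The following is a sketch of that variation, not the approach taken in this section; the MaxAmountSketch name, the Int amount type, and the local SparkSession setup are assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Assumed definition: the field names come from the tests, the types are a guess.
case class UserTransaction(userId: String, amount: Int)

// Hypothetical helper object, not part of the original test suite.
object MaxAmountSketch {
  def maxAmountPerUser(): Map[String, Int] = {
    // Local SparkContext for experimentation only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("max-amount-sketch")
      .getOrCreate()
      .sparkContext
    try {
      val input = spark.makeRDD(List(
        UserTransaction("A", 10),
        UserTransaction("B", 1),
        UserTransaction("A", 101)
      ))
      input
        .map(t => (t.userId, t.amount))         // values are now Int
        .reduceByKey((a, b) => math.max(a, b))  // (Int, Int) => Int
        .collect()
        .toMap
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(maxAmountPerUser())
}
```

The trade-off is that the rest of each transaction is discarded, whereas higherTransactionAmount keeps the whole winning UserTransaction.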

Now, we will collect the results and test them. For the key B, we should get transaction ("B", 1) and, for the key A, transaction ("A", 101). There will be no transaction ("A", 10) because it was discarded in favor of the higher amount for the same key, so for every key we get its maximum. This is shown in the following example:

  //then
  result should contain theSameElementsAs
    List(("B", UserTransaction("B", 1)), ("A", UserTransaction("A", 101)))
}

}

We can see that the test passed and everything is as expected, as shown in the following output:

"C:\Program Files\Java\jdk-12\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.3.5\lib\idea_rt.jar=50909:C:\Program Files\JetBrains\IntelliJ IDEA 2018.3.5\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\Sneha\IdeaProjects\Chapter07\out\production\Chapter07 com.company.Main

Process finished with exit code 0
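When comparing the two functions, it may also help to look at what each one returns. The following sketch, which uses plain (userId, amount) tuples and a hypothetical ActionVsTransformation object with an assumed local SparkSession setup, shows that reduce hands a single value back to the driver, while reduceByKey yields another RDD that still has to be collected.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, not part of the original test suite.
object ActionVsTransformation {
  def run(): (Int, Map[String, Int]) = {
    // Local SparkContext for experimentation only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("reduce-vs-reduceByKey")
      .getOrCreate()
      .sparkContext
    try {
      val pairs: RDD[(String, Int)] =
        spark.makeRDD(List(("A", 10), ("B", 1), ("A", 101)))

      // reduce is an action: it runs a job and returns a plain value.
      val globalMax: Int = pairs.values.reduce((a, b) => math.max(a, b))

      // reduceByKey is a transformation: it only describes a new RDD.
      val maxPerKey: RDD[(String, Int)] =
        pairs.reduceByKey((a, b) => math.max(a, b))

      // The per-key job runs only when an action such as collect is called.
      (globalMax, maxPerKey.collect().toMap)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

As a rule of thumb, reduce fits when one answer for the whole dataset is needed, and reduceByKey fits when one answer per key is needed.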

In the next section, we will perform actions that trigger the computations of our data.
