Performing actions that trigger computations

Spark has many more actions that trigger the execution of a DAG, and we should be aware of them because each one forces a computation. In this section, we'll learn what qualifies as an action in Spark, walk through the most common actions, and test whether they behave as expected.

The first action we covered was collect. We also covered reduce and reduceByKey in the previous section. Strictly speaking, reduce is an action because it returns a single result to the driver, while reduceByKey is a transformation that returns a new RDD of per-key results; nothing is computed until an action is called on that RDD.
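To make that distinction concrete, here is a minimal sketch. The standalone object and local SparkSession are assumptions made only so the snippet is self-contained; in the book's tests, a Spark context already exists:

```scala
import org.apache.spark.sql.SparkSession

case class UserTransaction(userId: String, amount: Int)

object ReduceVsReduceByKey extends App {
  // Local session for illustration only (an assumption of this sketch).
  val spark = SparkSession.builder().master("local[2]").appName("reduce-demo").getOrCreate()
  val sc = spark.sparkContext

  val input = sc.makeRDD(List(
    UserTransaction("A", 1001), UserTransaction("A", 100),
    UserTransaction("A", 102), UserTransaction("A", 1),
    UserTransaction("B", 13)))

  // reduce is an action: it triggers the DAG and returns a single value to the driver.
  val total: Int = input.map(_.amount).reduce(_ + _) // 1217

  // reduceByKey is a transformation: it returns a new RDD, and nothing runs
  // until an action such as collect() is called on it.
  val perUser = input.map(t => (t.userId, t.amount)).reduceByKey(_ + _)
  val sums = perUser.collect().toMap // Map(A -> 1204, B -> 13)

  println(total)
  println(sums)
  spark.stop()
}
```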

First, we will create the input of our transactions and then apply some transformations for testing purposes. We will take only the records whose userId contains A, key them by userId using keyBy(_.userId), and then extract only the amount of each transaction, as shown in the following example:

test("should trigger computations using actions") {
  //given
  val input = spark.makeRDD(
    List(
      UserTransaction(userId = "A", amount = 1001),
      UserTransaction(userId = "A", amount = 100),
      UserTransaction(userId = "A", amount = 102),
      UserTransaction(userId = "A", amount = 1),
      UserTransaction(userId = "B", amount = 13)))

  //when apply transformation
  val rdd = input
    .filter(_.userId.contains("A"))
    .keyBy(_.userId)
    .map(_._2.amount)

The first action we are already familiar with is rdd.collect().toList. The next one is count(), which needs to take all of the values and calculate how many are inside the RDD. There is no way to execute count() without triggering the transformations. Spark also provides variants such as countApprox, countApproxDistinct, countByValue, and countByValueApprox. The following example shows the code for rdd.collect().toList and rdd.count():

//then
println(rdd.collect().toList)
println(rdd.count()) //and all count*
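The count* variants can be sketched as follows. This is a hedged, self-contained example; the standalone object and local session are assumptions, and the exact estimates will vary:

```scala
import org.apache.spark.sql.SparkSession

object CountVariants extends App {
  // Local session only so the sketch is self-contained (an assumption).
  val spark = SparkSession.builder().master("local[2]").appName("count-demo").getOrCreate()
  val sc = spark.sparkContext

  val rdd = sc.parallelize(1 to 100000)

  // countApprox takes a timeout in milliseconds plus a confidence level and
  // returns a PartialResult with low/high bounds around the estimate.
  val approx = rdd.countApprox(timeout = 100L, confidence = 0.95)
  println(approx.getFinalValue().mean) // blocks until the exact count is known

  // countApproxDistinct uses a HyperLogLog-style sketch under the hood.
  println(rdd.countApproxDistinct(relativeSD = 0.05))

  // countByValue returns a map from each distinct value to its occurrence count.
  val counts = sc.parallelize(Seq("a", "b", "a")).countByValue()
  println(counts)
  spark.stop()
}
```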

If we have a huge dataset and an approximate count is good enough, you can use countApprox as it will be a lot faster. We then use rdd.first(), which is a bit different because it only needs to take the first element, so Spark can often avoid computing every partition. It is still an action, though, and it triggers the DAG, which we can check in the following way:

println(rdd.first())

We also have foreach() on the RDD, which is like a for loop: we can pass any function to it, whether a Scala function or a Java lambda. To apply that function to every element of our resulting RDD, the DAG needs to be computed, so foreach() is an action. Another variant is foreachPartition(), which hands each partition to our function as an iterator; inside it, we iterate over the partition's elements and print them. We also have the max() and min() methods and, as expected, max() takes the maximum value and min() takes the minimum value. Both of these methods use an implicit ordering.
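The practical value of foreachPartition() is that any per-partition setup cost is paid once per partition rather than once per element. The following sketch uses a StringBuilder standing in for, say, a database connection; that sink, the standalone object, and the local session are all assumptions of this example:

```scala
import org.apache.spark.sql.SparkSession

object ForeachPartitionDemo extends App {
  // Local session only so the sketch is self-contained (an assumption).
  val spark = SparkSession.builder().master("local[2]").appName("foreach-demo").getOrCreate()
  val sc = spark.sparkContext

  val rdd = sc.parallelize(1 to 10, numSlices = 2)

  rdd.foreachPartition { iter =>
    val sink = new StringBuilder()    // "open" once per partition
    iter.foreach(x => sink.append(x).append(' '))
    println(sink.toString().trim)     // "flush" once per partition
  }
  spark.stop()
}
```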

Since our RDD contains a simple primitive type (the mapped amounts), we don't need to pass an ordering explicitly. But if we had not used map(), we would need to define an Ordering for UserTransaction so that Spark could work out which element is the max and which is the min. Both methods need to execute the DAG, so they are classed as actions, as shown in the following example:

rdd.foreach(println(_))
rdd.foreachPartition(t => t.foreach(println(_)))
println(rdd.max())
println(rdd.min())
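The ordering requirement described above can be sketched as follows: with an implicit Ordering[UserTransaction] in scope (comparing by amount, which is an assumption about how we want to rank transactions), max() and min() work on the unmapped RDD. The standalone object and local session are again only there to make the sketch self-contained:

```scala
import org.apache.spark.sql.SparkSession

case class UserTransaction(userId: String, amount: Int)

object MaxMinOrdering extends App {
  val spark = SparkSession.builder().master("local[2]").appName("ordering-demo").getOrCreate()
  val sc = spark.sparkContext

  val rdd = sc.makeRDD(List(
    UserTransaction("A", 1001), UserTransaction("A", 1), UserTransaction("B", 13)))

  // Without mapping to a primitive, Spark needs to know how to compare transactions.
  implicit val byAmount: Ordering[UserTransaction] = Ordering.by(_.amount)

  println(rdd.max()) // UserTransaction(A,1001)
  println(rdd.min()) // UserTransaction(A,1)
  spark.stop()
}
```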

We then have takeOrdered(), which is a more expensive operation than first() because first() takes whichever element happens to come first, without any ordering guarantee. takeOrdered() needs to execute the DAG and sort everything; only once everything is sorted does it take the top element.

In our example, we take num = 1. But sometimes, for testing or monitoring purposes, we need only a sample of the data. To take a sample, we use the takeSample() method and pass the number of elements, as shown in the following code:

println(rdd.takeOrdered(1).toList)
println(rdd.takeSample(false, 2).toList)
}
}
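The ordered and sampling actions have a few more useful variants. The following is a hedged sketch on the same amounts as in the section; the standalone object and local session are assumptions:

```scala
import org.apache.spark.sql.SparkSession

object OrderedActions extends App {
  val spark = SparkSession.builder().master("local[2]").appName("ordered-demo").getOrCreate()
  val sc = spark.sparkContext

  val rdd = sc.parallelize(List(1001, 100, 102, 1))

  println(rdd.takeOrdered(2).toList) // two smallest: List(1, 100)
  println(rdd.top(2).toList)         // two largest:  List(1001, 102)

  // takeOrdered with a reversed ordering is equivalent to top().
  println(rdd.takeOrdered(2)(Ordering[Int].reverse).toList) // List(1001, 102)

  // takeSample can sample with or without replacement; a seed makes it repeatable.
  println(rdd.takeSample(withReplacement = false, num = 2, seed = 42L).toList)
  spark.stop()
}
```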

Now, let's run the test and see the output of the previous actions, as shown in the following output:

List(1001, 100, 102, 1)
4
1001
1001
100
102
1

The first action returns all the values. The second action returns 4 as the count. We then see the first element, 1001, but note that first() gives no ordering guarantee; it is simply whichever element comes first. We then print all the elements in the loop, as shown in the following output:

102
1
1001
1
List(1)
List(100, 1)

We then get the max and min values, 1001 and 1. After that, we get the ordered list, List(1), and the sample, List(100, 1), which is random. So the sample takes random values from the input data with the transformations applied.

In the next section, we will learn how to reuse the rdd for different actions.
