Reusing the same RDD for different actions

In this section, we will reuse the same RDD for different actions. First, we will see how reusing the RDD reduces the execution time. We will then look at caching and a performance test for our code.

The following example is the test from the preceding section, slightly modified: we record the start time with System.currentTimeMillis() and compute result as the time elapsed once all the actions have run. So, we are measuring the total time taken by all the actions that are executed:

//every call to an action means that we are going back up the RDD chain
//if we are loading data from an external filesystem (e.g. HDFS), every action means
//that we need to load it from the filesystem again.
val start = System.currentTimeMillis()
println(rdd.collect().toList)
println(rdd.count())
println(rdd.first())
rdd.foreach(println(_))
rdd.foreachPartition(t => t.foreach(println(_)))
println(rdd.max())
println(rdd.min())
println(rdd.takeOrdered(1).toList)
println(rdd.takeSample(false, 2).toList)
val result = System.currentTimeMillis() - start

println(s"time taken (no-cache): $result")

}

Someone who doesn't know Spark very well might assume that all these actions are executed cleverly, with the data loaded only once. In fact, every action, such as count, goes back up the RDD chain, which means that all the transformations are rerun and the data is loaded again. In a production system, the data will be loaded from an external filesystem such as HDFS. This means that every action triggers a call to the filesystem, which retrieves all the data and then applies the transformations, as shown in the following example:

//when apply transformation
val rdd = input
.filter(_.userId.contains("A"))
.keyBy(_.userId)
.map(_._2.amount)

This is a very expensive operation, because every action repeats it. When we run this test, we can see that it takes 632 milliseconds without caching, as shown in the following output:

List(1)
List(100, 1)
time taken (no-cache): 632
Process finished with exit code 0
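
The start-and-subtract pattern used in the test above can be wrapped in a small helper, so that the cached and uncached runs are measured in the same way. The following is a minimal sketch in plain Scala; the names Timing and timed are hypothetical and do not come from the original test:

```scala
object Timing {
  // Runs `block` once and returns its result together with the elapsed
  // wall-clock time in milliseconds.
  // `Timing` and `timed` are illustrative names, not part of Spark.
  def timed[A](label: String)(block: => A): (A, Long) = {
    val start = System.currentTimeMillis()
    val result = block // the by-name argument is evaluated here, inside the timed region
    val elapsed = System.currentTimeMillis() - start
    println(s"time taken ($label): $elapsed")
    (result, elapsed)
  }
}
```

For instance, Timing.timed("no-cache") { rdd.count() } would print the same kind of line as the test does. A single wall-clock reading like this also includes JVM warm-up, so it is a rough indication rather than a benchmark.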

Let's compare this with the cached version. Our test, at first glance, looks very similar, but it is not the same, because we call cache() at the end of the chain and return the cached RDD. So, rdd is marked for caching, and every call after the first action will be served from the cache, as shown in the following example:

//when apply transformation
val rdd = input
.filter(_.userId.contains("A"))
.keyBy(_.userId)
.map(_._2.amount)
.cache()

The first action will execute the DAG and save the data into the cache; the subsequent actions will then just read what they need from memory. There will be no HDFS lookup. Let's run this test, as per the following example, and see how long it takes:

//every call to an action means that we are going back up the RDD chain
//if we are loading data from an external filesystem (e.g. HDFS), every action means
//that we need to load it from the filesystem again.
val start = System.currentTimeMillis()
println(rdd.collect().toList)
println(rdd.count())
println(rdd.first())
rdd.foreach(println(_))
rdd.foreachPartition(t => t.foreach(println(_)))
println(rdd.max())
println(rdd.min())
println(rdd.takeOrdered(1).toList)
println(rdd.takeSample(false, 2).toList)
val result = System.currentTimeMillis() - start

println(s"time taken(cache): $result")


}
}

The first output will be as follows:

List(1)
List(100, 102)
time taken (no-cache): 585
List(1001, 100, 102, 1)
4

The second output will be as follows:

1
List(1)
List(102, 1)
time taken(cache): 336
Process finished with exit code 0
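
As a side note on the cache() call used in this test: for an RDD, cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), and unpersist() releases the cached blocks once they are no longer needed. The following minimal sketch assumes a local SparkContext and a few made-up (userId, amount) pairs in place of the test's input; the object name PersistDemo and the method storageLevels are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistDemo {
  // Returns (storage level after cache(), storage level after unpersist()).
  def storageLevels(): (StorageLevel, StorageLevel) = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("persist-demo")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data standing in for the test's input
      val amounts = sc.parallelize(Seq(("A", 100), ("A", 102), ("B", 4)))
        .filter(_._1.contains("A"))
        .map(_._2)
        .cache() // same as persist(StorageLevel.MEMORY_ONLY)

      val whileCached = amounts.getStorageLevel
      amounts.count()     // the first action fills the cache
      amounts.unpersist() // drop the cached blocks
      (whileCached, amounts.getStorageLevel)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (cached, released) = storageLevels()
    println(s"after cache(): $cached, after unpersist(): $released")
  }
}
```

When the data may not fit in memory, persist(StorageLevel.MEMORY_AND_DISK) can be used in place of cache().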

Without the cache, the run takes 585 milliseconds; with the cache, it takes 336 milliseconds. The difference is not large, because the test data is created in memory. However, in real production systems, this will be a big difference, because the data has to be looked up from external filesystems.
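
Because timings on small in-memory data are noisy, counting how often a transformation actually runs is a more deterministic way to see what cache() changes. The following minimal sketch assumes a local SparkContext and four made-up values, and uses a LongAccumulator as a counter; the object name RecomputeDemo and the method evaluations are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RecomputeDemo {
  // Runs two actions on a four-element RDD and returns how many times
  // the map function was evaluated in total.
  def evaluations(useCache: Boolean): Long = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("recompute-demo")
    val sc = new SparkContext(conf)
    try {
      val counter = sc.longAccumulator("map evaluations")
      val mapped = sc.parallelize(Seq(1, 100, 102, 1001), 2).map { x =>
        counter.add(1L) // one increment per element, per evaluation
        x
      }
      val rdd = if (useCache) mapped.cache() else mapped

      rdd.count()   // first action: always runs the map
      rdd.collect() // second action: reruns the map unless the data is cached
      counter.value.longValue()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"evaluations (no-cache): ${evaluations(useCache = false)}")
    println(s"evaluations (cache): ${evaluations(useCache = true)}")
  }
}
```

On a local run with no task retries, the uncached variant is expected to report eight evaluations (four elements times two actions) and the cached variant four. Spark may apply accumulator updates made inside transformations more than once if tasks are re-executed, so treat the counter as a diagnostic rather than an exact measure.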
