Using Spark transformations to defer computations to a later time

Let's first understand how Spark creates a DAG. We will execute the DAG by issuing an action, deferring the decision about starting the job until the last possible moment, and see what this possibility gives us.

Let's have a look at the code we will be using in this section.

First, we need to initialize Spark. The initialization is the same for every test we carry out, so we do it before we start using Spark, as shown in the following example:

class DeferComputations extends FunSuite {
  val spark: SparkContext = SparkSession.builder()
    .master("local[2]")
    .getOrCreate()
    .sparkContext

Then, we will have the actual test. Here, the test is called should defer computations. It is simple but shows a very powerful abstraction of Spark. We start by creating an RDD of InputRecord, as shown in the following example:

test("should defer computations") {
  //given
  val input = spark.makeRDD(
    List(InputRecord(userId = "A"),
      InputRecord(userId = "B")))

InputRecord is a case class with two arguments: a unique identifier, which is optional and defaults to a random UUID if we do not supply one, and a required userId argument. The InputRecord class will be used throughout this book for testing purposes. We have created two InputRecord instances to which we will apply a transformation, as shown in the following example:
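The actual definition of InputRecord is not shown here; a minimal sketch of such a case class, assuming the optional identifier is a field named id that defaults to a random UUID, might look like this:

```scala
import java.util.UUID

// A minimal sketch of InputRecord: userId is required, while the unique
// identifier is optional and defaults to a random UUID (the field name
// "id" is an assumption, not part of the original listing).
case class InputRecord(id: String = UUID.randomUUID().toString,
                       userId: String)

val a = InputRecord(userId = "A")
val b = InputRecord(id = "fixed-id", userId = "B")
println(a.userId) // prints "A"
println(b.id)     // prints "fixed-id"
```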

//when apply transformation
val rdd = input
  .filter(_.userId.contains("A"))
  .keyBy(_.userId)
  .map(_._2.userId.toLowerCase)
//.... built processing graph lazy

We first filter, keeping only the records whose userId field contains A. We then transform the RDD with keyBy(_.userId), and finally extract the userId from the value and map it to lower case with toLowerCase. This is our rdd. At this point, we have only created the DAG; we have not executed it yet. Let's assume that we have a complex program and that we are creating a lot of these acyclic graphs before the actual logic runs.
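Running the snippet above requires a Spark cluster, but the same lazy-evaluation idea can be demonstrated with a plain Scala iterator: building the pipeline does not run it, and only a terminal operation (the analogue of a Spark action) triggers the work. This is an analogy, not Spark itself:

```scala
// Demonstrates deferred computation with a plain Scala iterator,
// as an analogy for Spark's lazy transformations.
var evaluated = 0

val pipeline = List("A", "B").iterator
  .filter(_.contains("A"))
  .map { s => evaluated += 1; s.toLowerCase }

// Only the pipeline has been built, like creating a DAG:
println(evaluated)          // prints 0 - nothing has run yet

val result = pipeline.toList // the terminal operation, like an action
println(result)              // prints List(a)
println(evaluated)           // prints 1 - evaluated only when forced
```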

The advantage of Spark is that nothing is executed until an action is issued, so we can add some conditional logic, for example a fast-path execution. Let's assume that we have a shouldExecutePartOfCode() method, which can check a configuration switch or call a REST service to determine whether the rdd calculation is still relevant, as shown in the following example:

  if (shouldExecutePartOfCode()) {
    //rdd.saveAsTextFile("") ||
    rdd.collect().toList
  } else {
    //condition changed - don't need to evaluate DAG
  }
}

For testing purposes, we have used a simple method that just returns true but, in real life, this could contain complex logic:

private def shouldExecutePartOfCode(): Boolean = {
  //domain logic that decides if we still need to calculate
  true
}
}

If it returns true, we can decide whether to execute the DAG. If we want to, we can call rdd.collect().toList or saveAsTextFile to execute the RDD. Otherwise, we can take the fast path and decide that we are no longer interested in the input rdd, in which case only the graph will have been created.
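One way to back such a switch, sketched here with a JVM system property, could look as follows (the property name executeRdd is a hypothetical example, not part of the original code):

```scala
// A sketch of shouldExecutePartOfCode() driven by a configuration switch;
// the system property name "executeRdd" is a hypothetical example.
def shouldExecutePartOfCode(): Boolean =
  sys.props.get("executeRdd").forall(_.toBoolean)

// With no property set, we default to executing the DAG:
println(shouldExecutePartOfCode()) // prints true

// Flipping the switch takes the fast path instead:
sys.props("executeRdd") = "false"
println(shouldExecutePartOfCode()) // prints false
```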

When we start the test, it will take some time to complete and return the following output:

"C:\Program Files\Java\jdk-12\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.3.5\lib\idea_rt.jar=50627:C:\Program Files\JetBrains\IntelliJ IDEA 2018.3.5\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\Sneha\IdeaProjects\Chapter07\out\production\Chapter07 com.company.Main

Process finished with exit code 0

We can see that our test passed and we can conclude that it worked as expected. Now, let's look at some transformations that should be avoided.
