Application failure

Most of the time, an application fails because one or more of its stages eventually fail. As discussed earlier in this chapter, a Spark job comprises several stages, and stages are not executed independently: for instance, a processing stage cannot take place before the relevant input-reading stage. So, if stage 1 executes successfully but stage 2 fails, the whole application eventually fails. This can be shown as follows:

Figure 19: Two stages in a typical Spark job

As an example, suppose you have the following three RDD operations, each forming a stage; they are visualized in Figure 20, Figure 21, and Figure 22:

val rdd1 = sc.textFile("hdfs://data/data.csv")
  .map(someMethod)
  .filter(filterMethod)
Figure 20: Stage 1 for rdd1
val rdd2 = sc.hadoopFile("hdfs://data/data2.csv")
  .groupByKey()
  .map(secondMapMethod)

Conceptually, this can be shown in Figure 21, which first parses the data using the hadoopFile() method, groups it using the groupByKey() method, and finally, maps it:

Figure 21: Stage 2 for rdd2
val rdd3 = rdd1.join(rdd2).map(thirdMapMethod)

Conceptually, this can be shown in Figure 22, which first parses the data, joins it, and finally, maps it:

Figure 22: Stage 3 for rdd3

Now you can perform an action, for example, collect, as follows:

rdd3.collect()

Well! You have developed a Spark job consisting of three stages. Conceptually, this can be shown as follows:

Figure 23: Three stages for the rdd3.collect() operation
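
Before running the job, you can also check where Spark will place the stage boundaries by printing the lineage of the final RDD; each extra level of indentation in the output marks a shuffle, and therefore a stage, boundary. This is only an inspection aid, not part of the job itself:

println(rdd3.toDebugString)

The DAG visualization in the Spark web UI (available on port 4040 of the driver by default) shows the same stage breakdown graphically.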

Now, if one of the stages fails, your job will eventually fail. As a result, the final rdd3.collect() statement will throw an exception about the stage failure. Moreover, you may face issues caused by the following factors:

  • Mistakes in the aggregation operation
  • Exceptions in the main thread
  • OOM (out of memory)
  • Class not found exception while submitting jobs using the spark-submit script
  • Misconceptions about some APIs/methods in the Spark core library

To avoid the aforementioned issues, our general suggestion is, first, to ensure that you have not made any mistakes while performing your map, flatMap, or aggregate operations. Second, ensure that there are no flaws in the main method when developing your application with Java or Scala. Even if you see no syntax errors in your code, it is important to develop some small test cases for your application (a minimal sketch follows the list below). The most common exceptions that occur in the main method are as follows:

  • java.lang.NoClassDefFoundError
  • java.lang.NullPointerException
  • java.lang.ArrayIndexOutOfBoundsException
  • java.lang.StackOverflowError
  • java.lang.ClassNotFoundException
  • java.util.InputMismatchException
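
As suggested above, a few small, local test cases can surface most of these exceptions before you ever submit the job to a cluster. The following is a minimal sketch that exercises a hypothetical parsing function against a local SparkContext; the function, sample data, and expected result are assumptions used purely for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object SmallTest {
  // Hypothetical transformation under test; substitute your own someMethod/filterMethod here
  def parseLine(line: String): Int = line.split(",")(1).trim.toInt

  def main(args: Array[String]): Unit = {
    // Local mode with two threads is enough to exercise the transformations
    val sc = new SparkContext(new SparkConf().setAppName("SmallTest").setMaster("local[2]"))
    try {
      val result = sc.parallelize(Seq("a,1", "b,2", "c,3")).map(parseLine).collect()
      assert(result.sameElements(Array(1, 2, 3)), s"unexpected result: ${result.mkString(",")}")
      println("Test passed")
    } finally {
      sc.stop()
    }
  }
}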

In general, these exceptions can be avoided by coding your Spark application carefully. Alternatively, use the code-debugging features of Eclipse (or any other IDE) extensively to eliminate the semantic errors that cause them. The third problem, OOM, is a very common one. Note that Spark requires at least 8 GB of main memory, with sufficient disk space available, even for the standalone mode; to use the full cluster computing facilities, the requirement is often higher.

Preparing a JAR file that includes all the dependencies needed to execute your Spark jobs is of paramount importance. Many practitioners use Google's Guava; it is included in most distributions, yet it does not guarantee backward compatibility. This means that sometimes your Spark job will not find a Guava class even though you provided it explicitly: one of the two versions of the Guava library on the classpath takes precedence over the other, and that version may not include the required class. The usual way to overcome this issue is shading.
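
For example, if you build with sbt and the sbt-assembly plugin (an assumption; Maven's shade plugin offers an equivalent relocation feature), a rule along the following lines relocates Guava's packages inside your fat JAR so that the cluster's own Guava version cannot shadow yours; the target package prefix is a placeholder:

// build.sbt, with sbt-assembly enabled; on older sbt versions use: assemblyShadeRules in assembly := ...
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.common.**" -> "myapp.shaded.guava.@1").inAll
)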

Make sure that you set the Java heap space with the -Xmx parameter to a sufficiently large value if you are coding in IntelliJ, Eclipse, Vim, Notepad, and so on. When working in cluster mode, you should specify the executor memory while submitting Spark jobs with the spark-submit script. Suppose you have a CSV file to parse and want to run some predictive analytics on it using a random forest classifier; you might need to specify the right amount of memory, say 20 GB, as follows:

--executor-memory 20G

If you still receive the OOM error, you can increase this amount to, say, 32 GB or more. Random forest is just an example here: it is computationally intensive and needs a lot of memory, but you may experience similar issues even while simply parsing your data, and an individual stage may fail because of this OOM error. Therefore, make sure that you are aware of it.
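
Putting this together, a spark-submit invocation on a YARN cluster might look like the following; the class name, JAR name, input path, and memory sizes are placeholders that you would adapt to your own application:

spark-submit \
  --class com.example.MyMainClass \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4G \
  --executor-memory 20G \
  --num-executors 10 \
  myapp-assembly.jar hdfs://data/data.csv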

For the class not found exception, make sure that you have included your main class in the resulting JAR file. The JAR file should be prepared with all the dependencies to execute your Spark job on the cluster nodes. We will provide a step-by-step JAR preparation guideline in Chapter 17, Time to Go to ClusterLand - Deploying Spark on a Cluster.
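
A quick sanity check before submitting is to list the contents of the assembled JAR and confirm that your main class is really there; the JAR and class names below are just placeholders:

jar tf myapp-assembly.jar | grep MyMainClass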

For the last issue, we can give some examples of common misconceptions about the Spark core library. For instance, when you use the wholeTextFiles() method to prepare RDDs or DataFrames from multiple files, Spark does not parallelize the work across those files as you might expect; in cluster mode on YARN, it may even run out of memory.

Once, I experienced an issue where, at first, I copied six files from my S3 storage to HDFS. Then, I tried to create an RDD, as follows:

sc.wholeTextFiles("/mnt/temp") // note the location of the data files is /mnt/temp/

Then, I tried to process those files line by line using a UDF. When I looked at my computing nodes, I saw that only one executor was running per file. However, I then got an error message saying that YARN had run out of memory. Why so? The reasons are as follows:

  • The goal of wholeTextFiles is to have only one executor for each file to be processed
  • If you use .gz files, for example, you will have only one executor per file, maximum
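
A common workaround, sketched below with placeholder paths, partition counts, and a hypothetical per-line function, is to request more input partitions up front and/or repartition the (filename, content) pairs before the heavy per-line processing, so that the work is spread across more executors:

val files = sc.wholeTextFiles("/mnt/temp", minPartitions = 12) // hint: ask for more input partitions
  .repartition(24)                                             // spread the (path, content) pairs across executors
files.flatMap { case (_, content) => content.split("\n") }     // the per-line work now runs in parallel
  .map(lineProcessingMethod)                                   // hypothetical per-line function
  .count()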