Logging with log4j with Spark: a recap

We already discussed this topic in Chapter 14, Time to Put Some Order - Cluster Your Data with Spark MLlib. However, let's revisit the same content here to align it with the current discussion on debugging Spark applications. As stated earlier, Spark uses log4j for its own logging. If Spark is configured properly, it logs all of its operations to the shell console. A sample snapshot of the log4j.properties file can be seen in the following figure:

Figure 16: A snapshot of the log4j.properties file

Set the default spark-shell log level to WARN. When running the spark-shell, the log level set for the shell (the org.apache.spark.repl.Main logger) is used to override the root logger's log level, so that you can have different defaults for the shell and for regular Spark applications. We also need to pass JVM arguments to both the executors and the driver when submitting a job. For this, you should edit conf/spark-defaults.conf. In short, the following options can be added:

spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/spark-2.1.1/conf/log4j.properties
spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/spark-2.1.1/conf/log4j.properties

To make the discussion clearer, we want to hide all the logs generated by Spark itself and redirect them to the file system. At the same time, we want our own logs to be written both to the shell and to a separate file, so that they don't get mixed up with the ones from Spark. From here, we will point Spark to the file where our own logs go, which in this particular case is /var/log/sparkU.log. This log4j.properties file is then picked up by Spark when the application starts, so we don't have to do anything other than placing it in the location mentioned earlier.
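
A minimal sketch of what such a log4j.properties file might look like is shown next. Be aware that the appender names, the pattern layout, and the path used for Spark's own log file are illustrative assumptions here, not Spark's shipped template; only /var/log/sparkU.log comes from the preceding discussion:

# Sketch only: Spark's own packages go to a separate file (appender name is an assumption)
log4j.logger.org.apache.spark=INFO, sparkLog
log4j.additivity.org.apache.spark=false
log4j.appender.sparkLog=org.apache.log4j.FileAppender
log4j.appender.sparkLog.File=/var/log/spark.log
log4j.appender.sparkLog.layout=org.apache.log4j.PatternLayout
log4j.appender.sparkLog.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
# Sketch only: everything else, including our own root-logger messages, goes to the console and to /var/log/sparkU.log
log4j.rootCategory=INFO, console, myLog
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
log4j.appender.myLog=org.apache.log4j.FileAppender
log4j.appender.myLog.File=/var/log/sparkU.log
log4j.appender.myLog.layout=org.apache.log4j.PatternLayout
log4j.appender.myLog.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n

The application that performs our custom logging is as follows: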

package com.chapter14.Serilazition

import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession

object myCustomLog {
  def main(args: Array[String]): Unit = {
    val log = LogManager.getRootLogger
    // Everything is printed as INFO once the log level is set to INFO, until you set the level to a new one, for example, WARN
    log.setLevel(Level.INFO)
    log.info("Let's get started!")
    // Setting the logger level to WARN: after this, nothing below WARN is printed
    log.setLevel(Level.WARN)
    // Creating the Spark session
    val spark = SparkSession
      .builder
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "E:/Exp/")
      .appName("Logging")
      .getOrCreate()
    // These will not be printed!
    log.info("Get prepared!")
    log.trace("Show if there is any ERROR!")
    // Start the computation and print the logging information
    log.warn("Started")
    spark.sparkContext.parallelize(1 to 20).foreach(println)
    log.warn("Finished")
  }
}

In the preceding code, everything is printed at the INFO level while the log level is set to INFO, until we change the level, for example to WARN. After that, no INFO or TRACE messages, and so on, are printed. In addition, log4j supports several valid logging levels that can be used with Spark, namely ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, and OFF. A successful execution of the preceding code should generate the following output:

17/05/13 16:39:14 INFO root: Let's get started!
17/05/13 16:39:15 WARN root: Started
4
1
2
5
3
17/05/13 16:39:16 WARN root: Finished
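
Incidentally, besides configuring log4j directly, Spark also lets you change the log level of its own loggers at runtime through the SparkContext. A minimal sketch, reusing the spark session from the preceding example:

// Overrides the log level for Spark's own loggers at runtime; accepts the level names listed above
spark.sparkContext.setLogLevel("WARN")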

You can also set up the default logging for the Spark shell in conf/log4j.properties. Spark ships a log4j template as a properties file, and we can extend and modify that file for logging in Spark. Move to the SPARK_HOME/conf directory and you should see the log4j.properties.template file; rename a copy of it to log4j.properties and use that as your configuration. While developing your Spark application in an IDE-based environment such as Eclipse, you can put the log4j.properties file under your project directory. To disable logging completely, just set the log4j.logger.org flag to OFF, as follows:

log4j.logger.org=OFF
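
If switching logging off entirely is too drastic, a gentler variant (a sketch using standard log4j syntax) is to raise only Spark's own packages to WARN and leave everything else untouched:

log4j.logger.org.apache.spark=WARN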

So far, everything has been very easy. However, there is a problem we haven't noticed yet in the preceding code segment. One drawback of the org.apache.log4j.Logger class is that it is not serializable, which implies that we cannot use it inside a closure that Spark ships to the executors, for example in operations such as map(). Suppose we do the following in our Spark code:

import org.apache.log4j.{Level, LogManager}
import org.apache.spark.{SparkConf, SparkContext}

object myCustomLogger {
  def main(args: Array[String]): Unit = {
    // Setting the logger level to WARN
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)
    // Creating the Spark context
    val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Start the computation and print the logging information
    //log.warn("Started")
    val i = 0
    val data = sc.parallelize(i to 100000)
    data.map { number =>
      // The root logger is captured by this closure, which is the source of the problem
      log.info("My number " + i)
      number.toString
    }
    //log.warn("Finished")
  }
}

You should experience an exception that says Task not serializable as follows:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.log4j.spi.RootLogger
Serialization stack: object not serializable

At first, we can try to solve this problem in a naive way: just make the Scala class that does the actual operation serializable using extends Serializable. For example, the code looks as follows:

import org.apache.spark.rdd.RDD

class MyMapper(n: Int) extends Serializable {
  // The logger is excluded from serialization and re-created lazily on each executor
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")

  def logMapper(rdd: RDD[Int]): RDD[String] =
    rdd.map { i =>
      log.warn("mapping: " + i)
      (i + n).toString
    }
}

This section is intended as a discussion of logging, but we take the opportunity to make it more versatile for general-purpose Spark programming issues. When a lambda references a field or method of its enclosing object, the closure captures that whole object (not only the lambda itself), so Spark has to serialize it and ship it to the executors. Making the whole class Serializable, as we just did, makes Spark accept this, but it means the entire object is sent with every task, which adds significant overhead, especially for big objects! The alternatives are to declare the instance only within the lambda function passed to the map() operation, to make the non-serializable object static and create it once per machine, or to use forEachPartition() or mapPartitions() instead of just map() and create the non-serializable objects there. In summary, these are the ways to work around the problem:
  • Make the class Serializable
  • Declare the instance only within the lambda function passed to map() (see the sketch after this list)
  • Make the NotSerializable object a static member and create it once per machine
  • Use forEachPartition() or mapPartitions() instead of map() and create the NotSerializable object there
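
As a quick illustration of the second option, the following sketch creates the logger inside the lambda itself, so nothing non-serializable is captured from the enclosing scope; it assumes the same data RDD as in the earlier example:

data.map { number =>
  // Created inside the lambda, so the logger is never part of the serialized closure
  val localLog = org.apache.log4j.LogManager.getLogger("myLogger")
  localLog.warn("mapping: " + number)
  number.toString
}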

In the MyMapper class shown earlier, we used the @transient lazy annotation, which excludes the logger field from serialization and re-creates it lazily on each executor when it is first used. The companion object containing the apply() method, which instantiates objects of the MyMapper class, is as follows:

//Companion object
object MyMapper {
  def apply(n: Int): MyMapper = new MyMapper(n)
}

Finally, the object containing the main() method is as follows:

import org.apache.log4j.{Level, LogManager}
import org.apache.spark.sql.SparkSession

//Main object
object myCustomLogwithClosureSerializable {
  def main(args: Array[String]): Unit = {
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)
    val spark = SparkSession
      .builder
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "E:/Exp/")
      .appName("Testing")
      .getOrCreate()
    log.warn("Started")
    val data = spark.sparkContext.parallelize(1 to 100000)
    val mapper = MyMapper(1)
    val other = mapper.logMapper(data)
    other.collect()
    log.warn("Finished")
  }
}

Now, let's see another example that provides better insight into the issue we are discussing. Suppose we have the following class that computes the multiplication of two integers:

class MultiplicaitonOfTwoNumber {
  def multiply(a: Int, b: Int): Int = {
    val product = a * b
    product
  }
}
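
Before looking at the fix, here is a sketch of the kind of usage that triggers the problem: the instance is created on the driver and then captured by the closure passed to map(). The variable names are illustrative assumptions, and a SparkSession named spark is assumed, as in the other examples:

// Created on the driver; MultiplicaitonOfTwoNumber does not extend Serializable
val notSerializable = new MultiplicaitonOfTwoNumber
val myRDD = spark.sparkContext.parallelize(0 to 1000)
// The closure captures notSerializable, so Spark throws Task not serializable
myRDD.map(n => notSerializable.multiply(n, n)).collect()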

Essentially, if you try to use an externally created instance of this class for computing the multiplication inside a map() closure, as sketched above, you will get the Task not serializable error that we described earlier. Instead, we can simply use foreachPartition() and create the instance inside the lambda, as follows:

val myRDD = spark.sparkContext.parallelize(0 to 1000)
myRDD.foreachPartition(s => {
  // The instance is created inside the lambda, once per partition, so it is never serialized
  val notSerializable = new MultiplicaitonOfTwoNumber
  // Note: this assumes each partition holds at least two elements
  println(notSerializable.multiply(s.next(), s.next()))
})

Now, if you compile and run it, it should return the desired result. For your convenience, the complete code with the main() method is as follows:

package com.chapter16.SparkTesting

import org.apache.spark.sql.SparkSession

class MultiplicaitonOfTwoNumber {
  def multiply(a: Int, b: Int): Int = {
    val product = a * b
    product
  }
}

object MakingTaskSerilazible {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "E:/Exp/")
      .appName("MakingTaskSerilazible")
      .getOrCreate()
    val myRDD = spark.sparkContext.parallelize(0 to 1000)
    myRDD.foreachPartition(s => {
      val notSerializable = new MultiplicaitonOfTwoNumber
      println(notSerializable.multiply(s.next(), s.next()))
    })
  }
}

The output is as follows:

0
5700
1406
156
4032
7832
2550
650