Data serialization

Serialization plays an important role in performance tuning and optimization in any distributed computing environment, and Spark is no exception. Spark jobs are often data- and compute-intensive, and every data object that is shipped across the cluster or cached has to be converted into a serialized form first. If that serialized format is inefficient, it consumes a large number of bytes of memory, and the whole processing and computation can slow down drastically.

As a result, you may experience slow responses from the compute nodes and fail to fully utilize the available computing resources. Spark tries to strike a balance between convenience and performance, which is why data serialization should be the first step when tuning Spark for better performance.

Spark provides two options for data serialization, the Java and Kryo serialization libraries:

  • Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework; any class you want to serialize this way simply has to implement java.io.Serializable. Java serialization is very flexible, but it is often quite slow and produces bulky output, which makes it a poor fit for serializing large volumes of data objects.
  • Kryo serialization: You can also use the Kryo library to serialize your data objects more quickly. Compared to Java serialization, Kryo is significantly faster (often up to 10x) and more compact. The trade-off is that Kryo does not support all Serializable types out of the box, and for best results you need to register in advance the classes you are going to serialize (see the sketch after this list).
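
The following minimal sketch contrasts the two approaches. Note that CustomerRecord is a hypothetical class used purely for illustration; it is not part of the example developed later in this section:

// Java serialization: it is enough for the class to implement
// java.io.Serializable (Scala's Serializable trait extends it).
class CustomerRecord(val id: Long, val name: String) extends Serializable

// Kryo serialization: switch the serializer and register the class up front.
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[CustomerRecord]))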

You can start using Kryo by initializing your Spark job with a SparkConf and calling conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). To register your own custom classes with Kryo, use the registerKryoClasses method, as follows:

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyOwnClass1], classOf[MyOwnClass2]))
val sc = new SparkContext(conf)

If your objects are large, you may also need to increase the spark.kryoserializer.buffer configuration property. This value needs to be large enough to hold the largest object you serialize. Finally, if you don't register your custom classes, Kryo still works; however, the full class name then has to be stored with each object, which is wasteful.
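
The following is a minimal sketch of how these properties might be set; the buffer sizes shown are illustrative values rather than recommendations, and spark.kryo.registrationRequired is optional but handy for catching unregistered classes during development:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "1m")       // initial Kryo buffer size
  .set("spark.kryoserializer.buffer.max", "128m") // must hold the largest object you serialize
  .set("spark.kryo.registrationRequired", "true") // fail fast if an unregistered class is serialized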

For example, the logging example from the subsection at the end of the monitoring Spark jobs section can be optimized using Kryo serialization. First, create the MyMapper class as a normal class (that is, without any serialization), as follows:

class MyMapper(n: Int) { // without any serialization
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
  def MyMapperDosomething(rdd: RDD[Int]): RDD[String] = rdd.map { i =>
    log.warn("mapping: " + i)
    (i + n).toString
  }
}

Now, let's register this class as a Kryo serialization class and then set Kryo serialization as follows:

conf.registerKryoClasses(Array(classOf[MyMapper])) // register the class with Kryo
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // set Kryo serialization

That's all you need. The full source code of this example is given below. You should be able to run it and observe the same output as before, but now optimized with Kryo serialization:

package com.chapter14.Serilazition

import org.apache.log4j.{Level, LogManager}
import org.apache.spark._
import org.apache.spark.rdd.RDD

class MyMapper(n: Int) { // without any serialization
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
  def MyMapperDosomething(rdd: RDD[Int]): RDD[String] = rdd.map { i =>
    log.warn("mapping: " + i)
    (i + n).toString
  }
}

// Companion object
object MyMapper {
  def apply(n: Int): MyMapper = new MyMapper(n)
}

// Main object
object KyroRegistrationDemo {
  def main(args: Array[String]) {
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)
    val conf = new SparkConf()
      .setAppName("My App")
      .setMaster("local[*]")
    conf.registerKryoClasses(Array(classOf[MyMapper])) // register the class with Kryo
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // set Kryo serialization
    val sc = new SparkContext(conf)
    log.warn("Started")
    val data = sc.parallelize(1 to 100000)
    val mapper = MyMapper(1)
    val other = mapper.MyMapperDosomething(data)
    other.collect()
    log.warn("Finished")
    sc.stop() // release cluster resources
  }
}

The output is as follows:

17/04/29 15:33:43 WARN root: Started 
.
.
17/04/29 15:31:51 WARN myLogger: mapping: 1
17/04/29 15:31:51 WARN myLogger: mapping: 49992
17/04/29 15:31:51 WARN myLogger: mapping: 49999
17/04/29 15:31:51 WARN myLogger: mapping: 50000
.
.
17/04/29 15:31:51 WARN root: Finished
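
If you want to double-check which serializer is actually in effect, you can read it back from the SparkContext. The following is a minimal sketch, assuming the sc and log created above are still in scope:

// Prints the configured serializer; falls back to the default JavaSerializer
// if the property has not been set explicitly.
val serializer = sc.getConf.get(
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
log.warn("Active serializer: " + serializer)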

Well done! Now let's have a quick look at how to tune memory. In the next section, we will look at some advanced strategies to ensure the efficient use of main memory.
