Caching

Caching enables Spark to persist data across computations and operations. It is one of the most important techniques for speeding up Spark jobs, particularly iterative computations that reuse the same dataset on every pass.

Caching works by keeping as much of the RDD as possible in memory. When memory runs low, cached partitions are evicted according to an LRU (least recently used) policy. If the data you ask to cache is larger than the available memory, performance suffers because partitions must either spill to disk or be recomputed, depending on the storage level chosen.
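The following is a minimal sketch of why this matters for iterative work. It assumes a spark-shell session where sc is available; the data and the number of iterations are made up for illustration. Caching the RDD once means each subsequent action reads the stored partitions instead of recomputing them:

// Hypothetical input: a small RDD we intend to reuse many times.
val data = sc.parallelize(1 to 1000000)

// Cache it once; without this, every action below would recompute the RDD.
data.cache()

// Iterative use: each pass reads the cached partitions instead of
// rebuilding them from the original source.
for (i <- 1 to 10) {
  val total = data.map(_ * i).sum()
  println(s"iteration $i -> sum = $total")
}

// Release the cached partitions when we are done.
data.unpersist()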

You can mark an RDD to be cached using either persist() or cache().

cache() is simply a synonym for persist(StorageLevel.MEMORY_ONLY).
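A quick way to see this equivalence in the shell (reusing the rdd_one name from the examples later in this section) is to check the storage level reported by getStorageLevel:

import org.apache.spark.storage.StorageLevel

val rdd_one = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))

// cache() assigns the memory-only, deserialized storage level...
rdd_one.cache()
println(rdd_one.getStorageLevel)

// ...which is exactly what persist(StorageLevel.MEMORY_ONLY) assigns.
rdd_one.unpersist()
rdd_one.persist(StorageLevel.MEMORY_ONLY)
println(rdd_one.getStorageLevel)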

persist() can use memory, disk, or both:

persist(newLevel: StorageLevel) 

The following are the possible values for the storage level:

  • MEMORY_ONLY: Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they are needed. This is the default level.
  • MEMORY_AND_DISK: Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, the partitions that do not fit are stored on disk and read from there when they are needed.
  • MEMORY_ONLY_SER (Java and Scala): Stores the RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
  • MEMORY_AND_DISK_SER (Java and Scala): Similar to MEMORY_ONLY_SER, but spills partitions that do not fit in memory to disk instead of recomputing them on the fly each time they are needed.
  • DISK_ONLY: Stores the RDD partitions only on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, and so on: Same as the preceding levels, but each partition is replicated on two cluster nodes.
  • OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but stores the data in off-heap memory. This requires off-heap memory to be enabled.

The storage level to choose depends on the situation:

  • If the RDD fits into memory, use MEMORY_ONLY, as that is the fastest option for execution performance.
  • Try MEMORY_ONLY_SER if the objects being cached are serializable, in order to make them smaller in memory (see the sketch after this list); it trades some extra CPU time for a smaller footprint.
  • Disk-based levels should not be used unless your computations are expensive; otherwise, recomputing a partition can be as fast as reading it back from disk.
  • Use replicated storage for the best fault tolerance if you can spare the additional memory needed. This prevents recomputation of lost partitions and gives the best availability.
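The following is a minimal sketch of the serialized and replicated options recommended above, assuming a spark-shell session where sc is available; the rdd_ser name and its data are made up for illustration:

import org.apache.spark.storage.StorageLevel

val rdd_ser = sc.parallelize(1 to 1000000)

// Serialized caching: smaller memory footprint, slightly more CPU to read back.
rdd_ser.persist(StorageLevel.MEMORY_ONLY_SER)

// Alternatives matching the other recommendations above (pick one per RDD):
// rdd_ser.persist(StorageLevel.MEMORY_AND_DISK_SER)  // spill to disk instead of recomputing
// rdd_ser.persist(StorageLevel.MEMORY_ONLY_2)        // replicate each partition on two nodes

An RDD can have only one storage level at a time; calling persist() again with a different level fails until you call unpersist() first.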
unpersist() simply frees up the cached content.

The following are examples of how to call the persist() function with different storage levels (memory or disk):

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> rdd_one.persist(StorageLevel.MEMORY_ONLY)
res37: rdd_one.type = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> rdd_one.unpersist()
res39: rdd_one.type = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> rdd_one.persist(StorageLevel.DISK_ONLY)
res40: rdd_one.type = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> rdd_one.unpersist()
res41: rdd_one.type = ParallelCollectionRDD[26] at parallelize at <console>:24

The following is an illustration of the performance improvement we get by caching.

First, we will run the code:

scala> val rdd_one = sc.parallelize(Seq(1,2,3,4,5,6))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd_one.count
res0: Long = 6

scala> rdd_one.cache
res1: rdd_one.type = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd_one.count
res2: Long = 6

You can use the Spark web UI to look at the improvement achieved: the Storage tab shows the cached RDD, and the Stages tab shows that the second count runs faster because it reads the cached partitions instead of recomputing them.
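If you cannot access the web UI, you can also time the actions directly in the shell. The following is a rough sketch: the timeIt helper and the larger rdd_big dataset are made up for illustration (the effect is much easier to see on something bigger than the six-element RDD above), and wall-clock timings on a busy cluster are only indicative:

// Hypothetical helper: time an action and report the duration in milliseconds.
def timeIt[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

val rdd_big = sc.parallelize(1 to 10000000).map(_ * 2)

timeIt("first count (not cached)") { rdd_big.count() }

rdd_big.cache()
timeIt("count that fills the cache") { rdd_big.count() }
timeIt("count served from the cache") { rdd_big.count() }

The last count should show a clear drop in duration once the partitions are held in memory.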
