RDD persistence and cache

Spark jobs usually contain multiple intermediate RDDs on which more than one action may be called. However, each time an action is called, the complete DAG leading to that action is executed again; this not only increases the computation time, but is also wasteful as far as CPU and other resources are concerned. To avoid re-computing the entire lineage of an iterative job, Spark provides two methods for persisting an intermediate RDD: cache() and persist(). The cache() method persists the data unserialized in memory by default, which is usually the fastest way to retrieve the persisted data. However, using cache() comes with a trade-off: each node that computes a partition of the RDD keeps the result on that node only, so if the node fails, the data of that RDD partition is lost. The partition is then recomputed, but computation time is lost in the process. Also, because the persisted data is unserialized, it consumes more memory.
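The effect of cache() can be seen by calling two actions on the same derived RDD. The following is a minimal sketch, assuming an existing JavaSparkContext named sparkContext (as used elsewhere in this section) and Java 8 lambda syntax; the variable names are illustrative only:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;

// Without cache(), each action below would re-run the whole lineage (parallelize + map);
// with cache(), the mapped partitions are kept in memory after the first action and
// reused by the second one.
JavaRDD<Integer> numbers = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaRDD<Integer> squares = numbers.map(x -> x * x).cache();

Integer sum = squares.reduce((a, b) -> a + b); // first action: lineage executed, result cached
long count = squares.count();                  // second action: served from the cached partitions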

These limitations can be overcome by using the persist() method with an appropriate replication factor and by serializing the data. The persist() method also gives the flexibility to choose among different storage levels, that is, memory, disk, and off-heap. The numeric suffix (_2) in a storage level's name defines the replication factor of the persisted data.
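Before the full list of levels, here is a minimal sketch of persisting with a serialized, replicated level, again assuming the sparkContext used elsewhere in this section:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

// Persist with a serialized, replicated storage level: partitions are stored as
// serialized bytes and replicated on two nodes (the _2 suffix), so a single node
// failure does not force recomputation.
JavaRDD<Integer> data = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));
data.persist(StorageLevel.MEMORY_ONLY_SER_2());
data.count(); // the first action materializes and persists the partitions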

Storage Level         | Memory Footprint | Computation Time | Replication | Serialized Objects | Memory Usage    | Disk Usage
MEMORY_ONLY           | HIGH             | LOW              | NO          | NO                 | YES             | NO
MEMORY_ONLY_2         | HIGH             | LOW              | YES         | NO                 | YES             | NO
MEMORY_ONLY_SER       | LOW              | HIGH             | NO          | YES                | YES             | NO
MEMORY_ONLY_SER_2     | LOW              | HIGH             | YES         | YES                | YES             | NO
DISK_ONLY             | LOW              | HIGH             | NO          | YES                | NO              | YES
DISK_ONLY_2           | LOW              | HIGH             | YES         | YES                | NO              | YES
MEMORY_AND_DISK       | HIGH             | MEDIUM           | NO          | PARTIAL            | PARTIAL         | PARTIAL
MEMORY_AND_DISK_2     | HIGH             | MEDIUM           | YES         | PARTIAL            | PARTIAL         | PARTIAL
MEMORY_AND_DISK_SER   | LOW              | HIGH             | NO          | YES                | PARTIAL         | PARTIAL
MEMORY_AND_DISK_SER_2 | LOW              | HIGH             | YES         | YES                | PARTIAL         | PARTIAL
OFF_HEAP              | LOW              | HIGH             | NO          | YES                | OFF-HEAP MEMORY | NO

List of storage levels available in persist()

Data is always serialized when it is written with DISK_ONLY or OFF_HEAP, hence there is no storage level with _SER in its name for disk-only or off-heap persistence. With the MEMORY_ONLY storage level, a situation can arise where the amount of data to be persisted is more than the memory available; in such scenarios Spark uses the Least Recently Used (LRU) algorithm to drop partitions. If a dropped partition is required later, it is recomputed. To work around this memory constraint, one can choose MEMORY_ONLY_SER, which reduces the memory footprint of the data, or one of the MEMORY_AND_DISK storage level variants:

//cache() and persist()
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

JavaRDD<Integer> rdd = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 3).cache();
JavaRDD<Integer> evenRDD = rdd.filter(new org.apache.spark.api.java.function.Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer v1) throws Exception {
        // keep only the even elements
        return (v1 % 2) == 0;
    }
});
evenRDD.persist(StorageLevel.MEMORY_AND_DISK());
evenRDD.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer t) throws Exception {
        System.out.println("The value of RDD are :" + t);
    }
});
//unpersisting the RDD
evenRDD.unpersist();
rdd.unpersist();

It is important to note that the cache() method is just syntactic sugar for calling persist() with the MEMORY_ONLY storage level. Choosing a storage level for RDD persistence is of paramount importance as far as the performance of Spark jobs is concerned, but there are no hard rules that govern the choice. The following factors, however, can act as guiding principles for choosing a better storage level for any Spark job:

  • Prefer the MEMORY_ONLY storage level as long as the complete RDD can be persisted in memory.
  • Try MEMORY_ONLY_SER if the memory required for RDD persistence is slightly more than what is available; note that this option increases the operation time, since extra CPU is spent serializing and deserializing the data (see the sketch after this list).
  • Spilling RDD data to disk can sometimes be a more expensive operation than recomputing the partition itself.
  • Data replication should be chosen for use cases that require very high throughput, where the time lost in recomputing lost partitions must be avoided at all costs.
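Where the list above suggests MEMORY_ONLY_SER, a minimal sketch of that choice follows. The application name and input path are assumptions for illustration, and the Kryo serializer setting is an optional tuning step; only persist(), the storage level, and the spark.serializer property come from Spark's documented API:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

// When MEMORY_ONLY does not fit, MEMORY_ONLY_SER trades extra CPU for a smaller
// memory footprint; Kryo serialization usually shrinks the serialized partitions
// further than the default Java serializer.
SparkConf conf = new SparkConf()
        .setAppName("persistence-example")  // hypothetical application name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext jsc = new JavaSparkContext(conf);

JavaRDD<String> bigRDD = jsc.textFile("large-input.txt"); // hypothetical input path
bigRDD.persist(StorageLevel.MEMORY_ONLY_SER());
System.out.println("Persisted lines: " + bigRDD.count()); // first action materializes the cache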

Cached data is managed in Spark using the Least Recently Used (LRU) algorithm, whereby old, unused partitions are dropped automatically; however, one can also manually evict an RDD from the cache using the unpersist() method.
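By default unpersist() returns immediately; the Java API also accepts a boolean blocking flag if the caller needs to wait until the cached blocks are actually removed. A minimal sketch, assuming a previously persisted RDD named cachedRDD (a hypothetical name):

// Block until every cached partition of this RDD has been removed from the executors.
cachedRDD.unpersist(true);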
