Data shuffling and partitions

To understand data shuffling in Spark, we first need to understand how data is partitioned in RDDs. When we create an RDD by, for instance, loading a file from HDFS or reading a file from local storage, Spark has no control over which records are distributed to which partitions. This becomes a problem for key-value RDDs: these often require knowing where the occurrences of a particular key are, for instance to perform a join. If a key can occur anywhere in the RDD, we have to look through every partition to find it.
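We can see this directly through an RDD's partitioner attribute. The following is a minimal sketch, assuming a local Spark context; the input file spam.txt and the word-count pipeline are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}

    // A hypothetical local session for experimenting with partitions.
    val conf = new SparkConf().setAppName("partitions").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Build a key-value RDD from a text file. Spark assigns records to
    // partitions based on input splits, not based on the keys.
    val wordCounts = sc.textFile("spam.txt")
      .flatMap { _.split("\\s+") }
      .map { word => (word, 1) }

    // No partitioner is set: a given word may occur in any partition.
    println(wordCounts.partitioner) // None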

To prevent this, Spark allows the definition of a partitioner on key-value RDDs. A partitioner is an attribute of the RDD that determines which partition a particular key lands in. When an RDD has a partitioner set, the location of a key is entirely determined by the partitioner, and not by the RDD's history or the number of keys. Two different RDDs with the same partitioner will map the same key to the same partition.
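Continuing the sketch above, partitionBy lets us impose a partitioner explicitly; the choice of HashPartitioner(4) and the name partitioned are our own for illustration:

    import org.apache.spark.HashPartitioner

    // Redistribute the RDD so that the partition holding each record is
    // determined entirely by its key.
    val partitioned = wordCounts.partitionBy(new HashPartitioner(4))
    println(partitioned.partitioner) // Some(org.apache.spark.HashPartitioner@...)

    // Any other RDD partitioned with HashPartitioner(4) maps the same
    // key to the same partition index.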

Partitions impact performance through their effect on transformations. There are two types of transformations on key-value RDDs:

  • Narrow transformations, like mapValues. In narrow transformations, the data to compute a partition in the child RDD resides on a single partition in the parent. The data processing for a narrow transformation can therefore be performed entirely locally, without needing to communicate data between nodes.
  • Wide transformations, like reduceByKey. In wide transformations, the data needed to compute any single partition can reside on any of the partitions in the parent. The RDD resulting from a wide transformation will, in general, have a partitioner set. For instance, the output of a reduceByKey transformation is hash-partitioned by default: the partition that a particular key ends up in is determined by hash(key) % numPartitions. The sketch after this list contrasts the two types of transformations.
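The contrast is easy to observe on the RDDs from the previous sketches (still the same hypothetical session):

    // Narrow: mapValues leaves keys untouched, so each output partition
    // depends on exactly one input partition and the partitioner is kept.
    val doubled = partitioned.mapValues { _ * 2 }
    println(doubled.partitioner) // Some(...) -- same HashPartitioner

    // Wide: reduceByKey must see every value for a key, wherever it
    // lives; its output is hash-partitioned by default.
    val counts = wordCounts.reduceByKey { _ + _ }
    println(counts.partitioner) // Some(...) -- set by reduceByKey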

Thus, in our mutual information example, the RDDs pPresentAndSpam and pPresentAndHam will have the same partition structure, since they both have the default hash partitioner. All descendant RDDs retain the same keys, all the way down to mutualInformation. The word language, for instance, will be in the same partition in each RDD.

Why does all this matter? If an RDD has a partitioner set, this partitioner is retained through all subsequent narrow transformations originating from this RDD. Let's go back to our mutual information example. The RDDs pPresentGivenHam and pPresentGivenSpam both originate from reduceByKey operations, and they both have string keys. They will therefore both have the same hash partitioner (unless we explicitly set a different partitioner). This partitioner is retained as we construct pPresentAndSpam and pPresentAndHam.

When we construct pPresent, we perform a full outer join of pPresentAndSpam and pPresentAndHam. Since both these RDDs have the same partitioner, the child RDD pPresent has narrow dependencies: we can just join the first partition of pPresentAndSpam with the first partition of pPresentAndHam, the second partition of pPresentAndSpam with the second partition of pPresentAndHam, and so on, since any string key is hashed to the same partition in both RDDs. By contrast, without a partitioner, we would have to join the data in each partition of pPresentAndSpam with every partition of pPresentAndHam. This would require sending data across the network to every node holding a partition of either RDD, a time-consuming exercise.
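The same mechanism can be demonstrated in miniature. In the sketch below, spamCounts and hamCounts are hypothetical stand-ins for pPresentAndSpam and pPresentAndHam; because both come out of reduceByKey with the same default partitioner, their full outer join needs no shuffle:

    // Two small key-value RDDs, each reduced by key.
    val spamCounts = sc.parallelize(Seq(("language", 2), ("viagra", 10)))
      .reduceByKey { _ + _ }
    val hamCounts = sc.parallelize(Seq(("language", 5), ("meeting", 3)))
      .reduceByKey { _ + _ }

    // Both sides carry the same default hash partitioner in this session...
    assert(spamCounts.partitioner == hamCounts.partitioner)

    // ...so the join has narrow dependencies: partition i of spamCounts
    // is joined with partition i of hamCounts, without moving data
    // between nodes.
    val joined = spamCounts.fullOuterJoin(hamCounts)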

This process of having to send the data needed to construct a child RDD across the network, as a result of wide dependencies, is called shuffling. Much of the art of optimizing a Spark program involves minimizing the number of shuffles and, when a shuffle is unavoidable, reducing the amount of data that gets shuffled. One common technique is sketched below.
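A common way of reducing shuffling is to partition an RDD once, cache the result, and reuse it across several wide operations. The following sketch assumes that pattern; events, byUser, and the choice of HashPartitioner(8) are all hypothetical:

    import org.apache.spark.HashPartitioner

    // A hypothetical (userId, event) RDD, partitioned once and cached.
    val events = sc.parallelize(Seq((1, "click"), (2, "view"), (1, "buy")))
    val byUser = events.partitionBy(new HashPartitioner(8)).cache()

    // Both operations below see a pre-partitioned parent: the keys are
    // already in the right place, so neither triggers a further shuffle
    // of byUser.
    val eventCounts = byUser.mapValues { _ => 1 }.reduceByKey { _ + _ }
    val grouped = byUser.groupByKey()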
