RDD partitioning

As we have seen in previous chapters, Spark loads data into an RDD. Since Spark runs in distributed mode, different executors can run on different worker machines, and an RDD is loaded into the memory of those executors. Being a distributed dataset, an RDD gets split across executors. These splits are called RDD partitions.

In other words, partitions are the splits of an RDD loaded into the memory of different executors. The following diagram depicts the logical representation of an RDD partitioned across various worker nodes:

More than one partition of an RDD can be loaded into an executor's memory.

Spark partitions the RDD at the time of creation even if the user has not provided a partition count explicitly. However, the user can also provide a partition count. Let's look at it programmatically:

SparkConf conf = new SparkConf().setMaster("local").setAppName("Partitioning Example");
JavaSparkContext jsc = new JavaSparkContext(conf);
// Create an RDD of 10 integers with an explicit partition count of 3
JavaRDD<Integer> intRDD = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);

In this example, an RDD of integers is created with 3 as the value for the numPartitions parameter. To verify the number of partitions, you can use:

intRDD.getNumPartitions()
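
To see how the elements were actually distributed, you can use the glom() transformation, which turns each partition into a list. A minimal sketch (the exact distribution of elements across partitions may vary):

// Collect the contents of each partition as a separate list
List<List<Integer>> partitions = intRDD.glom().collect();
for (int i = 0; i < partitions.size(); i++) {
    System.out.println("Partition " + i + ": " + partitions.get(i));
}
// With 10 elements and 3 partitions, a typical split is
// [1, 2, 3], [4, 5, 6], and [7, 8, 9, 10]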

If the user does not specify the value of numPartitions, the value of the configuration parameter spark.default.parallelism decides the number of partitions. As per Spark documentation (https://spark.apache.org/docs/latest/configuration.html), the value of this parameter depends upon the following factors:

  • For distributed shuffle operations, such as reduceByKey and join, it is the largest number of partitions in a parent RDD. For operations such as parallelize with no parent RDDs, it depends on the cluster manager (a configuration sketch follows this list):
    • Local mode: the number of cores on the local machine
    • Mesos fine-grained mode: 8
    • Others: the total number of cores on all executor nodes or two, whichever is larger
  • While loading data from the local file system or HDFS, by default Spark creates a number of partitions equal to the number of blocks. In the case of the local file system, the value of the Hadoop parameter fs.local.block.size, which is 32 MB by default, is taken as the partition size. In the case of HDFS, the value of dfs.blocksize, which is 128 MB in Hadoop 2.x, is considered as the partition size. However, a user can also specify the numPartitions value as follows:
jsc.textFile("hdfs://localhost:8020/user/spark/data.csv",4)
As a partition of an RDD is stored in the executor's memory, the size of a partition is limited by the memory available to the executor process.
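
As a quick illustration of the first rule in the preceding list, here is a minimal sketch that sets spark.default.parallelism explicitly (the value 4 is just an example) and then calls parallelize without a partition count:

SparkConf conf = new SparkConf().setMaster("local").setAppName("Default Parallelism Example");
// Explicitly set the default parallelism (example value of 4)
conf.set("spark.default.parallelism", "4");
JavaSparkContext jsc = new JavaSparkContext(conf);
// No numPartitions argument, so spark.default.parallelism is used
JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
System.out.println(rdd.getNumPartitions()); // prints 4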

The major advantage of partitioning lies in the parallelism of data processing. A transformation that does not require shuffling, for example, map, can be executed in parallel on all the partitions of the RDD.
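
To make this per-partition execution visible, here is a minimal sketch, continuing from the intRDD created earlier, that uses the mapPartitionsWithIndex transformation to tag each element with the partition that holds it:

// Tag every element with the index of the partition it belongs to;
// each partition is processed independently, so this work can run in parallel
JavaRDD<String> tagged = intRDD.mapPartitionsWithIndex((index, iterator) -> {
    List<String> result = new ArrayList<>();
    while (iterator.hasNext()) {
        result.add("partition " + index + " -> " + iterator.next());
    }
    return result.iterator();
}, false);
tagged.collect().forEach(System.out::println);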

Apache Spark runs a maximum of one task per partition, so to use a Spark cluster at its maximum efficiency, an RDD should be well partitioned. Therefore, on a Spark cluster of 256 cores, RDDs should consist of around 250-255 partitions for maximum utilization of the cluster. While reading a file in an unsplittable format, such as a compressed format like gz, Apache Spark does not split it into partitions, so the whole file is loaded into a single partition. For example:

// Even with numPartitions set to 2, a gz file cannot be split
JavaRDD<String> textFile = jsc.textFile("test.gz", 2);
System.out.println(textFile.getNumPartitions());

The preceding code will return 1 as the partition count, even though 2 was explicitly provided as the value of the numPartitions parameter.
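
If parallelism is needed after loading such a file, the RDD can be redistributed with the repartition transformation, at the cost of a shuffle. A minimal sketch, with 4 as an example target count:

// Redistribute the single partition into 4 partitions (causes a shuffle)
JavaRDD<String> repartitioned = textFile.repartition(4);
System.out.println(repartitioned.getNumPartitions()); // prints 4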
