While efficient execution of the data pipeline is the prerogative of the task scheduler, which is part of the Spark driver, sometimes Spark needs hints. Spark scheduling is primarily driven by two parameters: CPU and memory. Other resources, such as disk and network I/O, of course play an important part in Spark performance as well, but none of Spark, Mesos, or YARN can currently do anything to actively manage them.
The first parameter to watch is the number of RDD partitions, which can be specified explicitly when reading the RDD from a file. Spark usually errs on the side of too many partitions as it provides more parallelism, and it does work in many cases as the task setup/teardown times are relatively small. However, one might experiment with decreasing the number of partitions, especially if one does aggregations.
The default number of partitions per RDD and the level of parallelism are determined by the spark.default.parallelism parameter, defined in the $SPARK_HOME/conf/spark-defaults.conf configuration file. The number of partitions for a specific RDD can also be explicitly changed with the coalesce() or repartition() methods.
The total number of cores and the amount of available memory are often the cause of deadlocks, where tasks cannot proceed because the resources they request cannot be allocated. One can specify the number of cores for each executor with the --executor-cores flag when invoking spark-submit, spark-shell, or pyspark from the command line. Alternatively, one can set the corresponding parameters in the spark-defaults.conf file discussed earlier. If the number of cores is set too high, the scheduler will not be able to allocate resources on the nodes and will deadlock.
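As a rough illustration of the flags above, the following Python sketch assembles a spark-submit command line. The helper name build_spark_submit_args, the application file my_app.py, and the specific values are hypothetical; only the --executor-cores and --conf flags and the spark.default.parallelism property are taken from Spark itself.

```python
def build_spark_submit_args(app, executor_cores, extra_conf=None):
    """Return the argument list for a spark-submit call (hypothetical helper)."""
    args = ["spark-submit", "--executor-cores", str(executor_cores)]
    # Any Spark property can be passed as --conf key=value instead of
    # being written to spark-defaults.conf.
    for key, value in sorted((extra_conf or {}).items()):
        args += ["--conf", f"{key}={value}"]
    args.append(app)
    return args


cmd = build_spark_submit_args(
    "my_app.py",  # hypothetical application file
    executor_cores=4,
    extra_conf={"spark.default.parallelism": 200},
)
print(" ".join(cmd))
```

Passing the list to subprocess.run would launch the job. Building it as a list rather than a single string avoids quoting problems when a property value contains spaces.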
In a similar way, --executor-memory (or the spark.executor.memory property) specifies the requested heap size of each executor, which is shared by all the tasks running in it (the default is 1g). If the executor memory is set too high, again, the scheduler may deadlock or be able to schedule only a limited number of executors on a node.
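To see why oversized requests limit scheduling, one can estimate how many executors fit on a single node. The sketch below is a simplification under stated assumptions: the node is dedicated to Spark, off-heap overhead is ignored, and the function name and node sizes are hypothetical.

```python
def executors_per_node(node_cores, node_memory_mb,
                       executor_cores, executor_memory_mb):
    """Estimate how many executors of the requested size fit on one node."""
    by_cores = node_cores // executor_cores
    by_memory = node_memory_mb // executor_memory_mb
    # The scarcer of the two resources determines the result.
    return min(by_cores, by_memory)


# A hypothetical node with 16 cores and 64 GB of memory.
NODE_CORES, NODE_MEMORY_MB = 16, 65536

print(executors_per_node(NODE_CORES, NODE_MEMORY_MB, 4, 8192))   # limited by cores
print(executors_per_node(NODE_CORES, NODE_MEMORY_MB, 2, 30720))  # limited by memory
print(executors_per_node(NODE_CORES, NODE_MEMORY_MB, 32, 8192))  # request cannot be met
```

A result of zero corresponds to the situation described above, where the request exceeds what any node can offer and the executor can never be scheduled.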
The implicit assumption in Standalone mode when counting the number of cores and memory is that Spark is the only running application, which may or may not be true. When running under Mesos or YARN, it is important to configure the cluster scheduler so that it has the resources available to schedule the executors requested by the Spark driver. The relevant YARN properties are yarn.nodemanager.resource.cpu-vcores and yarn.nodemanager.resource.memory-mb. YARN may round the requested memory up a little. YARN's yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values, respectively.
JVMs can also use some memory off heap, for example, for interned strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384, 0.07 * spark.executor.memory), in megabytes.
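The arithmetic can be made concrete with a small Python sketch. The overhead formula is the default quoted above. The rounding model (round up to the next multiple of the increment, never below the minimum) and the example values of 1024 MB and 512 MB are assumptions for illustration, as the exact behavior depends on the YARN scheduler and its configuration. The function names are hypothetical.

```python
import math

MIN_OVERHEAD_MB = 384    # lower bound from the default formula
OVERHEAD_FACTOR = 0.07   # fraction of the executor memory


def executor_overhead_mb(executor_memory_mb):
    """Default overhead: max(384, 0.07 * executor memory), in MB."""
    return max(MIN_OVERHEAD_MB, int(OVERHEAD_FACTOR * executor_memory_mb))


def yarn_container_mb(executor_memory_mb, min_alloc_mb=1024, increment_mb=512):
    """Approximate container size YARN grants for one executor.

    Assumes the request is rounded up to a multiple of increment_mb
    and is never smaller than min_alloc_mb (illustrative values).
    """
    requested = executor_memory_mb + executor_overhead_mb(executor_memory_mb)
    rounded = math.ceil(requested / increment_mb) * increment_mb
    return max(min_alloc_mb, rounded)


for heap_mb in (1024, 4096, 8192):
    print(heap_mb, executor_overhead_mb(heap_mb), yarn_container_mb(heap_mb))
```

Under these assumptions, a 4096 MB executor requests 4480 MB and receives a 4608 MB container, so the memory actually consumed on the node is noticeably larger than the value passed to --executor-memory.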
Since Spark can internally transfer the data between executors and the client node, efficient serialization is very important. I will consider different serialization frameworks in Chapter 6, Working with Unstructured Data. Spark uses Java serialization by default but can be switched to the faster Kryo serialization through the spark.serializer property; Kryo works best when the classes are registered explicitly. If you see a serialization error in your code, it is likely because the corresponding class has not been registered or Kryo does not support it, as happens with deeply nested and complex data types. In general, it is recommended to avoid passing complex objects between the executors unless the object serialization can be done very efficiently.
The driver has similar parameters: spark.driver.cores, spark.driver.memory, and spark.driver.maxResultSize. The last one sets the limit for the results collected from all the executors with the collect method. It is important to protect the driver process from out-of-memory exceptions. Another way to avoid out-of-memory exceptions and the problems that follow is to either modify the pipeline to return aggregated or filtered results or use the take method instead.