Design patterns and techniques

In this section, we'll outline some design patterns and general techniques for use when writing your own analytics. These are a collection of hints and tips that represent the accumulation of experiences working with Spark. They are offered up as guidelines for effective Spark analytic authoring. They also serve as a reference for when you encounter the inevitable scalability problems and don't know what to do.

Spark APIs

Problem

With so many different sets of APIs and functions to choose from, it's difficult to know which ones are the most performant.

Solution

Apache Spark currently has over one thousand contributors, many of whom are highly experienced, world-class software professionals. It is a mature framework, having been in development for over six years. Over that time, its contributors have focused on refining and optimizing just about every part of the framework, from the DataFrame-friendly APIs, through the Netty-based shuffle machinery, to the Catalyst query plan optimizer. The great news is that it all comes for "free" - provided you use the newest APIs available in Spark 2.0.

Recent optimizations (introduced by Project Tungsten), such as explicit off-heap memory management, cache-miss improvements, and dynamic stage generation, are only available with the newer DataFrame and Dataset APIs, and are not currently supported by the RDD API. In addition, the newly-introduced Encoders are significantly faster and more space-efficient than Kryo serialization or Java serialization.

For the most part, this means that Datasets usually outperform RDDs.

Example

Let's illustrate using an informal example of a basic count of people mentioned in articles:

personDS                             personRDD
  .groupBy($"name")                    .map(p => (p.person,1)) 
  .count                               .reduceByKey(_+_)
  .sort($"count".desc)                 .sortBy(_._2,false)
  .show

36 seconds (Dataset API)             99 seconds (RDD API)

The preceding snippet shows the relative performance difference between the two APIs. For this test, 20 iterations were performed in each case over 200 MB of uncompressed text, running on 20 x 1 GB, 1-core executors on commodity hardware. Therefore, in terms of performance, it's a good idea to learn and use the Dataset API, only turning to RDDs (using ds.rdd) when you need the flexibility to compute something not available on the higher-level API.

Summary pattern

Problem

My timeseries analytic must run operationally within strict service level agreements (SLAs) and there is not enough time to compute the required result over the entire dataset.

Solution

For real-time analytics, or ones with strict SLAs, running lengthy computations over large datasets can be impractical. Sometimes it's necessary to design analytics using a two-pass algorithm in order to compute results in a timely fashion. To do this, we'll need to introduce the concept of the Summary pattern.

The Summary pattern is a two-pass algorithm where the end result is reconstructed from the aggregation of summaries alone. Although the final aggregation only ever uses summaries, and never touches the raw dataset directly, it produces the same result as if it were run over the entire raw dataset.

The basic steps are:

  1. Calculate a summary over the appropriate interval (per minute, per day, per week, and so on).
  2. Persist the summary data for later use.
  3. Calculate an aggregate over the larger interval (per month, per year, and so on).

This is a particularly useful approach when designing incremental or online algorithms for streaming analytics.
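
To make these steps concrete, here is a minimal sketch, assuming a SparkSession called spark and a hypothetical articles Dataset with a published timestamp and a sentiment score (all names and paths here are illustrative):

import org.apache.spark.sql.functions._
import spark.implicits._

// Pass 1: summarize each day and persist the (much smaller) summary data
val dailySummary = articles
  .groupBy(to_date($"published").as("day"))
  .agg(count("*").as("articles"), avg($"sentiment").as("avgSentiment"))

dailySummary.write.mode("overwrite").parquet("/data/summaries/daily")

// Pass 2: reconstruct the monthly result from the persisted summaries alone
val monthly = spark.read.parquet("/data/summaries/daily")
  .groupBy(month($"day").as("month"))
  .agg(sum($"articles").as("articles"),
       avg($"avgSentiment").as("avgSentiment")) // note: a count-weighted average would be more precise

The second pass never touches the raw articles; it operates purely on the much smaller daily summaries.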

Example

The GDELT GKG dataset is a great example of a summary dataset.

Certainly, it would be impractical to perform sentiment analysis or named entity recognition over, say, a month's worth of global media news articles every 15 minutes. Fortunately, GDELT produces those 15-minute summaries, which we are able to aggregate, making this entirely possible.

Expand and Conquer pattern

Problem

My analytic has a relatively small number of tasks, each with high Input/Shuffle Size (Bytes). These tasks take a long time to complete, while sometimes there are idle executors.

Solution

The Expand and Conquer pattern breaks records into smaller units so that you can increase parallelism and achieve more efficient parallel execution. By decomposing or unpacking each record, you enable the pieces to be composed in different ways, spread over the cluster, and processed by different executors.

In this pattern, flatMap is used, usually in conjunction with shuffle or repartition, to increase the number of tasks and decrease the amount of data being processed by each task. This gives rise to an optimal situation whereby enough tasks are queued so that no executors are ever left idle. It can also help in the scenario where you're struggling to process large amounts of data in the memory of one machine, and hence receiving out of memory errors.

This useful and versatile technique comes in handy in almost every situation where you have large datasets. It promotes the use of simple data structures and allows you to take full advantage of the distributed nature of Spark.

A word of caution, however, as flatMap can also cause performance problems because it has the potential to increase the time complexity of your analytic. By using flatMap, you are generating many records for each and every row, hence potentially adding another dimension of data that requires processing. Therefore, you should always consider the impact of this pattern on algorithmic complexity, using Big O notation.
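
As a minimal sketch of the pattern (the path and partition count are illustrative), each large record is unpacked into many small ones with flatMap and then spread across more partitions:

import org.apache.spark.rdd.RDD

// One large record per article ...
val articles: RDD[String] = spark.sparkContext.textFile("/data/articles")

// ... becomes many small records, spread over more, smaller tasks
val wordCounts = articles
  .flatMap(_.split("\\s+"))   // expand: one record per token
  .repartition(400)           // conquer: more tasks, each processing less data
  .map(word => (word, 1))
  .reduceByKey(_ + _)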

Lightweight shuffle

Problem

The Shuffle Read Blocked Time of my analytic is a significant proportion of the overall processing time (>5%). What can I do to avoid having to wait for the shuffle to finish?

Solution

Although Spark's shuffle is carefully engineered to minimize both network and disk I/O by using techniques such as data compression and merge file consolidation, it has the following two fundamental problems that mean it will often become a performance bottleneck:

  • It's I/O intensive: The shuffle relies on (i) moving data over a network and (ii) writing that data to disk on the target machine. Therefore, it's much slower than local transformations. To illustrate how much slower, here are the relative timings for reading 1 MB sequentially from various devices:

    Memory    0.25 ms
    10 GbE    10 ms
    Disk      20 ms

    In this example, as a shuffle operation uses both network and disk, it would be around 120 times slower than one performed on a cached, local partition. Obviously, timings will vary depending on the physical types and speeds of the devices used; the figures here are provided as relative guidelines.

  • It's a synchronization point for concurrency: Every task in a stage must complete before the next stage can begin. Given that stage boundaries involve a shuffle (see ShuffleMapStage), it marks a point in the execution where tasks that otherwise would be ready to start, must wait until all tasks in that stage have finished. This gives rise to a synchronization barrier that can have a significant impact on performance.

For these reasons, try to avoid the shuffle where possible, or at least minimize its impact.

Sometimes it's possible to avoid a shuffle altogether, in fact there are patterns, such as Broadcast Variables or Wide Table patterns, that offer suggestions on how to do this, but often it's inevitable and all that can be done is to lessen the amount of data that is transferred, and hence the impact of the shuffle.

In this case, try to construct a Lightweight Shuffle specifically minimizing data transfer - only necessary bytes should be transferred.

Once again, if you use the Dataset and DataFrame APIs, when Catalyst generates a logical query plan it will perform over 50 optimizations, including pruning any unused columns or partitions automatically (see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala). But if you're using RDDs, you'll have to do this yourself. Here are a few techniques that you can try:

  • Use map to reduce data: Call map on your data immediately prior to a shuffle in order to get rid of any data that is not used in follow-on processing (see the sketch after this list).
  • Use keys only: When you have key-value pairs, consider using rdd.keys instead of rdd. For operations such as counts or membership tests, this should be sufficient. Similarly, consider using rdd.values whenever appropriate.
  • Adjust order of stages: Should you join and then groupBy or groupBy and then join? In Spark, this is mainly about the size of the datasets. It should be fairly trivial to do cost-based assessments using the number of records before and after each transformation. Experiment to find which one is more efficient for your datasets.
  • Filter first: Generally speaking, filtering rows prior to a shuffle is an advantage as it reduces the number of rows transferred. Consider filtering as early as possible, provided your revised analytic is functionally equivalent.

    In some situations, you can also filter out entire partitions, like so:

            val sortedPairs = rdd.sortByKey() 
            sortedPairs.filterByRange(lower, upper) 
    
  • Use cogroup: If you have two or more RDDs grouped by the same key, then cogroup might be able to join them without instigating a shuffle. This ingenious little trick works because RDD[(K,V)]s that share the same key type K and have been partitioned by the same HashPartitioner (with the same number of partitions) will place any given key on the same node. Therefore, when joining by key K, no data needs to be moved.
  • Try a different codec: Another tip for decreasing the amount of bytes transferred is to change the compression algorithm.

    Spark provides three options: lz4, lzf, and snappy. Consider reviewing each one to determine which works best for your particular type of data:

            SparkSession
              .builder()
              .config("spark.io.compression.codec", "lzf")
              .getOrCreate()

Wide Table pattern

Problem

The one-to-many or many-to-many relationships in my datasets are producing many shuffles that ruin the performance of all my analytics.

Solution

In order to optimize your data structures, we advocate denormalizing your data into a form that's useful for your particular type of processing. The approach, described here as the Wide Table pattern, involves combining data structures that are frequently used together, so that they are composed into a single record. This preserves data locality and removes the need to perform expensive joins. The more often a relationship is used, the more you benefit from this data locality.

The process involves constructing a data representation, view, or table that contains everything you need for follow-on processing. You may construct this programmatically, or by using standard SparkSQL join statements. It is then materialized ahead of time and used directly inside your analytics whenever required.

Where necessary, data is duplicated across each row to ensure self-sufficiency. You should resist the urge to factor out additional tables, like those found in third-normal form or in snowflake designs, and instead rely on columnar data formats, such as Parquet and ORC, to provide efficient storage mechanisms without sacrificing fast sequential access. They can do this by arranging data by column and compressing data within each column, which helps alleviate concerns when duplicating data.

Similarly, nested types, classes, or arrays can often be used to good effect inside a record to represent children or composite data classes. Again, this avoids the need for dynamic joins at analytic runtime.
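
As a sketch of what such a denormalized, nested record might look like (the types and names here are purely illustrative, with personRecords standing in for a collection of such records), related data lives together in a single row and is materialized in a columnar format:

import spark.implicits._

// Children are nested inside the parent record rather than joined at runtime
case class ArticleRef(article: String, published: Long)
case class PersonRecord(name: String, aliases: Seq[String], mentions: Seq[ArticleRef])

// Materialize the wide table ahead of time; Parquet stores and compresses it by column
personRecords.toDS
  .write
  .mode("overwrite")
  .parquet("/data/wide/person_records")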

Example

For an example of how to use denormalized data structures, including nested types, see Chapter 3, Input Formats and Schema.

Broadcast variables pattern

Problem

My analytic requires many compact reference datasets and dimension tables that, despite their smaller size, cause costly shuffles of all data.

Solution

While some datasets - such as transaction logs or tweets - are theoretically infinitely large, others have natural limits and will never grow beyond a certain size. These are known as bounded datasets. Although they may change occasionally over time, they are reasonably stable and can be said to be held within a finite space. For example, the list of all the postcodes in the UK could be considered a bounded dataset.

When joining to a bounded dataset or any small collection, there is an opportunity to take advantage of an efficiency pattern that Spark provides. Rather than using join as you would normally, which would instigate a shuffle that could potentially transfer all data, consider using a broadcast variable instead. Once assigned, the broadcast variable will be distributed and made available locally to all the executors in your cluster. You can use a broadcast variable like so:

Creating a broadcast variable

val toBeBroadcast = smallDataset.collect
val bv = spark.sparkContext.broadcast(toBeBroadcast)

Note

Make sure you collect any data to be broadcast.

Accessing a broadcast variable

ds.mapPartitions { partition =>
    val smallDataset = bv.value                  // read the broadcast value once per partition
    partition map { r => f(r, smallDataset) }
}

Removing a broadcast variable

bv.destroy() 

Broadcast variables can be used with either the RDD API or the Dataset API. You can also exploit broadcast joins in SparkSQL - it will handle the broadcasting automatically. Just ensure that the threshold is set above the size of the table to be joined, like so:

SparkSession
  .builder()
  .config("spark.sql.autoBroadcastJoinThreshold", "50MB")

Example

For examples of how to use broadcast variables to implement efficient joins and filters, see Chapter 9, News dictionary and Real-time Tagging System.

Combiner pattern

Problem

My analytic is performing an aggregation based on a set of keys, and hence, is having to shuffle all data for all keys. Consequently, it's very slow.

Solution

At the core of Apache Spark's shuffling abilities is a powerful and flexible pattern, referred to here as the Combiner pattern, which offers a mechanism for greatly reducing the amount of data in the shuffle. The Combiner pattern is so important that examples of it can be found in multiple locations in the Spark codebase - to see it in action, here are some of those locations:

  • ExternalAppendOnlyMap
  • CoGroupedRDD
  • DeclarativeAggregate
  • ReduceAggregator

In fact, all high-level APIs that use the shuffle operation, such as groupBy, reduceByKey, combineByKey, and so on, use this pattern as the core of their processing. However, there is some variation among the implementations mentioned previously, although the fundamental concept is the same. Let's take a closer look.

The Combiner pattern provides an efficient approach to compute a function across sets of records in parallel and then combines their output in order to achieve an overall result.


Generally, it consists of three functions that must be provided by the caller:

  • Initialize(e) -> C0: Creates the initial container. Otherwise known as createCombiner, type constructor, or zero.

    In this function, you should create and initialize an instance that will serve as the container for all other combined values. Sometimes the first value from each key is also provided to pre-populate the container that will eventually hold all the combined values for that key. In this case, the function is known as unit.

    It's worth noting that this function is executed exactly once per key on every partition in your dataset. Therefore, it is potentially called multiple times for each key and consequently must not introduce any side-effects that would produce inconsistent results were the dataset to be distributed differently.

  • Update(C0, e) -> Ci: Adds an element to the container. Otherwise known as mergeValue, bind function, or reduce.

    In this function, you should add a record from the originating RDD into the container. This usually involves transforming or aggregating the value in some way and only the output of this calculation is taken forwards inside the container.

    As updates are executed in parallel and in any order, this function must be commutative and associative.

  • Merge(Ci, Cj) -> Ck: Combines two containers. Otherwise known as mergeCombiners or merge.

    In this function, you should combine the values represented by each container to form a new value, which is then taken forwards.

    Again, because there are no guarantees on the order of merges, this function should be commutative and associative.

You may have noticed a similarity between this pattern and the concept of monads. If you haven't encountered monads yet, they represent an abstract mathematical concept, used in functional programming as a way of expressing functions so that they are composable in a general way. They support many features, such as composition, side-effect free execution, repeatability, consistency, lazy evaluation, immutability, and provide many other benefits. We will not give a full explanation of monads here, there are plenty of great introductions already out there - for example http://www.simononsoftware.com/a-short-introduction-to-monads/, which takes a practical rather than a theoretical viewpoint. Instead, we will explain where the Combiner pattern is different and how it helps to understand Spark.

Spark executes the update function on every record in your dataset. Due to its distributed nature, this can happen in parallel. It also runs the merge function to combine the output of each partition. Again, because this function is applied in parallel, and results could therefore be combined in any order, Spark requires these functions to be commutative and associative, meaning that the order in which they are applied should have no impact on the overall answer. It's this commutative, associative merge step that really provides the basis of the definition.

An understanding of this pattern is useful for reasoning about the behavior of any distributed aggregations. If you're interested in understanding this pattern further, a nice implementation can be found in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala.

In addition to this, it's useful when trying to determine which high-level API to use. With so many available, it's sometimes difficult to know which one to choose. By applying an understanding of types to the preceding descriptions, we can decide on the most fitting and performant API. For example, where the types of e and Cn are the same, you should consider using reduceByKey. However, where the type of e is different to Cn, then an operation such as combineByKey should be considered.

To illustrate, let's consider some different approaches using four of the most common operations available on the RDD API.

Example

To provide some context, let's say we have an RDD of key-value pairs representing people mentioned in news articles, where the key is the name of the person referred to in the article, and the value is a pre-filtered, tokenized, bag-of-words, textual-version of the article:

// (person:String, article:Array[String])
val rdd:RDD[(String,Array[String])] = ...

Now suppose we want to find some statistics about articles in which a person is mentioned, for example, min and max length, most frequently used words (excluding stop-words), and so on. In this case, our result would be of the form, (person:String,stats:ArticleStats), where ArticleStats is a case class designed to hold the required statistics:

case class ArticleStats(minLength:Long,maxLength:Long,mfuWord:(String,Int))

Let's start with the definition of the three combiner functions, as described previously:

val init = (a:Array[String]) => {
  ArticleStats(a)
}

val update = (stats:ArticleStats, a:Array[String]) => {
  stats |+| ArticleStats(a)
}

val merge = (s1:ArticleStats,s2:ArticleStats) => {
  s1 |+| s2
}

As you might notice, these functions are really just the syntactic sugar of our pattern; the real logic is hidden away in the companion class and the semigroup:

object ArticleStats {
  def apply(a:Array[String]) =
    new ArticleStats(calcMin(a),calcMax(a),findMFUWord(a))
 ...
}

implicit object statsSemiGroup extends SemiGroup[ArticleStats] {
  def append(a: ArticleStats, b: ArticleStats) : ArticleStats = ???
}

For our purposes, we won't cover these in detail; let's just assume that any computation necessary to calculate the statistics is carried out by the supporting code - including the logic for finding the extremities of two previously calculated metrics - and instead focus on the explanation of our different approaches.

GroupByKey approach:

Our first approach is by far the slowest option because groupByKey doesn't use the update function. Despite this obvious disadvantage, we can still achieve our result - by sandwiching the groupByKey between maps, where the first map converts into the desired type and the last performs the reduce-side aggregation:

rdd.mapValues { value => init(value) }
   .groupByKey()
   .mapValues { group => group.reduce(merge) } // note: update is not used

However, you will notice that it does not perform any map-side combining for efficiency, instead preferring to combine all values on the reduce-side, meaning that all values are copied across the network as part of the shuffle.

For this reason, you should always consider the following alternatives before resorting to this approach.

ReduceByKey approach:

To improve on this, we can use reduceByKey. Unlike groupByKey, reduceByKey provides map-side combining for efficiency by making use of the update function. In terms of performance, it offers an optimum approach. However, it still requires each value to be manually converted to the correct type prior to invocation:

rdd.mapValues(init).reduceByKey(merge)

The result is achieved in two steps: the values of the originating RDD are first mapped into the desired type, and then reduced by key.

AggregateByKey approach:

Again, aggregateByKey provides the same performance characteristics as reduceByKey - by implementing map-side combine - but this time as one operation:

rdd.aggregateByKey(ArticleStats())(update,merge) 

In the preceding snippet we see update and merge being called, however init is not used directly. Instead, an empty container, in the form of a blank ArticleStats object, is provided explicitly for the purposes of initialization. This syntax is closer to that of fold, so it's useful if you're more familiar with that style.

CombineByKey Approach:

Generally, combineByKey is thought of as the most flexible key-based operation, giving you complete control over all three functions in the Combiner pattern:

rdd.combineByKey(init,update,merge)

While providing init as a function rather than just a single value might give you more flexibility in select scenarios, in practice for most problems the relationship between init, update, and merge is such that you don't really gain anything in terms of functionality or performance from either approach. And regardless, all three are backed by combineByKeyWithClassTag, so in this instance feel free to choose whichever one is a better syntactic fit for your problem, or just pick the one you prefer.

Optimized cluster

Problem

I want to know how to configure my Spark job's executors in order to make full use of the resources of my cluster, but with so many options I'm confused.

Solution

As Spark is designed to scale horizontally, generally speaking, you should prefer having more executors over larger executors. But with each executor comes the overhead of a JVM, so it's advisable to make full use of them by running multiple tasks inside each executor. As this seems like a bit of a contradiction, let's look at how to configure Spark to achieve this.

Spark provides the following options (specified on the command line or in configuration):

--num-executors (YARN-only setting [as of Spark 2.0])
--executor-cores
--executor-memory
--total-executor-cores

Number of executors can be estimated using the following formula:

number of executors = (total cores - cluster overhead) / cores per executor

For example, when using a YARN-based cluster accessing HDFS and running in YARN-client mode, the equation would be as follows:

((T - (2*N + 6)) / 5)

where:

T: Total number of cores in the cluster.

N: Total number of nodes in the cluster.

2: Removes the per-node overhead of HDFS and YARN. Assumes two processes on each node - the HDFS DataNode and the YARN NodeManager.

6: Removes the master process overhead of HDFS and YARN. Assumes an average of six master processes - NameNode, ResourceManager, SecondaryNameNode, ProxyServer, HistoryServer, and so on. Obviously, this is an example and in reality it depends on what other services are running on the cluster, along with other factors such as ZooKeeper quorum size, HA strategy, and so on.

5: Anecdotally, the optimum number of cores for each executor to ensure good task concurrency without prohibitive disk I/O contention.
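
For instance, on a hypothetical cluster of 10 nodes with 16 cores each (T = 160, N = 10), the estimate works out as:

((160 - (2*10 + 6)) / 5) = 134 / 5 ≈ 26 executors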

Memory allocation can be estimated using the following formula:

mem per executor = (mem per node / number of executors per node) * safety fraction

For example, when using a YARN-based cluster running in YARN-client mode with 64 GB per node, the equation would be as follows:

(64 / E)* 0.9 => 57.6 / E

where:

E: Number of executors per node (as calculated in the previous example).

0.9: Fraction of the node's memory allocated to the executor heaps after subtracting the off-heap overhead (spark.yarn.executor.memoryOverhead, default 10%).
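
Continuing the hypothetical 10-node example, roughly 26 executors spread over 10 nodes gives E ≈ 3 executors per node, so:

(64 / 3) * 0.9 ≈ 19 GB per executor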

It's worth noting that while it is generally beneficial to allocate more memory to an executor (allowing more space for sorting, caching, and so on), increasing the memory also increases garbage collection pressure. The GC must sweep the entire heap for unreachable object references, therefore the larger the memory region it has to analyze, the more resources it must consume, and at some point this leads to diminishing returns. Whilst there's no absolute figure for the point at which this happens, as a general rule of thumb keep the memory per executor under 64 GB to avoid problems.

The preceding equations should provide a good starting-point estimation for sizing your cluster. For further tuning, you may wish to experiment by tweaking these settings and measuring the effect on performance using the Spark UI.

Redistribution pattern

Problem

My analytic always runs on the same few executors. How do I increase the level of parallelism?

Solution

When Datasets and RDDs are relatively small to begin with, even if you then expand them using flatMap, any child in the lineage will inherit its parent's number of partitions.

So, if some of your executors are idle, calling the repartition function could improve your level of parallelism. You will incur the immediate cost of moving data around, but this could pay off overall.

Use the following command to determine the number of partitions for your data and hence the parallelism:

ds.rdd.getNumPartitions

If the number of partitions is less than the maximum number of tasks allowable on your cluster, then you're not making full use of your executors.

Conversely, if you have a large number of tasks (10,000+) and they aren't running for very long then you should probably call coalesce to make better use of your resources - starting and stopping tasks is relatively expensive!

Example

Here, we increase the parallelism of a Dataset to 400. The physical plan will show this as RoundRobinPartitioning(400), like so:

ds.repartition(400)
  .groupByKey(_.key)   // Dataset.groupByKey takes a function, not a column
  .reduceGroups(f)
  .explain

...

+- Exchange RoundRobinPartitioning(400)
   +- *BatchedScan parquet

And here's the equivalent re-partitioning for an RDD performed by simply specifying the number of partitions to use in the reduceByKey function:

rdd.reduceByKey(f,400)
    .toDebugString

res1: String =
(400) ShuffledRDD[11] at reduceByKey at <console>:26 []
  +-(7) MapPartitionsRDD[10] at map at <console>:26 []
     |  MapPartitionsRDD[6] at rdd at <console>:26 []
     |  MapPartitionsRDD[5] at rdd at <console>:26 []
     |  MapPartitionsRDD[4] at rdd at <console>:26 []
     |  FileScanRDD[3] at rdd at <console>:26 []

Salting key pattern

Problem

Most of my tasks finish in a reasonable time, but there's always one or two that take much longer (>10x) and repartitioning does not seem to have any beneficial effect.

Solution

If you find yourself waiting for a handful of slow tasks, then you could be suffering from skew in your data distribution. Symptoms of this are some tasks taking far longer than others, or some tasks having far more input or output than the rest.

If this is the case, the first thing to do is check that the number of keys is greater than the number of executors, as coarse-grained grouping can limit parallelism. A quick way to find the number of keys in your RDD is to use rdd.keys.count. If this value is lower than the number of executors, then reconsider your key strategy. Patterns such as Expand and Conquer may be able to help out.

If the preceding things are in order, the next thing to review is key distribution. Where you find a small number of keys with large numbers of associated values, consider the Salting Key pattern. In this pattern, popular keys are subdivided by appending a random element. For example:

rdd.filter {
   case (k, v) => isPopular(k)            // keep only the skewed, popular keys
}
.map {
   case (k, v) => (k + r.nextInt(n), v)   // r: scala.util.Random; n: the number of distinct salts
}

This results in a more balanced key distribution because during the shuffle, the HashPartitioner sends the new keys to different executors. You can choose the value of n to suit the parallelism you need - greater skew in the data necessitates a greater range of salts.

Of course, all this salting does mean that you'll need to re-aggregate back onto the old keys to ensure that you ultimately calculate the correct answer. But, depending on the amount of skew in your data, a two-phase aggregation may still be faster.

You can either apply this salting to all keys, or filter for only the popular keys, as in the preceding example. The threshold at which you filter, decided by isPopular in the example, is entirely your choice.
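
As a sketch of the second phase, assuming a hypothetical saltedRdd produced by the preceding snippet (with a single-digit salt appended to each key), the partial aggregates are re-keyed onto the original keys and combined once more:

// Phase 1: aggregate on the salted keys - the skewed work is now spread across executors
val partial = saltedRdd.reduceByKey(_ + _)

// Phase 2: strip the salt to recover the original key, then combine the (far smaller) partial results
val result = partial
  .map { case (saltedKey, value) => (saltedKey.dropRight(1), value) } // assumes n <= 10, i.e. a single-digit salt
  .reduceByKey(_ + _)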

Secondary sort pattern

Problem

When grouping by keys, my analytic has to explicitly sort the values after they are grouped. This sorting takes place in memory; therefore, large value-sets take a long time, may involve spilling to disk, and can sometimes give an OutOfMemoryError. Here is an example of the problematic approach:

rdd.groupByKey().mapValues(_.toList.sorted) // sorts each group in memory - inefficient for large groups

Instead, when grouping by key, values should be pre-ordered within each key for immediate and efficient follow-on processing.

Solution

Use the Secondary Sort pattern to order the list of items in a group efficiently by using the shuffle machinery. This approach will scale when handling even the largest of datasets.

In order to sort efficiently, this pattern utilizes three concepts:

  1. Composite key: Contains both the elements you want to group by and the elements you want to sort by.
  2. Grouping partitioner: Understands which parts of the composite key are related to grouping.
  3. Composite key ordering: Understands which parts of the composite key are related to ordering.

Each of these is injected into Spark so that the final dataset is presented as grouped and ordered.

Please note that, in order to perform a secondary sort, you need to use RDDs, as this is not currently supported by the Dataset API. Track progress on the following JIRA: https://issues.apache.org/jira/browse/SPARK-3655.

Example

Consider the following model:

case class Mention(name:String, article:String, published:Long) 

Here we have an entity representing the occasions when people are mentioned in news articles, containing the person's name, the article they were mentioned in, and its publication date.

Suppose we want to group together all the mentions of people with the same name, and order them by time. Let's look at the three mechanisms we need:

Composite key:

case class SortKey(name:String, published:Long)

Contains both name and published date.

Grouping partitioner:

class GroupingPartitioner(partitions: Int) extends Partitioner {

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val groupBy = key.asInstanceOf[SortKey]
    val mod = groupBy.name.hashCode() % numPartitions
    if (mod < 0) mod + numPartitions else mod // guard against negative hash codes
  }
}

It's only grouping by name.

Composite key ordering:

implicit val sortBy: Ordering[SortKey] = Ordering.by(m => m.published)

It's only sorting by published date.

Once we have defined these, we can use them in the API, like so:

val pairs = mentions.rdd.keyBy(m => SortKey(m.name, m.published))
pairs.repartitionAndSortWithinPartitions(new GroupingPartitioner(n))

Here the SortKey is used to pair the data, the GroupingPartitioner is used when partitioning the data, and the Ordering is used during the merge; it is, of course, found via Scala's implicit mechanism, which matches based on type.

Filter overkill pattern

Problem

My analytic uses a whitelist in order to filter relevant data for processing. The filter happens early on in the pipeline so that my analytic only ever has to process the data I'm interested in, for maximum efficiency. However, the whitelist frequently changes, meaning my analytic must be executed afresh each time against the new list.

Solution

Contrary to some of the other advice you'll read here, in some scenarios calculating results across all the data, by removing filters, can actually increase the overall efficiency of an analytic.

If you are frequently rerunning your analytic over different segments of the dataset, then consider using a popular approach, described here as the Filter Overkill pattern. This involves omitting all filters in Spark and processing over the entire corpus. The results of this one-off processing will be much larger than the filtered version, but they can be easily indexed in a tabular data store and filtered dynamically at query time. This avoids having to apply different filters over multiple runs, and having to re-compute historical data when filters change.
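
A minimal sketch of the idea (paths, column names, and the fullResults and whitelist values are illustrative): compute once over everything, store the results partitioned by the column the whitelist would have filtered on, and apply the changing whitelist at query time:

// One-off pass over the entire corpus - no whitelist applied
fullResults
  .write
  .partitionBy("person")
  .parquet("/data/results/all_people")

// Query time: the frequently changing whitelist becomes a cheap, dynamic filter
val current = spark.read.parquet("/data/results/all_people")
  .where($"person".isin(whitelist: _*))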

Probabilistic algorithms

Problem

It takes too long to compute statistics over my dataset because it is too large. By the time the response is received, it's out of date or no longer relevant. Therefore, it's more important to receive a timely response, or at least provide a maximum bound to time-complexity, than a complete or correct answer. In fact, a well-timed estimate even with a small probability of error would be taken in preference to a correct answer where the running time is not known.

Solution

Probabilistic algorithms use randomization to improve time complexity and to guarantee worst-case performance. If you are time sensitive, and just about right is good enough, you should consider using a probabilistic algorithm.

In addition, the same can be said for the problem of memory usage. There is a family of probabilistic algorithms that provide estimates within a restricted space complexity. Examples include:

  • Bloom Filter is a membership test that is guaranteed to never miss an element in a set, but could give you a false positive, that is, determine an element to be a member of a set when it is not. It's useful for quickly reducing the amount of data in a problem-space prior to a more accurate calculation.
  • HyperLogLog counts the number of distinct values in a column, providing a very reasonable estimate using a fixed memory footprint.
  • CountMinSketch provides a frequency table used for counting occurrences of events in a stream of data. Particularly useful in Spark streaming where a fixed memory footprint eliminates the potential for memory overflows.

Spark provides implementations of these in org.apache.spark.sql.DataFrameStatFunctions and they can be used by accessing df.stat. Spark also includes some access via the RDD API:

rdd.countApprox(1000)           // timeout in milliseconds before returning the estimate
rdd.countByValueApprox(1000)
rdd.countApproxDistinct(0.05)   // relative accuracy of the estimate
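
For DataFrames, the equivalent helpers live under df.stat; here is a minimal sketch (the column name and parameters are illustrative):

// Bloom filter: a membership test with a bounded false-positive rate
val bloom = df.stat.bloomFilter("name", 1000000L, 0.01)  // expected items, false-positive probability
bloom.mightContain("David Bowie")

// Count-min sketch: approximate frequencies in a fixed memory footprint
val cms = df.stat.countMinSketch("name", 0.001, 0.99, 42) // eps, confidence, seed

// Approximate distinct count on the underlying RDD
df.rdd.countApproxDistinct(0.05)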

Example

For an example of how to use a Bloom Filter see Chapter 11, Anomaly Detection on Sentiment Analysis.

Selective caching

Problem

My analytic is caching datasets, but if anything, it's running slower than before.

Solution

Caching is key to getting the most performance out of Spark; however, when used incorrectly, it can have a detrimental effect. Caching is particularly useful whenever you intend to use an RDD more than once. This generally happens when: (i) the data is used across stages, (ii) the data appears in the lineage of multiple child datasets, or (iii) the data is reused during an iterative process, such as stochastic gradient descent.

The problem occurs when you cache indiscriminately without considering reuse. This is because the cache adds overhead when it's created, updated and flushed, and then must be garbage collected when not used. Therefore, improper caching can actually slow down your job. So, the easiest way to improve caching is to stop doing it (selectively of course).

Another consideration is whether there's enough memory allocated and available to efficiently cache your RDD. If your dataset won't fit into memory, Spark will either throw an OutOfMemoryError or swap data to disk (depending on the storage level, which will be discussed shortly). In the latter case, this could have a performance impact due to both (i) the time taken to move extra data in and out of memory and (ii) having to wait for the availability of the disk (I/O wait).

In order to determine whether you have enough memory allocated to your executors, first cache the dataset as follows:

ds.cache 
ds.count 

Then, look at the Storage page in the Spark UI. For each RDD, this provides the fraction cached, its size, and the amount spilled to disk.


This should enable you to adjust the memory allocated to each executor in order to ensure that your data fits in memory. There are also the following caching options available:

  • NONE: No caching (default)
  • MEMORY: Used when cache is called
  • DISK: Spill to disk
  • SER: Same as MEMORY, but objects are stored in a byte array
  • 2 (REPLICATED): Keep a cached copy on two different nodes

The preceding options can be used in any combination, for example:

  • If you're experiencing OutOfMemoryError errors, try changing to MEMORY_AND_DISK to allow spilling of the cache to disk
  • If you're experiencing high garbage collection times, consider trying one of the serialized forms of cache, such as MEMORY_ONLY_SER, as storing each partition as a single byte array greatly reduces the number of objects the GC has to track (at the slight cost of increased serialization)
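
For example, here is a sketch of choosing a storage level explicitly rather than relying on the default cache:

import org.apache.spark.storage.StorageLevel

// Spill to disk instead of failing when the dataset doesn't fit in memory
ds.persist(StorageLevel.MEMORY_AND_DISK)

// Or, if GC times are high, store each partition as a serialized byte array
// (a dataset holds one storage level at a time, so unpersist before changing it)
// ds.persist(StorageLevel.MEMORY_ONLY_SER)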

The goal here is to ensure that the Fraction Cached is at 100%, and where possible, minimize the Size on Disk to establish effective in-memory caching of your datasets.

Garbage collection

Problem

The GC time of my analytic is a significant proportion of the overall processing time (>15%).

Solution

Spark's garbage collector works pretty efficiently out of the box, so you should only attempt to adjust it if you're sure that it's the cause and not the symptom of the problem. Before altering the GC settings, you should ensure that you have reviewed all other aspects of your analytic. Sometimes you might see high GC times in the Spark UI for reasons other than a poor GC configuration. Most of the time, it's worth investigating these first.

If you're seeing frequent or lengthy GC times, the first thing to do is confirm that your code is behaving sensibly and make sure that it's not at the root of excess/irregular memory consumption. For example, review your caching strategy (see the preceding section) or use the unpersist function to explicitly remove RDDs or Datasets that are no longer required.

Another factor for consideration is the number of objects you allocate within your job. Try to minimize the number of objects you instantiate by (i) simplifying your domain model, (ii) reusing instances, or (iii) preferring primitives where you can.

Finally, if you're still seeing lengthy GC times, try tuning the GC. There's some great information provided by Oracle on how to do this (https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/g1_gc_tuning.html), but specifically there is evidence to suggest that Spark can perform well using the G1 GC. It's possible to switch to this GC by adding -XX:+UseG1GC to the executor JVM options (for example, via spark.executor.extraJavaOptions).

When tuning the G1 GC, the two main options are:

  • InitiatingHeapOccupancyPercent: A threshold percentage of how full the heap should be before the GC triggers a cycle. The lower the percentage, the more frequently the GC runs, but the less work it has to do on each run. Therefore, if you set it to less than 45% (the default value), you might see shorter (though more frequent) pauses. It can be configured on the command line using -XX:InitiatingHeapOccupancyPercent.
  • ConcGCThreads: The number of concurrent GC threads running in the background. The more threads, the quicker the garbage collection can complete. But it's a trade-off, as more GC threads means more CPU resource allocation. It can be configured on the command line using -XX:ConcGCThreads.
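
As a sketch of how these flags might be passed to the executors (the values shown are illustrative starting points, not recommendations):

SparkSession
  .builder()
  .config("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=4")
  .getOrCreate()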

In summary, it's a matter of experimenting with these settings and tuning your analytic to find the optimum configuration.

Graph traversal

Problem

My analytic has an iterative step that only completes when a global condition is met, such as all keys reporting no more values to process, and consequently the running time can be slow and difficult to predict.

Solution

Generally speaking, the efficiency of graph-based algorithms is such that, if you can represent your problem as a standard graph traversal problem, you probably should. Examples of problems with graph-based solutions include: shortest-path, depth-first search, and page-rank.

Example

For an example of how to use the Pregel algorithm in GraphX and how to interpret a problem in terms of graph traversal, see Chapter 7, Building Communities.
