Resilient distributed datasets

Spark expresses all computations as a sequence of transformations and actions on distributed collections, called Resilient Distributed Datasets (RDDs). Let's explore how RDDs work with the Spark shell. Navigate to the examples directory and open a Spark shell as follows:

$ spark-shell
scala> 

Let's start by loading an email in an RDD:

scala> val email = sc.textFile("ham/9-463msg1.txt")
email: rdd.RDD[String] = MapPartitionsRDD[1] at textFile

email is an RDD, with each element corresponding to a line in the input file. Notice how we created the RDD by calling the textFile method on an object called sc:

scala> sc
spark.SparkContext = org.apache.spark.SparkContext@459bf87c

sc is a SparkContext instance, an object representing the entry point to the Spark cluster (for now, just our local machine). When we start a Spark shell, a context is created and bound to the variable sc automatically.
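For instance, we can ask the context which cluster it is connected to. In a local shell this typically reports local[*], meaning Spark uses all local cores, though the exact value depends on how the shell was launched:

scala> sc.master
String = local[*]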

Let's split the email into words using flatMap:

scala> val words = email.flatMap { line => line.split("\\s") }
words: rdd.RDD[String] = MapPartitionsRDD[2] at flatMap

This will feel natural if you are familiar with collections in Scala: the email RDD behaves just like a list of strings. Here, we split on the regular expression \s, which denotes whitespace characters (written as "\\s" in an ordinary Scala string literal). Instead of using flatMap explicitly, we can also manipulate RDDs using Scala's syntactic sugar:

scala> val words = for { 
  line <- email
  word <- line.split("\\s") 
} yield word
words: rdd.RDD[String] = MapPartitionsRDD[3] at flatMap

Let's inspect the results. We can use .take(n) to extract the first n elements of an RDD:

scala> words.take(5)
Array[String] = Array(Subject:, tsd98, workshop, -, -)

We can also use .count to get the number of elements in an RDD:

scala> words.count
Long = 939

RDDs support many of the operations supported by collections. Let's use filter to remove punctuation from our email. We will remove all words that contain any non-alphanumeric character. We can do this by filtering out elements that match this regular expression anywhere in the word: [^a-zA-Z0-9].

scala> val nonAlphaNumericPattern = "[^a-zA-Z0-9]".r
nonAlphaNumericPattern: Regex = [^a-zA-Z0-9]

scala> val filteredWords = words.filter { 
  word => nonAlphaNumericPattern.findFirstIn(word) == None 
}
filteredWords: rdd.RDD[String] = MapPartitionsRDD[4] at filter

scala> filteredWords.take(5)
Array[String] = Array(tsd98, workshop, 2nd, call, paper)

scala> filteredWords.count
Long = 627

In this example, we created an RDD from a text file. We can also create RDDs from Scala iterables using the sc.parallelize method available on a Spark context:

scala> val words = "the quick brown fox jumped over the dog".split(" ") 
words: Array[String] = Array(the, quick, brown, fox, ...)

scala> val wordsRDD = sc.parallelize(words)
wordsRDD: RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:23

This is useful for debugging and for trialling behavior in the shell. The counterpart to parallelize is the .collect method, which converts an RDD to a Scala array:

scala> val wordLengths = wordsRDD.map { _.length }
wordLengths: RDD[Int] = MapPartitionsRDD[2] at map at <console>:25

scala> wordLengths.collect
Array[Int] = Array(3, 5, 5, 3, 6, 4, 3, 3)

The .collect method requires the entire RDD to fit in memory on the master node. It is thus either used for debugging with a reduced dataset, or at the end of a pipeline that trims down a dataset.
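As a minimal sketch of that second pattern, we can trim the RDD down with transformations first, and collect only the small result:

scala> wordsRDD.filter { _.length <= 3 }.collect
Array[String] = Array(the, fox, the, dog)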

As you can see, RDDs offer an API much like Scala iterables. The critical difference is that RDDs are distributed and resilient. Let's explore what this means in practice.

RDDs are immutable

You cannot change an RDD once it is created. All operations on RDDs either create new RDDs or other Scala objects.
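For instance, mapping over wordsRDD returns a new RDD and leaves wordsRDD itself unchanged (output abbreviated):

scala> val upperCased = wordsRDD.map { _.toUpperCase }
upperCased: RDD[String] = MapPartitionsRDD[...] at map

scala> wordsRDD.first // the original RDD still holds the original words
String = the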

RDDs are lazy

When you execute operations like map and filter on a Scala collection in the interactive shell, the REPL prints the values of the new collection to screen. The same isn't true of Spark RDDs. This is because operations on RDDs are lazy: they are only evaluated when needed.

Thus, when we write:

val email = sc.textFile(...)
val words = email.flatMap { line => line.split("\\s") }

We are creating an RDD, words, that knows how to build itself from its parent RDD, email, which, in turn, knows that it needs to read a text file and split it into lines. However, none of this work actually happens until we force the evaluation of the RDDs by calling an action that returns a Scala object. This is most evident if we try to read from a non-existent text file:

scala> val inp = sc.textFile("nonexistent")
inp: rdd.RDD[String] = MapPartitionsRDD[5] at textFile

We can create the RDD without a hitch. We can even define further transformations on the RDD. The program crashes only when these transformations are finally evaluated:

scala> inp.count // number of lines
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/pascal/...

The action .count is expected to return the number of elements in our RDD as a Long. Spark has no choice but to evaluate inp, which results in an exception.

Thus, it is probably more appropriate to think of an RDD as a pipeline of operations, rather than a more traditional collection.

RDDs know their lineage

RDDs can only be constructed from stable storage (for instance, by loading data from a file that is present on every node in the Spark cluster), or through a set of transformations based on other RDDs. Since RDDs are lazy, they need to know how to build themselves when needed. They do this by knowing who their parent RDD is, and what operation they need to apply to the parent. This is a well-defined process since the parent RDD is immutable.

The toDebugString method provides a textual representation of how an RDD is constructed:

scala> filteredWords.toDebugString
(2) MapPartitionsRDD[6] at filter at <console>:27 []
 |  MapPartitionsRDD[3] at flatMap at <console>:23 []
 |  MapPartitionsRDD[1] at textFile at <console>:21 []
 |  ham/9-463msg1.txt HadoopRDD[0] at textFile at <console>:21 []

RDDs are resilient

If you run an application on a single computer, you generally don't need to worry about hardware failure in your application: if the computer fails, your application is doomed anyway.

Distributed architectures should, by contrast, be fault-tolerant: the failure of a single machine should not crash the entire application. Spark RDDs are built with fault tolerance in mind. Let's imagine that one of the worker nodes fails, causing the destruction of some of the data associated with an RDD. Since the Spark RDD knows how to build itself from its parent, there is no permanent data loss: the elements that were lost can just be re-computed when needed on another computer.

RDDs are distributed

When you construct an RDD, for instance from a text file, Spark will split the RDD into a number of partitions. Each partition will be entirely localized on a single machine (though there is, in general, more than one partition per machine).
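We can inspect the partitioning directly in the shell. Here is a minimal sketch using the wordsRDD defined earlier; the number of partitions and the exact grouping of elements depend on your configuration:

scala> wordsRDD.partitions.size
Int = 2

scala> wordsRDD.glom.collect // one array per partition
Array[Array[String]] = Array(Array(the, quick, brown, fox), Array(jumped, over, the, dog))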

Many transformations on RDDs can be executed on each partition independently. For instance, when performing a .map operation, a given element in the output RDD depends on a single element in the parent: data does not need to be moved between partitions. The same is true of .flatMap and .filter operations. This means that each partition in the RDD produced by one of these operations depends on a single partition in the parent RDD.

On the other hand, a .distinct transformation, which removes all duplicate elements from an RDD, requires the data in a given partition to be compared to the data in every other partition. This requires shuffling the data across the nodes. Shuffling, especially for large datasets, is an expensive operation and should be avoided if possible.
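The shuffle shows up in an RDD's lineage. For instance, calling .distinct on our words RDD introduces a ShuffledRDD stage (output abbreviated; the exact identifiers will differ on your machine):

scala> words.distinct.toDebugString
(2) MapPartitionsRDD[...] at distinct []
 |  ShuffledRDD[...] at distinct []
 +-(2) MapPartitionsRDD[...] at distinct []
    |  MapPartitionsRDD[...] at flatMap []
    |  MapPartitionsRDD[...] at textFile []
    |  ham/9-463msg1.txt HadoopRDD[...] at textFile []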

Transformations and actions on RDDs

The set of operations supported by an RDD can be split into two categories:

  • Transformations create a new RDD from the current one. Transformations are lazy: they are not evaluated immediately.
  • Actions force the evaluation of an RDD, and normally return a Scala object, rather than an RDD, or have some form of side-effect. Actions are evaluated immediately, triggering the execution of all the transformations that make up this RDD.

Below, we give some examples of useful transformations and actions. For a full, up-to-date list, consult the Spark documentation (http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations).

For the examples that follow, we assume that you have created an RDD with:

scala> val rdd = sc.parallelize(List("quick", "brown", "quick", "dog"))

The following list describes common transformations on an RDD. Recall that transformations always generate a new RDD, and that they are lazy operations. The examples assume that rdd is { "quick", "brown", "quick", "dog" }:

  • rdd.map(func): apply func to every element of the RDD.
    rdd.map { _.size } // => { 5, 5, 5, 3 }
  • rdd.filter(pred): keep only the elements for which pred returns true.
    rdd.filter { _.length < 4 } // => { "dog" }
  • rdd.flatMap(func): apply func to every element and flatten the results.
    rdd.flatMap { _.toCharArray } // => { 'q', 'u', 'i', 'c', 'k', 'b', 'r', 'o' … }
  • rdd.distinct(): remove duplicate elements from the RDD.
    rdd.distinct // => { "dog", "brown", "quick" }
  • rdd.pipe(command, [envVars]): pipe the RDD through an external program. RDD elements are written, line by line, to the process's stdin, and the output is read from its stdout.
    rdd.pipe("tr a-z A-Z") // => { "QUICK", "BROWN", "QUICK", "DOG" }

The following list describes common actions on RDDs. Recall that actions always generate a Scala type or cause a side-effect, rather than creating a new RDD. Actions force the evaluation of the RDD, triggering the execution of the transformations underpinning it. The examples again assume that rdd is { "quick", "brown", "quick", "dog" }:

  • rdd.first: the first element in the RDD.
    rdd.first // => quick
  • rdd.collect: transform the RDD to an array (the array must fit in memory on the master node).
    rdd.collect // => Array[String]("quick", "brown", "quick", "dog")
  • rdd.count: the number of elements in the RDD.
    rdd.count // => 4
  • rdd.countByValue: a map from each element to the number of times it occurs. The map must fit on the master node.
    rdd.countByValue // => Map(quick -> 2, brown -> 1, dog -> 1)
  • rdd.take(n): return an array of the first n elements in the RDD.
    rdd.take(2) // => Array(quick, brown)
  • rdd.takeOrdered(n: Int)(implicit ordering: Ordering[T]): the first n elements in the RDD according to the element's default ordering, or to the ordering passed as the second argument. See the Scala docs for Ordering for how to define custom comparison functions (http://www.scala-lang.org/api/current/index.html#scala.math.Ordering).
    rdd.takeOrdered(2) // => Array(brown, dog)
    rdd.takeOrdered(2)(Ordering.by { _.size }) // => Array[String] = Array(dog, quick)
  • rdd.reduce(func): reduce the RDD by repeatedly combining its elements with the specified function. There is no zero value, so the RDD must not be empty, and func should be commutative and associative.
    rdd.map { _.size }.reduce { _ + _ } // => 18
  • rdd.aggregate(zeroValue)(seqOp, combOp): reduction for cases where the reduction function returns a value of a type different from the RDD's element type. We need to provide a function for reducing within a single partition (seqOp) and a function for combining the values of two partitions (combOp).
    rdd.aggregate(0)(_ + _.size, _ + _) // => 18
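To make the split between seqOp and combOp concrete, here is a minimal sketch of what Spark computes for the aggregate call above, assuming the two partitions hold { "quick", "brown" } and { "quick", "dog" } (the actual partitioning may differ):

// seqOp folds the elements of each partition into the zero value:
//   partition 1: 0 + "quick".size + "brown".size = 10
//   partition 2: 0 + "quick".size + "dog".size   = 8
// combOp then merges the per-partition results: 10 + 8 = 18
rdd.aggregate(0)(
  (acc, word) => acc + word.size, // seqOp: (Int, String) => Int
  (a, b) => a + b                 // combOp: (Int, Int) => Int
) // => 18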

Persisting RDDs

We have learned that RDDs only retain the sequence of operations needed to construct the elements, rather than the values themselves. This, of course, drastically reduces memory usage since we do not need to keep intermediate versions of our RDDs in memory. For instance, let's assume we want to trawl through transaction logs to identify all the transactions that occurred on a particular account:

val allTransactions = sc.textFile("transaction.log")
val interestingTransactions = allTransactions.filter { 
  _.contains("Account: 123456")
}

The set of all transactions will be large, while the set of transactions on the account of interest will be much smaller. Spark's policy of remembering how to construct a dataset, rather than the dataset itself, means that we never have all the lines of our input file in memory at any one time.

There are two situations in which we may want to avoid re-computing the elements of an RDD every time we use it:

  • For interactive use: we might have detected fraudulent behavior on account "123456", and we want to investigate how this might have arisen. We will probably want to perform many different exploratory calculations on this RDD, without having to re-read the entire log file every time. It therefore makes sense to persist interestingTransactions.
  • When an algorithm re-uses an intermediate result, or a dataset. A canonical example is logistic regression. In logistic regression, we normally use an iterative algorithm to find the 'optimal' coefficients that minimize the loss function. At every step in our iterative algorithm, we must calculate the loss function and its gradient from the training set. We should avoid re-computing the training set (or re-loading it from an input file) if at all possible.

Spark provides a .persist method on RDDs to achieve this. By calling .persist on an RDD, we tell Spark to keep the dataset in memory next time it is computed.

scala> words.persist
rdd.RDD[String] = MapPartitionsRDD[3] at filter

Spark supports different levels of persistence, which you can tune by passing arguments to .persist:

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> interestingTransactions.persist(
  StorageLevel.MEMORY_AND_DISK)
rdd.RDD[String] = MapPartitionsRDD[3] at filter

Spark provides several persistence levels, including:

  • MEMORY_ONLY: the default storage level. The RDD is stored in RAM. If the RDD is too big to fit in memory, parts of it will not persist, and will need to be re-computed on the fly.
  • MEMORY_AND_DISK: As much of the RDD is stored in memory as possible. If the RDD is too big, it will spill over to disk. This is only worthwhile if the RDD is expensive to compute. Otherwise, re-computing it may be faster than reading from the disk.

If you persist several RDDs and run out of memory, Spark will clear the least recently used from memory (either discarding them or saving them to disk, depending on the chosen persistence level). RDDs also expose an unpersist method to tell Spark explicitly that an RDD is no longer needed.

Persisting RDDs can have a drastic impact on performance. What and how to persist therefore becomes very important when tuning a Spark application. Finding the best persistence level generally requires some tinkering, benchmarking and experimentation. The Spark documentation provides guidelines on when to use which persistence level (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence), as well as general tips on tuning memory usage (http://spark.apache.org/docs/latest/tuning.html).

Importantly, the persist method does not force the evaluation of the RDD. It just notifies the Spark engine that, next time the values in this RDD are computed, they should be saved rather than discarded.
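To tie this together, here is a minimal sketch of the iterative use case described earlier. The file name, the column layout, and the toy one-parameter linear model are all hypothetical; the point is that trainingData is re-used on every iteration, so persisting it avoids re-reading and re-parsing the input file each time:

// hypothetical input file with lines of the form "feature,target"
val trainingData = sc.textFile("training.csv").map { line =>
  val Array(feature, target) = line.split(",").map { _.toDouble }
  (feature, target)
}
trainingData.persist // cached in memory after the first action evaluates it

var weight = 0.0
for (iteration <- 1 to 100) {
  // gradient of a squared loss for a toy one-parameter linear model
  val gradient = trainingData.map {
    case (feature, target) => (weight * feature - target) * feature
  }.mean
  weight -= 0.1 * gradient
}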

Key-value RDDs

So far, we have only considered RDDs of Scala value types. RDDs of more complex data types support additional operations. Spark adds many operations for key-value RDDs: RDDs whose type parameter is a tuple (K, V), for any types K and V.

Let's go back to our sample email:

scala> val email = sc.textFile("ham/9-463msg1.txt")
email: rdd.RDD[String] = MapPartitionsRDD[1] at textFile

scala> val words = email.flatMap { line => line.split("\\s") }
words: rdd.RDD[String] = MapPartitionsRDD[2] at flatMap

Let's persist the words RDD in memory to avoid having to re-read the email file from disk repeatedly:

scala> words.persist

To access key-value operations, we just need to apply a transformation to our RDD that creates key-value pairs. Let's use the words as keys. For now, we will just use 1 for every value:

scala> val wordsKeyValue = words.map { _ -> 1 }
wordsKeyValue: rdd.RDD[(String, Int)] = MapPartitionsRDD[32] at map 

scala> wordsKeyValue.first
(String, Int) = (Subject:,1)

Key-value RDDs support several operations besides the core RDD operations. These are added through an implicit conversion, using the "pimp my library" pattern that we explored in Chapter 5, Scala and SQL through JDBC. These additional transformations fall into two broad categories: by-key transformations and joins between RDDs.

By-key transformations are operations that aggregate the values corresponding to the same key. For instance, we can count the number of times each word appears in our email using reduceByKey. This method takes all the values that belong to the same key and combines them using a user-supplied function:

scala> val wordCounts = wordsKeyValue.reduceByKey { _ + _ }
wordCounts: rdd.RDD[(String, Int)] = ShuffledRDD[35] at reduceByKey

scala> wordCounts.take(5).foreach { println }
(university,6)
(under,1)
(call,3)
(paper,2)
(chasm,2)

Note that reduceByKey requires (in general) shuffling the RDD, since not every occurrence of a given key will be in the same partition:

scala> wordCounts.toDebugString
(2) ShuffledRDD[36] at reduceByKey at <console>:30 []
 +-(2) MapPartitionsRDD[32] at map at <console>:28 []
    |  MapPartitionsRDD[7] at flatMap at <console>:23 []
    |      CachedPartitions: 2; MemorySize: 50.3 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  MapPartitionsRDD[3] at textFile at <console>:21 []
    |      CachedPartitions: 2; MemorySize: 5.1 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ham/9-463msg1.txt HadoopRDD[2] at textFile at <console>:21 []

Note that key-value RDDs are not like Scala Maps: the same key can occur multiple times, and they do not support O(1) lookup. A key-value RDD can be transformed to a Scala map using the .collectAsMap action:

scala> wordCounts.collectAsMap
scala.collection.Map[String,Int] = Map(follow -> 2, famous -> 1...

This requires pulling the entire RDD onto the master node, so you need enough memory there to hold the map. Calling collectAsMap is therefore often the last stage in a pipeline that filters a large RDD down to just the information that we need.
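If we only care about a handful of specific words, for instance, we can filter before collecting so that only a few entries ever reach the master node. Here is a minimal sketch using the wordCounts RDD from above (the order of entries in the resulting map may differ):

scala> val keywords = Set("call", "paper", "university")
keywords: scala.collection.immutable.Set[String] = Set(call, paper, university)

scala> wordCounts.filter { case (word, _) => keywords(word) }.collectAsMap
scala.collection.Map[String,Int] = Map(call -> 3, paper -> 2, university -> 6)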

There are many by-key operations, which we describe in the list below. For the examples, we assume that rdd is created as follows:

scala> val words = sc.parallelize(List("quick", "brown","quick", "dog"))
words: RDD[String] = ParallelCollectionRDD[25] at parallelize at <console>:21

scala> val rdd = words.map { word => (word -> word.size) }
rdd: RDD[(String, Int)] = MapPartitionsRDD[26] at map at <console>:23

scala> rdd.collect
Array[(String, Int)] = Array((quick,5), (brown,5), (quick,5), (dog,3))

  • rdd.mapValues: apply an operation to the values.
    rdd.mapValues { _ * 2 } // => { quick -> 10, brown -> 10, quick -> 10, dog -> 6 }
  • rdd.groupByKey: return a key-value RDD in which values corresponding to the same key are grouped into iterables.
    rdd.groupByKey // => { quick -> Iterable(5, 5), brown -> Iterable(5), dog -> Iterable(3) }
  • rdd.reduceByKey(func): return a key-value RDD in which values corresponding to the same key are combined using a user-supplied function.
    rdd.reduceByKey { _ + _ } // => { quick -> 10, brown -> 5, dog -> 3 }
  • rdd.keys: return an RDD of the keys.
    rdd.keys // => { quick, brown, quick, dog }
  • rdd.values: return an RDD of the values.
    rdd.values // => { 5, 5, 5, 3 }

The second category of operations on key-value RDDs involves joining different RDDs together by key. This is somewhat similar to SQL joins, with the keys playing the role of the column being joined on. Let's load a spam email and apply the same transformations we applied to our ham email:

scala> val spamEmail = sc.textFile("spam/spmsgb17.txt")
spamEmail: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[52] at textFile at <console>:24

scala> val spamWords = spamEmail.flatMap { _.split("\\s") }
spamWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[53] at flatMap at <console>:26

scala> val spamWordCounts = spamWords.map { _ -> 1 }.reduceByKey { _ + _ }
spamWordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[55] at reduceByKey at <console>:30

scala> spamWordCounts.take(5).foreach { println }
(banner,3)
(package,14)
(call,1)
(country,2)
(offer,1)

Both spamWordCounts and wordCounts are key-value RDDs for which the keys correspond to unique words in the message, and the values are the number of times that word occurs. There will be some overlap in keys between spamWordCounts and wordCounts, since the emails will share many of the same words. Let's do an inner join between those two RDDs to get the words that occur in both emails:

scala> val commonWordCounts = wordCounts.join(spamWordCounts)
commonWordCounts: rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[58] at join at <console>:41

scala> commonWordCounts.take(5).foreach { println }
(call,(3,1))
(include,(6,2))
(minute,(2,1))
(form,(1,7))
((,(36,5))

The values in the RDD resulting from an inner join will be pairs. The first element in the pair is the value for that key in the first RDD, and the second element is the value for that key in the second RDD. Thus, the word call occurs three times in the legitimate email and once in the spam email.
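As a small illustration of working with the joined values, the following sketch keeps only the words that occur more often in the spam message than in the legitimate one (spamHeavyWords is a hypothetical name):

scala> val spamHeavyWords = commonWordCounts.filter {
  case (_, (hamCount, spamCount)) => spamCount > hamCount
}
spamHeavyWords: rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[...] at filter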

Spark supports all four join types. For instance, let's perform a left join:

scala> val leftWordCounts = wordCounts.leftOuterJoin(spamWordCounts)
leftWordCounts: rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[64] at leftOuterJoin at <console>:40

scala> leftWordCounts.take(5).foreach { println }
(call,(3,Some(1)))
(paper,(2,None))
(chasm,(2,None))
(antonio,(1,None))
(event,(3,None))

Notice that the second element in our pair has type Option[Int], to accommodate keys absent in spamWordCounts. The word paper, for instance, occurs twice in the legitimate email and never in the spam email. In this case, it is more useful to have zeros to indicate absence, rather than None. Replacing None with a default value is simple with getOrElse:

scala> val defaultWordCounts = leftWordCounts.mapValues { 
  case(leftValue, rightValue) => (leftValue, rightValue.getOrElse(0))
}
defaultWordCounts: rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[65] at mapValues

scala> defaultWordCounts.take(5).foreach { println }
(call,(3,1))
(paper,(2,0))
(chasm,(2,0))
(antonio,(1,0))
(event,(3,0))

The following list describes the most common joins on key-value RDDs, assuming rdd1 is { quick -> 1, brown -> 2, quick -> 3, dog -> 4 } and rdd2 is { quick -> 78, brown -> 79, fox -> 80 }:

  • rdd1.join(rdd2) // => { quick -> (1, 78), quick -> (3, 78), brown -> (2, 79) }
  • rdd1.leftOuterJoin(rdd2) // => { dog -> (4, None), quick -> (1, Some(78)), quick -> (3, Some(78)), brown -> (2, Some(79)) }
  • rdd1.rightOuterJoin(rdd2) // => { quick -> (Some(1), 78), quick -> (Some(3), 78), brown -> (Some(2), 79), fox -> (None, 80) }
  • rdd1.fullOuterJoin(rdd2) // => { dog -> (Some(4), None), quick -> (Some(1), Some(78)), quick -> (Some(3), Some(78)), brown -> (Some(2), Some(79)), fox -> (None, Some(80)) }
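To reproduce these results in the shell, we can build the two toy RDDs with parallelize; note that the ordering of elements in the collected result depends on the partitioning:

scala> val rdd1 = sc.parallelize(List("quick" -> 1, "brown" -> 2, "quick" -> 3, "dog" -> 4))
scala> val rdd2 = sc.parallelize(List("quick" -> 78, "brown" -> 79, "fox" -> 80))

scala> rdd1.fullOuterJoin(rdd2).collect
Array[(String, (Option[Int], Option[Int]))] = Array((dog,(Some(4),None)), (quick,(Some(1),Some(78))), (quick,(Some(3),Some(78))), (brown,(Some(2),Some(79))), (fox,(None,Some(80))))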

For a complete list of transformations, consult the API documentation for PairRDDFunctions, http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions.

Double RDDs

In the previous section, we saw that Spark adds functionality to key-value RDDs through an implicit conversion. Similarly, Spark adds statistics functionality to RDDs of doubles. Let's extract the word frequencies for the ham message, and convert the values from integers to doubles:

scala> val counts = wordCounts.values.map { _.toDouble }
counts: rdd.RDD[Double] = MapPartitionsRDD[9] at map

We can then get summary statistics using the .stats action:

scala> counts.stats
org.apache.spark.util.StatCounter = (count: 397, mean: 2.365239, stdev: 5.740843, max: 72.000000, min: 1.000000)

Thus, the most common word appears 72 times. We can also use the .histogram action to get an idea of the distribution of values:

scala> counts.histogram(5)
(Array(1.0, 15.2, 29.4, 43.6, 57.8, 72.0),Array(391, 1, 3, 1, 1))

The .histogram method returns a pair of arrays. The first array indicates the bounds of the histogram bins, and the second gives the count of elements in each bin. Thus, there are 391 words that appear fewer than 15.2 times. The distribution of word counts is very skewed, so a histogram with equally sized bins is not really appropriate. We can instead specify custom bins by passing an array of bin edges to the histogram method. For instance, we might distribute the bins logarithmically:

scala> counts.histogram(Array(1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0))
res13: Array[Long] = Array(264, 94, 22, 11, 1, 4, 1)