Chapter 4. Parallel Collections and Futures

Data science often involves processing medium or large amounts of data. Since the previously exponential growth in the speed of individual CPUs has slowed down and the amount of data continues to increase, leveraging computers effectively must entail parallel computation.

In this chapter, we will look at ways of parallelizing computation and data processing over a single computer. Virtually all new computers have more than one processing unit, and distributing a calculation over these cores can be an effective way of hastening medium-sized calculations.

Parallelizing calculations over a single chip is suitable for calculations involving gigabytes or a few terabytes of data. For larger data flows, we must resort to distributing the computation over several computers in parallel. We will discuss Apache Spark, a framework for parallel data processing in Chapter 10, Distributed Batch Processing with Spark.

In this book, we will look at three common ways of leveraging parallel architectures in a single machine: parallel collections, futures, and actors. We will consider the first two in this chapter, and leave the study of actors to Chapter 9, Concurrency with Akka.

Parallel collections

Parallel collections offer an extremely easy way to parallelize independent tasks. The reader, being familiar with Scala, will know that many tasks can be phrased as operations on collections, such as map, reduce, filter, or groupBy. Parallel collections are an implementation of Scala collections that parallelize these operations to run over several threads.

Let's start with an example. We want to calculate the frequency of occurrence of each letter in a sentence:

scala> val sentence = "The quick brown fox jumped over the lazy dog"
sentence: String = The quick brown fox jumped ...

Let's start by converting our sentence from a string to a vector of characters:

scala> val characters = sentence.toVector
Vector[Char] = Vector(T, h, e,  , q, u, i, c, k, ...)

We can now convert characters to a parallel vector, a ParVector. To do this, we use the par method:

scala> val charactersPar = characters.par
ParVector[Char] = ParVector(T, h, e,  , q, u, i, c, k,  , ...)

ParVector collections support the same operations as regular vectors, but their methods are executed in parallel over several threads.

Let's start by filtering out the spaces in charactersPar:

scala> val lettersPar = charactersPar.filter { _ != ' ' }
ParVector[Char] = ParVector(T, h, e, q, u, i, c, k, ...)

Notice how Scala hides the execution details. The filter operation was performed using multiple threads, and you barely even noticed! The interface and behavior of a parallel vector is identical to its serial counterpart, save for a few details that we will explore in the next section.

Let's now use the toLower function to make the letters lowercase:

scala> val lowerLettersPar = lettersPar.map { _.toLower }
ParVector[Char] = ParVector(t, h, e, q, u, i, c, k, ...)

As before, the map method was applied in parallel. To find the frequency of occurrence of each letter, we use the groupBy method to group characters into vectors containing all the occurrences of that character:

scala> val intermediateMap = lowerLettersPar.groupBy(identity)
ParMap[Char,ParVector[Char]] = ParMap(e -> ParVector(e, e, e, e), ...)

Note how the groupBy method has created a ParMap instance, the parallel equivalent of an immutable map. To get the number of occurrences of each letter, we do a mapValues call on intermediateMap, replacing each vector by its length:

scala> val occurenceNumber = intermediateMap.mapValues { _.length }
ParMap[Char,Int] = ParMap(e -> 4, x -> 1, n -> 1, j -> 1, ...)

Congratulations! We've written a multi-threaded algorithm for finding the frequency of occurrence of each letter in a few lines of code. You should find it straightforward to adapt this to find the frequency of occurrence of each word in a document, a common preprocessing problem for analyzing text data.

Parallel collections make it very easy to parallelize some operation pipelines: all we had to do was call .par on the characters vector. All subsequent operations were parallelized. This makes switching from a serial to a parallel implementation very easy.

Limitations of parallel collections

Part of the power and the appeal of parallel collections is that they present the same interface as their serial counterparts: they have a map method, a foreach method, a filter method, and so on. By and large, these methods work in the same way on parallel collections as they do in serial. There are, however, some notable caveats. The most important one has to do with side effects. If an operation on a parallel collection has a side effect, this may result in a race condition: a situation in which the final result depends on the order in which the threads perform their operations.

Side effects in collections arise most commonly when we update a variable defined outside of the collection. To give a trivial example of unexpected behavior, let's define a count variable and increment it a thousand times using a parallel range:

scala> var count = 0
count: Int = 0

scala> (0 until 1000).par.foreach { i => count += 1 }

scala> count
count: Int = 874 // not 1000!

What happened here? The function passed to foreach has a side effect: it increments count, a variable outside of the scope of the function. This is a problem because the += operator is a sequence of two operations:

  • Retrieve the value of count and add one to it
  • Assign the result back to count

To understand why this causes unexpected behavior, let's imagine that the foreach loop has been parallelized over two threads. Thread A might read the count variable when it is 832 and add one to it to give 833. Before it has time to reassign 833 to count, Thread B reads count, still at 832, and adds one to give 833. Thread A then assigns 833 to count. Thread B then assigns 833 to count. We've run through two updates but only incremented the count by one. The problem arises because += can be separated into two instructions: it is not atomic. This leaves room for threads to interleave their operations:

Limitations of parallel collections

The anatomy of a race condition: both thread A and thread B are trying to update count concurrently, resulting in one of the updates being overwritten. The final value of count is 833 instead of 834.

To give a somewhat more realistic example of problems caused by non-atomicity, let's look at a different method for counting the frequency of occurrence of each letter in our sentence. We define a mutable Char -> Int hash map outside of the loop. Each time we encounter a letter, we increment the corresponding integer in the map:

scala> import scala.collection.mutable
import scala.collection.mutable

scala> val occurenceNumber = mutable.Map.empty[Char, Int]
occurenceNumber: mutable.Map[Char,Int] = Map()

scala> lowerLettersPar.foreach { c => 
  occurenceNumber(c) = occurenceNumber.getOrElse(c, 0) + 1
}

scala> occurenceNumber('e') // Should be 4
Int = 2

The discrepancy occurs because of the non-atomicity of the operations in the foreach loop.

In general, it is good practice to avoid side effects in higher-order functions on collections. They make the code harder to understand and preclude switching from serial to parallel collections. It is also good practice to avoid exposing mutable state: immutable objects can be shared freely between threads and cannot be affected by side effects.

Another limitation of parallel collections occurs in reduction (or folding) operations. The function used to combine items together must be associative. For instance:

scala> (0 until 1000).par.reduce {_ - _ } // should be -499500
Int = 63620

The minus operator, , is not associative. The order in which consecutive operations are applied matters: (a – b) – c is not the same as a – (b – c). The function used to reduce a parallel collection must be associative because the order in which the reduction occurs is not tied to the order of the collection.

Error handling

In single-threaded programs, exception handling is relatively straightforward: if an exception occurs, the function can either handle it or escalate it. This is not nearly as obvious when parallelism is introduced: a single thread might fail, but the others might return successfully.

Parallel collection methods will throw an exception if they fail on any element, just like their serial counterparts:

scala> Vector(2, 0, 5).par.map { 10 / _ }
java.lang.ArithmeticException: / by zero
...

There are cases when this isn't the behavior that we want. For instance, we might be using a parallel collection to retrieve a large number of web pages in parallel. We might not mind if a few of the pages cannot be fetched.

Scala's Try type was designed for sandboxing code that might throw exceptions. It is similar to Option in that it is a one-element container:

scala> import scala.util._
import scala.util._

scala> Try { 2 + 2 }
Try[Int] = Success(4)

Unlike the Option type, which indicates whether an expression has a useful value, the Try type indicates whether an expression can be executed without throwing an exception. It takes on the following two values:

  • Try { 2 + 2 } == Success(4) if the expression in the Try statement is evaluated successfully
  • Try { 2 / 0 } == Failure(java.lang.ArithmeticException: / by zero) if the expression in the Try block results in an exception

This will make more sense with an example. To see the Try type in action, we will try to fetch web pages in a fault tolerant manner. We will use the built-in Source.fromURL method which fetches a web page and opens an iterator of the page's content. If it fails to fetch the web page, it throws an error:

scala> import scala.io.Source
import scala.io.Source

scala> val html = Source.fromURL("http://www.google.com")
scala.io.BufferedSource = non-empty iterator

scala> val html = Source.fromURL("garbage")
java.net.MalformedURLException: no protocol: garbage
...

Instead of letting the expression propagate out and crash the rest of our code, we can wrap the call to Source.fromURL in Try:

scala> Try { Source.fromURL("http://www.google.com") }
Try[BufferedSource] = Success(non-empty iterator)

scala> Try { Source.fromURL("garbage") }
Try[BufferedSource] = Failure(java.net.MalformedURLException: no protocol: garbage)

To see the power of our Try statement, let's now retrieve a list of URLs in parallel in a fault tolerant manner:

scala> val URLs = Vector("http://www.google.com", 
  "http://www.bbc.co.uk",
  "not-a-url"
)
URLs: Vector[String] = Vector(http://www.google.com, http://www.bbc.co.uk, not-a-url)

scala> val pages = URLs.par.map { url =>
  url -> Try { Source.fromURL(url) } 
}
pages: ParVector[(String, Try[BufferedSource])] = ParVector((http://www.google.com,Success(non-empty iterator)), (http://www.bbc.co.uk,Success(non-empty iterator)), (not-a-url,Failure(java.net.MalformedURLException: no protocol: not-a-url)))

We can then use a collect statement to act on the pages we could fetch successfully. For instance, to get the number of characters on each page:

scala> pages.collect { case(url, Success(it)) => url -> it.size }
ParVector[(String, Int)] = ParVector((http://www.google.com,18976), (http://www.bbc.co.uk,132893))

By making good use of Scala's built-in Try classes and parallel collections, we have built a fault tolerant, multithreaded URL retriever in a few lines of code. (Compare this to the myriad of Java/C++ books that prefix code examples with 'error handling is left out for clarity'.)

Tip

The Try type versus try/catch statements

Programmers with imperative or object-oriented backgrounds will be more familiar with try/catch blocks for handling exceptions. We could have accomplished similar functionality here by wrapping the code for fetching URLs in a try block, returning null if the call raises an exception.

However, besides being more verbose, returning null is less satisfactory: we lose all information about the exception and null is less expressive than Failure(exception). Furthermore, returning a Try[T] type forces the caller to consider the possibility that the function might fail, by encoding this possibility in the type of the return value. In contrast, just returning T and coding failure with a null value allows the caller to ignore failure, raising the possibility of a confusing NullPointerException being thrown at a completely different point in the program.

In short, Try[T] is just another higher-order type, like Option[T] or List[T]. Treating the possibility of failure in the same way as the rest of the code adds coherence to the program and encourages programmers to tackle the possibility of exceptions explicitly.

Setting the parallelism level

So far, we have considered parallel collections as black boxes: add par to a normal collection and all the operations are performed in parallel. Often, we will want more control over how the tasks are executed.

Internally, parallel collections work by distributing an operation over multiple threads. Since the threads share memory, parallel collections do not need to copy any data. Changing the number of threads available to the parallel collection will change the number of CPUs that are used to perform the tasks.

Parallel collections have a tasksupport attribute that controls task execution:

scala> val parRange = (0 to 100).par
parRange: ParRange = ParRange(0, 1, 2, 3, 4, 5,...

scala> parRange.tasksupport
TaskSupport = scala.collection.parallel.ExecutionContextTaskSupport@311a0b3e

scala> parRange.tasksupport.parallelismLevel
Int = 8 // Number of threads to be used

The task support object of a collection is an execution context, an abstraction capable of executing Scala expressions in a separate thread. By default, the execution context in Scala 2.11 is a work-stealing thread pool. When a parallel collection submits tasks, the context allocates these tasks to its threads. If a thread finds that it has finished its queued tasks, it will try and steal outstanding tasks from the other threads. The default execution context maintains a thread pool with number of threads equal to the number of CPUs.

The number of threads over which the parallel collection distributes the work can be changed by changing the task support. For instance, to parallelize the operations performed by a range over four threads:

scala> import scala.collection.parallel._
import scala.collection.parallel._

scala> parRange.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(4)
)
parRange.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@6e1134e1

scala> parRange.tasksupport.parallelismLevel
Int: 4

An example – cross-validation with parallel collections

Let's apply what you have learned so far to solve data science problems. There are many parts of a machine learning pipeline that can be parallelized trivially. One such part is cross-validation.

We will give a brief description of cross-validation here, but you can refer to The Elements of Statistical Learning, by Hastie, Tibshirani, and Friedman for a more in-depth discussion.

Typically, a supervised machine learning problem involves training an algorithm over a training set. For instance, when we built a model to calculate the probability of a person being male based on their height and weight, the training set was the (height, weight) data for each participant, together with the male/female label for each row. Once the algorithm is trained on the training set, we can use it to classify new data. This process only really makes sense if the training set is representative of the new data that we are likely to encounter.

The training set has a finite number of entries. It will thus, inevitably, have idiosyncrasies that are not representative of the population at large, merely due to its finite nature. These idiosyncrasies will result in prediction errors when predicting whether a new person is male or female, over and above the prediction error of the algorithm on the training set itself. Cross-validation is a tool for estimating the error caused by the idiosyncrasies of the training set that do not reflect the population at large.

Cross-validation works by dividing the training set in two parts: a smaller, new training set and a cross-validation set. The algorithm is trained on the reduced training set. We then see how well the algorithm models the cross-validation set. Since we know the right answer for the cross-validation set, we can measure how well our algorithm is performing when shown new information. We repeat this procedure many times with different cross-validation sets.

There are several different types of cross-validation, which differ in how we choose the cross-validation set. In this chapter, we will look at repeated random subsampling: we select k rows at random from the training data to form the cross-validation set. We do this many times, calculating the cross-validation error for each subsample. Since each iteration is independent of the previous ones, we can parallelize this process trivially. It is therefore a good candidate for parallel collections. We will look at an alternative form of cross-validation, k-fold cross-validation, in Chapter 12, Distributed Machine Learning with MLlib.

We will build a class that performs cross-validation in parallel. I encourage you to write the code as you go, but you will find the source code corresponding to these examples on GitHub (https://github.com/pbugnion/s4ds).We will use parallel collections to handle the parallelism and Breeze data types in the inner loop. The build.sbt file is identical to the one we used in Chapter 2 , Manipulating Data with Breeze:

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.scalanlp" %% "breeze" % "0.11.2",
  "org.scalanlp" %% "breeze-natives" % "0.11.2"
)

We will build a RandomSubsample class. The class exposes a type alias, CVFunction, for a function that takes two lists of indices—the first corresponding to the reduced training set and the second to the validation set—and returns a Double corresponding to the cross-validation error:

type CVFunction = (Seq[Int], Seq[Int]) => Double

The RandomSubsample class will expose a single method, mapSamples, which takes a CVFunction, repeatedly passes it different partitions of indices, and returns a vector of the errors. This is what the class looks like:

// RandomSubsample.scala

import breeze.linalg._
import breeze.numerics._

/** Random subsample cross-validation
  * 
  * @param nElems Total number of elements in the training set.
  * @param nCrossValidation Number of elements to leave out of training set.
*/
class RandomSubsample(val nElems:Int, val nCrossValidation:Int) {

  type CVFunction = (Seq[Int], Seq[Int]) => Double

  require(nElems > nCrossValidation,
    "nCrossValidation, the number of elements " +
    "withheld, must be < nElems")

  private val indexList = DenseVector.range(0, nElems)

  /** Perform multiple random sub-sample CV runs on f
    *
    * @param nShuffles Number of random sub-sample runs.
    * @param f user-defined function mapping from a list of
    *   indices in the training set and a list of indices in the
    *   test-set to a double indicating the out-of sample score
    *   for this split.
    * @returns DenseVector of the CV error for each random split.
    */
  def mapSamples(nShuffles:Int)(f:CVFunction)
  :DenseVector[Double] = {
    val cvResults = (0 to nShuffles).par.map { i =>
      
      // Randomly split indices between test and training
      val shuffledIndices = breeze.linalg.shuffle(indexList)
      val Seq(testIndices, trainingIndices) =
        split(shuffledIndices, Seq(nCrossValidation))
 
       // Apply f for this split
      f(trainingIndices.toScalaVector, 
        testIndices.toScalaVector)
    }
    DenseVector(cvResults.toArray)
  }
}

Let's look at what happens in more detail, starting with the arguments passed to the constructor:

class RandomSubsample(val nElems:Int, val nCrossValidation:Int)

We pass the total number of elements in the training set and the number of elements to leave out for cross-validation in the class constructor. Thus, passing 100 to nElems and 20 to nCrossValidation implies that our training set will have 80 random elements of the total data and that the test set will have 20 elements.

We then construct a list of all integers between 0 and nElems:

private val indexList = DenseVector.range(0, nElems)

For each iteration of the cross-validation, we will shuffle this list and take the first nCrossValidation elements to be the indices of rows in our test set and the remaining to be the indices of rows in our training set.

Our class exposes a single method, mapSamples, that takes two curried arguments: nShuffles, the number of times to perform random subsampling, and f, a CVFunction:

  def mapSamples(nShuffles:Int)(f:CVFunction):DenseVector[Double] 

With all this set up, the code for doing cross-validation is deceptively simple. We generate a parallel range from 0 to nShuffles and, for each item in the range, generate a new train-test split and calculate the cross-validation error:

    val cvResults = (0 to nShuffles).par.map { i =>
      val shuffledIndices = breeze.linalg.shuffle(indexList)
      val Seq(testIndices, trainingIndices) = 
        split(shuffledIndices, Seq(nCrossValidation))
      f(trainingIndices.toScalaVector, testIndices.toScalaVector)
    }

The only tricky part of this function is splitting the shuffled index list into a list of indices for the training set and a list of indices for the test set. We use Breeze's split method. This takes a vector as its first argument and a list of split-points as its second, and returns a list of fragments of the original vector. We then use pattern matching to extract the individual parts.

Finally, mapSamples converts cvResults to a Breeze vector:

DenseVector(cvResults.toArray) 

Let's see this in action. We can test our class by running cross-validation on the logistic regression example developed in Chapter 2 , Manipulating Data with Breeze. In that chapter, we developed a LogisticRegression class that takes a training set (in the form of a DenseMatrix) and target (in the form of a DenseVector) at construction time. The class then calculates the parameters that best represent the training set. We will first add two methods to the LogisticRegression class to use the trained model to classify previously unseen examples:

  • The predictProbabilitiesMany method uses the trained model to calculate the probability of having the target variable set to one. In the context of our example, this is the probability of being male, given a height and weight.
  • The classifyMany method assigns classification labels (one or zero) to members of a test set. We will assign a one if predictProbabilitiesMany returns a value greater than 0.5.

With these two functions, our LogisticRegression class becomes:

// Logistic Regression.scala

class LogisticRegression(
  val training:DenseMatrix[Double],
  val target:DenseVector[Double]
) {
  ...
  /** Probability of classification for each row
    * in test set.
    */
  def predictProbabilitiesMany(test:DenseMatrix[Double])
  :DenseVector[Double] = {
    val xBeta = test * optimalCoefficients
    sigmoid(xBeta)
  }

  /** Predict the value of the target variable 
    * for each row in test set.
    */
  def classifyMany(test:DenseMatrix[Double])
  :DenseVector[Double] = {
    val probabilities = predictProbabilitiesMany(test)
    I((probabilities :> 0.5).toDenseVector)
  }
  ...
}

We can now put together an example program for our RandomSubsample class. We will use the same height-weight data as in Chapter 2 , Manipulating Data with Breeze. The data preprocessing will be similar. The code examples for this chapter provide a helper module, HWData, to load the height-weight data into Breeze vectors. The data itself is in the data/ directory of the code examples for this chapter (available on GitHub at https://github.com/pbugnion/s4ds/tree/master/chap04).

For each new subsample, we create a new LogisticRegression instance, train it on the subset of the training set to get the best coefficients for this train-test split, and use classifyMany to generate predictions on the cross-validation set in this split. We then calculate the classification error and report the average classification error over every train-test split:

// RandomSubsampleDemo.scala

import breeze.linalg._
import breeze.linalg.functions.manhattanDistance
import breeze.numerics._
import breeze.stats._

object RandomSubsampleDemo extends App {

  /* Load and pre-process data */
  val data = HWData.load

  val rescaledHeights:DenseVector[Double] =
    (data.heights - mean(data.heights)) / stddev(data.heights)

  val rescaledWeights:DenseVector[Double] =
    (data.weights - mean(data.weights)) / stddev(data.weights)

  val featureMatrix:DenseMatrix[Double] =
    DenseMatrix.horzcat(
      DenseMatrix.ones[Double](data.npoints, 1),
      rescaledHeights.toDenseMatrix.t,
      rescaledWeights.toDenseMatrix.t
    )

  val target:DenseVector[Double] = data.genders.values.map { 
    gender => if(gender == 'M') 1.0 else 0.0 
  }

  /* Cross-validation */
  val testSize = 20
  val cvCalculator = new RandomSubsample(data.npoints, testSize)

  // Start parallel CV loop
  val cvErrors = cvCalculator.mapSamples(1000) { 
    (trainingIndices, testIndices) =>

    val regressor = new LogisticRegression(
      data.featureMatrix(trainingIndices, ::).toDenseMatrix,
      data.target(trainingIndices).toDenseVector
    )
    // Predictions on test-set
    val genderPredictions = regressor.classifyMany(
      data.featureMatrix(testIndices, ::).toDenseMatrix
    )
    // Calculate number of mis-classified examples
    val dist = manhattanDistance(
      genderPredictions, data.target(testIndices)
    )
    // Calculate mis-classification rate
    dist / testSize.toDouble
  }

  println(s"Mean classification error: ${mean(cvErrors)}")
}

Running this program on the height-weight data gives a classification error of 10%.

We now have a fully working, parallelized cross-validation class. Scala's parallel range made it simple to repeatedly compute the same function in different threads.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.15.3.167