Scala

The Scala standard library offers a rich set of tools, such as parallel collections and concurrent classes, to scale number-crunching applications. Although these tools are very effective for processing medium-sized datasets, developers quite often discard them in favor of more elaborate frameworks.

Object creation

Although code optimization and memory management are beyond the scope of this chapter, it is worth remembering that a few simple steps can be taken to improve the scalability of an application. One of the most frustrating challenges in using Scala to process large datasets is the creation of a large number of objects and the resulting load on the garbage collector.

A partial list of remedial actions is as follows:

  • Limiting unnecessary duplication of objects in an iterated function by reusing a mutable instance (see the sketch after this list)
  • Using lazy values and Stream classes to create objects as needed
  • Leveraging efficient collections such as Bloom filters or skip lists
  • Running javap to inspect the bytecode the compiler generates for the JVM
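
The first two remedies can be illustrated with a minimal, hypothetical sketch; squaresNaive, squares, and lookupTable are illustrative names, not part of any library:

// Allocates an intermediate Vector of boxed values on every call
def squaresNaive(xs: Vector[Double]): Double = xs.map(x => x*x).sum

// Accumulates in place: no intermediate collection is created
def squares(xs: Vector[Double]): Double = {
  var acc = 0.0
  xs.foreach(x => acc += x*x)
  acc
}

// A lazy value is allocated only if, and when, it is first accessed
lazy val lookupTable: Array[Double] = Array.tabulate(1000000)(_ * 0.001)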

Streams

Some problems require preprocessing and training on very large datasets, resulting in significant memory consumption by the JVM. Streams are list-like collections in which elements are instantiated or computed lazily. Like views, Streams postpone computation and memory allocation until the elements are actually needed.
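
As a minimal illustration, the println call in the following hypothetical snippet shows when evaluation actually happens; elements of a Stream are computed on demand and memoized:

val squares: Stream[Int] = Stream.from(1).map { n =>
  println(s"computing $n") // the head is evaluated eagerly, the rest on demand
  n * n
}
squares.take(3).toList // forces only the first three elements: List(1, 4, 9)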

Memory on demand

One very important application of Streams for scientific computing and machine learning is the implementation of iterative or recursive computation for very large or infinite data Streams.

Let's consider the simple computation of the mean of a very large dataset. The following is the formula for it:

$$\mu = \frac{1}{n}\sum_{i=1}^{n} x_i$$

This formula has two major problems:

  • It requires us to allocate memory for the entire dataset $x_i$
  • The sum of all the elements may overflow if the dataset is very large

A simple solution is to update the mean iteratively for each new element of the dataset, as follows:

$$\mu_n = \left(1 - \frac{1}{n}\right)\mu_{n-1} + \frac{x_n}{n}$$

As you can see in the following code, the iterative formula for the computation of the mean of a very large or infinite dataset can be implemented using Streams and tail recursion (line 1). At each iteration (or recursion), the new mean is updated from the existing mean (line 2), the count is incremented (line 3), and the next value is accessed through the tail of the Stream (line 4):

def mean(strm: => Stream[Double]): Double = {

  @scala.annotation.tailrec //1
  def mean(z: Double,
           count: Int,
           strm: Stream[Double]): (Double, Int) =
    if (strm.isEmpty) (z, count)
    else
      mean((1.0 - 1.0/count)*z + strm.head/count, //2
           count + 1, //3
           strm.tail) //4

  mean(0.0, 1, strm)._1
}
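
For instance, the method can be applied to a Stream generated on the fly, so that the entire dataset never has to reside in memory at once (a hypothetical usage):

import scala.util.Random

// The by-name argument is instantiated lazily inside mean
val avg = mean(Stream.fill(1000000)(Random.nextDouble))
// avg converges toward 0.5 for uniformly distributed values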

Tip

Stream as by-name parameter

Passing the Stream by value holds a reference to the head of the Stream, which prevents the garbage collector from reclaiming elements that have already been processed. One easy solution is to pass the Stream by name, as a function: the Stream is then instantiated only when it is accessed inside the method.

Design for reusing Streams memory

Let's consider the computation of the loss function in machine learning. An observation of type DataPoint is defined as a feature vector x and a label or expected value y:

case class DataPoint(x: DblVec, y: Double)

We can create a loss function, LossFunction, that processes a very large dataset on a platform with limited memory. The optimizer responsible for the minimization of the loss or error invokes the loss function at each iteration or recursion, as described in the following diagram:

Illustration of lazy memory management with Scala Streams

The memory management for the Stream consists of the following steps:

  1. Allocate a memory block to load the slice of the Stream for which the loss function is computed.
  2. Apply the loss function to the slice of the Stream.
  3. Release the memory block associated with the slice once the loss function is complete.
  4. Repeat steps 1 to 3 for the next slice of the Stream.

The challenge is to make sure that the memory allocated for a slice of the Stream is actually released once it is no longer needed, that is, once the computation of the loss for the observations contained in the slice is completed. This is accomplished by accessing the Stream through a weak reference.

Note

Spark Streaming

The architecture of Spark Streaming, described in the Spark Streaming section of Chapter 17, Apache Spark MLlib, uses a design principle similar to our reusable Stream memory.

Let's implement our design. The constructor of the LossFunction class has three arguments (line 6 of the code shown as follows):

  • The computation f of the loss for each data point
  • The weights of the model
  • The size of the entire Stream, dataSize

Here is the code:

import java.lang.ref.WeakReference

type StreamLike = WeakReference[Stream[DataPoint]] //5
type DblVec = Vector[Double]

class LossFunction(
    f: (DblVec, DblVec) => Double,
    weights: DblVec,
    dataSize: Int) { //6

  var nElements = 0

  def compute(stream: () => StreamLike): Double =
    compute(stream().get, 0.0) //7

  def sqrLoss(xs: List[DataPoint]): Double = xs.map(dp => {
    val z = dp.y - f(weights, dp.x)
    z * z
  }).reduce(_ + _) //8
}

The LossFunction for the Stream is implemented as a tail recursion, compute (line 7). The recursive method updates the reference to the Stream. The reference to the Stream is of type WeakReference (line 5), so the garbage collector can reclaim the memory associated with the slice for which the loss has been computed. In this example, the loss function is computed as a sum of squared errors (line 8).

The compute method manages the allocation and release of slices of the Stream:

@tailrec
def compute(strm: Stream[DataPoint], loss: Double): Double = {
  if (nElements >= dataSize) loss
  else {
    // STEP is the maximum number of observations in a slice
    val step =
      if (nElements + STEP > dataSize) dataSize - nElements
      else STEP
    nElements += step

    val newLoss = sqrLoss(strm.take(step).toList) //9
    compute(strm.drop(step), loss + newLoss) //10
  }
}

The dataset is processed in two steps:

  • The driver allocates (that is, take) a slice of the Stream of observations and then computes the cumulative loss for all the observations in the slice (line 9)
  • Once the computation of the loss for the slice is completed, the memory allocated to the weak reference is released (that is, drop) (line 10)

Tip

Alternative to weak references

There are alternatives to weak references for allowing the garbage collector to reclaim the memory blocks associated with each slice of observations:

  • Define the stream reference as a def
  • Wrap the reference into a method: the reference is then accessible to the garbage collector once the wrapping method returns
  • Use a List Iterator

The average memory allocated during the execution of the LossFunction for the entire Stream is the memory needed to allocate a single slice.
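
To tie the pieces together, here is a hypothetical invocation of LossFunction; the dot product f, the weights, and the observations are illustrative only, and the Stream is exposed through a def, following the preceding Tip, so that no strong reference to its head is retained:

import java.lang.ref.WeakReference

// Hypothetical linear model: dot product of weights and features
val f = (w: DblVec, x: DblVec) =>
  w.zip(x).map { case (wi, xi) => wi*xi }.sum

val weights: DblVec = Vector(0.5, -0.3)
val dataSize = 100000

def observations: Stream[DataPoint] = // def, not val
  Stream.fill(dataSize)(DataPoint(Vector(1.0, 2.0), 0.7))

val lossFunction = new LossFunction(f, weights, dataSize)
val loss = lossFunction.compute(() => new WeakReference(observations))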

Parallel collections

The Scala standard library includes parallelized collections, whose purpose is to shield developers from the intricacies of concurrent thread execution and race conditions. Parallel collections are a very convenient way to encapsulate concurrency constructs at a higher level of abstraction [16:1].

There are two ways to create parallel collections in Scala:

  • Converting an existing collection into a parallel collection with the same semantics using the par method, for example, List[T].par: ParSeq[T], Array[T].par: ParArray[T], Map[K,V].par: ParMap[K,V], and so on
  • Using the collection classes from the collection.parallel, parallel.immutable, or parallel.mutable packages, for example, ParArray, ParMap, ParSeq, ParVector, and so on (see the sketch after this list)
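
Here is a minimal sketch of both approaches (Scala 2.x, in which parallel collections ship with the standard library):

import scala.collection.parallel.immutable.ParVector

val pSeq = List(1, 2, 3, 4).par          // ParSeq[Int] via conversion
val pMap = Map("a" -> 1, "b" -> 2).par   // ParMap[String,Int] via conversion
val pVec = ParVector(1, 2, 3, 4)         // direct instantiation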

Processing a parallel collection

A parallel collection lends itself to concurrent processing once a pool of threads and a task scheduler are assigned to it. Fortunately, the Scala parallel and concurrent packages provide developers with a powerful toolbox to map partitions or segments of a collection to tasks running on different CPU cores. The components are as follows:

  • TaskSupport: This trait inherits the generic Tasks trait. It is responsible for scheduling operations on the parallel collection. There are three concrete implementations of TaskSupport.
  • ThreadPoolTaskSupport: This uses the thread pool of an older version of the JVM.
  • ExecutionContextTaskSupport: This uses ExecutorService, which delegates the management of tasks to either a thread pool or the ForkJoinTasks pool (see the sketch after this list).
  • ForkJoinTaskSupport: This uses the fork-join pools of type java.util.concurrent.ForkJoinPool introduced in Java SDK 1.6. In Java, a fork-join pool is an instance of ExecutorService that attempts to run not only the current task, but also its subtasks. It executes ForkJoinTask instances as lightweight threads.
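
For instance, the scheduler of a parallel collection can be reassigned through its tasksupport property; the following minimal sketch uses ExecutionContextTaskSupport with the global execution context:

import scala.collection.parallel.ExecutionContextTaskSupport
import scala.concurrent.ExecutionContext

val pv = Vector.tabulate(100)(_.toDouble).par
pv.tasksupport = new ExecutionContextTaskSupport(ExecutionContext.global)
val doubled = pv.map(_ * 2.0) // scheduled on the global execution context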

The following example implements the generation of random exponential values using a parallel vector and ForkJoinTaskSupport:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.util.Random

val rand = Vector.tabulate(MAX)(n => n*Random.nextFloat).par //1
rand.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(16))
val randExp = rand.map(Math.exp(_)) //2

The parallel vector of random values, rand, is created and initialized by the main task (line 1), but the conversion to a vector of exponential values, randExp, is executed by a pool of 16 concurrent tasks (line 2).

Tip

Preserving order of elements

Operations that traverse a parallel collection using an iterator preserve the original order of the elements of the collection. Iterator-less methods, such as foreach or map, do not guarantee that the order in which the elements are processed will be preserved.
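
A quick sketch of the difference, assuming an 8-element parallel range:

val pr = (0 until 8).par
pr.foreach(n => print(s"$n "))          // interleaved, e.g. 4 5 0 1 6 7 2 3
pr.iterator.foreach(n => print(s"$n ")) // ordered: 0 1 2 3 4 5 6 7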

Benchmark framework

The main purpose of parallel collections is to improve the performance of execution through concurrency. The first step is to either select an existing benchmark or create our own.

Tip

Scala library benchmark

The Scala standard library has a trait, testing.Benchmark, for testing using the command line [16:2]. All you need to do is insert your function or code in the run method:

object test extends Benchmark { def run { /* … */ } }

Let us create a parameterized class, Parallelism, to evaluate the performance of operations on parallel collections:

abstract class Parallelism[U](times: Int) {
  def map(f: U => U)(nTasks: Int): Unit //1
  def filter(f: U => Boolean)(nTasks: Int): Unit //2
  def timing(g: Int => Unit): Long
}

The user has to supply the data transformation f for the map (line 1) and filter (line 2) operations on the parallel collection, as shown in the preceding code, as well as the number of concurrent tasks, nTasks. The timing method collects the duration of times executions of a given operation g on a parallel collection:

def timing(g: Int => Unit): Long = {
  val startTime = System.currentTimeMillis
  Range(0, times).foreach(g)
  System.currentTimeMillis - startTime
}

Let's define the benchmark for the mapping and reducing operations on parallel arrays as follows:

class ParallelArray[U](
    u: Array[U], //3
    v: ParArray[U], //4
    times: Int) extends Parallelism[U](times)

The first argument of the benchmark constructor is the default array of the Scala standard library (line 3). The second argument is the parallel data structure (or class) associated with the array (line 4).

Let's compare the parallelized and default array on the map and reduce methods of ParallelArray as follows:

def map(f: U => U)(nTasks: Int): Unit = {
  val pool = new ForkJoinPool(nTasks)
  v.tasksupport = new ForkJoinTaskSupport(pool)

  val duration = timing(_ => u.map(f)).toDouble //5
  val ratio = timing(_ => v.map(f))/duration //6
  show(s"$nTasks, $ratio")
}

The user has to define the mapping function, f, and the number of concurrent tasks, nTasks, available to execute a map transformation on the array u (line 5) and its parallelized counterpart v (line 6). The reduce method follows the same design as shown in the following code:

def reduce(f: (U,U) => U)(nTasks: Int): Unit = {
  val pool = new ForkJoinPool(nTasks)
  v.tasksupport = new ForkJoinTaskSupport(pool)

  val duration = timing(_ => u.reduceLeft(f)).toDouble //7
  val ratio = timing(_ => v.reduceLeft(f))/duration //8
  show(s"$nTasks, $ratio")
}

The user-defined function f is used to execute the reduce action on the array u (line 7) and its parallelized counterpart v (line 8).

The same template can be used for other higher-order Scala methods, such as filter.

The absolute timing of each operation depends entirely on the environment. It is far more useful to record the ratio of the execution time on the parallelized array to that on the single-threaded array. For example, a ratio of 0.25 means that the parallel version runs four times faster than its single-threaded counterpart.

The benchmark class, ParallelMap, used to evaluate ParHashMap is similar to the benchmark for ParallelArray, as shown in the following code:

class ParallelMap[U](
    u: Map[Int, U],
    v: ParMap[Int, U],
    times: Int) extends Parallelism[U](times)

For example, the filter method of ParallelMap evaluates the performance of the parallel map v relative to the single-threaded map u. It applies the filtering condition to the values of each map as follows:

def filter(f: U => Boolean)(nTasks: Int): Unit = {   
  val pool = new ForkJoinPool(nTasks)
  v.tasksupport = new ForkJoinTaskSupport(pool)

  val duration = timing(_ => u.filter(e => f(e._2))).toDouble
  val ratio = timing( _ => v.filter(e => f(e._2)))/duration
  show(s"$nTasks, $ratio")
}

Performance evaluation

The first performance test consists of creating a single-threaded and a parallel array of random values and executing the evaluation methods, map and reduce, using an increasing number of tasks, as follows:

import scala.collection.parallel.mutable.ParArray
import scala.math.{sin, exp}
import scala.util.Random.nextDouble

val sz = 1000000; val NTASKS = 16
val data = Array.fill(sz)(nextDouble)
val pData = ParArray.fill(sz)(nextDouble)
val times: Int = 50

val bench = new ParallelArray[Double](data, pData, times) 
val mapper = (x: Double) => sin(x*0.01) + exp(-x)
Range(1, NTASKS).foreach(bench.map(mapper)(_)) 
val reducer = (x: Double, y: Double) => x+y 
Range(1, NTASKS).foreach(bench.reduce(reducer)(_)) 

Tip

Measuring performance

The code has to be executed within a loop and the duration has to be averaged over a large number of executions in order to avoid transient effects, such as the initialization of the JVM process or the collection of unused memory (GC).

The following graph shows the output of the performance test:

Impact of concurrent tasks on the performance of Scala parallelized map and reduce

The test applies the mapper and reducer functions to 1 million elements on an 8-core CPU with 8 GB of memory available to the JVM.

The results are not surprising in the following respects:

  • The reducer doesn't take advantage of the parallelism of the array. The reduction of ParArray has a small overhead in the single-task scenario and then matches the performance of Array.
  • The performance of the map function benefits from the parallelization of the array. The performance levels off when the number of tasks allocated equals or exceeds the number of CPU cores.

The second test consists of comparing the behavior of two parallel collections, ParArray and ParHashMap, on two methods, map and filter, using a configuration identical to the first test as follows:

val sz = 1000000
val mData = new HashMap[Int, Double]
Range(0, sz).foreach( mData.put(_, nextDouble)) //9
val mParData = new ParHashMap[Int, Double]
Range(0, sz).foreach( mParData.put(_, nextDouble))

val bench = new ParallelMap[Double](mData, mParData, times)
Range(1, NTASKS).foreach(bench.map(mapper)(_)) //10
val filterer = (x: Double) => (x > 0.8) 
Range(1, NTASKS).foreach( bench.filter(filterer)(_)) //11

The test initializes a HashMap instance and its parallel counterpart, ParHashMap, with 1 million random values (line 9). The benchmark, bench, processes all the elements of these hash maps with the mapper instance introduced in the first test (line 10) and a filtering function, filterer (line 11), with NTASKS = 16. The output is as shown here:

Impact of concurrent tasks on the performance of Scala parallelized array and hash map

The impact of the parallelization of collections is very similar across methods and across collections. The performance in all four test configurations (two collections, two methods) increases three to five fold as the number of concurrent tasks (threads) increases. It is interesting to notice that the performance stabilizes for tests with more than four tasks. Core parking is partially responsible for this behavior: it disables a few CPU cores in an effort to conserve power when a single application consumes almost all CPU cycles.

Tip

Further performance evaluation

The purpose of the performance test was to highlight the benefits of using Scala parallel collections. You should experiment further with collections other than ParArray and ParHashMap and other higher-order methods to confirm the pattern.

Clearly, a four-times increase in performance is nothing to complain about. That being said, parallel collections are limited to single-host deployment. If you cannot live with such a restriction and still need a scalable solution, the Actor model provides a blueprint for highly distributed applications.
