Use parallelization

As we saw in this chapter's introduction, one of the limitations of R (and most other programming languages) is that it was created before commodity personal computers had more than one processor or core. As a result, by default, R runs only one process and thus makes use of only one processor/core at a time.

If you have more than one core on your CPU, it means that when you leave your computer alone for a few hours during a long-running computation, your R task is running on one core while the others sit idle. Clearly, this is not ideal; if your R task took advantage of all the available processing power, you could see massive speed improvements.

Parallel computation (of the type we'll be using) works by starting multiple processes at the same time. The operating system then assigns each of these processes to a particular CPU. When multiple processes run at the same time, the time to completion is only as long as the longest process, as opposed to the time to complete all the processes added together.


Figure 12.1: Diagram of parallelization and the resulting reduction in time to completion

For example, let's say we have a task made up of four processes, each of which takes 1 second to complete. Without parallelization, the task would take 4 seconds, but with parallelization across four cores, it would take only 1 second.

Note

A word of warning: this is the ideal scenario. In practice, the cost of starting multiple processes constitutes an overhead, so the time to completion will not scale linearly with the number of cores used.

All this sounds great, but there's an important catch: each process has to be able to run independently of the output of the other processes. For example, if we wrote an R program to compute the nth number in the Fibonacci sequence, we couldn't divide that task up into smaller processes to run in parallel, because the nth Fibonacci number depends on the (n-1)th Fibonacci number (and so on, down to the base cases). Parallelization of the type we'll be using in this chapter only works on problems that can be split up into processes such that the processes don't depend on each other and there's no communication between them. Luckily, there are many problems like this in data analysis! Almost as luckily, R makes it easy to use parallelization on problems of this type!
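To make the dependency concrete, here is a minimal (serial) sketch of the Fibonacci computation; nothing here is specific to our later examples, it just shows that each iteration needs the results of earlier ones:

# Each iteration needs the two previous results, so the iterations
# of this loop cannot be run independently of one another.
fib <- function(n){
  fibs <- numeric(n)
  fibs[1:2] <- c(1, 1)
  for(i in 3:n){
    fibs[i] <- fibs[i-1] + fibs[i-2]
  }
  fibs[n]
}
fib(10)    # 55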

Problems of the nature that we just described are sometimes known as embarrassingly parallel problems, because the entire task can be broken down into independent components very easily. As an example, summing the numbers in a numeric vector of 100 elements is an embarrassingly parallel problem, because we can easily sum the first 50 elements in one process and the last 50 in another, in parallel, and just add the two numbers at the end to get the final sum. The pattern of computation we just described is sometimes referred to as split-apply-combine, divide and conquer, or map/reduce.

Note

Using parallelization to tackle the problem of summing 100 numbers is silly, since the overhead of the splitting and combining would take longer than it would take to just sum all 100 elements serially. Also, sum is already really fast and vectorized.
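Just to make the split-apply-combine pattern concrete, here's what that silly example looks like, done serially (for illustration only):

# split: two halves of a 100-element vector
# apply: sum each half (these two sums could be computed independently)
# combine: add the partial sums
x <- runif(100)
first.half  <- sum(x[1:50])
second.half <- sum(x[51:100])
first.half + second.half     # same as sum(x)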

Getting started with parallel R

Getting started with parallelization in R requires minimal setup, but that setup varies from platform to platform. More accurately, the setup is different for Windows than it is for every other operating system that R runs on (GNU/Linux, Mac OS X, Solaris, *BSD, and others).

If you don't have a Windows computer, all you have to do to start is load the parallel package:

# You don't have to install this if your copy of R is new
library(parallel)

If you use Windows, you can either (a) switch to the free operating system that over 97 percent of the 500 most powerful supercomputers in the world use, or (b) run the following setup code:

library(parallel)
cl <- makeCluster(4)

You may replace the 4 with however many processes you want to automatically split your task into. This is usually set to the number of cores available on your computer. You can query your system for the number of available cores with the following incantation:

detectCores()
------------------------
[1] 4
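
So, if you're using the cluster-based setup, a common pattern (just the usual idiom, not something specific to our examples) is to size the cluster from that query:

cl <- makeCluster(detectCores())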

Our first silly (but demonstrative) application of parallelization is the task of sleeping (making a program become temporarily inactive) for 5 seconds, four different times. We can do this serially (non-parallel) as follows:

for(i in 1:4){
  Sys.sleep(5)
}

Or, equivalently, using lapply:

# lapply will pass each element of the
# vector c(1, 2, 3, 4) to the function
# we write but we'll ignore it
lapply(1:4, function(i) Sys.sleep(5))

Let's time how long this task takes to complete by wrapping the task inside the argument to the system.time function:

system.time(
  lapply(1:4, function(i) Sys.sleep(5))
)
----------------------------------------
   user  system elapsed 
  0.059   0.074  20.005

Unsurprisingly, it took 20 (4*5) seconds to run. Let's see what happens when we run this in parallel:

#######################
# NON-WINDOWS VERSION #
#######################
system.time(
  mclapply(1:4, function(i) Sys.sleep(5), mc.cores=4)
)
      
###################
# WINDOWS VERSION #
###################
system.time(
  parLapply(cl, 1:4, function(i) Sys.sleep(5))
)
----------------------------------------
   user  system elapsed 
  0.021   0.042   5.013

Check that out! 5 seconds! Just what you would expect if four processes were sleeping for 5 seconds at the same time!

For the non-Windows code, we simply use mclapply (the non-Windows parallel counterpart to lapply) instead of lapply, and pass in another argument named mc.cores, which tells mclapply how many processes to automatically split the independent computation into.

For the Windows code, we use parLapply (the Windows parallel counterpart to lapply). The only difference between lapply and parLapply that we've used here is that parLapply takes the cluster we made with the makeCluster setup function as its first argument. Unlike mclapply, there's no need to specify the number of cores to use, since the cluster was already set up with the appropriate number of cores.

Note

Before R got the built-in parallel package, the two main packages that allowed for parallelization were multicore and snow. multicore used a method of creating different processes called forking that was supported on all R-running OSs except Windows. Windows users used the more general snow package to achieve parallelization. snow, which stands for Simple Network Of Workstations, works on non-Windows computers as well, and can even run on a cluster of different computers with identical R installations. multicore did not support cluster computing across physical machines like snow does.

Since R version 2.14, the functionality of both the multicore and snow packages has essentially been merged into the parallel package. The multicore package has since been removed from CRAN.

From now on, when we refer to the Windows counterpart to X, know that we really mean the snow counterpart to X, because the functions of snow will work on non-Windows OSs and clusters of machines. Similarly, by the non-Windows counterparts, we really mean the counterparts cannibalized from the multicore package.

You might ask: why don't we just always use the snow functions? If you have the option to use multicore/forking parallelism (that is, you are running processes on just one non-Windows physical machine), it tends to be lighter-weight. For example, the creation of a snow cluster with makeCluster can sometimes set off firewall alerts; it is safe to allow these connections, by the way.
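
One more housekeeping note for the cluster-based approach: when you're done with a cluster made by makeCluster, you can shut its worker processes down with stopCluster. (Don't run this yet if you plan to follow along with the Windows code in the rest of the chapter.)

# shut down the worker processes when you no longer need the cluster
stopCluster(cl)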


An example of (some) substance

For our first real application of parallelization, we will be solving a problem that is loosely based on a real problem I had to solve during the course of my work. In this formulation, we will be importing an open dataset from the web that contains the airport code, latitude coordinates, and longitude coordinates for 13,429 US airports. Our task will be to find the average (mean) distance from every airport to every other airport. For example, if LAX, ALB, OLM, and JFK were the only extant airports, we would calculate the distances between JFK and OLM, JFK and ALB, JFK and LAX, OLM and ALB, OLM and LAX, and ALB and LAX, and take the arithmetic mean of these distances.

Why are we doing this? Besides the fact that it was inspired by an actual, real life problem—and that I covered this very problem in no fewer than three blog posts—this problem is perfect for parallelization for two reasons:

  • It is embarrassingly parallel—This problem is very amenable to splitting-applying-and-combining (or map/reduction); each process can take a few (several hundred, really) of the airport-to-airport combinations, and the results can then be summed and divided by the number of distance calculations performed.
  • It exhibits combinatorial explosion—The term combinatorial explosion refers to problems that grow very quickly in size or complexity due to the role of combinatorics in the problem's solution. For example, the number of distance calculations we have to perform exhibits polynomial growth as a function of the number of airports we use. In particular, the number of different calculations is given by the binomial coefficient "n choose 2", or n(n-1)/2. 100 airports require 4,950 distance calculations; all 13,429 airports require 90,162,306 distance calculations (see the quick check after this list). Problems of this type usually require techniques like those discussed in this chapter in order to be computationally tractable.
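
If you'd like to verify those counts, R's choose function computes the binomial coefficient directly:

choose(100, 2)       # 4950
choose(13429, 2)     # 90162306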

Note

The birthday problem: Most people are unfazed by the fact that it takes a room of 367 people to guarantee that two people in the room share the same birthday. Many people are surprised, however, when it is revealed that it only requires a room of 23 people for there to be a 50 percent chance of two people sharing the same birthday (assuming that birthdays occur on each day with equal probability). Further, it only takes a room of 60 for there to be over a 99 percent chance that a pair will share a birthday. If this surprises you too, consider that the number of pairs of people that could possibly share a birthday grows polynomially with the number of people in the room. In fact, the number of pairs grows just like our airport problem—the number of birthday pairs is exactly the number of distance calculations we would have to perform if the people were airports.
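
If you want to check the figures from the preceding note yourself, a quick back-of-the-envelope computation (assuming 365 equally likely birthdays and ignoring leap days) looks like this:

# probability that at least two of n people share a birthday
prob.shared <- function(n) 1 - prod((365 - 0:(n-1)) / 365)
prob.shared(23)     # ~0.507
prob.shared(60)     # ~0.994
choose(23, 2)       # 253 possible birthday pairs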

First, let's write the function to compute the distance between two latitude/longitude pairs.

Since the Earth isn't flat (strictly speaking, it's not even a perfect sphere), the ground distance covered by a degree of longitude or latitude is not constant—meaning you can't just take the Euclidean distance between the two points. We will be using the Haversine formula for the distance between the two points. The Haversine formula is given as follows:

$$a = \sin^2\!\left(\frac{\Delta\phi}{2}\right) + \cos\phi_1 \cos\phi_2 \sin^2\!\left(\frac{\Delta\lambda}{2}\right)$$

$$d = 2\,r \cdot \operatorname{atan2}\!\left(\sqrt{a},\ \sqrt{1-a}\right)$$

where ϕ and λ are the latitude and longitude respectively, r is the Earth's radius, and Δ is the difference between the two latitudes or longitudes.

haversine <- function(lat1, long1, lat2, long2, unit="km"){
  radius <- 6378      # radius of Earth in kilometers
  delta.phi <- to.radians(lat2 - lat1)
  delta.lambda <- to.radians(long2 - long1)
  phi1 <- to.radians(lat1)
  phi2 <- to.radians(lat2)
  term1 <- sin(delta.phi/2) ^ 2
  term2 <- cos(phi1) * cos(phi2) * sin(delta.lambda/2) ^ 2
  the.terms <- term1 + term2
  delta.sigma <- 2 * atan2(sqrt(the.terms), sqrt(1-the.terms))
  distance <- radius * delta.sigma
  if(unit=="km") return(distance)
  if(unit=="miles") return(0.621371*distance)
}

Everything must be measured in radians (not degrees), so let's make a helper function for conversion to radians, too:

to.radians <- function(degrees){
  degrees * pi / 180
}
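
A quick sanity check of these helpers never hurts; the values below are approximate and shown only for reassurance (at the equator, one degree of longitude spans roughly 111 kilometers):

to.radians(180)          # pi, ~3.141593
haversine(0, 0, 0, 1)    # ~111.3 km between two equatorial points 1 degree apart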

Now let's load the dataset from the web. Since it's from an outside source and it might be messy, this is an excellent chance to use our assertr chops to make sure the foreign dataset matches our expectations: the dataset should have 13,429 observations and three named columns, the latitudes should be between 0 and 90, and the longitudes between 0 and 180.

We'll also just start with a subset of all the airports. Because we are going to be taking a random sample of all the observations, we'll set the random number generator seed so that my calculations will align with yours, dear reader.

set.seed(1)

the.url <- "http://opendata.socrata.com/api/views/rxrh-4cxm/rows.csv?accessType=DOWNLOAD"
all.airport.locs <- read.csv(the.url, stringsAsFactors=FALSE)

library(magrittr)
library(assertr)
CHECKS <- . %>%
  verify(nrow(.) == 13429) %>%
  verify(names(.) %in% c("locationID", "Latitude", "Longitude")) %>%
  assert(within_bounds(0, 90), Latitude) %>%
  assert(within_bounds(0,180), Longitude)

all.airport.locs <- CHECKS(all.airport.locs)
  
# Let's start off with 400 airports
smp.size <- 400
 
# choose a random sample of airports
random.sample <- sample((1:nrow(all.airport.locs)), smp.size)
airport.locs <- all.airport.locs[random.sample, ]
row.names(airport.locs) <- NULL

head(airport.locs)
-------------------------------------
  locationID Latitude Longitude
1        LWV  38.7642   87.6056
2       LS77  30.7272   91.1486
3        2N2  43.5919   71.7514
4       VG00  37.3697   75.9469

Now let's write a function called single.core that computes the average distance between every pair of airports without using any parallel computation. For each lat/long pair, we need to find the distance between it and the rest of the lat/long pairs. Since the distance between point a and point b is the same as the distance between b and a, for every row, we need only compute the distance between it and the remaining rows in the airport.locs data frame:

single.core <- function(airport.locs){
  running.sum <- 0
  for(i in 1:(nrow(airport.locs)-1)){
    for(j in (i+1):nrow(airport.locs)){
      # i is the row of the first lat/long pair
      # j is the row of the second lat/long pair
      this.dist <- haversine(airport.locs[i, 2],
                             airport.locs[i, 3],
                             airport.locs[j, 2],
                             airport.locs[j, 3])
      running.sum <- running.sum + this.dist
    }
  }
  # Now we have to divide by the number of
  # distances we took. This is given by
  return(running.sum /
         ((nrow(airport.locs)*(nrow(airport.locs)-1))/2))
}

Now, let's time it!

system.time(ave.dist <- single.core(airport.locs))
print(ave.dist)
----------------------------
   user  system elapsed 
  5.400   0.034   5.466 
 [1] 1667.186

All right, 5 and a half seconds for 400 airports.

In order to use the parallel surrogates for lapply, let's rewrite the function to use lapply. Observe the output of the following incantation:

# We'll have to limit the output to the
# first 11 columns
combn(1:10, 2)[,1:11]
----------------------------------------
     [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9]
[1,]    1    1    1    1    1    1    1    1    1
[2,]    2    3    4    5    6    7    8    9   10
     [,10] [,11]
[1,]     2     2
[2,]     3     4

The preceding code used the combn function to create a matrix that contains all pairs of two numbers from 1 to 10, stored as columns in two rows. If we use the combn function with a vector of integers from 1 to n (where n is the number of airports in our data frame), each column of the resultant matrix holds a pair of indices with which to index the airport data frame, and together the columns cover all the possible pairs of airports. For example, let's go back to the world where LAX, ALB, OLM, and JFK were the only extant airports; consider the following:

small.world <- c("LAX", "ALB", "OLM", "JFK")
all.combs <- combn(1:length(small.world), 2)

for(i in 1:ncol(all.combs)){
  from <- small.world[all.combs[1, i]]
  to <- small.world[all.combs[2, i]]
  print(paste(from, " <-> ", to))
}
----------------------------------------
[1] "LAX  <->  ALB"
[1] "LAX  <->  OLM"
[1] "LAX  <->  JFK"
[1] "ALB  <->  OLM"   # back to olympia
[1] "ALB  <->  JFK"
[1] "OLM  <->  JFK"

Formulating our solution around this matrix of indices, we can use lapply to loop over the columns in the matrix:

small.world <- c("LAX", "ALB", "OLM", "JFK")
all.combs <- combn(1:length(small.world), 2)

# instead of printing each airport pair in a string,
# we'll return the string
results <- lapply(1:ncol(all.combs), function(x){
  from <- small.world[all.combs[1, x]]
  to <- small.world[all.combs[2, x]]
  return(paste(from, " <-> ", to))
})

print(results)
-------------------------

[[1]]
[1] "LAX  <->  ALB"

[[2]]
[1] "LAX  <->  OLM"

[[3]]
[1] "LAX  <->  JFK"
........

In our problem, we will be returning numerics from the anonymous function in lapply. However, because we are using lapply, the results will be a list. Because we can't call sum on a list of numerics, we will use the unlist function to turn the list into a vector.

unlist(results)
---------------------
[1] "LAX  <->  ALB" "LAX  <->  OLM" "LAX  <->  JFK"
[4] "ALB  <->  OLM" "ALB  <->  JFK" "OLM  <->  JFK"

We have everything we need to rewrite the single.core function using lapply.

single.core.lapply <- function(airport.locs){
  all.combs <- combn(1:nrow(airport.locs), 2)
  numcombs <- ncol(all.combs)
  results <- lapply(1:numcombs, function(x){
      lat1  <- airport.locs[all.combs[1, x], 2]
      long1 <- airport.locs[all.combs[1, x], 3]
      lat2  <- airport.locs[all.combs[2, x], 2]
      long2 <- airport.locs[all.combs[2, x], 3]
      return(haversine(lat1, long1, lat2, long2))
  })
  return(sum(unlist(results)) / numcombs)
}

system.time(ave.dist <- single.core.lapply(airport.locs))
print(ave.dist)
---------------------------------------
   user  system elapsed 
  5.890   0.042   5.968 
 [1] 1667.186

This particular solution is a little bit slower than our solution with the double for loops, but it's about to pay enormous dividends; now we can use one of the parallel surrogates for lapply to solve the problem:

#######################
# NON-WINDOWS VERSION #
#######################
multi.core <- function(airport.locs){
  all.combs <- combn(1:nrow(airport.locs), 2)
  numcombs <- ncol(all.combs)
  results <- mclapply(1:numcombs, function(x){
      lat1  <- airport.locs[all.combs[1, x], 2]
      long1 <- airport.locs[all.combs[1, x], 3]
      lat2  <- airport.locs[all.combs[2, x], 2]
      long2 <- airport.locs[all.combs[2, x], 3]
      return(haversine(lat1, long1, lat2, long2))
  }, mc.cores=4)
  return(sum(unlist(results)) / numcombs)
}

###################
# WINDOWS VERSION #
###################
clusterExport(cl, c("haversine", "to.radians"))

multi.core <- function(airport.locs){
  all.combs <- combn(1:nrow(airport.locs), 2)
  numcombs <- ncol(all.combs)
  results <- parLapply(cl, 1:numcombs, function(x){
      lat1  <- airport.locs[all.combs[1, x], 2]
      long1 <- airport.locs[all.combs[1, x], 3]
      lat2  <- airport.locs[all.combs[2, x], 2]
      long2 <- airport.locs[all.combs[2, x], 3]
      return(haversine(lat1, long1, lat2, long2))
  })
  return(sum(unlist(results)) / numcombs)
}

system.time(ave.dist <- multi.core(airport.locs))
print(ave.dist)
-------------------------------
   user  system elapsed 
  7.363   0.240   2.743 
 [1] 1667.186

Before we interpret the output, direct your attention to the first line of the Windows segment. When mclapply creates additional processes, these processes share the memory with the parent process, and have access to all the parent's environment. With parLapply, however, the procedure that spawns new processes is a little different and requires that we manually export all the functions and libraries we need to load onto each new process beforehand. In this example, we need the new workers to have the haversine and to.radians functions.
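
As an aside, if the workers had also needed a package loaded on each process (they don't in this example), the cluster-based approach provides clusterEvalQ for that; this is a purely hypothetical illustration:

# hypothetical: evaluate an expression (here, loading a package)
# on every worker in the cluster
clusterEvalQ(cl, library(magrittr))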

Now to the output of the last code snippet. On my Macintosh machine with four cores, this brings what once was a 5.5 second affair down to a 2.7 second affair. This may not seem like a big deal, but when we expand and start to include more than just 400 airports, we start to see the multicore version really pay off.

To demonstrate just what we've gained from our hassles in parallelizing the problem, I ran this on a GNU/Linux cloud server with 16 cores, and recorded the time it took to complete the calculations for different sample sizes with 1, 2, 4, 8, and 16 cores. The results are depicted in the following image:


Figure 12.2: The running times for the average-distance-between-all-airports task at different sample sizes for 1, 2, 4, 8, and 16 cores. For reference, the dashed line is the 4-core performance curve, the topmost curve is the single-core performance curve, and the bottommost curve is the 16-core curve.

It may be hard to tell from the plot, but the estimated times to completion for the task running on 1, 2, 4, 8, and 16 cores are 2.4 hours, 1.2 hours, 36 minutes, 19 minutes, and 17 minutes respectively. Using parallelized R on a 4-core machine—which is not an uncommon setup at the time of writing—shaves a full two hours off the task's running time! Note the diminishing marginal returns on the number of cores used; there is barely any difference between the performance of 8 and 16 cores. C'est la vie.
