As we saw in this chapter's introduction, one of the limitations of R (and most other programming languages) is that it was created before commodity personal computers had more than one processor or core. As a result, by default, R runs only one process and thus makes use of only one processor/core at a time.
If you have more than one core on your CPU, then when you leave your computer alone for a few hours during a long-running computation, your R task is running on one core while the others sit idle. Clearly this is not ideal; if your R task took advantage of all the available processing power, you could see massive speed improvements.
Parallel computation (of the type we'll be using) works by starting multiple processes at the same time. The operating system then assigns each of these processes to a particular CPU. When multiple processes run at the same time, the time to completion is only as long as the longest process, as opposed to the time to complete all the processes added together.
For example, let's say we have a task made up of four processes that each take 1 second to complete. Without parallelization, the task would take 4 seconds, but with parallelization on four cores, it would take about 1 second.
All this sounds great, but there's an important catch: each process has to be able to run independently of the output of the other processes. For example, if we wrote an R program to compute the nth number in the Fibonacci sequence, we couldn't divide that task into smaller processes to run in parallel, because the nth Fibonacci number depends on the (n-1)th Fibonacci number (and so on, all the way down). The type of parallelization we'll be using in this chapter only works on problems that can be split up into processes such that the processes don't depend on each other and there's no communication between them. Luckily, there are many problems like this in data analysis! Almost as luckily, R makes it easy to use parallelization on problems of this type!
Problems of the nature that we just described are sometimes known as embarrassingly parallel problems, because the entire task can be broken down into independent components very easily. As an example, summing the numbers in a numeric
vector of 100 elements is an embarrassingly parallel problem, because we can easily sum the first 50 elements in one process and the last 50 in another, in parallel, and just add the two numbers at the end to get the final sum. The pattern of computation we just described is sometimes referred to as split-apply-combine, divide and conquer, or map/reduce.
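To make the split-apply-combine pattern concrete, here is a minimal, non-parallel sketch in R; the vector x and the two-way split are illustrative choices and not part of the airport example that comes later in the chapter:

# toy illustration of split-apply-combine:
# sum a 100-element vector by summing two halves
# and then combining the partial results
x <- runif(100)
halves <- split(x, rep(1:2, each=50))    # split: two independent pieces
partial.sums <- lapply(halves, sum)      # apply: sum each piece (this is the part that could run in parallel)
total <- Reduce(`+`, partial.sums)       # combine: add the partial sums
all.equal(total, sum(x))                 # TRUE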
Getting started with parallelization in R requires minimal setup, but that setup varies from platform to platform. More accurately, the setup is different for Windows than it is for every other operating system that R runs on (GNU/Linux, Mac OS X, Solaris, *BSD, and others).
If you don't have a Windows computer, all you have to do to get started is load the parallel package:
# You don't have to install this if your copy of R is new
library(parallel)
If you use Windows, you can either (a) switch to the free operating system that over 97 percent of the 500 most powerful supercomputers in the world use, or (b) run the following setup code:
library(parallel)
cl <- makeCluster(4)
You may replace the 4 with however many processes you want your task to be automatically split into. This is usually set to the number of cores available on your computer. You can query your system for the number of available cores with the following incantation:
detectCores()
------------------------
[1] 4
Our first silly (but demonstrative) application of parallelization is the task of sleeping (making a program become temporarily inactive) for 5 seconds, four different times. We can do this serially (not in parallel) as follows:
for(i in 1:4){
  Sys.sleep(5)
}

Or, equivalently, using lapply:

# lapply will pass each element of the
# vector c(1, 2, 3, 4) to the function
# we write but we'll ignore it
lapply(1:4, function(i) Sys.sleep(5))
Let's time how long this task takes to complete by wrapping it inside the argument to the system.time function:
system.time(
  lapply(1:4, function(i) Sys.sleep(5))
)
----------------------------------------
   user  system elapsed
  0.059   0.074  20.005
Unsurprisingly, it took 20 (4*5) seconds to run. Let's see what happens when we run this in parallel:
#######################
# NON-WINDOWS VERSION #
#######################
system.time(
  mclapply(1:4, function(i) Sys.sleep(5), mc.cores=4)
)

###################
# WINDOWS VERSION #
###################
system.time(
  parLapply(cl, 1:4, function(i) Sys.sleep(5))
)
----------------------------------------
   user  system elapsed
  0.021   0.042   5.013
Check that out! 5 seconds! Just what you would expect if four processes were sleeping for 5 seconds at the same time!
For the non-Windows code, we simply use mclapply (the non-Windows parallel counterpart to lapply) instead of lapply, and pass in another argument named mc.cores, which tells mclapply how many processes to automatically split the independent computation into.
For the Windows code, we use parLapply (the Windows parallel counterpart to lapply). The only difference between lapply and parLapply that we've used here is that parLapply takes the cluster we made with the makeCluster setup function as its first argument. Unlike mclapply, there's no need to specify the number of cores to use, since the cluster was already set up with the appropriate number of cores.
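One housekeeping detail not shown in the setup code: when you are finished with a snow-style cluster, it's good practice to shut it down so the worker processes are released. Assuming the cl object created earlier with makeCluster:

# release the worker processes once the cluster is no longer needed
stopCluster(cl)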
Before R got the built-in parallel package, the two main packages that allowed for parallelization were multicore and snow. multicore used a method of creating different processes called forking that was supported on every OS that runs R except Windows, so Windows users had to use the more general snow package to achieve parallelization. snow, which stands for Simple Network of Workstations, works on non-Windows computers as well, and it can also run on a cluster of different physical computers with identical R installations. multicore did not support cluster computing across physical machines like snow does.
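To illustrate that last point, a snow-style cluster spanning several physical machines can be created by passing makeCluster a vector of hostnames rather than a number of cores. The hostnames below are placeholders, and each machine would need an identical R installation (and, typically, passwordless SSH access):

# hypothetical two-machine cluster; "node1" and "node2" are placeholder hostnames
cl.remote <- makeCluster(c("node1", "node2"), type="PSOCK")
# ...use parLapply(cl.remote, ...) just as with a single-machine cluster...
stopCluster(cl.remote)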
Since R version 2.14, the functionality of both the multicore and snow packages has essentially been merged into the parallel package. The multicore package has since been removed from CRAN.
From now on, when we refer to the Windows counterpart to X, know that we really mean the snow counterpart to X, because the functions of snow will also work on non-Windows OSs and on clusters of machines. Similarly, by the non-Windows counterparts, we really mean the counterparts cannibalized from the multicore package.
You might ask: why don't we just always use the snow functions? If you have the option of using multicore/forking parallelism (that is, you are running processes on just one non-Windows physical machine), it tends to be more lightweight. For example, the creation of a snow cluster with makeCluster can sometimes set off firewall alerts. It is safe to allow these connections, by the way.
For our first real application of parallelization, we will be solving a problem that is loosely based on a real problem that I had to solve during the course of my work. In this formulation, we will be importing an open dataset from the web that contains the airport code, latitude coordinates, and longitude coordinates for 13,429 US airports. Our task will be to find the average (mean) distance from every airport to every other airport. For example, if LAX, ALB, OLM, and JFK were the only extant airports, we would calculate the distances between JFK and OLM, JFK and ALB, JFK and LAX, OLM and ALB, OLM and LAX, and ALB and LAX, and take the arithmetic mean of these distances.
Why are we doing this? Besides the fact that it was inspired by an actual, real life problem—and that I covered this very problem in no fewer than three blog posts—this problem is perfect for parallelization for two reasons:
First, the number of distance calculations grows rapidly with the number of airports; for n airports it is n(n-1)/2. 100 airports require 4,950 distance calculations; all 13,429 airports require 90,162,306 distance calculations. Problems of this type usually require techniques like those discussed in this chapter in order to be computationally tractable. Second, the problem is embarrassingly parallel: each distance calculation can be performed independently of all the others.
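As a quick sanity check (these lines are not in the original text), R's choose function reproduces those counts:

choose(100, 2)     # 4950
choose(13429, 2)   # 90162306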
The birthday problem: Most people are unfazed by the fact that it takes a room of 367 people to guarantee that at least two people in the room share a birthday. Many people are surprised, however, when it is revealed that it only requires a room of 23 people for there to be a 50 percent chance of two people sharing a birthday (assuming that birthdays occur on each day with equal probability). Further, it only takes a room of 60 for there to be over a 99 percent chance that a pair will share a birthday. If this surprises you too, consider that the number of pairs of people that could possibly share a birthday grows polynomially with the number of people in the room. In fact, the number of pairs grows in exactly the same way as in our airport problem: if the people were airports, the number of birthday pairs would be exactly the number of distance calculations we would have to perform.
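If you'd like to verify these birthday figures yourself, here's a short sketch (not from the original text) that computes the probability of at least one shared birthday among n people, assuming 365 equally likely birthdays:

# P(at least two of n people share a birthday), ignoring leap days
p.shared <- function(n) 1 - prod((365 - 0:(n-1)) / 365)
p.shared(23)    # just over 0.50
p.shared(60)    # over 0.99
choose(23, 2)   # 253 -- the number of pairs grows like n(n-1)/2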
First, let's write the function to compute the distance between two latitude/longitude pairs.
Since the Earth isn't flat (strictly speaking, it's not even a perfect sphere), the distance covered by a degree of latitude or longitude is not constant, which means you can't just take the Euclidean distance between the two points. Instead, we will be using the Haversine formula for the distance between two points. The Haversine formula is given as follows:
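Writing the intermediate term as a (a symbol introduced here for readability; it corresponds to the.terms in the code below), the formula is:

$$a = \sin^2\!\left(\frac{\Delta\varphi}{2}\right) + \cos\varphi_1 \,\cos\varphi_2 \,\sin^2\!\left(\frac{\Delta\lambda}{2}\right)$$

$$d = 2\,r \cdot \operatorname{atan2}\!\left(\sqrt{a},\ \sqrt{1-a}\right)$$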
where φ and λ are the latitude and longitude respectively, r is the Earth's radius, and Δφ and Δλ are the differences between the two latitudes and the two longitudes.
haversine <- function(lat1, long1, lat2, long2, unit="km"){
  radius <- 6378      # radius of Earth in kilometers
  delta.phi <- to.radians(lat2 - lat1)
  delta.lambda <- to.radians(long2 - long1)
  phi1 <- to.radians(lat1)
  phi2 <- to.radians(lat2)
  term1 <- sin(delta.phi/2) ^ 2
  term2 <- cos(phi1) * cos(phi2) * sin(delta.lambda/2) ^ 2
  the.terms <- term1 + term2
  delta.sigma <- 2 * atan2(sqrt(the.terms), sqrt(1-the.terms))
  distance <- radius * delta.sigma
  if(unit=="km") return(distance)
  if(unit=="miles") return(0.621371*distance)
}
Everything must be measured in radians (not degrees), so let's make a helper function for conversion to radians, too:
to.radians <- function(degrees){
  degrees * pi / 180
}
Now let's load the dataset from the web. Since it's from an outside source and it might be messy, this is an excellent chance to use our assertr chops to make sure the foreign dataset matches our expectations: the dataset should be 13,429 observations long, it should have three named columns, the latitudes should be 90 or below, and the longitudes should be 180 or below.
We'll also just start with a subset of all the airports. Because we are going to be taking a random sample of all the observations, we'll set the random number generator seed so that my calculations will align with yours, dear reader.
set.seed(1)

the.url <- "http://opendata.socrata.com/api/views/rxrh-4cxm/rows.csv?accessType=DOWNLOAD"
all.airport.locs <- read.csv(the.url, stringsAsFactors=FALSE)

library(magrittr)
library(assertr)

CHECKS <- . %>%
  verify(nrow(.) == 13429) %>%
  verify(names(.) %in% c("locationID", "Latitude", "Longitude")) %>%
  assert(within_bounds(0, 90), Latitude) %>%
  assert(within_bounds(0, 180), Longitude)

all.airport.locs <- CHECKS(all.airport.locs)

# Let's start off with 400 airports
smp.size <- 400

# choose a random sample of airports
random.sample <- sample((1:nrow(all.airport.locs)), smp.size)
airport.locs <- all.airport.locs[random.sample, ]
row.names(airport.locs) <- NULL

head(airport.locs)
-------------------------------------
  locationID Latitude Longitude
1        LWV  38.7642   87.6056
2       LS77  30.7272   91.1486
3        2N2  43.5919   71.7514
4       VG00  37.3697   75.9469
Now let's write a function called single.core that computes the average distance between every pair of airports without using any parallel computation. For each lat/long pair, we need to find the distance between it and the rest of the lat/long pairs. Since the distance between point a and point b is the same as the distance between b and a, for every row we need only compute the distance between it and the remaining rows in the airport.locs data frame:
single.core <- function(airport.locs){
  running.sum <- 0
  for(i in 1:(nrow(airport.locs)-1)){
    for(j in (i+1):nrow(airport.locs)){
      # i is the row of the first lat/long pair
      # j is the row of the second lat/long pair
      this.dist <- haversine(airport.locs[i, 2], airport.locs[i, 3],
                             airport.locs[j, 2], airport.locs[j, 3])
      running.sum <- running.sum + this.dist
    }
  }
  # Now we have to divide by the number of
  # distances we took. This is given by
  # nrow(airport.locs) * (nrow(airport.locs)-1) / 2
  return(running.sum /
         ((nrow(airport.locs)*(nrow(airport.locs)-1))/2))
}

Now, let's time it!

system.time(ave.dist <- single.core(airport.locs))
print(ave.dist)
----------------------------
   user  system elapsed
  5.400   0.034   5.466
[1] 1667.186
All right, 5 and a half seconds for 400 airports.
In order to use the parallel surrogates for lapply, let's rewrite the function to use lapply. Observe the output of the following incantation:
# We'll have to limit the output to the
# first 11 columns
combn(1:10, 2)[,1:11]
----------------------------------------
     [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9]
[1,]    1    1    1    1    1    1    1    1    1
[2,]    2    3    4    5    6    7    8    9   10
     [,10] [,11]
[1,]     2     2
[2,]     3     4
The preceding incantation uses the combn function to create a matrix that contains all pairs of two numbers from 1 to 10, stored as columns in two rows. If we use the combn function with a vector of the integers from 1 to n (where n is the number of airports in our data frame), each column of the resultant matrix holds a pair of indices with which to index the airport data frame, and together the columns cover all the possible pairs of airports. For example, let's go back to the world where LAX, ALB, OLM, and JFK are the only extant airports; consider the following:
small.world <- c("LAX", "ALB", "OLM", "JFK")
all.combs <- combn(1:length(small.world), 2)

for(i in 1:ncol(all.combs)){
  from <- small.world[all.combs[1, i]]
  to   <- small.world[all.combs[2, i]]
  print(paste(from, " <-> ", to))
}
----------------------------------------
[1] "LAX <-> ALB"
[1] "LAX <-> OLM"
[1] "LAX <-> JFK"
[1] "ALB <-> OLM"     # back to olympia
[1] "ALB <-> JFK"
[1] "OLM <-> JFK"
Formulating our solution around this matrix of indices, we can use lapply to loop over the columns in the matrix:
small.world <- c("LAX", "ALB", "OLM", "JFK")
all.combs <- combn(1:length(small.world), 2)

# instead of printing each airport pair in a string,
# we'll return the string
results <- lapply(1:ncol(all.combs), function(x){
  from <- small.world[all.combs[1, x]]
  to   <- small.world[all.combs[2, x]]
  return(paste(from, " <-> ", to))
})

print(results)
-------------------------
[[1]]
[1] "LAX <-> ALB"

[[2]]
[1] "LAX <-> OLM"

[[3]]
[1] "LAX <-> JFK"
........
In our problem, we will be returning numerics from the anonymous function in lapply. However, because we are using lapply, the results will be a list. Because we can't call sum on a list of numerics, we will use the unlist function to turn the list into a vector:
unlist(results)
---------------------
[1] "LAX <-> ALB" "LAX <-> OLM" "LAX <-> JFK"
[4] "ALB <-> OLM" "ALB <-> JFK" "OLM <-> JFK"
We have everything we need to rewrite the single.core function using lapply:
single.core.lapply <- function(airport.locs){
  all.combs <- combn(1:nrow(airport.locs), 2)
  numcombs <- ncol(all.combs)
  results <- lapply(1:numcombs, function(x){
    lat1  <- airport.locs[all.combs[1, x], 2]
    long1 <- airport.locs[all.combs[1, x], 3]
    lat2  <- airport.locs[all.combs[2, x], 2]
    long2 <- airport.locs[all.combs[2, x], 3]
    return(haversine(lat1, long1, lat2, long2))
  })
  return(sum(unlist(results)) / numcombs)
}

system.time(ave.dist <- single.core.lapply(airport.locs))
print(ave.dist)
---------------------------------------
   user  system elapsed
  5.890   0.042   5.968
[1] 1667.186
This particular solution is a little bit slower than our solution with the double for loops, but it's about to pay enormous dividends; now we can use one of the parallel surrogates for lapply to solve the problem:
#######################
# NON-WINDOWS VERSION #
#######################
multi.core <- function(airport.locs){
  all.combs <- combn(1:nrow(airport.locs), 2)
  numcombs <- ncol(all.combs)
  results <- mclapply(1:numcombs, function(x){
    lat1  <- airport.locs[all.combs[1, x], 2]
    long1 <- airport.locs[all.combs[1, x], 3]
    lat2  <- airport.locs[all.combs[2, x], 2]
    long2 <- airport.locs[all.combs[2, x], 3]
    return(haversine(lat1, long1, lat2, long2))
  }, mc.cores=4)
  return(sum(unlist(results)) / numcombs)
}

###################
# WINDOWS VERSION #
###################
clusterExport(cl, c("haversine", "to.radians"))

multi.core <- function(airport.locs){
  all.combs <- combn(1:nrow(airport.locs), 2)
  numcombs <- ncol(all.combs)
  results <- parLapply(cl, 1:numcombs, function(x){
    lat1  <- airport.locs[all.combs[1, x], 2]
    long1 <- airport.locs[all.combs[1, x], 3]
    lat2  <- airport.locs[all.combs[2, x], 2]
    long2 <- airport.locs[all.combs[2, x], 3]
    return(haversine(lat1, long1, lat2, long2))
  })
  return(sum(unlist(results)) / numcombs)
}

system.time(ave.dist <- multi.core(airport.locs))
print(ave.dist)
-------------------------------
   user  system elapsed
  7.363   0.240   2.743
[1] 1667.186
Before we interpret the output, direct your attention to the first line of the Windows segment. When mclapply creates additional processes, these processes share memory with the parent process and have access to the parent's entire environment. With parLapply, however, the procedure that spawns new processes is a little different, and it requires that we manually export all the functions and libraries that each new process will need beforehand. In this example, the new workers need the haversine and to.radians functions.
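Our workers only needed those two functions, but if they had also required a package, we could load it on every worker with clusterEvalQ. A purely illustrative example (not needed for this particular task):

# evaluate an expression on every worker, e.g., to load a package
clusterEvalQ(cl, library(assertr))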
Now to the output of the last code snippet. On my Macintosh machine with four cores, this brings what once was a 5.5 second affair down to a 2.7 second affair. This may not seem like a big deal, but when we expand and start to include more than just 400 airports, we start to see the multicore version really pay off.
To demonstrate just what we've gained from our hassles in parallelizing the problem, I ran this on a GNU/Linux cloud server with 16 cores, and recorded the time it took to complete the calculations for different sample sizes with 1, 2, 4, 8, and 16 cores. The results are depicted in the following image:
It may be hard to tell from the plot, but the estimated times to completion for the task running on 1, 2, 4, 8, and 16 cores are 2.4 hours, 1.2 hours, 36 minutes, 19 minutes, and 17 minutes respectively. Using parallelized R on a 4-core machine, which is not an uncommon setup at the time of writing, shaves a full two hours off the task's running time! Note the diminishing marginal returns on the number of cores used; there is barely any difference between the performance with 8 cores and with 16. C'est la vie.