Chapter 3. multicore

multicore is a popular parallel programming package for use on multiprocessor and multicore computers. It was written by Simon Urbanek, and first released on CRAN in 2009. It immediately became popular because its clever use of the fork() system call allows it to implement a parallel lapply() operation that is even easier to use than snow’s parLapply().

Unfortunately, because fork() is a Posix system call, multicore can’t really be used on Windows machines.[33] The use of fork() can also cause problems for functions that use resources that were allocated or initialized exclusively for the master, or parent, process. This is particularly a problem with graphics functions, so using multicore from an R GUI isn’t generally recommended.[34] Nevertheless, multicore works well for most R functions on Posix systems, such as Linux and Mac OS X, and its use of fork() makes it very efficient and convenient, as we’ll see in this chapter.

Quick Look

Motivation: You have an R script that spends an hour executing a function using lapply() on your laptop.

Solution: Replace lapply() with the mclapply() function from the multicore package.

Good because: It’s easy to install, easy to use, and makes use of hardware that you probably already own.

How It Works

multicore is intended to run on Posix-based multiprocessor and multicore systems. This includes almost all modern Mac OS X and Linux desktop and laptop computers. It can also be used on a single node of a Linux cluster, for example, but unlike snow, it doesn’t support the use of multiple cluster nodes.

Since multicore is rather efficient, it can handle somewhat finer-grained parallel problems than snow, but it is still intended for coarse-grained, embarrassingly parallel applications. It cannot compete with multithreaded programming for fine-grained parallelism, such as vector operations.

Since multicore runs on a single computer, it doesn’t give you access to the greater aggregate memory of a cluster, as snow does. However, since fork() only copies data when it is modified, multicore often makes more efficient use of memory on a single computer than snow does.

Setting Up

multicore is available on CRAN, so it is installed like any other CRAN package. Much of it is written in C, but it doesn’t depend on any external libraries, so building it from source is fairly easy on Posix-based systems.

Here’s how I usually install multicore:

install.packages("multicore")

It may ask you which CRAN mirror to use, and then it will download and install the package.

There is no Windows binary distribution available for multicore on CRAN, so if you’re using Windows 2000 or XP, and want to try the experimental Windows support, you’ll have to build it from the source distribution. This requires additional software to be installed, and is beyond the scope of this book.

Once you’ve installed multicore, you should verify that you can load it:

library(multicore)

If that succeeds, you are ready to start using multicore.

Working with It

The mclapply Function

The most important and commonly used function in the multicore package is mclapply(), which is basically a drop-in replacement for lapply(). It is one of the high-level functions in multicore, the others being pvec(), parallel(), and collect(), which we will discuss later.

Although mclapply() takes some additional arguments (all prefixed with “mc.”), it is essentially the same as lapply(). If you have an R script that spends a lot of time calling lapply(), it’s very possible that all you will have to do to parallelize it is to load the multicore package and replace lapply() with mclapply().

For example, let’s write a parallel K-Means using multicore:

library(multicore)
library(MASS)
results <- mclapply(rep(25, 4),
                    function(nstart) kmeans(Boston, 4, nstart=nstart))
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]

This is nearly identical to the sequential, lapply() version of K-Means from the snow chapter, except that we loaded the multicore package and replaced lapply() with mclapply(). In particular, we didn’t have to create a cluster object, and we didn’t have to initialize the workers by loading the MASS package on each of them. This is because mclapply() automatically starts the workers using fork(). These workers inherit the functions, variables and environment of the master process, making explicit worker initialization unnecessary.

It may surprise you that mclapply() creates worker processes every time it is called. snow doesn’t do that since starting workers on a cluster is often rather time consuming. However, fork() is relatively fast, especially since it doesn’t copy process data until it needs to, a technique called copy-on-write which takes advantage of the operating system’s virtual memory system. In addition, forking the workers every time mclapply() is called gives each of them a virtual copy of the master’s environment right at the point that mclapply() is executed, so worker data is in sync with the master. Thus, you don’t need to recreate the master’s data and environment in the workers, as in snow, since fork() does that automatically and efficiently.
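
For example, here’s a minimal sketch (the object names are arbitrary) showing that forked workers can use data and functions defined only in the master, with no explicit export step:

library(multicore)
xdata <- matrix(rnorm(1e6), ncol = 100)    # created only in the master process
col.mean <- function(i) mean(xdata[, i])
# The forked workers inherit xdata and col.mean automatically, so nothing
# like snow's clusterExport() or clusterEvalQ() is needed here
unlist(mclapply(1:100, col.mean, mc.cores = 2))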

The mc.cores Option

The mclapply() function takes a number of optional arguments that modify its behaviour. One of the most important of these is the mc.cores argument which controls the number of workers that are created, which is often set equal to the number of cores on the computer. By default, mclapply() uses the value of getOption("cores"), which can be set using the standard options() function. If this option isn’t set, mclapply() will detect and use the number of cores on the computer.

Let’s tell mclapply() to start two workers using mc.cores:

> unique(unlist(mclapply(1:100, function(i) Sys.getpid(), mc.cores = 2)))
[1] 4953 4954

As you can see, there are only two unique PIDs in the results, indicating that exactly two processes executed all 100 tasks.

Now let’s use options() to specify three workers:

> options(cores = 3)
> unique(unlist(mclapply(1:100, function(i) Sys.getpid())))
[1] 4955 4956 4957

This will also control the number of workers started by the pvec() function, which we will discuss later.

The mc.set.seed Option

Another important mclapply() option is mc.set.seed. When mc.set.seed is set to TRUE, which is the default, mclapply() will seed the random number generator in each worker to a different value after the workers have been created. If mc.set.seed is set to FALSE, mclapply() won’t do anything with respect to the random number generator.

In general, I would recommend that you leave mc.set.seed set to TRUE unless you have a good reason to turn it off. The problem with setting mc.set.seed to FALSE is that the worker processes will inherit the state of the master’s random number generator if it is set.

Let’s experiment with setting mc.set.seed to FALSE. First, we’ll generate some random numbers on the workers using mclapply() when the master’s state is clean:

> mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = FALSE)
[[1]]
[1] -1.268046  0.262834  2.415977

[[2]]
[1] -0.1817228  0.6496526 -0.7741212

[[3]]
[1] -0.7378100  0.1080590 -0.5902874

All the values are different, so everything looks fine. But watch what happens if we generate a random number on the master, and then call mclapply() again:

> rnorm(1)
[1] 1.847741
> mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = FALSE)
[[1]]
[1]  0.6995516 -0.2436397 -0.6131929

[[2]]
[1]  0.6995516 -0.2436397 -0.6131929

[[3]]
[1]  0.6995516 -0.2436397 -0.6131929

Now the workers all produce identical random numbers, and they would produce the same numbers again if I were to call mclapply() another time!

This happens because generating any random numbers or calling set.seed() creates a variable called .Random.seed in the global environment, and its value is used to generate subsequent random numbers. Therefore, if that variable exists on the master when mclapply() is executed, all the worker processes will inherit it and produce the same stream of random numbers unless something is done to reseed each of the workers.

When mc.set.seed is TRUE, mclapply() will explicitly set the seed differently in each of the workers before calling the user’s function. Let’s try that after setting the seed in the master to make sure the workers do indeed produce different random numbers:

> set.seed(7777442)
> mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = TRUE)
[[1]]
[1] -1.0757472 -0.7850815 -0.1700620

[[2]]
[1] -0.63224810 -0.04542427  1.46662809

[[3]]
[1] -0.2067085  0.7669072  0.4032044

As of multicore 0.1-5, setting mc.set.seed to TRUE will cause mclapply() to execute set.seed(Sys.getpid()) in each of the workers. Thus, not only are the workers seeded differently from each other, but they are also seeded differently from the workers created by previous calls to mclapply().[35]
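
Note that because the seed is derived from the process ID, mc.set.seed set to TRUE doesn’t make your results reproducible from one run to the next. Here’s a minimal sketch of one workaround (not part of the multicore API): derive each task’s seed from its own input, so the results don’t depend on process IDs or on which worker handles which task:

# each task seeds itself from its input, giving reproducible results
seeds <- 101:103
mclapply(seeds, function(s) { set.seed(s); rnorm(3) },
         mc.cores = 3, mc.set.seed = FALSE)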

Load Balancing with mclapply

What if you want load balancing with multicore? By default, mclapply() will work like snow’s parLapply() function. That is, it preschedules the work by dividing it into as many tasks as there are cores. Sometimes that works well, even if the tasks have very different lengths. But to best balance the work performed by each of the workers, prescheduling can be turned off by setting mc.preschedule to FALSE. This makes mclapply() work more like snow’s clusterApplyLB() function.

Let’s use the parallel sleep example to see what difference prescheduling can make:

> set.seed(93564990)
> sleeptime <- abs(rnorm(10, 10, 10))
> system.time(mclapply(sleeptime, Sys.sleep, mc.cores = 4))
   user  system elapsed
  0.012   0.008  64.763
> system.time(mclapply(sleeptime, Sys.sleep, mc.cores = 4, mc.preschedule = FALSE))
   user  system elapsed
  0.032   0.028  57.347

Unfortunately we can’t easily generate performance plots, as with snow, but the elapsed times demonstrate that it can help to turn off prescheduling if the times to execute the aggregated tasks are different. The difference isn’t as great as we demonstrated between clusterApply() and clusterApplyLB(), since prescheduling tends to smooth out the differences in the length of individual tasks, but it can still be significant.

Keep in mind that a new worker is forked for every element of the vector passed to mclapply() when prescheduling is turned off. That means that the performance could suffer if each call to the function is relatively short. In other words, you should probably only set mc.preschedule to FALSE if the tasks are both long and varying in length. Otherwise, it’s probably a safer bet to leave prescheduling turned on.
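
If you want a rough feel for that forking overhead on your own machine, you could time a large number of trivial tasks with prescheduling on and off; the exact numbers will vary, but the prescheduled version should normally be much faster:

x <- 1:10000
# prescheduled: only mc.cores worker processes are forked
system.time(mclapply(x, sqrt, mc.cores = 4))
# not prescheduled: one worker process is forked per element
system.time(mclapply(x, sqrt, mc.cores = 4, mc.preschedule = FALSE))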

The pvec Function

The pvec() function was introduced in multicore 0.1-4. It is a high-level function used to execute vector functions in parallel. Let’s use it to take the cube root of a vector:

> x <- 1:10
> pvec(x, "^", 1/3)
 [1] 1.000000 1.259921 1.442250 1.587401 1.709976 1.817121 1.912931 2.000000
 [9] 2.080084 2.154435

This is like the parVapply() function that we developed in the snow chapter. In both cases, the worker function is executed on subvectors of the input vector, rather than on each element of it, making it potentially more efficient and convenient than mclapply() for this case.

pvec() takes the same additional arguments as mclapply() (all prefixed with “mc.”)—except for mc.preschedule, which isn’t appropriate for pvec().

Note

Many vector functions, including ^, are not compute intensive enough to make the use of pvec() worthwhile. This example runs slower on my computers than the equivalent sequential version, regardless of the vector length.
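
To see how pvec() divides up the work differently than mclapply(), here’s a small sketch comparing the two on the cube root example; the results are the same, but pvec() calls the function far fewer times:

x <- 1:10
# mclapply() calls the function once per element, so it sees one value at a time
unlist(mclapply(x, function(xi) xi^(1/3), mc.cores = 2))
# pvec() calls the function on whole subvectors, so the function must be vectorized
pvec(x, function(v) v^(1/3), mc.cores = 2)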

The parallel and collect Functions

The parallel() and collect() functions are the last of the high-level functions in multicore, and are used together. The parallel() function creates a new process using fork() to evaluate an expression in parallel with the calling process. It returns a parallelJob object which is passed to the collect() function to retrieve the result of the computation. collect() can be called with either a single parallelJob object, or a list of parallelJob objects. It returns the corresponding results in a list, in the same order that the jobs were specified to collect() (but only if wait is TRUE, as we’ll see later).

Normally, you would call parallel() multiple times, and then use collect() to retrieve all of the results. This can be useful if you want to execute several different functions in parallel, or start a job running in the background and then do something else before waiting for it to complete.

Let’s use parallel() and collect() to execute three different functions in parallel. For demonstration purposes, I’ll define very contrived functions that each sleep for a different period of time and then return a number that identifies them:

library(multicore)
fun1 <- function() {Sys.sleep(10); 1}
fun2 <- function() {Sys.sleep(5);  2}
fun3 <- function() {Sys.sleep(1);  3}

Let’s start each of them executing using parallel(), and then wait for the results using collect():

> f1 <- parallel(fun1())
> f2 <- parallel(fun2())
> f3 <- parallel(fun3())
> collect(list(f1, f2, f3))
$`4862`
[1] 1

$`4863`
[1] 2

$`4864`
[1] 3

As you can see, the results are returned in the same order that they were specified to collect().

That is the basic way of using parallel() and collect(). You can think of parallel() as a submit operation, and collect() as a wait operation, similar to batch queueing commands.
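
For example, here’s a minimal sketch of that pattern; fit.model() and train.data are just placeholders for any long-running expression and its inputs:

job <- parallel(fit.model(train.data))   # "submit": starts running in a forked process
# ... the master is free to do other work here ...
fit <- collect(job)[[1]]                 # "wait": block until the job finishes, then fetch the result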

Using collect Options

The collect() function has two options that give you more control over how it waits for jobs started via parallel(): wait and timeout. If wait is set to TRUE (the default value), then collect() waits for all of the specified jobs to finish, regardless of the value of timeout, and returns the results in a list in the same order that the jobs were specified to collect(). But if wait is set to FALSE, then collect() waits for up to timeout seconds for at least one of the jobs to finish or a process to exit, and returns the results in a list in arbitrary order, using a NULL to indicate that a process exited. If no jobs finish in that time, collect() returns a NULL.

To check for results without waiting at all, you call collect() with wait set to FALSE, and timeout set to its default value of 0. Let’s do that several times, pausing after the first collect() to wait for some results:

> f1 <- parallel(fun1())
> f2 <- parallel(fun2())
> f3 <- parallel(fun3())
> collect(list(f1, f2, f3), wait=FALSE)
NULL                             (1)
> Sys.sleep(15)
> collect(list(f1, f2, f3), wait=FALSE)
[[1]]                            (2)
[1] 3

[[2]]
[1] 2

[[3]]
[1] 1

> collect(list(f1, f2, f3), wait=FALSE)
[[1]]                            (3)
NULL

[[2]]
NULL

[[3]]
NULL

> collect(list(f1, f2, f3), wait=FALSE)
NULL                             (4)

Here’s what each of the four numbered values returned by collect() indicates:

1. No results are available and no processes have exited.
2. fun3(), fun2(), and fun1() have all completed.
3. All three of the processes have exited.
4. All results have been returned and all processes have exited.

The timeout argument allows you to wait a specified number of seconds for at least one result to complete or one process to exit (assuming wait is set to FALSE). Let’s do that repeatedly in order to collect all of the results:

> f1 <- parallel(fun1())
> f2 <- parallel(fun2())
> f3 <- parallel(fun3())
> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
[1] 3                            (1)

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
NULL                             (2)

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
[1] 2                            (3)

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
NULL                             (4)

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
[1] 1                            (5)

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
NULL                             (6)

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
NULL                             (7)

Here’s what each of the seven numbered values returned by collect() indicates:

1. fun3() has completed.
2. The process that executed fun3() has exited.
3. fun2() has completed.
4. The process that executed fun2() has exited.
5. fun1() has completed.
6. The process that executed fun1() has exited.
7. All results have been returned and all processes have exited.

Note that if we had used a shorter timeout, such as 2, some of the collect() calls would have returned NULL, indicating that the timeout had expired before any jobs completed or processes exited.
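
Putting wait=FALSE and timeout together gives you a simple way to process results as they arrive. Here’s a minimal polling sketch, assuming the fun1(), fun2(), and fun3() definitions from above, and assuming that none of the jobs legitimately returns NULL (since collect() uses NULL entries to report process exits):

jobs <- list(parallel(fun1()), parallel(fun2()), parallel(fun3()))
finished <- 0
while (finished < length(jobs)) {
  # wait up to 2 seconds for at least one result or process exit
  res <- collect(jobs, wait = FALSE, timeout = 2)
  for (r in res) {
    if (!is.null(r)) {          # NULL entries just report a process exit
      cat("got result:", r, "\n")
      finished <- finished + 1
    }
  }
}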

Parallel Random Number Generation

Unfortunately, there is no support built into the multicore package for any of the parallel random number generation packages, such as rlecuyer or rsprng.[36] It isn’t too hard to use them directly, but since the high-level functions fork the workers each time they are called, you can’t initialize the workers once and then use them repeatedly, as in snow. You might need to generate a new seed every time you do a parallel operation, or perhaps have the workers return their state along with the result so that the next set of workers can pick up where the previous set left off.

Here’s the idea: initialize each of the workers to use parallel random numbers at the start of the task. We can even use the initSprngNode() function which is defined in snow to do that:

> library(snow)
> nw <- 3
> seed <- 7777442
> kind <- 0
> para <- 0
> f1 <- parallel({
+     initSprngNode(0, nw, seed, kind, para)
+     rnorm(1)
+ })
> f2 <- parallel({
+     initSprngNode(1, nw, seed, kind, para)
+     rnorm(1)
+ })
> f3 <- parallel({
+     initSprngNode(2, nw, seed, kind, para)
+     rnorm(1)
+ })
> unlist(collect(list(f1, f2, f3)), use.names = FALSE)
[1] -0.1447636  1.0686927 -0.4137106

Since parallel() takes an expression, it is easy to prepend a call to initSprngNode() to the expression using curly braces. We could do something similar with mclapply() using a wrapper function, except that supplying an additional varying argument requires a bit of work (a sketch appears at the end of this section). Being able to easily specify a different first argument to initSprngNode() for each worker can make parallel() easier to use.

Notice that we get the same results using snow:

> cl <- makeCluster(3, type = "SOCK")
> seed <- 7777442
> clusterSetupSPRNG(cl, seed = seed)
> unlist(clusterEvalQ(cl, rnorm(1)), use.names = FALSE)
[1] -0.1447636  1.0686927 -0.4137106
> stopCluster(cl)

The same basic approach can be used with the rlecuyer package. See the source for clusterSetupRNGstream() in snow to figure out how.
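
Here’s a minimal sketch of the mclapply() wrapper mentioned earlier. The varying stream number comes from the task index, so each element of the input gets its own SPRNG stream, and with the same seed it should reproduce the three values shown above (like the parallel() version, this requires the rsprng package to be installed):

library(snow)    # for initSprngNode()
nw <- 3
seed <- 7777442
kind <- 0
para <- 0
worker <- function(i) {
  initSprngNode(i - 1, nw, seed, kind, para)   # stream number varies per task
  rnorm(1)
}
unlist(mclapply(1:nw, worker, mc.cores = nw, mc.set.seed = FALSE))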

The Low-Level API

So far, we’ve only discussed multicore’s high-level API. There is also a low-level API which includes functions such as fork(), selectChildren(), readChild(), sendMaster(), and exit(). Those are the basic functions used to implement mclapply(), and to demonstrate how they can be used, I will implement a stripped down version of mclapply(), which I call mclapply.init. To make it more interesting, I will include an option called mc.init that can be used to initialize the worker processes. The value of mc.init should be a function that takes two arguments: id and cores. This function will be called in each of the child/worker processes before executing the worker function.

Here is the definition of mclapply.init using multicore’s low-level API:

mclapply.init <- function(X, FUN, ..., mc.cores=4, mc.init=NULL) {
  cores <- max(min(mc.cores, length(X)), 1)
  # Preschedule: worker i handles elements i, i+cores, i+2*cores, ...
  ix <- lapply(1:cores, function(i) seq(i, length(X), by=cores))
  forkloop <- function(core) {
    proc <- fork()
    if (inherits(proc, "masterProcess")) {
      # This branch runs in the child: do the work and send it to the master
      sendMaster(tryCatch({
        suppressWarnings(rm(".Random.seed", pos=.GlobalEnv))
        if (is.function(mc.init))
          mc.init(core, cores)
        lapply(X[ix[[core]]], FUN, ...)
      },
      error=function(e) {
        # On error, return the condition object for each of this worker's elements
        lapply(ix[[core]], function(i) e)
      }))
      exit(0)
    }
    proc$pid    # the parent just records the child's process ID
  }
  pids <- sapply(1:cores, forkloop)
  results <- vector("list", length(X))
  # Collect results as the children send them back or exit
  while (! is.null(ready <- selectChildren(pids, 1))) {
    if (is.integer(ready)) {
      for (pid in ready) {
        data <- readChild(pid)
        if (is.raw(data)) {
          core <- which(pid == pids)
          results[ix[[core]]] <- unserialize(data)
        }
      }
    }
  }
  names(results) <- names(X)
  results
}

If you’re familiar with Unix system programming, this code should look quite natural. The master process calls fork() to start each worker process. The fork() call returns a process object, which will be a childProcess object in the parent process and a masterProcess object in the child process. The code immediately after fork() uses this process object to determine its own identity: if the object is a masterProcess, then it is running in the child; otherwise, it is running in the parent/master. The master simply returns the child’s process ID contained in the childProcess object. The child executes the worker function on its portion of the input vector, and sends the result to the master process via the sendMaster() function. Meanwhile, the master calls selectChildren() to wait for the children to do something. selectChildren() returns an integer vector of the process IDs of the children that have either sent data to the master or exited. The master then calls readChild() for each of those process IDs. If readChild() returns a raw vector, the master unserializes it and saves the results in a list; otherwise, it ignores the value, which indicates that the child has exited.

However, I glossed over a couple of important things that the child process does before executing the worker function. First, it removes the .Random.seed variable from the global environment, in order to avoid inheriting the state of the master’s random number generator. Then it calls the function specified by the mc.init argument, passing it the values of core and cores. This function can be used to initialize the worker, and the two argument values may be helpful in doing that.

Let’s say that we would like the worker function to tag each of the result values with its own ID. It can do that by passing a function to mc.init that assigns the value of id to a variable in the global environment:

> set.worker.id <- function(id, cores) {
+     assign(".MC.WORKER.ID", id, pos = .GlobalEnv)
+ }
> mclapply.init(11:13, function(i) c(i, .MC.WORKER.ID), mc.cores = 2, 
+    mc.init = set.worker.id)
[[1]]
[1] 11  1

[[2]]
[1] 12  2

[[3]]
[1] 13  1

Now the producer of each of the results can be identified.

Another possible use of mc.init is to initialize the random number generator. To make mclapply.init() work like mclapply() with mc.set.seed set to TRUE, we can specify the following mc.init function:

> set.worker.seed <- function(id, cores) {
+     set.seed(Sys.getpid())
+ }
> mclapply.init(1:3, function(i) rnorm(1), mc.init = set.worker.seed)
[[1]]
[1] 0.1699496

[[2]]
[1] 0.1616656

[[3]]
[1] -0.3883378

We could also initialize the workers to use a parallel random number generator package, but I’ll leave that as an exercise for the reader.

When It Works…

The best feature in multicore is its drop-in replacement for lapply(): mclapply().[37] It’s about as close as it comes to something that “Just Works” in the world of Parallel R.[38]

…And When It Doesn’t

The biggest gotchas in multicore are its lack of Windows support and its weak support for parallel random number generation.

The Wrap-up

You now know how to run your R scripts in parallel on the multicore computer that you probably use to read your email every day. You’ve also seen how running on a single machine bypasses many of the difficulties associated with running on multiple machines. So why don’t more R packages take advantage of multicore to run in parallel? The next chapter discusses a new parallel programming package that will come built into R, starting with R 2.14.0, which might encourage more R developers to parallelize their packages. And it will be easy for you to learn, since it uses much of the code from the snow and multicore packages, so almost everything that you’ve learned so far will work in the new parallel package.



[33] An experimental attempt was made to support Windows in multicore 0.1-4 using the Windows NT/2000 Native API, but it only partially works on Windows 2000 and XP, and not at all on Vista and Windows 7.

[34] multicore 0.1-4 attempts to disable the event loop in forked processes on Mac OS X in order to support the Mac GUI for R.

[35] Of course, Unix process IDs usually only go up to about 32767, so they will wrap around eventually, but I’ll ignore that issue.

[36] This is one of the problems solved by the new parallel package.

[37] If you’re using lapply() with a function that modifies a variable outside of its local scope, then mclapply() probably won’t work the same way as lapply(). However, that hasn’t been a problem in my experience. Programmers tend to use for-loops for that sort of code.

[38] And did I mention that multicore is easy to install?
