multicore
is a popular parallel
programming package for use on multiprocessor and multicore computers. It
was written by Simon Urbanek, and first released on CRAN in 2009. It
immediately became popular because its clever use of the fork()
system call allows it to implement a
parallel lapply()
operation that is even
easier to use than snow’s parLapply().
Unfortunately, because fork()
is a
Posix system call, multicore
can’t really
be used on Windows machines.[33] The fork() call can also cause
problems for functions that use resources that were allocated or initialized
exclusively for the master, or parent process. This is particularly a
problem with graphics functions, so it isn’t generally recommended to use
multicore
with an R GUI.[34] Nevertheless, multicore
works perfectly for most R functions on Posix systems, such as Linux and Mac
OS X, and its use of fork()
makes it very
efficient and convenient, as we’ll see in this chapter.
Motivation: You have an R script
that spends an hour executing a function using lapply()
on your laptop.
Solution: Replace lapply()
with the mclapply()
function from the multicore
package.
Good because: It’s easy to install, easy to use, and makes use of hardware that you probably already own.
multicore
is intended to run on
Posix-based multiprocessor and multicore systems. This includes almost all
modern Mac OS X and Linux desktop and laptop computers. It can also be
used on single nodes of a Linux cluster, for example, but unlike snow,
it doesn’t support the use of multiple cluster nodes.
Since multicore
is rather
efficient, it can handle somewhat finer-grained parallel problems than
snow
, but it is still intended for
coarse-grained, embarrassingly parallel applications. It cannot compete with
multithreaded programming for performing fine-grained parallelism, such as
vector operations.
Since multicore
runs on a single
computer, it doesn’t give you access to the greater aggregate memory of
a cluster, as snow does. However, since fork() only copies data when it
is modified, multicore often makes more efficient use of memory on a
single computer than snow can.
multicore
is available on CRAN,
so it is installed like any other CRAN package. Much of it is written in
C, but it doesn’t depend on any external libraries, so building it from
source is fairly easy on Posix-based systems.
Here’s how I usually install multicore:
install.packages("multicore")
It may ask you which CRAN mirror to use, and then it will download and install the package.
There is no Windows binary distribution available for multicore
on CRAN, so if you’re using Windows
2000 or XP, and want to try the experimental Windows support, you’ll have
to build it from the source distribution. This requires additional
software to be installed, and is beyond the scope of this book.
Once you’ve installed multicore
,
you should verify that you can load it:
library(multicore)
If that succeeds, you are ready to start using multicore.
The most important and commonly used
function in the multicore
package is
mclapply()
, which is basically a drop-in
replacement for lapply()
. It is one
of the high-level functions in multicore
, the others being pvec()
, parallel()
, and collect()
, which we will discuss later.
Although mclapply()
takes some
additional arguments (all prefixed with “mc.”), it is essentially the
same as lapply()
. If you have an R
script that spends a lot of time calling lapply()
, it’s very possible that all you will
have to do to parallelize it is to load the multicore
package and replace lapply()
with mclapply().
For example, let’s write a parallel K-Means using multicore:
library(multicore)
library(MASS)

results <- mclapply(rep(25, 4),
                    function(nstart) kmeans(Boston, 4, nstart=nstart))
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
This is nearly identical to the sequential, lapply()
version of K-Means from the snow
chapter, except that we loaded the
multicore
package and replaced
lapply()
with mclapply()
. In particular, we didn’t have to
create a cluster object, and we didn’t have to initialize the workers by
loading the MASS
package on each of
them. This is because mclapply()
automatically starts the workers using fork()
. These workers inherit the functions,
variables and environment of the master process, making explicit worker
initialization unnecessary.
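For instance, packages and data loaded or created on the master are
automatically visible to the workers. As a small sketch (the names big
and res are just for illustration), the following would fail with snow
unless big were explicitly exported and MASS loaded on every worker,
but works as-is with mclapply():

library(multicore)
library(MASS)                 # loaded only in the master process
big <- rnorm(1000000)         # created only in the master process
# Each forked worker sees both 'big' and the MASS data set Boston
# without any explicit export or initialization step.
res <- mclapply(1:4, function(i) mean(big) + nrow(Boston) * i)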
It may surprise you that mclapply()
creates worker processes every time
it is called. snow
doesn’t do that
since starting workers on a cluster is often rather time consuming.
However, fork()
is relatively fast,
especially since it doesn’t copy process data until it needs to, a
technique called copy-on-write, which takes
advantage of the operating system’s virtual memory system. In addition,
forking the workers every time mclapply()
is called gives each of them a
virtual copy of the master’s environment right at the point that
mclapply()
is executed, so worker
data is in sync with the master. Thus, you don’t need to recreate the
master’s data and environment in the workers, as in snow
, since fork()
does that automatically and
efficiently.
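Note that this synchronization only goes one way: the workers start with
a virtual copy of the master’s data, but anything they modify is copied
into their own address space and never flows back to the master. A tiny
sketch (the names x and ignore are mine) illustrates this:

x <- 0
ignore <- mclapply(1:2, function(i) x <<- i)  # each worker modifies its own copy of x
x                                             # still 0 in the master process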
The mclapply()
function takes a
number of optional arguments that modify its behaviour. One of the most
important of these is the mc.cores
argument, which controls the number of workers that are created; this is
often set equal to the number of cores on the computer. By default,
mclapply()
uses the value of getOption("cores")
, which can be set using the
standard options()
function. If this
option isn’t set, mclapply()
will
detect and use the number of cores on the computer.
Let’s tell mclapply()
to start
two workers using mc.cores:
> unique(unlist(mclapply(1:100, function(i) Sys.getpid(), mc.cores = 2)))
[1] 4953 4954
As you can see, there are only two unique PIDs in the results, indicating that exactly two processes executed all 100 tasks.
Now let’s use options()
to
specify three workers:
> options(cores = 3)
> unique(unlist(mclapply(1:100, function(i) Sys.getpid())))
[1] 4955 4956 4957
This will also control the number of workers started by the
pvec()
function, which we will
discuss later.
Another important mclapply()
option is mc.set.seed
. When mc.set.seed
is set to TRUE
, mclapply()
will seed each of the workers to a
different value after they have been created, which is mclapply()
’s default behaviour. If mc.set.seed
is set to FALSE
, mclapply()
won’t do anything with respect to
the random number generator.
In general, I would recommend that you leave mc.set.seed
set to TRUE
unless you have a good reason to turn it
off. The problem with setting mc.set.seed
to FALSE
is that the worker processes will
inherit the state of the master’s random number generator if it is
set.
Let’s experiment with setting mc.set.seed
to FALSE
. First, we’ll generate some random
numbers on the workers using mclapply()
when the master’s state is
clean:
> mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = FALSE)
[[1]]
[1] -1.268046 0.262834 2.415977

[[2]]
[1] -0.1817228 0.6496526 -0.7741212

[[3]]
[1] -0.7378100 0.1080590 -0.5902874
All the values are different, so everything looks fine. But watch
what happens if we generate a random number on the master, and then call
mclapply()
again:
> rnorm(1)
[1] 1.847741
> mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = FALSE)
[[1]]
[1] 0.6995516 -0.2436397 -0.6131929

[[2]]
[1] 0.6995516 -0.2436397 -0.6131929

[[3]]
[1] 0.6995516 -0.2436397 -0.6131929
Now the workers all produce identical random numbers, and they would
produce the same numbers if I called mclapply() again!
This happens because generating any random numbers or calling
set.seed()
creates a variable called
.Random.seed
in the global
environment, and its value is used to generate subsequent random
numbers. Therefore, if that variable exists on the master when mclapply()
is executed, all the worker
processes will inherit it and produce the same stream of random numbers
unless something is done to reseed each of the
workers.
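You can watch .Random.seed come into existence by checking for it before
and after the first random number is generated. Assuming a fresh R
session in which no random numbers have been generated and set.seed()
has not been called, it looks like this:

> exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE)
[1] FALSE
> invisible(rnorm(1))
> exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE)
[1] TRUE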
When mc.set.seed
is TRUE
, mclapply()
will explicitly set the seed
differently in each of the workers before calling the user’s function.
Let’s try that after setting the seed in the master to make sure the
workers do indeed produce different random numbers:
> set.seed(7777442)
> mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = TRUE)
[[1]]
[1] -1.0757472 -0.7850815 -0.1700620

[[2]]
[1] -0.63224810 -0.04542427 1.46662809

[[3]]
[1] -0.2067085 0.7669072 0.4032044
As of multicore 0.1-5
, setting
mc.set.seed
to TRUE
will cause mclapply()
to execute set.seed(Sys.getpid())
in each of the workers.
Thus, not only are the workers seeded differently from each other, but
they are also seeded differently from the workers created by previous
calls to mclapply().[35]
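Because the seed is derived from each worker’s process ID, and fork()
produces new processes (and therefore new PIDs) on every call, two
consecutive calls such as the following should produce different random
numbers without any explicit seeding on your part (this is only a
sketch; r1 and r2 are my names):

r1 <- mclapply(1:2, function(i) rnorm(2), mc.set.seed = TRUE)
r2 <- mclapply(1:2, function(i) rnorm(2), mc.set.seed = TRUE)
identical(r1, r2)    # should be FALSE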
What if you want load balancing with multicore
? By default, mclapply()
will work like snow
’s parLapply()
function. That is, it preschedules
the work by dividing it into as many tasks as there are cores. Sometimes
that works well, even if the tasks have very different lengths. But to
best balance the work performed by each of the workers, prescheduling
can be turned off by setting mc.preschedule
to FALSE
. This makes
mclapply() work more like snow’s clusterApplyLB() function.
Let’s use the parallel sleep example to see what difference prescheduling can make:
> set.seed(93564990)
> sleeptime <- abs(rnorm(10, 10, 10))
> system.time(mclapply(sleeptime, Sys.sleep, mc.cores = 4))
   user  system elapsed
  0.012   0.008  64.763
> system.time(mclapply(sleeptime, Sys.sleep, mc.cores = 4, mc.preschedule = FALSE))
   user  system elapsed
  0.032   0.028  57.347
Unfortunately we can’t easily generate performance plots, as with
snow
, but the elapsed times
demonstrate that it can help to turn off prescheduling if the times to
execute the aggregated tasks are different. The difference isn’t as
great as we demonstrated between clusterApply()
and clusterApplyLB()
, since prescheduling tends to
smooth out the differences in the length of individual tasks, but it can
still be significant.
Keep in mind that a new worker is forked for every element of the
vector passed to mclapply()
when
prescheduling is turned off. That means that the performance could
suffer if each call to the function is relatively short. In other words,
you should probably only set mc.preschedule
to FALSE
if the tasks are both long and varying
in length. Otherwise, it’s probably a safer bet to leave prescheduling
turned on.
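For example, the following comparison forks only a handful of workers
with prescheduling on, but ten thousand of them with prescheduling off.
I haven’t shown timings because they are machine dependent, but the
second call is usually noticeably slower:

system.time(mclapply(1:10000, sqrt, mc.cores = 4))
system.time(mclapply(1:10000, sqrt, mc.cores = 4, mc.preschedule = FALSE))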
The pvec()
function was
introduced in multicore 0.1-4
. It is
a high-level function used to execute vector
functions in parallel. Let’s use it to take the cube root of a
vector:
> x <- 1:10
> pvec(x, "^", 1/3)
 [1] 1.000000 1.259921 1.442250 1.587401 1.709976 1.817121 1.912931 2.000000
 [9] 2.080084 2.154435
This is like the parVapply()
function that we developed in the snow
chapter. In both cases, the worker
function is executed on subvectors of the input vector, rather than
on each element of it, making it
potentially more efficient and convenient than mclapply() for this case.
pvec()
takes the same
additional arguments as mclapply()
(all prefixed with “mc.”)—except for mc.preschedule
, which isn’t appropriate for
pvec().
Many vector functions, including ^
, are not compute-intensive enough to make
the use of pvec()
worthwhile. This
example runs slower on my computers than the equivalent sequential
version, regardless of the vector length.
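If you want to check whether pvec() pays off for a particular operation
on your own machine, a simple timing comparison such as this will tell
you (the output is machine dependent, so I haven’t shown it):

x <- runif(10000000)
system.time(x ^ (1/3))            # sequential vector operation
system.time(pvec(x, "^", 1/3))    # parallel version using pvec()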
The parallel()
and collect()
functions are the last of the
high-level functions in multicore
, and are used together. The
parallel()
function creates a new
process using fork()
to evaluate an
expression in parallel with the calling process. It returns a parallelJob
object which
is passed to the collect()
function
to retrieve the result of the computation. collect()
can be called with either a single
parallelJob
object, or a list of
parallelJob
objects. It returns the
corresponding results in a list, in the same order that the jobs were
specified to collect()
(but only if
wait
is TRUE
, as we’ll see later).
Normally, you would call parallel()
multiple times, and then use
collect()
to retrieve all of the
results. This can be useful if you want to execute several different
functions in parallel, or start a job running in the background and then
do something else before waiting for it to complete.
Let’s use parallel()
and
collect()
to execute three different
functions in parallel. For demonstration purposes, I’ll define very
contrived functions that each sleep for a different period of time and
then return a number that identifies them:
library(multicore)

fun1 <- function() {Sys.sleep(10); 1}
fun2 <- function() {Sys.sleep(5); 2}
fun3 <- function() {Sys.sleep(1); 3}
Let’s start each of them executing using parallel()
, and then wait for the results
using collect():
> f1 <- parallel(fun1())
> f2 <- parallel(fun2())
> f3 <- parallel(fun3())
> collect(list(f1, f2, f3))
$`4862`
[1] 1

$`4863`
[1] 2

$`4864`
[1] 3
As you can see, the results are returned in the same order that
they were specified to collect()
.
That is the basic way of using parallel()
and collect()
. You can think of parallel()
as a submit
operation, and collect()
as a
wait operation, similar to batch queueing
commands.
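This pattern is handy when you want to start a long computation in the
background and keep working in the same R session. Here’s a minimal
sketch that reuses the Boston k-means example from earlier in the
chapter (the names job and fit are mine):

library(multicore)
library(MASS)

# submit the long-running job; control returns immediately
job <- parallel(kmeans(Boston, 4, nstart = 100))
# ... do other work here while the job runs ...
# wait for the job to finish and retrieve its result
fit <- collect(job)[[1]]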
The collect()
function has two
options that give you more control over how it waits for jobs started
via parallel()
: wait
and timeout
. If wait
is set to TRUE
(the default value), then collect()
waits for all of the specified jobs
to finish, regardless of the value of timeout
, and returns the results in a list in
the same order that the jobs were specified to collect()
. But if wait
is set to FALSE
, then collect()
waits for up to timeout
seconds for at least one of the jobs
to finish or a process to exit, and returns the results in a list in
arbitrary order, using a NULL
to
indicate that a process exited. If no jobs finish in that time, collect()
returns a NULL
.
To check for results without waiting at all, you call collect()
with wait
set to FALSE
, and timeout
set to its default value of 0. Let’s
do that several times, pausing after the first collect()
to wait for some results:
> f1 <- parallel(fun1())
> f2 <- parallel(fun2())
> f3 <- parallel(fun3())
> collect(list(f1, f2, f3), wait=FALSE)
NULL
> Sys.sleep(15)
> collect(list(f1, f2, f3), wait=FALSE)
[[1]]
[1] 3

[[2]]
[1] 2

[[3]]
[1] 1

> collect(list(f1, f2, f3), wait=FALSE)
[[1]]
NULL

[[2]]
NULL

[[3]]
NULL

> collect(list(f1, f2, f3), wait=FALSE)
NULL
Here’s what each of the four values returned by collect() indicates:
1. The first NULL means that none of the jobs had finished when collect() was called.
2. The second call, made after sleeping for 15 seconds, returns all three results, in the order in which the jobs finished rather than the order in which they were specified.
3. The third call returns a list of three NULLs, indicating that the three worker processes have exited.
4. The final NULL means that there is nothing left to collect for these jobs.
The timeout
argument allows you
to wait a specified number of seconds for at least one result to
complete or one process to exit (assuming wait is set to FALSE). Let’s
do that repeatedly in order to
collect all of the results:
> f1 <- parallel(fun1())
> f2 <- parallel(fun2())
> f3 <- parallel(fun3())
> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
[1] 3

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
NULL

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
[1] 2

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
NULL

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
[1] 1

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
[[1]]
NULL

> collect(list(f1, f2, f3), wait=FALSE, timeout=1000000)
NULL
Here’s what each of the seven values returned by collect() indicates:
1. The first call returns fun3’s result, since fun3 finishes first (after about a second).
2. The second call returns a list containing a NULL, indicating that fun3’s process has exited.
3. The third call returns fun2’s result.
4. The fourth call returns a list containing a NULL, indicating that fun2’s process has exited.
5. The fifth call returns fun1’s result.
6. The sixth call returns a list containing a NULL, indicating that fun1’s process has exited.
7. The final NULL means that there is nothing left to collect for these jobs.
Note that if we had used a shorter timeout, such as 2, collect()
would have returned some NULL
s, indicating that the timeout had expired
before any jobs completed or processes exited.
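To see that for yourself, you could run the same sequence again with a
smaller timeout. The comments below describe what I would expect on an
otherwise idle machine; the exact behaviour depends on timing:

f1 <- parallel(fun1())
f2 <- parallel(fun2())
f3 <- parallel(fun3())
collect(list(f1, f2, f3), wait=FALSE, timeout=2)  # fun3's result: it finishes after about a second
collect(list(f1, f2, f3), wait=FALSE, timeout=2)  # a NULL, indicating that fun3's process exited
collect(list(f1, f2, f3), wait=FALSE, timeout=2)  # NULL: the timeout expires before fun2 finishes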
Unfortunately, there is no support built into the multicore
package for any of the parallel
random number generation packages, such as rlecuyer
or rsprng
.[36] It isn’t too hard to use them directly, but since the
high-level functions fork the workers each time
they are called, you can’t initialize the workers once and then use them
repeatedly, as in snow
. You might
need to generate a new seed every time you do a parallel operation, or
perhaps have the workers return their state along with the result so
that the next set of workers can pick up where the previous set left
off.
Here’s the idea: initialize each of the workers to use parallel
random numbers at the start of the task. We can even use the initSprngNode()
function which is defined in
snow
to do that:
> library(snow)
> nw <- 3
> seed <- 7777442
> kind <- 0
> para <- 0
> f1 <- parallel({
+   initSprngNode(0, nw, seed, kind, para)
+   rnorm(1)
+ })
> f2 <- parallel({
+   initSprngNode(1, nw, seed, kind, para)
+   rnorm(1)
+ })
> f3 <- parallel({
+   initSprngNode(2, nw, seed, kind, para)
+   rnorm(1)
+ })
> unlist(collect(list(f1, f2, f3)), use.names = FALSE)
[1] -0.1447636 1.0686927 -0.4137106
Since parallel()
takes an
expression, it is easy to prepend a call to initSprngNode
to the expression using curly
braces. We could do something similar with mclapply()
using a wrapper function, although passing a different argument to each
worker would take a bit more work. Being able to easily specify a
different first argument
to initSprngNode
for each worker can
make parallel()
easier to use.
Notice that we get the same results using snow:
> cl <- makeCluster(3, type = "SOCK")
> seed <- 7777442
> clusterSetupSPRNG(cl, seed = seed)
> unlist(clusterEvalQ(cl, rnorm(1)), use.names = FALSE)
[1] -0.1447636 1.0686927 -0.4137106
> stopCluster(cl)
The same basic approach can be used with the rlecuyer
package. See the source for clusterSetupRNGstream()
in snow
to figure out how.
So far, we’ve only discussed multicore
’s high-level
API. There is also a low-level API which
includes functions such as fork()
,
selectChildren()
, readChild()
, sendMaster()
, and exit()
. Those are the basic functions
used to implement mclapply()
, and to
demonstrate how they can be used, I will implement a stripped down
version of mclapply(), which I call mclapply.init. To make it more
interesting, I will
include an option called mc.init
that
can be used to initialize the worker processes. The value of mc.init
should be a function that takes two
arguments: id
and cores
. This function will be called in each of
the child/worker processes before executing the worker function.
Here is the definition of mclapply.init using
multicore
’s low-level API:
mclapply.init <- function(X, FUN, ..., mc.cores=4, mc.init=NULL) {
  cores <- max(min(mc.cores, length(X)), 1)
  ix <- lapply(1:cores, function(i) seq(i, length(X), by=cores))
  forkloop <- function(core) {
    proc <- fork()
    if (inherits(proc, "masterProcess")) {
      sendMaster(tryCatch({
        suppressWarnings(rm(".Random.seed", pos=.GlobalEnv))
        if (is.function(mc.init)) mc.init(core, cores)
        lapply(X[ix[[core]]], FUN, ...)
      }, error=function(e) {
        lapply(ix[[core]], function(i) e)
      }))
      exit(0)
    }
    proc$pid
  }
  pids <- sapply(1:cores, forkloop)
  results <- vector("list", length(X))
  while (! is.null(ready <- selectChildren(pids, 1))) {
    if (is.integer(ready)) {
      for (pid in ready) {
        data <- readChild(pid)
        if (is.raw(data)) {
          core <- which(pid == pids)
          results[ix[[core]]] <- unserialize(data)
        }
      }
    }
  }
  names(results) <- names(X)
  results
}
If you’re familiar with Unix system programming, this should look
pretty familiar. The master process calls fork()
to start each worker process. The fork() call returns a process object
that will be
a childProcess
object in the parent
process and a masterProcess
object in
the child process. The code immediately after fork()
uses this process object to determine
its own identity. If the object is a masterProcess
, then it is the child;
otherwise, it is the parent/master. The master simply returns the
child’s process ID contained in the childProcess
object. The child executes the
worker function on its portion of the input vector, and sends the result
to the master process via the sendMaster()
function. Meanwhile, the master
calls selectChildren()
to wait for
the children to do something. selectChildren()
returns an integer vector of
process IDs of the children that have either sent data to the master or
exited. The master then calls readChild()
for each of those process IDs. If
readChild()
returns a raw vector, the
master unserializes it and saves the results in a list; otherwise, it
ignores the value, which indicates that the child has exited.
However, I glossed over a couple of important things that the
child process does before executing the worker function. First, it
removes the .Random.seed
variable
from the global environment, in order to avoid inheriting the state of
the master’s random number generator. Then it calls the function
specified by the mc.init
argument,
passing it the values of core
and
cores
. This function can be used to
initialize the worker, and the two argument values may be helpful in
doing that.
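Stripped of the prescheduling and mc.init machinery, the basic
fork()/sendMaster()/readChild() protocol can be seen in an even smaller
form. This is only a sketch (assuming a Posix system and a non-GUI R
session) that forks a single child, waits up to ten seconds for it, and
reads back its result:

library(multicore)

proc <- fork()
if (inherits(proc, "masterProcess")) {
  # this branch runs in the child: compute, send the result, and exit
  sendMaster(sum(1:100))
  exit(0)
}
# this branch runs in the master: wait for the child, then read its data
selectChildren(proc$pid, 10)
result <- unserialize(readChild(proc$pid))
result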
Let’s say that we would like the worker function to tag each of
the result values with its own ID. It can do that by passing a function
to mc.init
that assigns the value of
id
to a variable in the global
environment:
> set.worker.id <- function(id, cores) {
+   assign(".MC.WORKER.ID", id, pos = .GlobalEnv)
+ }
> mclapply.init(11:13, function(i) c(i, .MC.WORKER.ID), mc.cores = 2,
+               mc.init = set.worker.id)
[[1]]
[1] 11 1

[[2]]
[1] 12 2

[[3]]
[1] 13 1
Now the producer of each of the results can be identified.
Another possible use of mc.init
is to initialize the random number generator. To make mclapply.init()
work like mclapply()
with mc.set.seed
set to TRUE
, we can specify the following mc.init
function:
> set.worker.seed <- function(id, cores) {
+   set.seed(Sys.getpid())
+ }
> mclapply.init(1:3, function(i) rnorm(1), mc.init = set.worker.seed)
[[1]]
[1] 0.1699496

[[2]]
[1] 0.1616656

[[3]]
[1] -0.3883378
We could also initialize the workers to use a parallel random number generator package, but I’ll leave that as an exercise for the reader.
The best feature in multicore
is
its drop-in replacement for lapply()
:
mclapply()
.[37] It’s about as close as it comes to something that “Just
Works” in the world of Parallel R.[38]
The biggest gotchas in multicore
are its lack of Windows support and its weak support for parallel
random number generation.
You now know how to run your R scripts in parallel on the multicore
computer that you probably use to read your email every day. You’ve also
seen how running on a single machine bypasses many of the difficulties
associated with running on multiple machines. So why don’t more R packages
take advantage of multicore
to run in
parallel? The next chapter discusses a new parallel programming package
that will come built into R, starting with R 2.14.0, which might encourage
more R developers to parallelize their packages. And it will be easy for
you to learn, since it uses much of the code from the snow
and multicore
packages, so almost everything that
you’ve learned so far will work in the new parallel
package.
[33] An experimental attempt was made to support Windows in multicore 0.1-4
using the Windows NT/2000
Native API, but it only partially works on Windows 2000 and XP, and not
at all on Vista and Windows 7.
[34] multicore 0.1-4
attempts to
disable the event loop in forked processes on Mac OS X in order to
support the Mac GUI for R.
[35] Of course, Unix process IDs usually only go up to about 32767, so they will wrap around eventually, but I’ll ignore that issue.
[36] This is one of the problems solved by the new parallel
package.
[37] If you’re using lapply()
with
a function that modifies a variable outside of its local scope, then
mclapply()
probably won’t work the
same way as lapply()
. However, that
hasn’t been a problem in my experience. Programmers tend to use
for-loops for that sort of code.
[38] And did I mention that multicore
is easy to install?