Chapter 2. snow

snow (“Simple Network of Workstations”) is probably the most popular parallel programming package available for R. It was written by Luke Tierney, A. J. Rossini, Na Li, and H. Sevcikova, and is actively maintained by Luke Tierney. It is a mature package, first released on the “Comprehensive R Archive Network” (CRAN) in 2003.

Quick Look

Motivation: You want to use a Linux cluster to run an R script faster. For example, you’re running a Monte Carlo simulation on your laptop, but you’re sick of waiting many hours or days for it to finish.

Solution: Use snow to run your R code on your company or university’s Linux cluster.

Good because: snow fits well into a traditional cluster environment, and is able to take advantage of high-speed communication networks, such as InfiniBand, using MPI.

How It Works

snow provides support for easily executing R functions in parallel. Most of the parallel execution functions in snow are variations of the standard lapply() function, making snow fairly easy to learn. To implement these parallel operations, snow uses a master/worker architecture, where the master sends tasks to the workers, and the workers execute the tasks and return the results to the master.

One important feature of snow is that it can be used with different transport mechanisms to communicate between the master and workers. This allows it to be portable, but still take advantage of high-performance communication mechanisms if available. snow can be used with socket connections, MPI, PVM, or NetWorkSpaces. The socket transport doesn’t require any additional packages, and is the most portable. MPI is supported via the Rmpi package, PVM via rpvm, and NetWorkSpaces via nws. The MPI transport is popular on Linux clusters, and the socket transport is popular on multicore computers, particularly Windows computers.[5]

snow is primarily intended to run on traditional clusters and is particularly useful if MPI is available. It is well suited to Monte Carlo simulations, bootstrapping, cross validation, ensemble machine learning algorithms, and K-Means clustering.

Good support is available for parallel random number generation, using the rsprng and rlecuyer packages. This is very important when performing simulations, bootstrapping, and machine learning, all of which can depend on random number generation.

snow doesn’t provide mechanisms for dealing with large data, such as distributing data files to the workers. The input arguments must fit into memory when calling a snow function, and all of the task results are kept in memory on the master until they are returned to the caller in a list. Of course, snow can be used with high-performance distributed file systems in order to operate on large data files, but it’s up to the user to arrange that.

Setting Up

snow is available on CRAN, so it is installed like any other CRAN package. It is pure R code and almost never has installation problems. There are binary packages for both Windows and Mac OS X.

Although there are various ways to install packages from CRAN, I generally use the install.packages() function:

install.packages("snow")

It may ask you which CRAN mirror to use, and then it will download and install the package.

If you’re using an old version of R, you may get a message saying that snow is not available. snow has required R 2.12.1 since version 0.3-5, so you might need to download and install snow 0.3-3 from the CRAN package archives. In your browser, search for “CRAN snow” and it will probably bring you to snow’s download page on CRAN. Click on the “snow archive” link, and then you can download snow_0.3-3.tar.gz. Or you can try directly downloading it from:

Once you’ve downloaded it, you can install it from the command line with:

% R CMD INSTALL snow_0.3-3.tar.gz

You may need to use the -l option to specify a different installation directory if you don’t have permission to install it in the default directory. For help on this command, use the --help option. For more information on installing R packages, see the section “Installing packages” in the “R Installation and Administration” manual, written by the “R Development Core Team”, and available from the R Project website.


As a developer, I always use the most recent version of R. That makes it easier to install packages from CRAN, since packages are only built for the most recent version of R on CRAN. They keep around older binary distributions of packages, but they don’t build new packages or new versions of packages for anything but the current version of R. And if a new version of a package depends on a newer version of R, as with snow, you can’t even build it for yourself on an older version of R. However, if you’re using R for production use, you need to be much more cautious about upgrading to the latest version of R.

To use snow with MPI, you will also need to install the Rmpi package. Unfortunately, installing Rmpi is a frequent cause of problems because it has an external dependency on MPI. For more information, see Installing Rmpi.

Fortunately, the socket transport can be used without installing any additional packages. For that reason, I suggest that you start by using the socket transport if you are new to snow.

Once you’ve installed snow, you should verify that you can load it:

library(snow)

If that succeeds, you are ready to start using snow.

Working with It

Creating Clusters with makeCluster

In order to execute any functions in parallel with snow, you must first create a cluster object. The cluster object is used to interact with the cluster workers, and is passed as the first argument to many of the snow functions. You can create different types of cluster objects, depending on the transport mechanism that you wish to use.

The basic cluster creation function is makeCluster(), which can create any type of cluster. Let’s use it to create a cluster of four workers on the local machine using the socket transport:

cl <- makeCluster(4, type="SOCK")

The first argument is the cluster specification, and the second is the cluster type. The interpretation of the cluster specification depends on the type, but all cluster types allow you to specify a worker count.

Socket clusters also allow you to specify the worker machines as a character vector. The following will launch four workers on remote machines:

spec <- c("n1", "n2", "n3", "n4")
cl <- makeCluster(spec, type="SOCK")

The socket transport launches each of these workers via the ssh command[6] unless the name is “localhost”, in which case makeCluster() starts the worker itself. For remote execution, you should configure ssh to use password-less login. This can be done using public-key authentication and SSH agents, which is covered in chapter 6 of SSH, The Secure Shell: The Definitive Guide (O’Reilly) and many websites.

makeCluster() allows you to specify additional arguments as configuration options. This is discussed further in snow Configuration.

The type argument can be “SOCK”, “MPI”, “PVM” or “NWS”. To create an MPI cluster with four workers, execute:

cl <- makeCluster(4, type="MPI")

This will start four MPI workers on the local machine unless you make special provisions, as described in the section Executing snow Programs on a Cluster with Rmpi.

You can also use the functions makeSOCKcluster(), makeMPIcluster(), makePVMcluster(), and makeNWScluster() to create specific types of clusters. In fact, makeCluster() is nothing more than a wrapper around these functions.

To shut down any type of cluster, use the stopCluster() function:

stopCluster(cl)

Some cluster types may be automatically stopped when the R session exits, but it’s good practice to always call stopCluster() in snow scripts; otherwise, you risk leaking cluster workers if the cluster type is changed, for example.
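One way to make that habit automatic is to tie cluster shutdown to a function exit. The following is a minimal sketch, assuming a socket cluster on the local machine; with_sock_cluster() and square() are hypothetical names of my own, not snow functions. on.exit() runs stopCluster() whether the work function returns normally or signals an error:

```r
library(snow)

# Hypothetical helper (not part of snow): create a socket cluster, pass it
# to 'work', and always stop it afterwards, even if 'work' fails.
with_sock_cluster <- function(n, work) {
  cl <- makeCluster(n, type = "SOCK")
  on.exit(stopCluster(cl))   # runs on normal return and on error
  work(cl)
}

# Defined at the top level, so its environment is the global environment.
square <- function(i) i^2

squares <- with_sock_cluster(2, function(cl) parLapply(cl, 1:4, square))
unlist(squares)
```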


Creating the cluster object can fail for a number of reasons, and is therefore a source of problems. See the section Troubleshooting snow Programs for help in solving these problems.

Parallel K-Means

We’re finally ready to use snow to do some parallel computing, so let’s look at a real example: parallel K-Means. K-Means is a clustering algorithm that partitions rows of a dataset into k clusters.[7] It’s an iterative algorithm, since it starts with a guess of the location for each of the cluster centers, and gradually improves the center locations until it converges on a solution.

R includes a function for performing K-Means clustering in the stats package: the kmeans() function. One way of using the kmeans() function is to specify the number of cluster centers, and kmeans() will pick the starting points for the centers by randomly selecting that number of rows from your dataset. After it iterates to a solution, it computes a value called the total within-cluster sum of squares. It then selects another set of rows for the starting points, and repeats this process in an attempt to find a solution with the smallest total within-cluster sum of squares.

Let’s use kmeans() to generate four clusters of the “Boston” dataset from the MASS package, using 100 random sets of centers:

library(MASS)
result <- kmeans(Boston, 4, nstart=100)

We’re going to take a simple approach to parallelizing kmeans() that can be used for parallelizing many similar functions and doesn’t require changing the source code for kmeans(). We simply call the kmeans() function on each of the workers using a smaller value of the nstart argument. Then we combine the results by picking the result with the smallest total within-cluster sum of squares.

But before we execute this in parallel, let’s try this technique with the lapply() function to make sure it works. Once that is done, it will be fairly easy to convert to one of the snow parallel execution functions:

results <- lapply(rep(25, 4), function(nstart) kmeans(Boston, 4, nstart=nstart))
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]

We used a vector of four 25s to specify the nstart argument in order to get equivalent results to using 100 in a single call to kmeans(). Generally, the length of this vector should be equal to the number of workers in your cluster when running in parallel.
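If the number of workers doesn’t divide the total number of starts evenly, the remainder has to be spread across the tasks so that the values still add up to the original nstart. Here’s a minimal sketch using a hypothetical helper, split_nstart(), which is not part of snow or stats:

```r
# Hypothetical helper: divide 'total' random starts as evenly as possible
# into 'workers' values that sum to 'total'.
split_nstart <- function(total, workers) {
  base <- total %/% workers          # every task gets at least this many
  extra <- total %% workers          # leftover starts, one per early task
  rep(base, workers) + c(rep(1L, extra), rep(0L, workers - extra))
}

split_nstart(100, 4)   # 25 25 25 25
split_nstart(100, 3)   # 34 33 33

# In the parallel version this could be used as, for example:
# clusterApply(cl, split_nstart(100, length(cl)),
#              function(nstart) kmeans(Boston, 4, nstart=nstart))
```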

Now let’s parallelize this algorithm. snow includes a number of functions that we could use, including clusterApply(), clusterApplyLB(), and parLapply(). For this example, we’ll use clusterApply(). You call it exactly the same as lapply(), except that it takes a snow cluster object as the first argument. We also need to load MASS on the workers, rather than on the master, since it’s the workers that use the “Boston” dataset.

Assuming that snow is loaded and that we have a cluster object named cl, here’s the parallel version:

ignore <- clusterEvalQ(cl, {library(MASS); NULL})
results <- clusterApply(cl, rep(25, 4), function(nstart) kmeans(Boston, 4, nstart=nstart))
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]

clusterEvalQ() takes two arguments: the cluster object, and an expression that is evaluated on each of the workers. It returns the result from each of the workers in a list, which we don’t use here. I use a compound expression to load MASS and return NULL to avoid sending unnecessary data back to the master process. That isn’t a serious issue in this case, but it can be, so I often return NULL to be safe.

As you can see, the snow version isn’t that much different than the lapply() version. Most of the work was done in converting it to use lapply(). Usually the biggest problem in converting from lapply() to one of the parallel operations is handling the data properly and efficiently. In this case, the dataset was in a package, so all we had to do was load the package on the workers.


The kmeans() function chooses the starting cluster centers randomly, so its results depend on the random number generator. In order to get different solutions, the cluster workers need to use different streams of random numbers. Since the workers are randomly seeded when they first start generating random numbers,[8] this example will work, but it is good practice to use a parallel random number generator. See Random Number Generation for more information.

Initializing Workers

In the last section we used the clusterEvalQ() function to initialize the cluster workers by loading a package on each of them. clusterEvalQ() is very handy, especially for interactive use, but it isn’t very general. It’s great for executing a simple expression on the cluster workers, but it doesn’t allow you to pass any kind of parameters to the expression, for example. Also, although you can use it to execute a function, it won’t send that function to the worker first,[9] as clusterApply() does.

My favorite snow function for initializing the cluster workers is clusterCall(). The arguments are pretty simple: it takes a snow cluster object, a worker function, and any number of arguments to pass to the function. It simply calls the function with the specified arguments on each of the cluster workers, and returns the results as a list. It’s like clusterApply() without the x argument, so it executes once for each worker, like clusterEvalQ(), rather than once for each element in x.
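To see the difference between executing once per worker and once per element of x, here’s a small sketch using a two-worker socket cluster on the local machine. Sys.getpid() is a base R function that returns the current process ID, so each worker should report a different value:

```r
library(snow)
cl <- makeCluster(2, type = "SOCK")

# clusterCall() runs the function once on each worker: one result per worker.
pids <- clusterCall(cl, Sys.getpid)

# clusterApply() runs the function once per element of x: four results here.
per_task <- clusterApply(cl, 1:4, function(i) i * 10)

stopCluster(cl)
```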

clusterCall() can do anything that clusterEvalQ() does and more.[10] For example, here’s how we could use clusterCall() to load the MASS package on the cluster workers:

clusterCall(cl, function() { library(MASS); NULL })

This defines a simple function that loads the MASS package and returns NULL.[11] Returning NULL guarantees that we don’t accidentally send unnecessary data back to the master.[12]

The following will load several packages specified by a character vector:

worker.init <- function(packages) {
  for (p in packages) {
    library(p, character.only=TRUE)
  }
  NULL  # avoid sending unnecessary data back to the master
}
clusterCall(cl, worker.init, c('MASS', 'boot'))

Setting the character.only argument to TRUE makes library() interpret the argument as a character variable. If we didn’t do that, library() would attempt to load a package named p repeatedly.

Although it’s not as commonly used as clusterCall(), the clusterApply() function is also useful for initializing the cluster workers since it can send different data to the initialization function for each worker. The following creates a global variable on each of the cluster workers that can be used as a unique worker ID:

clusterApply(cl, seq(along=cl), function(id) WORKER.ID <<- id)
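To confirm that each worker received its own ID, you can read the global variable back with clusterEvalQ(). This sketch assumes a three-worker socket cluster on the local machine:

```r
library(snow)
cl <- makeCluster(3, type = "SOCK")

# One task per worker, so worker i receives the value i.
invisible(clusterApply(cl, seq_along(cl), function(id) WORKER.ID <<- id))

# Evaluate WORKER.ID in each worker's global environment and collect them.
ids <- unlist(clusterEvalQ(cl, WORKER.ID))

stopCluster(cl)
ids
```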

Load Balancing with clusterApplyLB

We introduced the clusterApply() function in the parallel K-Means example. The next parallel execution function that I’ll discuss is clusterApplyLB(). It’s very similar to clusterApply(), but instead of scheduling tasks in a round-robin fashion, it sends new tasks to the cluster workers as they complete their previous task. By round-robin, I mean that clusterApply() distributes the elements of x to the cluster workers one at a time, in the same way that cards are dealt to players in a card game. In a sense, clusterApply() (politely) pushes tasks to the workers, while clusterApplyLB() lets the workers pull tasks as needed. That can be more efficient if some tasks take longer than others, or if some cluster workers are slower.
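To make the scheduling difference concrete without a cluster, here’s a simplified model in plain R. It assumes that clusterApply() sends tasks in rounds of one task per worker and waits for the whole round to finish before sending the next round, and that clusterApplyLB() gives each new task to whichever worker becomes free first. Communication time is ignored, and both function names are hypothetical:

```r
# Simplified model of clusterApply(): tasks go out in rounds of p (one per
# worker), and each round lasts as long as its slowest task.
round_robin_makespan <- function(times, p) {
  rounds <- split(times, ceiling(seq_along(times) / p))
  sum(vapply(rounds, max, numeric(1)))
}

# Simplified model of clusterApplyLB(): each task goes to whichever worker
# becomes free first (ties go to the lowest-numbered worker).
load_balanced_makespan <- function(times, p) {
  free_at <- numeric(p)          # time at which each worker becomes idle
  for (t in times) {
    w <- which.min(free_at)
    free_at[w] <- free_at[w] + t
  }
  max(free_at)
}

times <- c(4, 1, 1, 1, 4, 1)
round_robin_makespan(times, 2)    # 9
load_balanced_makespan(times, 2)  # 7
```

With two workers, the long tasks land in separate rounds under the round-based model, so the short tasks that share those rounds leave one worker idle; the load-balanced model lets the second worker work through the short tasks in the meantime.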

To demonstrate clusterApplyLB(), we’ll execute Sys.sleep() on the workers, giving us complete control over the task lengths. Since our real interest in using clusterApplyLB() is to improve performance, we’ll use snow.time() to gather timing information about the overall execution.[13] We will also use snow.time()’s plotting capability to visualize the task execution on the workers:

sleeptime <- abs(rnorm(10, 10, 10))
tm <- snow.time(clusterApplyLB(cl, sleeptime, Sys.sleep))
plot(tm)

Ideally there would be solid horizontal bars for nodes 1 through 4 in the plot, indicating that the cluster workers were always busy, and therefore running efficiently. clusterApplyLB() did pretty well, although there was some wasted time at the end.

Now let’s try the same problem with clusterApply():[14]

sleeptime <- abs(rnorm(10, 10, 10))
tm <- snow.time(clusterApply(cl, sleeptime, Sys.sleep))
plot(tm)

As you can see, clusterApply() is much less efficient than clusterApplyLB() in this example: it took 53.7 seconds, versus 28.5 seconds for clusterApplyLB(). The plot shows how much time was wasted due to the round-robin scheduling.

But don’t give up on clusterApply(): it has its uses. It worked fine in the parallel K-Means example because we had the same number of tasks as workers. It is also used to implement the very useful parLapply() function, which we will discuss next.[15]

Task Chunking with parLapply

Now that we’ve discussed and compared clusterApply() and clusterApplyLB(), let’s consider parLapply(), a third parallel lapply() function that has the same arguments and basic behavior as clusterApply() and clusterApplyLB(). But there is an important difference that makes it perhaps the most generally useful of the three.

parLapply() is a high-level snow function that is actually a deceptively simple wrapper around a call to clusterApply():

> parLapply
function (cl, x, fun, ...)
docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, fun, ...))
<environment: namespace:snow>

Basically, parLapply() splits up x into a list of subvectors, and processes those subvectors on the cluster workers using lapply(). In effect, it is prescheduling the work by dividing the tasks into as many chunks as there are workers in the cluster. This is functionally equivalent to using clusterApply() directly, but it can be much more efficient, since there are fewer I/O operations between the master and the workers. If the length of x is already equal to the number of workers, then parLapply() has no advantage. But if you’re parallelizing an R script that already uses lapply(), the length of x is often very large, and at any rate is completely unrelated to the number of workers in your cluster. In that case, parLapply() is a better parallel version of lapply() than clusterApply().
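The following sequential sketch mimics that scheduling without a cluster, which makes it easy to see why the result matches lapply(). It splits x into p contiguous chunks with cut() (not necessarily the way snow’s internal splitList() does it), applies lapply() to each chunk as a worker would, and concatenates the per-chunk lists. emulate_parLapply() is a hypothetical name:

```r
# Sequential sketch of parLapply()'s scheduling: p chunks, one "task" each.
emulate_parLapply <- function(p, x, fun, ...) {
  # unname() so that c() below doesn't prefix element names with chunk names
  chunks <- unname(split(x, cut(seq_along(x), p, labels = FALSE)))
  per_worker <- lapply(chunks, lapply, fun, ...)  # lapply() within each chunk
  do.call(c, per_worker)                          # flatten to one list
}

res <- emulate_parLapply(3, 1:10, function(v) v^2)
```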

One way to think about it is that parLapply() interprets the x argument differently than clusterApply(). clusterApply() is low-level, and treats x as a specification of the tasks to execute on the cluster workers using fun. parLapply() treats x as a source of disjoint input arguments to execute on the cluster workers using lapply() and fun. clusterApply() gives you more control over what gets sent to whom, while parLapply() provides a convenient way to efficiently divide the work among the cluster workers.

An interesting consequence of parLapply()’s work scheduling is that it is much more efficient than clusterApply() if you have many more tasks than workers, and one or more large, additional arguments to pass to parLapply(). In that case, the additional arguments are sent to each worker only once, rather than possibly many times. Let’s try doing that, using a slightly altered parallel sleep function that takes a matrix as an argument:

bigsleep <- function(sleeptime, mat) Sys.sleep(sleeptime)
bigmatrix <- matrix(0, 2000, 2000)
sleeptime <- rep(1, 100)

I made the sleep times short, numerous, and equal in length. This will accentuate the performance differences between clusterApply() and parLapply():

tm <- snow.time(clusterApply(cl, sleeptime, bigsleep, bigmatrix))
plot(tm)

This doesn’t look very efficient: you can see that there are many sends and receives between the master and the workers, resulting in relatively big gaps between the compute operations on the cluster workers. The gaps aren’t due to load imbalance as we saw before: they’re due to I/O time. We’re now spending a significant fraction of the elapsed time sending data to the workers, so instead of the ideal elapsed time of 25 seconds,[16] it’s taking 77.9 seconds.

Now let’s do the same thing using parLapply():

tm <- snow.time(parLapply(cl, sleeptime, bigsleep, bigmatrix))
plot(tm)

The difference is dramatic, both visually and in elapsed time: it took only 27.2 seconds, beating clusterApply() by 50.7 seconds.

Keep in mind that this particular use of clusterApply() is bad: it is needlessly sending the matrix to the worker with every task. There are various ways to fix that, and using parLapply() happens to work well in this case. On the other hand, if you’re sending huge objects in x, then there’s not much you can do, and parLapply() isn’t going to help. My point is that parLapply() schedules work in a useful and efficient way, making it probably the single most useful parallel execution function in snow. When in doubt, use parLapply().
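A quick back-of-the-envelope calculation shows how much data is involved in the bigsleep example. This sketch counts only the serialized matrix, ignoring the sleep times and the function itself, and assumes the four-worker cluster used above:

```r
# Size of one serialized copy of the 2000 x 2000 matrix of doubles.
bigmatrix <- matrix(0, 2000, 2000)
bytes_per_copy <- length(serialize(bigmatrix, NULL))   # a little over 32 MB

n_tasks <- 100   # clusterApply(): the matrix goes out with every task
n_workers <- 4   # parLapply(): one chunk, and one copy, per worker

sent_MB <- c(clusterApply = n_tasks, parLapply = n_workers) *
  bytes_per_copy / 1e6
round(sent_MB)   # about 3200 MB versus 128 MB
```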

Vectorizing with clusterSplit

In the previous section I showed you how parLapply() uses clusterApply() to implement a parallel operation that solves a certain class of parallel program quite nicely. Recall that parLapply() executes a user-supplied function for each element of x just like clusterApply(). But what if we want the function to operate on subvectors of x? That’s similar to what parLapply() does, but is a bit easier to implement, since it doesn’t need to use lapply() to call the user’s function.

We could use the splitList() function, like parLapply() does, but that is a snow internal function. Instead, we’ll use the clusterSplit() function which is very similar, and slightly more convenient. Let’s try splitting the sequence from 1 to 30 for our cluster using clusterSplit():

> clusterSplit(cl, 1:30)
[[1]]
[1] 1 2 3 4 5 6 7 8

[[2]]
[1]  9 10 11 12 13 14 15

[[3]]
[1] 16 17 18 19 20 21 22

[[4]]
[1] 23 24 25 26 27 28 29 30

Since our cluster has four workers, it splits the sequence into a list of four nearly equal length vectors, which is just what we need.
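If you want to see how such a split can be computed, here’s a base R sketch that produces the same four pieces for 1:30. It uses cut() to form four equal-width groups; treat it as an illustration rather than a description of clusterSplit()’s internals:

```r
# Split 1:30 into 4 contiguous, nearly equal pieces without a cluster.
pieces <- unname(split(1:30, cut(1:30, 4)))
lengths(pieces)   # 8 7 7 8
```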

Now let’s define parVapply() to split x using clusterSplit(), execute the user function on each of the pieces using clusterApply(), and combine the results using do.call() and c():

parVapply <- function(cl, x, fun, ...) {
  do.call("c", clusterApply(cl, clusterSplit(cl, x), fun, ...))
}

Like parLapply(), parVapply() always issues the same number of tasks as workers. But unlike parLapply(), the user-supplied function is only executed once per worker. Let’s use parVapply() to compute the cube root of numbers from 1 to 10 using the ^ function:

> parVapply(cl, 1:10, "^", 1/3)
 [1] 1.000000 1.259921 1.442250 1.587401 1.709976 1.817121 1.912931 2.000000
 [9] 2.080084 2.154435

This works because the ^ function takes a vector as its first argument and returns a vector of the same length.[17]


This technique can be useful for executing vector functions in parallel. It may also be more efficient than using parLapply(), but for any function worth executing in parallel, the difference in efficiency is likely to be small. And remember that most, if not all, vector functions execute so quickly that it is never worth executing them in parallel with snow. Such fine-grained problems fall much more into the domain of multithreaded computing.

Load Balancing Redux

We’ve talked about the advantages of parLapply() over clusterApply() at some length. In particular, when there are many more tasks than cluster workers and the task objects sent to the workers are large, there can be serious performance problems with clusterApply() that are solved by parLapply(). But what if the task execution has significant variation so that we need load balancing? clusterApplyLB() does load balancing, but would have the same performance problems as clusterApply(). We would like a load balancing equivalent to parLapply(), but there isn’t one—so let’s write it.[18]

In order to achieve dynamic load balancing, it helps to have a number of tasks that is at least a small integer multiple of the number of workers. That way, a long task assigned to one worker can be offset by many shorter tasks being done by other workers. If that is not the case, then the other workers will sit idle while the one worker completes the long task. parLapply() creates exactly one task per worker, which is not what we want in this case. Instead, we’ll first send the function and the fixed arguments to the cluster workers using clusterCall(), which saves them in the global environment, and then send the varying argument values using clusterApplyLB(), specifying a function that will execute the user-supplied function along with the full collection of arguments.

Here are the function definitions for parLapplyLB() and the two functions that it executes on the cluster workers:

parLapplyLB <- function(cl, x, fun, ...) {
  clusterCall(cl, LB.init, fun, ...)
  r <- clusterApplyLB(cl, x, LB.worker)
  clusterEvalQ(cl, rm('.LB.fun', '.LB.args', pos=globalenv()))
  r
}
LB.init <- function(fun, ...) {
  assign('.LB.fun', fun, pos=globalenv())        # worker function
  assign('.LB.args', list(...), pos=globalenv()) # fixed arguments
  NULL  # don't send the (possibly large) arguments back to the master
}
LB.worker <- function(x) {
  do.call('.LB.fun', c(list(x), .LB.args))
}

parLapplyLB() initializes the workers using clusterCall(), executes the tasks with clusterApplyLB(), cleans up the global environment of the cluster workers with clusterEvalQ(), and finally returns the task results.

That’s all there is to implementing a simple and efficient load balancing parallel execution function. Let’s compare clusterApplyLB() to parLapplyLB() using the same test function that we used to compare clusterApply() and parLapply(), starting with clusterApplyLB():

bigsleep <- function(sleeptime, mat) Sys.sleep(sleeptime)
bigmatrix <- matrix(0, 2000, 2000)
sleeptime <- rep(1, 100)
tm <- snow.time(clusterApplyLB(cl, sleeptime, bigsleep, bigmatrix))
plot(tm)

There are lots of gaps in the execution bars due to high I/O time: the master is barely able to supply the workers with tasks. Obviously this problem isn’t going to scale to many more workers.

Now let’s try our new parLapplyLB() function:

tm <- snow.time(parLapplyLB(cl, sleeptime, bigsleep, bigmatrix))
plot(tm)

That took only 28.4 seconds versus 53.2 seconds for clusterApplyLB().

Notice that the first task on each worker has a short execution time, but a long task send time, as seen by the slope of the first four lines between the master (node 0) and the workers (nodes 1-4). Those are the worker initialization tasks executed by clusterCall() that send the large matrix to the workers. The tasks executed via clusterApplyLB() were more efficient, as seen by the vertical communication lines and the solid horizontal bars.


By using short tasks, I was able to demonstrate a pretty noticeable difference in performance, but with longer tasks, the difference becomes less significant. In other words, we can realize decent efficiency whenever the time to compute a task significantly exceeds the time needed to send the inputs to and return the outputs from the worker evaluating the task.
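One way to put a number on this is parallel efficiency: the ideal elapsed time divided by the measured elapsed time. The following sketch uses the timings reported earlier in this chapter for 100 one-second tasks on four workers; those figures come from the runs shown above and will differ on your cluster:

```r
# Ideal elapsed time: 100 one-second tasks spread perfectly over 4 workers.
ideal <- 100 * 1 / 4

# Measured elapsed times (seconds) quoted earlier in this chapter.
measured <- c(clusterApply = 77.9, parLapply = 27.2,
              clusterApplyLB = 53.2, parLapplyLB = 28.4)

efficiency <- ideal / measured
round(efficiency, 2)   # 0.32, 0.92, 0.47, 0.88
```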

Functions and Environments


This section discusses a number of rather subtle points. An understanding of these is not essential for basic snow use, but could be invaluable when trying to debug more complicated usage scenarios. The reader may want to skim through this on a first reading, but remember to return to it if a seemingly obscure problem crops up.

Most of the parallel execution functions in snow take a function object as an argument, which I call the worker function, since it is sent to the cluster workers, and subsequently executed by them. In order to send it to the workers, the worker function must be serialized into a stream of bytes using the serialize() function.[19] That stream of bytes is converted into a copy of the original object using the unserialize() function.

In addition to a list of formal arguments and a body, the worker function includes a pointer to the environment in which it was created. This environment becomes the parent of the evaluation environment when the worker function is executed, giving the worker function access to non-local variables. Obviously, this environment must be serialized along with the rest of the worker function in order for the function to work properly after being unserialized.

However, environments are serialized in a special way in R. In general, the contents are included when an environment is serialized, but not always. Name space environments are serialized by name, not by value. That is, the name of the package is written to the resulting stream of bytes, not the symbols and objects contained in the environment. When a name space is unserialized, it is reconstructed by finding and loading the corresponding package. If the package cannot be loaded, then the stream of bytes cannot be unserialized. The global environment is also serialized by name, and when it is unserialized, the resulting object is simply a reference to the existing, unmodified global environment.

So what does this mean to you as a snow programmer? Basically, you must ensure that all the variables needed to execute the worker function are available after it has been unserialized on the cluster workers. If the worker function’s environment is the global environment and the worker function needs to access any variables in it, you need to send those variables to the workers explicitly. This can be done, for example, by using the clusterExport() function. But if the worker function was created by another function, its environment is the evaluation environment of the creator function when the worker function was created. All the variables in this environment will be serialized along with the worker function, and accessible to it when it is executed by the cluster workers. This can be a handy way of making variables available to the worker function, but if you’re not careful, you could accidentally serialize large, unneeded objects along with the worker function, causing performance to suffer. Also, if you want the worker function to use any of the creator function’s arguments, you need to evaluate those arguments before calling parLapply() or clusterApplyLB(); otherwise, you may not be able to evaluate them successfully on the workers due to R’s lazy argument evaluation.

Let’s look at a few examples to illustrate some of these issues. We’ll start with a script that multiplies a vector x by a sequence of numbers:

a <- 1:4
x <- rnorm(4)
clusterExport(cl, "x")
mult <- function(s) s * x
parLapply(cl, a, mult)

In this script, the function mult() is defined at the top level, so its environment is the global environment.[20] Thus, x isn’t serialized along with mult(), so we need to send it to the cluster workers using the clusterExport() function. Of course, a more natural solution in this case would be to include x as an explicit argument to mult(), and then parLapply() would send it to the workers for us. However, using clusterExport() could be more efficient if we were going to reuse x by calling mult() many times with parLapply().

Now let’s turn part of this script into a function. Although this change may seem trivial, it actually changes the way mult() is serialized in parLapply():

pmult <- function(cl) {
  a <- 1:4
  x <- rnorm(4)
  mult <- function(s) s * x
  parLapply(cl, a, mult)
}

Since mult() is created by pmult(), all of pmult()’s local variables will be accessible when mult() is executed by the cluster workers, including x. Thus, we no longer call clusterExport().

pmult() would be more useful if the values to be multiplied weren’t hardcoded, so let’s improve it by passing a and x in as arguments:

pmult <- function(cl, a, x) {
  x  # force x
  mult <- function(s) s * x
  parLapply(cl, a, mult)
}
scalars <- 1:4
dat <- rnorm(4)
pmult(cl, scalars, dat)

At this point, you may be wondering why x is on a line by itself with the cryptic comment “force x”. Although it may look like it does nothing, this operation forces x to be evaluated by looking up the value of the variable dat (the actual argument corresponding to x that is passed to the function when pmult() is invoked) in the caller’s execution environment. R uses lazy argument evaluation, and since x is now an argument, we have to force its evaluation before calling parLapply(); otherwise, the workers will report that dat wasn’t found, since they don’t have access to the environment where dat is defined. Note that they wouldn’t say x wasn’t found: they would find x, but wouldn’t be able to evaluate it because they don’t have access to dat. By evaluating x before calling parLapply(), mult()’s environment will be serialized with x set to the value of dat, rather than the symbol dat.
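You can reproduce this failure in a single R session by removing the caller’s variable before the worker function runs, which is roughly the situation on a cluster worker that never had dat. This is a sketch in plain R; make_lazy() and make_forced() are hypothetical names, and force(x) is simply a more explicit way of writing the bare x line:

```r
make_lazy <- function(x) function(s) s * x                  # x stays a promise
make_forced <- function(x) { force(x); function(s) s * x }  # x evaluated now

dat <- c(1, 2, 3)
lazy_mult <- make_lazy(dat)
forced_mult <- make_forced(dat)
rm(dat)  # simulate a worker that has no variable named 'dat'

forced_mult(2)                           # 2 4 6
res <- try(lazy_mult(2), silent = TRUE)  # fails: object 'dat' not found
```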

Notice in the last version of pmult() that, in addition to x, a and cl are also serialized along with mult(). mult() doesn't need to access them, but since they are defined in pmult()'s evaluation environment, they will be serialized along with mult(). To prevent that, we can reset the environment of mult() to the global environment and pass x to mult() explicitly:

pmult <- function(cl, a, x) {
  mult <- function(s, x) s * x
  environment(mult) <- .GlobalEnv
  parLapply(cl, a, mult, x)
}
scalars <- 1:4
dat <- rnorm(4)
pmult(cl, scalars, dat)

Of course, another way to achieve the same result is to create mult() at the top level of the script so that mult() is associated with the global environment in the first place.
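One way to see what gets shipped to the workers is to compare serialized sizes with serialize(), which returns a raw vector. In this sketch, the hypothetical make_worker_fun() holds a large local vector that its closure never uses. The closure still carries that vector. A function whose environment is reset to .GlobalEnv does not, because the global environment is serialized as a reference rather than copied.

```r
# A closure created inside a function captures that function's whole environment
make_worker_fun <- function(n) {
  big <- rnorm(n)  # large local object the closure never touches
  k <- 2
  function(s) s * k
}

captured <- make_worker_fun(1e6)

# Same computation, with k passed explicitly and the environment reset
explicit <- function(s, k) s * k
environment(explicit) <- .GlobalEnv

captured_bytes <- length(serialize(captured, NULL))
explicit_bytes <- length(serialize(explicit, NULL))

cat("closure with captured environment:", captured_bytes, "bytes\n")
cat("closure with global environment:  ", explicit_bytes, "bytes\n")
```

With one million doubles, the first closure is a bit over 8 MB. Exact byte counts vary with the R version, so none are shown.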

Unfortunately, you run into some tricky issues when sending function objects over the network. You may conclude that you don’t want to use the worker function’s environment to send data to your cluster workers, and that’s a perfectly reasonable position. But hopefully you now understand the issues well enough to figure out what methods work best for you.

Random Number Generation

As I mentioned previously, snow is very useful for performing Monte Carlo simulations, bootstrapping, and other operations that depend on the use of random numbers. When running such operations in parallel, it’s important that the cluster workers generate different random numbers; otherwise, the workers may all replicate each other’s results, defeating the purpose of executing in parallel. Rather than using ad-hoc schemes for seeding the workers differently, it is better to use a parallel random number generator package. snow provides support for the rlecuyer and rsprng packages, both of which are available on CRAN. With one of these packages installed on all the nodes of your cluster, you can configure your cluster workers to use it via the clusterSetupRNG() function. The type argument specifies which generator to use. To use rlecuyer, set type to RNGstream:

clusterSetupRNG(cl, type='RNGstream')

To use rsprng, set type to SPRNG:

clusterSetupRNG(cl, type='SPRNG')

You can specify a seed using the seed argument. rsprng uses a single integer for the seed, while rlecuyer uses a vector of six integers:

clusterSetupRNG(cl, type='RNGstream', seed=c(1,22,333,444,55,6))


When using rsprng, a random seed is used by default, but not with rlecuyer. If you want to use a random seed with rlecuyer, you’ll have to specify it explicitly using the seed argument.

Now the standard random number functions will use the specified parallel random number generator:

> unlist(clusterEvalQ(cl, rnorm(1)))
[1] -1.0452398 -0.3579839 -0.5549331  0.7823642

If you reinitialize the cluster workers using the same seed, you will get the same random numbers from the workers as before.

We can also get reproducible results using clusterApply(), but not with clusterApplyLB() because clusterApply() always uses the same task scheduling, while clusterApplyLB() does not.[21]
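Footnote 21 mentions making clusterApplyLB() reproducible by seeding each task with a task-specific value. A minimal sketch of that idea follows. run_task() and base_seed are hypothetical names, and lapply-style sapply() stands in for the parallel function, with the task order reversed to mimic a different schedule. This is the kind of ad hoc seeding the section warns about, so unlike rlecuyer or rsprng it gives no guarantee that the per-task streams are independent.

```r
base_seed <- 1234  # hypothetical base seed

# Each task seeds the RNG from its own id, so its result does not depend
# on which worker runs it or in what order the tasks are scheduled
run_task <- function(task_id, base_seed) {
  set.seed(base_seed + task_id)
  mean(rnorm(100))
}

tasks <- 1:8
in_order <- sapply(tasks, run_task, base_seed = base_seed)

# Run the same tasks in a different order, as a load-balancing scheduler might
shuffled <- rev(tasks)
out_of_order <- sapply(shuffled, run_task, base_seed = base_seed)

# Put results back in task order; they match the in-order run exactly
stopifnot(identical(in_order, out_of_order[order(shuffled)]))

# With snow, the call would look like this (assumes a cluster object `cl`):
# res <- clusterApplyLB(cl, tasks, run_task, base_seed = base_seed)
```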

snow Configuration

snow includes a number of configuration options for controlling the way the cluster is created. These options can be specified as named arguments to the cluster creation function (makeCluster(), makeSOCKcluster(), makeMPIcluster(), etc.). For example, here is the way to specify an alternate hostname for the master:

cl <- makeCluster(3, type="SOCK", master="")


The default value of master is computed as Sys.info()[['nodename']]. However, there's no guarantee that the workers will all be able to resolve that name to an IP address. By setting master to an appropriate dot-separated IP address, you can often avoid hostname resolution problems.

You can also use the setDefaultClusterOptions() function to change a default configuration option during an R session. By default, the outfile option is set to /dev/null, which causes all worker output to be redirected to the null device (the proverbial bit bucket). To prevent output from being redirected, you can change the default value of outfile to the empty string:
setDefaultClusterOptions(outfile="")

This is a useful debugging technique that we will discuss further in Troubleshooting snow Programs.

Here is a summary of all of the snow configuration options:

Table 2-1. snow configuration options

Name        | Type    | Description                                          | Default value
port        | Integer | Port that the master listens on                      | 10187
timeout     | Integer | Socket timeout in seconds                            | 31536000 (one year in seconds)
master      | String  | Master's hostname that workers connect to            | Sys.info()["nodename"]
homogeneous | Logical | Are workers homogeneous?                             | FALSE if R_SNOW_LIB set, else TRUE
type        | String  | Type of cluster makeCluster() should create          | NULL, which is handled specially
outfile     | String  | Worker log file                                      | "/dev/null" ("nul:" on Windows)
rhome       | String  | Home of R installation, used to locate R executable  | $R_HOME
user        | String  | User for remote execution                            | Sys.info()["user"]
rshcmd      | String  | Remote execution command                             | "ssh"
rlibs       | String  | Location of R packages                               | $R_LIBS
scriptdir   | String  | Location of snow worker scripts                      | snow installation directory
rprog       | String  | Path of R executable                                 | $R_HOME/bin/R
snowlib     | String  | Path of "library" where snow is installed            | directory in which snow is installed
rscript     | String  | Path of Rscript command                              | $R_HOME/bin/Rscript ($R_HOME/bin/Rscript.exe on Windows)
useRscript  | Logical | Should workers be started using Rscript command?     | TRUE if file specified by rscript exists
manual      | Logical | Should workers be started manually?                  | FALSE

It is possible, although a bit tricky, to configure different workers differently. I've done this when running a snow program in parallel on an ad hoc collection of workstations. With the socket transport, there are two mechanisms available for this. The first approach works with all the transports: you set the homogeneous option to FALSE, which causes snow to use a special startup script to launch the workers. This alternate script doesn't assume that the worker nodes are set up the same way as the master node; for example, it can look for R or Rscript in the user's PATH. It also supports environment variables for configuring the workers, such as R_SNOW_RSCRIPT_CMD and R_SNOW_LIB, which specify the path of the Rscript command and the snow installation directory, respectively. These environment variables can be set to appropriate values on each worker machine using the shell's startup scripts.

The second approach to heterogeneous configuration only works with the socket and nws transports. When you call makeSOCKcluster(), you specify the worker machines as a list of lists. In this case, the hostname of the worker is specified by the host element of each sublist. The other elements of the sublists are used to override the corresponding option for that worker.

Let’s say we want to create a cluster with two workers: n1 and n2, but we need to log in as a different user on machine n2:

> workerList <- list(list(host = "n1"), list(host = "n2", user = "steve"))
> cl <- makeSOCKcluster(workerList)
> clusterEvalQ(cl, Sys.info()[["user"]])
[[1]]
[1] "weston"

[[2]]
[1] "steve"

> stopCluster(cl)

It can also be useful to set the outfile option differently to avoid file conflicts between workers:

> workerList <- list(list(host = "n1", outfile = "n1.log", user = "weston"), 
+                    list(host = "n2", outfile = "n2-1.log"), 
+                    list(host = "n2", outfile = "n2-2.log"))
> cl <- makeSOCKcluster(workerList, user = "steve")
> clusterEvalQ(cl, Sys.glob("*.log"))
[[1]]
[1] "n1.log"

[[2]]
[1] "n2-1.log" "n2-2.log"

[[3]]
[1] "n2-1.log" "n2-2.log"

> stopCluster(cl)

This also demonstrates that different methods for setting options can be used together. The machine-specific option values always take precedence.


I prefer to use my ssh config file to specify a different user for different hosts, but obviously that doesn’t help with setting outfile.
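When there are many workers, the list of lists can be generated rather than typed by hand. The sketch below builds one sublist per worker, each with a distinct outfile. The host names n1 and n2 come from the examples above. The "host-index.log" naming scheme is only an illustration.

```r
hosts <- c("n1", "n2", "n2")  # one entry per worker; n2 runs two workers

# One sublist per worker: host is required, other elements override options
workerList <- lapply(seq_along(hosts), function(i) {
  list(host = hosts[i], outfile = sprintf("%s-%d.log", hosts[i], i))
})

str(workerList[[3]])

# Then create the cluster as before (requires snow and reachable hosts):
# cl <- makeSOCKcluster(workerList, user = "steve")
```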

Installing Rmpi

As I mentioned previously, installing Rmpi can be problematic because it depends on MPI being previously installed. Also, there are multiple MPI distributions, and some of the older distributions have compatibility problems with Rmpi. In general, Open MPI is the preferred MPI distribution. Fortunately, Open MPI is readily available for modern Linux systems. The website for the Open MPI Project is

Another problem is that there isn't a binary distribution of Rmpi available for Windows. Thus, even if you have MPI installed on a Windows machine, you will also need to install Rmpi from the source distribution, which requires additional tools that may also need to be installed. Installing Rmpi on Windows is beyond the scope of this book; for more information, see the documentation in the Rmpi package.

Installation of Rmpi on the Mac was quite simple on Mac OS X 10.5 and 10.6, both of which came with Open MPI, but unfortunately, Apple stopped distributing it in Mac OS X 10.7. If you’re using 10.5 or 10.6, you can (hopefully) install Rmpi quite easily:[22]

install.packages("Rmpi")
If you’re using Mac OS X 10.7, you’ll have to install Open MPI first, and then you’ll probably have to build Rmpi from the source distribution since the binary distribution probably won’t be compatible with your installation of Open MPI. I’ll discuss installing Rmpi from the source distribution shortly, but not Open MPI.

On Debian/Ubuntu, Rmpi is available in the “r-cran-rmpi” Debian package, and can be installed with apt-get. That’s the most foolproof way to install Rmpi on Ubuntu, for example, since apt-get will automatically install a compatible version of MPI, if necessary.

For non-Debian based systems, I recommend that you install Open MPI with your local packaging tool, and then try to use install.packages() to install Rmpi. This will fail if the configuration script can’t find the MPI installation. In that case you will have to download the source distribution, and install it using a command such as:

% R CMD INSTALL --configure-args="--with-mpi=$MPI_PATH" Rmpi_0.5-9.tar.gz

where the value of MPI_PATH is the directory containing the Open MPI lib and include directories.[23] Notice that this example uses the --configure-args argument to pass the --with-mpi argument to Rmpi’s configure script. Another important configure argument is --with-Rmpi-type, which may need to be set to “OPENMPI”, for example.

As I’ve said, installing Rmpi from source can be difficult. If you run into problems and don’t want to switch to Debian/Ubuntu, your best bet is to post a question on the R project’s “R-sig-hpc” mailing list. You can find it by clicking on the “Mailing Lists” link on the R project’s home page.

Executing snow Programs on a Cluster with Rmpi

Throughout this chapter I’ve been using the socket transport because it doesn’t require any additional software to install, making it the most portable snow transport. However, the MPI transport is probably the most popular, at least on clusters. Of course, most of what we’ve discussed is independent of the transport. The difference is mostly in how the cluster object is created and how the snow script is executed.

To create an MPI cluster object, set the type argument of makeCluster() to MPI or use the makeMPIcluster() function. If you’re running interactively, you can create an MPI cluster object with four workers as follows:

cl <- makeCluster(4, type="MPI")

This is equivalent to:

cl <- makeMPIcluster(4)

This creates a spawned cluster, since the workers are all started by snow for you via the mpi.comm.spawn() function.

Notice that we don’t specify which machines to use, only the number of workers. For that reason, I like to compute the worker count using the mpi.universe.size() function, which returns the size of the initial runtime environment.[24] Since the master process is included in that size, the worker count would be computed as mpi.universe.size() - 1.[25]
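Combining this with footnote 25, a script meant to run both under orterun and interactively might guard against a worker count of zero. The helper below is hypothetical. It takes the universe size as an argument so it can be exercised without Rmpi, and its fallback of 2 workers is an arbitrary choice.

```r
# Hypothetical helper: workers = universe size minus the master process,
# with a fallback for interactive sessions where the universe size is 1
worker_count <- function(universe_size, fallback = 2) {
  n <- universe_size - 1
  if (n >= 1) n else fallback
}

worker_count(5)  # 4, e.g. under orterun with five slots
worker_count(1)  # 2, the fallback used interactively

# In a script that has loaded snow and Rmpi:
# cl <- makeMPIcluster(worker_count(mpi.universe.size()))
```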

We shut down an MPI cluster the same as any cluster:

stopCluster(cl)
As you can see, there isn’t much to creating an MPI cluster object. You can specify configuration options, just as with a socket cluster, but basically it is very simple. However, you should be aware that the cluster workers are launched differently depending on how the R script was executed. If you’re running interactively, for example, the workers will always be started on the local machine. The only way that I know of to start the workers on remote machines is to execute the R interpreter using a command such as mpirun, mpiexec, or in the case of Open MPI, orterun.

As I noted previously, you can’t specify the machines on which to execute the workers with makeMPIcluster(). That is done with a separate program that comes with your MPI distribution. Open MPI comes with three utilities for executing MPI programs: orterun, mpirun, and mpiexec, but they all work in exactly the same way,[26] so I will refer to orterun for the rest of this discussion.

orterun doesn’t know anything about R or R scripts, so we need to use orterun to execute the R interpreter, which in turn executes the R script. Let’s start by creating an R script (Example 2-1), which I’ll call mpi.R.

Example 2-1. mpi.R

library(snow)
library(Rmpi)
cl <- makeMPIcluster(mpi.universe.size() - 1)
r <- clusterEvalQ(cl, R.version.string)
print(unlist(r))
stopCluster(cl)
mpi.quit()

This is very similar to our very first example, except that it loads the Rmpi package, calls makeMPIcluster() rather than makeSOCKcluster(), and calls mpi.quit() at the end. Loading Rmpi isn’t strictly necessary, since calling makeMPIcluster() will automatically load Rmpi, but I like to do it explicitly. makeMPIcluster() creates the MPI cluster object, as discussed in the previous section. mpi.quit() terminates the MPI execution environment, detaches the Rmpi package, and quits R, so it should always go at the end of your script. This is often left out, but I believe it is good practice to call it.[27] I’ve gotten very stern warning messages from orterun in some cases when I failed to call mpi.quit().

To execute mpi.R using the local machine as the master, and n1, n2, n3 and n4 as the workers, we can use the command:[28]

% orterun -H localhost,n1,n2,n3,n4 -n 1 R --slave -f mpi.R

The -H option specifies the list of machines available for execution. By using -n 1, orterun will only execute the command R --slave -f mpi.R on the first machine in the list, which is localhost in this example. This process is the master, equivalent to the interactive R session in our previous snow examples. When the master executes makeMPIcluster(mpi.universe.size() - 1), four workers will be spawned. orterun will execute these workers on machines n1, n2, n3 and n4, since they are next in line to receive a process.

Those are the basics, but there are a few other issues to bear in mind. First, the master and the worker processes have their working directory set to the working directory of the process executing orterun. That’s no problem for the master in our example, since the master runs on the same machine as orterun. But if there isn’t a directory with the same path on any of the worker machines, you will get an error. For that reason, it is useful to work from a directory that is shared across the cluster via a network file system. That isn’t necessary, however. If you specify the full path to the R script, you could use the orterun -wdir option to set the working directory to /tmp:

% orterun -wdir /tmp -H localhost,n1,n2,n3,n4 -n 1 R --slave -f ~/mpi.R

This example still assumes that R is in your search path on localhost. If it isn’t, you can specify the full path of the R interpreter on localhost.

That can solve some of the orterun-related problems, but snow also makes a number of assumptions about where to find things on the workers. See snow Configuration for more information.

Executing snow Programs with a Batch Queueing System

Many cluster administrators require that all parallel programs be executed via a batch queueing system. There are different ways that this can be done, and different batch queueing systems, but I will describe a method that has been commonly used for a long time, and is supported by many batch queueing systems, such as PBS/TORQUE, SGE and LSF.

Basically you submit a shell script, and the shell script executes your R script using orterun as we described in the section Executing snow Programs on a Cluster with Rmpi. When you submit the shell script, you tell the batch queueing system how many nodes you want using the appropriate argument to the submit command. The shell script may need to read an environment variable to learn what nodes it can execute on, and then pass that information on to the orterun command via an argument such as -hostfile or -H.

Of course the details vary depending on the batch queueing system, MPI distribution, and cluster configuration. As an example, I’ll describe how this can be done using PBS/TORQUE and Open MPI.

It's actually very simple to use PBS/TORQUE with Open MPI, since Open MPI automatically gets the list of hosts from the environment variables set by PBS/TORQUE.[29] This simplifies the orterun command used in the script shown in Example 2-2.

Example 2-2.

#PBS -j oe
cd $PBS_O_WORKDIR
orterun -n 1 /usr/bin/R --slave -f mpi.R > mpi-$PBS_JOBID.out 2>&1

This script uses PBS directives to specify the name of the job, and to merge the job’s standard output and standard error. It then cd’s to the directory from which you submitted the job, which is helpful for finding the mpi.R script. Finally it uses orterun to execute mpi.R.

We submit using the PBS/TORQUE qsub command:

% qsub -q devel -l nodes=2:ppn=4

This submits the shell script to the devel queue, requesting two nodes with four processors per node. The -l option is used to specify the resources needed by the job. The resource specifications vary from cluster to cluster, so talk to your cluster administrator to find out how you should specify the number of nodes and processors.

If you’re using LSF or SGE, you will probably need to specify the hosts via the orterun -hostfile or -H option. For LSF, use the bsub -n option to specify the number of cores, and the LSB_HOSTS environment variable to get the allocated hosts. With SGE, use the qsub -pe option and the PE_HOSTFILE environment variable. The details are different, but the basic idea is the same.

Troubleshooting snow Programs

Unfortunately, a lot of things can go wrong when using snow. That's not really snow's fault: there are just a lot of things that have to be set up properly, and if the different cluster nodes are configured differently, snow may have trouble launching the cluster workers. It's possible to configure snow to deal with heterogeneous clusters.[30] Fortunately, if your cluster is already used for parallel computing, there's a good chance it is already set up in a clean, consistent fashion, and you won't run into any problems when using snow.

Obviously you need to have R and snow installed on all of the machines that you’re attempting to use for your cluster object. You also need to have ssh servers running on all of the cluster workers if using the socket transport, for instance.

There are several techniques available for finding out more information about what is going wrong.

When using the socket transport, the single most useful method of troubleshooting is manual mode. In manual mode, you start the workers yourself, rather than having snow start them for you. That allows you to run snow jobs on a cluster that doesn’t have ssh servers, for example. But there are also a few other advantages to manual mode. For one thing, it makes it easier to see error messages. Rather than searching for them in log files, they can be displayed right in your terminal session.

To enable manual mode, set the manual option to TRUE when creating the socket cluster object. I also recommend specifying outfile="", which prevents output from being redirected:

cl <- makeCluster(2, type="SOCK", manual=TRUE, outfile="")

makeCluster() will display the command to start each of the workers. For each command, I open a new terminal window, ssh to the specified machine,[31] and cut and paste the specified command into the shell.

In many cases, you'll get an error message as soon as you execute one of these commands, and the R session will exit. In that case, you need to figure out what caused the error and solve the problem. That may not be simple, but at least you have something better to search for than "makeCluster hangs." Very often, though, the error is pretty obvious, such as R or snow not being installed. Also, snow may not guess the right hostname for the workers to use to connect back to the master process. In this case, R starts up and snow runs, but nothing happens. You can run various network tools (nslookup, ping) in your terminal window to diagnose this problem.

Let’s create a socket cluster using manual mode and examine the output:

> cl <- makeCluster(c('n1', 'n2'), type="SOCK", manual=TRUE, outfile="")
Manually start worker on n1 with
     /usr/lib/R/bin/Rscript /usr/lib/R/site-library/snow/RSOCKnode.R
MASTER=beard PORT=10187 OUT= SNOWLIB=/usr/lib/R/site-library

The argument MASTER=beard indicates that the value of the master option is “beard.” You can now use the ping command from your terminal window on n1 to see if the master is reachable from n1 by that name. Here’s the kind of output that you should see:

n1% ping beard
PING beard ( 56(84) bytes of data.
64 bytes from beard ( icmp_req=1 ttl=64 time=0.020 ms

This demonstrates that n1 is able to resolve the name “beard,” knows a network route to that IP address, can get past any firewall, and is able to get a reply from the master machine.[32]

But if ping issues the error message “ping: unknown host beard”, then you have a hostname resolution problem. Setting the master option to a different value when creating the cluster might fix the problem. Other errors may indicate a networking problem that can be fixed by your sysadmin.

If the value of master seems good, you should execute the command displayed by makeCluster() in hopes of getting a useful error message. Note that many of these problems could occur with any snow transport, so running a simple snow test using the socket transport and manual mode can be an effective way to verify your setup, even if you later intend to use a different transport.

The outfile option in itself is also useful for troubleshooting. It allows you to redirect debug and error messages to a specified file. By default, output is redirected to /dev/null. I often use an empty string ("") to prevent any redirection, as we described previously.

Here are some additional troubleshooting tips:

  • Start by running on only one machine to make sure that works

  • Manually ssh to all of the workers from the master machine

  • Set the master option to a value that all workers can resolve, possibly using a dot-separated IP address

  • Run your job from a directory that is available on all machines

  • Check if there are any firewalls that might interfere

When It Works…

snow is a fairly high-level package, since it doesn’t focus on low-level communication operations, but on execution. It provides a useful variety of functions that support embarrassingly parallel computation.

…And When It Doesn’t

Communications difficulties: snow doesn’t provide functions for explicitly communicating between the master and workers, and in fact, the workers never communicate between themselves. In order to communicate between workers, you would have to use functions in the underlying communication package. Of course, that would make your program less portable, and more complicated. A package that needed to do that would probably not use snow, but use a package like nws or Rmpi directly.

The Wrap-up

In this chapter, you got a crash course on the snow package, including some advanced topics such as running snow programs via a batch queueing system. snow is a powerful package, able to run on clusters with hundreds of nodes. But if you’re more interested in running on a quad-core laptop than a supercomputer, the next chapter on the multicore package will be of particular interest to you.

[5] The multicore package is generally preferred on multicore computers, but it isn’t supported on Windows. See Chapter 3 for more information on the multicore package.

[6] This can be overridden via the rshcmd option, but the specified command must be command line-compatible with ssh.

[7] These clusters shouldn’t be confused with cluster objects and cluster workers.

[8] All R sessions are randomly seeded when they first generate random numbers, unless they were restored from a previous R session that generated random numbers. snow workers never restore previously saved data, so they are always randomly seeded.

[9] How exactly snow sends functions to the workers is a bit complex, raising issues of execution context and environment. See Functions and Environments for more information.

[10] This is guaranteed since clusterEvalQ() is implemented using clusterCall().

[11] Defining anonymous functions like this is very useful, but can be a source of performance problems due to R’s scoping rules and the way it serializes functions. See Functions and Environments for more information.

[12] The return value from library() isn’t big, but if the initialization function was assigning a large matrix to a variable, you could inadvertently send a lot of data back to the master, significantly hurting the performance of your program.

[13] snow.time() is available in snow as of version 0.3-5.

[14] I’m setting the RNG seed so we get the same value of sleeptime as in the previous example.

[15] It's also possible that the extra overhead in clusterApplyLB() to determine which worker is ready for the next task could make clusterApply() more efficient in some cases, but I'm skeptical.

[16] The ideal elapsed time is sum(sleeptime) / length(cl).

[17] Normally the second argument to ^ can have the same length as the first, but it must be length one in this example because parVapply() only splits the first argument.

[18] A future release of snow could optimize clusterApplyLB() by not sending the function and constant arguments to the workers in every task. At that point, this example will lose any practical value that it may have.

[19] Actually, if you specify the worker function by name, rather than by providing the definition of the function, most of the parallel execution functions (parLapply() is currently an exception) will use that name to look up that function in the worker processes, thus avoiding function serialization.

[20] You can verify this with the command environment(mult).

[21] Actually, you can achieve reproducibility with clusterApplyLB() by setting the seed to a task specific value. This can be done by adding the operation to the beginning of the worker function, or if using a function from a library, wrapping that function in a new function that sets the seed and then calls the library function.

[22] It’s possible that newer versions of Rmpi won’t be built for the Mac on CRAN because it won’t work on Mac OS X 10.7, but it’s still available as I’m writing this in September 2011.

[23] I use the command locate include/mpi.h to find this directory. On my machine, this returns /usr/lib/openmpi/include/mpi.h, so I set MPI_PATH to /usr/lib/openmpi.

[24] mpi.universe.size() had a bug in older versions of Rmpi, so you may need to upgrade to Rmpi 0.5-9.

[25] I don’t use mpi.universe.size() when creating an MPI cluster in an interactive session, since in that context, mpi.universe.size() returns 1, which would give an illegal worker count of zero.

[26] orterun, mpirun, and mpiexec are in fact the same program in Open MPI.

[27] You can use mpi.finalize() instead, which doesn’t quit R.

[28] The orterun command in Open MPI accepts several different arguments to specify the host list and the number of workers. It does this to be compatible with previous MPI distributions, so don’t be confused if you’re used to different argument names.

[29] Actually, it’s possible to configure Open MPI without support for PBS/TORQUE, in which case you’ll have to include the arguments -hostfile $PBS_NODEFILE when executing orterun.

[30] We discuss heterogeneous configuration in snow Configuration.

[31] If ssh fails at this point, you may have found your problem.

[32] Of course, just because ping can get past a firewall doesn’t mean that snow can. As you can see from the manual mode output, the master process is listening on port 10187, so you may have to configure your firewall to allow connections on that port. You could try the command telnet beard 10187 as a further test.

