Chapter 6. R+Hadoop

Of the three Hadoop-related strategies we discuss in this book, this is the most raw: you get to spend time up close and personal with the system. On the one hand, that means you have to understand Hadoop. On the other hand, it gives you the most control. I’ll walk you through Hadoop programming basics and then explain how to use it to run your R code.

If you skipped straight to this chapter, but you’re new to Hadoop, you’ll want to review Chapter 5.

Quick Look

Motivation: You need to run the same R code many times over different parameters or inputs. For example, you plan to test an algorithm over a series of historical data.

Solution: Use a Hadoop cluster to run your R code.

Good because: Hadoop distributes work across a cluster of machines. As such, using Hadoop as a driver overcomes R’s single-threaded limitation as well as its memory boundaries.

How It Works

There are several ways to submit work to a cluster, two of which are relevant to R users: streaming and the Java API.

In streaming, you write your Map and Reduce operations as R scripts. (Well, streaming lets you write Map and Reduce code in pretty much any scripting language; but since this is a book about R, let’s pretend that R is all that exists.) The Hadoop framework launches your R scripts at the appropriate times and communicates with them via standard input and standard output.

By comparison, when using the Java API, your Map and Reduce operations are written in Java. Your Java code, in turn, invokes Runtime.exec() or some equivalent to launch your R scripts.

Which method is appropriate depends on several factors, including your familiarity with Java versus R and the particular problem you’re trying to solve. Streaming tends to win for rapid development. The Java API is useful for working with binary input or output data, such as images or sound files. (You can still use streaming for binary data, mind you, but it requires additional programming and infrastructure overhead. I’ll explain that in detail in the code walkthroughs.)

Setting Up

You can fetch the Hadoop distribution from http://hadoop.apache.org/. So long as you also have a Java runtime (JRE or SDK) installed, this is all you’ll need to submit work to a Hadoop cluster. Just extract the ZIP or tar file and run the hadoop command as we describe below.

Check with your local Hadoop admins for details on how to connect to your local cluster. If you don’t have a Hadoop cluster, you can peek at Chapter 5 for some hints on how to get a cluster in the cloud.

Working with It

Let’s take a walk through some examples of mixing Hadoop and R. In the first three, I’ll use only the Map phase of MapReduce for simple task parallelization. In the fourth example, I’ll use the full Map and Reduce cycle to populate and operate on a data.frame.

The unifying theme of these examples is the need to execute a block of long-running R code for several (hundred, or thousand, or whatever) iterations. Perhaps it is a function that will run once for each of many input values, such as an analysis over each day’s worth of historical data or a series of Markov Chains.[50] Maybe you’re trying a variety of permutations over a function’s parameter values in search of some ideal set, such as in a time-series modeling exercise.[51] So long as each iteration is independent—that is, it does not rely on the results from any previous iteration—this is an ideal candidate for parallel execution.

Some examples will borrow the “phone records” data format mentioned in the previous chapter.

Simple Hadoop Streaming (All Text)

Situation: In this first example, the input data is several million lines of plain-text phone call records. Each CSV input line is of the format:

{date},{caller num},{caller carrier},{dest num},{dest carrier},{length}
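For instance, a single (entirely made-up) record in this format might look like:

2011-06-05,15085551212,CarrierA,16175559821,CarrierB,173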

The plan is to analyze each call record separately, so there’s no need to sort and group the data. In turn, we won’t need the full MapReduce cycle but can use a Map-only job to distribute the work throughout the cluster.

The code: To analyze each call record, consider a function callAnalysis() that takes all of the record’s fields as parameters:

callAnalysis( date , caller.num, caller.carrier , dest.num , dest.carrier , length )

Hadoop streaming does not invoke R functions directly. You provide an R script that calls the functions, and Hadoop invokes your R script. Specifically, Hadoop will pass an entire input record to the Map operation R script via standard input. It’s up to your R script to disassemble the record into its components (here, split it by commas) and feed it into the function (see Example 6-1).

Example 6-1. mapper.R

#! /usr/bin/env Rscript
input <- file( "stdin" , "r" )
while( TRUE ){
   currentLine <- readLines( input , n=1 )  1

   if( 0 == length( currentLine ) ){
       break
   }

   currentFields <- unlist( strsplit( currentLine , "," ) ) 2

   result <- callAnalysis(
       currentFields[1] , currentFields[2] , currentFields[3] ,
       currentFields[4] , currentFields[5] , currentFields[6]
   ) 3

   cat( result , "
" , sep="" ) 4
}
close( input )
1

Hadoop Streaming sends input records to the Mapper script via standard input. A Map script may receive one or more input records in a single call, so we read from standard input until there’s no more data.

2

Split apart the comma-separated line, to address each field as an element of the vector currentFields.

3

Send all of the fields to the callAnalysis() function. In real-world code, you would assign each element of currentFields to a named variable first; that makes for cleaner, more readable code.
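For instance, the loop body might read something like this (the variable names are illustrative, not part of the original example):

currentFields  <- unlist( strsplit( currentLine , "," ) )
call.date      <- currentFields[1]
caller.num     <- currentFields[2]
caller.carrier <- currentFields[3]
dest.num       <- currentFields[4]
dest.carrier   <- currentFields[5]
call.length    <- currentFields[6]

result <- callAnalysis( call.date , caller.num , caller.carrier ,
                        dest.num , dest.carrier , call.length )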

4

Here, the code assumes the return value of callAnalysis() is a simple string. The script sends this to standard output for Hadoop to collect.

This may not be the most efficient code. That’s alright. Large-scale parallelism tends to wash away smaller code inefficiencies.

Put another way, clustered computer power is cheap compared to human thinking-power. Save your brain for solving data-related problems and let Hadoop pick up any slack. Your R code would have to be extremely inefficient before an extensive tuning exercise would yield a payoff.

Running the Hadoop job:

1
export HADOOP_VERSION="0.20.203.0"
export HADOOP_HOME="/opt/thirdparty/dist/hadoop-${HADOOP_VERSION}"
export HADOOP_COMMAND="${HADOOP_HOME}/bin/hadoop"
export HADOOP_STREAMING_JAR="${HADOOP_HOME}/contrib/streaming/hadoop-streaming-${HADOOP_VERSION}.jar"
export HADOOP_COMPRESSION_CODEC="org.apache.hadoop.io.compress.GzipCodec"
export HADOOP_INPUTFORMAT="org.apache.hadoop.mapred.lib.NLineInputFormat"

${HADOOP_COMMAND} jar ${HADOOP_STREAMING_JAR} \
 \
 -D mapreduce.job.reduces=0 \  2
 -D mapred.output.compress=true \  3
 -D mapred.output.compression.codec=${HADOOP_COMPRESSION_CODEC} \
 -D mapred.task.timeout=600000 \  4
 \
 -inputformat ${HADOOP_INPUTFORMAT} \  5
 -input /tmp/call-records.csv \
 -output /tmp/hadoop-out \
 -mapper $PWD/mapper.R

This is a pretty standard command line for Hadoop streaming: it specifies the streaming JAR, the mapper script, and the input and output locations. Note that the “generic” Hadoop options, which begin with -D, always come first.

There are a few points of note:

1

Set some parameters as environment variables for easy reuse.

2

Hadoop accepts configuration values on the command line, passed in using -D. (Please note that, unlike Java system properties, Hadoop expects a space between the -D and the property name.) The code disables the Reduce phase because this job uses Hadoop just for its Map-side parallelism.

3

Compress the output. This saves space and reduces transfer time when we download the output data from HDFS (or S3, if you’re using Elastic MapReduce).[52] You can also specify BZip2Codec for bzip2 compression.

4

Hadoop will kill a task that is unresponsive; if Hadoop kills too many tasks, it will mark the entire job as failed. A streaming job’s only sign of life is printing to standard output, so this line tells Hadoop to give each Map task ten minutes (600,000 milliseconds) before declaring it a goner. You may have to increase the timeout for your own jobs. I’d suggest you experiment a little to see how long a given task will run, then set the timeout to double that value so you have plenty of headroom.
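If a single record can legitimately run longer than any timeout you are comfortable setting, another option is to have the script emit a periodic sign of life. Hadoop Streaming watches standard error for reporter:status: lines, which double as a task status update. A minimal sketch, dropped inside the Map script’s loop:

## emit a heartbeat so a long-running record doesn't trip the task timeout
cat( "reporter:status:processing record\n" , file=stderr() )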

5

By default, Hadoop tries to divide your input data into sizable chunks, known as splits. In a typical Hadoop “big-data” scenario, this is the smart thing to do because it limits the amount of data shipped around the cluster. For “big-CPU” or “big-memory” jobs, in which each input record itself represents a sizable operation, this chunking can actually work against parallelism: it’s possible that a file with several hundred records may be divided into only two splits, such that the entire job would be saddled on just two cluster nodes. When using NLineInputFormat, Hadoop treats each line as a split and spreads the work evenly throughout the cluster.

(This code works for Hadoop 0.19 and 0.20. For Hadoop 0.21, use org.apache.hadoop.mapreduce.lib.input.NLineInputFormat. Note the subtle difference in class name!)

Warning

Resist the temptation to disable the timeout (by setting it to 0). An unresponsive task may be one that has truly hung on a particular input, or is stuck in an infinite loop. If Hadoop doesn’t kill that task, how will you ever know that it’s broken?

Reviewing the output: A typical Hadoop job creates several output files, one for each Reduce task. Since this is a Map-only job, the output instead appears as one compressed file per Map task: /tmp/hadoop-out/part-00000.gz, part-00001.gz, and so on. You can use gunzip to uncompress the files and then review their contents.

Anything the script wrote to standard output will be included in this output file. That is, Hadoop doesn’t distinguish between the lines you want in the output (such as the output from the fictional callAnalysis() function) and any lingering cat() or print() calls in your code or in the modules you load. If you find stray content in your job’s output, you can post-process those files or suppress the extra output in your code. Here, “post-process” is a fancy term for grep’ing the job output file to extract the lines of interest. “Suppress” means you call sink() to selectively disable standard output:

sink( "/dev/null" ) ## suppress standard output

... do the work ...

sink() ## restore standard output

cat(  ... your intended result ... )
    ... exit the script

Streaming, Redux: Indirectly Working with Binary Data

As of this writing, Hadoop Streaming can only be used for text input and output. (This is slated to change in a future Hadoop release.) This doesn’t preclude you from working with binary data in a streaming job; but it does preclude your Map and Reduce scripts from accepting binary input and producing binary output. This example presents a workaround.

Situation: Imagine that you want to analyze a series of image files. Perhaps they are frames from a video recording, or a file full of serialized R objects, or maybe you run a large photo-sharing site. For this example, let’s say you have R code that will perform image feature extraction. Each image is an independent unit of work, so assuming you have a decent number of images (and/or feature extraction takes some noticeable amount of time), a Map-only Hadoop job is a good fit.

The code: Remember, though, that Hadoop Streaming can only handle line-by-line text input and output so you have to get creative here. One option would be to feed your Hadoop Streaming job an input of pointers to the data, which your R script could then fetch and process locally. For example:

  • Host the data on an internal web server, and feed Hadoop a list of URLs

  • Ditto, but use an NFS mount

  • Use scp to pull the files from a remote system

  • Make a SQL call to a database system

(Notice that I don’t mention using HDFS. Remember that HDFS doesn’t work well with small files. I’ll cover a different approach in the next example.)

For this example, let’s say the mythical imageFeatureExtraction() function accepts an R url() connection:

#! /usr/bin/env Rscript
input <- file( "stdin" , "r" )
while( TRUE ){
   currentLine <- readLines( input , n=1 )

   if( 0 == length( currentLine ) ){
       break
   }

   pulledData <- url( currentLine )

   result <- imageFeatureExtraction( pulledData )
   cat( result , "\n" , sep="" )
}
close( input )

Running the Hadoop job: As far as Hadoop is concerned, this example looks just like the previous one (all the changes are in the R script), so it uses the same command line.

Reviewing the output: If your job outputs text (such as a series of numbers describing the binary data) then you can send that to standard output, as usual.

Let’s say your job yields binary output, such as a series of charts. In that case, you can use the same idea as you did for the input, and push the output to another system:

  • Copy it to an NFS mount

  • Use an HTTP POST operation to send the data to a remote web server

  • Call scp to ship the data to another system

  • Use SQL to push the data to an RDBMS

and so on.
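As one illustration of the HTTP option, the Map script could shell out to curl after writing each chart to a local temporary file. This is only a sketch; the host name and form field are made up, and it assumes curl is installed on every cluster node:

## push a locally written chart to a (hypothetical) collection server
pushChart <- function( chartFile ){
   command  <- paste( "curl --silent --fail" ,
                      "--form" , paste0( "chart=@" , shQuote( chartFile ) ) ,
                      "http://collector.example.com/upload" )
   exitCode <- system( command )
   if( exitCode != 0 ){
      cat( "upload failed for " , chartFile , "\n" , file=stderr() )
   }
   invisible( exitCode )
}

The scp and NFS options follow the same shape; only the command string changes.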

Caveats: Keep in mind that this method is not perfect.

A Hadoop cluster is a robust beast. Between the software-side framework and the required hardware layout, you are protected from hard disk failures, node crashes, and even loss of network connectivity.

To reap this benefit, though, the job must be self-contained from Hadoop’s perspective. Everything required for the job must exist within the cluster. Map and Reduce scripts, input data, and output must all live in HDFS (S3 if you are using Elastic MapReduce).

Once you rely on systems or services outside of the cluster, you lose in four ways:

Loss of robustness

Hadoop can’t manage a failure or crash in a remote service.

Scaling

While the cluster may not break a sweat running your large job, that remote web server or NFS mount may fail under the weight of a Hadoop-inflicted flood of activity.

Overhead

Any of the methods described above—SSH, web server, NFS mount—requires additional setup. If you are somehow able to get this one past your sysadmins (oh, especially that NFS one), expect them to be very unhappy with you down the line.

Idempotence/risk of side effects

Hadoop may employ speculative execution. This is a fancy way of saying that, under certain circumstances, Hadoop might run a given Map or Reduce task more than once. Hadoop may kill a task in mid-run and launch it elsewhere (if it detects a timeout) or it may concurrently launch a duplicate task (if the first task seems to be taking too long to complete).

Under pure Hadoop circumstances, when all job-related activity takes place within the cluster walls, this is not a problem because Hadoop itself takes care of retrieving the output from only one of the multiple executions of that same Map or Reduce task. But when you leverage data or services from outside the cluster, those are considered side effects of a task. Hadoop doesn’t know what your Map or Reduce code is doing; it only knows how long it takes to run, and whether it meets Hadoop’s criteria for success or failure. That means it’s up to you to handle side effects such as duplicate submissions to that remote web server.
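One common way to blunt the duplicate-execution problem is to make the side effect idempotent: derive the destination name deterministically from the task’s input, so a re-run simply overwrites its own earlier output rather than adding a second copy. A sketch (the base path is illustrative; the same idea carries over to scp targets, HTTP PUTs, or SQL upserts keyed on the input):

## map each input key to a fixed destination, so a duplicated task
## overwrites its earlier output instead of adding a second copy
resultPathFor <- function( inputKey , basePath="/mnt/results/" ){
   paste0( basePath , URLencode( inputKey , reserved=TRUE ) , ".out" )
}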

This may sound like a long list of caveats, but whether it’s really a hurdle depends on your circumstances. Leveraging external services and side effects will work just fine if most of the “action” will take place inside Hadoop, or the external data is just a small part of the process, or multiple execution is not a problem.

That said, there is another way to handle binary data in a streaming job but still keep all the data in the cluster. It’s slightly more involved than the methods described in this example, but the trade-off may be worth your while. In the next section, I’ll show you how to do this using the Java API instead of Streaming.

The Java API: Binary Input and Output

Situation: You still wish to perform feature extraction on a series of image files, but you feel the previous solution is too fragile. You want the job to be self-contained, from a Hadoop perspective.

The code: Recall, from the previous chapter, that if you’re processing whole-file data (text or binary) with Hadoop, you’ll need to pack those files into a SequenceFile archive. The streaming API cannot presently handle SequenceFiles but the Java API can. You can use the Java API to extract data from the SequenceFile input, write it to a predictable filename, and launch an R script that operates on that file.

Once again, we need Hadoop for a one-stage parallel execution, so this will be a Map-only job.

First, let’s look at the Java code in Example 6-2. Hadoop Java code is typically compiled into a JAR that has at least two classes: the driver that configures the job and the mapper that is run for a Map task. (Jobs that have a Reduce step will include a class for the Reduce task.)

Example 6-2. Excerpt from the tool class, Driver.java

public class Driver extends Configured implements Tool { 1

  public int run( String[] args ) throws Exception {
     final Job job = new Job( getConf() ) ;

     job.setJobName( "Parallel R: Chapter 6, example 3" ) ;
     job.setMapperClass( Ch6Ex3Mapper.class ) ; 2

     FileInputFormat.addInputPath( job , new Path( args[0] ) ) ; 3
     FileOutputFormat.setOutputPath( job , new Path( args[1] ) ) ;
     job.setInputFormatClass( SequenceFileInputFormat.class ) ; 4

     job.setOutputKeyClass( Text.class ) ; 5
     job.setOutputValueClass( Text.class ) ;

     job.getConfiguration().setBoolean( "mapred.output.compress" , true ) ; 6

     job.getConfiguration().setClass( "mapred.output.compression.codec" ,
       BZip2Codec.class , CompressionCodec.class ) ;
     job.getConfiguration().setInt( "mapreduce.job.reduces" , 0 ) ; 7

     return( job.waitForCompletion( true ) ? 0 : 1 ) ;

  }
}
1

Driver extends the Hadoop base class Configured and implements the interface Tool. The combined effect is that Driver gets some convenience methods for setting up the job and Hadoop will take care of parsing Hadoop-specific options for us. For example, when we launch the job based on Driver, it will magically understand any -D options passed on the Hadoop command line.

2

Set the class that will be run for Map tasks. As this is a Map-only job, there is no need to set a class for Reduce tasks. (Technically, Hadoop defaults to using its no-op Reducer class, which simply parrots the Map phase’s output.)

3

Remember when I said that using Tool and Configured would simplify command line processing? Here, the args[] array contains all command line elements after the general Hadoop options. The first one is the input path; the second is the output path.

4

Per this line, Hadoop will expect all job input to be in SequenceFiles.

5

The job’s final output is also a series of key/value pairs. These two lines tell Hadoop that both the key and the value will be plain text.

6

For streaming jobs, you set configuration properties using -D to ask Hadoop to compress the output. You can still do that on the command line with a Java-based job, or you can embed that in the Java code, as shown here.

7

Tell Hadoop not to run any reducers.

Example 6-3. Excerpt from the Mapper class, Ch6Ex3Mapper.java

public class Ch6Ex3Mapper extends Mapper<Text, BytesWritable , Text , Text> { 1

private final Text _outputValue = new Text() ;  2
private final StringBuilder _rOutputBuffer = new StringBuilder() ;

public void map(Text inputKey , BytesWritable inputValue , Context context )
   throws IOException , InterruptedException {

     _outputValue.clear() ;  3
     if( _rOutputBuffer.length() > 0 ){
         _rOutputBuffer.delete( 0 , _rOutputBuffer.length() ) ;
     }

     BufferedReader rOutputReader = null ;
     OutputStream fileWriteHandle = null ;
     final File currentFile = new File( inputKey.toString() ) ;

     try{

         // write the raw bytes to a file. (input key name is the file name)
         fileWriteHandle = new FileOutputStream( currentFile ) ;  4
         fileWriteHandle.write( inputValue.getBytes() , 0, inputValue.getLength() ) ;
         closeOutputStream( fileWriteHandle ) ;

         final List<String> commandLine = new ArrayList<String>() ;  5
         commandLine.add( "/usr/bin/env" ) ;
         commandLine.add( "R" ) ;
         commandLine.add( "--vanilla" ) ;
         commandLine.add( "--slave" ) ;
         commandLine.add( "--file=helper.R" ) ;
         commandLine.add( "--args " ) ;
         commandLine.add( inputKey.toString() ) ;

         final Process runtime = new ProcessBuilder( commandLine )
             .redirectErrorStream( true )
             .start() ; 6

         final int exitCode = runtime.waitFor() ;
         rOutputReader = new BufferedReader(
           new InputStreamReader( runtime.getInputStream() ) ) ;

         if( 0 != exitCode ){
             _rOutputBuffer.append( "error! " ) ;
         }

         _rOutputBuffer.append( rOutputReader.readLine() ) ;  7
         _outputValue.set( _rOutputBuffer.toString() ) ;

         context.write( inputKey ,  _outputValue ) ;  8


     }catch( final Exception rethrow ){
         throw( new IOException( rethrow ) ) ;
     }finally{
             // … close handles and delete the image file ...
     }

  }

}
1

Per this class definition, the map() operation will expect a text string and binary data as the input key/value pair, and will emit text for the output key/value pair. Please note that the SequenceFile must therefore use a Text object as the key and a BytesWritable as the value. (If you used forqlift to create your SequenceFiles, this has been done for you.)

2

Hadoop will try to reuse a single Mapper or Reducer object several times throughout the life of a single job. As such, it’s considered good form (read: more efficient) to use instance variables instead of local method variables when possible. Here, the code recycles the Text object used for the output value and also a StringBuilder that holds the R script’s output.

3

For that same reason, the code performs some cleanup on those instance variables each time it enters map().

4

Write the binary data to a file on-disk that R can access. This code assumes the input key is the file’s name and the value is its data.

5

Build a command line to run R. Notice that the final element is the input key, which is the name of the image file to process.

6

This line launches R. The code uses ProcessBuilder, instead of Runtime.exec(), in order to combine the R script’s standard output and standard error. That will make it easier to collect the script’s output.

7

Collect the R script’s output. A successful run of helper.R yields a single line of output, so that’s all the code fetches. (Production-grade code would fetch all of the output and sift through it to check for problems.)

8

Package up the results to send on to the Reduce step. (Remember, the “no-op” Reducer will simply parrot every Map task’s results.) The input key (the image file’s name) is also the output key, in order to identify each image’s results in the job’s output.

Note

While this job used a SequenceFile for input, it’s just as easy to use a SequenceFile for output.

For example, consider a job that calls R to generate charts: you’d have to change the Driver class to specify SequenceFile output, and also change the Mapper’s class definition and map() method to BytesWritable (binary) output. Finally, you would have to use standard Java I/O to read the chart file into a byte[] array and put those bytes into the BytesWritable.

(Note that even though you’d be using a SequenceFile for input and output, your Mapper code only sees the Text and BytesWritable data types that are used inside the SequenceFile.)

Now let’s look at the R script, helper.R, which is invoked by the Java code in Example 6-3:

dataFile <- commandArgs( trailingOnly=TRUE )  1
result <- imageFeatureExtraction( dataFile )  2
output.value <- paste( dataFile , result , sep="\t" )
cat( output.value , "\n" , sep="" )  ## write the single output line the Java code reads
1

commandArgs() fetches the arguments passed to the R script, which in this case is the image’s file name.

2

Here, the mythical imageFeatureExtraction() function works on the provided file.

Running the Hadoop job: Let’s say the Hadoop code is in a JAR named “launch-R.jar” and the input images are in a SequenceFile named images-sample.seq. Assuming the environment variables defined above, you can launch the job as follows:

${HADOOP_COMMAND} jar launch-R.jar \
   -files helper.R \  1
   /tmp/images-sample.seq \
   /tmp/hadoop-out

This command line is much shorter than the streaming command lines, mostly because several options are set in the driver class.

This is a typical Hadoop command line for Java jobs, with one exception:

1

With a streaming job, your scripts magically appear on the cluster at runtime. When using the Java API, you have to use Hadoop’s Distributed Cache (the -files flag) to copy helper.R to the cluster.

If you’re testing the job on your workstation, in Hadoop’s “local” (single-workstation) mode, you’ll want to keep two ideas in mind:

For one, Distributed Cache doesn’t really work in local mode. You’ll want to launch the Hadoop command from the directory where helper.R lives such that the Mapper class can find it.

Secondly, the Hadoop job will treat the current directory as its runtime directory. That means the images extracted in the Mapper will be written to your current directory.

Reviewing the output: Let’s assume the feature extraction function yields text output, so you would fetch and read it as explained in the previous examples. If your R code generates binary output, such as charts, you can write that as a SequenceFile: specify SequenceFileOutputFormat as the output, and have your Java code write the file’s data to a BytesWritable object.

Caveats: This method keeps the entire job within Hadoop’s walls: unlike the previous example, you’re protected from machine crashes, network failures, and Hadoop’s speculative execution. The cost is the extra overhead involved in putting all of your input data in SequenceFiles. Even if you’re using forqlift, or you’re comfortable writing a Hadoop Java job to do this, you still need to gather the inputs. That may involve a separate effort to copy the data from an existing service, such as an internal web server or file server.

Processing Related Groups (the Full Map and Reduce Phases)

Situation: You want to collect related records and operate on that group as a whole.

Returning to the “phone records” example, let’s say you want to analyze every number’s output call patterns. That would require you to first gather all of the calls made by each number (Map phase) and then process those records together (Reduce phase).

The code: As noted above, this will require both the Map and Reduce phases.

The Map phase code will extract the caller’s phone number to use as the key, as shown in Example 6-4.

Example 6-4. mapper.R

#! /usr/bin/env Rscript

input <- file( "stdin" , "r" )
while( TRUE ){
   currentLine <- readLines( input , n=1 )
   if( 0 == length( currentLine ) ){
       break
   }

   currentFields <- unlist( strsplit( currentLine , "," ) )

   result <- paste(
       currentFields[1] ,
       currentLine ,
       sep="	"
   )

   cat( result , "
" )
}
close( input )

By now, the basic structure should look familiar. Remember that the first field in the comma-separated line is the caller’s phone number, which serves as the key output from the Map task.

The Reducer code builds a data.frame of all calls made by each number, then passes the data.frame to the analysis function. Something to note here is that the logic in a Reducer script is different from that of a Map script. The flow may seem a little strange, so I’ll explain it at a high level before showing the code sample.

In a Reducer script, each input line is of the format:

{key}{tab}{value}

where {key} and {value} are a single key/value pair, as output from a Map task.

Recall that the Reducer’s job is to collect all of the values for a given key, then process them together. Hadoop may pass a single Reducer values for multiple keys, but it will sort them first. When the key changes, then, you know you’ve seen all of the values for the previous key. You can process those values as a group, then move on to the next key.

With that in mind, a sample Reducer script for the call-analysis job is shown in Example 6-5.

Example 6-5. reducer.R

#! /usr/bin/env Rscript

input <- file( "stdin" , "r" )
lastKey <- ""

tempFile <- tempfile( pattern="hadoop-mr-demo-" , fileext=".csv" ) 1
tempHandle <- file( tempFile , "w" )

while( TRUE ){

   currentLine <- readLines( input , n=1 )
   if( 0 == length( currentLine ) ){
       break
   }

   tuple <- unlist( strsplit( currentLine , "\t" ) ) 2
   currentKey <- tuple[1]
   currentValue <- tuple[2]

   if( ( currentKey != lastKey ) ){
       if( lastKey != "" ){  3
         close( tempHandle )
         bucket <- read.csv( tempFile , header=FALSE )  4
         result <- anotherCallAnalysisFunction( bucket )  5
         cat( currentKey , "	" , result , "
" )  6
         tempHandle <- file( tempFile , "w" )  7
       }

       lastKey <- currentKey  8
   }

   cat( currentLine , "
" , file=tempHandle )  9
}

close( tempHandle ) 10

bucket <- read.csv( tempFile , header=FALSE )
result <- anotherCallAnalysisFunction( bucket )
cat( currentKey , "	" , result , "
" )

unlink( tempFile )

close( input )
1

Store the collected input lines in a temporary file. The input is in CSV form, so the code can call read.csv() on this temp file to build a data.frame. (You could also build the data.frame in memory, one row at a time. The “right” way is whichever one works best for you.)
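If you would rather skip the temp file, a sketch of the in-memory approach might look like the following; note that rbind() copies the data.frame on every call, so very large groups are better served by the temp-file version shown in Example 6-5:

## accumulate one key's rows in memory instead of in a temp file
appendCall <- function( bucket , csvLine ){
   newRow <- as.data.frame( t( unlist( strsplit( csvLine , "," ) ) ) ,
                            stringsAsFactors=FALSE )
   if( is.null( bucket ) ){ newRow } else { rbind( bucket , newRow ) }
}

## inside the while() loop, in place of cat( ... , file=tempHandle ):
##    bucket <- appendCall( bucket , currentValue )
## then reset with bucket <- NULL after processing each key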

2

Recall that a Map task outputs lines in {key}{tab}{value} form. Here, the code splits that line in order to address the key and value as separate variables.

3

Check whether the key has changed. (The extra logic detects the initial condition of the key being blank.) The change in key is the cue to process the CSV data that has been accumulated into the temporary file from (1).

4

Close off the temporary handle and read the file back in as a data.frame for easy processing.

5

Pass that data.frame to the mythical anotherCallAnalysisFunction(), and collect its result.

6

Write the result to standard output for Hadoop to collect. Make sure to include the key to tie these results to a particular phone number.

7

Reopen the temp file for writing. This will zero it out, so it’s ready for the next key’s data.

8

Update the key, such that the loop detects when the key changes again.

9

Push a line to the temporary file for later processing.

10

Repeat the end-of-key code, to process the data for the final key.

Running the Hadoop job:

${HADOOP_COMMAND} jar ${HADOOP_STREAMING_JAR} \
   \
   -D mapred.output.compress=true \
   -D mapred.output.compression.codec=${HADOOP_COMPRESSION_CODEC} \
   \
   -inputformat ${HADOOP_INPUTFORMAT} \
   -input /tmp/call-records.csv \
   -output /tmp/hadoop-out \
   -mapper mapper.R \
   -reducer reducer.R

This is a standard Hadoop streaming command line. The only deviation from previous streaming command lines is that this one specifies a reducer script.

Reviewing the output: By now, you know how to explore both text and binary job output. This section is intentionally left blank.

Caveats: When you use the full Map and Reduce phases, you need to know how your data is distributed in terms of the keys output from the Map phase. This may require you to do some up-front exploratory data analysis before you can determine whether the job is truly Hadoop-worthy.

Generally speaking, the Map phase is very lightweight (since it’s just used to assign keys to each input) and the heavy lifting takes place in the Reduce operation. To take advantage of the parallelism of the Reduce stage, then, you’ll need to meet two conditions:

  1. A large number of unique keys output from the Map phase

  2. Each key should have a similar number of records (at least, no one key should clearly dominate)

Why is this? Say you have ten million input records. If the Map operation yields only two unique keys, each with five million records, then you will have two very long-running Reduce tasks and that would not be a scalable operation.

Alternatively, let’s say the Map phase yields ten thousand unique keys, but one of those keys has several million records. This would yield an unbalanced Reduce phase, in which the work for one key takes so long that it eliminates the gains from running the remaining keys’ work in parallel.

I expect the phone records example is still a good fit for Hadoop parallelization since it is highly unlikely that one phone number made most of the calls. For your jobs, though, this may not be such a safe assumption.
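A quick look at a sample of the input is usually enough to tell you where you stand. Here is a sketch of that kind of check, run on your workstation against a sample small enough to fit in memory (the file name and column names are illustrative):

## how many unique keys, and how skewed are they?
calls <- read.csv( "call-records-sample.csv" , header=FALSE ,
                   col.names=c( "date" , "caller.num" , "caller.carrier" ,
                                "dest.num" , "dest.carrier" , "length" ) )

key.counts <- table( calls$caller.num )
length( key.counts )                   ## number of unique keys
summary( as.vector( key.counts ) )     ## spread of records per key
max( key.counts ) / sum( key.counts )  ## share held by the heaviest key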

This chapter covered a lot of ground, so let’s take a step back to review when you’d want to use R+Hadoop and when you’d want to try another method.

When It Works…

Hadoop splits the work across a cluster, sending each unit of work to a different machine. Even though R itself is single-threaded, this simulates having one machine with tens or hundreds of CPUs at your disposal. Under ideal conditions—that you have the cluster to yourself for the evening—that means each execution of your R code gets all of a machine’s RAM to itself. So you can say that R+Hadoop overcomes R’s CPU and memory limitations.

…And When It Doesn’t

Not completely spared from the memory wall: Hadoop is a compute solution, not a memory grid. If your job is so memory-intensive that a single task (Map or Reduce operation) outweighs the RAM on a single cluster node, Hadoop won’t be of much use. In this case, you could try to decompose the job into even smaller pieces.

Needs infrastructure: R+Hadoop works best if you already have access to an in-house cluster. Elastic MapReduce, the cloud-based solution, runs a close second.

Building out a new cluster is no trivial matter. Businesses prefer that a new tool will pay for itself (in terms of increased profits, new revenue models, or reduced risk). Ask yourself whether your proposed Hadoop-based projects would outweigh the price tag for hardware, space, and maintenance.

Elastic MapReduce has its own pros and cons. From a business perspective, some people may be uncomfortable with their data leaving the office network to live in Amazon’s cloud. (Regulatory compliance may also weigh heavily in this decision.) From a technical point of view, you have to consider the time required to ferry data to the cloud and back, as well as online storage costs.

Needs consistent cluster nodes: Hadoop executes your R scripts for you, and for streaming jobs it will even copy the R scripts to the cluster for you. It’s otherwise up to you to keep the runtime environment consistent across the cluster. If your job requires a particular version of R, or specific R packages, your cluster admins will need to install those for you on every cluster node ahead of time.

This can be quite an adjustment for those who are accustomed to running their own, local copy of R on their workstation, where they can install any package they see fit. The solution here is social, not technical: avoid surprises. Make sure there is a clear path of communication between you and your cluster admins, and make sure you know what R packages are installed on the cluster before you prototype your job on your workstation. If you are your own cluster admin, you’ll want to invest in tools such as Puppet, Chef, or cfengine to keep the machines consistent.
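One low-tech safeguard, sketched below under the assumption that your scripts know which packages they need (the package names here are placeholders), is to check for them at the top of each Map or Reduce script and fail fast with a readable message rather than a cryptic mid-job error:

## fail fast if this cluster node is missing a required package
required <- c( "somePackage" , "anotherPackage" )   ## placeholders
missing  <- required[ !( required %in% rownames( installed.packages() ) ) ]

if( length( missing ) > 0 ){
   cat( "missing R packages on this node: " ,
        paste( missing , collapse=", " ) , "\n" , file=stderr() )
   quit( status=1 )
}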

The Wrap-up

In this chapter, you learned a few ways to mix Hadoop and R. R+Hadoop gives you the most control and the most power, but comes at the cost of a Hadoop learning curve. In the next two chapters, you’ll explore methods that abstract you from Hadoop, making them a little closer to “pure R” solutions.



[50] Please note that the need for iteration independence makes Hadoop unsuitable for running a single Markov Chain process, since each iteration relies on the previous iteration’s results. That said, Hadoop is more than suitable for running a set of Markov Chain processes, in which each task computes an entire Markov Chain.

[51] Some Hadoop literature refers to this type of work as a parameter sweep.

[52] By the way, if you’re going to feed the output data back into R on your workstation, remember that R can natively read gzip and bzip2-compressed files.
