Using the distributed cache in MapReduce to find lines that contain matching keywords over news archives

The distributed cache in MapReduce is required for almost any nontrivial job that depends on external libraries or side data files. One very common use is passing cache files to each map/reduce task JVM. This recipe will use the MapReduce API and the distributed cache to mark any lines in the news archive dataset that contain one or more keywords from a supplied list. We will use the distributed cache to make each mapper aware of the list's location in HDFS.

Getting ready

This recipe assumes you have a basic familiarity with the Hadoop 0.20 MapReduce API. You will need access to the news_archives.zip dataset supplied with this book. Inside the ZIP file, you will find rural.txt and science.txt; place both in a single HDFS directory. You will also find news_keywords.txt inside the ZIP file. Place this file in HDFS at the absolute path /cache_files/news_keywords.txt, since that is the path the job references. Feel free to add any additional words to this file, as long as each appears on its own line.
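
If you prefer to stage the keyword file from Java rather than with the hadoop fs shell, the following minimal sketch performs the same copy programmatically. The local file name and destination path are assumptions taken from this recipe's setup; adjust them to match wherever you unpacked news_archives.zip:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class StageKeywordsFile {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // Assumption: news_keywords.txt sits in the current local directory.
            fs.mkdirs(new Path("/cache_files"));
            fs.copyFromLocalFile(new Path("news_keywords.txt"),
                                 new Path("/cache_files/news_keywords.txt"));
            fs.close();
        }
    }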

You will need access to a pseudo-distributed or fully-distributed cluster capable of running MapReduce jobs using the newer MapReduce API introduced in Hadoop 0.20.

You will also need to package this code inside a JAR file that is to be executed by the Hadoop JAR launcher from the shell. Only the core Hadoop libraries are required to compile and run this example.

How to do it...

Carry out the following steps to implement a word-matching MapReduce job:

  1. Create a new Java source file named LinesWithMatchingWordsJob.java in whichever source package is appropriate for your JAR.
  2. The following code will serve as the Tool implementation for job submission:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.regex.Pattern;
    
    public class LinesWithMatchingWordsJob implements Tool {
        private Configuration conf;
    
        public static final String NAME = "linemarker";
    
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        public Configuration getConf() {
            return conf;
        }
    
        public static void main(String[] args) throws Exception {
            if(args.length != 2) {
                 System.err.println("Usage: linemarker <input> <output>");
                 System.exit(1);
            }
            int res = ToolRunner.run(new LinesWithMatchingWordsJob(new Configuration()), args);
            System.exit(res);
        }
    
        public LinesWithMatchingWordsJob(Configuration conf) {
            this.conf = conf;
        }
  3. The run() method is where we set the input/output formats, mapper class configuration, and key-value class configuration:
        public int run(String[] args) throws Exception {
    
            DistributedCache.addCacheFile(new Path("/cache_files/news_keywords.txt").toUri(), conf);
    
            Job job = new Job(conf, "Line Marker");
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapperClass(LineMarkerMapper.class);
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            job.setJarByClass(LinesWithMatchingWordsJob.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
            return job.waitForCompletion(true) ? 0 : 1; // 0 signals success to the shell
        }
  4. The map() function is implemented in the following code snippet by extending mapreduce.Mapper:
        public static class LineMarkerMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    
            private Pattern space_pattern = Pattern.compile("[ ]");
            private Set<String> keywords = new HashSet<String>();
  5. Inside the setup() routine, we copy the cache file from the distributed cache to the task's local disk and load the keywords:
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                // Locate the cache file URIs registered in run()
                URI[] uris = DistributedCache.getCacheFiles(context.getConfiguration());
                if(uris == null || uris.length == 0) {
                    throw new IOException("Error reading file from distributed cache. No URIs found.");
                }
                FileSystem fs = FileSystem.get(context.getConfiguration());
                // Copy the keywords file out of HDFS into the task's working directory
                String localPath = "./keywords.txt";
                fs.copyToLocalFile(new Path(uris[0]), new Path(localPath));
                BufferedReader reader = new BufferedReader(new FileReader(localPath));
                try {
                    String word = null;
                    while((word = reader.readLine()) != null) {
                        keywords.add(word);
                    }
                } finally {
                    reader.close();
                }
            }

    The map() function:

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] tokens = space_pattern.split(value.toString());
                for(String token : tokens) {
                    if(keywords.contains(token)) {
                        context.write(key, new Text(token));
                    }
                }
            }
    
        }
    }

How it works...

First, we set up our imports and create a public class named LinesWithMatchingWordsJob. This class implements the Hadoop Tool interface for easy submission through ToolRunner. Before the job is submitted, we check that both the input and output path arguments were supplied. Inside the run() method, we immediately call the static DistributedCache helper method addCacheFile() and pass it a hardcoded reference to the HDFS cache file at the absolute path /cache_files/news_keywords.txt. This file contains the keywords, one per line, that we are interested in locating within the news archives corpus. We pass the helper method a URI reference to this path along with the Configuration instance.

Now we can begin configuring the rest of the job. Since we are working with text, we use the TextInputFormat and TextOutputFormat classes to read and write lines as strings. We also configure the Mapper class to be the public static inner class LineMarkerMapper. This is a map-only job, so we set the number of reducers to zero. The output key type is LongWritable, which holds the byte offset that TextInputFormat supplies for each line, and the output value type is Text for the matched words. It is also very important to call setJarByClass() so that the TaskTrackers can locate the JAR containing the Mapper class. The job uses the static helper methods on FileInputFormat and FileOutputFormat to set the input and output directories, respectively. Now we are completely set up and ready to submit the job.

The Mapper class has two important member variables: a precompiled regex Pattern used to tokenize each line on spaces, and a HashSet used to store each distinct keyword we are searching for.

The setup() method pulls the complete list of cache file URIs currently registered in the distributed cache. We first check that the returned URI array is non-null and contains at least one element. If it passes these tests, we copy the keywords file from HDFS into the task's temporary working directory, saving it locally as ./keywords.txt. From there we are free to use the standard Java I/O classes to read from the local disk. Each line in the file denotes a keyword, which we store in the keywords HashSet. Inside the map() function, we tokenize each line on spaces and, for each token, check whether it is contained in the keyword set. If a match is found, we emit the line's byte offset (the input key) as the output key and the token itself as the value.

There's more...

The following are a few additional tips to know when starting out with the distributed cache in MapReduce.

Use the distributed cache to pass JAR dependencies to map/reduce task JVMs

Very frequently, your map and reduce tasks will depend on third-party libraries that take the form of JAR files. If you store these dependencies in HDFS, you can use the static helper method DistributedCache.addArchiveToClassPath() to initialize your job with the dependencies and have Hadoop automatically add the JAR files as classpath dependencies for every task JVM in that job.
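
As a minimal sketch, assuming the dependency has already been uploaded to HDFS at /libs/opencsv.jar (a hypothetical path), the call can be made from your run() method before the Job is constructed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;

    public class ClasspathDependencyExample {
        // Registers a JAR that already lives in HDFS as a classpath entry
        // for every task JVM launched by a job using this Configuration.
        public static void addJarDependency(Configuration conf) throws Exception {
            // Assumption: the JAR was uploaded beforehand, for example with
            // hadoop fs -put opencsv.jar /libs/opencsv.jar
            DistributedCache.addArchiveToClassPath(new Path("/libs/opencsv.jar"), conf);
        }
    }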

Distributed cache does not work in local jobrunner mode

If the configuration parameter mapred.job.tracker is set to local, the job runs in the local jobrunner, and the DistributedCache cannot be used to distribute archives or cache files from HDFS to the task JVMs.
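
One possible workaround when testing locally, shown as a hedged sketch under the assumption that you can read the keywords file directly from its configured path, is to detect local mode via the mapred.job.tracker setting and bypass the cache entirely:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LocalModeKeywordLoader {

        // True when the job will run in the local jobrunner.
        public static boolean isLocalMode(Configuration conf) {
            return "local".equals(conf.get("mapred.job.tracker", "local"));
        }

        // Fallback for local mode: read the keywords file directly from the
        // filesystem instead of relying on the distributed cache.
        public static Set<String> loadKeywords(Configuration conf, String path)
                throws IOException {
            Set<String> keywords = new HashSet<String>();
            FileSystem fs = FileSystem.get(conf);
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(path))));
            try {
                String word;
                while((word = reader.readLine()) != null) {
                    keywords.add(word);
                }
            } finally {
                reader.close();
            }
            return keywords;
        }
    }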
