Generating n-grams over news archives using MapReduce

n-gram analysis is an approach to analyzing blocks of free text that examines contiguous words (grams) in sequence. This recipe demonstrates how to use the Java MapReduce API to calculate n-grams over news archives. Much of the code listed in this recipe is reusable across a variety of MapReduce jobs; it includes code for the ToolRunner setup, custom parameter passing via the configuration, and automatic output directory removal before job submission.
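
Before introducing MapReduce, the core idea can be illustrated with a minimal standalone sketch (no Hadoop involved; the class name is arbitrary and the sample sentence is taken from the bigram output shown later in this recipe): slide a window of n contiguous tokens across a line of text and emit each window as one gram.

    import java.util.ArrayList;
    import java.util.List;

    // Standalone illustration of the n-gram idea; not part of the recipe code.
    public class NGramSketch {
        public static void main(String[] args) {
            String line = "AWB has been banned from trading";
            int n = 2; // bigrams
            String[] tokens = line.split(" ");
            List<String> grams = new ArrayList<String>();
            // Slide a window of n tokens across the line
            for (int i = 0; i + n <= tokens.length; i++) {
                StringBuilder gram = new StringBuilder();
                for (int j = i; j < i + n; j++) {
                    gram.append(tokens[j]).append(" ");
                }
                grams.add(gram.toString().trim());
            }
            // Prints: [AWB has, has been, been banned, banned from, from trading]
            System.out.println(grams);
        }
    }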

Getting ready

This recipe assumes you have a basic familiarity with the Hadoop 0.20 MapReduce API and the general concept of n-gram calculations. You will need access to the news_archives.zip dataset supplied with this book. Inside the ZIP file, you will find the rural.txt and science.txt files. Place both in a single HDFS directory.

You will need access to a pseudo-distributed or fully-distributed cluster capable of running MapReduce jobs using the newer MapReduce API introduced in Hadoop 0.20.

You will also need to package this code into a JAR file to be executed by the Hadoop JAR launcher from the shell. Only the core Hadoop libraries are required to compile and run this example.

How to do it...

Carry out the following steps to implement the n-gram calculation in MapReduce:

  1. Create a class named NGramJob.java in your project, under whatever source package is appropriate.
  2. First, create your concrete Tool implementation for job submission. The class members and methods are implemented as follows:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    import java.util.regex.Pattern;
    
    
    public class NGramJob implements Tool {
    
        private Configuration conf;
    
        public static final String NAME = "ngram";
        private static final String GRAM_LENGTH = "number_of_grams";
    
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        public Configuration getConf() {
            return conf;
        }
    
        public static void main(String[] args) throws Exception {
            if (args.length != 3) {
                System.err.println("Usage: ngram <input> <output> <number_of_grams>");
                System.exit(1);
            }
            System.exit(ToolRunner.run(new NGramJob(new Configuration()), args));
        }
        public NGramJob(Configuration conf) {
            this.conf = conf;
        }
  3. The run() method is where we configure the input/output formats, the mapper class, and the output key-value classes:
        public int run(String[] args) throws Exception {
            conf.setInt(GRAM_LENGTH, Integer.parseInt(args[2]));
    
            Job job = new Job(conf, "NGrams");
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapperClass(NGramJob.NGramMapper.class);
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            job.setJarByClass(NGramJob.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, removeAndSetOutput(args[1]));
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
  4. The removeAndSetOutput() method is not required, but it helps avoid errors caused by previously existing directories at the supplied output path:
        private Path removeAndSetOutput(String outputDir) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(outputDir);
            fs.delete(path, true);
            return path;
        }
  5. The map() function is implemented in the following code snippet by extending mapreduce.Mapper:
        public static class NGramMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
            private int gram_length;
            private Pattern space_pattern = Pattern.compile("[ ]");
            private StringBuilder gramBuilder = new StringBuilder();
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
               gram_length = context.getConfiguration().getInt(NGramJob.GRAM_LENGTH, 0);
            }
    
            @Override
            protected void map(LongWritable key, Text value,
                               Context context) throws IOException, InterruptedException {
                String[] tokens = space_pattern.split(value.toString());
                for (int i = 0; i < tokens.length; i++) {
                    String token = tokens[i];
                    gramBuilder.setLength(0);
                    if(i + gram_length <= tokens.length) {
                       for(int j = i; j < i + gram_length; j++) {
                           gramBuilder.append(tokens[j]);
                           gramBuilder.append(" ");
                       }
                       context.write(new Text(gramBuilder.toString()), NullWritable.get());
                    }
                }
    
            }
    
        }
    }
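  6. Finally, build the JAR and submit the job from the shell. The following invocation is only a placeholder; the JAR name and HDFS paths are assumptions, and you should use the fully qualified class name if NGramJob lives in a package:
    hadoop jar ngram-example.jar NGramJob /user/hadoop/news_archives /user/hadoop/ngrams 2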

How it works...

First, we set up our imports and create a public class named NGramJob that implements the MapReduce Tool interface. The static string NAME is useful should you decide to configure this job in a Hadoop Driver implementation. The program requires three parameters in exact order, namely the input path in HDFS, the desired output location in HDFS, and the number of grams to calculate per token. We pass ToolRunner an instance of the NGramJob class constructed with a new Configuration object, along with the command-line arguments.
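
As a sketch of that Driver idea (the driver class and its description string below are assumptions, not part of the recipe), the NAME constant could be used to register the job with Hadoop's ProgramDriver:

    import org.apache.hadoop.util.ProgramDriver;

    // Hypothetical driver class; "ngram" (NGramJob.NAME) becomes the
    // sub-command name used to select this job from the command line.
    public class ExampleJobDriver {

        public static void main(String[] args) throws Throwable {
            ProgramDriver driver = new ProgramDriver();
            driver.addClass(NGramJob.NAME, NGramJob.class,
                    "Calculates n-grams over text stored in HDFS");
            driver.driver(args);
        }
    }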

Inside the run() method, we configure the job to use TextInputFormat and TextOutputFormat, reading the input as lines of text and writing lines of text out from the map phase. We also set the Mapper class to the public static inner class NGramMapper. Since this is a map-only job, we set the number of reducers to zero. Then we set the parameterized Writable types for the key-value pairs emitted by the mapper. It is also very important to call the setJarByClass() method so that the TaskTrackers can properly unpack the JAR and find the Mapper class. The job uses the static helper methods on FileInputFormat and FileOutputFormat to set the input and output directories, respectively. Since the output directory must not already exist when the job is submitted, the program first deletes any previously existing HDFS file or directory at the supplied path. With everything configured properly, the job is ready for submission to the JobTracker.

The NGramMapper class has a few very important member variables. The variable gram_length is read from the job configuration in setup(), where it was set from the user-supplied argument before submission. The variable space_pattern holds a precompiled regex used to split on space characters. The StringBuilder instance gramBuilder is reused to store the space-separated list of grams starting at each string token. The mapper receives the byte offset of each line as a LongWritable instance and the line content as a Text instance. The function immediately splits the line into space-separated tokens. For each token, gramBuilder is reset; if the token's position on the line plus gram_length exceeds the total number of tokens on the line, the token is skipped, because a complete n-gram cannot start there. Otherwise, the inner loop appends that token and the following tokens to gramBuilder until gram_length tokens have been collected; the gramBuilder content is then written out and the outer loop cycles to the next token. The net result is one or more part files, stored in the directory specified by the user-supplied argument, containing a line-separated list of n-grams from the news archives.

Sample output of bigrams (2 grams):

AWB has 
has been 
been banned 
banned from 
from trading

There's more...

The following two sections remind developers to use the HDFS filesystem delete functions with care and discuss how to use NullWritable objects effectively.

Use caution when invoking FileSystem.delete()

The removeAndSetOutput() method in this implementation deletes whatever exists at the supplied path without warning. The path it receives is the user-supplied output directory argument; if that argument were accidentally swapped with the input argument, the input directory would be removed. Although building this kind of behavior into a MapReduce setup routine is very handy, FileSystem.delete() should be used with extreme caution.
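
As a sketch only (the class and method names here are hypothetical, not part of the recipe), one defensive variant qualifies both paths, refuses to delete the output if it matches the input, and checks for existence before deleting:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical helper: only deletes the output path after verifying it
    // exists and does not refer to the same location as the input path.
    public class SafeOutputHelper {

        public static Path prepareOutput(Configuration conf, String inputDir,
                                         String outputDir) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path input = fs.makeQualified(new Path(inputDir));
            Path output = fs.makeQualified(new Path(outputDir));
            if (input.equals(output)) {
                throw new IOException("Refusing to delete output path that matches input: " + output);
            }
            if (fs.exists(output)) {
                fs.delete(output, true); // recursive delete of the stale output
            }
            return output;
        }
    }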

Use NullWritable to avoid unnecessary serialization overhead

This program makes use of NullWritable as the output value type from the mapper. Since the program writes a single gram per line, we can just use the key to emit all our output. If your MapReduce job does not require both the key and the value to be emitted, using NullWritable will save the framework the trouble of having to serialize unnecessary objects out to the disk. In many scenarios, it is often cleaner and more readable than using blank placeholder values or static singleton instances for output.
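
For illustration only (this mapper is not part of the recipe), the key-only pattern looks like the following; NullWritable.get() returns a singleton that writes no bytes when serialized, so the framework has nothing extra to serialize per record:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Illustrative key-only mapper: all useful data travels in the key,
    // and NullWritable fills the value slot at no serialization cost.
    public class KeyOnlyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }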
