Time for action – implementing WordCount

We have seen the use of the WordCount example program in Chapter 2, Getting Hadoop Up and Running. Now we will explore our own Java implementation by performing the following steps:

  1. Enter the following code into the WordCount1.java file:
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount1
    {
        public static class WordCountMapper
            extends Mapper<Object, Text, Text, IntWritable>
        {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException
            {
                // Break the line into words and emit a <word, 1> pair for each
                String[] words = value.toString().split(" ");

                for (String str : words)
                {
                    word.set(str);
                    context.write(word, one);
                }
            }
        }

        public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable>
        {
            public void reduce(Text key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException
            {
                // Count how many <word, 1> pairs were emitted for this key
                int total = 0;
                for (IntWritable val : values)
                {
                    total++;
                }
                context.write(key, new IntWritable(total));
            }
        }

        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "word count");
            job.setJarByClass(WordCount1.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // The input and output locations come from the command-line arguments
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  2. Now compile it by executing the following command:
    $ javac WordCount1.java
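
    Note that javac needs the Hadoop classes on its classpath. If the preceding command fails because the org.apache.hadoop imports cannot be resolved, one way to supply them (assuming the hadoop command is on your PATH) is:

    $ javac -classpath $(hadoop classpath) WordCount1.java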
    

What just happened?

This is our first complete MapReduce job. Look at the structure and you should recognize the elements we have previously discussed: the overall Job class with the driver configuration in its main method and the Mapper and Reducer implementations defined as inner classes.

We'll do a more detailed walkthrough of the mechanics of MapReduce in the next section, but for now let's look at the preceding code and think of how it realizes the key/value transformations we talked about earlier.

The input to the Mapper class is arguably the hardest to understand, as the key is not actually used. The job uses TextInputFormat, the default input format, which delivers each line of the input to the mapper with the key being the byte offset of that line within the file and the value being the text of the line itself. In practice, you may never see a mapper that actually uses this offset key, but it is always provided.
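
To make the offset key concrete, here is a minimal sketch (not part of the listing above; the class name is invented for illustration) of what the same mapper looks like if we declare the key with the LongWritable type that TextInputFormat actually supplies:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class OffsetAwareMapper
        extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
        {
            // key.get() is the byte offset of this line within the input file;
            // like WordCountMapper, we ignore it and use only the line text
            for (String str : value.toString().split(" "))
            {
                word.set(str);
                context.write(word, one);
            }
        }
    }

Whether the key is typed as Object or LongWritable makes no difference to the job's output, since the map method never reads it.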

The map method is invoked once for each line of text in the input source; each time, it breaks the line into words and then uses the Context object to output (more commonly known as emitting) a key/value pair of the form <word, 1> for every word. These are our K2/V2 values.
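
For example, given a hypothetical input line of the cat sat on the mat, the mapper would emit six pairs: <the, 1>, <cat, 1>, <sat, 1>, <on, 1>, <the, 1>, and <mat, 1>. Note that the duplicated word the is emitted twice; the mapper makes no attempt to combine its own output.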

We said before that the input to the reducer is a key and a corresponding list of values; some machinery between the map and reduce phases, which we won't describe just yet, collects together the values for each key to make this possible. Hadoop executes the reducer once for each key, and the preceding reducer implementation simply counts the entries in the Iterable object and writes output for each word in the form <word, count>. These are our K3/V3 values.
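
Continuing the same hypothetical line, the framework groups the two <the, 1> pairs so that the reducer is called once with the key the and the value list (1, 1); its loop counts two elements and writes <the, 2>, while every other word in the line receives a count of 1.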

Take a look at the signatures of our mapper and reducer classes: the WordCountMapper class takes Object and Text as its input key and value types and produces Text and IntWritable as output, while the WordCountReducer class uses Text and IntWritable for both input and output. This is again quite a common pattern, where the map method ignores the uninteresting input key and transforms the input value into a series of key/value pairs on which the reducer then performs its aggregation.
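
As a quick reference (this is an annotation of the declarations above, not extra code to type in), the generic parameters line up with the key/value stages like this:

    // Mapper <K1, V1, K2, V2> : Mapper <Object, Text, Text, IntWritable>
    // Reducer<K2, V2, K3, V3> : Reducer<Text, IntWritable, Text, IntWritable>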

The driver is more meaningful here, as we have real values for the parameters. We use arguments passed to the class to specify the input and output locations.
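
To see where those arguments come from, a job like this is typically packaged into a JAR and submitted with the hadoop command; the JAR name and paths below are placeholders for illustration rather than values from the text:

    $ hadoop jar wc1.jar WordCount1 input.txt output

Here args[0] becomes input.txt and args[1] becomes output, the directory into which the job writes its results (it must not already exist when the job starts).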
