Counting distinct IPs in weblog data using MapReduce and Combiners

This recipe will walk you through creating a MapReduce program to count distinct IPs in weblog data. We will demonstrate the application of a combiner to optimize data transfer overhead between the map and reduce stages. The code is implemented in a generic fashion and can be used to count distinct values in any tab-delimited dataset.

Getting ready

This recipe assumes that you have a basic familiarity with the Hadoop 0.20 MapReduce API. You will need access to the weblog_entries dataset supplied with this book and stored in an HDFS folder at the path /input/weblog.
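
If you want to confirm that the dataset is in place before running the job, a quick listing of that input path is enough:

    hadoop fs -ls /input/weblog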

You will need access to a pseudo-distributed or fully-distributed cluster capable of running MapReduce jobs using the newer MapReduce API introduced in Hadoop 0.20.

You will also need to package this code inside a JAR file to be executed by the Hadoop JAR launcher from the shell. Only the core Hadoop libraries are required to compile and run this example.

How to do it...

Perform the following steps to count distinct IPs using MapReduce:

  1. Open a text editor/IDE of your choice, preferably one with Java syntax highlighting.
  2. Create a class named DistinctCounterJob.java in your JAR file at whatever source package is appropriate.
  3. The following code will serve as the Tool implementation for job submission:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import java.io.IOException;
    import java.util.regex.Pattern;
    
    
    public class DistinctCounterJob implements Tool {
    
        private Configuration conf;
        public static final String NAME = "distinct_counter";
        public static final String COL_POS = "col_pos";
    
    
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Configuration(), new DistinctCounterJob(), args);
        }
  4. The run() method is where we set the input/output formats, mapper class configuration, combiner class, and key/value class configuration:
        public int run(String[] args) throws Exception {
            if(args.length != 3) {
                System.err.println("Usage: distinct_counter <input> <output> <element_position>");
                System.exit(1);
            }
            conf.setInt(COL_POS, Integer.parseInt(args[2]));
    
            Job job = new Job(conf, "Count distinct elements at position");
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
    
            job.setMapperClass(DistinctMapper.class);
            job.setReducerClass(DistinctReducer.class);
            job.setCombinerClass(DistinctReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setJarByClass(DistinctCounterJob.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            return job.waitForCompletion(true) ? 0 : 1;
    
        }
    
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        public Configuration getConf() {
            return conf;
        }
    }
  5. The map() function is implemented in the following code by extending mapreduce.Mapper:
        public static class DistinctMapper
                extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            private static int col_pos;
            private static final Pattern pattern = Pattern.compile("\t");
            private Text outKey = new Text();
            private static final IntWritable outValue = new IntWritable(1);
    
            @Override
            protected void setup(Context context
            ) throws IOException, InterruptedException {
                col_pos = context.getConfiguration().getInt(DistinctCounterJob.COL_POS, 0);
            }
    
            @Override
            protected void map(LongWritable key, Text value,
                               Context context) throws IOException, InterruptedException {
                String field = pattern.split(value.toString())[col_pos];
                outKey.set(field);
                context.write(outKey, outValue);
            }
        }
  6. The reduce() function is implemented in the following code by extending mapreduce.Reducer:
        public static class DistinctReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
    
            private IntWritable count = new IntWritable();
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context
            ) throws IOException, InterruptedException {
                int total = 0;
                for(IntWritable value: values) {
                    total += value.get();
                }
                count.set(total);
                context.write(key, count);
            }
        }
  7. The following command shows the sample usage against weblog data with column position number 4, which is the IP column:
    hadoop jar myJobs.jar distinct_counter /input/weblog/ /output/weblog_distinct_counter 4
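    Once the job completes, the distinct counts are written to the output directory given as the second argument. A quick way to spot-check the results (the part-r-00000 filename assumes at least one reduce task and the default output naming) is:
    hadoop fs -cat /output/weblog_distinct_counter/part-r-00000
    Each output line holds an IP address and its total count separated by a tab, as written by TextOutputFormat.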

How it works...

First we set up DistinctCounterJob to implement the Tool interface for remote submission. The static constant NAME is of potential use in a Hadoop driver class, which supports launching different jobs from the same JAR file; a sketch of such a driver follows. The static constant COL_POS is initialized from the third required command-line argument, <element_position>. This value is set in the job configuration and should match the position of the column whose distinct entries you wish to count. Supplying 4 will match the IP column for the weblog data.
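
The following is a minimal sketch of such a driver, assuming a hypothetical WeblogDriver class built on Hadoop's ProgramDriver utility; any other jobs bundled in the same JAR would be registered the same way:

    import org.apache.hadoop.util.ProgramDriver;

    // Hypothetical driver: registers each Tool under a short program name so that
    // "hadoop jar myJobs.jar distinct_counter ..." dispatches to DistinctCounterJob.
    public class WeblogDriver {

        public static void main(String[] args) {
            int exitCode = -1;
            ProgramDriver driver = new ProgramDriver();
            try {
                // Register this recipe's job under its NAME constant ("distinct_counter")
                driver.addClass(DistinctCounterJob.NAME, DistinctCounterJob.class,
                        "Counts distinct values in a column of a tab-delimited dataset");
                // Additional jobs from the same JAR would be registered here
                driver.driver(args);
                exitCode = 0;
            } catch (Throwable t) {
                t.printStackTrace();
            }
            System.exit(exitCode);
        }
    }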

Since we are reading and writing text, we can use the supplied TextInputFormat and TextOutputFormat classes. We set the Mapper and Reducer classes to our DistinctMapper and DistinctReducer implementations, respectively. We also supply DistinctReducer as the combiner class; the reasoning behind this choice is explained later in this section.

It's also very important to call setJarByClass() so that the TaskTrackers can properly unpack and find the Mapper and Reducer classes. The job uses the static helper methods on FileInputFormat and FileOutputFormat to set the input and output directories respectively. Now we're set up and ready to submit the job.

The Mapper class sets up a few member variables as follows:

  • col_pos: This is initialized from the value supplied in the job configuration. It lets users change which column to parse and run the distinct count against.
  • pattern: This pre-compiled pattern splits each row into columns on tab characters.
  • outKey: This is a reusable Text instance that holds the output key. Reusing one object avoids creating a new instance for every record that is written.
  • outValue: This is an IntWritable set to 1, representing a single occurrence of the given key, just as in the WordCount example.

The map() function splits each incoming line and extracts the string located at col_pos, which for our example is the row's IP address. We reset outKey to that string and emit it along with outValue to mark one occurrence of that IP address.
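
To make the split concrete, the following is a small standalone sketch of the same extraction that map() performs; the sample record and its field values are invented for illustration and only mirror the tab-delimited layout of the weblog data:

    import java.util.regex.Pattern;

    // Standalone illustration of the field extraction performed in map().
    // The sample record below is invented; only its tab-delimited shape matters.
    public class SplitExample {

        private static final Pattern TAB = Pattern.compile("\t");

        public static void main(String[] args) {
            // Hypothetical tab-delimited row with the IP in the fifth column (index 4)
            String line = "col0\tcol1\tcol2\tcol3\t10.10.1.1";
            int colPos = 4; // same value passed as <element_position> on the command line

            String[] fields = TAB.split(line);
            System.out.println(fields[colPos]); // prints 10.10.1.1
        }
    }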

Without the assistance of the combiner, this would present the reducer with an iterable collection of 1s to be counted.

The following is an example of the {key, [values]} collection a reducer would see for one key without a combiner:

{10.10.1.1, [1,1,1,1,1,1]} = six occurrences of the IP "10.10.1.1".

The implementation of the reduce() method will sum the integers and arrive at the correct total, but there's nothing that requires the integer values to be limited to the number 1. We can use a combiner to process the intermediate key-value pairs as they are output from each mapper and help improve the data throughput in the shuffle phase. Since the combiner is applied against the local map output, we may see a performance improvement as the amount of data we need to transfer for an intermediate key/value can be reduced considerably.

Instead of seeing {10.10.1.1, [1,1,1,1,1,1]}, the combiner can add the 1s and replace the intermediate values for that key with {10.10.1.1, [6]}. The reducer can then sum the combined values for the intermediate key and arrive at the same correct total. This is possible because addition is both a commutative and an associative operation. In other words:

  • Commutative: The order of the operands does not affect the result. For example, 1 + 2 + 3 = 3 + 1 + 2.
  • Associative: The way the operands are grouped does not affect the result. For example, (1 + 2) + 3 = 1 + (2 + 3).

For counting the occurrences of distinct IPs, we can use the same code in our reducer as a combiner for output in the map phase.

When applied to our problem, the normal output from two separate, independently running map tasks, with no combiner, might look like the following, where {key, [values]} denotes the intermediate key and its collection of values:

  • Map Task A = {10.10.1.1, [1,1,1]} = three occurrences
  • Map Task B = {10.10.1.1, [1,1,1,1,1,1]} = six occurrences

Without the aid of a combiner, this will be merged in the shuffle phase and presented to a single reducer as the following key-value collection:

  • {10.10.1.1, [1,1,1,1,1,1,1,1,1]} = nine total occurrences

Now let's revisit what would happen when using a Combiner against the exact same sample output:

Map Task A = {10.10.1.1, [1,1,1]} = three occurrences

  • Combiner for Task A = {10.10.1.1, [3]} = still three occurrences, but reduced to a single value for this mapper.

Map Task B = {10.10.1.1, [1,1,1,1,1,1]} = six occurrences

  • Combiner for Task B = {10.10.1.1, [6]} = still six occurrences.

Now the reducer will see the following for that key-value collection:

  • {10.10.1.1, [3,6]} = nine total occurrences

We arrived at the same total count for that IP address, but we used a combiner to limit the amount of network I/O during the MapReduce shuffle phase by pre-reducing the intermediate key-value output from each mapper.

There's more...

The combiner can be confusing to newcomers. Here are some useful tips:

The Combiner does not always have to be the same class as your Reducer

The previous recipe and the default WordCount example both show the Combiner class being set to the same implementation as the Reducer class. The API does not enforce this, but it ends up being common for many types of distributed aggregate operations such as sum(), min(), and max(). One basic example is a min() Reducer that formats its output in a specific way for readability; the corresponding min() Combiner takes a slightly different form because it does not care about output formatting, only about passing the smallest intermediate value along.
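
A minimal sketch of that distinction follows, using hypothetical MinCombiner and MinReducer classes (and the same imports as the earlier listings): the combiner simply forwards the smallest intermediate value, while the reducer formats the final minimum for readability.

    // Hypothetical combiner: its output types must match the map output types,
    // so it emits the raw minimum as an IntWritable with no formatting.
    public static class MinCombiner
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable min = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int smallest = Integer.MAX_VALUE;
            for (IntWritable value : values) {
                smallest = Math.min(smallest, value.get());
            }
            min.set(smallest);
            context.write(key, min);
        }
    }

    // Hypothetical reducer: same minimum logic, but it formats the final
    // result as human-readable text, which the combiner never needs to do.
    public static class MinReducer
            extends Reducer<Text, IntWritable, Text, Text> {

        private final Text formatted = new Text();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int smallest = Integer.MAX_VALUE;
            for (IntWritable value : values) {
                smallest = Math.min(smallest, value.get());
            }
            formatted.set("minimum value = " + smallest);
            context.write(key, formatted);
        }
    }

If you wire these into a job, the map output value class stays IntWritable while the job's final output value class becomes Text (declared with setMapOutputValueClass() and setOutputValueClass() respectively), which is exactly why the two roles can no longer share a single implementation.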

Combiners are not guaranteed to run

Whether or not the framework invokes your combiner during execution depends on the size and number of intermediate spill files produced by each map task, and the combiner is not guaranteed to run for every intermediate key. Your job should not depend on the combiner for correct results; use it only as an optimization.

You can control the number of spill files required before MapReduce attempts to combine intermediate values with the configuration property min.num.spills.for.combine.
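
As a rough sketch (the value 3 below is purely illustrative), you can set the property in the run() method before constructing the Job:

    conf.setInt("min.num.spills.for.combine", 3);

or pass it at submission time through the generic options that ToolRunner already parses for this job:

    hadoop jar myJobs.jar distinct_counter -D min.num.spills.for.combine=3 /input/weblog/ /output/weblog_distinct_counter 4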
