Using MapReduce and secondary sort to calculate page views

In a typical MapReduce job, key-value pairs are emitted from the mappers, shuffled and sorted, and finally passed to the reducers. The MapReduce framework sorts the keys, but it makes no attempt to sort the values passed to each reducer. However, there are cases when we need the values passed to the reducers to be sorted, such as when counting page views.

To calculate page views, we need to count the distinct IPs that visited each page. One way to calculate this is to have the mappers emit page as the key and IP as the value. Then, in the reducer, we can store all of the IPs associated with a page in a set. However, this approach does not scale. What happens if the weblogs contain a large number of distinct IPs visiting a single page? We might not be able to fit the entire set of distinct IPs in memory.
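
For illustration only, such a naive reducer might look like the following sketch; it assumes a mapper that emits page as the key and IP as the value, and the class name is hypothetical:

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    // Naive approach: buffer every IP for a page in memory. With enough
    // distinct IPs for a single page, this set can exhaust the heap.
    public class NaivePageViewReducer extends Reducer<Text, Text, Text, LongWritable> {
    
        private LongWritable pageViews = new LongWritable();
    
        @Override
        protected void reduce(Text page, Iterable<Text> ips, Context context)
                throws IOException, InterruptedException {
            Set<String> distinctIps = new HashSet<String>();
            for (Text ip : ips) {
                distinctIps.add(ip.toString());
            }
            pageViews.set(distinctIps.size());
            context.write(page, pageViews);
        }
    }

On a small dataset this works fine; the problem appears only when the set of distinct IPs for a single page grows beyond the reducer's heap.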

The MapReduce framework provides a way to work around this complication. In this recipe, we will write a MapReduce application that sorts the values going to a reducer using an approach known as a secondary sort. Instead of holding all of the distinct IPs in memory, we only need to keep track of the last IP we saw while processing the values in the reducer, maintaining a counter of distinct IPs.

Getting ready

You will need to download the apache_nobots_tsv.txt dataset from http://www.packtpub.com/support and place the file into HDFS.

How to do it...

The following steps show how to implement a secondary sort in MapReduce to calculate page views:

  1. Create a class that implements the Hadoop WritableComparable interface. We will use this class to store the key and sort fields:
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    public class CompositeKey implements WritableComparable<CompositeKey> {
    
        // first holds the natural key (page), used for partitioning and
        // grouping; second holds the secondary sort field (IP)
        private Text first = null;
        private Text second = null;
    
        public CompositeKey() {
        }
    
        public CompositeKey(Text first, Text second) {
            this.first = first;
            this.second = second;
        }
    
        //...getters and setters
    
        @Override
        public void write(DataOutput d) throws IOException {
            first.write(d);
            second.write(d);
        }
    
        @Override
        public void readFields(DataInput di) throws IOException {
            if (first == null) {
                first = new Text();
            }
            if (second == null) {
                second = new Text();
            }
            first.readFields(di);
            second.readFields(di);
        }
    
        @Override
        public int compareTo(CompositeKey other) {
            // Order by page first, then by IP
            int cmp = first.compareTo(other.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            return second.compareTo(other.getSecond());
        }
    
        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof CompositeKey)) {
                return false;
            }
            // Equality intentionally considers only the page, matching the
            // grouping semantics used by this job
            CompositeKey other = (CompositeKey) obj;
            return first.equals(other.getFirst());
        }
    
        @Override
        public int hashCode() {
            return first.hashCode();
        }
    }
  2. Next, write the Mapper and Reducer classes. The Mapper class will use the CompositeKey class to store two fields. The first is the page field, which is used to group and partition the data leaving the mapper. The second is the ip field, which is used to sort the values passed to the reducer.
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class PageViewMapper extends Mapper<Object, Text, CompositeKey, Text> {
    
        private CompositeKey compositeKey = new CompositeKey();
        private Text first = new Text();
        private Text second = new Text();
        private Text outputValue = new Text();
    
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\t");
            if (tokens.length > 3) {
                String page = tokens[2];
                String ip = tokens[0];
                first.set(page);
                second.set(ip);
                compositeKey.setFirst(first);
                compositeKey.setSecond(second);
                outputValue.set(ip);
                // Emit (page, IP) as the composite key and the IP as the value
                context.write(compositeKey, outputValue);
            }
        }
    }
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class PageViewReducer extends Reducer<CompositeKey, Text, Text, LongWritable> {
    
        private LongWritable pageViews = new LongWritable();
    
        @Override
        protected void reduce(CompositeKey key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            String lastIp = null;
            long pages = 0;
            for (Text t : values) {
                String ip = t.toString();
                if (lastIp == null) {
                    // First IP seen for this page
                    lastIp = ip;
                    pages++;
                } else if (lastIp.compareTo(ip) > 0) {
                    // The secondary sort guarantees ascending order;
                    // fail fast if that guarantee is ever violated
                    throw new IOException("secondary sort failed");
                } else if (!lastIp.equals(ip)) {
                    // A new distinct IP for this page
                    lastIp = ip;
                    pages++;
                }
            }
            pageViews.set(pages);
            context.write(key.getFirst(), pageViews);
        }
    }
  3. Create three classes to partition, group, and sort the data leaving the mapper. These classes are used by the MapReduce framework. First, write a class to partition the data emitted from the mapper by the page field:
    static class CompositeKeyPartitioner extends Partitioner<CompositeKey, Writable> {
    
        @Override
        public int getPartition(CompositeKey key, Writable value, int numPartitions) {
            // Partition on the page (first field) only so that all records
            // for a page go to the same reducer; masking the sign bit keeps
            // the result non-negative
            return (key.getFirst().hashCode() & 0x7FFFFFFF) % numPartitions;
        }
    }
  4. Next, write a Comparator that will group all of the keys that share the same page:
    static class GroupComparator extends WritableComparator {
    
        public GroupComparator() {
            super(CompositeKey.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Two keys compare as equal when their pages match, so all
            // values for a page are grouped into a single reduce() call
            CompositeKey lhs = (CompositeKey) a;
            CompositeKey rhs = (CompositeKey) b;
            return lhs.getFirst().compareTo(rhs.getFirst());
        }
    }
  5. Write a second Comparator that will sort the composite keys, and with them the values passed to the reducer:
    static class SortComparator extends WritableComparator {
    
        public SortComparator() {
            super(CompositeKey.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Order by page first, then by IP, so that each reducer sees
            // the IPs for a page in ascending order
            CompositeKey lhs = (CompositeKey) a;
            CompositeKey rhs = (CompositeKey) b;
            int cmp = lhs.getFirst().compareTo(rhs.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            return lhs.getSecond().compareTo(rhs.getSecond());
        }
    }
  6. Finally, write the code to set up a normal MapReduce job, but tell the MapReduce framework to use our own partitioner and comparator classes (a sketch of the surrounding driver class follows this list):
    public int run(String[] args) throws Exception {
    
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
    
        Configuration conf = getConf();
        Job weblogJob = new Job(conf);
        weblogJob.setJobName("PageViews");
        weblogJob.setJarByClass(getClass());
        weblogJob.setMapperClass(PageViewMapper.class);
        weblogJob.setMapOutputKeyClass(CompositeKey.class);
        weblogJob.setMapOutputValueClass(Text.class);
    
        // Plug in the custom partitioner and comparators
        weblogJob.setPartitionerClass(CompositeKeyPartitioner.class);
        weblogJob.setGroupingComparatorClass(GroupComparator.class);
        weblogJob.setSortComparatorClass(SortComparator.class);
    
        weblogJob.setReducerClass(PageViewReducer.class);
        weblogJob.setOutputKeyClass(Text.class);
        // The reducer emits a LongWritable count, not Text
        weblogJob.setOutputValueClass(LongWritable.class);
        weblogJob.setInputFormatClass(TextInputFormat.class);
        weblogJob.setOutputFormatClass(TextOutputFormat.class);
    
        FileInputFormat.setInputPaths(weblogJob, inputPath);
        FileOutputFormat.setOutputPath(weblogJob, outputPath);
    
        if (weblogJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }
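
The run method above assumes that the enclosing driver class implements Hadoop's Tool interface. The following is a minimal sketch of that wiring; the class name PageViewDriver is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    // Hypothetical driver class; run() is the method shown in step 6
    public class PageViewDriver extends Configured implements Tool {
    
        @Override
        public int run(String[] args) throws Exception {
            // ...the job setup from step 6 goes here...
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(),
                    new PageViewDriver(), args);
            System.exit(exitCode);
        }
    }

Once packaged into a JAR, the job could then be launched with something like hadoop jar pageviews.jar PageViewDriver <input path> <output path>, where the JAR name and paths are illustrative.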

How it works...

We first created a class named CompositeKey. This class implements the Hadoop WritableComparable interface so that we can use CompositeKey just like any other WritableComparable implementation (for example, Text or IntWritable). The CompositeKey class holds two Text objects. The first Text object is used to partition and group the key-value pairs emitted from the mapper. The second Text object is used to perform the secondary sort.
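
To see the Writable contract in action, the following is a minimal, self-contained round trip through write() and readFields(); the demo class is hypothetical and assumes CompositeKey's getters are in place:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    
    import org.apache.hadoop.io.Text;
    
    public class CompositeKeyRoundTrip {
        public static void main(String[] args) throws Exception {
            CompositeKey original =
                    new CompositeKey(new Text("/index.html"), new Text("10.0.0.1"));
    
            // Serialize the key the same way the framework does during the shuffle
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
    
            // Deserialize into a fresh instance
            CompositeKey copy = new CompositeKey();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
    
            System.out.println(copy.getFirst() + "\t" + copy.getSecond());
            // prints: /index.html    10.0.0.1
        }
    }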

Next, we wrote a mapper class that emits a CompositeKey (consisting of the page and the IP) as the key, and the IP as the value. In addition, we wrote a reducer class that receives a CompositeKey object and a sorted list of IPs. The distinct IP count is calculated by incrementing a counter whenever the current IP differs from the previously seen IP.
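
This counting technique works only because the IPs arrive sorted. The following sketch isolates the same logic in a plain Java method (the demo class is hypothetical) so it can be tried outside MapReduce:

    import java.util.Arrays;
    import java.util.List;
    
    public class DistinctCountDemo {
    
        // Counts distinct entries in a list that is already sorted by
        // tracking only the last value seen, exactly as the reducer does
        static long countDistinctSorted(List<String> sortedIps) {
            String last = null;
            long count = 0;
            for (String ip : sortedIps) {
                if (last == null || !last.equals(ip)) {
                    last = ip;
                    count++;
                }
            }
            return count;
        }
    
        public static void main(String[] args) {
            // The secondary sort guarantees this ordering in the real job
            List<String> ips = Arrays.asList(
                    "10.0.0.1", "10.0.0.1", "10.0.0.2", "10.0.0.9");
            System.out.println(countDistinctSorted(ips)); // prints 3
        }
    }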

After writing the mapper and reducer classes, we created three classes to partition, group, and sort the data. The CompositeKeyPartitioner class is responsible for partitioning the data emitted from the mapper. In this recipe, we want all records for the same page to go to the same partition. Therefore, we calculate the partition location based only on the first field of the CompositeKey class.

Next, we created a GroupComparator class that uses the same logic as CompositeKeyPartitioner. We want all keys with the same page grouped together for processing by a single reduce call. Therefore, the group comparator only inspects the first member of the CompositeKey class for comparison.

Finally, we created the SortComparator class. This class is responsible for sorting all of the keys that are sent to each reducer. As the method signature compare(WritableComparable a, WritableComparable b) shows, we only receive keys, which is why we needed to include the IP with every key the mapper emitted. The SortComparator class compares both the first and second members of the CompositeKey class to ensure that the values a reducer receives are sorted. For example, if the mapper emitted the hypothetical keys (/index.html, 10.0.0.2), (/index.html, 10.0.0.1), and (/index.html, 10.0.0.1), the reducer for /index.html would receive the IPs in the order 10.0.0.1, 10.0.0.1, 10.0.0.2 and count two distinct IPs.

See also

  • Creating custom Hadoop Writable and InputFormat to read geographical event data