Creating custom Hadoop Writable and InputFormat to read geographical event data

When reading input or writing output in a MapReduce application, it is sometimes easier to work with the data through a custom class than through the primitive Hadoop Writable classes (for example, Text and IntWritable). This recipe demonstrates how to create a custom Hadoop Writable and InputFormat that can be used by MapReduce applications.

Getting ready

You will need to download the Nigeria_ACLED_cleaned.tsv dataset from http://www.packtpub.com/support and place the file into HDFS.
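For example, commands similar to the following copy the file into a directory under your HDFS home directory (the acled directory name is only an example; use whatever path you plan to pass to the job):

    hadoop fs -mkdir acled
    hadoop fs -put Nigeria_ACLED_cleaned.tsv acled/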

How to do it...

Follow these steps to create custom InputFormat and Writable classes:

  1. First, we will define two custom WritableComparable classes. These classes represent the key-value pairs that are passed to the mapper, much as TextInputFormat passes LongWritable and Text objects to the mapper.

    Write the key class:

    public class GeoKey implements WritableComparable {
        private Text location;
        private FloatWritable latitude;
        private FloatWritable longitude;
        public GeoKey() {
            location = null;
            latitude = null;
            longitude = null;
        }
    
        public GeoKey(Text location, FloatWritable latitude, 
          FloatWritable longitude) {
            this.location = location;
            this.latitude = latitude;
            this.longitude = longitude;
        }
    
        //...getters and setters
    
        // Write each field in a fixed order; readFields() must read them
        // back in exactly the same order.
        public void write(DataOutput d) throws IOException {
            location.write(d);
            latitude.write(d);
            longitude.write(d);
        }

        public void readFields(DataInput di) throws IOException {
            if (location == null) {
                location = new Text();
            }
            if (latitude == null) {
                latitude = new FloatWritable();
            }
            if (longitude == null) {
                longitude = new FloatWritable();
            }
            location.readFields(di);
            latitude.readFields(di);
            longitude.readFields(di);
        }
        public int compareTo(Object o) {
            GeoKey other = (GeoKey)o;
            int cmp = location.compareTo(other.location);
            if (cmp != 0) {
                return cmp;
            }
            cmp = latitude.compareTo(other.latitude);
            if (cmp != 0) {
                return cmp;
            }
            return longitude.compareTo(other.longitude);
        }    
        
    }
  2. Now, the value class:
    public class GeoValue implements WritableComparable {
        private Text eventDate;
        private Text eventType;
        private Text actor;
        private Text source;
        private IntWritable fatalities;
        
        public GeoValue() {
            eventDate = null;
            eventType = null;
            actor = null;
            source = null;
            fatalities = null;
        }
        //...getters and setters
    
        public void write(DataOutput d) throws IOException {
            eventDate.write(d);
            eventType.write(d);
            actor.write(d);
            source.write(d);
            fatalities.write(d);
        }
    
        public void readFields(DataInput di) throws IOException {
            if (eventDate == null) {
                eventDate = new Text();
            }
            if (eventType == null) {
                eventType = new Text();
            }
            if (actor == null) {
                actor = new Text();
            }
            if (source == null) {
                source = new Text();
            }
            if (fatalities == null) {
                fatalities = new IntWritable();
            }
            eventDate.readFields(di);
            eventType.readFields(di);
            actor.readFields(di);
            source.readFields(di);
            fatalities.readFields(di);
        }
    
        public int compareTo(Object o) {
            GeoValue other = (GeoValue)o;
            int cmp = eventDate.compareTo(other.eventDate);
            if (cmp != 0) {
                return cmp;
            }
            cmp = eventType.compareTo(other.eventType);
            if (cmp != 0) {
                return cmp;
            }
            cmp = actor.compareTo(other.actor);
            if (cmp != 0) {
                return cmp;
            }
            cmp = source.compareTo(other.source);
            if (cmp != 0) {
                return cmp;
            }
            return fatalities.compareTo(other.fatalities);
        }
        
    }
  3. Next, we need to create an InputFormat to transform the text from our input file into GeoKey and GeoValue instances. This input format extends the Hadoop FileInputFormat class and returns our own implementation of a RecordReader:
    public class GeoInputFormat extends FileInputFormat<GeoKey, GeoValue> {
    
        @Override
        public RecordReader<GeoKey, GeoValue> createRecordReader(InputSplit split, TaskAttemptContext context) {
            return new GeoRecordReader();
        }
        
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            CompressionCodec codec =
                    new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
            return codec == null;
        }
    }
  4. Now, create a RecordReader to read from the Nigeria_ACLED_cleaned.tsv dataset:
    public class GeoRecordReader extends RecordReader<GeoKey, GeoValue> {
    
        private GeoKey key;
        private GeoValue value;
        private LineRecordReader reader = new LineRecordReader();
        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
            reader.initialize(is, tac);
        }
    
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
           
            boolean gotNextKeyValue = reader.nextKeyValue();
            if(gotNextKeyValue) {
                if (key == null) {
                    key = new GeoKey();
                }
                if (value == null) {
                    value = new GeoValue();
                }
                Text line = reader.getCurrentValue();
                String[] tokens = line.toString().split("\t");
                key.setLocation(new Text(tokens[0]));
                key.setLatitude(new FloatWritable(Float.parseFloat(tokens[4])));
                key.setLongitude(new FloatWritable(Float.parseFloat(tokens[5])));
                
                value.setActor(new Text(tokens[3]));
                value.setEventDate(new Text(tokens[1]));
                value.setEventType(new Text(tokens[2]));
                try {
                    value.setFatalities(new IntWritable(Integer.parseInt(tokens[7])));
                } catch(NumberFormatException ex) {
                    value.setFatalities(new IntWritable(0));
                }
                value.setSource(new Text(tokens[6]));
            }
            else {
                key = null;
                value = null;
            }
            return gotNextKeyValue;
        }
    
        @Override
        public GeoKey getCurrentKey() throws IOException, InterruptedException {
            return key;
        }
        @Override
        public GeoValue getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return reader.getProgress();
        }
        @Override
        public void close() throws IOException {
            reader.close();
        }
        
    }
  5. Finally, create a simple map-only job to test the InputFormat:
    public class GeoFilter extends Configured implements Tool {
        
        public static class GeoFilterMapper extends Mapper<GeoKey, GeoValue, Text, IntWritable> {
            @Override
            protected void map(GeoKey key, GeoValue value, Context context) throws IOException, InterruptedException {
                String location = key.getLocation().toString();
                if (location.toLowerCase().equals("aba")) {
                    context.write(value.getActor(), 
                      value.getFatalities());
                }
            }
        }
        
        public int run(String[] args) throws Exception {
            
            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);
            
            Configuration conf = getConf();
            Job geoJob = new Job(conf);
            geoJob.setNumReduceTasks(0);
            geoJob.setJobName("GeoFilter");
            geoJob.setJarByClass(getClass());
            geoJob.setMapperClass(GeoFilterMapper.class);    
            geoJob.setMapOutputKeyClass(Text.class);
            geoJob.setMapOutputValueClass(IntWritable.class);
            geoJob.setInputFormatClass(GeoInputFormat.class);
            geoJob.setOutputFormatClass(TextOutputFormat.class);
            
            FileInputFormat.setInputPaths(geoJob, inputPath);
            FileOutputFormat.setOutputPath(geoJob, outputPath);
           
            if(geoJob.waitForCompletion(true)) {
                return 0;
            }
            return 1;
        }
        
        public static void main(String[] args) throws Exception {
            int returnCode = ToolRunner.run(new GeoFilter(), args);
            System.exit(returnCode);
        }
    }
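    Once the classes are compiled and packaged into a JAR, the job can be launched from the command line with something similar to the following. The JAR name and the HDFS paths are placeholders (use the fully qualified class name if GeoFilter is in a package):

    hadoop jar geofilter.jar GeoFilter acled/Nigeria_ACLED_cleaned.tsv geofilter_output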

How it works...

The first task was to define our own Hadoop key and value representations by implementing the WritableComparable interface. This interface lets us create custom types that the MapReduce framework can use as keys or values: the write() and readFields() methods define how an object is serialized to and deserialized from its binary form, and the compareTo() method defines how keys are ordered when they are sorted during the shuffle.
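
The following standalone sketch (not part of the recipe, and using made-up field values) illustrates the contract the framework relies on: whatever write() emits, readFields() must be able to read back in the same order.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.Text;

    public class GeoKeyRoundTrip {
        public static void main(String[] args) throws IOException {
            GeoKey original = new GeoKey(new Text("Aba"),
                    new FloatWritable(5.1f), new FloatWritable(7.4f));

            // Serialize the key to bytes, as Hadoop does when it spills map output
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // Deserialize into a fresh instance, as Hadoop does before the reduce phase
            GeoKey copy = new GeoKey();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));

            // Prints 0 because both keys now hold identical field values
            System.out.println(original.compareTo(copy));
        }
    }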

Next, we created an InputFormat that inherits from the FileInputFormat class. The Hadoop FileInputFormat is the base class for all file-based InputFormats. The InputFormat takes care of managing the input files for a MapReduce job. Since we do not want to change the way in which our input files are split and distributed across the cluster, we only need to override two methods, createRecordReader() and isSplitable().

The isSplitable() method tells the FileInputFormat class whether an input file may be broken into splits. In our implementation, a file is considered splittable only when no compression codec is registered for it; a compressed file is processed as a single split because it cannot be read from an arbitrary byte offset. The createRecordReader() method is used to create a Hadoop RecordReader that processes an individual file split and generates key-value pairs for the mappers to process.

After the GeoInputFormat class was written, we wrote a RecordReader to process the individual input splits and create GeoKey and GeoValue instances for the mappers. The GeoRecordReader class reuses the Hadoop LineRecordReader class to read records from the input split. Each time the LineRecordReader class returns a record from the Nigeria_ACLED_cleaned.tsv dataset, we split the tab-separated line into fields and populate a GeoKey and a GeoValue instance, which are then handed to the mapper.
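
As a concrete illustration, the snippet below repeats the parsing that nextKeyValue() performs, using a made-up line in the column order the reader expects (location, date, event type, actor, latitude, longitude, source, fatalities); the sample values are purely illustrative.

    String line = "Aba\t2011-01-01\tRiots/Protests\tProtesters (Nigeria)\t"
            + "5.1\t7.4\tLocal news\t0";
    String[] tokens = line.split("\t");

    // The first, fifth, and sixth columns become the key
    GeoKey key = new GeoKey(new Text(tokens[0]),
            new FloatWritable(Float.parseFloat(tokens[4])),
            new FloatWritable(Float.parseFloat(tokens[5])));

    // The remaining columns populate the value
    GeoValue value = new GeoValue();
    value.setEventDate(new Text(tokens[1]));
    value.setEventType(new Text(tokens[2]));
    value.setActor(new Text(tokens[3]));
    value.setSource(new Text(tokens[6]));
    value.setFatalities(new IntWritable(Integer.parseInt(tokens[7])));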
