Time for action – using ChainMapper for field validation/analysis

Let's use this principle and employ the ChainMapper class to help us provide some record validation within our job:

  1. Create the following class as UFORecordValidationMapper.java:
    import java.io.IOException;

    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.*;

    public class UFORecordValidationMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, Text>
    {
        public void map(LongWritable key, Text value,
            OutputCollector<LongWritable, Text> output,
            Reporter reporter) throws IOException
        {
            String line = value.toString();

            // Pass the record through unchanged only if it validates
            if (validate(line))
                output.collect(key, value);
        }

        private boolean validate(String str)
        {
            // A valid record splits into exactly six tab-separated fields
            String[] parts = str.split("\t");

            if (parts.length != 6)
                return false;

            return true;
        }
    }
  2. Create the following as UFOLocation.java:
    import java.io.IOException;
    import java.util.regex.*;

    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.*;

    public class UFOLocation
    {
        public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable>
        {
            private final static LongWritable one = new LongWritable(1);
            private static Pattern locationPattern = Pattern.compile(
                "[a-zA-Z]{2}[^a-zA-Z]*$");

            public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException
            {
                String line = value.toString();
                String[] fields = line.split("\t");
                String location = fields[2].trim();

                if (location.length() >= 2)
                {
                    // Look for a trailing two-letter sequence, i.e. the state abbreviation
                    Matcher matcher = locationPattern.matcher(location);
                    if (matcher.find())
                    {
                        int start = matcher.start();
                        String state = location.substring(start, start + 2);

                        output.collect(new Text(state.toUpperCase()), one);
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception
        {
            Configuration config = new Configuration();
            JobConf conf = new JobConf(config, UFOLocation.class);
            conf.setJobName("UFOLocation");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(LongWritable.class);

            // Each mapper in the chain gets its own local configuration object
            JobConf mapconf1 = new JobConf(false);
            ChainMapper.addMapper(conf, UFORecordValidationMapper.class,
                LongWritable.class, Text.class,
                LongWritable.class, Text.class, true, mapconf1);

            JobConf mapconf2 = new JobConf(false);
            ChainMapper.addMapper(conf, MapClass.class,
                LongWritable.class, Text.class,
                Text.class, LongWritable.class, true, mapconf2);

            conf.setMapperClass(ChainMapper.class);
            conf.setCombinerClass(LongSumReducer.class);
            conf.setReducerClass(LongSumReducer.class);

            FileInputFormat.setInputPaths(conf, args[0]);
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
        }
    }
  3. Compile both files:
    $ javac UFORecordValidationMapper.java UFOLocation.java
    
  4. Jar up the class files and submit the job to Hadoop:
    $ hadoop jar ufo.jar UFOLocation ufo.tsv output
    
  5. Copy the output file to the local filesystem and examine it:
    $ hadoop fs -get output/part-00000 locations.txt
    $ more locations.txt

What just happened?

There's quite a bit happening here, so let's look at it one piece at a time.

The first mapper is our simple validation mapper. The class follows the same interface as the standard MapReduce API, and the map method simply passes a record through to the output if a utility validation method accepts it. We split the check out into a separate method to highlight the functionality of the mapper, but it could just as easily have lived within the map method itself. For simplicity, we keep to our previous validation strategy of counting fields and discarding lines that don't break into exactly six tab-delimited fields.

Note that the ChainMapper class has unfortunately been one of the last components to be migrated to the context object API; as of Hadoop 1.0, it can only be used with the older API. It remains a valid concept and a useful tool, but until Hadoop 2.0, where it will finally be migrated into the org.apache.hadoop.mapreduce.lib.chain package, its use requires the older approach.
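
For reference, the following is a minimal sketch of what the driver might look like once the new-API ChainMapper in that package can be used. It assumes both mapper classes have been ported to the context object API, and the class name UFOLocationNewApi is ours rather than part of this example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;

    // Hypothetical driver; assumes both mapper classes have been rewritten
    // against the new org.apache.hadoop.mapreduce.Mapper API.
    public class UFOLocationNewApi
    {
        public static void main(String[] args) throws Exception
        {
            Job job = Job.getInstance(new Configuration(), "UFOLocation");
            job.setJarByClass(UFOLocationNewApi.class);

            // The new addMapper() takes a per-mapper Configuration but no byValue flag
            ChainMapper.addMapper(job, UFORecordValidationMapper.class,
                LongWritable.class, Text.class,
                LongWritable.class, Text.class, new Configuration(false));

            ChainMapper.addMapper(job, UFOLocation.MapClass.class,
                LongWritable.class, Text.class,
                Text.class, LongWritable.class, new Configuration(false));

            job.setCombinerClass(LongSumReducer.class);
            job.setReducerClass(LongSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }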

The other file contains another mapper implementation and an updated driver in the main method. The mapper looks for a two-letter sequence at the end of the location field in a UFO sighting report. From some manual examination of the data, it is apparent that most location fields take the form "city, state", with the standard two-character abbreviation used for the state.

Some records, however, add trailing parentheses, periods, or other punctuation, and some are simply not in this format at all. For our purposes, we are happy to discard those records and focus on the ones that end with the two-character state abbreviation we are looking for.

The map method extracts this from the location field using another regular expression and gives the output as the capitalized form of the abbreviation along with a simple count.

The driver for the job has the most changes as the previous configuration involving a single map class is replaced with multiple calls on the ChainMapper class.

The general model is to create a new configuration object for each mapper, then add the mapper to the ChainMapper class along with a specification of its input and output, and a reference to the overall job configuration object.
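
To make those arguments concrete, here is the first addMapper() call from the driver again with each parameter annotated. The byValue flag is worth a mention: passing by value hands each key/value pair to the next mapper in the chain as a serialized copy, which is the safe choice if a mapper reuses or modifies the objects it emits, while passing by reference avoids the copying cost but depends on the mappers not doing so.

    JobConf mapconf1 = new JobConf(false);  // configuration local to this mapper alone
    ChainMapper.addMapper(
        conf,                               // the overall job configuration
        UFORecordValidationMapper.class,    // the mapper class to append to the chain
        LongWritable.class, Text.class,     // this mapper's input key and value types
        LongWritable.class, Text.class,     // this mapper's output key and value types
        true,                               // byValue: pass pairs onwards as copies
        mapconf1);                          // the mapper-local configuration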

Notice that the two mappers have different signatures. Both take a key of type LongWritable and a value of type Text as input, and these are also the output types of UFORecordValidationMapper. The mapper inside UFOLocation (MapClass), however, outputs the reverse: a key of type Text and a value of type LongWritable.

The important thing here is to match the output of the final mapper in the chain (the UFOLocation mapper) to the input expected by the reduce class (LongSumReducer). When using the ChainMapper class, the mappers in the chain can have different input and output types as long as the following are true:

  • For all but the final mapper, each mapper's output matches the input of the next mapper in the chain
  • For the final mapper, its output matches the input of the reducer

We compile these classes and put them in the same jar file. This is the first time we have bundled the output from more than one Java source file together. As may be expected, there is no magic here; the usual rules on jar files, path, and class names apply. Because in this case we have both our classes in the same package, we don't have to worry about an additional import in the driver class file.

We then run the MapReduce job and examine the output, which is not quite as expected.

Have a go hero

Use the Java API and the previous ChainMapper example to reimplement the mappers previously written in Ruby that produce the shape frequency and duration reports.

Too many abbreviations

The following are the first few entries from our result file of the previous job:

AB      286
AD      6
AE      7
AI      6
AK      234
AL      548
AM      22
AN      161
…

The file had 186 different two-character entries. Plainly, our approach of extracting the final two-character sequence from the location field was not sufficiently robust.

A number of issues with the data become apparent after a manual analysis of the source file:

  • There is inconsistency in the capitalization of the state abbreviations
  • A non-trivial number of sightings are from outside the U.S. and though they may follow a similar (city, area) pattern, the abbreviation is not one of the 50 we'd expect
  • Some fields simply don't follow the pattern at all, yet would still be captured by our regular expression

We need to filter these results, ideally by normalizing the U.S. records into correct state output and by gathering everything else into a broader category.

To perform this task we need to give the mapper some notion of what the valid U.S. state abbreviations are. We could, of course, hardcode this list into the mapper, but that does not seem right. Although we are for now going to treat all non-U.S. sightings as a single category, we may wish to extend that over time and perhaps do a breakdown by country. If we hardcode the abbreviations, we would need to recompile our mapper every time the reference data changed.

Using the Distributed Cache

Hadoop gives us an alternative mechanism to achieve the goal of sharing reference data across all tasks in the job: the Distributed Cache. This can be used to efficiently make common read-only files used by the map or reduce tasks available on all nodes. The files can be text data, as in this case, but could also be additional jars, binary data, or archives; anything is possible.

The files to be distributed are placed on HDFS and added to the DistributedCache within the job driver. Hadoop copies the files onto the local filesystem of each node prior to job execution, meaning every task has local access to the files.
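
As a rough sketch of how this could look with the older API used in this chapter, assume a hypothetical file called states.txt holding the valid abbreviations has already been placed on HDFS; the class name below is ours and not part of the example so far:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;

    // In the driver, before submitting the job:
    //     DistributedCache.addCacheFile(new java.net.URI("/user/hadoop/states.txt"), conf);

    public class CachedLookupMapperBase extends MapReduceBase
    {
        protected Set<String> states = new HashSet<String>();

        // configure() runs once per task, before any call to map()
        @Override
        public void configure(JobConf job)
        {
            try
            {
                // The cached file has already been copied to the task's local disk
                Path[] cached = DistributedCache.getLocalCacheFiles(job);
                if (cached != null && cached.length > 0)
                {
                    BufferedReader reader =
                        new BufferedReader(new FileReader(cached[0].toString()));
                    String line;
                    while ((line = reader.readLine()) != null)
                    {
                        states.add(line.trim().toUpperCase());
                    }
                    reader.close();
                }
            }
            catch (IOException e)
            {
                throw new RuntimeException("Could not read cached abbreviations file", e);
            }
        }
    }

A mapper extending such a base class could then test each candidate abbreviation against the states set inside its map() method, without the list ever being compiled into the code.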

An alternative is to bundle the needed files into the job jar submitted to Hadoop. This does tie the data to the jar, making it more difficult to share across jobs, and requires the jar to be rebuilt if the data changes.
