Using Counters in a MapReduce job to track bad records

The MapReduce framework provides Counters as an efficient mechanism for tracking the occurrences of global events within the map and reduce phases of a job. For example, a typical MapReduce job will kick off several mapper instances, one for each input split (typically one per HDFS block) of the input data, all running the same code. These instances are part of the same job, but run independently of one another. Counters allow a developer to track aggregated events across all of those separate instances.

A more concrete use of Counters can be found in the MapReduce framework itself, which defines several standard Counters for every job. The output of these Counters can be found on the job details page of the Job Tracker web UI.

The UI shows the Counter group, name, mapper totals, reducer totals, and job totals.

Counters should be limited to tracking metadata about the job, and the standard Counters are good examples of this. The Map input records counter provides useful information about a particular execution of a job. If Counters did not exist, these kinds of statistics would have to be emitted as part of the job's main output, where they do not belong, or, more likely, as a secondary output, complicating the job's logic.

The following recipe is a simple map-only job that filters out bad records and uses Counters to track the number of records that were filtered out.

Getting ready

You will need to download the weblog_entries_bad_records.txt dataset from the Packt website, http://www.packtpub.com/support.

How to do it...

  1. Copy the weblog_entries_bad_records.txt file from the local file system into the /data/weblogs folder in HDFS:
    hadoop fs -copyFromLocal weblog_entries_bad_records.txt /data/weblogs
    
  2. Submit the CountersExample job:
    hadoop jar ./CountersExample.jar com.packt.hadoop.solutions.CounterExample /data/weblogs/weblog_entries_bad_records.txt /data/weblogs/weblog_entries_clean.txt  
    
  3. To view the counter in the Job Tracker UI, open a web browser and navigate to the Job Tracker UI; the default address is localhost:50030. Scroll down to the Completed Jobs section and locate the CounterExample job; the most recent jobs are at the bottom of the table. Once the job has been located, click on its Jobid link. The resulting page shows high-level statistics about the job, including the Counters. A command-line alternative is shown after these steps.
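
Counter values for a completed job can also be retrieved from the command line with the job client. The group name of an Enum-based Counter is the fully qualified name of the Enum class, so the exact group string depends on where BadRecords is declared in the source; the job ID and group name below are placeholders:

    hadoop job -counter <job_id> com.packt.hadoop.solutions.CounterExample$BadRecords INVALID_IP_ADDRESS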

How it works...

Counters are defined in groups. In Java, each Counter group is an Enum: the Enum's name serves as the group name, and each of its constants becomes an individual counter within that group. In the CounterExample job, an Enum for tracking the count of each type of bad record was defined:

static enum BadRecords { INVALID_NUMBER_OF_COLUMNS, INVALID_IP_ADDRESS }

In the map function, there are two checks for valid data. The first check splits each record on its tab delimiters. In this example, a properly formed record should have exactly five columns. If a record does not have five columns, the Context object is asked for the counter associated with BadRecords.INVALID_NUMBER_OF_COLUMNS, and that counter is incremented by 1.

String record = value.toString();
String[] columns = record.split("\t");

// Check for a valid number of columns
if (columns.length != 5) {
    context.getCounter(BadRecords.INVALID_NUMBER_OF_COLUMNS).increment(1);
    return;
}

The second check validates IP addresses. A regular expression, VALID_IP_ADDRESS, is defined. As its name implies, this regular expression matches only well-formed IPv4 addresses, that is, four dot-separated octets, each between 0 and 255. Note that the backslashes must be doubled in Java source:

private static final String VALID_IP_ADDRESS = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\." +
           "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])$";
// Compiled once and reused for every record in the map function
private static final Pattern pattern = Pattern.compile(VALID_IP_ADDRESS);

The VALID_IP_ADDRESS regular expression is used to check every record's IP address column for a match. For each record that does not match, the INVALID_IP_ADDRESS counter is incremented.

// Check for valid IP addresses
Matcher matcher = pattern.matcher(columns[4]);
if (!matcher.matches()) {
    context.getCounter(BadRecords.INVALID_IP_ADDRESS).increment(1);
    return;
}
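
Putting these fragments together, a minimal self-contained version of the mapper might look like the following sketch. The class name CleanMapper and the Text/NullWritable output types are assumptions for illustration, not taken from the recipe's source:

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical assembly of the recipe's fragments; names are illustrative
public class CleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    static enum BadRecords { INVALID_NUMBER_OF_COLUMNS, INVALID_IP_ADDRESS }

    private static final String VALID_IP_ADDRESS =
            "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\." +
            "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])$";
    private static final Pattern pattern = Pattern.compile(VALID_IP_ADDRESS);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        String[] columns = record.split("\t");

        // Count and drop records with the wrong number of columns
        if (columns.length != 5) {
            context.getCounter(BadRecords.INVALID_NUMBER_OF_COLUMNS).increment(1);
            return;
        }

        // Count and drop records whose fifth column is not a well-formed IPv4 address
        Matcher matcher = pattern.matcher(columns[4]);
        if (!matcher.matches()) {
            context.getCounter(BadRecords.INVALID_IP_ADDRESS).increment(1);
            return;
        }

        // The record passed both checks; emit it unchanged
        context.write(value, NullWritable.get());
    }
}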

Each counter increment is first tallied locally by the map task that recorded it. The values are then sent to the Task Tracker for a second level of aggregation, and finally to the Job Tracker, where the global totals are computed.
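
If the counter totals are needed programmatically rather than through the UI, they can be read from the client once the job finishes. The following driver sketch submits the map-only job and prints a counter's global total; the class names, including the CleanMapper reference carried over from the sketch above, are assumptions for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver for the recipe's job; names are illustrative
public class CounterExampleDriver {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "CounterExample");
        job.setJarByClass(CounterExampleDriver.class);
        job.setMapperClass(CleanMapper.class);
        job.setNumReduceTasks(0); // map-only job: no reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);

        // The globally aggregated totals are available to the client
        // through the Job's Counters object once the job completes
        Counter badIps = job.getCounters().findCounter(
                CleanMapper.BadRecords.INVALID_IP_ADDRESS);
        System.out.println("Invalid IP addresses: " + badIps.getValue());
    }
}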
