Enabling MapReduce jobs to skip bad records

When working with the amounts of data that Hadoop was designed to process, it is only a matter of time before even the most robust job runs into unexpected or malformed data. If not handled properly, bad data can easily cause a job to fail. By default, Hadoop will not skip bad data. For some applications, it may be acceptable to skip a small percentage of the input data. Hadoop provides a way to do just that. Even if skipping data is not acceptable for a given use case, Hadoop's skipping mechanism can be used to pinpoint the bad data and log it for review.

How to do it...

  1. To enable the skipping of 100 bad records in a map job, add the following to the run() method where the job configuration is set up:
    SkipBadRecords.setMapperMaxSkipRecords(conf, 100);
  2. To enable the skipping of 100 bad record groups in a reduce job, add the following to the run() method where the job configuration is set up:
    SkipBadRecords.setReducerMaxSkipGroups(conf, 100);
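Taken together, the two calls above would sit in a driver's run() method roughly like this (a minimal sketch using the old mapred API; the job name, attempt counts, and input/output arguments are placeholders, and the mapper/reducer classes are omitted):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

public class SkippingJobDriver {

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(SkippingJobDriver.class);
        conf.setJobName("skip-bad-records-example");
        // conf.setMapperClass(...) / conf.setReducerClass(...) go here.

        // Allow up to 100 records (map side) and 100 key groups
        // (reduce side) to be skipped around each bad record.
        SkipBadRecords.setMapperMaxSkipRecords(conf, 100);
        SkipBadRecords.setReducerMaxSkipGroups(conf, 100);

        // Narrowing down bad records burns task attempts, so raise the limits.
        conf.setMaxMapAttempts(10);
        conf.setMaxReduceAttempts(10);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
        return 0;
    }
}
```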

How it works...

The process to skip bad records is triggered only if skipping has been enabled, by calling the static methods on the SkipBadRecords class, and only once a task has failed twice. Hadoop then performs a binary search through the input data to identify the bad records. Keep in mind that this is an expensive process that can consume many additional task attempts. A job that enables skipping will probably want to increase the number of allowed map and reduce attempts, which can be done using the JobConf.setMaxMapAttempts() and JobConf.setMaxReduceAttempts() methods.
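The binary-search narrowing described above can be sketched as a small, self-contained simulation (illustrative only; the real framework narrows the failing range by re-running task attempts, and the class and method names here are invented for the example):

```java
// A toy simulation of how skip mode narrows in on a bad record.
public class SkipRangeSim {

    // Binary-search the failing range [start, end) down to at most
    // maxSkipRecords records around the bad record, mimicking the
    // mapred.skip.map.max.skip.records semantics (the count includes
    // the bad record itself).
    public static int[] narrowBadRange(int start, int end,
                                       int badIndex, int maxSkipRecords) {
        while (end - start > maxSkipRecords) {
            int mid = (start + end) / 2;
            // A re-run of the half containing badIndex fails;
            // the other half completes, so it is ruled out.
            if (badIndex < mid) {
                end = mid;   // failure occurred in the lower half
            } else {
                start = mid; // failure occurred in the upper half
            }
        }
        return new int[]{start, end};
    }

    public static void main(String[] args) {
        int[] range = narrowBadRange(0, 1_000_000, 123_456, 1);
        System.out.println("skipped range: [" + range[0] + ", " + range[1] + ")");
    }
}
```

Each round of retries halves the suspect range, which is why narrowing a single bad record out of millions still needs on the order of log2(n) extra attempts.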

There's more...

By default, the process to skip bad records will be triggered after two failed attempts. This default can be changed using the setAttemptsToStartSkipping() method on the SkipBadRecords class. The output folder for the skipped records can be controlled using the setSkipOutputPath() method on the SkipBadRecords class. By default, skipped records will be logged to the _logs/skip/ folder under the job's output directory. These files are formatted as Hadoop sequence files. To get them into a human-readable format, use the following command:

hadoop fs -text _logs/skip/<filename>

Record-skipping can also be controlled using MapReduce job properties. The following table is a relevant snippet from the table provided at http://hadoop.apache.org/common/docs/r0.20.2/mapred-default.html.

mapred.skip.attempts.to.start.skipping
Default value: 2
The number of task attempts after which skip mode will be kicked off. When skip mode is kicked off, the task reports to the TaskTracker the range of records that it will process next. This way, on failure, the TaskTracker knows which records are possibly bad, and on further executions those records are skipped.

mapred.skip.map.auto.incr.proc.count
Default value: true
When this flag is set to true, the MapRunner increments the SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS counter after invoking the map function. This value must be set to false for applications that process records asynchronously or buffer input records (for example, streaming applications); in such cases, the application should increment this counter on its own.

mapred.skip.reduce.auto.incr.proc.count
Default value: true
When this flag is set to true, the framework increments the SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS counter after invoking the reduce function. This value must be set to false for applications that process records asynchronously or buffer input records (for example, streaming applications); in such cases, the application should increment this counter on its own.

mapred.skip.out.dir
Default value: (unset)
If no value is specified, the skipped records are written to the output folder at _logs/skip. Users can stop skipped records from being written by setting the value to none.

mapred.skip.map.max.skip.records
Default value: 0
The number of acceptable records to skip surrounding each bad record in the mapper; the count includes the bad record itself. To turn detection/skipping of bad records off, set the value to 0. The framework tries to narrow the skipped range by retrying until this threshold is met or all attempts are exhausted for the task. Set the value to Long.MAX_VALUE to indicate that the framework need not narrow the range, so that whichever records happen to be skipped are acceptable.

mapred.skip.reduce.max.skip.groups
Default value: 0
The number of acceptable key groups to skip surrounding each bad group in the reducer; the count includes the bad group itself. To turn detection/skipping of bad groups off, set the value to 0. The framework tries to narrow the skipped range by retrying until this threshold is met or all attempts are exhausted for the task. Set the value to Long.MAX_VALUE to indicate that the framework need not narrow the range, so that whichever groups happen to be skipped are acceptable.
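If the job's driver supports Hadoop's generic options (for example, by implementing the Tool interface), these properties can also be set on the command line with -D, without recompiling (a sketch; myjob.jar, MyDriver, and the paths are placeholders):

```shell
# Start skip mode after the first failure, allow up to 100 skipped
# records per bad record, and suppress the skipped-record output.
hadoop jar myjob.jar MyDriver \
    -D mapred.skip.attempts.to.start.skipping=1 \
    -D mapred.skip.map.max.skip.records=100 \
    -D mapred.skip.out.dir=none \
    input/ output/
```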
