Let's cause a task to fail; before we do, we will need to modify the default timeout by adding the following property to mapred-site.xml:

<property>
  <name>mapred.task.timeout</name>
  <value>30000</value>
</property>
Copy WordCount3.java to a new file called WordCountTimeout.java and add the following imports:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
Replace the map method with the following one:

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String lockfile = "/user/hadoop/hdfs.lock";
    Configuration config = new Configuration();
    FileSystem hdfs = FileSystem.get(config);
    Path path = new Path(lockfile);
    if (!hdfs.exists(path)) {
        byte[] bytes = "A lockfile".getBytes();
        FSDataOutputStream out = hdfs.create(path);
        out.write(bytes, 0, bytes.length);
        out.close();
        TimeUnit.SECONDS.sleep(100);
    }
    String[] words = value.toString().split(" ");
    for (String str : words) {
        word.set(str);
        context.write(word, one);
    }
}
$ hadoop jar wc.jar WordCountTimeout test.txt output
…
11/12/11 19:19:51 INFO mapred.JobClient:  map 50% reduce 0%
11/12/11 19:20:25 INFO mapred.JobClient:  map 0% reduce 0%
11/12/11 19:20:27 INFO mapred.JobClient: Task Id : attempt_201112111821_0004_m_000000_0, Status : FAILED
Task attempt_201112111821_0004_m_000000_0 failed to report status for 32 seconds. Killing!
11/12/11 19:20:31 INFO mapred.JobClient:  map 100% reduce 0%
11/12/11 19:20:43 INFO mapred.JobClient:  map 100% reduce 100%
11/12/11 19:20:45 INFO mapred.JobClient: Job complete: job_201112111821_0004
11/12/11 19:20:45 INFO mapred.JobClient: Counters: 18
11/12/11 19:20:45 INFO mapred.JobClient:   Job Counters
…
We first modified a default Hadoop property that manages how long a task can seemingly make no progress before the Hadoop framework considers it for termination.
Then we modified WordCount3 to add some logic that causes the task to sleep for 100 seconds. We used a lock file on HDFS to ensure that only a single task instance sleeps. If we just had the sleep statement in the map operation without any checks, every mapper would time out and the job would fail.
We said we would not really deal with programmatic access to HDFS in this book. However, take a look at what we have done here and browse through the Javadoc for these classes. You will find that the interface largely follows the patterns for access to a standard Java filesystem.
Then we compile, jar up the classes, and execute the job on the cluster. The first task goes to sleep and after exceeding the threshold we set (the value was specified in milliseconds), Hadoop kills the task and reschedules another mapper to process the split assigned to the failed task.
Hadoop has a balancing act to perform here. It wants to terminate tasks that have got stuck or, for other reasons, are running abnormally slowly; but sometimes complex tasks simply take a long time. This is especially true if the task relies on any external resources to complete its execution.
Hadoop looks for evidence of progress from a task when deciding how long it has been idle/quiet/stuck. Generally, this evidence is either the task doing visible work, such as emitting output records or updating counters, or the task explicitly reporting progress.
For the latter, Hadoop provides the Progressable interface, which contains one method of interest:
public void progress();
The Context class implements this interface, so any mapper or reducer can call context.progress() to show it is alive and continuing to process.
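The pattern looks like the following sketch. To keep it self-contained and runnable without Hadoop on the classpath, a stand-in for the Progressable interface is declared locally; in a real mapper you would simply call context.progress() inside the loop.

```java
// Minimal stand-in for org.apache.hadoop.util.Progressable so this
// sketch compiles without the Hadoop libraries.
interface Progressable {
    void progress();
}

public class ProgressDemo {

    // Simulates a long-running record-processing loop that periodically
    // reports liveness, as a mapper would via context.progress().
    static int processRecords(int count, Progressable reporter) {
        for (int i = 0; i < count; i++) {
            // ... expensive per-record work would happen here ...
            if (i % 1000 == 0) {
                reporter.progress(); // tell the framework we are still alive
            }
        }
        return count;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        processRecords(5000, () -> calls[0]++);
        System.out.println("progress() called " + calls[0] + " times");
        // prints "progress() called 5 times"
    }
}
```

Reporting every N records, rather than on every record, keeps the overhead negligible while still arriving well inside the mapred.task.timeout window.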
Typically, a MapReduce job will comprise many discrete map and reduce task executions. When run across a cluster, there is a real risk that a misconfigured or ailing host will cause its tasks to run significantly more slowly than the others.
To address this, Hadoop will, towards the end of the map or reduce phase, assign duplicate map or reduce tasks across the cluster. This speculative task execution is aimed at preventing one or two slow-running tasks from causing a significant impact on the overall job execution time.
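Speculative execution is on by default and can be toggled per phase. For example, to disable it (say, when tasks have side effects on external systems and duplicate executions would be harmful), the following properties can be set in mapred-site.xml or per job:

```xml
<!-- Disable speculative execution of map tasks -->
<property>
  <name>mapred.map.tasks.speculative.execution</name>
  <value>false</value>
</property>
<!-- Disable speculative execution of reduce tasks -->
<property>
  <name>mapred.reduce.tasks.speculative.execution</name>
  <value>false</value>
</property>
```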
Tasks won't just hang; sometimes they'll explicitly throw exceptions, abort, or otherwise stop executing in a less silent way than the ones mentioned previously.
Hadoop has three configuration properties that control how it responds to task failures, all set in mapred-site.xml:

- mapred.map.max.attempts: a given map task will be retried this many times before causing the job to fail
- mapred.reduce.max.attempts: a given reduce task will be retried this many times before causing the job to fail
- mapred.max.tracker.failures: the job will fail if this many individual task failures are recorded

The default value for all of these is 4.
Note that it does not make sense for mapred.max.tracker.failures to be set to a value smaller than either of the other two properties.
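As an illustration, a job tolerant of flaky tasks might raise all three in mapred-site.xml (the values here are only examples, chosen to respect the constraint just mentioned):

```xml
<property>
  <name>mapred.map.max.attempts</name>
  <value>6</value>
</property>
<property>
  <name>mapred.reduce.max.attempts</name>
  <value>6</value>
</property>
<!-- Must not be smaller than either of the two values above -->
<property>
  <name>mapred.max.tracker.failures</name>
  <value>8</value>
</property>
```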
Which of these you consider setting will depend on the nature of your data and jobs. If your jobs access external resources that may occasionally cause transient errors, increasing the number of permitted task attempts may be useful. But if the task failure is very data-specific, these properties may be less applicable, as a task that fails once will do so again. However, note that a default value higher than 1 does make sense, as in a large complex system various transient failures are always possible.
Modify the WordCount example; instead of sleeping, have it throw a RuntimeException based on a random number. Modify the cluster configuration and explore the relationship between the configuration properties that manage how many failed tasks will cause the whole job to fail.
The final types of failure that we will explore are those related to data. By this, we mean tasks that crash because a given record had corrupt data, used the wrong data types or formats, or exhibited a wide variety of related problems: cases where the data received diverges from expectations.
One approach to dirty data is to write mappers and reducers that deal with data defensively. So, for example, if the value received by the mapper should be a comma-separated list of values, first validate the number of items before processing the data. If the first value should be a string representation of an integer, ensure that the conversion into a numerical type has solid error handling and default behavior.
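A sketch of this defensive style follows, using a hypothetical "name,count" record format; the field layout and the idea of falling back to a caller-supplied default are assumptions made for illustration, not part of the WordCount example.

```java
public class DefensiveParse {

    // Parses the count field from a "name,count" record, returning the
    // given default when the record is malformed rather than letting
    // the task crash on bad input.
    static int parseCount(String record, int defaultCount) {
        if (record == null) {
            return defaultCount;
        }
        String[] fields = record.split(",");
        if (fields.length != 2) { // validate the number of items first
            return defaultCount;
        }
        try {
            return Integer.parseInt(fields[1].trim());
        } catch (NumberFormatException e) {
            return defaultCount; // non-numeric count field
        }
    }

    public static void main(String[] args) {
        System.out.println(parseCount("apple,3", 0)); // well-formed: prints 3
        System.out.println(parseCount("apple", 0));   // too few fields: prints 0
        System.out.println(parseCount("apple,x", 0)); // bad number: prints 0
    }
}
```

Whether to silently substitute a default, increment a counter, or route the bad record to a side file is a per-job decision; the key point is that the malformed input never propagates an exception out of the task.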
The problem with this approach is that there will always be some type of weird data input that was not considered, no matter how careful you were. Did you consider receiving values in a different unicode character set? What about multiple character sets, null values, badly terminated strings, wrongly encoded escape characters, and so on?
If the data input to your jobs is something you generate and/or control, these possibilities are less of a concern. However, if you are processing data received from external sources, there will always be grounds for surprise.
The alternative is to configure Hadoop to approach task failures differently. Instead of looking upon a failed task as an atomic event, Hadoop can instead attempt to identify which records may have caused the problem and exclude them from future task executions. This mechanism is known as skip mode. This can be useful if you are experiencing a wide variety of data issues where coding around them is not desirable or practical. Alternatively, you may have little choice if, within your job, you are using third-party libraries for which you may not have the source code.
Skip mode is currently available only for jobs written to the pre-0.20 version of the API, which is another consideration.
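Skip mode is driven by configuration; a sketch of the relevant properties is shown below with illustrative values (the old org.apache.hadoop.mapred API also exposes these programmatically through the SkipBadRecords class):

```xml
<!-- Start trying to identify bad records after this many failed
     attempts of a task -->
<property>
  <name>mapred.skip.attempts.to.start.skipping</name>
  <value>2</value>
</property>
<!-- Acceptable number of records to skip around each bad record
     during the map phase -->
<property>
  <name>mapred.skip.map.max.skip.records</name>
  <value>1</value>
</property>
```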