Time for action – causing task failure

Let's cause a task to fail; before we do, we will need to modify the default timeouts:

  1. Add this configuration property to mapred-site.xml:
    <property>
    <name>mapred.task.timeout</name>
    <value>30000</value>
    </property>
  2. We will now modify our old friend WordCount from Chapter 3, Understanding MapReduce. Copy WordCount3.java to a new file called WordCountTimeout.java and add the following imports:
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FSDataOutputStream;
  3. Replace the map method with the following one:
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException
    {
        String lockfile = "/user/hadoop/hdfs.lock";

        // Get a handle on HDFS so we can check for and create the lock file
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path path = new Path(lockfile);

        // Only the first task instance to create the lock file sleeps;
        // every other mapper sees the file and proceeds normally
        if (!hdfs.exists(path))
        {
            byte[] bytes = "A lockfile".getBytes();
            FSDataOutputStream out = hdfs.create(path);
            out.write(bytes, 0, bytes.length);
            out.close();

            // Sleep well past the 30-second timeout configured earlier
            TimeUnit.SECONDS.sleep(100);
        }

        String[] words = value.toString().split(" ");

        for (String str : words)
        {
            word.set(str);
            context.write(word, one);
        }
    }
  4. Compile the file after changing the class name, jar it up, and execute it on the cluster:
    $ hadoop jar wc.jar WordCountTimeout test.txt output
    
    11/12/11 19:19:51 INFO mapred.JobClient:  map 50% reduce 0%
    11/12/11 19:20:25 INFO mapred.JobClient:  map 0% reduce 0%
    11/12/11 19:20:27 INFO mapred.JobClient: Task Id : attempt_201112111821_0004_m_000000_0, Status : FAILED
    Task attempt_201112111821_0004_m_000000_0 failed to report status for 32 seconds. Killing!
    11/12/11 19:20:31 INFO mapred.JobClient:  map 100% reduce 0%
    11/12/11 19:20:43 INFO mapred.JobClient:  map 100% reduce 100%
    11/12/11 19:20:45 INFO mapred.JobClient: Job complete: job_201112111821_0004
    11/12/11 19:20:45 INFO mapred.JobClient: Counters: 18
    11/12/11 19:20:45 INFO mapred.JobClient:   Job Counters
    
    

What just happened?

We first modified a default Hadoop property that controls how long a task can apparently make no progress before the Hadoop framework considers it for termination.

Then we modified WordCount3 to add some logic that causes the task to sleep for 100 seconds. We used a lock file on HDFS to ensure that only a single task instance sleeps. If we just had the sleep statement in the map operation without any checks, every mapper would time out and the job would fail.

Have a go hero – HDFS programmatic access

We said we would not really deal with programmatic access to HDFS in this book. However, take a look at what we have done here and browse through the Javadoc for these classes. You will find that the interface largely follows the patterns for access to a standard Java filesystem.
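As a taste of that interface, here is a minimal, self-contained sketch that reads a file back from HDFS. It is not part of the preceding example, and the class name and path are purely illustrative:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsCat
    {
        public static void main(String[] args) throws Exception
        {
            // Obtain the filesystem named in the cluster configuration
            Configuration config = new Configuration();
            FileSystem hdfs = FileSystem.get(config);

            // Hypothetical path; substitute a file that exists on your cluster
            Path path = new Path("/user/hadoop/test.txt");

            if (hdfs.exists(path))
            {
                // FSDataInputStream extends java.io.InputStream, so the
                // familiar stream wrappers work unchanged
                BufferedReader reader = new BufferedReader(
                    new InputStreamReader(hdfs.open(path)));
                String line;
                while ((line = reader.readLine()) != null)
                {
                    System.out.println(line);
                }
                reader.close();
            }
        }
    }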

We then compiled the class, jarred it up, and executed the job on the cluster. The first task went to sleep and, after exceeding the threshold we set (the value was specified in milliseconds, so 30000 means 30 seconds), Hadoop killed the task and rescheduled another mapper to process the split assigned to the failed task.

Hadoop's handling of slow-running tasks

Hadoop has a balancing act to perform here. It wants to terminate tasks that have got stuck or, for other reasons, are running abnormally slowly; but sometimes complex tasks simply take a long time. This is especially true if the task relies on any external resources to complete its execution.

Hadoop looks for evidence of progress from a task when deciding how long it has been idle/quiet/stuck. Generally this could be:

  • Emitting results
  • Writing values to counters
  • Explicitly reporting progress

For the latter, Hadoop provides the Progressable interface which contains one method of interest:

public void progress();

The Context class implements this interface, so any mapper or reducer can call context.progress() to show it is alive and continuing to process.
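For example, a map method that performs some slow per-record computation could call it each time around the loop. In this sketch, expensiveOperation is a hypothetical placeholder for whatever long-running work your task does:

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException
    {
        String[] items = value.toString().split(" ");

        for (String item : items)
        {
            // Hypothetical long-running work that emits no output for a while
            expensiveOperation(item);

            // Tell the framework the task is alive so it is not killed
            context.progress();
        }
    }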

Speculative execution

Typically, a MapReduce job will comprise many discrete map and reduce task executions. When run across a cluster, there is a real risk that a misconfigured or ailing host will cause its tasks to run significantly slower than the others.

To address this, Hadoop will assign duplicate map or reduce tasks across the cluster towards the end of the map or reduce phase. This speculative task execution is aimed at preventing one or two slow-running tasks from causing a significant impact on the overall job execution time.
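Speculative execution is enabled by default. If duplicate tasks would be wasteful or would hammer an external resource, it can, to the best of our knowledge, be disabled per phase with the following old-API properties in mapred-site.xml:

    <property>
    <name>mapred.map.tasks.speculative.execution</name>
    <value>false</value>
    </property>
    <property>
    <name>mapred.reduce.tasks.speculative.execution</name>
    <value>false</value>
    </property>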

Hadoop's handling of failing tasks

Tasks won't just hang; sometimes they'll explicitly throw exceptions, abort, or otherwise stop executing in a less silent way than the ones mentioned previously.

Hadoop has three configuration properties that control how it responds to task failures, all set in mapred-site.xml:

  • mapred.map.max.attempts: A given map task will be retried this many times before causing the job to fail
  • mapred.reduce.max.attempts: A given reduce task will be retried this many times before causing the job to fail
  • mapred.max.tracker.failures: The job will fail if this many individual task failures are recorded

The default value for all of these is 4.
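For example, to allow each map task six attempts before it counts against the job, you could add something like the following to mapred-site.xml; the value here is purely illustrative:

    <property>
    <name>mapred.map.max.attempts</name>
    <value>6</value>
    </property>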

Note

It does not make sense for mapred.max.tracker.failures to be set to a value smaller than either of the other two properties.

Which of these you consider setting will depend on the nature of your data and jobs. If your jobs access external resources that may occasionally cause transient errors, increasing the number of permitted task attempts may be useful. But if the failure is very data-specific, these properties may be less applicable, as a task that fails once will do so again. However, note that a default value higher than 1 does make sense, because in a large, complex system various transient failures are always possible.

Have a go hero – causing tasks to fail

Modify the WordCount example; instead of sleeping, have it throw a RuntimeException based on a random number. Modify the cluster configuration and explore the relationship between the configuration properties that manage how many failed tasks will cause the whole job to fail.
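As a starting point, the failure injection inside the map method could be a sketch as simple as this; the five percent failure rate is an arbitrary choice:

    // Fail roughly five percent of map invocations with an unchecked exception
    if (Math.random() < 0.05)
    {
        throw new RuntimeException("Deliberate failure for testing");
    }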

Task failure due to data

The final types of failure that we will explore are those related to data: tasks that crash because a given record contained corrupt data, used the wrong data types or formats, or suffered from a wide variety of related problems. In other words, we mean those cases where the data received diverges from expectations.

Handling dirty data through code

One approach to dirty data is to write mappers and reducers that deal with data defensively. So, for example, if the value received by the mapper should be a comma-separated list of values, first validate the number of items before processing the data. If the first value should be a string representation of an integer, ensure that the conversion into a numerical type has solid error handling and default behavior.
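A defensive map method along those lines might look like the following sketch; the three-field record layout and the default value of zero are assumptions for illustration, and the usual Text and IntWritable imports are taken as given:

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException
    {
        // Expect exactly three comma-separated fields; silently skip anything else
        String[] fields = value.toString().split(",");
        if (fields.length != 3)
        {
            return;
        }

        int count;
        try
        {
            // The first field should be an integer; fall back to a default
            count = Integer.parseInt(fields[0].trim());
        }
        catch (NumberFormatException e)
        {
            count = 0;
        }

        context.write(new Text(fields[1]), new IntWritable(count));
    }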

The problem with this approach is that, no matter how careful you were, there will always be some type of weird data input that was not considered. Did you consider receiving values in a different Unicode character set? What about multiple character sets, null values, badly terminated strings, wrongly encoded escape characters, and so on?

If the data input to your jobs is something you generate and/or control, these possibilities are less of a concern. However, if you are processing data received from external sources, there will always be grounds for surprise.

Using Hadoop's skip mode

The alternative is to configure Hadoop to approach task failures differently. Instead of looking upon a failed task as an atomic event, Hadoop can instead attempt to identify which records may have caused the problem and exclude them from future task executions. This mechanism is known as skip mode. This can be useful if you are experiencing a wide variety of data issues where coding around them is not desirable or practical. Alternatively, you may have little choice if, within your job, you are using third-party libraries for which you may not have the source code.

Skip mode is currently available only for jobs written to the pre-0.20 version of the API, which is another consideration.
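For jobs using that older API, skip mode is configured through the org.apache.hadoop.mapred.SkipBadRecords class. A minimal fragment for a driver's run method might look like this, where SkipDemo is a hypothetical driver class and the values are illustrative:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SkipBadRecords;

    JobConf conf = new JobConf(SkipDemo.class);

    // Start skipping records once a task attempt has failed twice
    SkipBadRecords.setAttemptsToStartSkipping(conf, 2);

    // Accept losing up to 100 records/groups around each bad record
    SkipBadRecords.setMapperMaxSkipRecords(conf, 100);
    SkipBadRecords.setReducerMaxSkipGroups(conf, 100);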
