Using Counters in a streaming job

Hadoop is not limited to running MapReduce jobs written in Java or other JVM languages. It also provides a generic streaming interface. Using the streaming interface, any application that can read from stdin and write to stdout can be used as a mapper or reducer in a MapReduce job. Because streaming jobs do not have access to the Hadoop Java classes, they must take a different approach to use the framework's features. One convenient and extremely useful feature provided by Hadoop is Counters. This recipe will use a simple Python program to show how to increment a counter from a streaming application. The Python code does not have direct access to the Java Reporter class that the Hadoop framework uses to work with Counters. Instead, it writes data to stderr in a format that has special meaning; the Hadoop framework interprets this as a request to increment the specified counter.

Getting ready

You will need to download the weblog_entries_bad_records.txt dataset from the Packt website, http://www.packtpub.com/support. This example will use the streaming_counters.py Python program provided in the code section of this chapter.

How to do it...

Complete the following steps to execute a Hadoop streaming job using the streaming_counters.py program:

  1. Run the following command:
    hadoop jar $HADOOP_HOME/contrib/hadoop-*streaming*.jar \
    -file streaming_counters.py \
    -mapper streaming_counters.py \
    -reducer NONE \
    -input /data/weblogs/weblog_entries_bad_records.txt \
    -output /data/weblogs/weblog_entries_filtered.txt
    
  2. To view the counter in the Job Tracker UI, open a web browser and navigate to the Job Tracker UI. The default address is localhost:50030. Scroll down to the Completed Jobs section and locate the streaming_counters job; the most recent jobs are at the bottom of the table. Once the job has been located, click on its Jobid link to open the job details page, which lists the counters.
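
Alternatively, once the job has finished, the counter value can be read from the command line with the hadoop job command. This is a sketch only; the job ID below is a placeholder for the ID shown by the Job Tracker:

hadoop job -counter <job_id> BadRecords INVALID_IP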

How it works...

The Hadoop framework constantly monitors stderr for entries that fit the following format:

reporter:counter:group,counter,value 

If it finds a string that matches this format, the Hadoop framework will check whether that group and counter already exist. If they do, the counter's current value will be incremented by value. If they do not, the group and counter will be created and the counter set to value.
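
For instance, a Python streaming task can emit such a line directly. The increment_counter helper below is a hypothetical convenience wrapper for illustration; it is not part of streaming_counters.py:

import sys

def increment_counter(group, counter, value=1):
    # Hadoop streaming parses stderr lines of the form
    # reporter:counter:<group>,<counter>,<value>
    sys.stderr.write("reporter:counter:%s,%s,%d\n" % (group, counter, value))

increment_counter("BadRecords", "INVALID_IP")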

The Python code performs two validation checks on the weblog data. The first checks for an invalid number of columns:

if len(cols) < 5:
    sys.stderr.write("reporter:counter:BadRecords,INVALID_NUMBER_OF_COLS,1\n")
    continue

If a line has fewer than five columns, the program writes to stderr in the format Hadoop expects for updating the counter. Similarly, the second validation verifies the IP address of each record and increments a counter each time an invalid IP address is found.

m = re.match(('^([01]?\d\d?|2[0-4]\d|25[0-5])'
              '\.([01]?\d\d?|2[0-4]\d|25[0-5])'
              '\.([01]?\d\d?|2[0-4]\d|25[0-5])'
              '\.([01]?\d\d?|2[0-4]\d|25[0-5])$'), ip)
if not m:
    sys.stderr.write("reporter:counter:BadRecords,INVALID_IP,1\n")
    continue
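
Putting the two checks together, the following is a minimal sketch of what a mapper such as streaming_counters.py might look like. The tab delimiter and the position of the IP column are assumptions about the weblog dataset, not details confirmed by the recipe:

#!/usr/bin/env python
import re
import sys

# Assumption: records are tab-delimited and the IP address is the fifth column.
IP_PATTERN = re.compile(r'^([01]?\d\d?|2[0-4]\d|25[0-5])'
                        r'\.([01]?\d\d?|2[0-4]\d|25[0-5])'
                        r'\.([01]?\d\d?|2[0-4]\d|25[0-5])'
                        r'\.([01]?\d\d?|2[0-4]\d|25[0-5])$')

for line in sys.stdin:
    cols = line.strip().split('\t')
    if len(cols) < 5:
        sys.stderr.write("reporter:counter:BadRecords,INVALID_NUMBER_OF_COLS,1\n")
        continue
    ip = cols[4]
    if not IP_PATTERN.match(ip):
        sys.stderr.write("reporter:counter:BadRecords,INVALID_IP,1\n")
        continue
    # Valid records are passed through unchanged to the job output.
    sys.stdout.write(line)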

There's more...

Streaming jobs can also set the task's status message using the same basic mechanism. Writing to stderr in the following format updates the task's status, setting it to message.

reporter:status:message
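
For example, a Python streaming task could report its progress like this (the message text is arbitrary):

sys.stderr.write("reporter:status:Filtering weblog records\n")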

See also

  • Using Counters in a MapReduce job to track bad records