Time for action – implementing WordCount using Streaming

Let's flog the dead horse of WordCount one more time and implement it using Streaming by performing the following steps:

  1. Save the following file to wcmapper.rb:
    #!/usr/bin/env ruby
    
    # Read lines of text from standard input, split each into words,
    # and emit each word with a count of 1, separated by a tab
    while line = gets
        words = line.split(" ")
        words.each{ |word| puts word.strip+"\t1"}
    end
  2. Make the file executable by executing the following command:
    $ chmod +x wcmapper.rb
    
  3. Save the following file to wcreducer.rb:
    #!/usr/bin/env ruby
    
    current = nil
    count = 0
    
    while line = gets
        word, counter = line.split("\t")
    
        if word == current
            count = count+1
        else
            puts current+"\t"+count.to_s if current
            current = word
            count = 1
        end
    end
    puts current+"\t"+count.to_s if current
  4. Make the file executable by executing the following command:
    $ chmod +x wcreducer.rb
    
  5. Execute the scripts as a Streaming job using the datafile from the previous chapter:
    $ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -file wcmapper.rb -mapper wcmapper.rb -file wcreducer.rb \
    -reducer wcreducer.rb -input test.txt -output output
    packageJobJar: [wcmapper.rb, wcreducer.rb, /tmp/hadoop-hadoop/hadoop-unjar1531650352198893161/] [] /tmp/streamjob937274081293220534.jar tmpDir=null
    12/02/05 12:43:53 INFO mapred.FileInputFormat: Total input paths to process : 1
    12/02/05 12:43:53 INFO streaming.StreamJob: getLocalDirs(): [/var/hadoop/mapred/local]
    12/02/05 12:43:53 INFO streaming.StreamJob: Running job: job_201202051234_0005
    
    12/02/05 12:44:01 INFO streaming.StreamJob:  map 100%  reduce 0%
    12/02/05 12:44:13 INFO streaming.StreamJob:  map 100%  reduce 100%
    12/02/05 12:44:16 INFO streaming.StreamJob: Job complete: job_201202051234_0005
    12/02/05 12:44:16 INFO streaming.StreamJob: Output: output
    
  6. Check the result file:
    $ hadoop fs -cat output/part-00000
    

What just happened?

Ignore the specifics of Ruby. If you don't know the language, it isn't important here.

Firstly, we created the script that will be our mapper. It uses the gets function to read a line from standard input, splits it into words, and uses the puts function to write each word and the value 1, separated by a tab, to standard output. We then made the file executable.
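
Because a Streaming mapper is simply a program that reads standard input and writes standard output, we can sanity-check it from the shell before involving Hadoop at all. The following quick test is not part of the original steps; the sample sentence is invented, and it assumes Ruby is installed and the script is in the current directory:

    $ echo "this is a test this is" | ./wcmapper.rb

Each word should appear on its own line, followed by a tab and the value 1.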

Our reducer is a little more complex for reasons we will describe in the next section. However, it performs the job we would expect: it reads word/count pairs from standard input, counts the number of occurrences of each word, and writes each word with its final count to standard output. Again, we made sure to make the file executable.
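
Because both scripts simply consume and produce text streams, the whole job can also be simulated locally, with a Unix sort standing in for Hadoop's shuffle and sort phase. This is only a sanity check and assumes a local copy of the test.txt data file from the previous chapter:

    $ cat test.txt | ./wcmapper.rb | sort | ./wcreducer.rb

The result should match the word counts the real job produces, just computed on a single machine.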

Note that in both cases we are implicitly using the Hadoop input and output formats discussed in the earlier chapters. It is TextInputFormat that processes the source file and provides each line one at a time to the map script. Conversely, TextOutputFormat ensures that the output of the reduce tasks is also correctly written as textual data. We can, of course, modify these if required.
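
If different input or output formats were needed, the Streaming launcher accepts them as command-line options. The class names below are illustrative only and are not used in the job above; options such as these could be added to the submission command in step 5:

    -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat
    -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat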

Next, we submitted the Streaming job to Hadoop via the rather cumbersome command line shown in step 5. The reason each file is specified twice is that any file not available on every node must be packaged up by Hadoop and shipped across the cluster, which requires it to be named with the -file option. We then also need to tell Hadoop which script performs the mapper role and which the reducer.
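
The same mechanism covers any other local file the scripts depend on. For example, if the mapper also read a word list at startup, say a hypothetical stopwords.txt, one extra -file option would ship it alongside the scripts:

    $ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -file wcmapper.rb -mapper wcmapper.rb \
    -file wcreducer.rb -reducer wcreducer.rb \
    -file stopwords.txt \
    -input test.txt -output output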

Finally, we looked at the output of the job, which should be identical to that of the previous Java-based WordCount implementations.

Differences in jobs when using Streaming

The Streaming WordCount mapper looks a lot simpler than the Java version, but the reducer appears to have more logic. Why? The reason is that the implied contract between Hadoop and our tasks changes when we use Streaming.

In Java we knew that our map() method would be invoked once for each input key/value pair and our reduce() method would be invoked for each key and its set of values.

With Streaming, we don't have the concept of the map or reduce methods anymore; instead, we have written scripts that process streams of received data. This changes how we need to write our reducer. In Java, the grouping of values to each key was performed by Hadoop; each invocation of the reduce method would receive a single key and all of its values. In Streaming, each instance of the reduce task is given the individual values one at a time, and it is up to the script to notice when the key changes.

Hadoop Streaming does sort the keys; for example, if a mapper emitted the following data:

First     1
Word      1
Word      1
A         1
First     1

The Streaming reducer would receive this data in the following order:

A         1
First     1
First     1
Word      1
Word      1

Hadoop still collects the values for each key and ensures that each key is passed only to a single reducer. In other words, a reducer gets all the values for a number of keys, and the values for each key arrive grouped together; however, they are not packaged into individual invocations of the reducer, one per key, as they are with the Java API.

This should explain the mechanism used in the Ruby reducer: it first sets default values for the current word and its count; then, after reading each line, it determines whether this is another value for the current key and, if so, increments the count. If not, there will be no more values for the previous key, so its final count is written to standard output and counting begins again for the new word.

After reading so much in the earlier chapters about how great it is for Hadoop to do so much for us, this may seem a lot more complex, but after you write a few Streaming reducers it's actually not as bad as it may first appear. Also remember that Hadoop does still manage the assignment of splits to individual map tasks and the necessary coordination that sends the values for a given key to the same reducer. This behavior can be modified through configuration settings to change the number of mappers and reducers just as with the Java API.
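
For instance, the number of reduce tasks can be set when submitting the Streaming job. The following is a sketch only; it assumes the Hadoop 1.0.3 property name, and the generic -D option must appear before the Streaming-specific options:

    $ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -D mapred.reduce.tasks=4 \
    -file wcmapper.rb -mapper wcmapper.rb \
    -file wcreducer.rb -reducer wcreducer.rb \
    -input test.txt -output output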
