Let's flog the dead horse of WordCount one more time and implement it using Streaming by performing the following steps:
wcmapper.rb
#!/usr/bin/env ruby

while line = gets
  words = line.split(" ")
  words.each { |word| puts word.strip + " 1" }
end
$ chmod +x wcmapper.rb
wcreducer.rb
#!/usr/bin/env ruby

current = nil
count = 0

while line = gets
  word, counter = line.split(" ")
  if word == current
    count = count + 1
  else
    puts current + " " + count.to_s if current
    current = word
    count = 1
  end
end
puts current + " " + count.to_s
$ chmod +x wcreducer.rb
$ hadoop jar hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar -file wcmapper.rb -mapper wcmapper.rb -file wcreducer.rb -reducer wcreducer.rb -input test.txt -output output
packageJobJar: [wcmapper.rb, wcreducer.rb, /tmp/hadoop-hadoop/hadoop-unjar1531650352198893161/] [] /tmp/streamjob937274081293220534.jar tmpDir=null
12/02/05 12:43:53 INFO mapred.FileInputFormat: Total input paths to process : 1
12/02/05 12:43:53 INFO streaming.StreamJob: getLocalDirs(): [/var/hadoop/mapred/local]
12/02/05 12:43:53 INFO streaming.StreamJob: Running job: job_201202051234_0005
…
12/02/05 12:44:01 INFO streaming.StreamJob: map 100% reduce 0%
12/02/05 12:44:13 INFO streaming.StreamJob: map 100% reduce 100%
12/02/05 12:44:16 INFO streaming.StreamJob: Job complete: job_201202051234_0005
12/02/05 12:44:16 INFO streaming.StreamJob: Output: wcoutput
$ hadoop fs -cat output/part-00000
Don't worry about the specifics of Ruby; if you don't know the language, it isn't important here.
First, we created the script that will be our mapper. It uses the gets function to read a line from standard input, splits this into words, and uses the puts function to write each word and the value 1 to standard output. We then made the file executable.
Our reducer is a little more complex for reasons we will describe in the next section. However, it performs the job we would expect: it reads word/count pairs from standard input, counts the number of occurrences of each word, and writes the final count for each word to standard output. Again, we made the file executable.
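Because Streaming scripts simply read standard input and write standard output, the whole pipeline can be checked locally before involving Hadoop at all. The following self-contained sketch (the helper names map_lines and reduce_pairs are ours, not part of Hadoop) replays the same contract: map each line to word/1 pairs, sort them as the shuffle would, and apply the reducer's grouping logic:

```ruby
# Simulate the Streaming contract locally: map, sort, reduce.
# map_lines and reduce_pairs are illustrative helpers, not Hadoop APIs.

def map_lines(lines)
  # Emit "word 1" for every word, just like wcmapper.rb does.
  lines.flat_map { |line| line.split(" ").map { |w| "#{w.strip} 1" } }
end

def reduce_pairs(pairs)
  # Same grouping logic as wcreducer.rb, over an in-memory array.
  current, count, out = nil, 0, []
  pairs.each do |pair|
    word, _ = pair.split(" ")
    if word == current
      count += 1
    else
      out << "#{current} #{count}" if current
      current, count = word, 1
    end
  end
  out << "#{current} #{count}" if current
  out
end

input = ["this is a test", "this is another test"]
puts reduce_pairs(map_lines(input).sort)
```

The same end-to-end check can be run against the real scripts with a shell pipeline such as `cat test.txt | ./wcmapper.rb | sort | ./wcreducer.rb`, since the local sort stands in for Hadoop's shuffle phase.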
Note that in both cases we are implicitly using the Hadoop input and output formats discussed in the earlier chapters. It is the TextInputFormat class that processes the source file and provides each line one at a time to the map script. Conversely, the TextOutputFormat class ensures that the output of the reduce tasks is also correctly written as textual data. We can, of course, modify these if required.
Next, we submitted the Streaming job to Hadoop via the rather cumbersome command line shown in the previous section. Each file is specified twice because any file not available on every node must be packaged up by Hadoop and shipped across the cluster, which requires it to be specified by the -file option; we then also need to tell Hadoop which script performs the mapper role and which the reducer role.
Finally, we looked at the output of the job, which should be identical to that of the previous Java-based WordCount implementations.
The Streaming WordCount mapper looks a lot simpler than the Java version, but the reducer appears to have more logic. Why? The reason is that the implied contract between Hadoop and our tasks changes when we use Streaming.
In Java, we knew that our map() method would be invoked once for each input key/value pair and our reduce() method would be invoked once for each key and its set of values.
With Streaming, we don't have the concept of the map or reduce methods anymore; instead, we have written scripts that process streams of received data. This changes how we need to write our reducer. In Java, the grouping of values to each key was performed by Hadoop; each invocation of the reduce method would receive a single key and all of its values. In Streaming, each instance of the reduce task is given the individual ungathered values one at a time.
Hadoop Streaming does sort the keys. For example, if a mapper emitted the following data:
First 1
Word 1
Word 1
A 1
First 1
The Streaming reducer would receive this data in the following order:
A 1
First 1
First 1
Word 1
Word 1
Hadoop still collects the values for each key and ensures that each key is passed only to a single reducer. In other words, a reducer gets all the values for a number of keys, grouped together; however, unlike the Java API, they are not packaged into individual invocations of the reducer, one per key.
This should explain the mechanism used in the Ruby reducer: it first sets empty default values for the current word and its count; then, after reading each line, it determines whether this is another value for the current key and, if so, increments the count. If not, there will be no more values for the previous key, so its final output is sent to standard output and the counting begins again for the new word.
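Because the keys arrive already sorted, grouping consecutive identical keys is all the reducer has to do. For contrast, here is a minimal sketch of the same grouping expressed with Ruby's Enumerable#chunk, which bundles consecutive elements that share a key, applied to the sample data shown earlier; in a real script, the array would be replaced by reading $stdin:

```ruby
# Group the sorted stream with Enumerable#chunk. This is only safe
# because Hadoop has already sorted the keys before the reducer sees them.

sorted = ["A 1", "First 1", "First 1", "Word 1", "Word 1"]

sorted.map { |line| line.split(" ").first }
      .chunk { |word| word }
      .each { |word, group| puts "#{word} #{group.size}" }
# prints:
#   A 1
#   First 2
#   Word 2
```

This version trades the explicit current/count bookkeeping for a standard-library idiom; the state-machine style shown in wcreducer.rb is closer to what you would write in languages without such helpers.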
After reading so much in the earlier chapters about how much Hadoop does for us, this may seem a lot more complex, but after you write a few Streaming reducers it's actually not as bad as it may first appear. Also remember that Hadoop still manages the assignment of splits to individual map tasks and the coordination that sends all the values for a given key to the same reducer. Just as with the Java API, this behavior can be modified through configuration settings to change the number of mappers and reducers.