In this exercise, we will count the number of occurrences of each word in one of the longest novels ever written. We have selected Artamène ou le Grand Cyrus, written by Georges and/or Madeleine de Scudéry between 1649 and 1653. It is considered the second longest novel ever written, according to the related list on Wikipedia (https://en.wikipedia.org/wiki/List_of_longest_novels). The novel consists of 13,905 pages across 10 volumes and contains close to two million words.
To begin, launch the Cloudera Distribution of Hadoop (CDH) Quickstart VM by opening VirtualBox and double-clicking the Cloudera Quickstart VM instance.
The VM takes some time to start up, as it initializes all the CDH-related processes, such as the DataNode and the NameNode.
Once startup completes, the VM opens a default landing page with links to numerous Hadoop tutorials. We'll write our MapReduce code in the Unix terminal for this section. Launch the terminal from the top-left menu, as shown in the following screenshot.
Now, we must follow these steps:
- Create a directory named cyrus. This is where we will store all the files that contain the text of the book.
- Run getCyrusFiles.sh, as shown in step 4. This downloads the book into the cyrus directory.
- Run processCyrusFiles.sh, as shown. The book contains various Unicode and non-printable characters, which this script removes. It also converts all words to lowercase, so that words that differ only in capitalization are not counted separately.
- This produces a file called cyrusprint.txt, which contains the entire text of the book. We will run our MapReduce code on this text file. Because the newline character is not in the [:print:] class used by the script, line breaks are removed too, so cyrusprint.txt is a single line of text.
- Prepare mapper.py and reducer.py. As the names imply, mapper.py runs the map part of the MapReduce process and reducer.py runs the reduce part. mapper.py splits the document into words and emits each word with a value of 1. Hadoop sorts the mapper output by key, so reducer.py receives all occurrences of the same word consecutively and sums them: it starts the count at 1 on the first occurrence of a word and increments it for each further occurrence. The final output is a file containing the count of each word in the document.
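The three stages described above can be sketched in memory with Python 3. This is a minimal illustration, not Hadoop's implementation: `sorted()` stands in for Hadoop's shuffle-and-sort phase, and the function names `map_lines`, `reduce_sorted` and `word_count` are hypothetical. The reduce step follows the same running-total idea as reducer.py.

```python
from typing import Iterable, Iterator, List


def map_lines(lines: Iterable[str]) -> Iterator[str]:
    """Map stage: emit '<word> 1' for every whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield "%s %d" % (word, 1)


def reduce_sorted(pairs: Iterable[str]) -> Iterator[str]:
    """Reduce stage: keep a running total while the key stays the same.

    This is only correct when equal keys arrive next to each other, which
    Hadoop's shuffle-and-sort guarantees for the real reducer.
    """
    last_key = None
    running_total = 0
    for pair in pairs:
        key, value = pair.split(" ", 1)
        if key == last_key:
            running_total += int(value)
        else:
            if last_key is not None:
                yield "%s %d" % (last_key, running_total)
            last_key = key
            running_total = int(value)
    if last_key is not None:
        yield "%s %d" % (last_key, running_total)


def word_count(lines: Iterable[str]) -> List[str]:
    """Map, then sort (standing in for the shuffle), then reduce."""
    return list(reduce_sorted(sorted(map_lines(lines))))


if __name__ == "__main__":
    for result in word_count(["le grand cyrus", "le roi"]):
        print(result)
```

Running the file prints `cyrus 1`, `grand 1`, `le 2` and `roi 1`, one per line. Passing unsorted input such as `a 1`, `b 1`, `a 1` straight to `reduce_sorted` yields two separate totals for `a`, which is why the sort between the two stages matters.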
The steps are as follows:
- Create getCyrusFiles.sh - this script will be used to retrieve the data from the web:
```
[cloudera@quickstart ~]$ mkdir cyrus
[cloudera@quickstart ~]$ vi getCyrusFiles.sh
[cloudera@quickstart ~]$ cat getCyrusFiles.sh
for i in `seq 10`
do
curl www.artamene.org/documents/cyrus$i.txt -o cyrus$i.txt
done
```
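For readers who prefer Python to a shell loop, the download step can be sketched with the standard library's `urllib.request`. This assumes the URLs used in getCyrusFiles.sh are still reachable and are served over plain HTTP (curl assumes `http://` when a URL has no scheme); the names `BASE_URL`, `volume_targets` and `download_volumes` are hypothetical.

```python
import os
import urllib.request
from typing import List, Tuple

# Assumption: the host from getCyrusFiles.sh, reached over plain HTTP.
BASE_URL = "http://www.artamene.org/documents"


def volume_targets(volumes: int = 10) -> List[Tuple[str, str]]:
    """Return (url, filename) pairs for cyrus1.txt .. cyrusN.txt,
    the same files that the seq 10 loop asks curl for."""
    return [("%s/cyrus%d.txt" % (BASE_URL, i), "cyrus%d.txt" % i)
            for i in range(1, volumes + 1)]


def download_volumes(dest_dir: str, volumes: int = 10) -> None:
    """Download every volume into dest_dir, creating the directory if needed."""
    os.makedirs(dest_dir, exist_ok=True)
    for url, filename in volume_targets(volumes):
        urllib.request.urlretrieve(url, os.path.join(dest_dir, filename))
```

Calling `download_volumes(os.path.expanduser("~/cyrus"))` would fetch cyrus1.txt through cyrus10.txt into the cyrus directory. One difference from the curl loop: `urlretrieve` raises an `HTTPError` when the server returns an error status, whereas curl without `--fail` saves whatever page the server sends back.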
- Create processCyrusFiles.sh - this script will be used to concatenate and cleanse the files that were downloaded in the previous step:
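```
[cloudera@quickstart ~]$ vi processCyrusFiles.sh
[cloudera@quickstart ~]$ cat processCyrusFiles.sh
cd ~/cyrus || exit 1
# Start from an empty file so that re-running the script does not duplicate the text
rm -f cyrusorig.txt
# cyrus[0-9]*.txt matches the downloaded volumes but not cyrusorig.txt or cyrusprint.txt
for i in cyrus[0-9]*.txt
do
cat "$i" >> cyrusorig.txt
done
# Keep only printable characters and convert A-Z to lowercase
cat cyrusorig.txt | tr -dc '[:print:]' | tr A-Z a-z > cyrusprint.txt
```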
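The cleansing pipeline in processCyrusFiles.sh can also be expressed in Python 3, which makes its effect easy to check on a small string. This sketch assumes the C/POSIX locale, in which tr's `[:print:]` class is exactly the printable ASCII bytes 0x20 (space) through 0x7E (`~`); the names `clean_bytes` and `process_volumes` are hypothetical.

```python
from pathlib import Path

# Assumption: C/POSIX locale, where [:print:] covers bytes 0x20 through 0x7E.
PRINTABLE = frozenset(range(0x20, 0x7F))


def clean_bytes(data: bytes) -> bytes:
    """Mimic: tr -dc '[:print:]' | tr A-Z a-z, working on raw bytes."""
    # Delete every byte outside the printable ASCII range
    # (newlines, tabs, carriage returns and all non-ASCII bytes included).
    kept = bytes(b for b in data if b in PRINTABLE)
    # bytes.lower() changes only A-Z, just like tr A-Z a-z.
    return kept.lower()


def process_volumes(cyrus_dir: str) -> Path:
    """Concatenate the downloaded volumes and write the cleaned text to cyrusprint.txt."""
    directory = Path(cyrus_dir).expanduser()
    # Names starting with "cyrus" plus a digit, so cyrusorig.txt and cyrusprint.txt are skipped.
    volumes = sorted(directory.glob("cyrus[0-9]*.txt"))
    raw = b"".join(path.read_bytes() for path in volumes)
    output = directory / "cyrusprint.txt"
    output.write_bytes(clean_bytes(raw))
    return output
```

Because newline characters are deleted rather than replaced by spaces, the last word of one line and the first word of the next are joined: `clean_bytes(b"Le Grand\nCyrus")` returns `b"le grandcyrus"`. Non-ASCII bytes, such as those used to encode accented letters, are removed as well.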
- Change the permissions to 755 to make the .sh files executable at the command prompt:
```
[cloudera@quickstart ~]$ chmod 755 getCyrusFiles.sh
[cloudera@quickstart ~]$ chmod 755 processCyrusFiles.sh
```
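The mode 755 is octal notation for three permission triplets: read, write and execute (7) for the owner, and read and execute (5) for the group and for everyone else. A small Python 3 sketch using the standard `os` and `stat` modules spells out the bits; `make_executable` is a hypothetical helper name.

```python
import os
import stat

# 0o755 spelled out bit by bit.
MODE_755 = (stat.S_IRWXU                    # owner: read, write, execute
            | stat.S_IRGRP | stat.S_IXGRP   # group: read, execute
            | stat.S_IROTH | stat.S_IXOTH)  # others: read, execute


def make_executable(path: str) -> str:
    """Equivalent of chmod 755 path; returns the mode as ls -l displays it."""
    os.chmod(path, MODE_755)
    return stat.filemode(os.stat(path).st_mode)
```

For a regular file, `make_executable("getCyrusFiles.sh")` should return `-rwxr-xr-x`, the same permission string that `ls -l` shows for the script after `chmod 755`.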
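- Change to the cyrus directory and execute getCyrusFiles.sh there, so that the volumes are downloaded into it (in the listing below, both .sh scripts have also been moved into cyrus):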
```
[cloudera@quickstart cyrus]$ ./getCyrusFiles.sh
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  908k  100  908k    0     0   372k      0  0:00:02  0:00:02 --:--:--  421k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1125k  100 1125k    0     0   414k      0  0:00:02  0:00:02 --:--:--  471k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1084k  100 1084k    0     0   186k      0  0:00:05  0:00:05 --:--:--  236k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1048k  100 1048k    0     0   267k      0  0:00:03  0:00:03 --:--:--  291k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1116k  100 1116k    0     0   351k      0  0:00:03  0:00:03 --:--:--  489k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1213k  100 1213k    0     0   440k      0  0:00:02  0:00:02 --:--:--  488k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1119k  100 1119k    0     0   370k      0  0:00:03  0:00:03 --:--:--  407k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1132k  100 1132k    0     0   190k      0  0:00:05  0:00:05 --:--:--  249k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1084k  100 1084k    0     0   325k      0  0:00:03  0:00:03 --:--:--  365k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1259k  100 1259k    0     0   445k      0  0:00:02  0:00:02 --:--:--  486k
[cloudera@quickstart cyrus]$ ls
cyrus10.txt  cyrus3.txt  cyrus6.txt  cyrus9.txt
cyrus1.txt   cyrus4.txt  cyrus7.txt  getCyrusFiles.sh
cyrus2.txt   cyrus5.txt  cyrus8.txt  processCyrusFiles.sh
```
- Execute processCyrusFiles.sh:
```
[cloudera@quickstart cyrus]$ ./processCyrusFiles.sh
[cloudera@quickstart cyrus]$ ls
cyrus10.txt  cyrus3.txt  cyrus6.txt  cyrus9.txt      getCyrusFiles.sh
cyrus1.txt   cyrus4.txt  cyrus7.txt  cyrusorig.txt   processCyrusFiles.sh
cyrus2.txt   cyrus5.txt  cyrus8.txt  cyrusprint.txt
[cloudera@quickstart cyrus]$ ls -altrh cyrusprint.txt
-rw-rw-r-- 1 cloudera cloudera 11M Jun 28 20:02 cyrusprint.txt
[cloudera@quickstart cyrus]$ wc -w cyrusprint.txt
1953931 cyrusprint.txt
```
- Execute the following steps to copy the final file, cyrusprint.txt, to HDFS and to create the mapper.py and reducer.py scripts.
The files, mapper.py and reducer.py, are referenced on Glenn Klockwood's website (http://www.glennklockwood.com/data-intensive/hadoop/streaming.html), which provides a wealth of information on MapReduce and related topics.
The following session copies cyrusprint.txt to HDFS and shows the contents of mapper.py and reducer.py:
```
[cloudera@quickstart cyrus]$ hdfs dfs -ls /user/cloudera
[cloudera@quickstart cyrus]$ hdfs dfs -mkdir /user/cloudera/input
[cloudera@quickstart cyrus]$ hdfs dfs -put cyrusprint.txt /user/cloudera/input/
[cloudera@quickstart cyrus]$ vi mapper.py
[cloudera@quickstart cyrus]$ cat mapper.py
#!/usr/bin/env python
#the above just indicates to use python to interpret this file
#This mapper code will input a line of text and output <word, 1>
#
import sys
sys.path.append('.')

for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print ("%s %d" % (key,value))
[cloudera@quickstart cyrus]$ vi reducer.py # Copy-paste the content of reducer.py as shown below using the vi or nano Unix editor.
[cloudera@quickstart cyrus]$ cat reducer.py
#!/usr/bin/env python
# This reducer reads "<word> <count>" lines, already sorted by word,
# and outputs "<word> <total count>" once per word
import sys
sys.path.append('.')

last_key = None
running_total = 0

for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split(" ", 1)
    value = int(value)
    if last_key == this_key:
        # Same word as the previous line: add to its running total
        running_total += value
    else:
        # A new word starts: emit the total for the previous word, if any
        if last_key is not None:
            print("%s %d" % (last_key, running_total))
        running_total = value
        last_key = this_key

# Emit the total for the last word (skipped when the input is empty)
if last_key is not None:
    print("%s %d" % (last_key, running_total))
[cloudera@quickstart cyrus]$ chmod 755 *.py
```
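Before submitting the job to the cluster, it can be useful to run the same two scripts on a single machine, the equivalent of the shell pipeline `cat cyrusprint.txt | ./mapper.py | sort | ./reducer.py`. The following Python 3.5+ sketch does this with `subprocess`. The byte-wise sort is an assumption meant to approximate Hadoop's shuffle-and-sort for this single-reducer job, and `run_streaming_locally` is a hypothetical name.

```python
import subprocess
import sys
from typing import List


def run_streaming_locally(data: bytes, mapper_cmd: List[str],
                          reducer_cmd: List[str]) -> List[str]:
    """Pipe data through mapper, sort and reducer; return the reducer's output lines."""
    # Map: feed the raw input to the mapper on stdin.
    mapped = subprocess.run(mapper_cmd, input=data,
                            stdout=subprocess.PIPE, check=True).stdout
    # Shuffle/sort: a byte-wise sort places identical keys next to each other.
    shuffled = b"".join(sorted(mapped.splitlines(keepends=True)))
    # Reduce: the reducer now sees all lines for a word consecutively.
    reduced = subprocess.run(reducer_cmd, input=shuffled,
                             stdout=subprocess.PIPE, check=True).stdout
    return reduced.decode().splitlines()
```

From the cyrus directory, passing the contents of cyrusprint.txt together with `[sys.executable, "mapper.py"]` and `[sys.executable, "reducer.py"]` should give the same counts as the Hadoop job, since both scripts use `print()` with parentheses and run under either Python 2 or Python 3.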
- Run the Hadoop Streaming job, which executes the mapper and reducer scripts to perform the MapReduce operations and produce the word count. You may see error messages in the output, but for the purpose of this exercise (and for generating the results), you may disregard them:
```
[cloudera@quickstart cyrus]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/input -output /user/cloudera/output -mapper /home/cloudera/cyrus/mapper.py -reducer /home/cloudera/cyrus/reducer.py
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.10.0.jar] /tmp/streamjob1786353270976133464.jar tmpDir=null
17/06/28 20:11:21 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/06/28 20:11:21 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/06/28 20:11:22 INFO mapred.FileInputFormat: Total input paths to process : 1
17/06/28 20:11:22 INFO mapreduce.JobSubmitter: number of splits:2
17/06/28 20:11:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1498704103152_0002
17/06/28 20:11:23 INFO impl.YarnClientImpl: Submitted application application_1498704103152_0002
17/06/28 20:11:23 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1498704103152_0002/
17/06/28 20:11:23 INFO mapreduce.Job: Running job: job_1498704103152_0002
17/06/28 20:11:30 INFO mapreduce.Job: Job job_1498704103152_0002 running in uber mode : false
17/06/28 20:11:30 INFO mapreduce.Job: map 0% reduce 0%
17/06/28 20:11:41 INFO mapreduce.Job: map 50% reduce 0%
17/06/28 20:11:54 INFO mapreduce.Job: map 83% reduce 0%
17/06/28 20:11:57 INFO mapreduce.Job: map 100% reduce 0%
17/06/28 20:12:04 INFO mapreduce.Job: map 100% reduce 100%
17/06/28 20:12:04 INFO mapreduce.Job: Job job_1498704103152_0002 completed successfully
17/06/28 20:12:04 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=18869506
        FILE: Number of bytes written=38108830
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=16633042
        HDFS: Number of bytes written=547815
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Killed map tasks=1
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=39591
        Total time spent by all reduces in occupied slots (ms)=18844
        Total time spent by all map tasks (ms)=39591
        Total time spent by all reduce tasks (ms)=18844
        Total vcore-seconds taken by all map tasks=39591
        Total vcore-seconds taken by all reduce tasks=18844
        Total megabyte-seconds taken by all map tasks=40541184
        Total megabyte-seconds taken by all reduce tasks=19296256
    Map-Reduce Framework
        Map input records=1
        Map output records=1953931
        Map output bytes=14961638
        Map output materialized bytes=18869512
        Input split bytes=236
        Combine input records=0
        Combine output records=0
        Reduce input groups=45962
        Reduce shuffle bytes=18869512
        Reduce input records=1953931
        Reduce output records=45962
        Spilled Records=3907862
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=352
        CPU time spent (ms)=8400
        Physical memory (bytes) snapshot=602038272
        Virtual memory (bytes) snapshot=4512694272
        Total committed heap usage (bytes)=391979008
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=16632806
    File Output Format Counters
        Bytes Written=547815
17/06/28 20:12:04 INFO streaming.StreamJob: Output directory: /user/cloudera/output
```
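Several counters in this log can be cross-checked against each other and against the earlier `wc -w` result. The following Python 3 sketch parses `Name=value` lines from an excerpt of the log (the values are copied from the output above) and checks those relationships; `parse_counters` is a hypothetical helper.

```python
import re
from typing import Dict

# Counter lines copied from the job log above.
LOG_EXCERPT = """
        Map output records=1953931
        Map output materialized bytes=18869512
        Reduce input groups=45962
        Reduce shuffle bytes=18869512
        Reduce input records=1953931
        Reduce output records=45962
"""

WC_WORDS = 1953931  # result of: wc -w cyrusprint.txt


def parse_counters(text: str) -> Dict[str, int]:
    """Collect 'Counter name=integer' lines into a dictionary."""
    pattern = re.compile(r"^\s*([^=\n]+?)=(\d+)\s*$", re.MULTILINE)
    return {name.strip(): int(value) for name, value in pattern.findall(text)}


counters = parse_counters(LOG_EXCERPT)

# One map output record per whitespace-separated token, and all of them reach the reducer.
assert counters["Map output records"] == counters["Reduce input records"] == WC_WORDS
# One reduce group per distinct token, and reducer.py prints one line per group.
assert counters["Reduce input groups"] == counters["Reduce output records"]
# The single reducer fetched all of the materialized map output.
assert counters["Map output materialized bytes"] == counters["Reduce shuffle bytes"]
print("distinct tokens:", counters["Reduce output records"])  # distinct tokens: 45962
```

In other words, the roughly 1.95 million tokens counted by `wc -w` collapse to 45,962 distinct tokens in the reducer output.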
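- The results are stored in HDFS under the /user/cloudera/output directory, in files prefixed with part-. Because the job ran a single reducer, there is one such file, part-00000; the empty _SUCCESS file indicates that the job completed successfully: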
```
[cloudera@quickstart cyrus]$ hdfs dfs -ls /user/cloudera/output
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-06-28 20:12 /user/cloudera/output/_SUCCESS
-rw-r--r--   1 cloudera cloudera     547815 2017-06-28 20:12 /user/cloudera/output/part-00000
```
- To view the contents of the file, use hdfs dfs -cat and provide the name of the file. Here, we pipe the output to head to view only the first 10 lines:
```
[cloudera@quickstart cyrus]$ hdfs dfs -cat /user/cloudera/output/part-00000 | head -10
! 1206
!) 1
!quoy, 1
' 3
'' 1
'. 1
'a 32
'appelloit 1
'auoit 1
'auroit 10
```
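These first lines show that mapper.py splits only on whitespace, so punctuation stays attached to tokens (for example, `!quoy,`), and that the output is sorted by token rather than by frequency. To rank the most frequent tokens, one option is to copy the output directory to the local file system, for example with `hdfs dfs -get /user/cloudera/output`, and process it with a Python 3 sketch like the following. It assumes each line holds a token, a space and a count, as printed by reducer.py; the function names are hypothetical.

```python
import heapq
from pathlib import Path
from typing import Dict, Iterable, List, Tuple


def load_counts(lines: Iterable[str]) -> Dict[str, int]:
    """Parse '<token> <count>' lines written by reducer.py."""
    counts = {}
    for line in lines:
        line = line.rstrip()  # drop the newline and any trailing whitespace
        if not line:
            continue
        token, count = line.rsplit(" ", 1)
        counts[token] = int(count)
    return counts


def load_output_dir(path: str) -> Dict[str, int]:
    """Merge every part-* file from a local copy of the job's output directory.

    Each token appears in exactly one part file, because Hadoop sends all
    values for a key to the same reducer.
    """
    counts: Dict[str, int] = {}
    for part in sorted(Path(path).glob("part-*")):
        with part.open(encoding="utf-8") as f:
            counts.update(load_counts(f))
    return counts


def top_tokens(counts: Dict[str, int], n: int = 10) -> List[Tuple[str, int]]:
    """Return the n most frequent tokens, highest count first."""
    return heapq.nlargest(n, counts.items(), key=lambda item: item[1])
```

After copying the output directory, `top_tokens(load_output_dir("output"))` would list the ten most frequent tokens in the novel.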