Apache Hadoop is a very popular software framework for distributed storage and distributed processing on a cluster. Its strengths are its price (it's free), flexibility (it's open source and, although written in Java, it can be used from other programming languages), scalability (it can handle clusters composed of thousands of nodes), and robustness (it was inspired by a published paper from Google and has been around since 2006, with version 1.0 released in 2011), making it the de facto standard for handling and processing big data. Moreover, lots of other projects from the Apache foundation extend its functionality.
Logically, Hadoop is composed of two pieces: distributed storage (HDFS) and distributed processing (YARN and MapReduce). Although the code is very complex, the overall architecture is fairly easy to understand. A client can access both storage and processing through two dedicated modules; they are then in charge of distributing the job across all the working nodes:
All the Hadoop modules run as services (or instances), that is, a physical or virtual node can run many of them. Typically, for small clusters, all the nodes run both distributed storage and processing services; for big clusters, it may be better to separate the two functions by specializing the nodes.
We will see the functionalities offered by the two layers in detail.
The Hadoop Distributed File System (HDFS) is a fault-tolerant distributed filesystem, designed to run on low-cost commodity hardware and able to handle very large datasets (in the order of hundreds of petabytes to exabytes). Although HDFS requires a fast network connection to transfer data across nodes, the latency can't be as low as in classic filesystems (it may be in the order of seconds); therefore, HDFS has been designed for batch processing and high throughput. Each HDFS node contains a part of the filesystem's data; the same data is also replicated in other instances, and this ensures high-throughput access and fault tolerance.
HDFS's architecture is master-slave. If the master (Name Node) fails, there is a secondary/backup one ready to take control. All the other instances are slaves (Data Nodes); if one of them fails, there's no problem as HDFS has been designed with this in mind.
Data Nodes contain blocks of data: each file saved in HDFS is broken up into chunks (or blocks), typically 64 MB or 128 MB each (128 MB is the default in Hadoop 2), and then distributed and replicated in a set of Data Nodes.
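To get a feel for the numbers, here is a small sketch (written for Python 3) that computes how many blocks a file occupies and how much raw space it consumes once replicated. The function names are illustrative, and the 128 MB block size and the replication factor of 3 are assumed parameters, not values read from a cluster.

```python
def hdfs_block_count(file_size_bytes, block_size_bytes=128 * 1024 * 1024):
    """Number of blocks needed to hold a file (the last block may be partial)."""
    # Integer ceiling division; an empty file needs no blocks
    return (file_size_bytes + block_size_bytes - 1) // block_size_bytes


def raw_storage_bytes(file_size_bytes, replication=3):
    """Bytes consumed cluster-wide once every block is replicated."""
    return file_size_bytes * replication


one_gib = 1024 ** 3
print(hdfs_block_count(one_gib))                    # with 128 MB blocks
print(hdfs_block_count(one_gib, 64 * 1024 * 1024))  # with 64 MB blocks
print(raw_storage_bytes(one_gib) // one_gib)        # GiB of raw space used
```

Note that a file smaller than one block still counts as one block in the metadata, although it only occupies its actual size on disk.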
The Name Node stores just the metadata of the files in the distributed filesystem; it doesn't store any actual data, but just the right indications on how to access the files in the multiple Data Nodes that it manages.
A client asking to read a file shall first contact the Name Node, which will give back a table containing an ordered list of blocks and their locations (as in Data Nodes). At this point, the client should contact the Data Nodes separately, downloading all the blocks and reconstructing the file (by appending the blocks together).
To write a file, instead, a client should first contact the Name Node, which decides how to handle the request, updates its records, and then replies to the client with an ordered list of the Data Nodes where each block of the file should be written. The client then contacts the Data Nodes and uploads the blocks to them, as reported in the Name Node reply.
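The read and write paths just described can be traced with a toy, in-memory model. This is only a sketch under simplifying assumptions: `ToyNameNode`, `write_file`, and `read_file` are hypothetical names, Data Nodes are plain dictionaries, and block placement is a simple round-robin rather than the real HDFS placement policy.

```python
import itertools


class ToyNameNode:
    """Keeps only metadata: which blocks make up a file and where they live."""

    def __init__(self, datanode_ids, replication=2):
        self.replication = replication
        self.block_map = {}  # path -> ordered list of (block_id, [datanode ids])
        self._next_node = itertools.cycle(list(datanode_ids))

    def allocate(self, path, n_blocks):
        """Write path, step 1: decide where each block of the file goes."""
        plan = []
        for index in range(n_blocks):
            targets = [next(self._next_node) for _ in range(self.replication)]
            plan.append(("%s#%d" % (path, index), targets))
        self.block_map[path] = plan
        return plan

    def locate(self, path):
        """Read path, step 1: return the ordered block list and locations."""
        return self.block_map[path]


def write_file(namenode, datanodes, path, data, block_size):
    chunks = [data[i:i + block_size] for i in range(0, len(data), block_size)]
    plan = namenode.allocate(path, len(chunks))
    for (block_id, targets), chunk in zip(plan, chunks):
        for node_id in targets:  # the client uploads each replica
            datanodes[node_id][block_id] = chunk


def read_file(namenode, datanodes, path):
    parts = []
    for block_id, locations in namenode.locate(path):
        # Use the first replica that still holds the block
        holder = next(n for n in locations if block_id in datanodes[n])
        parts.append(datanodes[holder][block_id])
    return b"".join(parts)  # reconstruct the file by appending the blocks


datanodes = {"dn1": {}, "dn2": {}, "dn3": {}}
namenode = ToyNameNode(datanodes, replication=2)
write_file(namenode, datanodes, "/datasets/demo.txt",
           b"hello distributed world", block_size=8)
datanodes["dn1"].clear()  # simulate the loss of one Data Node
print(read_file(namenode, datanodes, "/datasets/demo.txt"))
```

Because every block has a second replica on another node, the file can still be reassembled after one Data Node loses its content.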
Namespace queries (for example, listing a directory content, creating a folder, and so on) are instead completely handled by the Name Node by accessing its metadata information.
Moreover, Name Node is also responsible for handling a Data Node failure properly (it's marked as dead if no Heartbeat packets are received) and its data re-replication to other nodes.
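As a rough illustration of this bookkeeping, the following sketch marks nodes as dead when their last heartbeat is too old and lists the blocks that would need new copies. The function names, the data layout, and the timeout value are all assumptions made for the example; the real Name Node logic is considerably more involved.

```python
def find_dead_nodes(last_heartbeat, now, timeout_seconds):
    """Return the nodes whose last heartbeat is older than the timeout."""
    return sorted(node for node, seen in last_heartbeat.items()
                  if now - seen > timeout_seconds)


def under_replicated(block_locations, dead_nodes, target_replication):
    """Map each affected block to the number of extra copies it needs."""
    dead = set(dead_nodes)
    missing = {}
    for block_id, nodes in block_locations.items():
        live = [n for n in nodes if n not in dead]
        if len(live) < target_replication:
            missing[block_id] = target_replication - len(live)
    return missing


# Timestamps (in seconds) of the last heartbeat received from each node
last_heartbeat = {"dn1": 100.0, "dn2": 995.0, "dn3": 998.0}
block_locations = {"blk_1": ["dn1", "dn2"], "blk_2": ["dn2", "dn3"]}

dead = find_dead_nodes(last_heartbeat, now=1000.0, timeout_seconds=630.0)
print(dead)
print(under_replicated(block_locations, dead, target_replication=2))
```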
Although these operations are lengthy and hard to implement robustly, they're completely transparent to the user, thanks to many libraries and the HDFS shell. The way you operate on HDFS is pretty similar to what you're currently doing on your filesystem, and this is a great benefit of Hadoop: it hides the complexity and lets the user work with simplicity.
Let's now take a look at the HDFS shell and later, a Python library.
Now, open a new notebook; this operation will take more time than usual as each notebook is connected to the Hadoop cluster framework. When the notebook is ready to be used, the flag saying Kernel starting, please wait … in the top right will disappear.
The first piece is about the HDFS shell; therefore, all the following commands can be run at a prompt or shell of the virtualized machine. To run them in an IPython Notebook, all of them are preceded by an exclamation mark (!), which is a short way to execute bash code in a notebook.
The common denominator of the following command lines is the executable; we will always run the hdfs command. It's the main interface to access and manage the HDFS system and the main command for the HDFS shell.
We start with a report on the state of HDFS. To obtain the details of the distributed filesystem (dfs) and its Data Nodes, use the dfsadmin subcommand:
In:!hdfs dfsadmin -report
Out:Configured Capacity: 42241163264 (39.34 GB)
Present Capacity: 37569168058 (34.99 GB)
DFS Remaining: 37378433024 (34.81 GB)
DFS Used: 190735034 (181.90 MB)
DFS Used%: 0.51%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
-------------------------------------------------
Live datanodes (1):
Name: 127.0.0.1:50010 (localhost)
Hostname: sparkbox
Decommission Status : Normal
Configured Capacity: 42241163264 (39.34 GB)
DFS Used: 190735034 (181.90 MB)
Non DFS Used: 4668290330 (4.35 GB)
DFS Remaining: 37380775936 (34.81 GB)
DFS Used%: 0.45%
DFS Remaining%: 88.49%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Tue Feb 09 19:41:17 UTC 2016
The dfs subcommand allows using some well-known Unix commands to access and interact with the distributed filesystem. For example, list the content of the root directory as follows:
In:!hdfs dfs -ls /
Out:Found 2 items
drwxr-xr-x - vagrant supergroup 0 2016-01-30 16:33 /spark
drwxr-xr-x - vagrant supergroup 0 2016-01-30 18:12 /user
The output is similar to that of the ls command provided by Linux, listing the permissions, replication factor (shown where Linux shows the number of links; it is - for directories), user and group owning the file, size, timestamp of the last modification, and name for each file or directory.
Similar to the df command, we can invoke the -df argument to display the amount of available disk space in HDFS. The -h option will make the output more readable (using gigabytes and megabytes instead of bytes):
In:!hdfs dfs -df -h /
Out:Filesystem Size Used Available Use%
hdfs://localhost:9000 39.3 G 181.9 M 34.8 G 0%
Similar to du, we can use the -du argument to display the size of each folder contained in the root. Again, -h will produce a more human-readable output:
In:!hdfs dfs -du -h /
Out:178.9 M /spark
1.4 M /user
So far, we've extracted some information from HDFS. Let's now do some operations on the distributed filesystem, which will modify it. We can start with creating a folder with the -mkdir option followed by the name. Note that this operation may fail if the directory already exists (exactly as in Linux, with the mkdir command):
In:!hdfs dfs -mkdir /datasets
Let's now transfer some files from the hard disk of the node to the distributed filesystem. In the VM that we've created, there is already a text file in the ../datasets directory; let's also download a second text file from the Internet. We then copy both of them to the HDFS directory that we've created with the previous command:
In:
!wget -q http://www.gutenberg.org/cache/epub/100/pg100.txt -O ../datasets/shakespeare_all.txt
!hdfs dfs -put ../datasets/shakespeare_all.txt /datasets/shakespeare_all.txt
!hdfs dfs -put ../datasets/hadoop_git_readme.txt /datasets/hadoop_git_readme.txt
Was the import successful? Yes, we didn't get any errors. However, to remove any doubt, let's list the HDFS directory /datasets to see the two files:
In:!hdfs dfs -ls /datasets
Out:Found 2 items
-rw-r--r-- 1 vagrant supergroup 1365 2016-01-31 12:41 /datasets/hadoop_git_readme.txt
-rw-r--r-- 1 vagrant supergroup 5589889 2016-01-31 12:41 /datasets/shakespeare_all.txt
To concatenate some files to the standard output, we can use the -cat argument. In the following piece of code, we're counting the newlines appearing in a text file. Note that the first command is piped into another command that is operating on the local machine:
In:!hdfs dfs -cat /datasets/hadoop_git_readme.txt | wc -l
Out:30
Actually, with the -cat argument, we can concatenate multiple files from both the local machine and HDFS. To see it, let's now count how many newlines are present when the file stored on HDFS is concatenated to the same one stored on the local machine. To avoid misinterpretations, we can use the full Uniform Resource Identifier (URI), referring to the files in HDFS with the hdfs: scheme and to local files with the file: scheme:
In:!hdfs dfs -cat hdfs:///datasets/hadoop_git_readme.txt file:///home/vagrant/datasets/hadoop_git_readme.txt | wc -l
Out:60
To copy a file within HDFS, we can use the -cp argument:
In : !hdfs dfs -cp /datasets/hadoop_git_readme.txt /datasets/copy_hadoop_git_readme.txt
To delete a file (or directories, with the right option), we can use the -rm argument. In this snippet of code, we're removing the file that we've just created with the preceding command. Note that HDFS has a trash mechanism; consequently, a deleted file is not actually removed from HDFS but just moved to a special directory:
In:!hdfs dfs -rm /datasets/copy_hadoop_git_readme.txt
Out:16/02/09 21:41:44 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /datasets/copy_hadoop_git_readme.txt
To empty the trash, here's the command:
In:!hdfs dfs -expunge
Out:16/02/09 21:41:44 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
To obtain (get) a file from HDFS to the local machine, we can use the -get argument:
In:!hdfs dfs -get /datasets/hadoop_git_readme.txt /tmp/hadoop_git_readme.txt
To take a look at a file stored in HDFS, we can use the -tail argument. Note that there's no head function in the HDFS shell, as the same result can be obtained by using cat and piping the output into a local head command. As for the tail, the HDFS shell just displays the last kilobyte of data:
In:!hdfs dfs -tail /datasets/hadoop_git_readme.txt
Out:ntry, of encryption software. BEFORE using any encryption software, please check your country's laws, regulations and policies concerning the import, possession, or use, and re-export of encryption software, to see if this is permitted. See <http://www.wassenaar.org/> for more information. [...]
The hdfs command is the main entry point for HDFS, but it's slow, and invoking system commands from Python and reading back the output is very tedious. For this reason, there is a Python library, Snakebite, which wraps many distributed filesystem operations. Unfortunately, the library is not as complete as the HDFS shell and is bound to a Name Node. To install it on your local machine, simply use pip install snakebite.
To instantiate the client object, we should provide the IP (or its alias) and the port of the Name Node. In the VM we provided, it's running on port 9000:
In:
from snakebite.client import Client
client = Client("localhost", 9000)
To print some information about HDFS, the client object has the serverdefaults method:
In:client.serverdefaults()
Out:{'blockSize': 134217728L,
 'bytesPerChecksum': 512,
 'checksumType': 2,
 'encryptDataTransfer': False,
 'fileBufferSize': 4096,
 'replication': 1,
 'trashInterval': 0L,
 'writePacketSize': 65536}
To list the files and directories in the root, we can use the ls method. The result is a list of dictionaries, one for each file, containing information such as permissions, timestamp of the last modification, and so on. In this example, we're just interested in the paths (that is, the names):
In:
for x in client.ls(['/']):
    print x['path']
Out:/datasets
/spark
/user
Exactly as with the HDFS shell, the Snakebite client has the du (for disk usage) and df (for disk free) methods available. Note that many methods (like du) return generators, which means that they need to be consumed (like an iterator, or by converting them into a list) to be executed:
In:client.df()
Out:{'capacity': 42241163264L,
 'corrupt_blocks': 0L,
 'filesystem': 'hdfs://localhost:9000',
 'missing_blocks': 0L,
 'remaining': 37373218816L,
 'under_replicated': 0L,
 'used': 196237268L}
In:list(client.du(["/"]))
Out:[{'length': 5591254L, 'path': '/datasets'},
 {'length': 187548272L, 'path': '/spark'},
 {'length': 1449302L, 'path': '/user'}]
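The practical consequence of a method returning a generator is that nothing happens until the generator is consumed. The following sketch shows this with a plain Python generator; `lazy_operation` is a hypothetical stand-in written for this example, not a Snakebite function.

```python
def lazy_operation(paths, log):
    """Hypothetical stand-in for a client method that returns a generator."""
    for path in paths:
        log.append(path)  # the side effect happens only when iterated
        yield {"path": path, "result": True}


log = []
pending = lazy_operation(["/a", "/b"], log)
print(log)      # still empty: no work has been done yet

results = list(pending)  # consuming the generator triggers the work
print(log)
print(results)
```

Forgetting to wrap such a call in list() (or to iterate over it) is therefore a silent bug: the call returns immediately and the operation never runs.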
As for the HDFS shell example, we will now try to count the newlines appearing in the same file with Snakebite. Note that the .cat method returns a generator:
In:
for el in client.cat(['/datasets/hadoop_git_readme.txt']):
    print el.next().count("\n")
Out:30
Let's now delete a file from HDFS. Again, pay attention that the delete method returns a generator and the execution never fails, even if we're trying to delete a non-existing directory. In fact, Snakebite doesn't raise exceptions, but just signals to the user in the output dictionary that the operation failed:
In:client.delete(['/datasets/shakespeare_all.txt']).next()
Out:{'path': '/datasets/shakespeare_all.txt', 'result': True}
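If failures are reported only through the result dictionaries, as described above, a small wrapper can turn them back into exceptions. This is a sketch under that assumption: `checked` and `HdfsOperationError` are hypothetical names, and the dictionary keys ('path', 'result', 'error') are taken from the outputs shown in this section rather than from the library documentation.

```python
class HdfsOperationError(Exception):
    """Raised when a result dictionary reports a failed operation."""


def checked(results):
    """Consume an iterable of result dictionaries, raising on the first failure."""
    collected = []
    for item in results:
        if not item.get("result", False):
            raise HdfsOperationError(
                "%s: %s" % (item.get("path"), item.get("error") or "operation failed"))
        collected.append(item)
    return collected


# Plain dictionaries stand in for what a client method would yield
ok = checked(iter([{"path": "/datasets/a.txt", "result": True}]))
print(ok)

try:
    checked(iter([{"path": "/missing", "result": False, "error": "not found"}]))
except HdfsOperationError as exc:
    print("failed:", exc)
```

With a real client, the call would look like checked(client.delete(['/datasets/shakespeare_all.txt'])), which also takes care of consuming the generator.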
Now, let's copy a file from HDFS to the local filesystem. Observe that the output is a generator, and you need to check the output dictionary to see if the operation was successful:
In:
(client
 .copyToLocal(['/datasets/hadoop_git_readme.txt'], '/tmp/hadoop_git_readme_2.txt')
 .next())
Out:{'error': '',
 'path': '/tmp/hadoop_git_readme_2.txt',
 'result': True,
 'source_path': '/datasets/hadoop_git_readme.txt'}
Finally, create a directory and delete all the files matching a string:
In:list(client.mkdir(['/datasets_2']))
Out:[{'path': '/datasets_2', 'result': True}]
In:client.delete(['/datasets*'], recurse=True).next()
Out:{'path': '/datasets', 'result': True}
Where is the code to put a file in HDFS? Where is the code to copy an HDFS file to another one? Well, these functionalities are not yet implemented in Snakebite. For them, we shall use the HDFS shell through system calls.
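One way to make those system calls less tedious is a thin wrapper around the hdfs dfs command. The sketch below (written for Python 3) uses only the standard subprocess module; the helper names `hdfs_dfs_command`, `run_hdfs_dfs`, `put_file`, and `copy_file` are made up for this example, and the shell options are the ones already used in this section.

```python
import subprocess


def hdfs_dfs_command(*args):
    """Build the argument list for an `hdfs dfs` invocation."""
    return ["hdfs", "dfs"] + [str(a) for a in args]


def run_hdfs_dfs(*args):
    """Run the command and return its output as text.

    Raises subprocess.CalledProcessError if the command exits with an error.
    """
    output = subprocess.check_output(hdfs_dfs_command(*args),
                                     stderr=subprocess.STDOUT)
    return output.decode("utf-8", errors="replace")


def put_file(local_path, hdfs_path):
    """Upload a local file to HDFS, overwriting the destination (-f)."""
    return run_hdfs_dfs("-put", "-f", local_path, hdfs_path)


def copy_file(hdfs_source, hdfs_destination):
    """Copy a file from one HDFS path to another."""
    return run_hdfs_dfs("-cp", hdfs_source, hdfs_destination)


print(hdfs_dfs_command("-put", "-f", "local.txt", "/datasets/local.txt"))
```

On a machine where the hdfs executable is on the PATH, put_file("../datasets/hadoop_git_readme.txt", "/datasets/hadoop_git_readme.txt") would perform the upload, and a non-zero exit status surfaces as an exception instead of having to be parsed from the output.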
MapReduce is the programming model implemented in the earliest versions of Hadoop. It's a very simple model, designed to process large datasets on a distributed cluster in parallel batches. The core of MapReduce is composed of two programmable functions—a mapper that performs filtering and a reducer that performs aggregation—and a shuffler that moves the objects from the mappers to the right reducers.
Specifically, here are the steps of MapReduce for the Hadoop implementation:
For example, if we want to count the number of characters, words, and lines in a text file, a nice split can be a line of text.
Continuing the preceding example, for each line, three key-value pairs are generated in this step—one containing the number of characters in the line (the key can simply be a chars string), one containing the number of words (in this case, the key must be different, let's say words), and one containing the number of lines, which is always one (in this case, the key can be lines).
In the example, all the values connected to the words key arrive at a reducer; its job is just summing up all the values. The same happens for the other keys, resulting in three final values: the number of characters, number of words, and number of lines. Note that these results may be on different reducers.
Each reducer writes its own output file (part-r-00000 is the output of the first reducer, part-r-00001 of the second, and so on). To have a full list of results in a single file, you should concatenate all of them.
Visually, this operation can be simply communicated and understood as follows:
There's also an optional step that can be run by each mapper instance after the mapping step—the combiner. It basically anticipates, if possible, the reducing step on the mapper and is often used to decrease the amount of information to shuffle, speeding up the process. In the preceding example, if a mapper processes more than one line of the input file, during the (optional) combiner step, it can pre-aggregate the results, outputting the smaller number of key-value pairs. For example, if the mapper processes 100 lines of text in each chunk, why output 300 key-value pairs (100 for the number of chars, 100 for words, and 100 for lines) when the information can be aggregated in three? That's actually the goal of the combiner.
In the MapReduce implementation provided by Hadoop, the shuffle operation is distributed, optimizing the communication cost, and it's possible to run more than one mapper and reducer per node, making full use of the hardware resources available on the nodes. Also, the Hadoop infrastructure provides redundancy and fault-tolerance as the same task can be assigned to multiple workers.
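The mapper, combiner, shuffler, and reducer steps described above can be simulated in a few lines of plain Python. This is only an in-memory sketch of the model, with no distribution involved; all the function names are illustrative, and each inner list of `chunks` plays the role of the lines handled by one mapper instance.

```python
from collections import defaultdict


def mapper(line):
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1


def combiner(pairs):
    """Pre-aggregate the pairs produced by a single mapper instance."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return list(totals.items())


def shuffle(pairs):
    """Group the values by key, as the shuffler does before reducing."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reducer(key, values):
    return key, sum(values)


def run_job(chunks, use_combiner=True):
    """Return the final counts and the number of pairs sent to the shuffler."""
    to_shuffle = []
    for chunk in chunks:  # one mapper instance per chunk
        pairs = [kv for line in chunk for kv in mapper(line)]
        to_shuffle.extend(combiner(pairs) if use_combiner else pairs)
    result = dict(reducer(k, v) for k, v in shuffle(to_shuffle).items())
    return result, len(to_shuffle)


chunks = [["to be or not to be", "that is the question"],
          ["whether tis nobler"]]
print(run_job(chunks, use_combiner=False))
print(run_job(chunks, use_combiner=True))
```

Both runs produce the same totals; the only difference is the number of key-value pairs that reach the shuffler, which is what the combiner is meant to reduce.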
Let's now see how it works. Although the Hadoop framework is written in Java, thanks to the Hadoop Streaming utility, mappers and reducers can be any executable, including Python. Hadoop Streaming uses the pipe and standard inputs and outputs to stream the content; therefore, mappers and reducers must implement a reader from stdin and a key-value writer on stdout.
Now, turn on the virtual machine and open a new IPython notebook. Even in this case, we will first introduce the command line way to run MapReduce jobs provided by Hadoop, then introduce a pure Python library. The first example will be exactly what we've described: a counter of the number of characters, words, and lines of a text file.
First, let's insert the datasets into HDFS; we're going to use the Hadoop Git readme (a short text file containing the readme file distributed with Apache Hadoop) and the full text of all the Shakespeare books, provided by Project Gutenberg (although it's just 5MB, it contains almost 125K lines). In the first cell, we'll be cleaning up the folder from the previous experiment, then, we download the file containing the Shakespeare bibliography in the dataset folder, and finally, we put both datasets on HDFS:
In:
!hdfs dfs -mkdir -p /datasets
!wget -q http://www.gutenberg.org/cache/epub/100/pg100.txt -O ../datasets/shakespeare_all.txt
!hdfs dfs -put -f ../datasets/shakespeare_all.txt /datasets/shakespeare_all.txt
!hdfs dfs -put -f ../datasets/hadoop_git_readme.txt /datasets/hadoop_git_readme.txt
!hdfs dfs -ls /datasets
Now, let's create the Python executable files containing the mapper and reducer. We will use a very dirty hack here: we're going to write Python files (and make them executable) using a write operation from the Notebook.
Both the mapper and reducer read from the stdin and write to the stdout (with simple print commands). Specifically, the mapper reads lines from the stdin and prints the key-value pairs of the number of characters (except the newline), the number of words (by splitting the line on the whitespace), and the number of lines, always one. The reducer, instead, sums up the values for each key and prints the grand total:
In:
with open('mapper_hadoop.py', 'w') as fh:
    fh.write("""#!/usr/bin/env python
import sys

for line in sys.stdin:
    # one key-value pair per metric, for every input line
    print "chars", len(line.rstrip('\\n'))
    print "words", len(line.split())
    print "lines", 1
""")

with open('reducer_hadoop.py', 'w') as fh:
    fh.write("""#!/usr/bin/env python
import sys

counts = {"chars": 0, "words": 0, "lines": 0}

for line in sys.stdin:
    # each input line is "<key> <value>"
    kv = line.rstrip().split()
    counts[kv[0]] += int(kv[1])

for k, v in counts.items():
    print k, v
""")
In:!chmod a+x *_hadoop.py
To see it at work, let's try it locally without using Hadoop. In fact, as mappers and reducers read and write to the standard input and output, we can just pipe all the things together. Note that the shuffler can be replaced by the sort -k1,1 command, which sorts the input strings using the first field (that is, the key):
In:!cat ../datasets/hadoop_git_readme.txt | ./mapper_hadoop.py | sort -k1,1 | ./reducer_hadoop.py
Out:chars 1335
lines 31
words 179
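Sorting by key guarantees that all the lines sharing a key reach the reducer one after the other. A reducer can exploit this to handle any set of keys without knowing them in advance. The sketch below (Python 3, with hypothetical function names) shows the idea with itertools.groupby, using in-memory streams in place of stdin and stdout so that it can be tried without Hadoop.

```python
import io
import itertools


def parse(stream):
    """Turn "<key> <value>" lines into (key, int) pairs."""
    for line in stream:
        key, value = line.split()
        yield key, int(value)


def reduce_sorted(stream, out):
    """Sum the values of each key, assuming equal keys arrive on adjacent lines."""
    for key, group in itertools.groupby(parse(stream), key=lambda kv: kv[0]):
        out.write("%s %d\n" % (key, sum(value for _, value in group)))


# Mapper output for two lines of text, in the order the mapper emits it
mapped = ["chars 18", "words 6", "lines 1", "chars 20", "words 4", "lines 1"]

# sorted() plays the role of `sort -k1,1` (that is, of the shuffler)
shuffled = io.StringIO("\n".join(sorted(mapped)) + "\n")
result = io.StringIO()
reduce_sorted(shuffled, result)
print(result.getvalue())
```

In a real streaming reducer, the two streams would simply be sys.stdin and sys.stdout. Unlike the dictionary-based reducer, this version keeps only one running total in memory at a time.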
Let's now use the Hadoop MapReduce way to get the same result. First of all, we should create an empty directory in HDFS able to store the results. In this case, we create a directory named /tmp and remove anything inside it with the same name as the job output (Hadoop will fail if the output directory already exists). Then, we use the right command to run the MapReduce job. This command includes the following:
The mapper and reducer that we want to use (the -mapper and -reducer options)
The local files to distribute to the cluster nodes (the -files option)
The input file (the -input option) and the output directory (the -output option)
In:
!hdfs dfs -mkdir -p /tmp
!hdfs dfs -rm -f -r /tmp/mr.out
!hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar -files mapper_hadoop.py,reducer_hadoop.py -mapper mapper_hadoop.py -reducer reducer_hadoop.py -input /datasets/hadoop_git_readme.txt -output /tmp/mr.out
Out:[...]
16/02/04 17:12:22 INFO mapreduce.Job: Running job: job_1454605686295_0003
16/02/04 17:12:29 INFO mapreduce.Job: Job job_1454605686295_0003 running in uber mode : false
16/02/04 17:12:29 INFO mapreduce.Job: map 0% reduce 0%
16/02/04 17:12:35 INFO mapreduce.Job: map 50% reduce 0%
16/02/04 17:12:41 INFO mapreduce.Job: map 100% reduce 0%
16/02/04 17:12:47 INFO mapreduce.Job: map 100% reduce 100%
16/02/04 17:12:47 INFO mapreduce.Job: Job job_1454605686295_0003 completed successfully
[...]
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
[...]
16/02/04 17:12:47 INFO streaming.StreamJob: Output directory: /tmp/mr.out
The output is very verbose; we extracted just three important sections from it. The first indicates the progress of the MapReduce job, and it's very useful to track and estimate the time needed to complete the operation. The second section highlights the errors that may have occurred during the job, and the last section reports the output directory and the timestamp of the termination. The whole process on the small file (of 30 lines) took almost half a minute! The reasons are very simple: first, Hadoop MapReduce has been designed for robust big data processing and carries a lot of overhead, and second, the ideal environment is a cluster of powerful machines, not a virtual machine with 4 GB of RAM. On the other hand, this code can be run on much bigger datasets and on a cluster of very powerful machines without changing anything.
Let's now look at the results. First, let's take a peek at the output directory in HDFS:
In:!hdfs dfs -ls /tmp/mr.out
Out:Found 2 items
-rw-r--r-- 1 vagrant supergroup 0 2016-02-04 17:12 /tmp/mr.out/_SUCCESS
-rw-r--r-- 1 vagrant supergroup 33 2016-02-04 17:12 /tmp/mr.out/part-00000
There are two files: the first is empty, is named _SUCCESS, and indicates that the MapReduce job has finished the writing stage in the directory; the second is named part-00000 and contains the actual results (as we're operating on a node with just one reducer). Reading this file will provide us with the final results:
In:!hdfs dfs -cat /tmp/mr.out/part-00000
Out:chars 1335
lines 31
words 179
As expected, they're the same as the piped command line shown previously.
Although conceptually simple, Hadoop Streaming is not the best way to run Hadoop jobs with Python code. For this, there are many libraries available on PyPI; the one we're presenting here, MrJob, is one of the most flexible and best maintained open source ones. It allows you to run the jobs seamlessly on your local machine, your Hadoop cluster, or cloud cluster environments, such as Amazon Elastic MapReduce; it merges all the code in a standalone file even if multiple MapReduce steps are needed (think about iterative algorithms) and interprets Hadoop errors in the code. Also, it's very simple to install; to have the MrJob library on your local machine, simply use pip install mrjob.
Although MrJob is a great piece of software, it doesn't work very well with IPython Notebook as it requires a main function. Here, we need to write the MapReduce Python code in a separate file and then run a command line.
We start with the example that we've seen many times so far: counting characters, words, and lines in a file. First, let's write the Python file using the MrJob functionalities; mappers and reducers are wrapped in a subclass of MRJob. Inputs are not read from stdin, but passed as a function argument, and outputs are not printed, but yielded (or returned).
Thanks to MrJob, the whole MapReduce program becomes just a few lines of code:
In:
with open("MrJob_job1.py", "w") as fh:
    fh.write("""
from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
""")
Let's now execute it locally (with the local version of the dataset). The MrJob library, beyond executing the mapper and reducer steps (locally, in this case), also prints the result and cleans up the temporary directory:
In:!python MrJob_job1.py ../datasets/hadoop_git_readme.txt
Out:[...]
Streaming final output from /tmp/MrJob_job1.vagrant.20160204.171254.595542/output
"chars" 1335
"lines" 31
"words" 179
removing tmp directory /tmp/MrJob_job1.vagrant.20160204.171254.595542
To run the same process on Hadoop, just run the same Python file, this time inserting the -r hadoop option in the command line, and MrJob will automatically execute it using Hadoop MapReduce and HDFS. In this case, remember to point to the hdfs path of the input file:
In:!python MrJob_job1.py -r hadoop hdfs:///datasets/hadoop_git_readme.txt
Out:[...]
HADOOP: Running job: job_1454605686295_0004
HADOOP: Job job_1454605686295_0004 running in uber mode : false
HADOOP: map 0% reduce 0%
HADOOP: map 50% reduce 0%
HADOOP: map 100% reduce 0%
HADOOP: map 100% reduce 100%
HADOOP: Job job_1454605686295_0004 completed successfully
[...]
HADOOP: Shuffle Errors
HADOOP: BAD_ID=0
HADOOP: CONNECTION=0
HADOOP: IO_ERROR=0
HADOOP: WRONG_LENGTH=0
HADOOP: WRONG_MAP=0
HADOOP: WRONG_REDUCE=0
[...]
Streaming final output from hdfs:///user/vagrant/tmp/mrjob/MrJob_job1.vagrant.20160204.171255.073506/output
"chars" 1335
"lines" 31
"words" 179
removing tmp directory /tmp/MrJob_job1.vagrant.20160204.171255.073506
deleting hdfs:///user/vagrant/tmp/mrjob/MrJob_job1.vagrant.20160204.171255.073506 from HDFS
You will see the same output of the Hadoop Streaming command line as seen previously, plus the results. In this case, the HDFS temporary directory, used to store the results, is removed after the termination of the job.
Now, to see the flexibility of MrJob, let's try running a process that requires more than one MapReduce step. When done from the command line, this is a very difficult task; in fact, you have to run the first iteration of MapReduce, check the errors, read the results, and then launch the second iteration of MapReduce, check the errors again, and finally read the results. This sounds very time-consuming and prone to errors. Thanks to MrJob, this operation is very easy: within the code, it's possible to create a cascade of MapReduce operations, where each output is the input of the next stage.
As an example, let's now find the most common word used by Shakespeare (using, as input, the 125K lines file). This operation cannot be done in a single MapReduce step; it requires at least two of them. We will implement a very simple algorithm based on two iterations of MapReduce:
In:
with open("MrJob_job2.py", "w") as fh:
    fh.write("""
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(mapper=self.mapper_word_count_one_key,
                   reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def reducer_count_words(self, word, counts):
        # sum up the occurrences of each word
        yield (word, sum(counts))

    def mapper_word_count_one_key(self, word, counts):
        # send all the (count, word) tuples to the same reducer
        yield None, (counts, word)

    def reducer_find_max_word(self, _, count_word_pairs):
        # each item of count_word_pairs is a tuple (count, word)
        yield max(count_word_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
""")
We can then decide to run it locally or on the Hadoop cluster, obtaining the same result: the most common word used by William Shakespeare is the word the, used more than 27K times. In this piece of code, we just want the result outputted; therefore, we launch the job with the --quiet option:
In:!python MrJob_job2.py --quiet ../datasets/shakespeare_all.txt
Out:27801 "the"
In:!python MrJob_job2.py -r hadoop --quiet hdfs:///datasets/shakespeare_all.txt
Out:27801 "the"
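To see why two steps are enough, the cascade can be traced in memory without Hadoop or MrJob. The sketch below (Python 3) reuses the four mapper and reducer functions as plain generators and adds a hypothetical `run_step` helper that performs one map, shuffle, and reduce round; it illustrates the data flow only and says nothing about how MrJob schedules the steps.

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")


def run_step(records, mapper, reducer):
    """Run one map/shuffle/reduce round over (key, value) records in memory."""
    grouped = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            grouped[out_key].append(out_value)  # shuffle: group by key
    output = []
    for key, values in grouped.items():
        output.extend(reducer(key, values))
    return output


def mapper_get_words(_, line):
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def reducer_count_words(word, counts):
    yield word, sum(counts)


def mapper_word_count_one_key(word, count):
    # A single key (None) routes every pair to the same reducer
    yield None, (count, word)


def reducer_find_max_word(_, count_word_pairs):
    yield max(count_word_pairs)


lines = ["The quality of mercy is not strained",
         "It droppeth as the gentle rain",
         "upon the place beneath"]

step1 = run_step([(None, line) for line in lines],
                 mapper_get_words, reducer_count_words)
step2 = run_step(step1, mapper_word_count_one_key, reducer_find_max_word)
print(step2)
```

The output of the first round, a list of (word, count) pairs, becomes the input of the second one, which is the same hand-over that the steps method declares.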
With Hadoop 2 (the current branch as of 2016), a layer has been introduced on top of HDFS that allows multiple applications to run; MapReduce, for example, is one of them (targeting batch processing). The name of this layer is Yet Another Resource Negotiator (YARN), and its goal is to manage the resources in the cluster.
YARN follows the paradigm of master/slave and is composed of two services: Resource Manager and Node Manager.
The Resource Manager is the master and is responsible for two things: scheduling (allocating resources) and application management (handling job submissions and tracking their status). The Node Managers, the slaves of the architecture, are the per-worker agents that run the tasks and report to the Resource Manager.
The YARN layer introduced with Hadoop 2 ensures the following: