17.5 Hadoop

The next several sections show how Apache Hadoop and Apache Spark deal with big-data storage and processing challenges via huge clusters of computers, massively parallel processing, Hadoop MapReduce programming and Spark in-memory processing techniques. Here, we discuss Apache Hadoop, a key big-data infrastructure technology that also serves as the foundation for many recent advancements in big-data processing and an entire ecosystem of software tools that are continually evolving to support today’s big-data needs.

17.5.1 Hadoop Overview

When Google was launched in 1998, the amount of online data was already enormous, with approximately 2.4 million websites20—truly big data. Today there are nearly two billion websites21 (almost a thousandfold increase) and Google is handling over two trillion searches per year!22 Having used Google search since its inception, our sense is that today’s responses are significantly faster.

When Google was developing their search engine, they knew that they needed to return search results quickly. The only practical way to do this was to store and index the entire Internet using a clever combination of secondary storage and main memory. Computers of that time could not hold that amount of data, nor analyze it fast enough, to guarantee prompt search-query responses. So Google developed a clustering system, tying together vast numbers of computers—called nodes. Because having more computers and more connections between them meant greater chance of hardware failures, they also built in high levels of redundancy to ensure that the system would continue functioning even if nodes within clusters failed. The data was distributed across all these inexpensive “commodity computers.” To satisfy a search request, the computers in the cluster searched, in parallel, the portion of the web each stored locally. Then the results of those searches were gathered up and reported back to the user.

To accomplish this, Google needed to develop the clustering hardware and software, including distributed storage. Google published its designs but did not open-source its software. Programmers at Yahoo!, working from Google’s designs in the “Google File System” paper,23 then built their own system. They open-sourced their work, and the Apache organization implemented the system as Hadoop. The name came from an elephant stuffed animal that belonged to a child of one of Hadoop’s creators.

Two additional Google papers also contributed to the evolution of Hadoop—“MapReduce: Simplified Data Processing on Large Clusters”24 and “Bigtable: A Distributed Storage System for Structured Data,”25 which was the basis for Apache HBase (a NoSQL key–value and column-based database).26

HDFS, MapReduce and YARN

Hadoop’s key components are:

  • HDFS (Hadoop Distributed File System) for storing massive amounts of data throughout a cluster, and

  • MapReduce for implementing the tasks that process the data.

Earlier in the book we introduced basic functional-style programming and filter/map/reduce. Hadoop MapReduce is similar in concept, just on a massively parallel scale. A MapReduce task performs two steps—mapping and reduction. The mapping step, which also may include filtering, processes the original data across the entire cluster and maps it into tuples of key–value pairs. The reduction step then combines those tuples to produce the results of the MapReduce task. The key is how Hadoop performs these steps at massive scale. Hadoop divides the data into batches that it distributes across the nodes in the cluster—anywhere from a few nodes to a Yahoo! cluster with 40,000 nodes and over 100,000 cores.27 Hadoop also distributes the MapReduce task’s code to the nodes in the cluster and executes the code in parallel on every node. Each node processes only the batch of data stored on that node. The reduction step combines the results from all the nodes to produce the final result. To coordinate this, Hadoop uses YARN (“yet another resource negotiator”) to manage all the resources in the cluster and schedule tasks for execution.
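
To make the mapping and reduction steps concrete before distributing them across a cluster, here is a minimal, single-process Python sketch (our own illustration, not Hadoop code) that maps each word of two sample lines to a (word-length, 1) key–value pair, then reduces those pairs by totaling the counts for each key:

from collections import Counter

lines = ['this is not a pipe', 'neither is this']

# mapping step: emit a (word length, 1) pair for every word on every line
pairs = [(len(word), 1) for line in lines for word in line.split()]

# reduction step: combine the pairs, totaling the counts for each key
totals = Counter()
for length, count in pairs:
    totals[length] += count

print(sorted(totals.items()))  # [(1, 1), (2, 2), (3, 1), (4, 3), (7, 1)]

Hadoop MapReduce performs the same two conceptual steps, except that the mapping runs in parallel on every node against that node’s local batch of data, and the reduction combines the results from all the nodes.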

Hadoop Ecosystem

Though Hadoop began with HDFS and MapReduce, followed closely by YARN, it has grown into a large ecosystem that includes Spark (discussed in Sections 17.6–17.7) and many other Apache projects:28,29,30

  • Ambari (https://ambari.apache.org)—Tools for managing Hadoop clusters.

  • Drill (https://drill.apache.org)—SQL querying of non-relational data in Hadoop and NoSQL databases.

  • Flume (https://flume.apache.org)—A service for collecting and storing (in HDFS and other storage) streaming event data, like high-volume server logs, IoT messages and more.

  • HBase (https://hbase.apache.org)—A NoSQL database for big data with "billions of rows by millions of columns—atop clusters of commodity hardware."31

  • Hive (https://hive.apache.org)—Uses SQL to interact with data in data warehouses. A data warehouse aggregates data of various types from various sources. Common operations include extracting data, transforming it and loading it (known as ETL) into another database, typically so you can analyze it and create reports from it.

  • Impala (https://impala.apache.org)—A database for real-time SQL-based queries across distributed data stored in Hadoop HDFS or HBase.

  • Kafka (https://kafka.apache.org)—Real-time messaging, stream processing and storage, typically to transform and process high-volume streaming data, such as website activity and streaming IoT data.

  • Pig (https://pig.apache.org)—A scripting platform that converts data analysis tasks from a scripting language called Pig Latin into MapReduce tasks.

  • Sqoop (https://sqoop.apache.org)—A tool for efficiently transferring bulk data between Hadoop and structured data stores, such as relational databases.

  • Storm (https://storm.apache.org)—A real-time stream-processing system for tasks such as data analytics, machine learning, ETL and more.

  • ZooKeeper (https://zookeeper.apache.org)—A centralized service for maintaining cluster configuration information and coordinating distributed applications.

  • And more.

Hadoop Providers

Numerous cloud vendors provide Hadoop as a service, including Amazon EMR, Google Cloud DataProc, IBM Watson Analytics Engine, Microsoft Azure HDInsight and others. In addition, companies like Cloudera and Hortonworks (which at the time of this writing are merging) offer integrated Hadoop-ecosystem components and tools via the major cloud vendors. They also offer free downloadable environments that you can run on the desktop32 for learning, development and testing before you commit to cloud-based hosting, which can incur significant costs. In the following sections’ example, we introduce MapReduce programming using a cloud-based Microsoft Azure HDInsight cluster, which provides Hadoop as a service.

Hadoop 3

Apache continues to evolve Hadoop. Hadoop 3 was released in December of 2017 with many improvements, including better performance and significantly improved storage efficiency.33,34

17.5.2 Summarizing Word Lengths in Romeo and Juliet via MapReduce

In the next several subsections, you’ll create a cloud-based, multi-node cluster of computers using Microsoft Azure HDInsight. Then, you’ll use the service’s capabilities to demonstrate Hadoop MapReduce running on that cluster. The MapReduce task you’ll define will determine the length of each word in RomeoAndJuliet.txt (from the “Natural Language Processing” chapter), then summarize how many words of each length there are. After defining the task’s mapping and reduction steps, you’ll submit the task to your HDInsight cluster, and Hadoop will decide how to use the cluster of computers to perform the task.

17.5.3 Creating an Apache Hadoop Cluster in Microsoft Azure HDInsight

Most major cloud vendors have support for Hadoop and Spark computing clusters that you can configure to meet your application’s requirements. Multi-node cloud-based clusters typically are paid services, though most vendors provide free trials or credits so you can try out their services.

We want you to experience the process of setting up clusters and using them to perform tasks. So, in this Hadoop example, you’ll use Microsoft Azure’s HDInsight service to create cloud-based clusters of computers in which to test our examples. Go to

https://azure.microsoft.com/en-us/free 

to sign up for an account. Microsoft requires a credit card for identity verification.

Various services are always free and some you can continue to use for 12 months. For information on these services see:

https://azure.microsoft.com/en-us/free/free-account-faq/

Microsoft also gives you a credit to experiment with their paid services, such as their HDInsight Hadoop and Spark services. Once your credits run out or 30 days pass (whichever comes first), you cannot continue using paid services unless you authorize Microsoft to charge your card.

Because you’ll use your new Azure account’s credit for these examples,35 we’ll discuss how to configure a low-cost cluster that uses fewer computing resources than Microsoft allocates by default.36 Caution: Once you allocate a cluster, it incurs costs whether you’re using it or not. So, when you complete this case study, be sure to delete your cluster(s) and other resources, so you don’t incur additional charges. For more information, see:

https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-portal

For Azure-related documentation and videos, visit:

https://docs.microsoft.com/en-us/azure/

Creating an HDInsight Hadoop Cluster

The following link explains how to set up a cluster for Hadoop using the Azure HDInsight service:

https://docs.microsoft.com/en-us/azure/hdinsight/hadoop/apache-hadoop-linux-create-cluster-get-started-portal

While following their Create a Hadoop cluster steps, please note the following:

  • In Step 1, you access the Azure portal by logging into your account at

https://portal.azure.com

  • In Step 2, Data + Analytics is now called Analytics, and the HDInsight icon and icon color have changed from what is shown in the tutorial.

  • In Step 3, you must choose a cluster name that does not already exist. When you enter your cluster name, Microsoft will check whether that name is available and display a message if it is not. You must create a password. For the Resource group, you’ll also need to click Create new and provide a group name. Leave all other settings in this step as is.

  • In Step 5, under Select a Storage account, click Create new and provide a storage account name containing only lowercase letters and numbers. Like the cluster name, the storage account name must be unique.

When you get to the Cluster summary you’ll see that Microsoft initially configures the cluster as Head (2 x D12 v2), Worker (4 x D4 v2). At the time of this writing, the estimated cost-per-hour for this configuration was $3.11. This setup uses a total of six nodes with 40 cores (the two D12 v2 head nodes have four cores each and the four D4 v2 worker nodes have eight cores each)—far more than we need for demonstration purposes.

You can edit this setup to use fewer CPUs and cores, which also saves money. Let’s change the configuration to a four-node cluster with 16 total cores that uses less powerful computers. In the Cluster summary:

  1. Click Edit to the right of Cluster size.

  2. Change the Number of Worker nodes to 2.

  3. Click Worker node size, then View all, select D3 v2 (this is the minimum CPU size for Hadoop nodes) and click Select.

  4. Click Head node size, then View all, select D3 v2 and click Select.

  5. Click Next and click Next again to return to the Cluster summary. Microsoft will validate the new configuration.

  6. When the Create button is enabled, click it to deploy the cluster.

It takes 20–30 minutes for Microsoft to “spin up” your cluster. During this time, Microsoft is allocating all the resources and software the cluster requires.

After the changes above, our estimated cost for the cluster was $1.18 per hour, based on average use for similarly configured clusters. Our actual charges were less than that. If you encounter any problems configuring your cluster, Microsoft provides HDInsight chat-based support at:

https://azure.microsoft.com/en-us/resources/knowledge-center/technical-chat/

17.5.4 Hadoop Streaming

For languages like Python that are not natively supported in Hadoop, you must use Hadoop streaming to implement your tasks. In Hadoop streaming, the Python scripts that implement the mapping and reduction steps use the standard input stream and standard output stream to communicate with Hadoop. Usually, the standard input stream reads from the keyboard and the standard output stream writes to the command line. However, these can be redirected (as Hadoop does) to read from other sources and write to other destinations. Hadoop uses the streams as follows:

  • Hadoop supplies the input to the mapping script—called the mapper. This script reads its input from the standard input stream.

  • The mapper writes its results to the standard output stream.

  • Hadoop supplies the mapper’s output as the input to the reduction script—called the reducer—which reads from the standard input stream.

  • The reducer writes its results to the standard output stream.

  • Hadoop writes the reducer’s output to the Hadoop file system (HDFS).

The mapper and reducer terminology used above should sound familiar to you from our discussions of functional-style programming and filter, map and reduce in the “Sequences: Lists and Tuples” chapter.

17.5.5 Implementing the Mapper

In this section, you’ll create a mapper script that takes lines of text as input from Hadoop and maps them to key–value pairs in which each key is a word’s length and its corresponding value is 1. The mapper sees each word individually so, as far as it is concerned, there’s only one of each word. In the next section, the reducer will summarize these key–value pairs by key, reducing the counts to a single count for each key. By default, Hadoop expects the mapper’s output and the reducer’s input and output to be in the form of key–value pairs in which the key and value are separated by a tab.

In the mapper script (length_mapper.py), the notation #! in line 1 tells Hadoop to execute the Python code using python3, rather than the default Python 2 installation. This line must come before all other comments and code in the file. At the time of this writing, Python 2.7.12 and Python 3.5.2 were installed. Note that because the cluster does not have Python 3.6 or higher, you cannot use f-strings in your code.

1 #!/usr/bin/env python3
2 # length_mapper.py
3 """Maps lines of text to key-value pairs of word lengths and 1."""
4 import sys
5
6 def tokenize_input():
7     """Split each line of standard input into a list of strings."""
8     for line in sys.stdin:
9         yield line.split()
10
11 # read each line from the standard input and for every word
12 # produce a key-value pair containing the word's length, a tab and 1
13 for line in tokenize_input():
14     for word in line:
15         print(str(len(word)) + '\t1')

Generator function tokenize_input (lines 6–9) reads lines of text from the standard input stream and, for each line, yields a list of strings. For this example, we are not removing punctuation or stop words as we did in the “Natural Language Processing” chapter.

When Hadoop executes the script, lines 13–15 iterate through the lists of strings from tokenize_input. For each list (line) and for every string (word) in that list, line 15 outputs a key–value pair with the word’s length as the key, a tab (\t) and the value 1, indicating that there is one word (so far) of that length. Of course, there probably are many words of that length. The MapReduce algorithm’s reduction step will summarize these key–value pairs, reducing all those with the same key to a single key–value pair with the total count.
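
For example, if Hadoop supplied the mapper the hypothetical input line 'But soft what light', the script would write the following key–value pairs to the standard output stream:

3       1
4       1
4       1
5       1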

17.5.6 Implementing the Reducer

In the reducer script (length_reducer.py), function tokenize_input (lines 8–11) is a generator function that reads and splits the key–value pairs produced by the mapper. Again, Hadoop supplies those pairs via the standard input stream. For each line, tokenize_input strips any leading or trailing whitespace (such as the terminating newline) and yields a list containing the key and a value.

1 #!/usr/bin/env python3
2 # length_reducer.py
3 """Counts the number of words with each length."""
4 import sys
5 from itertools import groupby
6 from operator import itemgetter
7
8 def tokenize_input():
9     """Split each line of standard input into a key and a value."""
10    for line in sys.stdin:
11       yield line.strip().split('\t')
12
13 # produce key-value pairs of word lengths and counts separated by tabs
14 for word_length, group in groupby(tokenize_input(), itemgetter(0)):
15     try:
16         total = sum(int(count) for word_length, count in group)
17         print(word_length + '\t' + str(total))
18     except ValueError:
19         pass # ignore the pair if its count was not an integer

When the MapReduce algorithm executes this reducer, lines 14–19 use the groupby function from the itertools module to group all word lengths of the same value:

  • The first argument calls tokenize_input to get the lists representing the key–value pairs.

  • The second argument indicates that the key–value pairs should be grouped based on the element at index 0 in each list—that is, the key.

Line 16 totals all the counts for a given key. Line 17 outputs a new key–value pair consisting of the word length and its total. Note that groupby groups only consecutive items that have the same key; this works here because Hadoop sorts the mapper’s output by key before supplying it to the reducer. The MapReduce algorithm takes all the final outputs and writes them to a file in HDFS—the Hadoop file system.
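
If groupby is new to you, the following self-contained snippet (our own illustration, using made-up pairs that are already sorted by key, just as Hadoop delivers them to the reducer) shows how it groups consecutive pairs that share a key and how the totals are formed:

from itertools import groupby
from operator import itemgetter

# simulated reducer input: [key, value] lists, already sorted by key
pairs = [['3', '1'], ['3', '1'], ['5', '1']]

for word_length, group in groupby(pairs, itemgetter(0)):
    total = sum(int(count) for word_length, count in group)
    print(word_length + '\t' + str(total))  # prints '3\t2', then '5\t1'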

17.5.7 Preparing to Run the MapReduce Example

Next, you’ll upload files to the cluster so you can execute the example. In a Command Prompt, Terminal or shell, change to the folder containing your mapper and reducer scripts and the RomeoAndJuliet.txt file. We assume all three are in this chapter’s ch17 examples folder, so be sure to copy your RomeoAndJuliet.txt file to this folder first.
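
Before involving the cluster at all, you can optionally sanity-check the two scripts by simulating Hadoop streaming’s shuffle-and-sort phase with a local pipeline like the following (shown for a macOS/Linux shell and assuming python3 is installed locally; because sort compares the keys as text, the word lengths will appear in lexicographic rather than numeric order here):

cat RomeoAndJuliet.txt | python3 length_mapper.py | sort | python3 length_reducer.py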

Copying the Script Files to the HDInsight Hadoop Cluster

Enter the following command to upload the files. Be sure to replace YourClusterName with the cluster name you specified when setting up the Hadoop cluster and press Enter only after you’ve typed the entire command. The colon at the end of the command is required: it separates the cluster’s host name from the (empty) destination path, so the files are copied to your user account’s home folder on the cluster. When prompted, type the password you specified when setting up the cluster, then press Enter:

scp length_mapper.py length_reducer.py RomeoAndJuliet.txt
     sshuser@YourClusterName-ssh.azurehdinsight.net:

The first time you do this, you’ll be asked for security reasons to confirm that you trust the target host (that is, Microsoft Azure).

Copying RomeoAndJuliet into the Hadoop File System

For Hadoop to read the contents of RomeoAndJuliet.txt and supply the lines of text to your mapper, you must first copy the file into Hadoop’s file system. First, you must use ssh37 to log into your cluster and access its command line. In a Command Prompt, Terminal or shell, execute the following command. Be sure to replace YourClusterName with your cluster name. Again, you’ll be prompted for your cluster password:

ssh sshuser@YourClusterName-ssh.azurehdinsight.net

For this example, we’ll use the following Hadoop command to copy the text file into the already existing folder /example/data that the cluster provides for use with Microsoft’s Azure Hadoop tutorials. Again, press Enter only when you’ve typed the entire command:

hadoop fs -copyFromLocal RomeoAndJuliet.txt
     /example/data/RomeoAndJuliet.txt

17.5.8 Running the MapReduce Job

Now you can run the MapReduce job for RomeoAndJuliet.txt on your cluster by executing the following command. For your convenience, we provided the text of this command in the file yarn.txt with this example, so you can copy and paste it. We reformatted the command here for readability:

yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar
   -D mapred.output.key.comparator.class=
      org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
   -D mapred.text.key.comparator.options=-n
   -files length_mapper.py,length_reducer.py
   -mapper length_mapper.py
   -reducer length_reducer.py
   -input /example/data/RomeoAndJuliet.txt
   -output /example/wordlengthsoutput

The yarn command invokes Hadoop’s YARN (“yet another resource negotiator”) tool to manage and coordinate access to the Hadoop resources the MapReduce task uses. The file hadoop-streaming.jar contains the Hadoop streaming utility that allows you to use Python to implement the mapper and reducer. The two -D options set Hadoop properties that cause the final key–value pairs to be sorted by key using a KeyFieldBasedComparator that compares the keys numerically (-n) rather than as text (as the sample output below shows, the word lengths appear in ascending numeric order). The other command-line arguments are:

  • -files—A comma-separated list of file names. Hadoop copies these files to every node in the cluster so they can be executed locally on each node.

  • -mapper—The name of the mapper’s script file.

  • -reducer—The name of the reducer’s script file.

  • -input—The file or directory of files to supply as input to the mapper.

  • -output—The HDFS directory in which the output will be written. If this folder already exists, an error will occur (see the note that follows this list).
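
If you rerun the job, either specify a different -output folder or first remove the existing one from within your cluster’s ssh session, for example with a command like:

hdfs dfs -rm -r /example/wordlengthsoutput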

The following output shows some of the feedback that Hadoop produces as the MapReduce job executes. We replaced chunks of the output with … to save space and bolded several lines of interest including:

  • The total number of “input paths to process”—the 1 source of input in this example is the RomeoAndJuliet.txt file.

  • The “number of splits”—2 in this example, based on the number of worker nodes in our cluster.

  • The percentage completion information.

  • File System Counters, which include the numbers of bytes read and written.

  • Job Counters, which show the number of mapping and reduction tasks used and various timing information.

  • Map-Reduce Framework, which shows various information about the steps performed.

packageJobJar: [] [/usr/hdp/2.6.5.3004-13/hadoop-
mapreduce/hadoopstreaming-2.7.3.2.6.5.3004-13.jar] /tmp/streamjob2764990629848702405.jar
tmpDir=null
...
18/12/05 16:46:25 INFO mapred.FileInputFormat: Total input paths to
process : 1
18/12/05 16:46:26 INFO mapreduce.JobSubmitter: number of splits:2
...
18/12/05 16:46:26 INFO mapreduce.Job: The url to track the job: http://
hn0-paulte.y3nghy5db2kehav5m0opqrjxcb.cx.internal.cloudapp.net:8088
/proxy/application_1543953844228_0025/
...
18/12/05 16:46:35 INFO mapreduce.Job: map 0% reduce 0%
18/12/05 16:46:43 INFO mapreduce.Job: map 50% reduce 0%
18/12/05 16:46:44 INFO mapreduce.Job: map 100% reduce 0%
18/12/05 16:46:48 INFO mapreduce.Job: map 100% reduce 100%
18/12/05 16:46:50 INFO mapreduce.Job: Job job_1543953844228_0025
completed successfully
18/12/05 16:46:50 INFO mapreduce.Job: Counters: 49
         File System Counters
              FILE: Number of bytes read=156411
              FILE: Number of bytes written=813764
...
         Job Counters
              Launched map tasks=2
              Launched reduce tasks=1
...
         Map-Reduce Framework
              Map input records=5260
              Map output records=25956
              Map output bytes=104493
              Map output materialized bytes=156417
              Input split bytes=346
              Combine input records=0
              Combine output records=0
              Reduce input groups=19
              Reduce shuffle bytes=156417
              Reduce input records=25956
              Reduce output records=19
              Spilled Records=51912
              Shuffled Maps =2
              Failed Shuffles=0
              Merged Map outputs=2
              GC time elapsed (ms)=193
              CPU time spent (ms)=4440
              Physical memory (bytes) snapshot=1942798336
              Virtual memory (bytes) snapshot=8463282176
              Total committed heap usage (bytes)=3177185280
...
18/12/05 16:46:50 INFO streaming.StreamJob: Output directory: /example/
wordlengthsoutput

Viewing the Word-Length Counts

Hadoop MapReduce saves its output into HDFS, so to see the counts of words of each length, you must look at the resulting file in HDFS within the cluster by executing the following command:

hdfs dfs -text /example/wordlengthsoutput/part-00000

Here are the results of the preceding command:

18/12/05 16:47:19 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
18/12/05 16:47:19 INFO lzo.LzoCodec: Successfully loaded & initialized
native-lzo library [hadoop-lzo rev
b5efb3e531bc1558201462b8ab15bb412ffa6b89]
1        1140
2        3869
3        4699
4        5651
5        3668
6        2719
7        1624
8        1062
9        855
10       317
11       189
12       95
13       35
14       13
15       9
16       6
17       3
18       1
23       1

Deleting Your Cluster So You Do Not Incur Charges

Caution: Be sure to delete your cluster(s) and associated resources (like storage) so you don’t incur additional charges. In the Azure portal, click All resources to see your list of resources, which will include the cluster you set up and the storage account you set up. Both can incur charges if you do not delete them. Select each resource and click the Delete button to remove it. You’ll be asked to confirm by typing yes. For more information, see:

https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-portal

Self Check for Section 17.5

  1. (Fill-In) Hadoop’s key components are       for storing massive amounts of data throughout a cluster and       for implementing the tasks that process the data.
    Answer: HDFS (Hadoop Distributed File System), MapReduce.

  2. (Fill-In) For learning, development and testing before you commit to cloud-based services, the vendors       and       offer free downloadable environments with integrated Hadoop ecosystem components.
    Answer: Cloudera, Hortonworks.

  3. (Fill-In) The       command launches a MapReduce task.
    Answer: yarn.

  4. (Fill-In) To implement MapReduce tasks in languages like Python that are not natively supported, you must use Hadoop       in which the mapper and reducer communicate with Hadoop via the      .
    Answer: streaming, standard input and standard output streams.

  5. (True/False) Hadoop MapReduce does not place requirements on the format of the mapper’s output and the reducer’s input and output.
    Answer: False. MapReduce expects the mapper’s output and the reducer’s input and output to be in the form of key–value pairs in which each key and value are separated by a tab.

  6. (True/False) Hadoop MapReduce keeps the task’s final output in main memory for easy access.
    Answer: False. In big-data processing, the results typically would not fit in main memory, so Hadoop MapReduce writes its final output to HDFS. To access the results, you must read them from the HDFS folder in which the output was written.
