Chapter 2. Getting Started with Apache Hadoop and Apache Spark

In this chapter, we will understand the basics of Hadoop and Spark, how Spark is different from MapReduce, and get started with the installation of clusters and setting up the tools needed for analytics.

This chapter is divided into the following subtopics:

  • Introducing Apache Hadoop
  • Introducing Apache Spark
  • Discussing why we use Hadoop with Spark
  • Installing Hadoop and Spark clusters

Introducing Apache Hadoop

Apache Hadoop is a software framework that enables distributed processing on large clusters with thousands of nodes and petabytes of data. Apache Hadoop clusters can be built using commodity hardware where failure rates are generally high. Hadoop is designed to handle these failures gracefully without user intervention. Also, Hadoop uses the move computation to the data approach, thereby avoiding significant network I/O. Users will be able to develop parallel applications quickly, focusing on business logic rather than doing the heavy lifting of distributing data, distributing code for parallel processing, and handling failures.

Apache Hadoop has mainly four projects: Hadoop Common, Hadoop Distributed File System (HDFS), Yet Another Resource Negotiator (YARN), and MapReduce.

In simple words, HDFS is used to store data, MapReduce is used to process data, and YARN is used to manage the resources (CPU and memory) of the cluster and common utilities that support Hadoop. Apache Hadoop integrates with many other projects, such as Avro, Hive, Pig, HBase, Zookeeper, and Apache Spark.

Hadoop mainly brings the following three components to the table:

  • A framework for reliable distributed data storage: HDFS
  • Multiple frameworks for parallel processing of data: MapReduce, Crunch, Cascading, Hive, Tez, Impala, Pig, Mahout, Spark, and Giraph
  • A framework for cluster resource management: YARN and Slider

Let's take a look at Hadoop's adoption drivers with respect to the economy, business, and technical areas:

  • Economy: Low cost per terabyte processing when compared to commercial solutions. This is because of its open source software and commodity hardware.
  • Business: The ability to store and process all the data on a massive scale provides higher business value.
  • Technical: The ability to store and process any Variety, Volume, Velocity, and Veracity (all four Vs) of Big Data.

The following list provides the typical characteristics of Hadoop:

  • Commodity: Hadoop can be installed using commodity hardware on-premise or on any cloud provider.
  • Robust: It can handle hardware failures at the software layer without user intervention and process failures gracefully without user intervention.
  • Scalable: It can commission new nodes to scale out in order to increase the capacity of the cluster.
  • Simple: Developers can focus on business logic only, and not on scalability, fault tolerance, and multithreading.
  • Data locality: The data size is up to petabytes whereas code size is up to kilobytes. Moving code to the node where data blocks reside provides great reduction in network I/O.

Hadoop Distributed File System

HDFS is a distributed filesystem that provides high scalability and reliability on large clusters of commodity hardware.

HDFS files are divided into large blocks that are typically 128 MB in size and distributed across the cluster. Each block is replicated (typically three times) to handle hardware failures and block placement exposed by NameNode so that computation can be moved to data with the MapReduce framework, as illustrated in Figure 2.1:

Hadoop Distributed File System

Figure 2.1: HDFS architecture

In the preceding image, when storing File1, it's divided into a single block (B1) as its size (100 MB) is less than the default block size (128 MB) and replicated on Node 1, Node 2, and Node 3. Block1 (B1) is replicated on the first node (Node 1) and then Node 1 replicates on Node 2 and Node 2 replicates on Node 3. File2 is divided into two blocks as its size (150 MB) is greater than the block size, and block2 (B2) and block3 (B3) are replicated on three nodes (B2 on Node 1, Node 3, and Node 4 and B3 on Node 1, Node 2, and Node 3). Blocks' metadata (file name, blocks, location, date created, and size) is stored in NameNode, as shown in the preceding image. HDFS has a bigger block size to reduce the number of disk seeks needed to read the complete file.

The creation of a file seems like a single file to the user. However, it is stored as blocks on DataNodes and metadata is stored in NameNode. If we lose the NameNode for any reason, blocks stored on DataNodes become useless as there is no way to identify the blocks belonging to the file names. So, creating NameNode high availability and metadata backups is very important in any Hadoop cluster.

Features of HDFS

HDFS is becoming a standard enterprise Big Data storage system because of the unlimited scalability and yet provides most features needed for enterprise-grade Big Data applications. The following table explains the important features of HDFS:

Feature

Description

High availability

Enabling high availability is done by creating a standby NameNode.

Data integrity

When blocks are stored on HDFS, computed checksums are stored on the DataNodes as well. Data is verified against the checksum.

HDFS ACLs

HDFS implements POSIX-style permissions that enable an owner and group for every file with read, write, and execute permissions. In addition to POSIX permissions, HDFS supports POSIX Access Control Lists (ACLs) to provide access for specific named users or groups.

Snapshots

HDFS Snapshots are read-only point-in-time copies of the HDFS filesystem, which are useful to protect datasets from user or application errors.

HDFS rebalancing

The HDFS rebalancing feature will rebalance the data uniformly across all DataNodes in the cluster.

Caching

Caching of blocks on DataNodes is used for high performance. DataNodes cache the blocks in an off-heap cache.

APIs

HDFS provides a native Java API, Pipes API for C++, and Streaming API for scripting languages such as Python, Perl, and others. FileSystem Shell and web browsers can be used to access data as well. Also, WebHDFS and HttpFs can be used to access data over HTTP.

Data encryption

HDFS will encrypt the data at rest once enabled. Data encryption and decryption happens automatically without any changes to application code.

Kerberos authentication

When Kerberos is enabled, every service in the Hadoop cluster being accessed will have to be authenticated using the Kerberos principle. This provides tight security to Hadoop clusters.

NFS access

Using this feature, HDFS can be mounted as part of the local filesystem, and users can browse, download, upload, and append data to it.

Metrics

Hadoop exposes many metrics that are useful in troubleshooting. Java Management Extensions (JMX) metrics can be viewed from a web UI or the command line.

Rack awareness

Hadoop clusters can be enabled with rack awareness. Once enabled, HDFS block placement will be done as per the rack awareness script, which provides better fault tolerance.

Storage policies

Storage policies are introduced in order to allow files to be stored in different storage types according to the storage policy (Hot, Cold, Warm, All_SSD, One_SSD, or Lazy_Persist).

WORM

Write Once and Read Many (WORM) times is a feature of HDFS that does not allow updating or deleting records in place. However, records can be appended to the files.

MapReduce

MapReduce (MR) is a framework to write analytical applications in batch mode on terabytes or petabytes of data stored on HDFS. An MR job usually processes each block (excluding replicas) of input file(s) in HDFS with the mapper tasks in a parallel manner. The MR framework sorts and shuffles the outputs of the mappers to the reduce tasks in order to produce the output. The framework takes care of computing the number of tasks needed, scheduling tasks, monitoring them, and re-executing them if they fail. The developer needs to focus only on writing the business logic, and all the heavy lifting is done by the HDFS and MR frameworks.

For example, in Figure 2.1, if an MR job is submitted for File1, one map task will be created and run on any Node 1, 2, or 3 to achieve data locality. In the case of File2, two map tasks will be created with map task 1 running on Node 1, 3, or 4, and map task 2 running on Node 1, 2, or 3, depending on resource availability. The output of the mappers will be sorted and shuffled to reducer tasks. By default, the number of reducers is one. However, the number of reducer tasks can be increased to provide parallelism at the reducer level.

MapReduce features

MR provides you with excellent features to build Big Data applications. The following table describes MR's key features and techniques used, such as sorting and joining:

Feature/techniques

Description

Data locality

MR moves the computation to the data. It ships the programs to the nodes where HDFS blocks reside. This reduces the network I/O significantly.

APIs

Native Java API.

Pipes: C++ API.

Streaming: Any shell scripting such as Python and Perl.

Distributed cache

A distributed cache is used to cache files such as archives, jars, or any files that are needed by applications at runtime.

Combiner

The combiner feature is used to reduce the network traffic, or, in other words, reduce the amount of data sent from mappers to reducers over the network.

Custom partitioner

This controls which reducer each intermediate key and its associated values go to. A custom partitioner can be used to override the default hash partitioner.

Sorting

Sorting is done in the sort and shuffle phase, but there are different ways to achieve and control sorting—total sort, partial sort, and secondary sort.

Joining

Joining two massive datasets with the joining process is easy. If the join is performed by the mapper tasks, it is called a map-side join. If the join is performed by the reducer task, it is called a reduce-side join. Map-side joins are always preferred because it avoids sending a lot of data over the network for reducers to join.

Counters

The MR framework provides built-in counters that give an insight in to how the MR job is performing. It allows the user to define a set of counters in the code, which are then incremented as desired in the mapper or reducer.

MapReduce v1 versus MapReduce v2

Apache Hadoop's MapReduce has been a core processing engine that supports the distributed processing of large-scale data workloads. MR has undergone a complete refurbishment in the Hadoop 0.23 version and now it's called MapReduce 2.0 (MR v2) or YARN.

MapReduce v1, which is also called Classic MapReduce, has three main components:

  • An API to develop MR-based applications
  • A framework to execute mappers, shuffle the data, and execute reducers
  • A resource management framework to schedule and monitor resources

MapReduce v2, which is also called NextGen, moves resource management to YARN, as shown in Figure 2.2:

MapReduce v1 versus MapReduce v2

Figure 2.2: MapReduce v1 to MapReduce v2

MapReduce v1 challenges

MapReduce v1 had three challenges:

  • Inflexible CPU slots configured on a cluster for Map and Reduce led to the underutilization of the cluster
  • Resources could not be shared with non-MR applications (for example, Impala or Spark)
  • Limited scalability, only up to 4,000 nodes

The following table shows you the differences between v1 and v2:

 

MR v1

MR v2

Components used

Job tracker as master and task tracker as slave

Resource manager as master and node manager as slave

Resource allocation

DataNodes are configured to run a fixed number of map tasks and reduce tasks

Containers are allocated as needed for any type of task

Resource management

One job tracker per cluster, which supports up to 4,000 nodes

One resource manager per cluster, which supports up to tens of thousands of nodes

Types of jobs

MR jobs only

Supports MR and other frameworks such as Spark, Impala, and Giraph

YARN

YARN is the resource management framework that enables an enterprise to process data in multiple ways simultaneously for batch processing, interactive analytics, or real-time analytics on shared datasets. While HDFS provides scalable, fault-tolerant, and cost-efficient storage for Big Data, YARN provides resource management to clusters. Figure 2.3 shows you how multiple frameworks are typically run on top of HDFS and YARN frameworks in Hadoop 2.0. YARN is like an operating system for Hadoop, which manages the cluster resources (CPU and Memory) efficiently. Applications such as MapReduce, Spark, and others request YARN to allocate resources for their tasks. YARN allocates containers on nodes with the requested amount of RAM and virtual CPU from the total available on that node:

YARN

Figure 2.3: Hadoop 1.0 and 2.0 frameworks

YARN's original purpose was to split up the two major responsibilities of the JobTracker/TaskTracker (which are part of MapReduce v1) into separate entities:

  • ResourceManager
  • A per-application ApplicationMaster
  • A per-node slave NodeManager
  • A per-application container running on NodeManager

ResourceManager keeps track of the resource availability of the entire cluster and provides resources to applications when requested by ApplicationMaster.

ApplicationMaster negotiates the resources needed by the application to run their tasks. ApplicationMaster also tracks and monitors the progress of the application. Note that this monitoring functionality was handled by TaskTrackers and JobTrackers in MR v1, which led to overloading the JobTracker.

NodeManager is responsible for launching containers provided by ResourceManager, monitoring the resource usage on the slave nodes, and reporting to ResourceManager.

The application container is responsible for running the tasks of the application. YARN also has pluggable schedulers (Fair Scheduler and Capacity Scheduler) to control the resource assignments to different applications. Detailed steps of the YARN application life cycle are shown in Figure 2.4 with two resource requests by an application:

YARN

Figure 2.4: The YARN application life cycle

The following is our interpretation of the preceding figure:

  • The client submits the MR or Spark job
  • The YARN ResourceManager creates an ApplicationMaster on one NodeManager
  • The ApplicationMaster negotiates the resources with the ResourceManager
  • The ResourceManager provides resources, the NodeManager creates the containers, and the ApplicationMaster launches tasks (Map, Reduce, or Spark tasks) in the containers
  • Once the tasks are finished, the containers and the ApplicationMaster will be terminated

Let's summarize the preceding points concerning YARN:

  • MapReduce v2 is based on YARN:
    • YARN replaced the JobTracker/TaskTracker architecture of MR v1 with the ResourceManager and NodeManager
    • The ResourceManager takes care of scheduling and resource allocation
    • The ApplicationMaster schedules tasks in containers and monitors the tasks
  • Why YARN?
    • Better scalability
    • Efficient resource management
    • Flexibility to run multiple frameworks
  • Views from the user's perspective:
    • No significant changes—the same API, CLI, and web UIs.
    • Backward-compatible with MR v1 without any changes

Storage options on Hadoop

XML and JSON files are well-accepted industry standard formats. So, why can't we just use XML or JSON files on Hadoop? There are many disadvantages of XML and JSON, including the following:

  • Larger size of the data because of storing schema along with the data
  • Does not support schema evolution
  • Files cannot be split on Hadoop when compressed
  • Not efficient when transferring the data over network

When storing data and building applications on Hadoop, some fundamental questions arises: What storage format is useful for my application? What compression codec is optimum for my application?

Hadoop provides you with a variety of file formats built for different use cases. Choosing the right file format and compression codec provides optimum performance for the use case that you are working on. Let's go through the file formats and understand when to use them.

File formats

File formats are divided into two categories. Hadoop can store all the data regardless of what format the data is stored in. Data can be stored in its raw form using the standard file format or the special Hadoop container file format that offers benefits in specific use case scenarios, which can be split even when data is compressed. Broadly, there are two types of file formats: Standard file formats and Hadoop file formats:

  • Standard file formats:
    • Structured text data: CSV, TSV, XML, and JSON files
    • Unstructured text data: Log files and documents
    • Unstructured binary data: Images, videos, and audio files
  • Hadoop file formats:

    Provides splittable compression

    • File-based structures:

      Sequence file

    • Serialization format:

      Thrift

      Protocol buffers

      Avro

    • Columnar formats:

      RCFile

      ORCFile

      Parquet

    Let's go through the Hadoop file format features and use cases in which they can be used.

Sequence file

Sequence files store data as binary key-value pairs. It supports the Java language only and does not support schema evolution. It supports the splitting of files even when the data is compressed.

Let's see a use case for the sequence file:

  • Small files problem: On an average, each file occupies 600 bytes of space in memory. One million files of 100 KB need 572 MB of main memory on the NameNode. Additionally, the MR job will create one million mappers.
  • Solution: Create a sequence file with the key as the filename and value as the content of the file, as shown in the following table. Only 600 bytes of memory space is needed in NameNode and an MR job will create 762 mappers with 128 MB block size:

    Key

    Value

    Key

    Value

    Key

    Value

    File1.txt

    File.txt content

    File2.txt

    File2.txt content

    FileN.txt

    FileN.txt content

Protocol buffers and thrift

Protocol buffers were developed by Google and open sourced in 2008. Thrift was developed at Facebook and offers more features and language support than protocol buffers. Both of these are serialization frameworks that offer high performance while sending over the network. Avro is a specialized serialization format that is designed for Hadoop.

A generic usage pattern for protocol buffers and thrift is as follows:

  • Use Avro on Hadoop-specific formats and use protocol buffers and thrift for non-Hadoop projects.

Avro

Avro is a row-based data serialization system used for storage and sends data over the network efficiently. Avro provides the following benefits:

  • Rich data structures
  • Compact and fast binary data format
  • Simple integration with any language
  • Support for evolving schemas
  • Great interoperability between Hive, Tez, Impala, Pig, and Spark

A use case for Avro is as follows:

  • Data warehouse offloading to Hadoop: Data is offloaded to Hadoop where Extract, Transform, and Load (ETL) tasks are performed. The schema changes frequently.
  • Solution: Sqoop imports data as Avro files that supports schema evolution, less storage space, and faster ETL tasks.

Parquet

Parquet is a columnar format that skips I/O and decompression (if applicable) on columns that are not part of the query. It is generally very efficient in terms of compression on columns because column data is similar within the same column than it is in a block of rows.

A use case for Parquet is as follows:

  • BI access on Hadoop: Data marts created on Hadoop are accessed by users using Business Intelligence (BI) tools such as Tableau. User queries always need a few columns only. Query performance is poor.
  • Solution: Store data in Parquet, which is a columnar format and provides high performance for BI queries.

RCFile and ORCFile

Record Columnar File (RCFile) was the first columnar format for Hive that provided efficient query processing. Optimized Row Columnar (ORC) format was introduced in Hive 0.11 and offered better compressions and efficiency than the RCFile format. ORCFile has lightweight indexing that enables the skipping of irrelevant columns.

A use case for ORC and Parquet files is as follows:

  • Both ORC files and Parquet files are columnar formats and skip columns and rows (predicate pushdown) while reading data. Choose ORC or Parquet, depending on the application and integration requirements with other components of the project. A common use case for ORC will be same as the Parquet use case described earlier, exposing data to end users with BI tools.

Compression formats

A variety of compression formats are available for Hadoop storage. However, if Hadoop storage is cheap, then why do I need to compress my data? The following list answers your question:

  • Compressed data can speed up I/O operations
  • Compressed data saves storage space
  • Compressed data speeds up data transfer over the network

Compression and decompression increases CPU time. Understanding these trade-offs is very important in providing optimum performance of jobs running on Hadoop.

Standard compression formats

The following table shows you the standard compression formats available on the Hadoop platform:

Compression format

Tool

Algorithm

File extension

Splittable?

gzip

Gzip

DEFLATE

.gz

No

bzip2

bizp2

bzip2

.bz2

Yes

LZO

Lzop

LZO

.lzo

Yes, if indexed

Snappy

N/A

Snappy

.snappy

No

Recommended usage patterns for compression are as follows:

  • For storage only: Use gzip (high compression ratio)
  • For ETL tasks: Use Snappy (optimum compression ratio and speed)

Tip

In general, always compress data on Hadoop for better performance. Choosing the right compression codec depends on the trade-off between the compression ratio versus speed.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.139.169