As mentioned earlier, there are several top-level Apache open source projects that are part of CDH. Let's discuss these components in detail.
CDH comes with Apache Hadoop, the system for high-volume storage and computing that we were introduced to earlier. The subcomponents of Hadoop are HDFS, Fuse-DFS, MapReduce, and MapReduce 2 (YARN). Fuse-DFS is a module that allows HDFS to be mounted in user space; once mounted, HDFS can be accessed like any other traditional filesystem.
Apache Flume NG (version 1.x) is a distributed framework that handles the collection and aggregation of large amounts of log data. The project was primarily built to handle streaming data. Flume is robust, reliable, and fault tolerant. Though Flume was built for streaming log data, its flexibility in handling multiple data sources makes it easy to configure for event data; Flume can handle almost any kind of data. Flume performs collection and aggregation using agents. An agent is composed of a source, a channel, and a sink.
Events, such as streamed log entries, are fed to the source. There are different types of Flume sources that can consume different types of data. After receiving events, the source stores the data in a channel. A channel is a queue that holds all the data received from a source; the data is retained in the channel until it is consumed by the sink. The sink is responsible for taking data from channels and placing it on an external store such as HDFS.
The following diagram shows the flow of event/log data to HDFS via the agent:
In the preceding diagram, we see a simple data flow where events or logs are provided as an input to a Flume agent. The source, which is a subcomponent of the agent, forwards the data to one or more channels. The data from the channel is then taken by the sink and finally pushed to HDFS. It is important to note that the source and sink of an agent work asynchronously. The rate at which the data is pushed to the channel and the rate at which the sink pulls the data from the channel are configured to handle spikes that occur with the event/log data.
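An agent of this kind is defined in a Flume properties file. The following is a minimal sketch of such a configuration; the agent name (agent1), log file path, and HDFS path are illustrative assumptions, not values from a real deployment:

```
# Hypothetical agent "agent1" with one source, one channel, and one sink
agent1.sources  = src1
agent1.channels = ch1
agent1.sinks    = sink1

# Source: tail a log file and emit each line as an event
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /var/log/app/app.log
agent1.sources.src1.channels = ch1

# Channel: in-memory queue that buffers events between source and sink
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000

# Sink: drain the channel and write events to HDFS
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://namenode:8020/flume/events
agent1.sinks.sink1.channel = ch1
```

Note how the source and sink never reference each other directly; the channel in between is what lets them run asynchronously and absorb spikes.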
Using Flume, you can configure more complex data flows where the sink from one agent could be an input to the source of another agent. Such flows are referred to as multi-hop flows.
The following diagram shows the flow of event/log data to HDFS via multiple agents:
As an administrator, you will appreciate the flexibility of Flume because in many cases, it will be the administrator who recommends solutions to collect and aggregate data to HDFS in a Hadoop cluster.
While analyzing data, data analysts often have to gather data from different sources such as external relational databases and bring it into HDFS for processing. Also, after processing data in Hadoop, analysts may also send the data from HDFS back to some external relational data stores. Apache Sqoop is just the tool for such requirements. Sqoop is used to transfer data between HDFS and relational database systems such as MySQL and Oracle.
Sqoop expects the external database to define the schema for imports to HDFS; here, the schema refers to the metadata or structure of the data. Imports and exports in Sqoop are executed as MapReduce jobs, thereby leveraging the parallelism and fault tolerance of MapReduce.
When importing data from an external relational database, Sqoop takes a table as input, reads it row by row, and generates output files that are placed in HDFS. The import runs as a parallel MapReduce job, generating several output files for a single input table.
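A typical import is a single command-line invocation. The following sketch assumes a hypothetical MySQL database named sales with a customers table; the hostname, credentials, and HDFS target directory are illustrative:

```shell
# Import the "customers" table into HDFS using 4 parallel map tasks
sqoop import \
  --connect jdbc:mysql://dbserver/sales \
  --username dbuser -P \
  --table customers \
  --num-mappers 4 \
  --target-dir /user/analyst/customers
```

Because four map tasks run in parallel, the import produces four output files under the target directory rather than one.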
The following diagram shows the two-way flow of data from RDBMS to HDFS and vice versa:
Once the data is in HDFS, analysts process this data, which generates subsequent output files. These results, if required, can be exported to an external relational database system using Sqoop. Sqoop reads delimited files from HDFS, constructs database records, and inserts them into the external table.
Sqoop is a highly configurable tool; you can define exactly which columns need to be imported to or exported from HDFS. All operations in Sqoop are performed using the command-line interface. Sqoop 2, a newer version of Sqoop, additionally provides a web user interface to perform imports and exports.
Sqoop is a client-side application whereas the new Sqoop 2 is a server-side (Sqoop server) application. The Sqoop 2 server also provides a REST API for other applications to easily talk to Sqoop 2.
Hadoop is a powerful framework. Data processing in Hadoop is performed by MapReduce, a Java-based framework, and MapReduce applications are typically written in Java. To make it easier for non-Java programmers to work with Hadoop, Yahoo! developed a platform known as Pig.
Pig, a top-level Apache project, provides a simple high-level scripting language called Pig Latin, which allows users to write intuitive scripts to process data stored in HDFS.
Internally, Pig Latin is converted to several MapReduce jobs to process the data in HDFS. Pig is an abstraction over MapReduce.
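To give a feel for the language, here is a short Pig Latin sketch; the input path, field names, and output path are hypothetical:

```
-- Load tab-delimited web logs from an assumed HDFS path
logs = LOAD '/user/analyst/weblogs' USING PigStorage('\t')
       AS (ip:chararray, url:chararray, bytes:long);

-- Group the records by client IP
by_ip = GROUP logs BY ip;

-- Sum the bytes transferred per IP
totals = FOREACH by_ip GENERATE group AS ip, SUM(logs.bytes) AS total_bytes;

-- Write the result back to HDFS
STORE totals INTO '/user/analyst/bytes_by_ip';
```

When this script runs, Pig compiles the LOAD/GROUP/FOREACH pipeline into MapReduce jobs behind the scenes; the user never writes a mapper or reducer.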
Just like Pig, Hive is an abstraction over MapReduce. However, the Hive interface is closer to SQL, which helps SQL-conversant users work with Hadoop. Hive provides a mechanism to define a structure for the data stored in HDFS and to query it much like a relational database. The query language of Hive is called HiveQL.
Hive provides a very handy way to plug in custom mappers and reducers written in MapReduce to perform advanced data processing.
Hive usually runs on the client-side machine. Internally, it interacts directly with the jobtracker daemon on the Hadoop cluster to create MapReduce jobs based on the HiveQL statement provided via the Hive command-line interface. Hive maintains a metastore where it stores all table schemas for the required files stored in HDFS. This metastore is often a relational database system like MySQL.
The following diagram shows the high-level workings of Apache Hive:
The Hive command-line interface uses the schema available in the metastore, along with the query provided, to plan the MapReduce jobs that need to be executed on the cluster. Once all the jobs are executed, the output (based on the query) is either displayed on the client's terminal or represented as an output table in Hive. The table is nothing but a schema (structure) for the output files generated by the internal MapReduce jobs that were spawned for the provided HiveQL.
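The following HiveQL sketch illustrates both halves of this workflow: defining a schema over files already in HDFS, then querying them. The table name, columns, and HDFS location are hypothetical:

```sql
-- Define a schema over tab-delimited files at an assumed HDFS location;
-- EXTERNAL means Hive records only the schema and leaves the files in place
CREATE EXTERNAL TABLE weblogs (ip STRING, url STRING, bytes BIGINT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/analyst/weblogs';

-- This query is compiled into one or more MapReduce jobs by Hive
SELECT ip, SUM(bytes) AS total_bytes
FROM weblogs
GROUP BY ip;
```

The CREATE statement touches only the metastore; no data is moved. Only the SELECT spawns MapReduce jobs on the cluster.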
Building a distributed application requires the management of several nodes and processes working together at the same time. Synchronization and coordination of the nodes is the primary responsibility of any distributed application. As this is a common requirement for many distributed applications, having a common framework to achieve this has been the primary focus of the open source community in the distributed computing space.
Apache ZooKeeper is a distributed coordination service. It is a framework that can be used to build distributed applications by providing a set of services such as naming, locking, synchronization, configuration management, and leader election.
ZooKeeper maintains all its data in a hierarchical structure, just like a traditional filesystem. Each data register (a unit of storage of information) in ZooKeeper is called a znode (ZooKeeper node).
A typical ZooKeeper service comprises a set of servers that replicate information among themselves. This ensemble of servers makes ZooKeeper highly available, making ZooKeeper itself a distributed application. The clients of a ZooKeeper service are the nodes of a cluster. ZooKeeper keeps all its information in memory, making it very fast; a copy of the in-memory state is also maintained on each server's disk.
The following diagram shows the high-level workings of the ZooKeeper service:
In the preceding diagram, you see a ZooKeeper service with five servers. There is one server that is a leader and four others that are followers. Each client (in a Hadoop cluster, each node in the cluster is a client) connects to exactly one server in the ensemble to read information. The leader is responsible for performing write operations in ZooKeeper. All servers need to know about the other servers in the ensemble.
Once the leader updates the znode with the write operation, the information is propagated to the followers. If the leader server fails, one of the followers becomes a leader and the rest remain followers.
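The znode hierarchy can be explored interactively with the zkCli.sh shell that ships with ZooKeeper. The session below is an illustrative sketch; the server hostname, znode paths, and data are assumptions:

```
$ zkCli.sh -server zkhost:2181
[zk: zkhost:2181(CONNECTED) 0] create /app1 ""
Created /app1
[zk: zkhost:2181(CONNECTED) 1] create /app1/config "version=1"
Created /app1/config
[zk: zkhost:2181(CONNECTED) 2] get /app1/config
version=1
```

The create operations are routed through the leader and replicated to the followers; the subsequent get may be served by whichever server the client is connected to.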
The concept of ZooKeeper will be clearer when we see how Apache Hadoop uses ZooKeeper for namenode high availability. This will be covered in Chapter 4, Exploring HDFS Federation and Its High Availability.
HBase is the Hadoop database. HBase provides fast, random read-write access to a large volume of data. HBase leverages the Hadoop cluster to store large tables that have millions to billions of rows with millions of columns.
HBase is a column-oriented NoSQL data store and was designed based on Google's BigTable implementation. HBase is built on top of HDFS.
Tables in HBase are made of rows and columns, and the intersection of a row and column is called a cell. Each cell is versioned; by default, the version is the timestamp of when the data was inserted. The row key acts as the key for the table, and all access operations on the table are performed using the row key.
The following diagram shows the workings of the HBase service:
As shown in the preceding diagram, an HBase implementation consists of an HBase master node, which is responsible for managing different RegionServers. When a table in HBase grows in size, it is divided into different regions that are spread across the different nodes of the cluster. Each node that hosts regions is known as a RegionServer. HBase relies on ZooKeeper to manage the state of the cluster, and all important configuration information is stored on the ZooKeeper ensemble. All data for HBase is usually stored in HDFS.
As an administrator, it is very important to know the different components of an HBase cluster as it helps with faster troubleshooting.
Organizations that run Hadoop usually set up their hardware infrastructure in-house. However, cloud infrastructure providers such as Amazon and Rackspace allow users to set up a complete cluster in the cloud. Apache Whirr provides the user with a set of libraries/scripts that help users set up and manage a Hadoop cluster on the cloud. As an administrator, you may be tasked with the responsibility of setting up a Hadoop cluster on infrastructure provided by a cloud service provider such as Amazon. If you are given this task, Apache Whirr is the tool that you should be using.
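With Whirr, a cluster is described in a properties file and launched with a single command. The following is a sketch only; the cluster name, node counts, and the assumption of Amazon EC2 as the provider are all illustrative:

```
# hadoop.properties -- illustrative Whirr cluster definition
whirr.cluster-name=testcluster
whirr.instance-templates=1 hadoop-namenode+hadoop-jobtracker,3 hadoop-datanode+hadoop-tasktracker
whirr.provider=aws-ec2
whirr.identity=${env:AWS_ACCESS_KEY_ID}
whirr.credential=${env:AWS_SECRET_ACCESS_KEY}
```

The cluster is then started with `whirr launch-cluster --config hadoop.properties` and torn down with `whirr destroy-cluster --config hadoop.properties`, which is important to remember when paying for cloud resources by the hour.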
In Chapter 2, HDFS and MapReduce, we discussed the MapReduce flow in detail. If you recollect, the map phase generates intermediate output files, which are then transferred to reducers for the reduce phase. The output files generated by a map phase can be compressed. The compression allows the intermediate files to be written and read faster. Snappy is a compression/decompression library developed by Google and can be applied to perform the compressions of these output files. Snappy is known for its speed of compression, which in turn improves the speed of the overall operations.
The two properties shown in the following code need to be set in the mapred-site.xml file to enable Snappy compression during MapReduce operations:
<property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
</property>
<property>
    <name>mapred.map.output.compression.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
Data analysts often apply a few standard, well-established algorithms to their data to generate useful information. When the volumes of data are large, like those available on a Hadoop cluster, these algorithms need to be expressed as MapReduce programs. Apache Mahout is a collection of algorithms for collaborative filtering, clustering, and classification of data. Most of these algorithms are implemented in MapReduce and are readily available to analysts for data analysis and processing.
During the processing of data in a distributed manner, several objects are built and transferred between the nodes of a cluster. These objects are transferred using the process of serialization. Serialization is the process of transforming an object in the memory to a stream of bytes. This stream of bytes is then transferred over the wire to the destination node. The destination node reads the stream of bytes and reconstructs the object. This reconstruction is called deserialization. Another use of a serialized object is to write it to a persistent store (file). Apache Avro is a serialization-deserialization framework used in Apache Hadoop. In Hadoop, Avro is used for interprocess communication between the different nodes in a cluster.
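In Avro, the structure of the data being serialized is described by a JSON schema. The following is an illustrative sketch of such a schema; the record name, namespace, and fields are hypothetical:

```
{
  "type": "record",
  "name": "LogEvent",
  "namespace": "com.example",
  "fields": [
    {"name": "ip",    "type": "string"},
    {"name": "url",   "type": "string"},
    {"name": "bytes", "type": "long"}
  ]
}
```

Because the schema travels with (or is agreed upon by) both ends of the wire, the destination node can deserialize the byte stream back into a LogEvent record without any out-of-band knowledge of the data's layout.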
When dealing with big data processing, the task of processing is broken down into several jobs. These jobs need to be executed in a specific sequence to achieve the desired output, and executing them manually would be very tedious. Such a coordinated, scheduled sequence of jobs is called a workflow. Apache Oozie is a workflow management system for Apache Hadoop. Different types of jobs such as MapReduce, Hive, Pig, Sqoop, or custom jobs such as Java programs can be scheduled and coordinated using Oozie.
An Oozie workflow consists of action nodes and control nodes. An action node is a node that executes a specific process, for example, a MapReduce job. Control nodes are nodes that help in controlling the workflow, for example, the start node, end node, and fail node.
The configuration of Oozie workflows is done using Hadoop Process Definition Language (hPDL). hPDL is an XML-based definition language.
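A minimal workflow in hPDL, with a single MapReduce action between the start and end control nodes, might look like the following sketch; the workflow name, parameter names, and schema version are illustrative assumptions:

```xml
<!-- Illustrative workflow: start -> one MapReduce action -> end (or fail) -->
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.4">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>MapReduce action failed</message>
    </kill>
    <end name="end"/>
</workflow-app>
```

Here, start, end, and the kill node named fail are the control nodes, while mr-node is the action node; the ok and error transitions decide which control node the workflow moves to next.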
The following diagram shows a sample Oozie workflow:
Cloudera Search is a search engine for Hadoop. It is a full-text search engine built using Apache Solr, an open source, enterprise-class search server. The other important components of Cloudera Search are Apache Lucene, SolrCloud, Apache Flume, and Apache Tika. Cloudera Search indexes files stored in HDFS and HBase, making ad hoc analysis on Hadoop very fast.
Cloudera Impala allows users to query data stored in HDFS at real-time speeds. It uses SQL-like query commands similar to that in Apache Hive to query data. Unlike Hive, which is used for long running batch queries, Impala is used for quick data processing and analytics, and also does not create MapReduce jobs to execute queries.
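An Impala query is typically issued from the impala-shell client. The session below is a sketch; the daemon hostname and the weblogs table are hypothetical, and the point to note is that the query executes directly on Impala's daemons rather than spawning MapReduce jobs:

```
$ impala-shell -i impalahost
[impalahost:21000] > SELECT ip, SUM(bytes) AS total_bytes
                   > FROM weblogs
                   > GROUP BY ip;
```

Because no MapReduce job startup is involved, queries like this return in interactive time, which is what makes Impala suitable for quick, exploratory analytics.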
The objective of Cloudera Hue is to make Hadoop more useable. Hue achieves this by eliminating the need to use the command line to operate Hadoop. Hue provides a beautiful web interface with access to all the common tools used for big data processing. Hue is open source.
The following screenshot shows the Cloudera Hue home screen:
The Hue home page is divided into three sections: Query, Hadoop, and Workflow. The Query section lists all the tools that can be used to process data stored in the cluster. The Hadoop section lists all the administrative tools that deal with the stored data. The Workflow section deals with Oozie-related tasks. The links in the three sections can also be accessed using the fixed toolbar at the top of the page.
Beeswax is the Hive UI application that enables users to write HiveQL queries using a web UI. Beeswax allows users to create Hive tables, load data, and execute Hive queries.
The following screenshot shows the Beeswax Hive UI screen:
The Hive UI is divided into the following five different tabs:
Hue provides a very simple interface to construct and execute Cloudera Impala queries. The following screenshot shows the Cloudera Impala screen:
The File Browser application displays all the files stored in the cluster (HDFS). Users can perform basic file operations such as upload, download, rename, move, copy, and delete. This interface is very handy to quickly browse HDFS. The following screenshot shows the File Browser application's screen:
The Metastore Manager application is used to perform the following actions:
The following screenshot shows the Metastore Manager application's screen:
The Sqoop Jobs screen provides a very intuitive interface to build Sqoop jobs. The New job link on the top-right corner brings up a simple three-step process screen to build Sqoop jobs.
The Job Browser screen lists all the jobs that have been executed on the cluster. The list can be filtered on the different status flags: SUCCEEDED, RUNNING, FAILED, and KILLED. The ID column is a hyperlink which, when clicked, shows more details of that specific job. Details such as the status, the percentage completion of the map and reduce phases, and the duration of the task are also visible. Such information is very useful for monitoring jobs submitted to the cluster.
The Job Designs page allows users to configure different types of jobs such as the MapReduce, Sqoop, Pig, and Hive jobs. Once the jobs are configured, they can be submitted to the cluster. After submission, the status of the jobs can be monitored from the Job Browser section.
The Oozie Editor/Dashboard is divided into the following four tabs:
The Workflows dashboard section displays the running and completed workflows that were submitted to the cluster. The Coordinators dashboard section displays the list of running and completed coordinated jobs that were submitted to the cluster. The Oozie coordinator allows users to configure interdependent workflow jobs. The Bundle dashboard section lists all the running and completed bundles that were submitted to the cluster. The Oozie section displays the status and configuration parameters of the Oozie workflow system.
The Collection Manager screen provides the user with the feature to import collections. A collection is basically an index of a dataset. Once the collection is imported, a search can be performed on the collection by navigating to the Search page link on the top-right corner.
The following screenshot shows the Collection Manager screen: