
1. The Era of Big Data, Hadoop, and Other Big Data Processing Frameworks

Raju Kumar Mishra
Bangalore, Karnataka, India
When I first joined Orkut, I was happy. With Orkut, I had a new platform that enabled me to get to know the people around me, including their thoughts, their views, their purchases, and the places they visited. We were all gaining more knowledge than ever before and felt more connected to the people around us. Uploading pictures helped us share good ideas of places to visit. I was becoming more and more addicted to understanding and expressing sentiments. After a few years, I joined Facebook, and day by day I was introduced to what became an infinite amount of information from all over the world. Next, I started purchasing items online, and I liked it more than shopping offline. I could easily get a lot of information about products, and I could compare prices and features. And I wasn’t the only one; millions of people were feeling the same way about the Web.
More and more data was flooding in from every corner of the world to the Web. And thanks to all those inventions related to data storage systems, people could store this huge inflow of data.
More and more users joined the Web from all over the world, and the amount of data being added to these storage systems increased accordingly. This data took the form of opinions, pictures, videos, and many other formats. The data deluge forced users to adopt distributed systems, and distributed systems require distributed programming. They also require extra care for fault-tolerance and efficient algorithms, because a distributed system always needs two things: reliability of the system and availability of all its components.
Apache Hadoop was introduced, ensuring efficient computation and fault-tolerance for distributed systems. Mainly, it concentrated on reliability and availability. Because Apache Hadoop was easy to program, many people became interested in big data. Big data became a popular topic for discussion everywhere. E-commerce companies wanted to know more about their customers, and the health-care industry was interested in gaining insights from the data collected, for example. More data metrics were defined. More data points started to be collected.
Many open source big data tools emerged, including Apache Tez and Apache Storm. This was also the time when many NoSQL databases emerged to deal with the huge data inflow. Apache Spark also evolved as a distributed system and became very popular during this period.
In this chapter, we are going to discuss big data as well as Hadoop as a distributed system for processing big data. In covering the components of Hadoop, we will also discuss Hadoop ecosystem frameworks such as Apache Hive and Apache Pig, and the usefulness of these components, to give you an overview. Throwing light on some of the shortcomings of Hadoop will give you background on the development of Apache Spark. The chapter then moves on to a description of Apache Spark and the various cluster managers that work with it. The chapter wouldn’t be complete without discussing NoSQL, so a discussion of the NoSQL database HBase is also included. Sometimes we read data from a relational database management system (RDBMS); this chapter therefore also discusses PostgreSQL.

Big Data

Big data is one of the hot topics of this era. But what is big data? Big data describes a dataset that is huge and increasing with amazing speed. Apart from this volume and velocity, big data is also characterized by its variety of data and veracity. Let’s explore these terms—volume, velocity, variety, and veracity—in detail. These are also known as the 4V characteristics of big data, as illustrated in Figure 1-1.
Figure 1-1. Characteristics of big data

Volume

The volume specifies the amount of data to be processed. A large amount of data requires large machines or distributed systems, and the time required for computation also increases with the volume of data. So it’s better to go for a distributed system if we can parallelize our computation. The volume might consist of structured data, unstructured data, or both; if we have unstructured data, the situation becomes more complex and compute intensive. You might wonder, how big is big? What volume of data should be classified as big data? This is a debatable question, but in general we can say that an amount of data we can’t handle with a conventional system can be considered big data.

Velocity

Every organization is becoming more and more data conscious. A lot of data is collected every moment. This means that the velocity of data—the speed of the data flow and of data processing—is also increasing. How will a single system be able to handle this velocity? The problem becomes complex when we have to analyze a large inflow of data in real time. Each day, systems are being developed to deal with this huge inflow of data.

Variety

Sometimes the variety of data adds enough complexity that conventional data analysis systems can’t analyze the data well. What do we mean by variety? You might think data is just data, but this is not the case. Image data is different from simple tabular data, for example, because of the way it is organized and saved. In addition, a huge number of file formats are in use, and every format requires a different way of handling it. Reading and writing a JSON file, for instance, is different from the way we deal with a CSV file. Nowadays, a data scientist has to handle combinations of these data types; the data you are going to deal with might be a mix of pictures, videos, and text. This variety of data makes big data more complex to analyze.

Veracity

Can you imagine a logically incorrect computer program resulting in the correct output? Of course not. Similarly, data that is not accurate is going to provide misleading results. The veracity of data is one of the important concerns related to big data. When we consider the condition of big data, we have to think about any abnormalities in the data.

Hadoop

Hadoop is a distributed and scalable framework for solving big data problems. Hadoop, developed by Doug Cutting and Mike Cafarella, is written in Java. It can be installed on a cluster of commodity hardware, and it scales horizontally on distributed systems. Hadoop was inspired by Google research papers and is easy to program. Its capability to work on commodity hardware makes it cost-effective. If we are working on commodity hardware, fault-tolerance is an inevitable issue, but Hadoop provides a fault-tolerant system for data storage and computation, and this fault-tolerant capability has made Hadoop popular.
Hadoop has two components, as illustrated in Figure 1-2. The first component is the Hadoop Distributed File System (HDFS) . The second component is MapReduce. HDFS is for distributed data storage, and MapReduce is for performing computation on the data stored in HDFS.
Figure 1-2. Hadoop components

HDFS

HDFS is used to store large amounts of data in a distributed and fault-tolerant fashion. HDFS is written in Java and runs on commodity hardware. It was inspired by a Google research paper about the Google File System (GFS) . It is a write-once and read-many-times system that’s effective for large amounts of data.
HDFS comprises two components: NameNode and DataNode . These two components are Java daemon processes. A NameNode, which maintains metadata of files distributed on a cluster, works as the master for many DataNodes. HDFS divides a large file into small blocks and saves the blocks on different DataNodes . The actual file data blocks reside on DataNodes.
HDFS provides a set of Unix shell-like commands to deal with it. But we can use the Java file system API provided by HDFS to work at a finer level on large files. Fault-tolerance is implemented by using replications of data blocks.
We can access the HDFS files by using a single-thread process and also in parallel. HDFS provides a useful utility, distcp, which is generally used to transfer data in parallel from one HDFS system to another. It copies data by using parallel map tasks. You can see the HDFS components in Figure 1-3.
Figure 1-3. Components of HDFS

MapReduce

The Map-Reduce model of computation first appeared in a Google research paper. This research paper was implemented in Hadoop as Hadoop’s MapReduce. Hadoop’s MapReduce is the computation engine of the Hadoop framework, which performs computations on the distributed data in HDFS. MapReduce is horizontally scalable on distributed systems of commodity hardware. It also scales for large problems. In MapReduce, the solution is broken into two phases: the map phase and the reduce phase. In the map phase, a chunk of data is processed, and in the reduce phase, an aggregation or a reduction operation is run on the result of the map phase. Hadoop’s MapReduce framework is written in Java.
MapReduce uses a master/slave model. In Hadoop 1, this map-reduce computation was managed by two daemon processes: Jobtracker and Tasktracker. Jobtracker is the master process that deals with many Tasktrackers; needless to say, Tasktracker is the slave to Jobtracker. In Hadoop 2, Jobtracker and Tasktracker were replaced by YARN.
Because Hadoop’s MapReduce framework is written in Java, MapReduce code is usually written in Java against the API the framework provides. The Hadoop Streaming module goes further, so that someone who knows another programming language (such as Python or Ruby) can also write MapReduce programs.
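To give a flavor of Hadoop Streaming, here is a minimal word-count mapper sketch in Python; the script name mapper.py and the companion reducer that would aggregate its output are assumptions for illustration, not code from this book.

#!/usr/bin/env python
# mapper.py (hypothetical): a word-count mapper for Hadoop Streaming.
# Hadoop Streaming feeds input splits to the script on standard input and
# collects the tab-separated key/value pairs written to standard output.
import sys

for line in sys.stdin:
    for word in line.strip().split():
        # Emit (word, 1); a companion reducer would sum the counts per word.
        print("%s\t%d" % (word, 1))

A matching reducer script would read these sorted key/value pairs from standard input and sum the counts for each word.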
The MapReduce model suits many kinds of algorithms. Many machine-learning algorithms have been implemented in Apache Mahout, which runs on Hadoop in much the same way as Pig and Hive.
But MapReduce wasn’t very good for iterative algorithms. At the end of every Hadoop job, MapReduce will save the data to HDFS and read it back again for the next job. We know that reading and writing data to a file is one of the costliest activities. Apache Spark mitigated this shortcoming of MapReduce by providing in-memory data persisting and computation.

Apache Hive

The world of computer science is one of abstraction. Everyone knows that all data ultimately exists in the form of bits. Programming languages such as C enable us to avoid programming in machine-level language; C provides an abstraction over machine and assembly language, and other high-level languages provide still more abstraction. Structured Query Language (SQL) is one such abstraction, and it is widely used all over the world by many data modeling experts. Hadoop is good for analysis of big data, so how can the large population that knows SQL use Hadoop’s computational power on big data? To write MapReduce programs, users must know a programming language in which Hadoop’s MapReduce can be programmed.
In the real world, day-to-day problems follow patterns. In data analysis, some problems are common, such as manipulating data, handling missing values, transforming data, and summarizing data. Writing MapReduce code for these day-to-day problems is head-spinning work for a nonprogrammer; writing code just to solve a problem is not the hard part, but writing efficient code that scales and can be extended is what takes effort. With this problem in mind, Apache Hive was developed at Facebook, so that such common problems can be solved without writing MapReduce code.
According to the Hive wiki, “Hive is a data warehousing infrastructure based on Apache Hadoop.” Hive has its own SQL dialect, which is known as Hive Query Language (abbreviated as HiveQL or HQL) . Using HiveQL, Hive can query data in HDFS. Hive can run not only on HDFS, but also on Spark and other big data frameworks such as Apache Tez.
Hive provides the user an abstraction that is like a relational database management system for structured data in HDFS. We can create tables and run SQL-like queries on them. Hive saves the table schema in an RDBMS. Apache Derby is the default RDBMS, which is shipped with the Apache Hive distribution. Apache Derby has been fully written in Java; this open source RDBMS comes with the Apache License, Version 2.0.
HiveQL commands are transformed into Hadoop’s MapReduce code and then run on the Hadoop cluster. You can see the Hive command execution flow in Figure 1-4.
Figure 1-4. Code execution flow in Apache Hive
A person who knows SQL can easily learn Apache Hive and HiveQL, and can use the storage and computation power of Hadoop for day-to-day analysis of big data. HiveQL is also supported by PySparkSQL; we can run HiveQL commands in PySparkSQL. Apart from executing HiveQL queries, we can also read data from Hive directly into PySparkSQL and write results back to Hive.
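As a small preview of this integration, here is a minimal sketch of running a HiveQL query from PySparkSQL. It assumes a Spark installation configured with Hive support; the employees table is a hypothetical example.

# A minimal sketch, assuming Spark is configured with Hive support and a
# Hive table named 'employees' (hypothetical) already exists.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("HiveQLFromPySpark")
         .enableHiveSupport()   # lets spark.sql() see Hive tables
         .getOrCreate())

# Any HiveQL statement can be passed to spark.sql(); the result is a DataFrame.
df = spark.sql("SELECT department, COUNT(*) AS head_count "
               "FROM employees GROUP BY department")
df.show()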
Note
You can read more about Hive at https://hive.apache.org/ and about the Apache Derby RDBMS at https://db.apache.org/derby/ .

Apache Pig

Apache Pig is a data-flow framework for performing data-analysis algorithms on huge amounts of data. It was developed by Yahoo!, open sourced to the Apache Software Foundation, and is now available under the Apache License, Version 2.0. Pig programs are written in a scripting language called Pig Latin. Pig is loosely coupled to Hadoop, which means that we can connect it to Hadoop and perform analysis, but Pig can also be used with other tools such as Apache Tez and Apache Spark.
Apache Hive is used as a reporting tool, whereas Apache Pig is used as an extract, transform, and load (ETL) tool. We can extend the functionality of Pig by using user-defined functions (UDFs), which can be written in many languages, including Java, Python, Ruby, JavaScript, Groovy, and Jython.
Apache Pig uses HDFS to read and store the data, and Hadoop’s MapReduce to execute the data-science algorithms. Apache Pig is similar to Apache Hive in using the Hadoop cluster. As Figure 1-5 depicts, on Hadoop, Pig Latin commands are first transformed into Hadoop’s MapReduce code. And then the transformed MapReduce code runs on the Hadoop cluster.
Figure 1-5. Code execution flow in Apache Pig
The best part of Pig is that the code is optimized and tested to work for day-to-day problems. A user can directly install Pig and start using it. Pig provides a Grunt shell to run interactive Pig commands, so anyone who knows Pig Latin can enjoy the benefits of HDFS and MapReduce, without knowing an advanced programming language such as Java or Python.

Apache Kafka

Apache Kafka is a publish-subscribe, distributed messaging platform. It was developed at LinkedIn and later open sourced to the Apache Software Foundation. It is fault-tolerant, scalable, and fast. A message, in Kafka terms, is the smallest unit of data that can flow from a producer to a consumer through a Kafka server, and it can be persisted and used at a later time. You might be confused by the terms producer and consumer; we are going to discuss them shortly. Another key term in the context of Kafka is topic: a topic is a stream of messages of a similar category, and it is defined by the user. Kafka comes with a built-in API that developers can use to build their applications. Now let’s discuss the three main components of Apache Kafka.

Producer

A Kafka producer publishes messages to a Kafka topic. It can publish data to more than one topic.

Broker

The broker is the main Kafka server that runs on a dedicated machine. Messages are pushed to the broker by the producer. The broker persists topics in different partitions, and these partitions are replicated to different brokers to deal with faults. The broker is stateless, so the consumer has to track the messages it has consumed.

Consumer

A consumer fetches messages from the Kafka broker. Remember, the Kafka broker doesn’t push messages to the consumer; rather, the consumer pulls data from the broker. Consumers subscribe to one or more topics on the Kafka broker and read the messages. The consumer also keeps track of all the messages that it has already consumed. Data is persisted in a broker for a specified time, so if the consumer fails, it can fetch the data again after it restarts.
Figure 1-6 explains the message flow of Apache Kafka. The producer publishes a message to the topic. Then the consumer pulls data from the broker. In between publishing and pulling, the message is persisted by the Kafka broker.
Figure 1-6. Apache Kafka message flow
We will integrate Apache Kafka with PySpark in Chapter 7, which discusses Kafka further.
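To show what working with Kafka from Python can look like, here is a minimal, hypothetical sketch using the third-party kafka-python package; it is independent of PySpark, the topic name is made up, and it assumes a broker running on localhost:9092.

# A minimal sketch with the third-party kafka-python package (not part of
# PySpark); assumes a Kafka broker is running on localhost:9092.
from kafka import KafkaProducer, KafkaConsumer

# Producer: publish a message to a (hypothetical) topic.
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("pysparkBookTopic", b"hello, kafka")
producer.flush()

# Consumer: subscribe to the same topic and pull messages from the broker.
consumer = KafkaConsumer("pysparkBookTopic",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)  # stop after 5 s of silence
for message in consumer:
    print(message.value)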
Note
You can read more about Apache Kafka at https://kafka.apache.org/ .

Apache Spark

Apache Spark is a general-purpose, distributed programming framework. It is considered very good for iterative as well as batch processing of data. Developed at the AMPLab at the University of California, Berkeley, Spark is now open source software that provides an in-memory computation framework. On the one hand, it is good for batch processing; on the other hand, it works well with real-time (or, better to say, near-real-time) data. Machine-learning and graph algorithms are iterative, and that is where Spark works its magic. According to its research paper, it is approximately 100 times faster than its peer, Hadoop. Data can be cached in memory, and caching intermediate data in iterative algorithms provides amazingly fast processing speed. Spark can be programmed with Java, Scala, Python, and R.
If anyone considers Spark an improved Hadoop, then to some extent that is fine in my view. We can implement a MapReduce algorithm in Spark, and Spark can use the benefits of HDFS; that is, it can read data from HDFS and store data to HDFS too. Spark also handles iterative computation efficiently, because data can be persisted in memory. Apart from in-memory computation, Spark is good for interactive data analysis.
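The following is a minimal sketch of what in-memory persistence looks like in PySpark; it assumes a SparkContext named sc, such as the one created by the pyspark shell, and the numbers are made up.

# A minimal sketch of in-memory persistence, assuming a SparkContext sc
# (for example, the one provided by the pyspark shell).
data = sc.parallelize(range(1, 1001))

# cache() keeps the computed RDD in memory, so later passes over it do not
# recompute it or re-read it from disk.
squares = data.map(lambda x: x * x).cache()

print(squares.reduce(lambda a, b: a + b))  # first pass computes and caches
print(squares.count())                     # second pass reuses the cached data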
We are going to study Apache Spark with Python. This is also known as PySpark. PySpark comes with many libraries for writing efficient programs, and there are some external libraries as well. Here are some of them:
  • PySparkSQL : A PySpark library to apply SQL-like analysis on a huge amount of structured or semistructured data. We can also use SQL queries with PySparkSQL, connect it to Apache Hive, and apply HiveQL. PySparkSQL is a wrapper over the PySpark core. PySparkSQL introduced the DataFrame, a tabular representation of structured data that is similar to a table in a relational database management system (see the short sketch after this list). Another data abstraction, the Dataset, was introduced in Spark 1.6, but it is not available in PySparkSQL.
  • MLlib: MLlib is a wrapper over the PySpark core that deals with machine-learning algorithms. The machine-learning API provided by the MLlib library is easy to use. MLlib supports many machine-learning algorithms for classification, clustering, text analysis, and more.
  • GraphFrames: The GraphFrames library provides a set of APIs for performing graph analysis efficiently, using the PySpark core and PySparkSQL. At the time of this writing, GraphFrames is an external library; you have to download and install it separately. We are going to perform graph analysis in Chapter 8.
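As promised above, here is a minimal DataFrame sketch using PySparkSQL; the column names and rows are invented for illustration.

# A minimal PySparkSQL sketch; the data is made up for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrameIntro").getOrCreate()

people = spark.createDataFrame(
    [("Alice", 34), ("Bob", 45), ("Cathy", 29)],
    ["name", "age"])

# A DataFrame behaves like an RDBMS table: we can filter and aggregate it,
# or register it as a temporary view and query it with SQL.
people.filter(people.age > 30).show()
people.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 30").show()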

Cluster Managers

In a distributed system, a job or application is broken into different tasks, which can run in parallel on different machines of the cluster. A task, while running, needs resources such as memory and a processor. The most important part is that if a machine fails, you then have to reschedule the task on another machine. The distributed system generally faces scalability problems due to mismanagement of resources. As another scenario, say a job is already running on a cluster. Another person wants to run another job. The second job has to wait until the first is finished. But in this way, we are not utilizing the resources optimally. This resource management is easy to explain but difficult to implement on a distributed system.
Cluster managers were developed to manage cluster resources optimally. There are three cluster managers available for Spark: Standalone, Apache Mesos, and YARN. The best part of these cluster managers is that they provide an abstraction layer between the user and the cluster. The user feels like he’s working on a single machine, while in reality he’s working on a cluster, due to the abstraction provided by cluster managers. Cluster managers schedule cluster resources to running applications.

Standalone Cluster Manager

Apache Spark is shipped with the Standalone Cluster Manager. It provides a master/slave architecture to the Spark cluster and is a Spark-only cluster manager: you can run only Spark applications with it. Its components are the master and workers; workers are the slaves to the master process. Standalone is the simplest cluster manager, and it can be configured using scripts in the sbin directory of Spark. We will configure the Spark Standalone Cluster Manager in the coming chapters and will deploy PySpark applications by using it.

Apache Mesos Cluster Manager

Apache Mesos is a general-purpose cluster manager. It was developed at the University of California, Berkeley, AMPLab. Apache Mesos helps distributed solutions scale efficiently. You can run applications using different frameworks on the same cluster when using Mesos; for example, we can run a Hadoop application and a Spark application simultaneously on Mesos, with the applications sharing the resources of the cluster. The two important components of Apache Mesos are the master and the slaves; it has a master/slave architecture similar to that of the Spark Standalone Cluster Manager. The applications running on Mesos are known as frameworks. Slaves inform the master about the resources available to them in the form of resource offers, which slave machines provide periodically. The allocation module of the master server decides which framework gets the resources.

YARN Cluster Manager

YARN stands for Yet Another Resource Negotiator. YARN was introduced in Hadoop 2 to make Hadoop scale better by separating resource management from job management. YARN’s main components are the ResourceManager, ApplicationMaster, and NodeManager. Per cluster there is one global ResourceManager and many NodeManagers; NodeManagers are slaves to the ResourceManager. The Scheduler, which is a component of the ResourceManager, allocates resources for the different applications working on the cluster. The best part is that we can run a Spark application and other applications such as Hadoop or MPI simultaneously on clusters managed by YARN. There is one ApplicationMaster per application, which manages that application’s tasks running in parallel on the distributed system. Remember, Hadoop and Spark have their own kinds of ApplicationMaster.
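From a PySpark program’s point of view, the cluster manager is selected through the master URL, as in the hedged sketch below; the host names and ports are placeholders, and in practice the master is usually supplied to spark-submit rather than hard-coded.

# A minimal sketch of choosing a cluster manager via the master URL; the
# host names and ports are placeholders.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ClusterManagerDemo")
         # .master("local[*]")                  # no cluster manager, run locally
         .master("spark://master-host:7077")    # Standalone Cluster Manager
         # .master("mesos://mesos-host:5050")   # Apache Mesos
         # .master("yarn")                      # YARN
         .getOrCreate())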

PostgreSQL

Relational database management systems are still used very frequently in organizations. What does relational mean here? It means tables. PostgreSQL is an RDBMS. It runs on nearly all major operating systems, including Microsoft Windows, Unix-based operating systems, macOS, and many more. It is open source software, and the code is available under the PostgreSQL License. Therefore, you can use it freely and modify it according to your requirements.
PostgreSQL databases can be connected to from other programming languages such as Java, Perl, Python, C, and C++, and through various programming interfaces. It can also be programmed in a procedural language, Procedural Language/PostgreSQL (PL/pgSQL), which is similar to PL/SQL. The user can add custom functions to this database; we can write custom functions in C/C++ and other programming languages. We can read data from PostgreSQL into PySparkSQL by using a Java Database Connectivity (JDBC) connector. In upcoming chapters, we are going to read data tables from PostgreSQL by using PySparkSQL, and we will explore more facets of PostgreSQL as well.
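To make the JDBC route concrete, here is a minimal sketch of reading a PostgreSQL table into a PySparkSQL DataFrame; the database name, table, and credentials are placeholders, and it assumes the PostgreSQL JDBC driver jar has been made available to Spark (for example, through the --jars option of spark-submit).

# A minimal sketch of reading a PostgreSQL table over JDBC; the URL, table,
# and credentials are placeholders, and the PostgreSQL JDBC driver jar must
# be available to Spark.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PostgresRead").getOrCreate()

df = (spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/pysparkbookdb")
      .option("dbtable", "public.some_table")
      .option("user", "postgres")
      .option("password", "postgres")
      .load())

df.show()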
PostgreSQL follows the ACID (Atomicity, Consistency, Isolation, and Durability) principles. It comes with many features, and some might be unique to PostgreSQL itself. It supports updatable views, transactional integrity, complex queries, triggers, and other features. PostgreSQL performs its concurrency management by using a multiversion concurrency control model.
There is a large community of support if you find a problem while using PostgreSQL. PostgreSQL has been designed and developed to be extensible.

HBase

HBase is an open source, distributed, NoSQL database. When I say NoSQL, you might consider it schemaless. You’re right, to a certain extent, but not completely: when you define a table, you have to specify its column families, so the database is not fully schemaless. We are going to create an HBase table in this section so you can understand this semi-schemaless property. HBase is a column-oriented database. You might wonder what that means. Let me explain: in column-oriented databases, data is saved columnwise.
We are going to install HBase in the next chapter, but for now, let me show how a table is created and how data is put inside the tables. You can apply all these commands after installing HBase on your system. In the coming chapter, we are going to read the same data table by using PySpark.
hbase(main):001:0> list
TABLE                                                                           
0 row(s) in 0.1750 seconds
=> []
hbase(main):002:0> create 'pysparkBookTable','btcf1','btcf2'
0 row(s) in 2.2750 seconds
=> Hbase::Table - pysparkBookTable
hbase(main):003:0> list
TABLE                                                                           
pysparkBookTable                                                                
1 row(s) in 0.0190 seconds
=> ["pysparkBookTable"]
hbase(main):004:0> put 'pysparkBookTable', '00001', 'btcf1:btc1','c11'
0 row(s) in 0.1680 seconds
hbase(main):005:0> put 'pysparkBookTable', '00001', 'btcf2:btc2','c21'
0 row(s) in 0.0240 seconds
hbase(main):006:0> put 'pysparkBookTable', '00002', 'btcf1:btc1','c12'
0 row(s) in 0.0150 seconds
hbase(main):007:0> put 'pysparkBookTable', '00002', 'btcf2:btc2','c22'
0 row(s) in 0.0070 seconds
hbase(main):008:0> put 'pysparkBookTable', '00003', 'btcf1:btc1','c13'
0 row(s) in 0.0080 seconds
hbase(main):009:0> put 'pysparkBookTable', '00003', 'btcf2:btc2','c23'
0 row(s) in 0.0060 seconds
hbase(main):010:0>  put 'pysparkBookTable', '00004', 'btcf1:btc1','c14'
0 row(s) in 0.0240 seconds
hbase(main):011:0>  put 'pysparkBookTable', '00004', 'btcf2:btc2','c24'
0 row(s) in 0.0280 seconds
hbase(main):012:0> scan 'pysparkBookTable'
ROW                   COLUMN+CELL                                               
 00001                column=btcf1:btc1, timestamp=1496715394968, value=c11     
 00001                column=btcf2:btc2, timestamp=1496715408865, value=c21     
 00002                column=btcf1:btc1, timestamp=1496715423206, value=c12     
 00002                column=btcf2:btc2, timestamp=1496715436087, value=c22     
 00003                column=btcf1:btc1, timestamp=1496715450562, value=c13     
 00003                column=btcf2:btc2, timestamp=1496715463134, value=c23     
 00004                column=btcf1:btc1, timestamp=1496715503014, value=c14     
 00004                column=btcf2:btc2, timestamp=1496715516864, value=c24     
4 row(s) in 0.0770 seconds
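As a quick, hypothetical preview of reading this table from Python, here is a sketch using the third-party happybase package, which talks to HBase through its Thrift gateway (the HBase Thrift server must be running); the book itself reads this table with PySpark in the coming chapter.

# A hedged sketch using the third-party happybase package; it assumes the
# HBase Thrift server is running on localhost.
import happybase

connection = happybase.Connection("localhost")
table = connection.table("pysparkBookTable")

# Fetch a single row by its row key, then scan the whole table.
print(table.row(b"00001"))
for row_key, columns in table.scan():
    print(row_key, columns)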
Note
You can get a lot of information about HBase at https://hbase.apache.org/ .
Spark can be used with three cluster managers: Standalone, Apache Mesos, and YARN. The Standalone Cluster Manager is shipped with Spark and is a Spark-only cluster manager. With Apache Mesos and YARN, we can run heterogeneous applications on the same cluster.