In this section, we will discuss the general flow and components that are required for Big Data Machine Learning. Although many of the components, such as data acquisition or storage, are not directly related to Machine Learning methodologies, they inevitably have an impact on the frameworks and processes. Giving a complete catalog of the available components and tools is beyond the scope of this book, but we will discuss the general responsibilities of the tasks involved and give some information on the techniques and tools available to accomplish them.
The general Big Data framework is illustrated in the following figure:
The choice of how the Big Data framework is set up and deployed in the cluster is one of the decisions that affects the choice of tools, techniques, and cost. The data acquisition or collection component is the first step, and it consists of several techniques, both synchronous and asynchronous, to ingest data into the system. Techniques ranging from publish-subscribe messaging and source-sink pipelines to relational database queries and custom data connectors are available in these components.
Data storage choices ranging from distributed filesystems such as HDFS to non-relational databases (NoSQL) are available based on various other functional requirements. NoSQL databases are described in the section on Data Storage.
Data preparation, or transforming the large volume of stored data so that it is consumable by the Machine Learning analytics, is an important processing step. This has some dependencies on the frameworks, techniques, and tools used in storage. It also has some dependency on the next step: the choice of Machine Learning analytics/frameworks that will be used. There are a wide range of choices for processing frameworks that will be discussed in the following sub-section.
Once the data is collected, stored, and transformed based on the domain requirements, different Machine Learning methodologies can be employed, including batch learning, real-time learning, and mixed batch/real-time learning. Recall that in batch learning the model is trained simultaneously on a number of examples that have been previously collected. In contrast, in real-time learning model training is continuous, with each newly arriving instance becoming part of a dynamic training set (see Chapter 5, Real-Time Stream Machine Learning for details). Whether one selects supervised learning, unsupervised learning, or a combination of the two also depends on the data, the availability of labels, and label quality. These will be discussed in detail later in this chapter.
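The distinction between batch and real-time learning can be made concrete with a minimal Python sketch (illustrative only; the class and method names are our own, not from any framework). A batch learner fits once on a fixed collection of examples, while an online learner updates its state with each arriving instance. A running-mean "model" stands in for a real learning algorithm:

```python
class BatchMeanModel:
    """Batch learning: train once on a previously collected set of examples."""
    def fit(self, examples):
        self.mean = sum(examples) / len(examples)
        return self

class OnlineMeanModel:
    """Real-time learning: update incrementally as each instance arrives."""
    def __init__(self):
        self.n, self.mean = 0, 0.0
    def partial_fit(self, x):
        self.n += 1
        self.mean += (x - self.mean) / self.n  # incremental mean update
        return self

batch = BatchMeanModel().fit([2.0, 4.0, 6.0])   # whole training set at once
online = OnlineMeanModel()
for x in [2.0, 4.0, 6.0]:
    online.partial_fit(x)  # each new instance joins the dynamic training set
print(batch.mean, online.mean)  # both arrive at 4.0
```

On the same data both learners reach the same estimate, but only the online learner can keep absorbing new instances after deployment, which is the essence of the stream setting discussed in Chapter 5.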
The results of analytics during the development stage as well as the production or runtime stage also need to be stored and visualized for humans and automated tasks.
There are many frameworks that are built on the core Hadoop (References [3]) open source platform. Each of them provides a number of tools for the Big Data components described previously.
Hortonworks Data Platform (HDP) provides an open source distribution comprising various components in its stack, from data acquisition to visualization. Apache Ambari is often the user interface used for managing services and provisioning and monitoring clusters. The following screenshot depicts Ambari used for configuring various services and the health-check dashboard:
Like HDP, Cloudera CDH (References [4]) provides similar services and Cloudera Services Manager can be used in a similar way to Ambari for cluster management and health checks, as shown in the following screenshot:
Amazon Elastic MapReduce (EMR) (References [5]) is another Big Data cluster platform, similar to HDP and Cloudera, which supports a wide variety of frameworks. EMR has two modes: cluster mode and step execution mode. In cluster mode, you choose the Big Data stack vendor, EMR or MapR, and in step execution mode, you submit jobs ranging from JARs to SQL queries for execution. In the following screenshot, we see the interface for configuring a new cluster as well as defining a new job flow:
Microsoft Azure HDInsight (References [6]) is another platform that provides cluster management along with most of the required services, including storage, processing, and Machine Learning. The Azure portal, shown in the following screenshot, is used to create and manage the cluster and to monitor the status of its various components:
In the Big Data framework, the acquisition component plays an important role in collecting data from disparate source systems and storing it in Big Data storage. Depending on the type of source and on volume, velocity, functional, and performance requirements, a wide variety of acquisition frameworks and tools are available. We will describe a few of the best-known frameworks and tools to give readers some insight.
In publish-subscribe based frameworks, the publishing source pushes the data in different formats to the broker, which has different subscribers waiting to consume them. The publisher and subscriber are unaware of each other, with the broker mediating in between.
Apache Kafka (References [9]) and Amazon Kinesis are two well-known implementations that are based on this model. Apache Kafka defines the concepts of publishers, consumers, and topics—on which things get published and consumed—and a broker to manage the topics. Amazon Kinesis is built on similar concepts with producers and consumers connected through Kinesis streams, which are similar to topics in Kafka.
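The pattern can be illustrated with a minimal in-memory broker sketch in Python (illustrative only; real systems such as Kafka add partitioning, persistence, and consumer offsets). Note that the publisher and subscribers reference only the topic name, never each other:

```python
from collections import defaultdict

class Broker:
    """Minimal in-memory broker mediating between publishers and subscribers."""
    def __init__(self):
        self.subscribers = defaultdict(list)  # topic -> list of callbacks
    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)
    def publish(self, topic, message):
        for callback in self.subscribers[topic]:  # fan out to every subscriber
            callback(message)

broker = Broker()
received_a, received_b = [], []
broker.subscribe("clicks", received_a.append)
broker.subscribe("clicks", received_b.append)  # two independent consumers
broker.publish("clicks", {"user": 42, "page": "/home"})
print(received_a, received_b)  # both subscribers receive a copy
```

The fan-out behavior, where every subscriber to a topic sees every message, is what distinguishes publish-subscribe from a plain message queue, where each message typically goes to one consumer.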
In source-sink models, sources push the data into the framework and the framework pushes the data to the sinks. Apache Flume (References [7]) is a well-known implementation of this kind of framework, with a variety of sources, channels to buffer the data, and a number of sinks to store the data in the Big Data world.
Since many traditional data stores are SQL-based RDBMSes, SQL-based frameworks provide a generic way to import data from an RDBMS into Big Data storage, mainly HDFS. Apache Sqoop (References [10]) is a well-known implementation that can import data from any JDBC-based RDBMS and store it in HDFS-based systems.
Message queueing frameworks are push-pull based frameworks similar to publish-subscribe systems. Message queues decouple the producers from the consumers and can buffer the data in the queue, enabling an asynchronous communication pattern. Many protocols have been developed for this pattern, such as the Advanced Message Queueing Protocol (AMQP) and the ZeroMQ Message Transfer Protocol (ZMTP). RabbitMQ, ZeroMQ, and Amazon SQS are some well-known implementations of this framework.
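A small Python sketch shows the asynchronous decoupling a message queue provides (illustrative only; real brokers such as RabbitMQ add routing, acknowledgements, and durable storage). The producer pushes without waiting for a consumer, and the consumer pulls later at its own pace:

```python
from queue import Queue

q = Queue()

# Producer pushes messages without knowing who will consume them, or when.
for i in range(3):
    q.put(f"event-{i}")

# Consumer pulls later, at its own pace: the queue buffers in between.
consumed = []
while not q.empty():
    consumed.append(q.get())
print(consumed)  # ['event-0', 'event-1', 'event-2']
```

Because the queue holds the messages, the producer and consumer never need to be running, or even reachable, at the same time, which is the core of the asynchronous pattern described above.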
The data storage component plays a key part in connecting acquisition and the rest of the components together. Performance, impact on data processing, cost, high availability, ease of management, and so on, should be taken into consideration when deciding on data storage. For pure real-time or near-real-time systems there are in-memory storage frameworks, while batch-based systems mainly use distributed filesystems such as HDFS or NoSQL databases.
HDFS can run on a large cluster of nodes and provide all the important features such as high-throughput, replications, fail-over, and so on.
The basic architecture of HDFS has the following components:

- NameNode: manages the filesystem namespace and persists the metadata in the fsimage and edit files. The fsimage contains the filesystem metadata as a snapshot, while the edit files contain the incremental changes to the metadata.
- Secondary NameNode: merges the fsimage and edit files at every predefined checkpoint, keeping the edit log from growing without bound.
- DataNodes: store the actual data blocks, serve read and write requests, and report block status to the NameNode.

Non-relational databases, also referred to as NoSQL databases, are gaining enormous popularity in the Big Data world. High throughput, better horizontal scaling, and improved retrieval and storage performance, at the cost of weaker consistency models, are notable characteristics of most NoSQL databases. We will discuss some important forms of NoSQL database in this section, along with implementations.
Key-value databases are the most prominent NoSQL databases, used mostly for semi-structured or unstructured data. As the name suggests, the structure of storage is quite basic: unique keys are associated with data values that can be of any type, including string, integer, double precision, and so on—even BLOBs. Hashing the keys for quick lookup and retrieval of the values, together with partitioning the data across multiple nodes, gives high throughput and scalability. The query capabilities, however, are very limited. Amazon DynamoDB, Oracle NoSQL, and MemcacheDB are examples of key-value databases.
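The hash-and-partition idea can be sketched in a few lines of Python (an illustration of the technique, not how any particular product implements it; the class and method names are our own). Each key is hashed to choose a shard, so lookups touch only one partition regardless of the total data size:

```python
import hashlib

class ShardedKVStore:
    """Sketch of a key-value store: keys are hashed to pick a partition."""
    def __init__(self, num_shards=4):
        self.shards = [dict() for _ in range(num_shards)]  # one dict per "node"
    def _shard(self, key):
        digest = hashlib.md5(key.encode()).hexdigest()
        return int(digest, 16) % len(self.shards)  # hash decides the partition
    def put(self, key, value):
        self.shards[self._shard(key)][key] = value  # value may be any type
    def get(self, key):
        return self.shards[self._shard(key)].get(key)

store = ShardedKVStore()
store.put("user:42", {"name": "Ada"})
store.put("session:9", b"\x00\x01")  # even a binary blob
print(store.get("user:42"))
```

Because the shard is a pure function of the key, reads and writes for different keys can be served by different nodes in parallel, which is where the throughput and scalability come from; the same property is also why richer queries (ranges, joins) are hard in this model.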
Document databases store semi-structured data in the form of XML, JSON, or YAML documents, to name some of the most popular formats. The documents have unique keys to which they are mapped. Though it is possible to store documents in key-value stores, the query capabilities offered by document stores are greater as the primitives making up the structure of the document—which may include names or attributes—can also be used for retrieval. When the data is ever-changing and has variable numbers or lengths of fields, document databases are often a good choice. Document databases do not offer join capabilities and hence all information needs to be captured in the document values. MongoDB, ElasticSearch, Apache Solr, and so on, are some well-known implementations of document databases.
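A short Python sketch shows the key difference from a key-value store: queries can reach inside the document and match on its attributes, and documents in the same collection can have different fields (the data and function names here are hypothetical, for illustration only):

```python
documents = {
    "doc1": {"type": "order", "customer": "Ada", "total": 120.0},
    "doc2": {"type": "order", "customer": "Bob", "total": 80.0, "coupon": "X1"},
    "doc3": {"type": "refund", "customer": "Ada"},  # no 'total' field at all
}

def find(docs, **criteria):
    """Query by attributes inside the documents, not just by key."""
    return [key for key, doc in docs.items()
            if all(doc.get(field) == value for field, value in criteria.items())]

print(find(documents, type="order", customer="Ada"))  # ['doc1']
```

Note that doc2 carries an extra `coupon` field and doc3 omits `total` entirely; this variable-field flexibility is exactly the ever-changing-data case where document databases shine, while the absence of joins means related data must be embedded in the document itself.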
The use of the column as the basic unit of storage, with a name, a value, and often a timestamp, differentiates columnar databases from traditional relational databases. Columns are further combined to form column families. A row is indexed by its row key and has multiple column families associated with it. A row need only store the column families that are actually populated, which gives columnar databases a compact representation for sparse data. Columnar databases do not have a fixed schema like relational databases; new columns and families can be added at any time, giving them a significant advantage. HBase and Cassandra are well-known columnar databases, and Parquet is a related columnar storage format.
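The row key / column family / column layout, and its friendliness to sparse data, can be sketched with nested Python dictionaries (a conceptual illustration in the spirit of HBase-style storage, not any product's actual on-disk format; names and timestamps are made up):

```python
# Each row stores only the column families (and columns) it actually has,
# and every cell carries a value plus a timestamp.
table = {
    "row1": {"profile":  {"name": ("Ada", 1700000000)},
             "activity": {"last_login": ("2024-01-01", 1700000001)}},
    "row2": {"profile":  {"name": ("Bob", 1700000002)}},  # no 'activity' family
}

def get_cell(table, row_key, family, column):
    """Look up one cell: row key -> column family -> column."""
    families = table.get(row_key, {})
    return families.get(family, {}).get(column)  # (value, timestamp) or None

print(get_cell(table, "row1", "activity", "last_login"))
print(get_cell(table, "row2", "activity", "last_login"))  # sparse: None
```

Because row2 simply omits the `activity` family, absent cells cost nothing to store, and a new column or family can be added to any row at any time without a schema change.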
In many applications, the data has an inherent graph structure with nodes and links. Storing such data in graph databases makes it more efficient for storage, retrieval, and queries. The nodes have a set of attributes and generally represent entities, while links represent relationships between the nodes that can be directed or undirected. Neo4J, OrientDB, and ArangoDB are some well-known implementations of graph databases.
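The node/attribute/labeled-link model can be sketched in plain Python (an illustration of the data model only; real graph databases add indexing, transactions, and query languages such as Cypher, and all names below are hypothetical):

```python
from collections import defaultdict

# Nodes carry attribute dictionaries and generally represent entities.
nodes = {"alice": {"kind": "person"}, "neo4j": {"kind": "database"}}
edges = defaultdict(list)  # source node -> [(relationship, target node)]

def link(src, rel, dst):
    """Add a directed, labeled link between two nodes."""
    edges[src].append((rel, dst))

def neighbors(src, rel):
    """Traverse outgoing links with a given relationship label."""
    return [dst for r, dst in edges[src] if r == rel]

link("alice", "USES", "neo4j")
print(neighbors("alice", "USES"))  # ['neo4j']
```

Traversal here is a direct lookup on the source node's adjacency list rather than a join over tables, which is why graph stores answer relationship queries efficiently.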
The data preparation step involves various preprocessing steps before the data is ready to be consumed by analytics and Machine Learning algorithms. Some of the key tasks involved are data cleansing, transformation, aggregation, and normalization.
Apache Hive (References [11]) is a powerful tool for performing various data preparation activities on HDFS systems. Hive organizes the underlying HDFS data into a structure similar to that of relational databases. Its query language, HQL, resembles SQL and helps in performing various aggregations, transformations, cleanup, and normalization steps; the data is then serialized back to HDFS. Logical tables in Hive are partitioned across the cluster and sub-divided into buckets for speed-up. Complex joins and aggregate queries in Hive are automatically converted into MapReduce jobs for throughput and speed-up.
Spark SQL, a major component of Apache Spark (References [1] and [2]), provides SQL-like functionality—similar to what HQL provides—for performing changes to the Big Data. Spark SQL can work with underlying data storage systems such as Hive, with NoSQL databases, or with columnar formats such as Parquet. We will touch upon some aspects of Spark SQL in the section on Spark later.
Amazon Redshift provides several warehousing capabilities especially on Amazon EMR setups. It can process petabytes of data using its massively parallel processing (MPP) data warehouse architecture.
In many Big Data deployments, the transformations described previously must be performed on the stream of data in real time rather than on stored batch data. Various Stream Processing Engines (SPEs), such as Apache Storm (References [12]) and Apache Samza, as well as in-memory processing engines such as Spark Streaming, are used for stream processing.
Machine learning helps to perform descriptive, predictive, and prescriptive analysis on Big Data. There are two broad extremes that will be covered in this chapter:
Both topics are covered at length in the remainder of this chapter.
With batch learning done at modeling time and real-time learning done at runtime, predictions—the output of applying the models to new data—must be stored in some data structure and then analyzed by the users. Visualization tools and other reporting tools are frequently used to extract and present information to the users. Based on the domain and the requirements of the users, the analysis and visualization can be static, dynamic, or interactive.
Lightning is a framework for performing interactive visualizations on the Web, with REST-based binding APIs for the Python, R, Scala, and JavaScript languages.
Pygal and Seaborn are Python-based libraries that help in plotting a wide variety of charts and graphs for analysis, reporting, and visualization.