Big Data Ingestion and Streaming Patterns
Traditional business intelligence (BI) and data warehouse (DW) solutions rely extensively on structured data. Database platforms such as Oracle, and data integration tools such as Informatica, had limited capabilities for handling and managing unstructured data such as text, media, video, and so forth. Although they offered the CLOB and BLOB data types for storing large amounts of text and binary content, accessing data stored that way was a problem. With the advent of multistructured (a.k.a. unstructured) data in the form of social media and audio/video, there has to be a change in the way data is ingested, preprocessed, validated and/or cleansed, and integrated or correlated with nontextual formats. This chapter deals with the following topics:
Understanding Data Ingestion
In typical ingestion scenarios, you have multiple data sources to process. As the number of data sources increases, the processing starts to become complicated. Also, in the case of big data, the structure of the source data is often not known up front; hence, following traditional data integration approaches makes it difficult to integrate the data.
Common challenges encountered while ingesting several data sources include the following:
Problem
What are the typical data ingestion patterns?
Solution
Unstructured data, if stored in a relational database management system (RDBMS), creates performance and scalability concerns. Hence, in the big data world, data is loaded using multiple solutions and into multiple target destinations to solve the specific types of problems encountered during ingestion.
Ingestion patterns describe solutions to commonly encountered problems in data source to ingestion layer communications. These solutions can be chosen based on the performance, scalability, and availability requirements. We’ll look at these patterns (which are shown in Figure 3-1) in the subsequent sections. We will cover the following common data-ingestion and streaming patterns in this chapter:
Figure 3-1. Data ingestion layer and associated patterns
Multisource Extractor Pattern
Problem
How will you ingest data from multiple sources and different formats in an efficient manner?
Solution
The multisource extractor pattern (shown in Figure 3-2) is applicable when enterprises with large collections of unstructured data need to investigate disparate datasets and nonrelational databases (for example, NoSQL stores such as Cassandra). Typical industry examples are claims and underwriting, financial trading, telecommunications, e-commerce, fraud detection, social media, gaming, and wagering. Feeds from energy-exploration and video-surveillance equipment, where application workloads are CPU- and I/O-intensive, are also ideal candidates for the multisource extractor pattern.
Figure 3-2. Multisource extractor pattern
The multisource extractor taxonomy ensures that the ingestion tool/framework is highly available and distributed. It also ensures that huge volumes of data are segregated into multiple batches across different nodes. For a very small implementation involving a handful of clients and/or only a small volume of data, even a single-node implementation will work. But for a continuous stream of data arriving from multiple clients at high volume, it makes sense to have a clustered implementation with batches partitioned into small volumes.
Generally, in large ingestion systems, big data operators employ enrichers to do initial data aggregation and cleansing (see Figure 3-2). An enricher reliably transfers files, validates them, reduces noise, compresses them, and transforms them from a native format into an easily interpreted representation. Initial data cleansing (for example, removing duplicates) is also commonly performed in the enricher tier.
Once the files are processed by enrichers, they are transferred to a cluster of intermediate collectors for final processing and loading to destination systems.
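To make the enricher tier concrete, the following minimal Python sketch shows the kind of work an enricher might perform on a single feed: basic validation, duplicate removal, and conversion from a native CSV format into a compressed, easily parsed representation. The file names, the id field, and the validation rule are assumptions made for illustration, not part of the pattern itself.

```python
# Minimal enricher sketch (illustrative only): validate incoming records,
# drop duplicates, and write a compressed, easily parsed representation.
import csv
import gzip
import json

def enrich(input_path: str, output_path: str) -> None:
    seen = set()                                  # simple in-memory dedupe key store
    with open(input_path, newline="") as src, gzip.open(output_path, "wt") as dst:
        for row in csv.DictReader(src):
            if not row.get("id"):                 # basic validation: every record needs an id
                continue
            if row["id"] in seen:                 # remove duplicates
                continue
            seen.add(row["id"])
            dst.write(json.dumps(row) + "\n")     # native CSV -> compressed JSON lines

if __name__ == "__main__":
    enrich("claims_feed.csv", "claims_enriched.json.gz")   # hypothetical feed names
```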
Because the ingestion layer has to be fault-tolerant, it always makes sense to have multiple nodes. The number of disks and disk size per node have to be based on each client’s volume. Multiple nodes will be able to write to more drives in parallel and provide greater throughput.
However, the multisource extractor pattern has a number of significant disadvantages that make it unusable for real-time ingestion. The major shortcomings are as follows:
Table 3-1 compares a sample textual data ingestion using a single-node taxonomy against a multinode taxonomy.
Table 3-1. Distributed and Clustered Flume Taxonomy
Protocol Converter Pattern
Problem
How will you ingest data from multiple sources and different formats/protocols in an efficient manner?
Solution
The protocol converter pattern (shown in Figure 3-3) is applicable in scenarios where enterprises have a wide variety of unstructured data from data sources that have different data protocols and formats. In this pattern, the ingestion layer does the following:
Figure 3-3. Protocol converter pattern
Protocol conversion is required when the sources of data follow various different protocols. The variation in a protocol lies either in the headers or in the actual message: it could be the number of bits in the headers, or the length of the various fields and the corresponding logic required to decipher the data content; the message itself could be fixed length or variable length with separators.
This pattern is required to standardize the structure of the various messages so that the information can be analyzed together using an analytics tool. The converter fits the different messages into a standard canonical message format that is usually mapped to a NoSQL data structure.
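The following is a hedged sketch of such a converter: two assumed source formats, one fixed-length and one pipe-delimited, are mapped into a single canonical dictionary that could then be persisted as a JSON document in a NoSQL store. The field layouts are invented for illustration.

```python
# Illustrative protocol converter sketch: two source formats are mapped into one
# canonical message. The field layouts below are assumptions for the example.
import json

def parse_fixed_length(message: str) -> dict:
    # Assumed layout: 10-char account id, 8-char date (YYYYMMDD), 12-char amount
    return {
        "account_id": message[0:10].strip(),
        "event_date": message[10:18],
        "amount": float(message[18:30]),
    }

def parse_delimited(message: str) -> dict:
    # Assumed layout: account_id|event_date|amount
    account_id, event_date, amount = message.split("|")
    return {"account_id": account_id, "event_date": event_date, "amount": float(amount)}

def to_canonical(message: str, protocol: str) -> str:
    parsers = {"fixed": parse_fixed_length, "delimited": parse_delimited}
    return json.dumps(parsers[protocol](message))    # canonical JSON document

print(to_canonical("ACC000012320240101000000150.75", "fixed"))
print(to_canonical("ACC0000123|20240101|150.75", "delimited"))
```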
This concept is important when a system needs to be designed to address multiple protocols having multiple structures for incoming data.
In this pattern, the ingestion layer provides the following services:
Multidestination Pattern
Problem
Should all the raw data be ingested only in HDFS? In what scenario should it be ingested in multiple destinations?
Solution
Many organizations have traditional RDBMS systems as well as analytics platforms like SAS or Informatica. However, the ever-growing amount of data from an increasing number of data streams causes storage overflow problems. Also, the cost of licenses required to process this huge data slowly starts to become prohibitive. Increasing volume also causes data errors (a.k.a., data regret), and the time required to process the data increases exponentially. Because the RDBMS and analytics platforms are physically separate, a huge amount of data needs to be transferred over the network on a daily basis.
To overcome these challenges, an organization can start ingesting data into multiple data stores, both RDBMS as well as NoSQL data stores. The data transformation can be performed in the HDFS storage. Hive or Pig can be used to analyze the data at a lower cost. This also reduces the load on the existing SAS/Informatica analytics engines.
The Hadoop layer uses MapReduce jobs to prepare the data for effective querying by Hive and Pig. This also ensures that large amounts of data need not be transferred over the network, thus avoiding huge costs.
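As an illustration of this preparation step, here is a minimal Hadoop Streaming-style job written in Python, assuming comma-delimited raw records with three fields: the mapper filters malformed lines and the reducer keeps a single record per key. The field layout and the invocation shown in the comment are assumptions, not a prescribed implementation.

```python
# Hypothetical Hadoop Streaming job (sketch) that prepares raw delimited records
# for querying. A typical invocation would pass this script to the
# hadoop-streaming jar as -mapper "python prep.py map" -reducer "python prep.py reduce".
import sys

def map_phase():
    for line in sys.stdin:
        fields = line.rstrip("\n").split(",")
        if len(fields) != 3 or not fields[0]:              # drop malformed records
            continue
        print(f"{fields[0]}\t{fields[1]},{fields[2]}")     # key <TAB> value

def reduce_phase():
    last_key = None
    for line in sys.stdin:                                 # input arrives sorted by key
        key, value = line.rstrip("\n").split("\t", 1)
        if key != last_key:                                # keep first record per key only
            print(f"{key},{value}")
            last_key = key

if __name__ == "__main__":
    map_phase() if sys.argv[1] == "map" else reduce_phase()
```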
The multidestination pattern (Figure 3-4) is very similar to the multisource ingestion pattern until it is ready to integrate with multiple destinations. A router publishes the “enriched” data and then broadcasts it to the subscriber destinations. The destinations have to register with the publishing agent on the router. Enrichers can be used as required by the publishers as well as the subscribers. The router can be deployed in a cluster, depending on the volume of data and number of subscribing destinations.
Figure 3-4. Multidestination pattern
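A minimal sketch of the router's publish/subscribe behavior follows, assuming destinations register a delivery callback with the router. In a real deployment the router would typically sit on a clustered message broker, and the HDFS/RDBMS destinations here are placeholders rather than real client APIs.

```python
# Minimal router sketch for the multidestination pattern: destinations register
# with the publishing agent and every enriched record is broadcast to all of them.
from typing import Callable, Dict

class Router:
    def __init__(self) -> None:
        self._subscribers: Dict[str, Callable[[dict], None]] = {}

    def register(self, name: str, deliver: Callable[[dict], None]) -> None:
        self._subscribers[name] = deliver            # destination registers itself

    def publish(self, enriched_record: dict) -> None:
        for deliver in self._subscribers.values():
            deliver(enriched_record)                 # broadcast to every destination

router = Router()
router.register("hdfs_raw_zone", lambda rec: print("to HDFS:", rec))    # placeholder sink
router.register("rdbms_mart", lambda rec: print("to RDBMS:", rec))      # placeholder sink
router.publish({"account_id": "ACC0000123", "amount": 150.75})
```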
This pattern solves some of the problems of ingesting and storing huge volumes of data:
Just-in-Time Transformation Pattern
Problem
Should preprocessing of data—for example, cleansing/validation—always be done before ingesting data in HDFS?
Solution
For a huge volume of data and a huge number of analytical computations, it makes sense to ingest all the raw data into HDFS and then run dependent preprocessing batch jobs, based on the business case to be implemented, to cleanse, validate, correlate, and transform the data. This transformed data can then be stored in HDFS itself or transferred to data marts, warehouses, or real-time analytics engines. In short, raw data and transformed data can coexist in HDFS, and running all preprocessing transformations before ingestion might not always be ideal.
But basic validations can be performed as part of preprocessing on data being ingested.
This section introduces you to the just-in-time transformation pattern, where data is loaded and then transformed when required by the business. Notice the absence of the enricher layer in Figure 3-5. Multiple batch jobs run in parallel to transform data as required in the HDFS storage.
Figure 3-5. Raw data as well as transformed data co-existing in HDFS
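As a sketch of such a batch job, the following assumes PySpark is available and that raw JSON claim records already sit in an HDFS raw zone; the job cleanses and reshapes only what the current business case requires and writes the result back next to the raw data. The paths and column names are illustrative assumptions.

```python
# Sketch of a just-in-time transformation batch job (PySpark assumed): raw data
# stays untouched in HDFS, and this job derives a transformed dataset on demand.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("jit-claims-transform").getOrCreate()

raw = spark.read.json("hdfs:///data/raw/claims/")           # raw zone, unmodified
cleansed = (
    raw.dropDuplicates(["claim_id"])                         # basic cleansing
       .filter(F.col("amount") > 0)                          # validation rule
       .withColumn("claim_year", F.year("claim_date"))       # derive analysis column
)
cleansed.write.mode("overwrite").parquet("hdfs:///data/transformed/claims_by_year/")

spark.stop()
```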
Real-Time Streaming Pattern
Problem
How do we develop big data applications to process a continuous, real-time, unstructured inflow of data into the enterprise?
Solution
The key characteristics of a real-time streaming ingestion system (Figure 3-6) are as follows:
Figure 3-6. Real-time streaming pattern
Event processing nodes (EPs) consume the different inputs from various data sources and create events that are captured by the event listeners of the event processing engines. Event listeners are the logical hosts of the EPs. Event processing engines have a very large in-memory capacity (big memory). Because the architecture is event-driven, EPs are triggered by events: as soon as an event occurs, the EP executes a specific operation and forwards the result to the alerter. The alerter publishes the results of the in-memory big data analytics to the enterprise BPM (business process management) engines. The BPM processes can redirect the results of the analysis to various channels such as mobile devices, CIO dashboards, BAM systems, and so forth.
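The following toy sketch mirrors that flow under simplifying assumptions: a listener receives events, triggers a processing operation, and forwards the result to an alerter that stands in for the downstream BPM and dashboard channels. All names are illustrative; a production system would run on a streaming engine rather than in a single Python process.

```python
# Toy event-driven flow: event -> listener -> processing operation -> alerter.
from typing import Callable

def alerter(result: dict) -> None:
    print("publish to BPM/dashboard:", result)        # stand-in for real channels

def fraud_check(event: dict) -> dict:
    suspicious = event["amount"] > 10_000              # assumed in-memory analytic
    return {"event_id": event["event_id"], "suspicious": suspicious}

class EventListener:
    def __init__(self, operation: Callable[[dict], dict], alert: Callable[[dict], None]):
        self.operation, self.alert = operation, alert

    def on_event(self, event: dict) -> None:           # triggered as each event arrives
        self.alert(self.operation(event))

listener = EventListener(fraud_check, alerter)
listener.on_event({"event_id": "e-1", "amount": 25_000})
listener.on_event({"event_id": "e-2", "amount": 120})
```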
Problem
What are the essential tools/frameworks required in your big data ingestion layer to handle files in batch-processing mode?
Solution
There are many product options to facilitate batch-processing-based ingestion. Here are some of the major frameworks available in the market:
Problem
What are the essential tools/frameworks required in your big data ingestion layer to handle real-time streaming data?
Solution
There are many product options to facilitate real-time streaming ingestion. Here are some of the major frameworks available in the market:
Problem
Can traditional ETL tools be used to ingest data into HDFS?
Solution
Traditional ETL tools such as the open source Talend and Pentaho DI, or well-known products like Informatica, can also be leveraged for data ingestion. Some of these traditional ETL tools can read and write multiple files in parallel from and to HDFS.
ETL tools help to get data from one data environment and put it into another data environment. ETL is generally used with batch processing in data warehouse environments. Data warehouses provide business users with a way to consolidate information across disparate sources to analyze and report on insights relevant to their specific business focus. ETL tools are used to transform the data into the format required by the data warehouse. The transformation is actually done in an intermediate location before the data is loaded into the data warehouse.
In the big data world, ETL tools like Informatica have been used to enable a fast and flexible ingestion solution (greater than 150 GB/day) that can support ad hoc capability for data and insight discovery. Informatica can be used in place of Sqoop and Flume solutions, and Informatica PowerCenter can be utilized as a primary raw-data ingestion engine.
Figure 3-7 depicts a scenario in which a traditional ETL tool has been used to ingest data into HDFS.
Figure 3-7. Ingestion using traditional ETL tools
Problem
Are there message-transformation best practices in the ingestion layers that facilitate faster ingestion?
Solution
For the transformation of messages and location of the transformation process, these guidelines can be followed:
This allows selecting only the records of interest for loading; ideally, that should be the data that has changed since the last extraction. Simple transformations, such as decoding an attribute and uppercase/lowercase conversions, can be performed in the source system.
Transformation can be performed in the staging area prior to loading into HDFS. When there are multiple data sources, the data needs to be consolidated and mapped to a different data structure; such an intermediate data store is called a staging area. Hadoop ETL tools like Hive and Pig can be used in this area to do the transformation, as in the sketch that follows.
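As a hedged example of a staging-area transformation, the sketch below drives HiveQL from Python via PyHive, assuming a reachable HiveServer2 endpoint and that both the staging and curated tables already exist; the database, table, and column names are invented for illustration.

```python
# Staging-area transformation sketch: raw records land in a staging table, and
# Hive consolidates and maps them into the structure used downstream in HDFS.
from pyhive import hive

conn = hive.Connection(host="hiveserver2.example.com", port=10000, username="etl")
cursor = conn.cursor()

cursor.execute("""
    INSERT OVERWRITE TABLE curated.claims
    SELECT claim_id,
           upper(region)           AS region,       -- simple decode/case transform
           cast(amount AS double)  AS amount,
           to_date(claim_ts)       AS claim_date
    FROM staging.claims_raw
    WHERE amount IS NOT NULL                         -- basic validation in staging
""")
cursor.close()
conn.close()
```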
Some simple transformation can be done during the loading process itself.
Transformation of data in memory is considered a better option for large and complex transformations where there must be no latency between the extraction and loading processes, but it requires large amounts of memory. This is useful for near-real-time analytics systems (for example, SAP HANA), where transformation and loading are done with very low latency.
Solution
Many vendors like IBM, Oracle, EMC, and others have come out with hardware appliances that promise end-to-end systems that optimize the big data ingestion, processing, and storage functions. The next section gives you a brief idea about the capabilities of such appliances.
Oracle has a big data appliance that handles data to the tune of 600 TB. It utilizes Cloudera's CDH distribution for big data management. It also has an inherent 10GbE high-speed network between the nodes for rapid real-time ingestion and replication.
EMC offers a similar appliance, called Greenplum, with similar features to facilitate high-speed, low-cost data processing using Hadoop.
Solution
BigQuery is Google’s cloud-based big data analytics service. You can upload your big data set to BigQuery for analysis. However, depending on your data’s structure, you might need to prepare the data before loading it into BigQuery. For example, you might need to export your data into a different format or transform the data. BigQuery supports two data formats: CSV and JSON.
BigQuery can load uncompressed files significantly faster than compressed files due to parallel load operations, but because uncompressed files are larger in size, using them can lead to bandwidth limitations and higher Google Cloud Storage costs. For example, uncompressed files that live on third-party services can consume considerable bandwidth and time if uploaded to Google Cloud Storage for loading.
With BigQuery, processing is faster if you denormalize the data structure to enable super-fast querying.
Large datasets are often represented using XML. BigQuery doesn't support loading XML files directly, but XML files can easily be converted to an equivalent JSON format or a flat CSV structure and then uploaded for further processing.
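The sketch below illustrates one such conversion, assuming a simple, flat XML layout: each record element is flattened into one line of newline-delimited JSON, the form BigQuery accepts for loading. The element and file names are assumptions, and the resulting file would then be loaded with the bq CLI or a BigQuery client library.

```python
# Flatten a simple XML file into newline-delimited JSON for a BigQuery load.
import json
import xml.etree.ElementTree as ET

def xml_to_ndjson(xml_path: str, ndjson_path: str, record_tag: str = "record") -> None:
    tree = ET.parse(xml_path)
    with open(ndjson_path, "w") as out:
        for record in tree.getroot().iter(record_tag):
            row = {child.tag: child.text for child in record}   # flatten one level
            out.write(json.dumps(row) + "\n")                   # one JSON object per line

if __name__ == "__main__":
    xml_to_ndjson("orders.xml", "orders.ndjson")                # hypothetical file names
```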
Summary
With the huge volume of data coming into enterprises from various data sources, different challenges are encountered that can be solved using the patterns described in this chapter. These patterns provide topologies to address multiple types of data formats and protocols, as well as guidance about how long it takes to process the data, where the transformation should occur, and so forth. A judicious use of these patterns will help the big data architect sift the noise from the true information before it enters the enterprise for further analysis in the Hadoop storage area.