© Butch Quinto 2018
Butch Quinto, Next-Generation Big Data, https://doi.org/10.1007/978-1-4842-3147-0_7

7. Batch and Real-Time Data Ingestion and Processing

Butch Quinto
Plumpton, Victoria, Australia

Data ingestion is the process of transferring, loading, and processing data into a data management or storage platform. This chapter discusses various tools and methods for ingesting data into Kudu in batch and in real time. I’ll cover native tools that come with popular Hadoop distributions, and I’ll show examples of how to use Spark to ingest data into Kudu using the Data Source API, as well as the Kudu client APIs in Java, Python, and C++. There is also a group of next-generation commercial data ingestion tools that provide native Kudu support, and the Internet of Things (IoT) is another important use case. I’ll discuss all of them in detail in this chapter, starting with StreamSets.

StreamSets Data Collector

StreamSets Data Collector is a powerful, enterprise-grade streaming platform that you can use to ingest, route, and process real-time streaming and batch data from a large variety of sources. StreamSets was founded by Girish Pancha, Informatica’s former chief product officer, and Arvind Prabhakar, an early employee of Cloudera, where he led the development of Apache Flume and Apache Sqoop. i StreamSets is used by companies such as CBS Interactive, Cox Automotive, and Vodafone. ii

Data Collector can perform all kinds of data enrichment, transformation, and cleansing in-stream, then write your data to a large number of destinations such as HDFS, Solr, Kafka, or Kudu, all without writing a single line of code. For more complex data processing, you can write code in one of the following supported languages and frameworks: Java, JavaScript, Jython (Python), Groovy, Java Expression Language (EL), and Spark. Data Collector can run in stand-alone or cluster mode to support the largest environments.

Pipelines

To ingest data, Data Collector requires that you design a pipeline. A pipeline consists of multiple stages that you configure to define your data sources (origins), any data transformation or routing needed (processors), and where you want to write your data (destinations).

After designing your pipeline, you can start it, and it will immediately begin ingesting data. Data Collector sits on standby, quietly waiting for data to arrive, until you stop the pipeline. You can monitor Data Collector by inspecting data as it is being ingested or by viewing real-time metrics about your pipelines.

Origins

In a StreamSets pipeline, data sources are called Origins. They’re configurable components that you can add to your canvas without any coding. StreamSets includes several origins to save you development time and effort. Some of the available origins include MQTT Subscriber, Directory, File Tail, HDFS, S3, MongoDB, Kafka, RabbitMQ, MySQL binary log, and the SQL Server and Oracle CDC clients, to name a few. Visit StreamSets.com for a complete list of supported origins.

Processors

Processors allow you to perform transformations on your data. Some of the available processors are Field Hasher, Field Masker, Expression Evaluator, Record Deduplicator, JSON Parser, XML Parser, and JDBC Lookup, to name a few. Some processors, such as the Stream Selector, let you easily route data based on conditions. In addition, you can use evaluators that process data based on custom code. Supported languages and frameworks include JavaScript, Groovy, Jython, and Spark. Visit StreamSets.com for a complete list of supported processors.

Destinations

Destinations are the targets of your pipeline. Available destinations include Kudu, S3, Azure Data Lake Store, Solr, Kafka, JDBC Producer, Elasticsearch, Cassandra, HBase, MQTT Publisher, and HDFS, to name a few. Visit StreamSets.com for a complete list of supported destinations.

Executors

Executors allow you to run tasks, such as a MapReduce job, Hive query, shell script, or Spark application, when an event is received.

Data Collector Console

The console is Data Collector’s main user interface (see Figure 7-1). This is where all the action happens. This is where you design your pipeline, configure your stages, run and troubleshoot pipelines, and more.
Figure 7-1

StreamSets Data Collector console

StreamSets Data Collector can ingest data in real time or in batch (see Figure 7-2). Real-time data sources or origins include MQTT, Kafka, and Kinesis, to name a few. Batch data sources include HDFS, S3, Oracle, SQL Server, MongoDB, and so on. Data eventually lands on one or more destinations. In between the data sources and destinations are processors that transform and process data in-stream.
Figure 7-2

Typical StreamSets Architecture

There are several pre-built processors for some of the most common data transformations and enrichments. Examples of processors include column splitters and hashers, data type converters, XML parsers, JDBC lookups, and stream selectors, to name a few. These processors can be configured without writing a single line of code. However, there are times when you want to write code, mainly because none of the existing processors can handle the specific type of transformation that you require. StreamSets provides evaluators that support JavaScript, Java, Java EL, Spark, Jython, and Groovy. StreamSets connects all these stages (origins, processors, and destinations) to form a pipeline. StreamSets executes the pipeline in-memory, providing maximum performance and scalability.

Real-Time Streaming

As discussed in previous chapters, Kudu is especially suitable for real-time ingestion and processing because of its support for efficient inserts and updates as well as fast columnar data scans. iii For real-time workloads, users need to select the right type of destination. Some storage engines are not designed for real-time streaming; examples include HDFS and S3.

HDFS was designed for batch-oriented workloads. It splits and writes data in 128 MB blocks (the size is configurable) to enable high-throughput parallel processing. The problem arises when real-time data ingestion tools such as StreamSets or Flume write data to HDFS as fast as they receive it in order to make the data available to users in real time or near real time. This causes HDFS to produce large numbers of small files. As mentioned earlier, HDFS was not designed to handle small files. iv Each file, directory, or block in HDFS has metadata stored in the name node’s memory. Sooner or later, the amount of memory consumed by these small files will overwhelm the name node. Another problem with small files is that they are physically inefficient to access: applications need to perform more IO than necessary to read them. A common workaround is to use coalesce or repartition in Spark to limit the number of files written to HDFS, or to compact the files regularly using Impala, but that is not always feasible.
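As a rough illustration of that workaround, here is a minimal PySpark sketch; the input path, output path, and target file count are placeholders you would tune for your own cluster.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CompactSensorData").getOrCreate()

# Hypothetical landing directory full of small files written by a streaming tool.
df = spark.read.json("/landing/sensordata")

# coalesce() reduces the number of output files so the name node tracks a few
# large files instead of thousands of small ones.
df.coalesce(8).write.mode("append").parquet("/warehouse/sensordata")

spark.stop()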

S3 has its own IO characteristics and works differently than HDFS. S3 is an eventually consistent object store. With eventually consistent filesystems, users may not always see their updates immediately. v This might cause inconsistencies when reading and writing data to and from S3. Often, data appears after just a few milliseconds or seconds of writing to S3, but there are cases where it can take as long as 12 hours for some of the data to appear and become consistent. In rare cases, some objects can take almost 24 hours to appear. vi Another limitation of S3 is latency. Accessing data stored on S3 is significantly slower compared to HDFS and other storage systems.

Each S3 operation is an API call that might take tens to hundreds of milliseconds. The latency will add up and can become a performance bottleneck if you are processing millions of objects from StreamSets. vii S3 is great for storing backups or historical data but inappropriate for streaming data. In addition to Kudu, other destinations that are suitable for real-time data ingestion include HBase and MemSQL to name a few.

Batch-Oriented Data Ingestion

As mentioned earlier, StreamSets also supports batch workloads. As an origin and destination, I suggest Kudu for most batch-oriented jobs. Kudu provides most of the benefits of HDFS without the administration overhead. Performance benchmarks performed by the Kudu development team suggest that Kudu is still slightly slower than HDFS (Parquet) in some operations. If the difference in performance matters to your application (or if you’re processing unstructured data), then I suggest you use HDFS. For most applications, the difference in performance may not be important; in that case, I suggest you stick with Kudu (see Figure 7-3). With StreamSets, batch pipelines are built in the same way as real-time pipelines. The only difference is that when you use batch-oriented origins such as S3, HDFS, an RDBMS, or a file directory, StreamSets detects the type of origin and automatically reads the data in batch mode.
Figure 7-3

Batch data ingestion with StreamSets and Kudu

Internet of Things (IoT)

StreamSets is the Swiss army knife of data ingestion. Adding to its numerous features and capabilities is its support for the Internet of Things (IoT) viii (see Figure 7-4). StreamSets includes an MQTT Subscriber origin and an MQTT Publisher destination that allow it to be used as an Internet of Things gateway. For reading data from SCADA networks and OPC historians, StreamSets includes an OPC UA Client origin. Finally, StreamSets supports CoAP (Constrained Application Protocol), ix a protocol for low-power and low-bandwidth environments designed for machine-to-machine device communication. In Chapter 9, we implement a complete IoT data ingestion and visualization application using StreamSets, Kudu, and Zoomdata.
Figure 7-4

StreamSets IoT Architecture
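To make the MQTT portion of this architecture concrete, here is a minimal sketch of a device-side publisher whose messages an MQTT Subscriber origin could consume; the paho-mqtt package, broker address, and topic name are all assumptions, not part of the StreamSets example.

import json
import time
import paho.mqtt.client as mqtt

# Hypothetical broker and topic -- replace with your own IoT gateway settings.
client = mqtt.Client()
client.connect("mqtt.example.com", 1883)

reading = {"sensorid": 12,
           "sensortimestamp": time.strftime("%Y%m%d%H%M%S"),
           "temperature": 23,
           "pressure": 30}

# An MQTT Subscriber origin subscribed to this topic would receive the message.
client.publish("sensors/readings", json.dumps(reading))
client.disconnect()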

Deployment Options

StreamSets supports different deployment options: stand-alone and cluster modes. In stand-alone mode, StreamSets Data Collector is installed on one or more edge nodes. These edge nodes can be located in the same data center or in geographically disparate sites, as long as the network can support the latency and throughput requirements of your pipelines. In cluster mode, StreamSets pipelines are deployed and executed as Spark Streaming applications, utilizing YARN or Mesos as the cluster manager to take full advantage of the Hadoop cluster’s scalability (see Figure 7-5).
Figure 7-5

StreamSets Deployment Options

Consult the Data Collector User Guide for information on installing StreamSets from a Tar ball (Service Start), an RPM Package (Service Start), and Cloudera Manager (Cluster Mode).

Using StreamSets Data Collector

Let’s start by creating our first pipeline. Navigate to the StreamSets Data Collector URL, and you will be presented with a login page. The default username is “admin.” The default password is “admin.” Log in, then click “Create New Pipeline.” When prompted, enter a title and description for your new pipeline (see Figure 7-6).

You can now design your first pipeline.
Figure 7-6

StreamSets Console

Ingesting XML to Kudu

In this example we will ingest sensor data in XML format into a Kudu table. Log in to StreamSets. On the Pipelines page, click the “Create New Pipeline” button (see Figure 7-7).
Figure 7-7

New Pipeline

In the Data Collector console, click the “Stage Library” icon located near the “Help” icon (see Figure 7-8). You will be presented with a list of available stages you can use. Choose “Origins” in the list of stages. We will use the Directory origin in this example. With the Directory origin, files that are copied to a designated directory are ingested by StreamSets. Drag the Directory origin onto the canvas.
Figure 7-8

Directory Origin

Next, we need to add a processor. Choose “Processors” in the stage library. We will use an XML Parser to parse our XML data and convert it into a format that can be inserted into Kudu. Scroll down until you see the XML Parser icon. Drag the XML Parser processor to the canvas, near the Directory origin. Connect the Directory origin to the XML Parser processor in the canvas as shown in Figure 7-9. You will notice a yellow Pipeline Creation Help Bar. You can use the help bar to assist you in choosing stages. For now, we’ll ignore the help bar.
Figure 7-9

XML Parser

We need a destination to complete our pipeline. Choose “Destinations” in the stage library. We will use Kudu as our destination. Drag the Kudu destination to the canvas, near the XML Parser processor. Connect the XML Parser to the Kudu destination in the canvas as shown in Figure 7-10.
Figure 7-10

Kudu destination

Before we can start configuring our stages, we need to create the directory that the Directory origin will monitor, as well as the destination Kudu table. We’ll also prepare some test data.

Open impala-shell and create a table.

CREATE TABLE sensordata
(
       rowid BIGINT,
       sensorid SMALLINT,
       sensortimestamp STRING,
       temperature TINYINT,
       pressure TINYINT,
       PRIMARY KEY(rowid)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

Log in to the Cloudera cluster and create a directory that will contain our sensor data.

mkdir /sparkdata
chown hadoop:hadoop /sparkdata

Let’s create some test data that we will use later in the exercise.

<sensordata><rowid>1000</rowid><sensorid>12</sensorid><sensortimestamp>20170409001904</sensortimestamp><temperature>23</temperature><pressure>30</pressure></sensordata>
<sensordata><rowid>1001</rowid><sensorid>39</sensorid><sensortimestamp>20170409001927</sensortimestamp><temperature>25</temperature><pressure>28</pressure></sensordata>
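If you want more than a couple of test records, a short script such as the one below can generate them in the same single-line format; the output file name and value ranges are arbitrary.

import random
import time

# Write a handful of <sensordata> records, one per line, matching the sample above.
with open("file001.xml", "w") as f:
    for rowid in range(1000, 1010):
        f.write("<sensordata><rowid>%d</rowid><sensorid>%d</sensorid>"
                "<sensortimestamp>%s</sensortimestamp>"
                "<temperature>%d</temperature><pressure>%d</pressure>"
                "</sensordata>\n" % (rowid,
                                     random.randint(1, 100),
                                     time.strftime("%Y%m%d%H%M%S"),
                                     random.randint(15, 35),
                                     random.randint(20, 60)))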

Configure Pipeline

You can set configuration options for the entire pipeline by clicking on the name of the pipeline; in our case click on the “Sensor Data Kudu Pipeline” link near the upper-left side of the panel (see Figure 7-11).
Figure 7-11

Configure Pipeline

In the “Error Records” tab, change “Error Records” to “Discard (Library: Basic)” (see Figure 7-12). For now, leave the rest of the default parameters.
Figure 7-12

Error Records

Configure the Directory Origin

Click the Directory origin stage in the pipeline canvas. In the properties panel, navigate to the “Configurations” tab. Within the “Configurations” panel, you’ll see another set of tabs (see Figure 7-13). We’ll go through each tab.
Figure 7-13

Configure Directory origin

The “General” tab allows you to specify a name and description for the origin. There’s an option to enable the origin to generate events. If you enable “Produce Events,” the Directory origin produces event records every time it begins or finishes reading a file. You can ignore this configuration item for now. “On Record Error” allows you to choose the action to take on records sent to error. The default option is “Send to Error.” The other options are “Discard” and “Stop Pipeline.” Leave the default option for now.

The “Files” tab contains various configuration options related to file ingestion. The most important option for our example is “Files Directory,” the local directory where you place the files you want to ingest; in our case, set it to /sparkdata, the directory we created earlier. “File Name Pattern Mode” lets you select whether the file name pattern uses regex or glob syntax. Leave it set to “Glob” for now. “File Name Pattern” is an expression that defines the pattern of the file names in the directory. Since we’re ingesting XML files, let’s set the pattern to “*.xml”. “Read Order” controls the order in which files are read and processed by Data Collector. There are two options, “Last Modified Timestamp” and “Lexicographically Ascending File Names.” When lexicographically ascending file names are used, files are read in lexicographically ascending order based on file names. x It’s important to note that this ordering sorts the numbers 1 through 15 as 1, 10, 11, 12, 13, 14, 15, 2, 3, 4, 5, 6, 7, 8, 9. If you must read files in lexicographically ascending order, consider adding leading zeros to the file names, such as file_001.xml, file_002.xml, file_003.xml, file_004.xml, file_010.xml, file_011.xml, file_012.xml.
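The snippet below illustrates the difference; Python’s sorted() is simply standing in for the origin’s lexicographically ascending read order.

files = ["file_%d.xml" % n for n in range(1, 16)]
print(sorted(files))    # ['file_1.xml', 'file_10.xml', 'file_11.xml', ..., 'file_9.xml']

padded = ["file_%03d.xml" % n for n in range(1, 16)]
print(sorted(padded))   # ['file_001.xml', 'file_002.xml', ..., 'file_015.xml']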

When the last modified timestamp is used, files are read using the timestamp. Files with the same timestamp are ordered based on file names. For this exercise, let’s set the read order to “Last Modified Timestamp” (see Figure 7-14).
Figure 7-14

Last Modified Timestamp

The “Post Processing” tab provides configuration options for files after they’ve been read and processed. “Error Directory” can be configured as the destination for files that can’t be read. “File Post Processing” provides options to take after a file has been processed. You can archive or delete a file after processing, or do nothing. If you choose “Archive,” additional options are presented, such as “Archive Directory” and “Archive Retention Time (mins)” (see Figure 7-15). Let’s set “File Post Processing” to “None” for this example.
Figure 7-15

Archive

The “Data Format” tab allows you to set configuration parameters related to the format of your data (see Figure 7-16). The most interesting parameter for us is “Data Format.” Note that even though we’re ingesting XML data, we’ll set this parameter to “Text.” The XML data that we’re ingesting is not strictly valid XML, since it does not include a root element or XML prolog, so we will process it using custom delimiters. xi Check “Use Custom Delimiters” and specify a custom delimiter; in our case, set it to “</sensordata>.” Finally, check “Include Custom Delimiter.” We’ll leave the other parameters at their default values for now.
Figure 7-16

Data Format
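Conceptually, the custom-delimiter settings make the origin treat the file the way the following sketch does, splitting the raw text on “</sensordata>” and keeping the delimiter with each record; this is only an illustration of the behavior, not how Data Collector is implemented internally.

# Split the raw file on the custom delimiter and re-append it, so each record
# is one complete <sensordata>...</sensordata> element.
raw = open("file001.xml").read()
records = [chunk + "</sensordata>"
           for chunk in raw.split("</sensordata>") if chunk.strip()]

for record in records:
    print(record)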

Configure the XML Parser Processor

Click the XML Parser processor in the canvas and navigate to the “Configuration” section of the Properties panel. The “General” tab lets you set a name and description for the XML Parser processor. Navigate to the “Parse” tab and set “Field to Parse” and “Target Field” to “/text” (see Figure 7-17). As mentioned earlier, the Directory origin was configured to process text data. It writes each record into a single text field named “text.” xii This is a convention used by SDC.
Figure 7-17

Parse tab

When the Directory origin processes our XML data, it will create two records, delimited by “</sensordata>.”

<sensordata><rowid>1000</rowid><sensorid>12</sensorid><sensortimestamp>20170409001904</sensortimestamp><temperature>23</temperature><pressure>30</pressure></sensordata>
<sensordata><rowid>1001</rowid><sensorid>39</sensorid><sensortimestamp>20170409001927</sensortimestamp><temperature>25</temperature><pressure>28</pressure></sensordata>

Configure the Kudu Destination

Click the Kudu destination in the canvas and, in the “Configuration” panel, navigate to the “Kudu” tab and fill in the required parameters, starting with the host names and ports of the “Kudu Masters.” Note that because we created the “sensordata” table in Impala, it’s considered an internally managed table. Kudu tables created in Impala follow the naming convention “impala::database.table_name,” so set the “Table Name” to “impala::default.sensordata”. You also need to set the field to column mapping as shown in Figure 7-18; the SDC field paths look like “/text/column[0]/value.” Finally, set the “Default Operation” to “INSERT.” Note that you also have the option to perform other operations such as UPSERT, UPDATE, and DELETE.
Figure 7-18

Kudu tab

Validate and Preview Pipeline

Now that you’re done designing your pipeline, you need to check and make sure all the configurations are valid. Click the validate icon on the upper-right corner of the canvas to validate the pipeline. Once you’ve completed the validation, you can preview your pipeline to test it without actually running it. Previewing your pipeline is an easy way to debug and step through your pipeline, allowing you to inspect your data at each stage. xiii Click the preview icon. A “Preview Configuration” window will appear (see Figure 7-19). Accept the default values and click the “Run Preview” button.
Figure 7-19

Preview Configuration

You’re now in preview mode. Click the Directory origin. In the preview stage pane, you can see a list of test records (see Figure 7-20). Expanding each record shows the actual data. Note that the third record is empty. This is going to cause errors in the XML Parser processor.
Figure 7-20

Preview – inspecting records

Click the XML Parser processor. Here you can see how the input data was processed and what the resulting output data looks like. As you can see, the XML data was processed correctly but as expected, there was an error processing the empty record (see Figure 7-21).
Figure 7-21

Preview – error processing empty record

As expected, the Kudu destination’s input data is the XML Parser’s output (see Figure 7-22).
Figure 7-22

Preview – Kudu destination

You can also preview multiple stages. Click “multiple” on top of the preview stage panel. The “Preview Multiple Stages” panel allows you to inspect the flow of the records from one stage to another (see Figure 7-23).
Figure 7-23

Preview – Multiple Stages

Start the Pipeline

Now you’re ready to start the pipeline. Click the start icon, and then copy our test data (file001.xml) to /sparkdata.

cat file001.xml
<sensordata><rowid>1000</rowid><sensorid>12</sensorid><sensortimestamp>20170409001904</sensortimestamp><temperature>23</temperature><pressure>30</pressure></sensordata>
<sensordata><rowid>1001</rowid><sensorid>39</sensorid><sensortimestamp>20170409001927</sensortimestamp><temperature>25</temperature><pressure>28</pressure></sensordata>
cp file001.xml /sparkdata
After a couple of seconds, you should see some runtime statistics on the monitoring panel such as record count, record throughput, and batch throughput to mention a few (see Figure 7-24).
Figure 7-24

Monitoring pipeline

Click on each stage in the canvas to see runtime statistics specific for that stage. Let’s click on the Directory origin located in the canvas (see Figure 7-25).
Figure 7-25

Monitoring – Directory Origin

Clicking on the XML Parser processor gives you a different set of metrics (see Figure 7-26).
Figure 7-26

Monitoring – XML Parser

The Kudu destination has a different set of metrics as well. According to the screenshot in Figure 7-27, two rows were inserted to the sensordata Kudu table.
Figure 7-27

Monitoring – Kudu destination

Let’s verify if the rows were indeed successfully inserted in Kudu. Start impala-shell and query the sensordata table.

select * from sensordata;
+-------+----------+-----------------+-------------+----------+
| rowid | sensorid | sensortimestamp | temperature | pressure |
+-------+----------+-----------------+-------------+----------+
| 1001  | 39       | 20170409001927  | 25          | 28       |
| 1000  | 12       | 20170409001904  | 23          | 30       |
+-------+----------+-----------------+-------------+----------+
The rows were successfully added to the table. Try adding more data (see Figure 7-28). You’ll see the charts and other metrics get updated after adding more data.
Figure 7-28

Monitoring pipeline after one hour
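If you prefer to check the ingested rows from a script rather than from impala-shell, a quick sketch using the impyla package looks like the following; the host name is a placeholder, and 21050 is the default Impala HiveServer2 port.

from impala.dbapi import connect

# Hypothetical Impala daemon host.
conn = connect(host='impalahost', port=21050)
cur = conn.cursor()
cur.execute('SELECT rowid, sensorid, temperature, pressure FROM sensordata')
for row in cur.fetchall():
    print(row)
conn.close()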

Congratulations! You’ve successfully built and run your first StreamSets data pipeline.

Stream Selector

There are times when you have a requirement to route data to certain streams based on conditions. For example, you can insert data to certain tables based on “Country.” There is a default stream that handles records that do not match any of the user-defined conditions. xiv

Stream 1: ${record:value("/Country")=='USA'}
Stream 2: ${record:value("/Country")=='CANADA'}
Stream 3: ${record:value("/Country")=='BRAZIL'}
Stream 4: default

In this pipeline, all records where “Country” matches USA go to Stream 1, records matching Canada go to Stream 2, and records matching Brazil go to Stream 3. Everything else goes to Stream 4, the default stream. You specify these conditions using the Stream Selector.

Let’s design another pipeline. Instead of starting from scratch, let’s make a copy of the pipeline we just created and modify it to use a Stream Selector. In this example, we’ll write three different versions of the XML data to three different Kudu tables. Here’s what our new XML data looks like. The Stream Selector will examine the value of firmwareversion.

cat file002.xml
<sensordata>
        <rowid>1000</rowid>
        <sensorid>12</sensorid>
        <sensortimestamp>20170409001904</sensortimestamp>
        <temperature>23</temperature>
        <pressure>30</pressure>
        <firmwareversion>1</firmwareversion>
</sensordata>
<sensordata>
        <rowid>1001</rowid>
        <sensorid>39</sensorid>
        <sensortimestamp>20170409001927</sensortimestamp>
        <temperature>25</temperature>
        <pressure>28</pressure>
        <firmwareversion>2</firmwareversion>
        <humidity>110</humidity>
        <ozone>31</ozone>
</sensordata>
<sensordata>
        <rowid>1001</rowid>
        <sensorid>39</sensorid>
        <sensortimestamp>20170409001927</sensortimestamp>
        <temperature>25</temperature>
        <pressure>28</pressure>
        <firmwareversion>3</firmwareversion>
        <humidity>115</humidity>
        <ozone>12</ozone>
        <location>
                <altitude>121</altitude>
                <lat>37.8136</lat>
                <long>144.9631</long>
        </location>
</sensordata>

We’ll create two more Kudu tables: sensordata2 and sensordata3. We’ll also add a firmwareversion column in the existing sensordata table. We’ll have a total of three sensordata tables, each with a different set of columns.

CREATE TABLE sensordata2
(
       rowid BIGINT,
       sensorid SMALLINT,
       sensortimestamp STRING,
       temperature TINYINT,
       pressure TINYINT,
       firmwareversion TINYINT,
       humidity TINYINT,
       ozone TINYINT,
       PRIMARY KEY(rowid)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
CREATE TABLE sensordata3
(
       rowid BIGINT,
       sensorid SMALLINT,
       sensortimestamp STRING,
       temperature TINYINT,
       pressure TINYINT,
       firmwareversion TINYINT,
       humidity TINYINT,
       ozone TINYINT,
       altitude TINYINT,
       lat FLOAT,
       long FLOAT,
       PRIMARY KEY(rowid)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
ALTER TABLE sensordata ADD COLUMNS (firmwareversion TINYINT);

Drag a Stream Selector processor and two more Kudu destinations onto the canvas. Connect the XML Parser to the Stream Selector. You need to define three conditions: XML data with a firmwareversion of 1 (or any value other than 2 or 3) goes to the default stream (Kudu 1), XML data with firmwareversion 2 goes to stream 2 (Kudu 2), and XML data with firmwareversion 3 goes to stream 3 (Kudu 3).

Add the following two conditions to the Stream Selector processor.

${record:value('/text/firmwareversion[0]/value') == "3"}
${record:value('/text/firmwareversion[0]/value') == "2"}
Your new pipeline should look like Figure 7-29.
Figure 7-29

Stream Selector

For each Kudu destination, make sure you map the correct SDC field to the corresponding column names (Figure 7-30).
Figure 7-30

First Kudu destination

Note that the second and third Kudu destinations will have additional columns (Figure 7-31). Don’t forget to add the firmwareversion to the first Kudu destination (Figure 7-32).
Figure 7-31

Second Kudu destination

Figure 7-32

Third Kudu destination

Validate and preview the pipeline you just created (Figure 7-33). Confirm that each Kudu destination is receiving the correct XML data (Figures 7-34 and 7-35).
Figure 7-33

Preview – First Kudu destination

Figure 7-34

Preview – Second Kudu destination

Figure 7-35

Preview – Third Kudu destination

Before you start the pipeline, confirm that the destination Kudu tables are empty from the impala-shell.

select count(*) from sensordata;
+----------+
| count(*) |
+----------+
| 0        |
+----------+
select count(*) from sensordata2;
+----------+
| count(*) |
+----------+
| 0        |
+----------+
select count(*) from sensordata3;
+----------+
| count(*) |
+----------+
| 0        |
+----------+
Start the pipeline, then copy file002.xml to /sparkdata. After a few seconds, you’ll see some updates on the charts in the monitoring panel. Click the Stream Selector processor. As you can see in the Record Count chart (Figure 7-36), the input has three records, while each of the three outputs has one record.
Figure 7-36

Monitor Stream Selector

Check each Kudu destination to see if the records were successfully ingested (see Figure 7-37).
Figure 7-37

Monitor Kudu destination

Check the Kudu tables to confirm that the records were successfully ingested.

select * from sensordata;
+-------+----------+-----------------+-------------+
| rowid | sensorid | sensortimestamp | temperature |
+-------+----------+-----------------+-------------+
| 1000  | 12       | 20170409001904  | 23          |
+-------+----------+-----------------+-------------+
+-----------+-----------------+
| pressure  | firmwareversion |
+-----------+-----------------+
| 30        | 1               |
+-----------+-----------------+
select * from sensordata2;
+-------+----------+-----------------+-------------+
| rowid | sensorid | sensortimestamp | temperature |
+-------+----------+-----------------+-------------+
| 1001  | 39       | 20170409001927  | 25          |
+-------+----------+-----------------+-------------+
+----------+-----------------+----------+-------+
| pressure | firmwareversion | humidity | ozone |
+----------+-----------------+----------+-------+
| 28       | 2               | 110      | 31    |
+----------+-----------------+----------+-------+
select * from sensordata3;
+-------+----------+-----------------+-------------+
| rowid | sensorid | sensortimestamp | temperature |
+-------+----------+-----------------+-------------+
| 1001  | 39       | 20170409001927  | 25          |
+-------+----------+-----------------+-------------+
+----------+-----------------+----------+
| pressure | firmwareversion | humidity |
+----------+-----------------+----------+
| 28       | 3               | 115      |
+----------+-----------------+----------+
+-------+----------+------------------+-------------------+
| ozone | altitude | lat              | long              |
+-------+----------+------------------+-------------------+
| 12    | 121      | 37.8135986328125 | 144.9631042480469 |
+-------+----------+------------------+-------------------+

Expression Evaluator

The Expression Evaluator allows users to perform data transformations and calculations and write the results to new or existing fields. Users can also add or change field attributes and record header attributes. xv I often use the Expression Evaluator as a rules engine.

Let’s work on a new example. We’ll use an Expression Evaluator to perform two data transformations. We’ll convert the temperature from Celsius to Fahrenheit and save it to the existing temperature field. For the second transformation, we’ll update a new status field with the values “NORMAL,” “WARNING,” or “CRITICAL” depending on the values of the pressure field. If pressure is over 40, the status field gets updated with “WARNING,” and if pressure is over 50 it gets updated with “CRITICAL.” Let’s get started.
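As a point of reference before we build it in the console, here is the same logic as a short Python sketch; the function and field names are illustrative only, and the thresholds are the ones just described.

def transform(record):
    # Convert Celsius to Fahrenheit, overwriting the existing temperature field.
    record["temperature"] = record["temperature"] * 1.8 + 32

    # Derive the status field from pressure.
    if record["pressure"] > 50:
        record["status"] = "CRITICAL"
    elif record["pressure"] > 40:
        record["status"] = "WARNING"
    else:
        record["status"] = "NORMAL"
    return record

print(transform({"temperature": 26, "pressure": 59}))
# {'temperature': 78.8, 'pressure': 59, 'status': 'CRITICAL'}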

Add another “status” column to the Kudu sensordata table.

ALTER TABLE sensordata ADD COLUMNS (status STRING);
DESCRIBE sensordata;
+-----------------+----------+-------------+
| name            | type     | primary_key |
+-----------------+----------+-------------+
| rowid           | bigint   | true        |
| sensorid        | smallint | false       |
| sensortimestamp | string   | false       |
| temperature     | tinyint  | false       |
| pressure        | tinyint  | false       |
| firmwareversion | tinyint  | false       |
| status          | string   | false       |
+-----------------+----------+-------------+

We’ll use the following XML data for this example.

cat file003.xml
<sensordata>
       <rowid>1000</rowid>
       <sensorid>12</sensorid>
       <sensortimestamp>20170409001904</sensortimestamp>
       <temperature>23</temperature>
       <pressure>30</pressure>
       <firmwareversion>1</firmwareversion>
</sensordata>
<sensordata>
       <rowid>1001</rowid>
       <sensorid>39</sensorid>
       <sensortimestamp>20170409001927</sensortimestamp>
       <temperature>25</temperature>
       <pressure>28</pressure>
       <firmwareversion>2</firmwareversion>
       <humidity>110</humidity>
       <ozone>31</ozone>
</sensordata>
<sensordata>
        <rowid>1002</rowid>
        <sensorid>39</sensorid>
        <sensortimestamp>20170409001927</sensortimestamp>
        <temperature>25</temperature>
        <pressure>28</pressure>
        <firmwareversion>3</firmwareversion>
       <humidity>115</humidity>
       <ozone>12</ozone>
       <location>
              <altitude>121</altitude>
              <lat>37.8136</lat>
              <long>144.9631</long>
       </location>
</sensordata>
<sensordata>
       <rowid>1003</rowid>
        <sensorid>58</sensorid>
        <sensortimestamp>20170409001930</sensortimestamp>
        <temperature>22</temperature>
        <pressure>44</pressure>
        <firmwareversion>2</firmwareversion>
        <humidity>112</humidity>
        <ozone>17</ozone>
</sensordata>
<sensordata>
       <rowid>1004</rowid>
        <sensorid>72</sensorid>
        <sensortimestamp>20170409001934</sensortimestamp>
        <temperature>26</temperature>
        <pressure>59</pressure>
        <firmwareversion>2</firmwareversion>
        <humidity>115</humidity>
        <ozone>12</ozone>
</sensordata>
Make a copy of the original pipeline and add an Expression Evaluator processor between the XML Parser processor and the Kudu destination. Connect the stages. Your canvas should look like Figure 7-38.
Figure 7-38

Expression Evaluator

Click the Expression Evaluator processor (Figure 7-38) and navigate to the “Expressions” tab. We’ll add two entries in the “Field Expressions” section. For the first entry, in the “Output Field” enter the following value:

/text/temperature[0]/value

In the “Field Expression”, enter the expression:

 ${record:value('/text/temperature[0]/value') * 1.8 + 32}

This is the formula to convert temperature from Celsius to Fahrenheit. Specifying an existing field in the “Output Field” will overwrite the value in the field.

For the second entry, we’ll use an if-then-else expression. Enter a new field in the “Output Field”:

/text/status

Enter the expression in “Field Expression”:

${record:value('/text/pressure[0]/value') > 50?'CRITICAL': (record:value('/text/pressure[0]/value') > 40?'WARNING':'NORMAL')}
Your screen should look like Figure 7-39.
Figure 7-39

Expression Evaluator - Expression

Map “/text/status” to the new status field in the Kudu destination.
Figure 7-40

Map SDC field to Kudu field

Validate and preview the pipeline. Click the Expression Evaluator stage and confirm the values of the input and output records. Note that the temperature values in output Records 4 and 5 have been converted to Fahrenheit. The status field in output Records 4 and 5 was updated with “WARNING” and “CRITICAL,” respectively, based on the value of the pressure field.
Figure 7-41

Preview – Expression Evaluator

Confirm the values in the Kudu destination.
Figure 7-42

Preview – Kudu destination

Run the pipeline and copy the XML file to /sparkdata.

It looks like all five records were successfully inserted to the Kudu sensordata table.
Figure 7-43

Five records inserted into Kudu destination

Confirm that the data were successfully inserted to Kudu. As you can see the status field was successfully updated based on pressure. Also, the temperatures are now in Fahrenheit.

SELECT * FROM sensordata;
+-------+----------+-----------------+-------------+
| rowid | sensorid | sensortimestamp | temperature |
+-------+----------+-----------------+-------------+
| 1002  | 39       | 20170409001927  | 77          |
| 1000  | 12       | 20170409001904  | 73          |
| 1001  | 39       | 20170409001927  | 77          |
| 1003  | 58       | 20170409001930  | 71          |
| 1004  | 72       | 20170409001934  | 78          |
+-------+----------+-----------------+-------------+
+----------+-----------------+----------+
|pressure  | firmwareversion | status   |
+----------+-----------------+----------+
|28        | 3               | NORMAL   |
|30        | 1               | NORMAL   |
|28        | 2               | NORMAL   |
|44        | 2               | WARNING  |
|59        | 2               | CRITICAL |
+----------+-----------------+----------+

Using the JavaScript Evaluator

StreamSets Data Collector includes a JavaScript Evaluator that you can use for more complex processing and transformation. It uses JavaScript code to process one record or one batch at a time. xvi As you can imagine, processing one record at a time will most likely be slower than processing a batch of records at a time. Batch processing is recommended in production environments.

Although we’re using JavaScript in this example, other languages are supported by Data Collector. In addition to JavaScript, you can use Java, Jython (Python), Groovy, and Spark evaluators. These evaluators enable users to perform more advanced stream processing, taking advantage of the full capabilities of each programming language.

Let’s start with a simple example. We’ll create two additional fields, action and uniqueid. We’ll generate the value of action based on the value of the status field. Next, we’ll generate a universally unique identifier (UUID) using a custom JavaScript function. We’ll save this UUID in our uniqueid field.

We don’t have to start from scratch. We’ll make a copy of the previous pipeline and then drag a JavaScript Evaluator processor onto the canvas, placing it between the Expression Evaluator processor and the Kudu destination. Reconnect the stages. Your canvas should look like Figure 7-44.
Figure 7-44

JavaScript Evaluator

Now let’s configure the JavaScript Evaluator to use our custom JavaScript code in our pipeline. In the Properties panel, on the JavaScript tab, make sure that “Record Processing Mode” is set to “Batch by Batch.” “Script” will contain your custom JavaScript code. As mentioned earlier, we’ll create a function to generate a UUID. xvii The JavaScript Evaluator passes the batch to the script as an array. To access individual fields, use the format records[arrayindex].value.text.columnname. For example, to refer to the value of the status field, use “records[i].value.text.status.” We also create a new field, “records[i].value.text.action,” using the same format. See Listing 7-1.

// Generate an RFC 4122 version 4 UUID using the current time and
// Math.random() as the sources of randomness.
function generateUUID() {
    var d = new Date().getTime();
    var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
        var r = (d + Math.random()*16)%16 | 0;
        d = Math.floor(d/16);
        return (c=='x' ? r : (r&0x3|0x8)).toString(16);
    });
    return uuid;
}

// The JavaScript Evaluator passes the current batch to the script as the
// 'records' array. Process each record, then write it to the output.
for(var i = 0; i < records.length; i++) {
  try {
    var myUUID = generateUUID();

    // Derive the action field from the status set by the Expression Evaluator.
    if (records[i].value.text.status == "NORMAL")
      records[i].value.text.action = "None. Pressure is normal.";
    else if (records[i].value.text.status == "WARNING")
      records[i].value.text.action = "File a support ticket.";
    else if (records[i].value.text.status == "CRITICAL")
      records[i].value.text.action = "Inform your supervisor immediately!";

    // Store the generated UUID in a new uniqueid field.
    records[i].value.text.uniqueid = myUUID;
    output.write(records[i]);
  } catch (e) {
    // Send the record to the error stream if processing fails.
    error.write(records[i], e);
  }
}
Listing 7-1

JavaScript Evaluator code

Your screen should look like Figure 7-45.
Figure 7-45

JavaScript Evaluator code

Add the action and uniqueid columns to the Kudu sensordata table.

ALTER TABLE sensordata ADD COLUMNS (action STRING);
ALTER TABLE sensordata ADD COLUMNS (uniqueid STRING);
DESCRIBE sensordata;
+-----------------+----------+-------------+
| name            | type     | primary_key |
+-----------------+----------+-------------+
| rowid           | bigint   | true        |
| sensorid        | smallint | false       |
| sensortimestamp | string   | false       |
| temperature     | tinyint  | false       |
| pressure        | tinyint  | false       |
| firmwareversion | tinyint  | false       |
| status          | string   | false       |
| action          | string   | false       |
| uniqueid        | string   | false       |
+-----------------+----------+-------------+
Map the SDC fields to their corresponding Kudu table columns.
Figure 7-46

Map SDC field

Validate and preview your pipeline. Click the JavaScript Evaluator and confirm the values of the input and output records. Note that the action and uniqueid fields have been populated.
Figure 7-47

Preview values

Confirm the values in the Kudu destination.
Figure 7-48

Confirm values in Kudu destination

Run the pipeline and copy the XML file to /sparkdata.

It looks like all five records were successfully processed by the JavaScript Evaluator.
Figure 7-49

Monitor JavaScript Evaluator

All five records were also successfully inserted to the Kudu sensordata table.
Figure 7-50

Monitor Kudu destination

Confirm that the data was successfully inserted to Kudu. As you can see, the action and uniqueid fields were successfully populated.

SELECT rowid, status, action, substr(uniqueid, 1, 11) as uniqueid FROM sensordata;
+-------+----------+-------------------------------------+-------------+
| rowid | status   | action                              | uniqueid    |
+-------+----------+-------------------------------------+-------------+
| 1002  | NORMAL   | None. Pressure is normal.           | 35c47cb3-e9 |
| 1000  | NORMAL   | None. Pressure is normal.           | d3017138-09 |
| 1001  | NORMAL   | None. Pressure is normal.           | d91416e0-1b |
| 1003  | WARNING  | File a support ticket.              | 30aa86be-71 |
| 1004  | CRITICAL | Inform your supervisor immediately! | f7161ea3-eb |
+-------+----------+-------------------------------------+-------------+

Congratulations! You’ve successfully used a JavaScript evaluator in your pipeline.

Ingesting into Multiple Kudu Clusters

Sometimes you’ll be required to ingest data into two or more active-active clusters for high availability and scalability reasons. With Data Collector this is easily accomplished by simply adding Kudu destinations to the canvas and connecting them to the processor. In this example, we will simultaneously ingest XML data into two Kudu clusters, kuducluster01 and kuducluster02.

Drag a second Kudu destination to the canvas. You should now have two Kudu destinations. Make sure they’re both connected to the XML processor.
Figure 7-51

Multiple Kudu destinations

Click the first Kudu destination. Make sure the correct host name and ports are specified in the Kudu Masters field. Make sure the SDC fields also map to the Kudu table fields.
Figure 7-52

Configure first Kudu destination

Next, click the second Kudu destination. Perform the same task as you did on the first Kudu destination.
Figure 7-53

Configure second Kudu destination

Validate and preview the pipeline. When you’re done, start the pipeline and start adding data. Monitor the XML Parser. Note the number of input and output data.
Figure 7-54

Monitor XML Parser

Monitor the first Kudu destination. Note that it processed 1,582,554 records.
Figure 7-55

Monitor first Kudu destination

Check the second Kudu destination and note that it also ingested the same number of records.
Figure 7-56

Monitor second Kudu destination

Verify in Impala. Perform a SELECT COUNT on the sensordata table on the first Kudu cluster.

select count(*) from sensordata;
+----------+
| count(*) |
+----------+
| 1582554  |
+----------+

Do the same SELECT COUNT on the second Kudu cluster.

select count(*) from sensordata;
+----------+
| count(*) |
+----------+
| 1582554  |
+----------+

You ingested data into two Kudu clusters simultaneously. You can ingest to different combinations of platforms such as HBase, Cassandra, Solr, Kafka, S3, MongoDB, and so on. Consult the Data Collector User Guide for more details.

REST API

Data Collector has an easy-to-use web-based GUI for designing, running, and administering pipelines. However, you can also use the built-in REST API if you’d like to interact with Data Collector programmatically, for example for automation purposes. xviii The REST API lets you access all aspects of Data Collector, from starting and stopping pipelines to returning configuration information and monitoring pipeline metrics.

You can access the REST API by clicking the Help icon and then selecting “RESTful API.”
Figure 7-57

REST API

You’ll be presented with a group of available operations. Expand the manager list by clicking on “manager.” You can explore different operations under the “manager” group such as returning pipeline status, starting and stopping pipelines, and so on.
Figure 7-58

List of available operations

Expand “Return All Pipeline Status.” Choose the response content type “application/json” and click the “try it out” button.

The “response body” will contain details about your pipelines in JSON format similar to Listing 7-2. Depending on the number of pipelines and type of error messages returned, the response body could be quite large.

{
  "47234db3-7a94-40ab-9465-2dc799e132e6": {
    "user": "admin",
    "name": "47234db3-7a94-40ab-9465-2dc799e132e6",
    "rev": "0",
    "status": "EDITED",
    "message": "Pipeline edited",
    "timeStamp": 1491742372750,
    "attributes": {
      "IS_REMOTE_PIPELINE": false
    },
    "executionMode": "STANDALONE",
    "metrics": null,
    "retryAttempt": 0,
    "nextRetryTimeStamp": 0
  },
  "6c92cc6d-bdef-4b2b-ad62-69537e057128": {
    "user": "admin",
    "name": "6c92cc6d-bdef-4b2b-ad62-69537e057128",
    "rev": "0",
    "status": "EDITED",
    "message": "Pipeline edited",
    "timeStamp": 1491739709715,
    "attributes": {
      "IS_REMOTE_PIPELINE": false
    },
    "executionMode": "STANDALONE",
    "metrics": null,
    "retryAttempt": 0,
    "nextRetryTimeStamp": 0
  },
  "de3b27ed-0a92-47cb-8400-da5aa4cdf43e": {
    "user": "admin",
    "name": "de3b27ed-0a92-47cb-8400-da5aa4cdf43e",
    "rev": "0",
    "status": "STOPPED",
    "message": "The pipeline was stopped. The last committed source offset is 'file006.xml::-1'.",
    "timeStamp": 1492067839465,
    "attributes": {
      "IS_REMOTE_PIPELINE": false
    },
    "executionMode": "STANDALONE",
    "metrics": null,
    "retryAttempt": 0,
    "nextRetryTimeStamp": 0
  },
  "e4ded330-c573-4ab0-8fa8-004991493398": {
    "user": "admin",
    "name": "e4ded330-c573-4ab0-8fa8-004991493398",
    "rev": "0",
    "status": "STOPPED",
    "message": "The pipeline was stopped. The last committed source offset is 'file006.xml::-1'.",
    "timeStamp": 1492176332877,
    "attributes": {
      "IS_REMOTE_PIPELINE": false
    },
    "executionMode": "STANDALONE",
    "metrics": null,
    "retryAttempt": 0,
    "nextRetryTimeStamp": 0
  }
}
Listing 7-2

Return all pipeline status response body

You can make REST API requests by using curl, a utility that can download data using standard protocols. The username and password, as well as the custom HTTP header attribute (X-Requested-By), are required.

curl -u admin:mypassword http://localhost:18630/rest/v1/pipelines/status -H "X-Requested-By:myscript"
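The same request can be made from a script; the sketch below uses the Python requests package and assumes Data Collector is listening on localhost:18630 with the default admin credentials.

import requests

url = "http://localhost:18630/rest/v1/pipelines/status"
headers = {"X-Requested-By": "myscript"}   # custom header, as in the curl example

resp = requests.get(url, auth=("admin", "mypassword"), headers=headers)
resp.raise_for_status()

# Print the status of each pipeline from the JSON response body.
for pipeline_id, info in resp.json().items():
    print(pipeline_id, info["status"])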

Event Framework

StreamSets has an Event Framework that allows users to kick off tasks in response to triggers or events that happen in the pipeline. StreamSets uses dataflow triggers to execute tasks such as sending e-mails, executing a shell script, running a JDBC query, or starting a Spark job after an event occurs, for example, after the pipeline successfully completes a JDBC query.

Dataflow Performance Manager

One of the most powerful features of StreamSets is StreamSets Dataflow Performance Manager. StreamSets Dataflow Performance Manager (DPM) is a management console that lets you manage complex data flows, providing a unified view of all running pipelines in your environment. DPM is extremely helpful in monitoring and troubleshooting pipelines. You’ll appreciate its value more when you’re tasked to monitor hundreds or even thousands of pipelines. Covering DPM is beyond the scope of this book. For more information, consult the Data Collector User Guide.

I’ve only covered a small subset of StreamSets features and capabilities. For more information about StreamSets, consult the StreamSets Data Collector User Guide online. In Chapter 9, I use StreamSets, Zoomdata, and Kudu to create a complete Internet of Things (IoT) application that ingests and visualizes sensor data in real time.

Other Next-Generation Big Data Integration Tools

There are other next-generation data integration tools available on the market. I discuss some of the most popular options below, starting with the Cask Data Application Platform.

Cask Data Application Platform (CDAP)

The Cask Data Application Platform (CDAP) is an open source platform that you can use to develop big data applications and ETL jobs using Spark and the Hadoop stack. CDAP provides a GUI-based data ingestion studio for developing, deploying, and administering data pipelines. Its data preparation features provide an interactive method for data cleansing and transformation, a set of tasks commonly known as data wrangling. CDAP also has an application development framework with high-level Java APIs for rapid application development and deployment. On top of that, it has metadata, data lineage, and security features that are important in enterprise environments. Like StreamSets, it has native support for Kudu. Let’s develop a CDAP pipeline to ingest data to Kudu.

Data Ingestion with Kudu

The first thing we need to do is prepare test data for our example.

cat test01.csv
1,Jeff Wells,San Diego,71
2,Nancy Maher,Van Nuys,34
3,Thomas Chen,Rolling Hills,62
4,Earl Brown,Artesia,29
hadoop fs -mkdir -p /mydata
hadoop fs -put test01.csv /mydata
To access CDAP, direct your browser to the hostname of the server where the application is installed, using port 11011. The first time you access CDAP, you’re greeted with a welcome page.
Figure 7-59

CDAP welcome page

Close the window and click the button “Start by Adding Entities to CDAP.” You’re presented with five choices. Let’s create a Pipeline.
Figure 7-60

Add Entity

A canvas will appear. This is where you’ll design, deploy, and administer your data pipelines. On the right side of the canvas, you can choose different sources, transformations, and sinks that you can use to design your pipeline.
Figure 7-61

CDAP canvas

You may notice that the Kudu source and sink are missing. We need to add the Kudu source and sink from the Cask Market. Click the “Cask Market” link near the upper-right corner of the application window.

Scroll down the list of items in the Cask Market until you find the Kudu source and sink.
Figure 7-62

Cask Market

Click the Kudu source and sink icon and then click “Deploy.”
Figure 7-63

Cask Market – Kudu Source and Sink

Click “Finish.”
Figure 7-64

Finish installation

Click “Create a Pipeline.” Notice the Kudu icon is now available as a data source and sink.
Figure 7-65

Kudu data source and sink

Our example CDAP pipeline will ingest a CSV file into a Kudu table. But before inserting the data, we will hash the name with a popular hashing algorithm. Let’s drag a “File” source to the canvas.
Figure 7-66

File source

Double-click the file source icon. Enter the file properties such as the filename and the path of the file in HDFS.
Figure 7-67

File properties

CDAP makes it easy to access documentation about a particular sink, source, or transformation.
Figure 7-68

File Batch Source documentation

Next, drag a CSVParser transformation to the canvas and connect it to the File source.
Figure 7-69

CSVParser transformation

Enter the CSVParser properties. Make sure the Output Schema has the correct columns.
Figure 7-70

Configure CSVParser

The documentation for the CSV Parser transformation is readily available if you need assistance.
Figure 7-71

CSVParser documentation

Drag a Hasher transformation to the canvas and connect it with the CSVParser.
Figure 7-72

Hasher transformation

Configure the Hasher by choosing the hashing algorithm and specifying the field you want to hash. For this example, we’ll choose MD5 as our hashing algorithm.
Figure 7-73

Hasher configuration
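The values the Hasher stores look like plain MD5 digests of the original field values, rendered as 32-character hex strings. Assuming that is the case, the following minimal Scala sketch (runnable in a spark-shell or any Scala REPL) reproduces the same transformation outside CDAP; the helper name md5Hex is just for illustration.

import java.security.MessageDigest

// Compute the MD5 digest of a string and render it as lowercase hex,
// which is the format you'll see in the name column of the Kudu table.
def md5Hex(value: String): String =
  MessageDigest.getInstance("MD5")
    .digest(value.getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString

md5Hex("Jeff Wells")   // returns a 32-character hex digest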

Drag a Kudu sink and connect it to the Hasher.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig74_HTML.jpg
Figure 7-74

Kudu sink

Configure the Kudu sink. Note that CDAP uses the Kudu native APIs to insert data into the tables instead of going through Impala. CDAP will also create the destination table, so you just need to specify the table name instead of using the “impala::database_name.table_name” format. In this example, we’ll use the table name “users_table.” Later, we’ll create an external Impala table on top of this Kudu table.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig75_HTML.jpg
Figure 7-75

Kudu sink configuration

The Kudu sink documentation is available just in case you need help with some of the options.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig76_HTML.jpg
Figure 7-76

Kudu sink documentation

Your canvas should look like Figure 7-77. You can now deploy the pipeline. You can preview and validate the pipeline first to make sure there are no errors.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig77_HTML.jpg
Figure 7-77

Complete pipeline

Finally, the canvas will show you the number of records transferred and processed from the File source all the way to the Kudu sink. You’ll see an indicator near the upper-left corner of the canvas showing whether the job was successful or not. See Figure 7-78.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig78_HTML.jpg
Figure 7-78

Number of records transferred and processed

Check the logs and confirm that the job succeeded as shown in Figure 7-79.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig79_HTML.jpg
Figure 7-79

Check CDAP logs

Confirm that the rows were successfully inserted to the Kudu table. First, we have to create an external table in Impala on top of the Kudu table that CDAP created. Make sure the name field is hashed.

impala-shell
CREATE EXTERNAL TABLE cdap_users
STORED AS KUDU
TBLPROPERTIES (
'kudu.table_name' = 'users_table'
);
SELECT * FROM cdap_users;
+----+----------------------------------+---------------+-----+
| id | name                             | city          | age |
+----+----------------------------------+---------------+-----+
| 3  | dd500fc6d39cde55b6b4858e9854a21d | Rolling Hills | 62  |
| 1  | 228b855279d81c5251cff62e2b503079 | San Diego     | 71  |
| 4  | 332035b1942026174865ede2021dad2a | Artesia       | 29  |
| 2  | 8283a7fa1a09657dcc62125f8d734a7e | Van Nuys      | 34  |
+----+----------------------------------+---------------+-----+

You’ve successfully ingested data into a Kudu table, using a Field Hasher to perform an in-stream transformation on your data.

Pentaho Data Integration

Pentaho offers a complete line of products for data integration, big data processing, and business analytics. We’ll focus on Pentaho Data Integration (PDI) in this chapter. There is a community version called Pentaho Community Edition (CE), which includes Kettle, the open source version of Pentaho Data Integration. There is an Enterprise version, which includes Native YARN integration, analyzer and dashboard enhancements, advanced security, and high availability features. xix

Pentaho PDI has an intuitive user interface with prebuilt, easy-to-use components to help you develop data ingestion pipelines. Similar to most ETL tools on the market, Pentaho PDI allows you to connect to different types of data sources, ranging from popular RDBMSs such as Oracle, SQL Server, MySQL, and Teradata to big data and NoSQL platforms such as HDFS, HBase, Cassandra, and MongoDB. PDI includes integrated enterprise orchestration and scheduling capabilities for coordinating and managing workflows. You can effortlessly switch between Pentaho’s native execution engine and Apache Spark to scale your pipelines to handle large data volumes.

Ingest CSV into HDFS and Kudu

Unlike the other data integration tools described in this chapter, Pentaho does not yet include native support for Kudu. In order for us to insert data into Kudu, we will need to use Pentaho’s generic Table Output component with Impala’s JDBC driver. Using Table Output directly may not be fast enough, depending on the size of your data and your data ingestion requirements. One way to improve performance is to stage the data to HDFS first, then use Table Output to ingest the data from HDFS into Kudu. In some cases, this may be faster than ingesting directly into Kudu.

The first thing we need to do is prepare test data for our example.

cd /mydata
cat test01.csv
id,name,city,age
1,Jeff Wells,San Diego,71
2,Nancy Maher,Van Nuys,34
3,Thomas Chen,Rolling Hills,62
4,Earl Brown,Artesia,29
Start Pentaho PDI. We’ll use the Community Edition (Kettle) for this example. Navigate to where you installed the binaries and execute ./spoon.sh. See Figure 7-80.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig80_HTML.jpg
Figure 7-80

Start Pentaho Data Integration

The Spoon graphical user interface is shown in Figure 7-81. This is where you design and build your jobs and transformations.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig81_HTML.jpg
Figure 7-81

Graphical view

On the left side of the window, you can find a list of all the supported input, output, and transformation steps available for ETL development. Expand “Input” and drag a “CSV file input” step to the canvas as shown in Figure 7-82.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig82_HTML.jpg
Figure 7-82

CSV file input

Double-click the icon. Enter the configuration details such as the file name, delimiters, and so on. See Figure 7-83.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig83_HTML.jpg
Figure 7-83

Configure CSV file input

Click “Get Fields” to let Pentaho infer the type and size of the fields. See Figure 7-84.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig84_HTML.jpg
Figure 7-84

Get fields

Expand “Big Data” and drag “Hadoop File Output” to the canvas as shown in Figure 7-85.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig85_HTML.jpg
Figure 7-85

Hadoop file output

Connect “CSV file input” to “Hadoop File Output,” as shown in Figure 7-86.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig86_HTML.jpg
Figure 7-86

Connect CSV file input to Hadoop file output

Double-click the “Hadoop File Output” component to configure the destination. Enter all required information such as the name of the cluster, the address of the HDFS namenode, and so on. See Figure 7-87.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig87_HTML.jpg
Figure 7-87

Configure Hadoop file output

Enter additional parameters for the Hadoop File Output step. In our example, we’ll rename the file based on our own date-time format. See Figure 7-88.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig88_HTML.jpg
Figure 7-88

Rename file based on date time format

Specify details about the fields such as data type, format, length, precision and so on. See Figure 7-89.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig89_HTML.jpg
Figure 7-89

Configure fields

You can preview and sanity check your data. See Figure 7-90.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig90_HTML.jpg
Figure 7-90

Preview data

One of the most helpful features of PDI is the ability to monitor execution metrics of your job. Try doing a test run to make sure that the data is getting transferred from your source to destination. See Figure 7-91.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig91_HTML.jpg
Figure 7-91

Run the job

Navigate to “metrics” in the Execution Results panel as shown in Figure 7-92.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig92_HTML.jpg
Figure 7-92

Execution Results

Looking closer, the metrics show how long each step in the job took. In this example, the entire job took 412 ms to execute. Initializing the transformation took 122 ms; of that 122 ms, initializing the CSV file input step took 3 ms, while initializing the Hadoop File Output step took 96 ms. The actual execution of the CSV file input step took 15 ms, while the execution of the Hadoop File Output step took 251 ms. See Figure 7-93.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig93_HTML.jpg
Figure 7-93

Metrics – Execution Results

Navigate to the “Logging” tab to inspect the application log. As you can see, the job executed successfully, as shown in Figure 7-94.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig94_HTML.jpg
Figure 7-94

Logging – Execution Results

Confirm that the file was indeed copied to HDFS.

hadoop fs -ls /proddata
-rw-r--r--   3 hadoop supergroup        129 2017-05-13 00:05 /proddata/20170513000514.txt
hadoop fs -cat /proddata/20170513000514.txt
id,name,city,age
1,Jeff Wells,San Diego,71
2,Nancy Maher,Van Nuys,34
3,Thomas Chen,Rolling Hills,62
4,Earl Brown,Artesia,29
Let’s configure the final destination. Drag a Table Output step to the canvas, then double-click the icon and start configuring the step. We’ll need the Impala JDBC driver: download it from Cloudera.com and copy it to <install directory>/data-integration/lib. See Figure 7-95.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig95_HTML.jpg
Figure 7-95

Configure database connection

Configure the JDBC driver. Enter the hostname or IP address of the Impala server, the TCP/IP port, and the database name as shown in Figure 7-96.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig96_HTML.jpg
Figure 7-96

Configure JDBC settings

Test the database connection as shown in Figure 7-97.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig97_HTML.jpg
Figure 7-97

Test the database connection

Create the destination table.

impala-shell
CREATE TABLE pentaho_users
(
 id BIGINT,
 name STRING,
 city STRING,
 age TINYINT,
 PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU;
Enter the destination table and other relevant configuration options. See Figure 7-98.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig98_HTML.jpg
Figure 7-98

Configure table output

Click “Get fields” and make sure the source and destination fields match as shown in Figure 7-99.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig99_HTML.jpg
Figure 7-99

Get fields

Preview and sanity check the data as shown in Figure 7-100.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig100_HTML.jpg
Figure 7-100

Preview data

Execute the job. Monitor the logs and make sure the job successfully executes. See Figure 7-101.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig101_HTML.jpg
Figure 7-101

Run the job

The job ran successfully.

2017/05/13 00:21:16 - Spoon - Transformation opened.
2017/05/13 00:21:16 - Spoon - Launching transformation [csv_to_kudu1]...
2017/05/13 00:21:16 - Spoon - Started the transformation execution.
2017/05/13 00:21:16 - csv_to_kudu1 - Dispatching started for transformation [csv_to_kudu1]
2017/05/13 00:21:16 - Table output.0 - Connected to database [Impala_Kudu] (commit=1000)
2017/05/13 00:21:16 - CSV file input.0 - Header row skipped in file '/mydata/test01.csv'
2017/05/13 00:21:16 - CSV file input.0 - Finished processing (I=5, O=0, R=0, W=4, U=0, E=0)
2017/05/13 00:21:16 - Hadoop File Output.0 - Finished processing (I=0, O=5, R=4, W=4, U=0, E=0)
2017/05/13 00:21:16 - Table output.0 - Finished processing (I=0, O=4, R=4, W=4, U=0, E=0)
2017/05/13 00:21:16 - Spoon - The transformation has finished!!

Confirm that the data was successfully inserted into the Kudu table.

impala-shell
select * from pentaho_users;
+----+-------------+---------------+-----+
| id | name        | city          | age |
+----+-------------+---------------+-----+
| 2  | Nancy Maher | Van Nuys      | 34  |
| 3  | Thomas Chen | Rolling Hills | 62  |
| 1  | Jeff Wells  | San Diego     | 71  |
| 4  | Earl Brown  | Artesia       | 29  |
+----+-------------+---------------+-----+

Data Ingestion to Kudu with Transformation

Let’s work through another example. This time we’ll use a string replace transformation to replace the string “Yonkers” with “Berkeley.”

Prepare the data.

ls /mydata
test01.csv  test02.csv
cat test01.csv
id,name,city,age
1,Jeff Wells,San Diego,71
2,Nancy Maher,Van Nuys,34
3,Thomas Chen,Rolling Hills,62
4,Earl Brown,Artesia,29
cat test02.csv
id,name,city,age
5,Damian Lee,Yonkers,27
6,John Lohan,Encino,55
7,Lily Tran,Reseda,50
8,Sam Estevez,Tucson,81
Enter the source directory and a regular expression (wildcard) to search for files. See Figure 7-102.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig102_HTML.jpg
Figure 7-102

Specify source directory and file

Preview and sanity check the data. See Figure 7-103.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig103_HTML.jpg
Figure 7-103

Preview data

Drag a “Replace string” transformation step to the canvas. Double-click the icon and configure it by specifying the string to search for and the string to replace it with. As we mentioned earlier, we’ll replace the string “Yonkers” with “Berkeley” in the city field. You can ignore the other options for now. See Figure 7-104.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig104_HTML.jpg
Figure 7-104

Replace a string transformation

Run the job as shown in Figure 7-105.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig105_HTML.jpg
Figure 7-105

Run the job

Check the table and make sure “Yonkers” is replaced with “Berkeley.” Compare the value of the city in row id 5 with the original source text file.

impala-shell
select * from pentaho_users order by id;
+----+-------------+---------------+-----+
| id | name        | city          | age |
+----+-------------+---------------+-----+
| 1  | Jeff Wells  | San Diego     | 71  |
| 2  | Nancy Maher | Van Nuys      | 34  |
| 3  | Thomas Chen | Rolling Hills | 62  |
| 4  | Earl Brown  | Artesia       | 29  |
| 5  | Damian Lee  | Berkeley      | 27  |
| 6  | John Lohan  | Encino        | 55  |
| 7  | Lily Tran   | Reseda        | 50  |
| 8  | Sam Estevez | Tucson        | 81  |
+----+-------------+---------------+-----+

SQL Server to Kudu

In this example, we’ll show how to ingest data from an RDBMS (SQL Server 2016) to Kudu.

Make sure you have data in your source SQL Server 2016 database. We’ll use the same table we used in other examples. Inside a salesdb database, there’s a table called users with the following rows. Use SQL Server Management Studio to confirm. See Figure 7-106.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig106_HTML.jpg
Figure 7-106

Check the table in SQL Server Management Studio

In Spoon, drag a Table Input step to the canvas as shown in Figure 7-107.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig107_HTML.jpg
Figure 7-107

Table input

Configure the Table Input step. See Figure 7-108.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig108_HTML.jpg
Figure 7-108

Configure Table input

Make sure to install the SQL Server JDBC driver by copying sqljdbc41.jar (JDK 1.7) or sqljdbc42.jar (JDK 1.8) to <install directory>/data-integration/lib. Download the JDBC driver from Microsoft.com if you haven’t already. Choose the MS SQL Server (native) connection type as shown in Figure 7-109.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig109_HTML.jpg
Figure 7-109

Configure database connection

Configure the JDBC setting (Figure 7-110).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig110_HTML.jpg
Figure 7-110

Configure JDBC settings

Test the connection to the source SQL Server 2016 database (Figure 7-111).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig111_HTML.jpg
Figure 7-111

Test database connection

Now that you’ve configured the connection to your source database, specify a SQL query as your data source. See Figure 7-112.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig112_HTML.jpg
Figure 7-112

Specify SQL query as data source

Preview your data to ensure your SQL query is valid (Figure 7-113).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig113_HTML.jpg
Figure 7-113

Preview data

Drag a Table Output step to your canvas and connect it to the Table Input step (Figure 7-114).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig114_HTML.jpg
Figure 7-114

Table output

Configure your destination. Enter your target schema and target table. If you haven’t done so already, configure your Impala JDBC connection as well, as shown in Figure 7-115.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig115_HTML.jpg
Figure 7-115

Configure Table output

Run the job (Figure 7-116).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig116_HTML.jpg
Figure 7-116

Run the job

Inspect the logs for errors.

2017/06/03 20:31:17 - Spoon - Transformation opened.
2017/06/03 20:31:17 - Spoon - Launching transformation [SQL Server to Kudu]...
2017/06/03 20:31:17 - Spoon - Started the transformation execution.
2017/06/03 20:31:17 - SQL Server to Kudu - Dispatching started for transformation [SQL Server to Kudu]
2017/06/03 20:31:17 - Table output.0 - Connected to database [Impala] (commit=1000)
2017/06/03 20:31:17 - Table input.0 - Finished reading query, closing connection.
2017/06/03 20:31:17 - Table input.0 - Finished processing (I=4, O=0, R=0, W=4, U=0, E=0)
2017/06/03 20:31:18 - Table output.0 - Finished processing (I=0, O=4, R=4, W=4, U=0, E=0)
2017/06/03 20:31:18 - Spoon - The transformation has finished!!

Confirm that the data was successfully inserted into the Kudu table.

impala-shell
select * from users;
+--------+-----------------+--------------+-------+-------+-----+
| userid | name            | city         | state | zip   | age |
+--------+-----------------+--------------+-------+-------+-----+
| 102    | Felipe Drummond | Palo Alto    | CA    | 94301 | 33  |
| 100    | Wendell Ryan    | San Diego    | CA    | 92102 | 24  |
| 101    | Alicia Thompson | Berkeley     | CA    | 94705 | 52  |
| 103    | Teresa Levine   | Walnut Creek | CA    | 94507 | 47  |
+--------+-----------------+--------------+-------+-------+-----+

Congratulations! You’ve successfully ingested data from SQL Server 2016 to Kudu.

Talend Open Studio

Talend is one of the leading software companies that specializes in big data integration. Talend offers free open source data ingestion tools called Open Studio for Big Data and Open Studio for Data Integration. Both tools provide a modern graphical user interface; YARN support; HDFS, HBase, Hive, and Kudu support; connectors to Oracle, SQL Server, and Teradata; and both are fully open source under Apache License v2. xx The main difference between Open Studio for Data Integration and Open Studio for Big Data is that Data Integration can only generate native Java code, while Big Data can generate native Java, Spark, and MapReduce code.

The commercial version gives you access to live Talend support, guaranteed response times, upgrades, and product patches. The open source version only provides community support. xxi If you are processing terabytes or petabytes of data, I suggest you use the Big Data edition. For traditional ETL-type workloads such as moving data from an RDBMS to Kudu, with some light data transformation in between, the Data Integration edition is sufficient. We will use Talend Open Studio for Data Integration in this chapter.

Note

Talend Kudu components are provided by a third-party company, One point Ltd. These components are free and can be downloaded from the Talend Exchange – https://exchange.talend.com/. The Kudu Output and Input components need to be installed before you can use Talend with Kudu.

Ingesting CSV Files to Kudu

Let’s start with a familiar example of ingesting CSV files to Kudu.

Prepare the test data.

ls /mydata
test01.csv
cat test01.csv
id,name,city,age
1,Jeff Wells,San Diego,71
2,Nancy Maher,Van Nuys,34
3,Thomas Chen,Rolling Hills,62
4,Earl Brown,Artesia,29

Create the destination Kudu table in Impala.

impala-shell
CREATE TABLE talend_users
(
 id INTEGER,
 name STRING,
 city STRING,
 age INTEGER,
 PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU;
Start Talend Open Studio for Data Integration as shown in Figure 7-117.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig117_HTML.jpg
Figure 7-117

Start Talend Open Studio for Data Integration

You can select to create a new project or open an existing one. Let’s start a new project (Figure 7-118).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig118_HTML.jpg
Figure 7-118

Select an existing project or create a new project

You’re presented with a GUI similar to Figure 7-119.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig119_HTML.jpg
Figure 7-119

Talend Open Studio Graphical User Interface

Let’s create a new job (Figure 7-120).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig120_HTML.jpg
Figure 7-120

Create job

Specify a name for the job. You can specify other properties such as purpose, description, author, and so on. Click “Finish” when you’re done (Figure 7-121).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig121_HTML.jpg
Figure 7-121

Specify job name and other job properties

You’ll see a canvas similar to Figure 7-122. This is where you’ll design and run jobs.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig122_HTML.jpg
Figure 7-122

Job canvas

Similar to the data ingestion tools discussed earlier in this chapter, you have to specify a source, transformation steps, and a destination when designing data ingestion pipelines. On the right-hand side of the canvas you will see a list of inputs that you can use as your data source. Because we’re ingesting a CSV file, drag and drop a tFileInputDelimited source onto the canvas. Configure the source by specifying the file name of the CSV file. See Figure 7-123.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig123_HTML.jpg
Figure 7-123

tFileInputDelimited source

Next, specify an output by dragging and dropping a Kudu output into the canvas. Configure the Kudu output by specifying the table name, connection information, and so on (Figure 7-124).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig124_HTML.jpg
Figure 7-124

Kudu output

Don’t forget to connect the input and output icons (Figure 7-125).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig125_HTML.jpg
Figure 7-125

Connect File Input delimited and Kudu output

Run the job as shown in Figure 7-126.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig126_HTML.jpg
Figure 7-126

Run the job

The logs will show information about the job while it’s running. You’ll see an exit code when the job completes. An exit code of zero indicates that the job successfully completed.

Starting job CSV_to_Impala at 23:20 14/05/2017.
[statistics] connecting to socket on port 3725
[statistics] connected
330 [New I/O worker #1] INFO org.apache.kudu.client.AsyncKuduClient - Discovered tablet Kudu Master for table 'Kudu Master' with partition [<start>, <end>)
399 [New I/O worker #1] INFO org.apache.kudu.client.AsyncKuduClient - Discovered tablet 4bcfc5b62a284ea0b572d8201aea0aa5 for table 'impala::default.talend_users' with partition [0x00000001, 0x00000002)
400 [New I/O worker #1] INFO org.apache.kudu.client.AsyncKuduClient - Discovered tablet 42d775f9402b45d18e1d1c22ca61ed22 for table 'impala::default.talend_users' with partition [0x00000002, 0x00000003)
400 [New I/O worker #1] INFO org.apache.kudu.client.AsyncKuduClient - Discovered tablet 6a0e39ac33ff433e8d8242ca0ea2bee8 for table 'impala::default.talend_users' with partition [0x00000003, <end>)
453 [New I/O worker #1] INFO org.apache.kudu.client.AsyncKuduClient - Discovered tablet ffbf9021409f445fae04b1f35c318567 for table 'impala::default.talend_users' with partition [<start>, 0x00000001)
[statistics] disconnected
Job CSV_to_Impala ended at 23:20 14/05/2017. [exit code=0]

Confirm that the rows were successfully inserted into the Kudu table.

impala-shell
select * from talend_users;
+----+-------------+---------------+-----+
| id | name        | city          | age |
+----+-------------+---------------+-----+
| 3  | Thomas Chen | Rolling Hills | 62  |
| 4  | Earl Brown  | Artesia       | 29  |
| 1  | Jeff Wells  | San Diego     | 71  |
| 2  | Nancy Maher | Van Nuys      | 34  |
+----+-------------+---------------+-----+

SQL Server to Kudu

For our second example, let’s ingest data from SQL Server to Kudu using Talend Open Studio. Create the Kudu table DimGeography in Impala if you haven’t done it already.

Create a new job (Figure 7-127).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig127_HTML.jpg
Figure 7-127

New job

Drag and drop a tMSSQLinput component into the canvas. See Figure 7-128.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig128_HTML.jpg
Figure 7-128

tMSSQLinput

Configure the component by making sure the correct data types are specified and so on (Figure 7-129).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig129_HTML.jpg
Figure 7-129

Configure MS SQL input

Drag and drop a Kudu output component into the canvas. Connect it to the tMSSQLinput component as shown in Figure 7-130.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig130_HTML.jpg
Figure 7-130

Kudu output

Configure the Kudu output and make sure the data types match those of the SQL Server table, as shown in Figure 7-131.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig131_HTML.jpg
Figure 7-131

Configure Kudu output

Sync the schemas if they do not match (Figure 7-132).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig132_HTML.jpg
Figure 7-132

Sync the schema

Run the job as shown in Figure 7-133.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig133_HTML.jpg
Figure 7-133

Run the job

Verify that the job ran successfully by comparing source and destination row counts (Figure 7-134).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig134_HTML.jpg
Figure 7-134

Verify if the job ran successfully

impala-shell
SELECT count(*) FROM DimGeography;
+----------+
| count(*) |
+----------+
| 655      |
+----------+

Data Transformation

Let’s now use some of Talend’s built-in features for data transformation. I’ll use the tReplace component to replace a value in the specified input column. We’ll replace the value “United Kingdom” with “UK.” I’ll also use the tFilterRow component to filter results to only include records where city is equal to “London” or “Berkshire.”

Drag and drop the tReplace and tFilterRow components into the canvas, in between the input and the output as shown in Figure 7-136.

Configure the tReplace component by selecting the field that contains the value that you want to replace (Figure 7-135).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig135_HTML.jpg
Figure 7-135

Select field

Configure the tReplace component by specifying the value you want to replace and the replacement value. In this case we’ll replace “United Kingdom” with “UK” (Figure 7-136).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig136_HTML.jpg
Figure 7-136

Configure tReplace

Configure the tFilterRow component. We’ll return only the rows where the city field equals “London” or “Berkshire” (Figure 7-137).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig137_HTML.jpg
Figure 7-137

Configure tFilterRow

Don’t forget to connect all the components as shown in Figure 7-138.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig138_HTML.jpg
Figure 7-138

Connect tFilterRow to Kudu output

Run the job as shown in Figure 7-139.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig139_HTML.jpg
Figure 7-139

Run the job

Inspect the data in the Kudu table to ensure that the job executed successfully. As you can see from the results, only records where the city is London or Berkshire were returned, and United Kingdom was replaced with UK.

impala-shell
SELECT geographykey as gkey, city, stateprovincecode as spc, stateprovincename as spn, countryregioncode as crc, englishcountryregionname as ecr, postalcode as pc FROM DimGeography;
+------+-----------+-----+---------+-----+-----+----------+
| gkey | city      | spc | spn     | crc | ecr | pc       |
+------+-----------+-----+---------+-----+-----+----------+
| 246  | London    | ENG | England | GB  | UK  | EC1R 0DU |
| 250  | London    | ENG | England | GB  | UK  | SW6 SBY  |
| 254  | London    | ENG | England | GB  | UK  | W1N 9FA  |
| 257  | London    | ENG | England | GB  | UK  | W1Y 3RA  |
| 230  | Berkshire | ENG | England | GB  | UK  | RG11 5TP |
| 244  | London    | ENG | England | GB  | UK  | C2H 7AU  |
| 248  | London    | ENG | England | GB  | UK  | SW19 3RU |
| 249  | London    | ENG | England | GB  | UK  | SW1P 2NU |
| 251  | London    | ENG | England | GB  | UK  | SW8 1XD  |
| 252  | London    | ENG | England | GB  | UK  | SW8 4BG  |
| 253  | London    | ENG | England | GB  | UK  | W10 6BL  |
| 256  | London    | ENG | England | GB  | UK  | W1X3SE   |
| 245  | London    | ENG | England | GB  | UK  | E17 6JF  |
| 247  | London    | ENG | England | GB  | UK  | SE1 8HL  |
| 255  | London    | ENG | England | GB  | UK  | W1V 5RN  |
+------+-----------+-----+---------+-----+-----+----------+
Fetched 15 row(s) in 4.50s

Other Big Data Integration Players

This chapter would not be complete if I didn’t mention the traditional ETL players. These are tools that have been enhanced to include big data integration. Even so, they still lag behind the newer big data integration tools I just discussed in terms of native features. Most lack native Spark support and connectors to popular big data sources, for example.

Informatica

Informatica is the largest software development company in the world that specializes in data integration. Founded in 1993, the company is based in Redwood City, California. Informatica also develops software for master data management, data quality, B2B data exchange, data virtualization, and more. xxii Informatica PowerCenter Big Data Edition is the company’s flagship product for big data integration. Like the other ETL tools described in this chapter, Informatica PowerCenter Big Data Edition features an easy-to-use visual development environment. It has tight integration with the leading traditional and big data platforms, making it easy for you to schedule, manage, and monitor processes and workflows across your enterprise. xxiii As of this writing, Informatica PowerCenter doesn’t have native Kudu support; however, you may be able to ingest data into Kudu using Informatica PowerCenter via Impala and JDBC/ODBC.
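To make this JDBC/ODBC route concrete, here is a minimal Scala sketch of what writing to a Kudu-backed table through Impala’s JDBC interface looks like; any of the tools in this section that can speak JDBC would take essentially the same path. It assumes the Cloudera Impala JDBC driver is on the classpath, reuses the pentaho_users table from the Pentaho example, and the host name, port, and inserted values are placeholders for your own environment.

import java.sql.DriverManager

// Load the Cloudera Impala JDBC driver and connect to an Impala daemon.
Class.forName("com.cloudera.impala.jdbc41.Driver")
val conn = DriverManager.getConnection("jdbc:impala://kuducluster:21050/default")
try {
  val stmt = conn.createStatement()
  // Impala routes the INSERT to the underlying Kudu table.
  stmt.executeUpdate(
    "INSERT INTO pentaho_users (id, name, city, age) VALUES (9, 'Jane Smith', 'Pasadena', 41)")
  stmt.close()
} finally {
  conn.close()
}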

Microsoft SQL Server Integration Services

This list would not be complete if I didn’t mention the top three largest enterprise software development companies: Microsoft, IBM, and Oracle. SQL Server Integration Services (SSIS) includes features that support Hadoop and HDFS data integration. SSIS provides the Hadoop Connection Manager and the following Control Flow Tasks: Hadoop File System Task, Hadoop Hive Task, and Hadoop Pig Task. SSIS supports the following data source and destination: HDFS File Source and HDFS File Destination. As with all the ETL tools described in this chapter, SSIS also sports a GUI development environment. xxiv As of this writing, SSIS doesn’t have native Kudu support; however, you may be able to ingest data into Kudu via Impala and JDBC/ODBC.

Oracle Data Integrator for Big Data

Oracle Data Integrator for Big Data provides advanced data integration capabilities for big data platforms. It supports a diverse set of workloads, including Spark, Spark Streaming, and Pig transformations, and connects to various big data sources such as Kafka and Cassandra. For orchestration of ODI jobs, users have a choice between ODI agents and Oozie as orchestration engines. xxv As of this writing, Oracle Data Integrator doesn’t have native Kudu support; however, you may be able to ingest data into Kudu using Oracle Data Integrator via Impala and JDBC/ODBC.

IBM InfoSphere DataStage

IBM InfoSphere DataStage is IBM’s data integration tool that comes with IBM InfoSphere Information Server. It supports extract, transform, and load (ETL) and extract, load, and transform (ELT) patterns. xxvi It provides some big data support, such as accessing and processing files on HDFS and moving tables from Hive to an RDBMS. xxvii The IBM InfoSphere Information Server operations console can be used to monitor and administer DataStage jobs, including monitoring their job logs and resource usage, and it also aids in troubleshooting when runtime issues occur. xxviii As of this writing, DataStage doesn’t have native Kudu support; however, you may be able to ingest data into Kudu using DataStage via Impala and JDBC/ODBC.

Syncsort

Syncsort is another leading software development company specializing in big data integration. It is well known for its extensive support for mainframes. By 2013, Syncsort had positioned itself as a “Big Iron to Big Data” data integration company. xxix

Syncsort DMX-h, Syncsort’s big data integration platform, has batch and streaming interfaces that can collect different types of data from multiple sources such as Kafka, HBase, RDBMSs, S3, and mainframes. xxx Like the other ETL solutions described in this chapter, Syncsort features an easy-to-use GUI for ETL development. xxxi As of this writing, DMX-h doesn’t have native Kudu support; however, you may be able to ingest data into Kudu using DMX-h via Impala and JDBC/ODBC.

Apache NiFi

Apache NiFi is an open source real-time data ingestion tool mostly used in Hortonworks environments, although it can also be used to ingest data to and from other platforms such as Cloudera and MapR. It’s similar to StreamSets in many respects. One of the main limitations of NiFi is its lack of YARN support. NiFi runs as an independent JVM process, or as multiple JVM processes if configured in cluster mode. xxxii NiFi does not have native Kudu support at the time of this writing, although the open source community is working on it. Check out NIFI-3973 for more details. xxxiii

Data Ingestion with Native Tools

Ten years ago, you had to be a Java developer in order to ingest and process data with Hadoop. Dozens of lines of code were often required to perform simple data ingestion or processing. Today, thanks to the thousands of committers and contributors to various Apache projects, the Hadoop ecosystem has a whole heap of (and some say too many) native tools for data ingestion and processing. Some of these tools can be used to ingest data into Kudu. I’ll show examples using Flume, Kafka, and Spark. If you want to develop your own data ingestion routines, Kudu provides APIs for Spark, Java, C++, and Python.

Kudu and Spark

Kudu provides a Spark API that you can use to ingest data into Kudu tables. In the following example, we’ll join a table stored in a SQL Server database with another table stored in an Oracle database and insert the joined data into a Kudu table. Chapter 6 provides a more thorough discussion of Kudu and Spark.

Start the spark-shell. Don’t forget to include the necessary drivers and dependencies.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path ojdbc6.jar:sqljdbc41.jar --jars ojdbc6.jar,sqljdbc41.jar

Set up the Oracle connection.

val jdbcURL = "jdbc:oracle:thin:sales/cloudera@//192.168.56.30:1521/EDWPDB"
val connectionProperties = new java.util.Properties()

Create a dataframe from the Oracle table.

val oraDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
oraDF.show
+------+---------------+------------+-----+-----+---+
|USERID|           NAME|        CITY|STATE|  ZIP|AGE|
+------+---------------+------------+-----+-----+---+
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|
+------+---------------+------------+-----+-----+---+

Register the table so we can run SQL against it.

oraDF.registerTempTable("ora_users")

Set up the SQL Server connection.

val jdbcURL = "jdbc:sqlserver://192.168.56.103;databaseName=salesdb;user=sa;password=cloudera"
val connectionProperties = new java.util.Properties()

Create a dataframe from the SQL Server table.

val sqlDF = sqlContext.read.jdbc(jdbcURL, "userattributes", connectionProperties)
sqlDF.show
+------+------+------+------------------+
|userid|height|weight|        occupation|
+------+------+------+------------------+
|   100|   175|   170|       Electrician|
|   101|   180|   120|         Librarian|
|   102|   180|   215|    Data Scientist|
|   103|   178|   132|Software Developer|
+------+------+------+------------------+

Register the table so that we can join it to the Oracle table.

sqlDF.registerTempTable("sql_userattributes")

Join both tables. We'll insert the results into a Kudu table.

val joinDF = sqlContext.sql("select ora_users.userid,ora_users.name,ora_users.city,ora_users.state,ora_users.zip,ora_users.age,sql_userattributes.height,sql_userattributes.weight,sql_userattributes.occupation from ora_users  INNER JOIN sql_userattributes ON ora_users.userid=sql_userattributes.userid")
joinDF.show
+------+---------------+------------+-----+-----+---+------+------+------------------+
|userid|           name|        city|state|  zip|age|height|weight|        occupation|
+------+---------------+------------+-----+-----+---+------+------+------------------+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|   175|   170|       Electrician|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|   180|   120|         Librarian|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|   180|   215|    Data Scientist|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|   178|   132|Software Developer|
+------+---------------+------------+-----+-----+---+------+------+------------------+

You can also join the dataframes using the following method.

val joinDF2 = oraDF.join(sqlDF,"userid")
joinDF2.show
+------+---------------+------------+-----+-----+---+------+------+------------------+
|userid|           NAME|        CITY|STATE|  ZIP|AGE|height|weight|        occupation|
+------+---------------+------------+-----+-----+---+------+------+------------------+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|   175|   170|       Electrician|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|   180|   120|         Librarian|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|   180|   215|    Data Scientist|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|   178|   132|Software Developer|
+------+---------------+------------+-----+-----+---+------+------+------------------+

Create the destination Kudu table in Impala.

impala-shell
create table users2 (
userid BIGINT PRIMARY KEY,
name STRING,
city STRING,
state STRING,
zip STRING,
age STRING,
height STRING,
weight STRING,
occupation STRING
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

Go back to the spark-shell and set up the Kudu connection.

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kuducluster:7051")

Insert the data to Kudu.

kuduContext.insertRows(joinDF, "impala::default.users2")

Confirm that the data was successfully inserted into the Kudu table.

impala-shell
select * from users2;
+--------+-----------------+--------------+-------+-------+-----+--------+--------+--------------------+
| userid | name            | city         | state | zip   | age | height | weight | occupation         |
+--------+-----------------+--------------+-------+-------+-----+--------+--------+--------------------+
| 102    | Felipe Drummond | Palo Alto    | CA    | 94301 | 33  | 180    | 215    | Data Scientist     |
| 103    | Teresa Levine   | Walnut Creek | CA    | 94507 | 47  | 178    | 132    | Software Developer |
| 100    | Wendell Ryan    | San Diego    | CA    | 92102 | 24  | 175    | 170    | Electrician        |
| 101    | Alicia Thompson | Berkeley     | CA    | 94705 | 52  | 180    | 120    | Librarian          |
+--------+-----------------+--------------+-------+-------+-----+--------+--------+--------------------+

Flume, Kafka, and Spark Streaming

Using Flume, Kafka, and Spark Streaming for real-time data ingestion and event processing is a common architectural pattern.

Apache Flume

Flume allows users to collect, transform, and move large amounts of streaming and event data into HDFS, HBase, and Kafka, to name a few. xxxiv It’s an integrated component of most Hadoop distributions. Flume’s architecture can be divided into sources, channels, and sinks. Sources are your data sources, channels provide an intermediate buffer between sources and sinks and add reliability, and sinks represent your destinations. Flume has a simple architecture, as you can see in Figure 7-140.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig140_HTML.jpg
Figure 7-140

Flume Architecture

Flume is configured using a simple configuration file. The example configuration file below generates events from a script and logs them to the console.

# Name the components
agent1.sources = source1
agent1.sinks = sinks1
agent1.channels = channel1
# Configure the source
agent1.sources.source1.type = exec
agent1.sources.source1.command = /tmp/myscript.sh
agent1.sources.source1.channels = channel1
# Configure the channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
# Configure the sink
agent1.sinks.sinks1.type = logger
agent1.sinks.sinks1.channel = channel1

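To run an agent with a configuration file like the one above, you would typically use the flume-ng launcher, for example flume-ng agent --conf conf --conf-file agent1.conf --name agent1 (the configuration file name here is hypothetical, and the --name value must match the agent prefix used in the file, agent1 in this case).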
To learn more about Flume, Using Flume (O’Reilly, 2014) by Hari Shreedharan is the definitive guide.

Apache Kafka

Kafka is a fast, scalable, and reliable distributed publish-subscribe messaging system. Kafka is now a standard component of architectures that require real-time data ingestion and streaming. Although not required, Kafka is frequently used with Apache Flume, Storm, Spark Streaming, and StreamSets.

Kafka runs as a cluster on one or more brokers and has built-in replication and partitioning as shown in Figure 7-141.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig141_HTML.jpg
Figure 7-141

Kafka Producers and Consumers

Kafka records are published to topics. Each topic is partitioned and continuously appended to, and each record in a partition is assigned a unique offset that identifies it. xxxv Think of the topic as a table and the offset as a primary key (Figure 7-142).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig142_HTML.jpg
Figure 7-142

Kafka Topic
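To make the producer side concrete, here is a minimal Scala sketch that publishes a CSV record to a topic using the Kafka Java client. The broker address and the topic name (users) are assumptions, and the record value simply mirrors the CSV format used throughout this chapter.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "kafkabroker:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// Records with the same key always land in the same partition, preserving their order.
producer.send(new ProducerRecord[String, String]("users", "100", "100,Wendell Ryan,San Diego,24"))
producer.close()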

To learn more about Kafka, Kafka: The Definitive Guide (O’Reilly, 2017) by Narkhede, Shapira, and Palino is the best resource available.

Flafka

When Kafka was first released in 2011, users had to write Java code in order to ingest data into and read data out of Kafka. Unfortunately, this requirement slowed the adoption of Kafka because not everyone can write code in Java. There had to be an easier way for applications to interact with Kafka. As discussed earlier, these tasks are easily accomplished with Flume (Figure 7-143).
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig143_HTML.jpg
Figure 7-143

A typical Flafka pipeline

The engineers at Cloudera and others in the open source community recognized the benefits of integrating Flume with Kafka, so they developed the Flume and Kafka integration frequently referred to as Flafka. Starting in CDH 5.2, Flume can act as a consumer and producer for Kafka. With further development included in Flume 1.6 and CDH 5.3, the ability for Kafka to act as a Flume channel was added. xxxvi

Spark Streaming

Historically, Kafka did not provide any mechanism for stream processing. Kafka Streams was recently released to provide basic stream processing capabilities. Flume also has Interceptors that can be used for lightweight stream processing. However, Flume Interceptors have fallen out of vogue lately due to the popularity of Apache Spark. Spark Streaming is more powerful than Kafka Streams and Flume Interceptors thanks to its integration with Spark SQL, Spark MLlib, and other features of Spark (Figure 7-144). Unlike Interceptors, Spark Streaming doesn’t require Flume and can integrate with other data sources such as MQTT, Kafka, and Kinesis, to name a few. If you have an unsupported data source that you want to integrate with Spark Streaming, you can implement a custom receiver to handle that particular data source. Consult Chapter 2 for an example of how to read from and write to Kudu with Spark Streaming.
../images/456459_1_En_7_Chapter/456459_1_En_7_Fig144_HTML.jpg
Figure 7-144

A typical Flafka pipeline with Spark Streaming and Kudu
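As a flavor of the pattern in Figure 7-144, here is a minimal spark-shell sketch that consumes CSV events from Kafka with Spark Streaming and inserts them into a Kudu table. It is not the Chapter 2 example; it assumes the spark-streaming-kafka and kudu-spark packages are on the classpath, and the broker, topic, and table names are placeholders for your own environment.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kudu.spark.kudu._
import sqlContext.implicits._

// Micro-batch every 10 seconds, reading directly from the Kafka brokers.
val ssc = new StreamingContext(sc, Seconds(10))
val kuduContext = new KuduContext("kuducluster:7051")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "kafkabroker:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("users"))

stream.map(_._2).foreachRDD { rdd =>
  // Each message is a CSV line: id,name,city,age
  val df = rdd.map(_.split(","))
              .map(p => (p(0).toLong, p(1), p(2), p(3).toInt))
              .toDF("id", "name", "city", "age")
  kuduContext.insertRows(df, "impala::default.users_from_kafka")
}

ssc.start()
ssc.awaitTermination()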

Sqoop

Sqoop is not technically compatible with Kudu. You cannot use Sqoop to transfer data from an RDBMS to Kudu and vice versa. However, Sqoop may be the only tool some users have for ingesting data from an RDBMS.

What you can do is use Sqoop to ingest data from an RDBMS into HDFS, and then use Spark or Impala to read the data from HDFS and insert it into Kudu. Here are a few examples of what you can do with Sqoop. Make sure you install and configure the correct drivers.

Get a list of available databases in SQL Server.

sqoop list-databases --connect jdbc:sqlserver://10.0.1.124:1433 --username myusername --password mypassword

Copy a table from SQL Server to Hadoop.

sqoop import --connect "jdbc:sqlserver://10.0.1.124:1433;database=AdventureWorksDW2012;username=myusername;password=mypassword" --table DimProduct --hive-import --hive-overwrite

Copy a table from Hive to SQL Server.

sqoop export --connect "jdbc:sqlserver://10.0.1.124:1433;database=AdventureWorksDW2012;username=myusername;password=mypassword" --table salesfact_hadoop --hcatalog-database default --hcatalog-table sales_data

Then you can simply execute an Impala insert into…select statement:

INSERT INTO my_kudu_table SELECT * FROM sales_data;
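If you’d rather take the Spark route mentioned above, a minimal spark-shell sketch might look like the following. It assumes the spark-shell’s sqlContext is Hive-enabled, that the Sqoop-imported data is visible as a Hive table, that a matching Kudu table already exists, and it reuses the table names from the Impala statement above; the Kudu master address is a placeholder.

import org.apache.kudu.spark.kudu._

val kuduContext = new KuduContext("kuducluster:7051")
// Read the Sqoop-imported data through the Hive metastore.
val salesDF = sqlContext.table("default.sales_data")
// Write it to the existing Kudu table.
kuduContext.insertRows(salesDF, "impala::default.my_kudu_table")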

Kudu Client API

Kudu provides NoSQL-style Java, C++, and Python client APIs. Applications that require the best performance from Kudu should use the client APIs. In fact, some of the data ingestion tools discussed earlier, such as StreamSets, CDAP, and Talend, utilize the client APIs to ingest data into Kudu. DML changes made via the API are available for querying in Impala immediately, without the need to execute INVALIDATE METADATA. Refer to Chapter 2 for more details on the Kudu client API.
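To give you a flavor of the client API, here is a minimal sketch that inserts a single row using the Kudu Java client, called from Scala for consistency with the spark-shell examples. The master address and table reuse the users2 example from earlier in this chapter, and the row values are made up for illustration.

import org.apache.kudu.client.KuduClient

// Connect to the Kudu master and open the table created earlier with Impala.
val client = new KuduClient.KuduClientBuilder("kuducluster:7051").build()
try {
  val table = client.openTable("impala::default.users2")
  val session = client.newSession()
  val insert = table.newInsert()
  val row = insert.getRow()
  row.addLong("userid", 104L)
  row.addString("name", "Byron Miller")
  row.addString("city", "Sausalito")
  row.addString("state", "CA")
  row.addString("zip", "94965")
  row.addString("age", "43")
  row.addString("height", "180")
  row.addString("weight", "170")
  row.addString("occupation", "Pilot")
  session.apply(insert)      // the new row is immediately visible to Impala queries
  session.close()            // flushes any buffered operations
} finally {
  client.shutdown()
}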

MapReduce and Kudu

If your organization still uses MapReduce, you may be delighted to know that Kudu integrates with it. Example MapReduce code can be found on Kudu’s official website. xxxvii

Summary

There are several ways to ingest data into Kudu. You can use third-party commercial tools such as StreamSets and Talend, or you can create your own applications using native tools such as Apache Spark and the client APIs that Kudu offers. Kudu enables users to ingest data in batch or in real time while running analytic queries at the same time, making it an ideal platform for IoT and advanced analytics. Now that you’ve ingested data into Kudu, you need to extract value from it. In Chapters 8 and 9, I will discuss common ways to analyze data stored in your Kudu tables.

References

  1. i.

    Business Wire; “StreamSets Raises $12.5 Million in Series A Funding Led by Battery Ventures and NEA,” Business Wire, 2015, https://www.businesswire.com/news/home/20150924005143/en/StreamSets-Raises-12.5-Million-Series-Funding-Led

     
  2. ii.

    StreamSets; “Conquer Dataflow Chaos,” StreamSets, 2018, https://streamsets.com/

     
  3. iii.

    Apache Kudu; “Example Use Cases,” Apache Kudu, 2018, https://kudu.apache.org/docs/#kudu_use_cases

     
  4. iv.

    Tom White; “The Small Files Problem,” Cloudera, 2009, https://blog.cloudera.com/blog/2009/02/the-small-files-problem/

     
  5. v.

    Aaron Fabri; “Introducing S3Guard: S3 Consistency for Apache Hadoop,” Cloudera, 2017, http://blog.cloudera.com/blog/2017/08/introducing-s3guard-s3-consistency-for-apache-hadoop/

     
  6. vi.

    Eelco Dolstra; “S3BinaryCacheStore is eventually consistent,” Github, 2017, https://github.com/NixOS/nix/issues/1420

     
  7. vii.

    Sumologic; “10 Things You Might Not Know About Using S3,” Sumologic, 2018, https://www.sumologic.com/aws/s3/10-things-might-not-know-using-s3/

     
  8. viii.
     
  9. ix.
     
  10. x.
     
  11. xi.

    StreamSets; “Processing XML Data with Custom Delimiters,” StreamSets, 2018, https://streamsets.com/documentation/datacollector/latest/help/#Pipeline_Design/TextCDelim.html#concept_okt_kmg_jx

     
  12. xii.

    StreamSets; “Processing XML Data with Custom Delimiters,” StreamSets, 2018, https://streamsets.com/documentation/datacollector/latest/help/#Pipeline_Design/TextCDelim.html#concept_okt_kmg_jx

     
  13. xiii.

    Arvind Prabhakar; “Tame unruly big data flows with StreamSets,” InfoWorld, 2016, http://www.infoworld.com/article/3138005/analytics/tame-unruly-big-data-flows-with-streamsets.html

     
  14. xiv.
     
  15. xv.
     
  16. xvi.
     
  17. xvii.

    JSFiddle; “generateUUID,” JSFiddle, 2018, https://jsfiddle.net/briguy37/2MVFd/

     
  18. xviii.

    Pat Patterson; “Retrieving Metrics via the StreamSets Data Collector REST API,” StreamSets, 2016, https://streamsets.com/blog/retrieving-metrics-via-streamsets-data-collector-rest-api/

     
  19. xix.
     
  20. xx.

    Talend; “Open Source Integration Software,” Talend, 2018, https://www.talend.com/download/talend-open-studio/

     
  21. xxi.

    Talend; “Why Upgrade?,” Talend, 2018, https://www.talend.com/products/why-upgrade/

     
  22. xxii.

    Crunchbase; “Informatica,” Crunchbase, 2018, https://www.crunchbase.com/organization/informatica#/entity

     
  23. xxiii.
     
  24. xxiv.

    Viral Kothari; “Microsoft SSIS WITH Cloudera BIGDATA,” YouTube, 2016, https://www.youtube.com/watch?v=gPLfcL2zDX8

     
  25. xxv.

    Oracle; “Oracle Data Integrator For Big Data,” Oracle, 2018, http://www.oracle.com/us/products/middleware/data-integration/odieebd-ds-2464372.pdf

     
  26. xxvi.
     
  27. xxvii.
     
  28. xxviii.

    IBM Analytic Skills; “Monitoring DataStage jobs using the IBM InfoSphere Information Server Operations Console,” IBM, 2013, https://www.youtube.com/watch?v=qOl_6HqyVes

     
  29. xxix.

    SyncSort; “Innovative Software from Big Iron to Big Data,” SyncSort, 2018, http://www.syncsort.com/en/About/Syncsort-History

     
  30. xxx.

    SyncSort; “Syncsort DMX-h,” SyncSort, 2018, http://www.syncsort.com/en/Products/BigData/DMXh

     
  31. xxxi.

    SyncSort; “Introduction to Syncsort DMX-h 8,” SyncSort, 2015, https://www.youtube.com/watch?v=7e_8YadLa9E

     
  32. xxxii.

    Hortonworks; “APACHE NIFI,” Hortonworks, 2018, https://hortonworks.com/apache/nifi/

     
  33. xxxiii.

    Jira; “Create a new Kudu Processor to ingest data,” Jira, 2017, https://issues.apache.org/jira/browse/NIFI-3973

     
  34. xxxiv.

    Apache Flume; “Flume 1.8.0 User Guide,” Apache Flume, 2018, https://flume.apache.org/FlumeUserGuide.html

     
  35. xxxv.

    Apache Kafka; “Introduction,” Apache Kafka, 2018, https://kafka.apache.org/intro

     
  36. xxxvi.

    Gwen Shapira, Jeff Holoman; “Flafka: Apache Flume Meets Apache Kafka for Event Processing,” Cloudera, 2014, http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/

     
  37. xxxvii.
     