© Butch Quinto 2018
Butch Quinto, Next-Generation Big Data, https://doi.org/10.1007/978-1-4842-3147-0_2

2. Introduction to Kudu

Butch Quinto, Plumpton, Victoria, Australia

Kudu is an Apache-licensed open source columnar storage engine built for the Apache Hadoop platform. It supports fast sequential and random reads and writes, enabling real-time stream processing and analytic workloads. i It integrates with Impala, allowing you to insert, delete, update, upsert, and retrieve data using SQL. Kudu also integrates with Spark (and MapReduce) for fast and scalable data processing and analytics. Like other projects in the Apache Hadoop ecosystem, Kudu runs on commodity hardware and was designed to be highly scalable and highly available.

The Apache Kudu project was founded in 2012 by Todd Lipcon, a software engineer at Cloudera and PMC member and committer on the Hadoop, HBase, and Thrift projects. ii Kudu was developed to address the limitations of HDFS and HBase while combining the strengths of both. While HDFS supports fast analytics and large table scans, files stored in HDFS are immutable and can only be appended to after they are created. iii HBase makes it possible to update and randomly access data, but it’s slow for analytic workloads. Kudu can handle both high-velocity data and real-time analytics, allowing you to update Kudu tables and run analytic workloads at the same time. Batch processing and analytics on HDFS are still slightly faster than Kudu in some cases, and HBase beats Kudu in random read and write performance. Kudu is somewhere in the middle. As shown in Figure 2-1, Kudu’s performance is close enough to that of HDFS with Parquet for analytics (Kudu is faster in some cases) and to that of HBase for random reads and writes that most of the time the difference is negligible.
Figure 2-1

High-level performance comparison of HDFS, Kudu, and HBase

Prior to Kudu, some data engineers used a data processing architecture called the Lambda architecture to work around the limitations of HDFS and HBase. The Lambda architecture works by having a speed layer and a batch layer (and technically, there’s also a serving layer). Transaction data goes to the speed layer (usually HBase), where users get immediate access to the latest data. Data from the speed layer is copied at regular intervals (hourly or daily) to the batch layer (usually HDFS) in Parquet format, to be utilized for reporting and analytics. As you can see in Figure 2-2, data is copied twice and the data pipeline is more complicated than necessary with the Lambda architecture. This is somewhat similar to a typical enterprise data warehouse environment, with OLTP databases representing the “speed layer” and the data warehouse acting as the “batch layer.”
Figure 2-2

Lambda Architecture

Kudu makes the Lambda architecture obsolete due to its ability to simultaneously handle random reads and writes and analytic workloads. With Kudu, there is no data duplication and the data pipeline is considerably simpler, as shown in Figure 2-3.
Figure 2-3

Modern data ingest pipeline using Kudu

Kudu Is for Structured Data

Kudu was designed to store structured data similar to relational databases. In fact, Kudu (when used with Impala) is often used for relational data management and analytics. Kudu rivals commercial data warehouse platforms in terms of capabilities, performance, and scalability. We’ll discuss Impala and Kudu integration later in the chapter and more thoroughly in Chapter 4.

Use Cases

Before we begin, let’s talk about what Kudu is not. Kudu is not meant to replace HBase or HDFS. HBase is a schema-less NoSQL-style data store, which makes it suitable for sparse data or applications that require a variable schema. HBase was designed for OLTP-type workloads that require random reads and writes. For more information on HBase, see the HBase online documentation.

HDFS was designed to store all types of data: structured, semi-structured, and unstructured. If you need to store data in a highly scalable file system, HDFS is a great option. As mentioned earlier, HDFS (using Parquet) is still faster in some cases than Kudu when it comes to running analytic workloads. For more on HDFS, see the HDFS online documentation.

As discussed earlier, Kudu excels at storing structured data. It doesn’t have its own SQL interface, so you need to pair Kudu with Impala. Data that you would normally think of storing in a relational or time series database can most likely be stored in Kudu as well. Below are some use cases where Kudu can be utilized. iv

Relational Data Management and Analytics

Kudu (when used with Impala) exhibits most of the characteristics of a relational database. It stores data in rows and columns and organizes them in databases and tables. Impala provides a highly scalable MPP SQL engine and allows you to interact with Kudu tables using ANSI SQL commands just as you would with a relational database. Relational database use cases can be classified into two main categories, online transactional processing (OLTP) and decision support systems (DSS) or as commonly referred to in modern nomenclature, data warehousing. Kudu was not designed for OLTP, but it can be used for data warehousing and other enterprise data warehouse (EDW) modernization use cases.

Data Warehousing

Kudu can be used for dimensional modeling, the basis of modern data warehousing and online analytic processing (OLAP). Kudu lacks foreign key constraints, auto-increment columns, and other features that you would normally find in a traditional data warehouse platform; however, these limitations do not preclude you from organizing your data in fact and dimension tables. Impala can be accessed using your favorite BI and OLAP tools via ODBC/JDBC. I discuss data warehousing using Impala and Kudu in Chapter 8.

ETL Offloading

ETL offloading is one of the many EDW optimization use cases that you can use Kudu for. Critical reports can become unavailable to the entire organization when ETL processes run far beyond their processing window and spill into business hours. By offloading time-consuming ETL processing to an inexpensive Kudu cluster, ETL jobs can finish before business hours, making critical reports and analytics available to business users when they need them. I discuss ETL offloading using Impala and Kudu in Chapter 8.

Analytics Offloading and Active Archiving

Impala is an extremely fast and scalable MPP SQL engine. You can reduce the load on your enterprise data warehouse by redirecting some of your ad hoc queries and reports to Impala and Kudu. Instead of spending millions of dollars upgrading your data warehouse, you can use analytics offloading and active archiving as a smarter and more cost-effective way to optimize your EDW environment. I discuss analytics offloading and active archiving using Impala and Kudu in Chapter 8.

Data Consolidation

It’s not unusual for large organizations to have hundreds or thousands of legacy databases scattered across the enterprise, costing millions of dollars in licensing, administration, and infrastructure. By consolidating these databases into a single Kudu cluster and using Impala to provide SQL access, you can significantly reduce cost while improving performance and scalability. I discuss data consolidation using Impala and Kudu in Chapter 8.

Internet of Things (IoT) and Time Series

Kudu is perfect for IoT and time series applications where real-time data ingestion, visualization, and complex event processing of sensor data are critical. Several large companies and government agencies, such as Xiaomi, JD.com, v and the Australian Department of Defence, vi are successfully using Kudu for IoT use cases. I discuss IoT, real-time data ingestion, and complex event processing using Impala, Kudu, and StreamSets in Chapter 7. I discuss real-time data visualization with Zoomdata in Chapter 9.

Feature Store for Machine Learning Platforms

Data science teams usually create a centralized feature store where they can publish and share highly selected sets of authoritative features with other teams for creating machine learning models. Creating and maintaining feature stores using immutable data formats such as ORC and Parquet is time-consuming and cumbersome, and it requires a lot of unnecessary work, especially for large data sets. Using Kudu as a fast and highly scalable mutable feature store, data scientists and engineers can easily update and add features using familiar SQL statements. The ability to update feature stores in seconds or minutes is critical in an Agile environment where data scientists are constantly iterating in building, testing, and improving the accuracy of their predictive models. In Chapter 6, we use Kudu as a feature store for building a predictive machine learning model using Spark MLlib.

Note

Kudu allows up to a maximum of 300 columns per table. HBase is a more appropriate storage engine if you need to store more than 300 features. HBase tables can contain thousands or millions of columns. The downside in using HBase is that it is not as efficient in handling full table scans compared to Kudu. There is discussion within the Apache Kudu community to address the 300-column limitation in future versions of Kudu.

Strictly speaking, you can bypass Kudu’s 300-column limit by setting an unsafe flag. For example, if you need the ability to create a Kudu table with 1000 columns, you can start the Kudu master with the following flags: --unlock-unsafe-flags --max-num-columns=1000. This has not been thoroughly tested by the Kudu development team and is therefore not recommended for production use.

Key Concepts

Kudu introduces a few concepts that describe different parts of its architecture.

Table: A table is where data is stored in Kudu. Every Kudu table has a primary key and is divided into segments called tablets.

Tablet: A tablet, or partition, is a segment of a table.

Tablet Server: A tablet server stores and serves tablets to clients.

Master: A master keeps track of all cluster metadata and coordinates metadata operations.

Catalog Table: The central storage for all cluster metadata. The catalog table stores information about the location of tables and tablets, their current state, and the number of replicas. The catalog table is stored in the master.

Architecture

Similar to the design of other Hadoop components such as HDFS and HBase (and their Google counterparts, GFS and BigTable), Kudu has a master-slave architecture. As shown in Figure 2-4, Kudu comprises one or more master servers responsible for cluster coordination and metadata management. Kudu also has one or more tablet servers, which store data and serve it to client applications. vii There can be only one acting master, the leader, at any given time. If the leader becomes unavailable, another master is elected to become the new leader. Similarly, for each tablet, one tablet server hosts the leader replica and the others host follower replicas. All write requests go to the leader, while read requests can go to the leader or a follower. Data stored in Kudu is replicated using the Raft Consensus Algorithm, which ensures that data remains available after the loss of some replicas as long as a majority of the replicas is still available. Whenever possible, Kudu replicates logical operations instead of actual physical data, limiting the amount of data movement across the cluster.

Note

The Raft Consensus Algorithm is described in detail in “The Raft Paper”: In Search of an Understandable Consensus Algorithm (Extended Version) by Diego Ongaro and John Ousterhout. viii Diego Ongaro’s PhD dissertation, “Consensus: Bridging Theory and Practice,” published by Stanford University in 2014, expands on the content of the paper in more detail. ix
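The majority rule behind Raft-style replication comes down to simple arithmetic. The following is a minimal sketch of that arithmetic only, not of Kudu's or Raft's implementation; the function names are hypothetical.

```python
def majority(replicas: int) -> int:
    """Smallest number of replicas that forms a majority."""
    return replicas // 2 + 1


def tolerated_failures(replicas: int) -> int:
    """Replicas that can be lost while a majority is still available."""
    return replicas - majority(replicas)


for n in (1, 3, 5, 7):
    print(f"{n} replicas: majority={majority(n)}, "
          f"tolerated failures={tolerated_failures(n)}")
```

Note that going from three to four replicas does not increase the number of failures that can be tolerated, which is why odd replica counts are the usual choice.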

Figure 2-4

Kudu Architecture

Multi-Version Concurrency Control (MVCC)

Most modern databases use some form of multi-version concurrency control, rather than traditional locking mechanisms, to ensure read consistency. Oracle has had a multi-version consistency model since version 6.0. x Oracle uses data maintained in the rollback segments to provide read consistency. The rollback segments contain the previous versions of data that has been modified by uncommitted or recently committed transactions. xi MemSQL and SAP HANA manage concurrency using MVCC as well. Originally, SQL Server supported only a pessimistic concurrency model, using locking to enforce concurrency. As a result, readers block writers and writers block readers. The likelihood of blocking problems and lock contention increases as the number of concurrent users and operations rises, leading to performance and scalability issues. Things became so bad in SQL Server-land that developers and DBAs were forced to use the NOLOCK hint in their queries or set the READ UNCOMMITTED isolation level, tolerating dirty reads in exchange for a minor performance boost. Starting in SQL Server 2005, Microsoft introduced its own version of multi-version concurrency control known as row-level versioning. xii SQL Server doesn’t have the equivalent of rollback segments, so it uses tempdb to store previously committed data. Teradata does not have a multi-version consistency model and relies on transactions and locks to enforce concurrency control. xiii

Similar to Oracle, MemSQL, and SAP HANA, Kudu uses multi-version concurrency control to ensure read consistency. xiv Readers don’t block writers and writers don’t block readers. Kudu’s optimistic concurrency model means that operations are not required to acquire locks during large full table scans, considerably improving query performance and scalability.
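To make the idea concrete, here is a toy sketch of multi-version reads in plain Python. It is an illustration of the general technique under simplifying assumptions (a single process and a logical clock), not a description of how Kudu implements MVCC; the class and method names are hypothetical.

```python
class VersionedStore:
    """Toy multi-version store: every write adds a version, nothing is overwritten."""

    def __init__(self):
        self._versions = {}   # key -> list of (commit_timestamp, value), oldest first
        self._clock = 0       # logical clock standing in for real timestamps

    def write(self, key, value):
        """Record a new version of the key and return its commit timestamp."""
        self._clock += 1
        self._versions.setdefault(key, []).append((self._clock, value))
        return self._clock

    def snapshot(self):
        """Return the timestamp a reader should use for a consistent view."""
        return self._clock

    def read(self, key, snapshot_ts):
        """Return the newest value committed at or before snapshot_ts."""
        result = None
        for ts, value in self._versions.get(key, []):
            if ts <= snapshot_ts:
                result = value
            else:
                break
        return result


store = VersionedStore()
store.write("sensor-1", 20)
reader_ts = store.snapshot()      # a long scan starts here
store.write("sensor-1", 75)       # a writer updates the row mid-scan

print(store.read("sensor-1", reader_ts))         # 20: the scan's view is unchanged
print(store.read("sensor-1", store.snapshot()))  # 75: a new reader sees the update
```

Neither the reader nor the writer waits on a lock: the writer appends a new version, and the reader keeps using the version that was current when its snapshot was taken.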

Impala and Kudu

Impala is the default MPP SQL engine for Kudu. Impala allows you to interact with Kudu using SQL. If you have experience with traditional relational databases where the SQL and storage engines are tightly integrated, you might find it unusual that Kudu and Impala are decoupled from each other. Impala was designed to work with other storage engines such as HDFS, HBase, and S3, not just Kudu. There’s also work underway to integrate other SQL engines such as Apache Drill (DRILL-4241) and Hive (HIVE-12971) with Kudu. Decoupling storage, SQL, and processing engines is common practice in the open source community.

The Impala-Kudu integration works great but there is still work to be done. While it matches or exceeds traditional data warehouse platforms in terms of performance and scalability, Impala-Kudu lacks some of the enterprise features found in most traditional data warehouse platforms. We discuss some of these limitations later in the chapter.

Primary Key

Every Kudu table needs to have a primary key. Kudu’s primary key is implemented as a clustered index. With a clustered index, the rows are stored physically in the tablet in the same order as the index. Also note that Kudu doesn’t have an auto-increment feature, so you will have to include a unique primary key value when inserting rows into a Kudu table. If you don’t have a primary key value, you can use Impala’s built-in uuid() function or another method to generate a unique value.
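When rows are written from an application rather than through Impala, one option is to generate the key on the client. The sketch below uses Python's standard uuid module; the row layout and the new_row helper are hypothetical, and the sort only mimics the key ordering of a clustered index.

```python
import uuid


def new_row(name: str, age: int) -> dict:
    """Attach a client-generated unique key to a row before it is inserted."""
    return {"id": str(uuid.uuid4()), "name": name, "age": age}


rows = [new_row("Lisa Kim", 60), new_row("Casey Fernandez", 45)]

# Sorting by key mimics how a clustered index orders rows on disk.
for row in sorted(rows, key=lambda r: r["id"]):
    print(row["id"], row["name"])
```

Random UUIDs spread inserts across the key space, at the cost of keys that carry no meaning and are larger than a BIGINT.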

Data Types

Like other relational databases, Kudu supports various data types (Table 2-1).
Table 2-1

List of Data Types, with Available and Default Encoding

| Data Type | Encoding | Default |
|---|---|---|
| boolean | plain, run length | run length |
| 8-bit signed integer | plain, bitshuffle, run length | bitshuffle |
| 16-bit signed integer | plain, bitshuffle, run length | bitshuffle |
| 32-bit signed integer | plain, bitshuffle, run length | bitshuffle |
| 64-bit signed integer | plain, bitshuffle, run length | bitshuffle |
| unixtime_micros (64-bit microseconds since the Unix epoch) | plain, bitshuffle, run length | bitshuffle |
| single-precision (32-bit) IEEE-754 floating-point number | plain, bitshuffle | bitshuffle |
| double-precision (64-bit) IEEE-754 floating-point number | plain, bitshuffle | bitshuffle |
| UTF-8 encoded string (up to 64KB uncompressed) | plain, prefix, dictionary | dictionary |
| binary (up to 64KB uncompressed) | plain, prefix, dictionary | dictionary |

You may notice that Kudu currently does not support the decimal data type. This is a key limitation in Kudu. As defined in the IEEE 754 specification, the float and double data types store only a very close approximation of a value rather than the exact value. xv Because of this behavior, float and double are not appropriate for storing financial data. At the time of writing (Apache Kudu 1.5 / CDH 5.13), support for the decimal data type is still under development. Decimal support is coming in Kudu 1.7. Check KUDU-721 for more details. There are various workarounds available. You can store financial data as a string and then use Impala to cast the value to decimal every time you need to read the data. Since Parquet supports decimals, another workaround would be to use Parquet for your fact tables and Kudu for dimension tables.
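The approximation problem is easy to reproduce outside Kudu. The following sketch uses Python's standard decimal module to contrast binary floating point with amounts kept as strings and converted on read, which is the same idea as the string workaround; the sample amounts are made up for illustration.

```python
from decimal import Decimal

# Binary floating point cannot represent 0.10 or 0.20 exactly.
float_total = 0.10 + 0.20
print(float_total == 0.30)                # False

# Amounts kept as strings (as they would be in a STRING column)
# convert to exact decimals when they are read back.
stored_amounts = ["0.10", "0.20"]
decimal_total = sum(Decimal(a) for a in stored_amounts)
print(decimal_total == Decimal("0.30"))   # True
```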

As shown in Table 2-1, Kudu columns can use different encoding types depending on the type of column. Supported encoding types include Plain, Bitshuffle, Run Length, Dictionary, and Prefix. By default, Kudu columns are uncompressed. Kudu supports column compression using Snappy, zlib, or LZ4 compression codecs. Consult Kudu’s documentation for more details on Kudu encoding and compression support.

Note

In earlier versions of Kudu, date and time are represented as a BIGINT. You can use the TIMESTAMP data type in Kudu tables starting in Impala 2.9/CDH 5.12. However, there are several things to keep in mind. Kudu represents date and time columns using 64-bit values, while Impala represents date and time as 96-bit values. Nanosecond values generated by Impala are rounded when stored in Kudu. When reading and writing TIMESTAMP columns, there is an overhead converting between Kudu’s 64-bit representation and Impala’s 96-bit representation. There are two workarounds: use the Kudu client API or Spark to insert data, or continue using BIGINT to represent date and time. xvi
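If you take the BIGINT route, the application has to convert between timestamps and integers itself. Below is a minimal sketch of that conversion in Python, assuming UTC timestamps and microsecond precision; the helper names are hypothetical, and anything finer than a microsecond cannot be represented.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unixtime_micros(ts: datetime) -> int:
    """Whole microseconds since the Unix epoch, computed with integer arithmetic."""
    delta = ts - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def from_unixtime_micros(micros: int) -> datetime:
    """Convert stored microseconds back into a timezone-aware UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)


reading_time = datetime(2018, 1, 1, 12, 30, 15, 123456, tzinfo=timezone.utc)
stored = to_unixtime_micros(reading_time)
print(stored)
print(from_unixtime_micros(stored) == reading_time)   # True
```

Integer arithmetic is used instead of datetime.timestamp() so that the microsecond component is not exposed to floating-point rounding.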

Partitioning

Table partitioning is a common way to enhance performance, availability, and manageability of Kudu tables. Partitioning allows tables to be subdivided into smaller segments, or tablets. Partitioning enables Kudu to take advantage of partition pruning by allowing access to tables at a finer level of granularity. Table partitioning is required for all Kudu tables and is completely transparent to applications. Kudu supports Hash, Range, and Composite Hash-Range and Hash-Hash partitioning. Below are a few examples of partitioning in Kudu.

Hash Partitioning

There are times when it is desirable to evenly distribute data randomly across partitions to avoid IO bottlenecks. With hash partitioning, data is placed in a partition based on a hashing function applied to the partitioning key. Note that you are not allowed to add partitions to hash-partitioned tables. You will have to rebuild the entire hash-partitioned table if you wish to add more partitions.

CREATE TABLE myTable (
 id BIGINT NOT NULL,
 name STRING,
 PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU;
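The effect of hash partitioning can be sketched in a few lines of Python. This is an illustration only: MD5 is used as a convenient stable hash and is an assumption, not the hash function Kudu uses, and hash_partition is a hypothetical helper.

```python
import hashlib
from collections import Counter

NUM_PARTITIONS = 4  # same partition count as the DDL above


def hash_partition(key: int, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition with a stable hash (a stand-in, not Kudu's own hash)."""
    digest = hashlib.md5(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


# Sequential ids spread across all partitions instead of piling into one.
counts = Counter(hash_partition(i) for i in range(10_000))
for partition in sorted(counts):
    print(partition, counts[partition])

# Changing the partition count remaps most keys, which is why a
# hash-partitioned table has to be rebuilt to add partitions.
moved = sum(hash_partition(i, 4) != hash_partition(i, 5) for i in range(10_000))
print(f"{moved} of 10000 keys would move to a different partition")
```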

Range Partitioning

Range partitioning stores data in partitions based on predefined ranges of values of the partitioning key for each partition. Range partitioning enhances the manageability of the partitions by allowing new partitions to be added to the table. It also improves performance of read operations via partition pruning. One downside: range partitioning can cause hot spots if you insert data in partition key order.

CREATE TABLE myTable (
  deviceid INT,
  year INT,
  totalamt INT,
  PRIMARY KEY (deviceid, year)
)
PARTITION BY RANGE (year) (
  PARTITION VALUE = 2016,
  PARTITION VALUE = 2017,
  PARTITION VALUE = 2018
)
STORED AS KUDU;
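Partition pruning and partition maintenance on a range-partitioned table can be sketched as follows. This is a simplified model with one single-value partition per year; the partition names and helper functions are hypothetical and do not correspond to Kudu or Impala APIs.

```python
# One single-value range partition per year; the names are hypothetical labels.
partitions = {2016: "p2016", 2017: "p2017", 2018: "p2018"}


def partition_for(year: int) -> str:
    """Return the partition that stores rows for the given year."""
    if year not in partitions:
        raise ValueError(f"no range partition covers year {year}")
    return partitions[year]


def prune(min_year: int, max_year: int) -> list:
    """Return only the partitions a scan filtered on year has to read."""
    return [name for year, name in sorted(partitions.items())
            if min_year <= year <= max_year]


print(partition_for(2017))   # p2017
print(prune(2017, 2018))     # ['p2017', 'p2018']

# Accommodating a new year only requires adding a partition.
partitions[2019] = "p2019"
print(partition_for(2019))   # p2019
```

A query that filters on year touches only the matching partitions, while rows that arrive in key order (for example, all for the current year) land in a single partition, which is the hot-spot risk mentioned above.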

Hash-Range Partitioning

Hash-Range partitioning combines the benefits of hash and range partitioning while minimizing their limitations. Using hash partitioning ensures that write IO is spread evenly across tablet servers, while using range partitioning ensures that new tablets can be added to accommodate future growth.

CREATE TABLE myTable (
 id BIGINT NOT NULL,
 sensortimestamp BIGINT NOT NULL,
 sensorid INTEGER,
 temperature INTEGER,
 pressure INTEGER,
 PRIMARY KEY(id, sensortimestamp)
)
PARTITION BY HASH (id) PARTITIONS 16,
RANGE (sensortimestamp)
(
PARTITION unix_timestamp('2017-01-01') <= VALUES < unix_timestamp('2018-01-01'),
PARTITION unix_timestamp('2018-01-01') <= VALUES < unix_timestamp('2019-01-01'),
PARTITION unix_timestamp('2019-01-01') <= VALUES < unix_timestamp('2020-01-01')
)
STORED AS KUDU;
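With composite partitioning, a row's tablet is determined by both its hash bucket and its range. The sketch below models this for 16 hash buckets and three yearly ranges, assuming UTC epoch seconds; MD5 again stands in for the real hash function, and tablet_for is a hypothetical helper.

```python
import hashlib
from bisect import bisect_right
from datetime import datetime, timezone

HASH_BUCKETS = 16


def epoch_seconds(year: int) -> int:
    """Seconds since the Unix epoch at midnight UTC on January 1 of the given year."""
    return int(datetime(year, 1, 1, tzinfo=timezone.utc).timestamp())


# Range boundaries: [2017, 2018), [2018, 2019), [2019, 2020)
BOUNDS = [epoch_seconds(y) for y in (2017, 2018, 2019, 2020)]


def tablet_for(row_id: int, sensor_ts: int) -> tuple:
    """Return (hash bucket, range index) for a row, or raise if no range covers it."""
    range_index = bisect_right(BOUNDS, sensor_ts) - 1
    if range_index < 0 or range_index >= len(BOUNDS) - 1:
        raise ValueError("no range partition covers this timestamp")
    digest = hashlib.md5(str(row_id).encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % HASH_BUCKETS
    return bucket, range_index


total_tablets = HASH_BUCKETS * (len(BOUNDS) - 1)
print(total_tablets)                          # 48
print(tablet_for(1, epoch_seconds(2018)))     # bucket varies by id, range index is 1
```

Rows written at the same moment share a range but are spread over 16 buckets, and adding a range for a new year adds 16 new tablets without touching existing ones.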

I discuss table partitioning in more detail in Chapter 4.

Spark and Kudu

Spark is the ideal data processing and ingestion tool for Kudu. Spark SQL and the DataFrame API make it easy to interact with Kudu. I discuss Spark and Kudu integration in more detail in Chapter 6.

You work with Kudu from Spark through the DataFrame API. You can use the --packages option in spark-shell or spark-submit to include the kudu-spark dependency. You can also manually download the jar file from central.maven.org and include it in your --jars option. Use the kudu-spark2_2.11 artifact if you are using Spark 2 with Scala 2.11. For example:

spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.1.0
spark-shell --jars kudu-spark2_2.11-1.1.0.jar

Kudu Context

You use a Kudu context in order to execute DML statements against a Kudu table. xvii For example, if we need to insert data into a Kudu table:

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")
case class CustomerData(id: Long, name: String, age: Short)
val data = Array(CustomerData(101,"Lisa Kim",60), CustomerData(102,"Casey Fernandez",45))
val insertRDD = sc.parallelize(data)
val insertDF = sqlContext.createDataFrame(insertRDD)
insertDF.show
+---+---------------+---+
| id|           name|age|
+---+---------------+---+
|101|       Lisa Kim| 60|
|102|Casey Fernandez| 45|
+---+---------------+---+

Insert the DataFrame into the Kudu table. I assume the table already exists.

kuduContext.insertRows(insertDF, "impala::default.customers")

Confirm that the data was successfully inserted.

val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+---------------+---+
| id|           name|age|
+---+---------------+---+
|102|Casey Fernandez| 45|
|101|       Lisa Kim| 60|
+---+---------------+---+


Note

Starting in Kudu 1.6, Spark performs better by taking advantage of scan locality. Spark will scan the closest tablet replica instead of scanning the leader, which could be on a different tablet server.

Spark Streaming and Kudu

In the example shown in Listing 2-1, we use Flafka (Flume and Kafka) and Spark Streaming to read data from a Flume spooldir source, store it in Kafka, and then process and write the data to Kudu with Spark Streaming.

A new stream processing engine built on Spark SQL was included in Spark 2.0 called Structured Streaming. Starting with Spark 2.2.0, the experimental tag from Structured Streaming has been removed. However, Cloudera still does not support Structured Streaming as of this writing (CDH 5.13). Chapter 7 describes Flafka and Spark Streaming in more detail.

import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.spark.kudu._
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
object FlumeStreaming {
   case class MySensorData(tableid: String, deviceid: String, thedate: String, thetime: String, temp: Short, status: String)
    // parse one comma-delimited record into a MySensorData row
    def readSensorData(str: String): MySensorData = {
      val col = str.split(",")
      MySensorData(col(0), col(1), col(2), col(3), col(4).toShort, col(5))
    }
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeStreaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))
       // the arguments are for host name and port number
    val flumeStream = FlumeUtils.createPollingStream(ssc,args(0),args(1).toInt)
    val sensorDStream = flumeStream.map (x => new String(x.event.getBody.array)).map(readSensorData)
    sensorDStream.foreachRDD {rdd =>
             val sqlContext = new SQLContext(sc)
             import sqlContext.implicits._
             val kuduContext = new KuduContext("kudumaster01:7051")
// convert the RDD into a DataFrame and insert it into the Kudu table
             val DataDF = rdd.toDF
             kuduContext.insertRows(DataDF, "impala::default.sensortable")
             DataDF.registerTempTable("currentDF")
             // Update the table based on the thresholds
             val WarningFilteredDF = sqlContext.sql("select * from currentDF where temp > 50 and temp <= 60")
             WarningFilteredDF.registerTempTable("warningtable")
             val UpdatedWarningDF = sqlContext.sql("select tableid,deviceid,thedate,thetime,temp,'WARNING' as status from warningtable")
             kuduContext.updateRows(UpdatedWarningDF, "impala::default.sensortable")
             val CriticalFilteredDF = sqlContext.sql("select * from currentDF where temp > 60")
             CriticalFilteredDF.registerTempTable("criticaltable")
             val UpdatedCriticalDF = sqlContext.sql("select tableid,deviceid,thedate,thetime,temp,'CRITICAL' as status from criticaltable")
             kuduContext.updateRows(UpdatedCriticalDF, "impala::default.sensortable")
     }
    ssc.start()
    ssc.awaitTermination()
  }
}
Listing 2-1

Spark Streaming and Kudu
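The parsing and threshold logic at the heart of Listing 2-1 can be sketched in plain Python, independent of Spark and Kudu. The sample record is made up, the helper names are hypothetical, and treating every temperature above 60 as critical is an assumption about the intended cutoff.

```python
from typing import NamedTuple


class SensorReading(NamedTuple):
    tableid: str
    deviceid: str
    thedate: str
    thetime: str
    temp: int
    status: str


def parse_reading(line: str) -> SensorReading:
    """Split one comma-delimited record into a typed reading."""
    cols = [c.strip() for c in line.split(",")]
    if len(cols) != 6:
        raise ValueError(f"expected 6 fields, got {len(cols)}")
    return SensorReading(cols[0], cols[1], cols[2], cols[3], int(cols[4]), cols[5])


def classify(reading: SensorReading) -> SensorReading:
    """Set the status from the temperature: above 60 is CRITICAL, above 50 is WARNING."""
    if reading.temp > 60:
        return reading._replace(status="CRITICAL")
    if reading.temp > 50:
        return reading._replace(status="WARNING")
    return reading


sample = "1,device-7,2018-01-01,12:30:15,55,NORMAL"   # hypothetical input record
print(classify(parse_reading(sample)).status)          # WARNING
```

In the streaming job, the same classification is expressed as two SQL filters over each micro-batch, followed by updates to the rows that were just inserted.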

Listing 2-2 shows the Flume configuration file, with Kafka used as a Flume channel.

agent1.sources  = source1
agent1.channels = channel1
agent1.sinks = spark
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/streaming
agent1.sources.source1.channels = channel1
agent1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.channel1.brokerList = kafkabroker01:9092, kafkabroker02:9092, kafkabroker03:9092
agent1.channels.channel1.zookeeperConnect = server03:2181
agent1.channels.channel1.topic = mytopic
agent1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.spark.hostname = 127.0.0.1
agent1.sinks.spark.port =  9999
agent1.sinks.spark.channel = channel1
agent1.sinks.spark.batchSize=5
Listing 2-2

Flume configuration file

After compiling the package, submit the application to the cluster to execute it.

spark-submit \
--class FlumeStreaming \
--jars kudu-spark_2.10-0.10.0.jar \
--master yarn-client \
--driver-memory=512m \
--executor-memory=512m \
--executor-cores 4 \
/mydir/spark/flume_streaming_kudu/target/scala-2.10/test-app_2.10-1.0.jar localhost 9999

The Flafka pipeline with Spark Streaming and Kudu should look like Figure 2-5.
Figure 2-5

A Flafka pipeline with Spark Streaming and Kudu

Kudu C++, Java, and Python Client APIs

Kudu provides NoSQL-style Java, C++, and Python client APIs. Applications that require the best performance from Kudu should use the client APIs. In fact, some of the data ingestion tools discussed in Chapter 7, such as StreamSets, CDAP, and Talend, utilize the client APIs to ingest data into Kudu. DML changes made via the API are available for querying in Impala immediately without the need to execute INVALIDATE METADATA.

Kudu Java Client API

Listing 2-3 provides an example using the Java client API.

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.util.ArrayList;
import java.util.List;
public class JavaKuduClient {
  public static void main(String[] args) throws Exception {
    // Create Kudu client object
    KuduClient myKuduClient = new KuduClient.KuduClientBuilder("kudumaster01").build();

    // Create the schema
    List<ColumnSchema> myColumns = new ArrayList<>(3);
    myColumns.add(new ColumnSchema.ColumnSchemaBuilder("rowid", Type.INT32)
        .key(true)
        .build());
    myColumns.add(new ColumnSchema.ColumnSchemaBuilder("customername", Type.STRING)
        .build());
    myColumns.add(new ColumnSchema.ColumnSchemaBuilder("customerage", Type.INT8)
        .build());

    // Range-partition the table on the primary key column
    List<String> partKeys = new ArrayList<>();
    partKeys.add("rowid");

    // Create the table based on the schema
    Schema mySchema = new Schema(myColumns);
    myKuduClient.createTable("CustomersTbl", mySchema,
        new CreateTableOptions().setRangePartitionColumns(partKeys));

    // Open the Kudu table
    KuduTable myTable = myKuduClient.openTable("CustomersTbl");
    KuduSession mySession = myKuduClient.newSession();

    // Insert new rows (customerage is INT8, so it is set with addByte)
    Insert myInsert = myTable.newInsert();
    myInsert.getRow().addInt("rowid", 1);
    myInsert.getRow().addString("customername", "Jerry Walsh");
    myInsert.getRow().addByte("customerage", (byte) 64);
    mySession.apply(myInsert);

    // Update existing rows
    Update myUpdate = myTable.newUpdate();
    myUpdate.getRow().addInt("rowid", 1);
    myUpdate.getRow().addString("customername", "Jerome Walsh");
    myUpdate.getRow().addByte("customerage", (byte) 65);
    mySession.apply(myUpdate);

    // Upsert rows
    Upsert myUpsert = myTable.newUpsert();
    myUpsert.getRow().addInt("rowid", 2);
    myUpsert.getRow().addString("customername", "Tim Stein");
    myUpsert.getRow().addByte("customerage", (byte) 49);
    mySession.apply(myUpsert);

    // Delete row
    Delete myDelete = myTable.newDelete();
    myDelete.getRow().addInt("rowid", 1);
    mySession.apply(myDelete);

    // Display rows
    List<String> projectedColumns = new ArrayList<>();
    projectedColumns.add("rowid");
    projectedColumns.add("customername");
    projectedColumns.add("customerage");
    KuduScanner myScanner = myKuduClient.newScannerBuilder(myTable)
        .setProjectedColumnNames(projectedColumns)
        .build();
    while (myScanner.hasMoreRows()) {
      RowResultIterator myResultIterator = myScanner.nextRows();
      while (myResultIterator.hasNext()) {
        RowResult myRow = myResultIterator.next();
        System.out.println(myRow.getInt("rowid"));
        System.out.println(myRow.getString("customername"));
        System.out.println(myRow.getByte("customerage"));
      }
    }

    // Close the session
    mySession.close();
    // Delete table
    myKuduClient.deleteTable("CustomersTbl");
    // Close the connection
    myKuduClient.shutdown();
  }
}
Listing 2-3

Sample Java code using the Kudu client API

Maven Artifacts

You will need the following in your pom.xml file.

<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.1.0</version>
</dependency>

Kudu Python Client API

The Python client API provides an easy way to interact with Kudu. The Python API is still experimental and might change at any time. See Listing 2-4 for an example.

import kudu
from kudu.client import Partitioning
from datetime import datetime
# Connect to Kudu
myclient = kudu.connect(host='kudumaster01', port=7051)
# Define the columns
mybuilder = kudu.schema_builder()
mybuilder.add_column('rowid').type(kudu.int64).nullable(False).primary_key()
mybuilder.add_column('customername', type_=kudu.string, nullable=False)
mybuilder.add_column('customerage', type_=kudu.int8, nullable=False)
myschema = mybuilder.build()
# Define partitioning method
mypartitioning = Partitioning().add_hash_partitions(column_names=['rowid'], num_buckets=24)
# Create new table
myclient.create_table('customers', myschema, mypartitioning)
# Open a table
mytable = myclient.table('customers')
# Create a new session
mysession = myclient.new_session()
# Insert a row
myinsert = mytable.new_insert({'rowid': 1, 'customername': "Jason Weinstein", 'customerage': 62})
mysession.apply(myinsert)
# Upsert a row
myupsert = mytable.new_upsert({'rowid': 2, 'customername': "Frank Nunez", 'customerage': 47})
mysession.apply(myupsert)
# Updating a row
myupdate = mytable.new_update({'rowid': 1, 'customername': "Jason Dean Weinstein"})
mysession.apply(myupdate)
# Delete a row
mydelete = mytable.new_delete({'rowid': 1})
mysession.apply(mydelete)
# Flush the session
mysession.flush()
# Create a scanner with a predicate
myscanner = mytable.scanner()
myscanner.add_predicate(mytable['customerage'] < 50)
# Read the data. Note that this method doesn't scale well for large table scans
myresult = myscanner.open().read_all_tuples()
Listing 2-4

Sample Python code using the Kudu client API

Kudu C++ Client API

Kudu also provides a C++ client API. See Listing 2-5 for an example.

#include <ctime>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include "kudu/client/callbacks.h"
#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
#include "kudu/client/stubs.h"
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/util/monotime.h"
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduError;
using kudu::client::KuduInsert;
using kudu::client::KuduPredicate;
using kudu::client::KuduRowResult;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduStatusFunctionCallback;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::KuduValue;
using kudu::client::sp::shared_ptr;
using kudu::KuduPartialRow;
using kudu::MonoDelta;
using kudu::Status;
using std::string;
using std::vector;
int main(int argc, char* argv[]) {
  // Enable verbose debugging for the client library.
  // Set parameter to 0 to disable
  kudu::client::SetVerboseLogLevel(2);
  // Create and connect a client.
  shared_ptr<KuduClient> client;
  KUDU_CHECK_OK(KuduClientBuilder().add_master_server_addr("kudumaster01:7051").Build(&client));
  KUDU_LOG(INFO) << "Client connection created.";
  // Create a schema.
  // Available data types:  INT8 = 0, INT16 = 1, INT32 = 2,
  // INT64 = 3, STRING = 4, BOOL = 5, FLOAT = 6, DOUBLE = 7,
  // BINARY = 8, UNIXTIME_MICROS = 9, TIMESTAMP = UNIXTIME_MICROS
  KuduSchema mytable_schema;
  KuduSchemaBuilder mytable_builder;
  mytable_builder.AddColumn("rowid")->Type(KuduColumnSchema::INT32)->NotNull();
  mytable_builder.AddColumn("name")->Type(KuduColumnSchema::STRING)->NotNull();
  mytable_builder.AddColumn("age")->Type(KuduColumnSchema::INT8)->NotNull();
  mytable_builder.AddColumn("salary")->Type(KuduColumnSchema::DOUBLE)->NotNull();
  mytable_builder.SetPrimaryKey({"rowid"});
  KUDU_CHECK_OK(mytable_builder.Build(&mytable_schema));
  KUDU_LOG(INFO) << "Created a schema for mytable";
  // Delete table if it exists
  bool exists;
  KUDU_CHECK_OK(client->TableExists("mytable", &exists));
  if (exists) {
    KUDU_CHECK_OK(client->DeleteTable("mytable"));
       KUDU_LOG(INFO) << "Deleting table if it exists.";
  }
  // Generate the split keys for the table.
  vector<const KuduPartialRow*> splits;
  int32_t num_tablets = 20;
  int32_t increment = 1000 / num_tablets;
  for (int32_t i = 1; i < num_tablets; i++) {
    KuduPartialRow* row = mytable_schema.NewRow();
    KUDU_CHECK_OK(row->SetInt32(0, i * increment));
    splits.push_back(row);
  }
  vector<string> column_names;
  column_names.push_back("rowid");
  // Create the table.
  KuduTableCreator* table_creator = client->NewTableCreator();
  KUDU_CHECK_OK(table_creator->table_name("mytable")
        .schema(&mytable_schema)
      .set_range_partition_columns(column_names)
      .split_rows(splits)
      .Create());
  // Confirm if the table was successfully created
  bool created;
  KUDU_CHECK_OK(client->TableExists("mytable", &created));
  if (created) {
    KUDU_LOG(INFO) << "Created table mytable.";
  } else {
    KUDU_LOG(INFO) << "Failed to create table mytable.";
  }
  // Insert two rows into the table.
  shared_ptr<KuduTable> table;
  KUDU_CHECK_OK(client->OpenTable("mytable", &table));
  // Create a session and buffer the writes until Flush() is called.
  shared_ptr<KuduSession> session = client->NewSession();
  KUDU_CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
  KuduInsert* my_insert = table->NewInsert();
  KuduPartialRow* row = my_insert->mutable_row();
  KUDU_CHECK_OK(row->SetInt32("rowid", 100));
  KUDU_CHECK_OK(row->SetStringCopy("name", "Fred Smith"));
  KUDU_CHECK_OK(row->SetInt8("age", 56));
  KUDU_CHECK_OK(row->SetDouble("salary", 110000));
  KUDU_CHECK_OK(session->Apply(my_insert));
  // Reuse the same variables for the second row.
  my_insert = table->NewInsert();
  row = my_insert->mutable_row();
  KUDU_CHECK_OK(row->SetInt32("rowid", 101));
  KUDU_CHECK_OK(row->SetStringCopy("name", "Linda Stern"));
  KUDU_CHECK_OK(row->SetInt8("age", 29));
  KUDU_CHECK_OK(row->SetDouble("salary", 75000));
  KUDU_CHECK_OK(session->Apply(my_insert));
  KUDU_CHECK_OK(session->Flush());
  KUDU_LOG(INFO) << "Inserted two rows into mytable";
  // Scan one row based on a predicate
  KuduScanner scanner(table.get());
  // Add a predicate: WHERE name = "Linda Stern"
  KuduPredicate* pred = table->NewComparisonPredicate(
      "name", KuduPredicate::EQUAL, KuduValue::CopyString("Linda Stern"));
  // main() returns int, so check each Status instead of returning it.
  KUDU_CHECK_OK(scanner.AddConjunctPredicate(pred));
  KUDU_CHECK_OK(scanner.Open());
  vector<KuduRowResult> results;
  while (scanner.HasMoreRows()) {
    KUDU_CHECK_OK(scanner.NextBatch(&results));
    for (vector<KuduRowResult>::iterator iter = results.begin();
        iter != results.end();
        iter++) {
      const KuduRowResult& result = *iter;
      // GetString() hands back the value as a kudu::Slice.
      kudu::Slice myname;
      KUDU_CHECK_OK(result.GetString("name", &myname));
      KUDU_LOG(INFO) << "Scanned a row with name: " << myname.ToString();
    }
    results.clear();
  }
  // Delete the table.
  KUDU_CHECK_OK(client->DeleteTable("mytable"));
  KUDU_LOG(INFO) << "Deleted mytable.";
}
Listing 2-5

Sample C++ code using the Kudu client API
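
The split-key arithmetic in Listing 2-5 can be checked without a Kudu cluster. The following Python sketch reproduces it, assuming (as the listing does) that rowid values fall between 0 and 1000 and that each split key is the inclusive lower bound of the range partition that follows it. The function names are hypothetical and nothing here calls the Kudu API.

```python
from bisect import bisect_right


def split_points(key_space, num_tablets):
    """Return the num_tablets - 1 split keys computed in Listing 2-5."""
    increment = key_space // num_tablets
    return [i * increment for i in range(1, num_tablets)]


def tablet_index(rowid, splits):
    """Return the zero-based range partition that would hold rowid.

    Each split key is treated as the inclusive lower bound of the
    partition that follows it.
    """
    return bisect_right(splits, rowid)


splits = split_points(1000, 20)
print(len(splits), splits[:3], splits[-1])
print(tablet_index(100, splits), tablet_index(101, splits))
```

Nineteen split keys produce twenty tablets, and under these assumptions the two rows inserted by the listing (rowid 100 and 101) land in the same tablet.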

More examples xviii can be found on Kudu’s official website xix and GitHub repository. xx The sample code available online was contributed by the Kudu development team and served as a reference for this chapter.

Backup and Recovery

Kudu doesn’t have a backup and recovery utility. However, there are a few ways to back up (and recover) Kudu tables using Impala, Spark, and third-party tools such as StreamSets and Talend.

Note

HDFS snapshots cannot be used to back up Kudu tables since Kudu data does not reside in HDFS. xxi

Backup via CTAS

The simplest way to back up a Kudu table is to use CREATE TABLE AS SELECT (CTAS). You’re basically just creating another copy of the Kudu table in HDFS, preferably in Parquet format (or another compressed format), so you can copy the files to a remote location such as another cluster or S3.

CREATE TABLE DimCustomer_copy AS SELECT * FROM DimCustomer;
+-----------------------+
| summary               |
+-----------------------+
| Inserted 18484 row(s) |
+-----------------------+

You can create the table first so you can customize table options if needed, and then use INSERT INTO to insert data from the Kudu table.

CREATE TABLE DimCustomer_Parquet (
ID STRING,
CustomerKey BIGINT,
FirstName STRING,
LastName STRING,
BirthDate STRING,
YearlyIncome FLOAT,
TotalChildren INT,
EnglishEducation STRING,
EnglishOccupation STRING,
HouseOwnerFlag INT,
NumberCarsOwned INT
)
STORED AS PARQUET;
set COMPRESSION_CODEC=gzip;
insert into DimCustomer_Parquet Select * from DimCustomer;
Modified 18484 row(s) in 4.52s

Note

The CREATE TABLE LIKE syntax is not supported on Kudu tables. If you try to use the syntax to create a table, you will receive an error message similar to the following: “ERROR: AnalysisException: Cloning a Kudu table using CREATE TABLE LIKE is not supported.”

Check the files with the HDFS command.

hadoop fs -du -h /user/hive/warehouse/dimcustomer_parquet
636.8 K  1.9 M  /user/hive/warehouse/dimcustomer_parquet/f948582ab9f8dfbb-5e57d0ca00000000_1052047868_data.0.parq

Copy the Parquet Files to Another Cluster or S3

You can now copy the Parquet files to another cluster using distcp.

hadoop distcp -pb hftp://kuducluster:50070/user/hive/warehouse/dimcustomer_parquet hdfs://kuducluster2/backup_files

You can also copy the files to S3.

hadoop distcp -Dfs.s3a.access.key=s3-access-key -Dfs.s3a.secret.key=s3-secret-key -pb hdfs:///user/hive/warehouse/dimcustomer_parquet s3a://myWarehouseBucket/backup_files

To maintain consistency, note that I used the -pb option to guarantee that the special block size of the Parquet data files is preserved. xxii
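
If you script these copies, it helps to assemble the distcp arguments in one place so that the -pb option is never forgotten. The following Python sketch only builds and prints the command; the helper name is hypothetical, and the paths, bucket, and key placeholders are the ones used in the commands above.

```python
import shlex


def build_distcp_command(source, destination, properties=None):
    """Assemble a 'hadoop distcp' argument list that preserves block size."""
    command = ["hadoop", "distcp"]
    # Generic -D properties are placed before the distcp-specific options.
    for key, value in sorted((properties or {}).items()):
        command.append("-D{}={}".format(key, value))
    command.append("-pb")  # preserve the block size of the Parquet files
    command.extend([source, destination])
    return command


command = build_distcp_command(
    "/user/hive/warehouse/dimcustomer_parquet",
    "s3a://myWarehouseBucket/backup_files",
    {"fs.s3a.access.key": "s3-access-key", "fs.s3a.secret.key": "s3-secret-key"},
)
print(" ".join(shlex.quote(part) for part in command))
```

On a host with the Hadoop client installed, the same list could be passed to subprocess.run(command, check=True) to start the copy.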

Note

Cloudera has a cluster replication feature called Cloudera Enterprise Backup and Disaster Recovery (BDR). BDR provides an easy-to-use graphical user interface that lets you schedule replication from one cluster to another. BDR does not work with Kudu, but you can replicate the destination Parquet files residing in HDFS. xxiii

Export Results via impala-shell to Local Directory, NFS, or SAN Volume

Impala-shell can generate delimited files that you can then compress and copy to a remote server or NFS/SAN volume. Note that this method is not appropriate for large tables.

impala-shell -q "SELECT * FROM DimCustomer" --delimited --output_delimiter=, --output_file /backup_nfs/dimcustomer_bak.csv
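
The compression step can be scripted as well. The sketch below gzips an exported file using only the Python standard library. It writes a small stand-in file to a temporary directory instead of running impala-shell, and both the helper name and the sample row are made up for illustration.

```python
import gzip
import os
import shutil
import tempfile


def compress_export(csv_path):
    """Gzip csv_path next to the original and return the new file name."""
    gz_path = csv_path + ".gz"
    with open(csv_path, "rb") as source, gzip.open(gz_path, "wb") as target:
        shutil.copyfileobj(source, target)
    return gz_path


# Stand-in for the file written by impala-shell; the row is made up.
workdir = tempfile.mkdtemp()
export_path = os.path.join(workdir, "dimcustomer_bak.csv")
with open(export_path, "w") as handle:
    handle.write("11000,Jon,Yang\n")

archive_path = compress_export(export_path)
with gzip.open(archive_path, "rt") as handle:
    restored = handle.read()
print(os.path.basename(archive_path), restored == "11000,Jon,Yang\n")
```

Reading the archive back, as the last lines do, is a cheap way to confirm that the backup is usable before the original file is removed.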

Export Results Using the Kudu Client API

The Kudu client API can be used to export data as well. See Listing 2-6 for an example.

import org.apache.kudu.client.*;
import java.util.ArrayList;
import java.util.List;
public class TableBackup {
  public static void main(String[] args) throws KuduException {
    // Create Kudu client object
    KuduClient myKuduClient = new KuduClient.KuduClientBuilder("kudumaster").build();
    KuduTable myTable = myKuduClient.openTable("CustomersTbl");
    // Display rows
    List<String> myColumns = new ArrayList<String>();
    myColumns.add("rowid");
    myColumns.add("customername");
    myColumns.add("customerage");
    KuduScanner myScanner = myKuduClient.newScannerBuilder(myTable)
        .setProjectedColumnNames(myColumns)
        .build();
    while (myScanner.hasMoreRows()) {
      RowResultIterator myResultIterator = myScanner.nextRows();
      while (myResultIterator.hasNext()) {
        RowResult myRow = myResultIterator.next();
        System.out.println(myRow.getInt("rowid"));
        System.out.println(myRow.getString("customername"));
        // customerage is an integer column, so read it with getInt
        System.out.println(myRow.getInt("customerage"));
      }
    }
    // Close the connection
    myKuduClient.shutdown();
  }
}
Listing 2-6

Sample Java code using the Kudu client API to export data

Compile the Java code and run it from the command line. Redirect the results to a file. This method is appropriate for small data sets.

java TableBackup >> /backup_nfs/mybackup.txt

Export Results with Spark

You can also back up data using Spark. This is more appropriate for large tables since you can control parallelism, the number of executors, executor cores, and executor memory.

Start by creating a Data Frame.

import org.apache.kudu.spark.kudu._
val df = sqlContext.read.options(Map("kudu.master" -> "localhost:7051","kudu.table" -> "impala::default.DimCustomer")).kudu

Save the data in CSV format.

df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/backup/dimcustomer_bak")

Or you can save it as Parquet.

df.coalesce(1).write.mode("append").parquet("/backup/dimcustomer_p_bak")

Using coalesce to limit the number of files generated when writing to HDFS may cause performance issues. I discuss coalesce in more detail in Chapter 5.

Replication with Spark and Kudu Data Source API

We can use Spark to copy data from one Kudu cluster to another.

Start the spark-shell.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar

Connect to the Kudu master and check the data in the users table. We’re going to sync this Kudu table with another Kudu table in another cluster.

import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kuducluster:7051","kudu.table" -> "impala::default.users")).kudu
kuduDF.select("userid","name","city","state","zip","age").sort($"userid".asc).show()
+------+---------------+---------------+-----+-----+---+
|userid|           name|           city|state|  zip|age|
+------+---------------+---------------+-----+-----+---+
|   100|   Wendell Ryan|      San Diego|   CA|92102| 24|
|   101|Alicia Thompson|       Berkeley|   CA|94705| 52|
|   102|Felipe Drummond|      Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|   Walnut Creek|   CA|94507| 47|
|   200|  Jonathan West|         Frisco|   TX|75034| 35|
|   201| Andrea Foreman|         Dallas|   TX|75001| 28|
|   202|   Kirsten Jung|          Plano|   TX|75025| 69|
|   203| Jessica Nguyen|          Allen|   TX|75002| 52|
|   300|   Fred Stevens|       Torrance|   CA|90503| 23|
|   301|    Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|     Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|  Victoria Loma|  Rolling Hills|   CA|90274| 75|
+------+---------------+---------------+-----+-----+---+

Let’s go ahead and insert the data into a table in another Kudu cluster. The destination table must already exist in the other Kudu cluster.

val kuduContext = new KuduContext("kuducluster2:7051")
kuduContext.insertRows(kuduDF, "impala::default.users2")

Verify the data in the destination table.

impala-shell
select * from users2 order by userid;
+------+---------------+---------------+-----+-----+---+
|userid|           name|           city|state|  zip|age|
+------+---------------+---------------+-----+-----+---+
|   100|   Wendell Ryan|      San Diego|   CA|92102| 24|
|   101|Alicia Thompson|       Berkeley|   CA|94705| 52|
|   102|Felipe Drummond|      Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|   Walnut Creek|   CA|94507| 47|
|   200|  Jonathan West|         Frisco|   TX|75034| 35|
|   201| Andrea Foreman|         Dallas|   TX|75001| 28|
|   202|   Kirsten Jung|          Plano|   TX|75025| 69|
|   203| Jessica Nguyen|          Allen|   TX|75002| 52|
|   300|   Fred Stevens|       Torrance|   CA|90503| 23|
|   301|    Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|     Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|  Victoria Loma|  Rolling Hills|   CA|90274| 75|
+------+---------------+---------------+-----+-----+---+

The rows were successfully replicated.
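
Comparing the two result sets by eye works for twelve rows, but larger tables need a programmatic check. The following Python sketch compares rows that have already been fetched from the source and destination tables, by whatever client you prefer. The helper name is hypothetical, and the sample data is three of the rows shown above with one value changed on purpose.

```python
def compare_tables(source_rows, destination_rows, key="userid"):
    """Return keys that are missing from, or differ in, the destination."""
    destination_by_key = {row[key]: row for row in destination_rows}
    missing, different = [], []
    for row in source_rows:
        replica = destination_by_key.get(row[key])
        if replica is None:
            missing.append(row[key])
        elif replica != row:
            different.append(row[key])
    return missing, different


source = [
    {"userid": 100, "name": "Wendell Ryan", "city": "San Diego", "age": 24},
    {"userid": 101, "name": "Alicia Thompson", "city": "Berkeley", "age": 52},
    {"userid": 102, "name": "Felipe Drummond", "city": "Palo Alto", "age": 33},
]
# Simulate an incomplete, stale copy: one row missing, one row changed.
destination = [dict(row) for row in source[:2]]
destination[1]["age"] = 53

missing, different = compare_tables(source, destination)
print(missing, different)
```

An empty pair of lists means every source row has an identical counterpart in the destination.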

Real-Time Replication with StreamSets

StreamSets is a powerful real-time and batch ingestion tool used mostly in real-time streaming and Internet of Things (IoT) use cases. You can use StreamSets to replicate data from JDBC sources such as Oracle, MySQL, SQL Server, or Kudu to another destination in real time or near real time. StreamSets offers two origins to facilitate replication through a JDBC connection: JDBC Query Consumer and JDBC Multitable Consumer. The JDBC Query Consumer origin uses a user-defined SQL query to read data from tables. See Figure 2-6.
../images/456459_1_En_2_Chapter/456459_1_En_2_Fig6_HTML.jpg
Figure 2-6

StreamSets and Kudu

The JDBC Multitable Consumer origin reads multiple tables in the same database. The JDBC Multitable Consumer origin is appropriate for database replication. StreamSets includes a Kudu destination; alternatively the JDBC Producer is a (slower) option and can be used to replicate data to other relational databases. Chapter 7 covers StreamSets in more detail.

Replicating Data Using ETL Tools Such as Talend, Pentaho, and CDAP

Talend (Figure 2-9) and Cask Data Application Platform, or CDAP (Figure 2-7), offer native support for Kudu. Both provide Kudu sources and sinks and can be used to replicate data from one Kudu cluster to one or more Kudu clusters; to another destination such as S3; or to an RDBMS such as SQL Server, MySQL, or Oracle. Other tools such as Pentaho PDI (Figure 2-8) do not have native Kudu support. However, they can transfer data to Kudu via Impala, albeit more slowly. Chapter 7 covers batch and real-time ingestion tools including StreamSets, Talend, Pentaho, and CDAP in detail.
../images/456459_1_En_2_Chapter/456459_1_En_2_Fig7_HTML.jpg
Figure 2-7

ETL with CDAP and Kudu

../images/456459_1_En_2_Chapter/456459_1_En_2_Fig8_HTML.jpg
Figure 2-8

ETL with Pentaho and Kudu

../images/456459_1_En_2_Chapter/456459_1_En_2_Fig9_HTML.jpg
Figure 2-9

ETL with Talend and Kudu

Note

Talend Kudu components are provided by a third-party company, One point Ltd. These components are free and downloadable from the Talend Exchange at https://exchange.talend.com/. The Kudu Output and Input components need to be installed before you can use Talend with Kudu.

Python and Impala

Using Python is not the fastest or most scalable way to back up large Kudu tables, but it should be adequate for small- to medium-sized data sets. Below is a list of the most common ways to access Kudu tables from Python.

Impyla

Cloudera built a Python package known as Impyla. xxiv Impyla communicates with Impala over the HiveServer2 protocol and exposes a standard Python DB API 2.0 interface. One of the nice features of Impyla is its ability to easily convert query results into a pandas DataFrame (not to be confused with Spark DataFrames). Here’s an example, assuming an Impala daemon listening on localhost port 21050.

>>> from impala.dbapi import connect
>>> from impala.util import as_pandas
>>> conn = connect(host='localhost', port=21050)
>>> cur = conn.cursor()
>>> cur.execute('SELECT id, name, salary FROM employees')
>>> df = as_pandas(cur)
>>> type(df)
<class 'pandas.core.frame.DataFrame'>
>>> df
    id             name  salary
0  001       James Chan  100000
1  002        Roger Lim   75000
2  003       Dan Tanner   65000
3  004     Lilian Russo   90000
4  005  Edith Sarkisian  110000

pyodbc

pyodbc is a popular open source Python package that you can use to access databases via ODBC. xxv Here’s an example of how to use pyodbc. To learn more about pyodbc, visit its GitHub page at github.com/mkleehammer/pyodbc.

import pyodbc
myconnection_str = '''Driver=/mypath/libclouderaimpalaodbc.dylib;HOST=localhost;PORT=21050'''
# Impala does not support transactions, so open the connection in autocommit mode
myconnection = pyodbc.connect(myconnection_str, autocommit=True)
cursor = myconnection.cursor()
cursor.execute("select id, name, salary  from employees")

SQLAlchemy

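SQLAlchemy is an SQL toolkit for Python that features an object-relational mapper (ORM). To learn more about SQLAlchemy, visit its website at sqlalchemy.org. Here’s an example of how to use SQLAlchemy to query Impala. It assumes that the Impyla package is installed, since Impyla provides the impala:// dialect used in the connection URL, and that the employees table has the columns shown earlier.

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
# Map the employees table to a Python class
class Employees(Base):
    __tablename__ = 'employees'
    id = Column(String, primary_key=True)
    name = Column(String)
    salary = Column(Integer)
myengine = create_engine('impala://localhost:21050')
Session = sessionmaker(bind=myengine)
session = Session()
user = session.query(Employees).filter(Employees.salary > 65000).first()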

High Availability Options

Aside from a default tablet replication factor of 3 (which can be increased to 5 or 7), Kudu has no built-in high availability tools or features. Fortunately, you can use built-in components in Cloudera Enterprise and third-party tools such as StreamSets to provide high availability capabilities to Kudu. Running two or more Kudu clusters can protect you from complete site failure. The clusters can be in geographically distributed data centers or cloud providers. xxvi Just be aware that the amount of data that is being replicated could impact performance and cost. An added benefit of having an active-active environment is being able to use both clusters for different use cases. For example, a second cluster can be used for ad hoc queries, building machine learning models, and other data science workloads, while the first cluster is used for real-time analytics or use cases with well-defined SLAs. Let’s explore a few high availability options for Kudu.

Active-Active Dual Ingest with Kafka and Spark Streaming

In this option, all data is published to a Kafka cluster. Spark streaming is used to read data from the Kafka topics. Using Spark Streaming, you have the option to perform data transformation and cleansing before writing the data to Kudu. Figure 2-10 shows two Kudu destinations, but you could have more depending on your HA requirements.
../images/456459_1_En_2_Chapter/456459_1_En_2_Fig10_HTML.jpg
Figure 2-10

Active-Active Dual Ingest with Kafka and Spark Streaming
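
The essence of dual ingest is that each record is transformed once and then written to every destination. The following Python sketch illustrates only that control flow. Plain lists stand in for the Kafka topic and the two Kudu clusters, and the cleansing rule and function names are made up for illustration; a real job would use Spark Streaming and the Kudu client instead.

```python
def cleanse(record):
    """Example transformation: trim the name and upper-case the state."""
    cleaned = dict(record)
    cleaned["name"] = cleaned["name"].strip()
    cleaned["state"] = cleaned["state"].upper()
    return cleaned


def dual_ingest(records, sinks, transform=cleanse):
    """Apply transform once per record, then write the result to every sink."""
    for record in records:
        row = transform(record)
        for sink in sinks:
            sink.append(row)


# Lists stand in for the Kafka topic and the two Kudu clusters.
topic = [{"userid": 100, "name": " Wendell Ryan ", "state": "ca"}]
cluster_a, cluster_b = [], []
dual_ingest(topic, [cluster_a, cluster_b])
print(cluster_a == cluster_b, cluster_a[0]["state"])
```

The clusters stay identical only if every write succeeds, so a real pipeline also needs a retry strategy. Writing with upserts rather than inserts keeps such retries safe to repeat.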

Active-Active Kafka Replication with MirrorMaker

Another high availability option is to replicate Kafka using MirrorMaker. As shown in Figure 2-11, data ingested into the source Kafka cluster is replicated using MirrorMaker to the destination Kafka cluster. From there, the data is read by Spark Streaming and written to the Kudu destinations, similar to Figure 2-10. If your goal is simply dual ingest, using MirrorMaker to replicate your Kafka cluster might be overkill. However, it provides better data protection since data is replicated on two or more Kafka and Kudu clusters.
../images/456459_1_En_2_Chapter/456459_1_En_2_Fig11_HTML.jpg
Figure 2-11

Active-Active Kafka Replication with MirrorMaker

Active-Active Dual Ingest with Kafka and StreamSets

This option is very similar to Figure 2-10 but uses StreamSets instead of Spark Streaming. StreamSets is easier to use and administer than Spark Streaming and provides built-in monitoring, alerting, and exception handling. It also offers an event framework that makes it easy to kick off a task based on events. I usually recommend StreamSets over Spark Streaming in most projects (Figure 2-12).
../images/456459_1_En_2_Chapter/456459_1_En_2_Fig12_HTML.jpg
Figure 2-12

Active-Active Dual Ingest with Kafka and StreamSets

Active-Active Dual Ingest with StreamSets

Technically, you don’t need Kafka in order to have active-active dual ingest. StreamSets allows you to configure multiple destinations. Using the Stream Selector processor, it can even route data to different destinations based on certain conditions (Figure 2-13). Using Kafka provides added high availability and scalability and is still recommended in most cases. I cover StreamSets in Chapter 7 in more detail.
../images/456459_1_En_2_Chapter/456459_1_En_2_Fig13_HTML.jpg
Figure 2-13

Active-Active Dual Ingest with StreamSets
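
Condition-based routing of this kind boils down to evaluating a list of conditions per record. The sketch below is plain Python rather than StreamSets configuration. It assumes that a record is sent to every destination whose condition is true and falls back to a default destination otherwise; the destination names and conditions are hypothetical.

```python
def select_streams(record, rules, default="default"):
    """Return every destination whose condition is true for record.

    A record that matches no condition goes to the default destination.
    """
    matches = [name for name, condition in rules if condition(record)]
    return matches or [default]


# Hypothetical rules: keep each state in its own cluster.
rules = [
    ("kudu_cluster_ca", lambda record: record["state"] == "CA"),
    ("kudu_cluster_tx", lambda record: record["state"] == "TX"),
]

for record in ({"userid": 100, "state": "CA"},
               {"userid": 200, "state": "TX"},
               {"userid": 400, "state": "NY"}):
    print(record["userid"], select_streams(record, rules))
```

For plain dual ingest no conditions are needed, because every record goes to both clusters. Rules like these become useful when the clusters are meant to hold different subsets of the data.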

Administration and Monitoring

Just like other data management platforms, Kudu provides tools to aid in system administration and monitoring.

Cloudera Manager Kudu Service

Cloudera Manager is Cloudera’s cluster management tool, providing a single pane of glass for administering and managing Cloudera Enterprise clusters. With Cloudera Manager, you can perform common administration tasks such as starting and stopping the Kudu service, updating configuration, monitoring performance, and checking logs.

Kudu Master Web UI

Kudu Masters provide a web interface (available on port 8051) that provides information about the cluster. It displays information about tablet servers, heartbeat, hostnames, tables, and schemas. You can also view details on available logs, memory usage, and resource consumption.

Kudu Tablet Server Web UI

Each tablet server also provides a web interface (available on port 8050) that provides information about that tablet server. It displays more detailed information about each tablet hosted on the tablet server, debugging and state information, resource consumption, and available logs.

Kudu Metrics

Kudu provides several metrics that you can use to monitor and troubleshoot your cluster. You can get a list of available Kudu metrics by executing kudu-tserver --dump_metrics_json or kudu-master --dump_metrics_json. Once you know the metric you want to check, you can collect its current value over HTTP from the /metrics endpoint of the master or tablet server web interface. These metrics are also collected and aggregated by Cloudera Manager. For example, the following command retrieves a metric from a tablet server:

curl -s 'http://tabletserver01:8050/metrics?include_schema=1&metrics=read_bytes_rate'
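
The same request can be prepared and its reply parsed from a script. The Python sketch below builds the URL used in the curl command and extracts a metric from a JSON reply. It does not contact a server: the sample reply is an illustrative assumption about the shape of the output (a list of entities, each with a list of metrics), and real replies contain more fields. The function names are hypothetical.

```python
import json
from urllib.parse import urlencode


def metrics_url(host, port, metric_names):
    """Build the /metrics URL for the given metric names."""
    query = urlencode({"include_schema": 1, "metrics": ",".join(metric_names)})
    return "http://{}:{}/metrics?{}".format(host, port, query)


def metric_values(payload, metric_name):
    """Collect (entity id, value) pairs for metric_name from a /metrics reply."""
    found = []
    for entity in json.loads(payload):
        for metric in entity.get("metrics", []):
            if metric.get("name") == metric_name:
                found.append((entity.get("id"), metric.get("value")))
    return found


# Illustrative reply only; the value is a placeholder, not a measurement.
sample = ('[{"type": "server", "id": "kudu.tabletserver", '
          '"metrics": [{"name": "read_bytes_rate", "value": 0}]}]')
url = metrics_url("tabletserver01", 8050, ["read_bytes_rate"])
print(url)
print(metric_values(sample, "read_bytes_rate"))
```

In practice the payload would come from fetching the URL, for example with urllib.request.urlopen, on a network that can reach the tablet server.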

Kudu Command-Line Tools

In addition to Cloudera Manager and the web user interfaces provided by the master and tablet servers, Kudu includes command-line tools for common system administration tasks.

Validate Cluster Health

ksck: Checks that the cluster metadata is consistent and that the masters and tablet servers are running. ksck checks all tables and tablets by default, but you can specify a list of tables to check using the tables flag, or a list of tablets using the tablets flag. Use the checksum_scan and checksum_snapshot flags to check for inconsistencies in your data.

Usage:

kudu cluster ksck --checksum_scan [--tables <tables>] <master_address>

File System

check: Check a Kudu filesystem for inconsistencies

Usage:

kudu fs check [-fs_wal_dir=<dir>] [-fs_data_dirs=<dirs>] [-repair]

list: Show list of tablet replicas in the local filesystem

Usage:

kudu local_replica list [-fs_wal_dir=<dir>] [-fs_data_dirs=<dirs>] [-list_detail]

data_size: Summarize the data size/space usage of the given local replica(s).

Usage:

kudu local_replica data_size <tablet_id_pattern> [-fs_wal_dir=<dir>] [-fs_data_dirs=<dirs>] [-format=<format>]

Master

status: Get the status of a Kudu Master

Usage:

kudu master status <master_address>

timestamp: Get the current timestamp of a Kudu Master

Usage:

kudu master timestamp <master_address>

list: List masters in a Kudu cluster

Usage:

kudu master list <master_addresses> [-columns=<columns>] [-format=<format>] [-timeout_ms=<ms>]

Measure the Performance of a Kudu Cluster

loadgen: Run load generation with optional scan afterward

loadgen inserts auto-generated random data into an existing or auto-created table as fast as the cluster can execute it. Loadgen can also check whether the actual count of inserted rows matches the original row count.

Usage:

kudu perf loadgen <master_addresses> [-buffer_flush_watermark_pct=<pct>] [-buffer_size_bytes=<bytes>] [-buffers_num=<num>] [-error_buffer_size_bytes=<bytes>] [-flush_per_n_rows=<rows>] [-keep_auto_table] [-num_rows_per_thread=<thread>] [-num_threads=<threads>] [-run_scan] [-seq_start=<start>] [-show_first_n_errors=<errors>] [-string_fixed=<fixed>] [-string_len=<len>] [-table_name=<name>] [-table_num_buckets=<buckets>] [-table_num_replicas=<replicas>] [-use_random]

Table

delete: Delete a table

Usage:

kudu table delete <master_addresses> <table_name>

list: List all tables

Usage:

kudu table list <master_addresses> [-list_tablets]

Tablets

leader_step_down: Force the tablet’s leader replica to step down

Usage:

kudu tablet leader_step_down <master_addresses> <tablet_id>

add_replica: Add a new replica to a tablet’s Raft configuration

Usage:

kudu tablet change_config add_replica <master_addresses> <tablet_id> <ts_uuid> <replica_type>

move_replica: Move a tablet replica from one tablet server to another

The replica move tool effectively moves a replica from one tablet server to another by adding a replica to the new server and then removing it from the old one.

Usage:

kudu tablet change_config move_replica <master_addresses> <tablet_id> <from_ts_uuid> <to_ts_uuid>

Tablet Server

status: Get the status of a Kudu Tablet Server

Usage:

kudu tserver status <tserver_address>

timestamp: Get the current timestamp of a Kudu Tablet Server

Usage:

kudu tserver timestamp <tserver_address>

list: List tablet servers in a Kudu cluster

Usage:

kudu tserver list <master_addresses> [-columns=<columns>] [-format=<format>] [-timeout_ms=<ms>]

Consult Kudu’s online command-line reference guide xxvii for a complete list and description of Kudu’s command-line tools.

Known Issues and Limitations

There are several issues and limitations in Kudu. Depending on your use case, they can be considered minor or major issues. Most of them have workarounds but some do not. You have to be aware of these limitations when building new applications or migrating workloads to Kudu. I list some of the major ones below. Kudu committers and contributors are hard at work fixing these limitations.
  • Kudu does not support DECIMAL, CHAR, VARCHAR, DATE, and complex types like ARRAY, MAP, and STRUCT.

  • Kudu tables can have a maximum of 300 columns.

  • Kudu does not have secondary indexes.

  • Kudu does not have foreign keys.

  • Multi-row and multi-table transactions are not supported.

  • Kudu does not have built-in backup and recovery or high availability features.

  • Kudu does not support row, column, and table-level role-based access control.

  • Kudu recommends 100 as a maximum number of tablet servers.

  • Kudu recommends 3 as a maximum number of masters.

  • Kudu recommends 8TB as the maximum amount of stored data, post-replication and post-compression, per tablet server.

  • Kudu recommends 2000 as a maximum number of tablets per tablet server, post-replication.

  • Kudu recommends 60 as a maximum number of tablets per table for each tablet server, post-replication, at table-creation time.

  • Kudu does not support rack-awareness, multiple data centers, and rolling restarts.

For a more complete and up-to-date list of Kudu’s limitations, consult Cloudera’s online documentation. xxviii
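
Some of these limits are easy to check before a table is created. The following Python sketch tests a planned table against two of the recommendations listed above: the 300-column maximum and the maximum of 60 tablets per table on each tablet server. It assumes that tablet replicas are spread evenly across the tablet servers, and the function name is hypothetical.

```python
import math

# Recommended maximums quoted in the list above.
MAX_COLUMNS = 300
MAX_TABLETS_PER_TABLE_PER_TSERVER = 60


def check_table_plan(num_columns, num_partitions, replication_factor, num_tservers):
    """Return (replicas per tablet server, warnings) for a planned table.

    Assumes replicas are spread evenly across the tablet servers.
    """
    warnings = []
    if num_columns > MAX_COLUMNS:
        warnings.append("more than {} columns".format(MAX_COLUMNS))
    per_server = math.ceil(num_partitions * replication_factor / num_tservers)
    if per_server > MAX_TABLETS_PER_TABLE_PER_TSERVER:
        warnings.append("more than {} tablets per tablet server".format(
            MAX_TABLETS_PER_TABLE_PER_TSERVER))
    return per_server, warnings


# 24 partitions (as in Listing 2-4) and 200 partitions, on three tablet servers.
small_plan = check_table_plan(3, 24, 3, 3)
large_plan = check_table_plan(3, 200, 3, 3)
print(small_plan)
print(large_plan)
```

With a replication factor of 3 on three tablet servers, every server holds one replica of every tablet, so a table with 24 partitions stays within the recommendation while a table with 200 partitions does not.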

Security

Kudu supports Kerberos for strong authentication. Communication between Kudu clients and servers is encrypted with TLS. Kudu does not support table, row, or column-level access control. Instead it uses a white-list style access control list to implement coarse-grained authorization. Two levels of access include Superuser and User. Unauthenticated users will be unable to access the Kudu cluster. xxix

There’s still a lot of work to be done from a security standpoint. In the meantime, additional suggestions to tighten up security include restricting direct access to the Kudu tables and implementing role-based access control via the business intelligence tool’s semantic layer. Implementing database views to mimic row and column-level role-based access control is another option. Configuring IP access lists to restrict access from certain IP addresses to the port used by the master for RPC (the default port is 7051) can also be explored.

Consult Cloudera’s online documentation for the latest developments in Kudu security.

Summary

Although Hadoop is known for its ability to handle structured, unstructured, and semi-structured data, structured relational data remains the focus of most companies’ data management and analytic strategies and will continue to be in the foreseeable future. xxx In fact, the majority of big data use cases involve replicating workloads from relational databases. Kudu is the perfect storage engine for structured data. Throughout the book, we will focus on Kudu and how it integrates with other projects in the Hadoop ecosystem and third-party applications to enable useful business use cases.

References

  1. i.

    Globenewswire; “Cloudera Announces General Availability of Apache Kudu with Release of Cloudera Enterprise 5.10,” Cloudera, 2017, https://globenewswire.com/news-release/2017/01/31/912363/0/en/Cloudera-Announces-General-Availability-of-Apache-Kudu-with-Release-of-Cloudera-Enterprise-5-10.html

     
  2. ii.

    Todd Lipcon; “A brave new world in mutable big data: Relational storage,” O’Reilly, 2017, https://conferences.oreilly.com/strata/strata-ny/public/schedule/speaker/75982

     
  3. iii.

    Jimmy Xiang; “Apache HBase Write Path,” Cloudera, 2012, https://blog.cloudera.com/blog/2012/06/hbase-write-path/

     
  4. iv.

    Apache Software Foundation; “Introducing Apache Kudu,” ASF, 2017, https://kudu.apache.org/docs/#kudu_use_cases

     
  5. v.

    Apache Software Foundation; “The Apache Software Foundation Announces Apache® Kudu™ v1.0,” ASF, 2017, https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces100

     
  6. vi.

    Pat Patterson; “Innovation with @ApacheKafka, #StreamSets, @ApacheKudu & @Cloudera at the Australian @DeptDefence - spotted on the Kudu Slack channel,” Twitter, 2017, https://twitter.com/metadaddy/status/843842328242634754

     
  7. vii.

    Todd Lipcon; “Kudu: Storage for Fast Analytics on Fast Data,” Cloudera, 2015, https://kudu.apache.org/kudu.pdf

     
  8. viii.

    Diego Ongaro and John Ousterhout; “In Search of an Understandable Consensus Algorithm (Extended Version),” Stanford University, 2014, https://raft.github.io/raft.pdf

     
  9. ix.

    Diego Ongaro; “Consensus: Bridging Theory and Practice,” Stanford University, 2014, https://github.com/ongardie/dissertation#readme

     
  10. x.

    Neil Chandler; “Oracle’s Locking Model – Multi Version Concurrency Control,” Neil Chandler, 2013, https://chandlerdba.com/2013/12/01/oracles-locking-model-multi-version-concurrency-control/

     
  11. xi.

    Oracle; “Multiversion Concurrency Control,” Oracle, 2018, https://docs.oracle.com/cd/B19306_01/server.102/b14220/consist.htm#i17881

     
  12. xii.

    Microsoft; “Database Concurrency and Row Level Versioning in SQL Server 2005,” Microsoft, 2018, https://technet.microsoft.com/en-us/library/cc917674.aspx

     
  13. xiii.
     
  14. xiv.

    David Alves and James Kinley; “Apache Kudu Read & Write Paths,” Cloudera, 2017, https://blog.cloudera.com/blog/2017/04/apache-kudu-read-write-paths/

     
  15. xv.

    Microsoft; “Using decimal, float, and real Data,” Microsoft, 2018, https://technet.microsoft.com/en-us/library/ms187912(v=sql.105).aspx

     
  16. xvi.

    Apache Impala; “TIMESTAMP Data Type,” Apache Impala, 2017, https://impala.apache.org/docs/build/html/topics/impala_timestamp.html

     
  17. xvii.

    Cloudera; “Example Impala Commands With Kudu,” Cloudera, 2017, https://kudu.apache.org/docs/developing.html#_kudu_integration_with_spark

     
  18. xviii.

    William Berkeley; “scantoken_noncoveringrange.cc,” Cloudera, 2017, https://gist.github.com/wdberkeley/50e2e47548a0daa3d3bff68e388da37a

     
  19. xix.

    Apache Kudu; “Developing Applications With Apache Kudu,” Apache Kudu, 2017, http://kudu.apache.org/docs/developing.html

     
  20. xx.

    Apache Kudu; “Kudu C++ client sample,” Apache Kudu, 2018, https://github.com/cloudera/kudu/tree/master/src/kudu/client/samples

     
  21. xxi.

    Apache Kudu; “Apache Kudu FAQ,” Apache Kudu, 2018, https://kudu.apache.org/faq.html

     
  22. xxii.

    Cloudera; “Using the Parquet File Format with Impala Tables,” Cloudera, 2018, https://www.cloudera.com/documentation/enterprise/latest/topics/impala_parquet.html

     
  23. xxiii.

    Cloudera; “How To Back Up and Restore HDFS Data Using Cloudera Enterprise BDR,” Cloudera, 2018, https://www.cloudera.com/documentation/enterprise/latest/topics/cm_bdr_howto_hdfs.html

     
  24. xxiv.

    Cloudera; “A New Python Client for Impala,” Cloudera, 2018, http://blog.cloudera.com/blog/2014/04/a-new-python-client-for-impala/

     
  25. xxv.

    Cloudera; “Importing Data into Cloudera Data Science Workbench,” Cloudera, 2018, https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_import_data.html#impala_impyla

     
  26. xxvi.

    Cloudera; “Implementing Active/Active Multi-Cluster Deployments with Cloudera Enterprise,” Cloudera, 2018, https://www.cloudera.com/content/dam/www/marketing/resources/whitepapers/implementing-active-deployments-with-cloudera-enterprise-whitepaper.pdf.landing.html

     
  27. xxvii.

    Cloudera; “Apache Kudu Command Line Tools Reference,” Cloudera, 2018, https://kudu.apache.org/docs/command_line_tools_reference.html

     
  28. xxviii.
     
  29. xxix.

    Cloudera; “Apache Kudu Security,” Cloudera, 2018, https://www.cloudera.com/documentation/kudu/latest/topics/kudu_security.html

     
  30. xxx.

    Cloudera; “Dell Survey: Structured Data Remains Focal Point Despite Rapidly Changing Information Management Landscape,” Cloudera, 2018, http://www.dell.com/learn/us/en/uscorp1/press-releases/2015-04-15-dell-survey

     