Kudu is just a storage engine. You need a way to get data into and out of it. As Cloudera’s default big data processing framework, Spark is the ideal data processing and ingestion tool for Kudu. Not only does Spark provide excellent scalability and performance, Spark SQL and the DataFrame API also make it easy to interact with Kudu.
If you are coming from a data warehousing background or if you are familiar with a relational database such as Oracle and SQL Server, you can consider Spark a more powerful and versatile equivalent to procedural extensions to SQL such as PL/SQL and T-SQL.
Spark and Kudu
You use Spark with Kudu through the Data Source API. You can use the --packages option in spark-shell or spark-submit to include the kudu-spark dependency. You can also manually download the jar file from central.maven.org and include it with your --jars option. Consult the Apache Spark online documentation for more details on how to use sbt and Maven as your project build tool.
Spark 1.6.x
Use the kudu-spark_2.10 artifact if you are using Spark with Scala 2.10. For example:
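The exact version should match the Kudu release on your cluster; the one shown here is just illustrative:
spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0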
You use a Kudu context in order to execute a DML statement against a Kudu table.i You have to specify the Kudu master servers and the port. We only have one Kudu master in our example below. There are usually multiple Kudu masters in a production environment; in that case you have to specify a comma-separated list of all the hostnames of the Kudu masters. In this chapter I present examples of how to use Spark with Kudu, as well as how to integrate Kudu with other data sources using Spark.
import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")
We need to create our table before we can start.
impala-shell
CREATE TABLE customers
(
id BIGINT PRIMARY KEY,
name STRING,
age SMALLINT
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU;
Create a case class that will provide a schema to our sample data.
case class CustomerData(id: Long, name: String, age: Short)
Inserting Data
Create some sample data.
val data = Array(CustomerData(101,"Lisa Kim",60), CustomerData(102,"Casey Fernandez",45))
val insertRDD = sc.parallelize(data)
val insertDF = sqlContext.createDataFrame(insertRDD)
insertDF.show
+---+---------------+---+
| id|           name|age|
+---+---------------+---+
|101|       Lisa Kim| 60|
|102|Casey Fernandez| 45|
+---+---------------+---+
Insert the DataFrame. Note the name of the table. This format is needed if the table was created in Impala. In this example, default is the name of the database and customers is the name of the table. This convention was adopted to prevent table name collision between tables created in Impala and tables created natively using the Kudu API or Spark.
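The insert itself goes through the Kudu context. A minimal sketch, assuming the kuduContext created earlier:
kuduContext.insertRows(insertDF, "impala::default.customers")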
val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|102|Casey Cullen| 90|
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+
Deleting Data
Inspect the data in the table.
val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|102|Casey Cullen| 90|
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+
Register the DataFrame as a temporary table so that we can use it in a SQL query.
df.registerTempTable("customers")
Delete the data based on our query.
kuduContext.deleteRows(sqlContext.sql("select id from customers where name like 'Casey%'"), "impala::default.customers")
Confirm that the data was successfully deleted.
val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+
Selecting Data
Select the data in the table.
val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+
You can also run your query by registering the table and using it in a SQL query. Note that you should use createOrReplaceTempView if you are using Spark 2.x.
df.registerTempTable("customers")
val df2 = sqlContext.sql("select * from customers where age=240")
df2.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|101|Lisa Kim|240|
+---+--------+---+
Creating a Kudu Table
Let’s create a Kudu table from Spark. This table will not be visible to Impala since it was created natively through the Kudu API; you have to create an external table in Impala that maps to it (see the sketch after the example below).
import org.apache.kudu.client.CreateTableOptions
import scala.collection.JavaConverters._
kuduContext.createTable("customer2", df.schema, Seq("customerid"), new CreateTableOptions().setRangePartitionColumns(List("customerid").asJava).setNumReplicas(1))
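If you want to query customer2 from Impala, you can create a mapping external table along these lines (a sketch; run it in impala-shell):
CREATE EXTERNAL TABLE customer2
STORED AS KUDU
TBLPROPERTIES ('kudu.table_name' = 'customer2');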
Inserting CSV into Kudu
Let’s insert multiple rows from a CSV file.
val dataRDD = sc.textFile("/sparkdata/customerdata.csv")
val parsedRDD = dataRDD.map{_.split(",")}
case class CustomerData(customerid: Long, name: String, age: Short)
val dataDF = parsedRDD.map{a => CustomerData (a(0).toLong, a(1), a(2).toShort) }.toDF
kuduContext.insertRows(dataDF, "customer2");
Because we created the customer2 table via Spark we only had to specify the table name (customer2) instead of impala::default.customer2.
Inserting CSV into Kudu Using the spark-csv Package
We can also use the spark-csv package to process the sample CSV data. The command below will automatically download the spark-csv dependencies so make sure you have an Internet connection in your cluster node. Note the use of the comma separating the list of packages.
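The launch command and the read might look like the following sketch. The package versions are only illustrative, and the column names are assumptions; adjust both to your environment:
spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,org.apache.kudu:kudu-spark_2.10:1.1.0
val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", "true").load("/sparkdata/customerdata.csv").toDF("customerid", "name", "age")
val csvConverted = csvDF.selectExpr("cast(customerid as bigint) customerid", "name", "cast(age as smallint) age")
kuduContext.insertRows(csvConverted, "customer2")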
Because we created the customer2 table via Spark, we only had to specify the table name (customer2) instead of impala::default.customer2.
Insert CSV into Kudu by Programmatically Specifying the Schema
You can use StructType to define a schema for your data set. Programmatically specifying a schema is helpful when the schema cannot be determined ahead of time.
Read a CSV file from HDFS.
val myCSV = sc.textFile("/tmp/mydata/customers.csv")
Map CSV data to an RDD
import org.apache.spark.sql.Row
val myRDD = myCSV.map(_.split(',')).map(r => Row(r(0).trim.toInt, r(1), r(2).trim.toInt, r(3)))
Remember that we created the customers table via Impala. Therefore, we need to use the format impala::database.table when referring to the customers table.
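A sketch of the remaining steps. The field names and types below are assumptions that have to line up with the Row mapping above; the extra city column is simply dropped before the insert:
import org.apache.spark.sql.types._
val mySchema = StructType(Array(
StructField("id", IntegerType, false),
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("city", StringType, true)))
val myDF = sqlContext.createDataFrame(myRDD, mySchema)
kuduContext.insertRows(myDF.selectExpr("cast(id as bigint) id", "name", "cast(age as smallint) age"), "impala::default.customers")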
Inserting XML into Kudu Using the spark-xml Package
We’ll create an XML file as sample data for this example and copy the file to HDFS.
We’ll use the spark-xml package to process the sample XML data. This package works similarly to the spark-csv package. The command below will automatically download the spark-xml package so make sure you have an Internet connection in your cluster node. Include the kudu-spark dependency as well.
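A sketch of the launch and the read; the package versions, file path, and rowTag are assumptions:
spark-shell --packages com.databricks:spark-xml_2.10:0.4.1,org.apache.kudu:kudu-spark_2.10:1.1.0
import org.apache.kudu.spark.kudu._
val xmlDF = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "user").load("/tmp/xmldata/users.xml")
xmlDF.printSchema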
Now let’s go back to the impala-shell and compare the schema with the structure of the users table. As you can see, the data type of the age and zip columns in the users table is different from the corresponding columns in the data frame. We’ll get an error message if we try to insert this DataFrame into the Kudu table.
describe users;
+--------+---------+---------+-------------+
| name   | type    | comment | primary_key |
+--------+---------+---------+-------------+
| userid | bigint  |         | true        |
| name   | string  |         | false       |
| city   | string  |         | false       |
| state  | string  |         | false       |
| zip    | string  |         | false       |
| age    | tinyint |         | false       |
+--------+---------+---------+-------------+
We’ll need to cast the data types before inserting the DataFrame to the Kudu table. We’re introducing the use of selectExpr method here to convert data types, but another option is to programmatically specify the schema using StructType.
val convertedDF = xmlDF.selectExpr("userid","name","city","state","cast(zip as string) zip","cast(age as tinyint) age");
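With the types aligned, the DataFrame can be inserted through the Kudu context. A sketch, assuming the users table created via Impala and the kuduContext from earlier:
kuduContext.insertRows(convertedDF, "impala::default.users")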
Inserting from MySQL into Kudu
Let’s populate the users table in MySQL with some test data. Make sure the users table exists in the salesdb database. We will insert this data into a table in Kudu.
In some versions of Spark, --jars does not add the JAR to the driver’s class path.ii It is recommended that you include the JDBC driver in both your --jars and the Spark classpath.iii
Start the spark-shell. Take note that I had to include the MySQL driver as a parameter in both --driver-class-path and --jars.
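A sketch of the launch command and connection setup; the driver jar name, host, and credentials are assumptions:
spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar --packages org.apache.kudu:kudu-spark_2.10:1.1.0
val jdbcURL = "jdbc:mysql://10.0.1.101:3306/salesdb?user=root&password=cloudera"
val connectionProperties = new java.util.Properties()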
Let’s take advantage of pushdown optimization, running the query in the database and only returning the required results.
val mysqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties).select("userid", "city", "state","age").where("age < 25")
mysqlDF.show
+------+---------------+-----+---+
|userid| city|state|age|
+------+---------------+-----+---+
| 300| Torrance| CA| 23|
| 302|Manhattan Beach| CA| 21|
+------+---------------+-----+---+
Let's specify an entire query instead. This is a more convenient and flexible method. Also, unlike the previous method, we are not required to include a column in our SELECT list just because it appears in the WHERE clause.
val query = "(SELECT userid,name FROM users WHERE city IN ('Torrance','Rolling Hills')) as users"
val mysqlDF = sqlContext.read.jdbc(jdbcURL, query, connectionProperties)
mysqlDF.show
+------+-------------+
|userid| name|
+------+-------------+
| 300| Fred Stevens|
| 303|Victoria Loma|
+------+-------------+
We’ve just tried different ways of selecting data from a MySQL table.
Let’s just go ahead and insert the whole table into Kudu.
val mysqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
But first let’s convert age from integer to TINYINT. Otherwise you won’t be able to insert this DataFrame into Kudu. Again, we could’ve just defined a schema using StructType.
val convertedDF = mysqlDF.selectExpr("userid","name","city","state","zip","cast(age as tinyint) age");
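The converted DataFrame can then be inserted through the Kudu context. A sketch, again assuming the users Kudu table created via Impala and the kuduContext defined earlier:
kuduContext.insertRows(convertedDF, "impala::default.users")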
Inserting from SQL Server into Kudu
I’m going to use SQL Server 2016 throughout this book. You will also need to install SQL Server Management Studio separately (Figure 6-2).
We’ll create the salesdb database and the users table. In the Object Explorer, right-click the Databases node and click “New Database” (Figure 6-3). You will be shown a window where you can specify the database name and other database configuration options. Enter the database name “salesdb” and click OK. For testing purposes we’ll leave the other options with the default values.
Expand the salesdb node. Right-click “Tables,” click “New,” then “Table.” Fill in the column names and data types. To make it easier for you to follow the examples in the book, make sure you keep the column names and data types the same as the MySQL and Kudu tables. Click the “Save” icon near the upper-right corner of the window, and then enter the table name “users” (Figure 6-4).
Let’s insert some test data into the users table we just created. Click “New Query” button on the standard toolbar to open a new editor window (Figure 6-5).
We’re going to copy these four rows from SQL Server to Kudu.
Start the spark-shell. Don’t forget to pass the SQL Server driver as a parameter in both --driver-class-path and --jars.
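The launch and the copy itself might look like the following sketch; the driver jar name is an assumption, the JDBC URL matches the one used later in this chapter, and the destination is the users Kudu table created via Impala:
spark-shell --driver-class-path sqljdbc41.jar --jars sqljdbc41.jar --packages org.apache.kudu:kudu-spark_2.10:1.1.0
val jdbcURL = "jdbc:sqlserver://192.168.56.103;databaseName=salesdb;user=sa;password=cloudera"
val connectionProperties = new java.util.Properties()
val sqlServerDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
val convertedDF = sqlServerDF.selectExpr("userid","name","city","state","zip","cast(age as tinyint) age")
kuduContext.insertRows(convertedDF, "impala::default.users")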
Inserting from HBase into Kudu
There are several ways to transfer data from HBase to Kudu. We can use the HBase client API. There’s the Spark-HBase connector developed by Hortonworks.iv Astro, developed by Huawei, provides an SQL layer over HBase using Spark SQL.v The SparkOnHBase project from Cloudera was integrated into HBase recently, but it will probably take time before it makes it into a release of HBase.vi
We’ll use the most straightforward way by using JDBC. This might not be the fastest way to move data from HBase to Kudu, but it should be adequate for most tasks. We’ll create a Hive table on top of the HBase table, and then we’ll create a Spark DataFrame using JDBC via Impala. Once we have the DataFrame, we can easily insert it to Kudu.
Go to Cloudera’s website and click the Impala JDBC Driver Downloads link near the bottom right of the page; it will bring you to the download page of the latest version of the JDBC driver for Impala. Download and unzip the files.
ls -l
-rw-r--r-- 1 hadoop hadoop 693530 Mar 9 10:04 Cloudera-JDBC-Driver-for-Impala-Install-Guide.pdf
-rw-r--r-- 1 hadoop hadoop 43600 Mar 8 11:11 Cloudera-JDBC-Driver-for-Impala-Release-Notes.pdf
-rw-r--r-- 1 hadoop hadoop 46725 Mar 4 13:12 commons-codec-1.3.jar
-rw-r--r-- 1 hadoop hadoop 60686 Mar 4 13:12 commons-logging-1.1.1.jar
-rw-r--r-- 1 hadoop hadoop 7670596 Mar 4 13:16 hive_metastore.jar
-rw-r--r-- 1 hadoop hadoop 596600 Mar 4 13:16 hive_service.jar
-rw-r--r-- 1 hadoop hadoop 352585 Mar 4 13:12 httpclient-4.1.3.jar
-rw-r--r-- 1 hadoop hadoop 181201 Mar 4 13:12 httpcore-4.1.3.jar
-rw-r--r-- 1 hadoop hadoop 1562600 Mar 4 13:19 ImpalaJDBC41.jar
-rw-r--r-- 1 hadoop hadoop 275186 Mar 4 13:12 libfb303-0.9.0.jar
-rw-r--r-- 1 hadoop hadoop 347531 Mar 4 13:12 libthrift-0.9.0.jar
-rw-r--r-- 1 hadoop hadoop 367444 Mar 4 13:12 log4j-1.2.14.jar
-rw-r--r-- 1 hadoop hadoop 294796 Mar 4 13:16 ql.jar
-rw-r--r-- 1 hadoop hadoop 23671 Mar 4 13:12 slf4j-api-1.5.11.jar
-rw-r--r-- 1 hadoop hadoop 9693 Mar 4 13:12 slf4j-log4j12-1.5.11.jar
-rw-r--r-- 1 hadoop hadoop 1307923 Mar 4 13:16 TCLIServiceClient.jar
-rw-r--r-- 1 hadoop hadoop 792964 Mar 4 13:12 zookeeper-3.4.6.jar
The first thing we need to do is to start “hbase shell” and create the HBase table. We also need to add test data to our HBase table. Consult the Apache HBase Reference Guide online if you are unfamiliar with the HBase commands.
hbase shell
create 'hbase_users', 'cf1'
put 'hbase_users','400','cf1:name', 'Patrick Montalban'
put 'hbase_users','400','cf1:city', 'Los Angeles'
put 'hbase_users','400','cf1:state', 'CA'
put 'hbase_users','400','cf1:zip', '90010'
put 'hbase_users','400','cf1:age', '71'
put 'hbase_users','401','cf1:name', 'Jillian Collins'
put 'hbase_users','401','cf1:city', 'Santa Monica'
put 'hbase_users','401','cf1:state', 'CA'
put 'hbase_users','401','cf1:zip', '90402'
put 'hbase_users','401','cf1:age', '45'
put 'hbase_users','402','cf1:name', 'Robert Sarkisian'
put 'hbase_users','402','cf1:city', 'Glendale'
put 'hbase_users','402','cf1:state', 'CA'
put 'hbase_users','402','cf1:zip', '91204'
put 'hbase_users','402','cf1:age', '29'
put 'hbase_users','403','cf1:name', 'Warren Porcaro'
put 'hbase_users','403','cf1:city', 'Burbank'
put 'hbase_users','403','cf1:state', 'CA'
put 'hbase_users','403','cf1:zip', '91523'
put 'hbase_users','403','cf1:age', '62'
Create the Hive external table on top of the HBase table.
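A sketch of the remaining steps; the column mapping, driver class, and addresses are assumptions based on the tables used earlier:
CREATE EXTERNAL TABLE hbase_users (
userid BIGINT,
name STRING,
city STRING,
state STRING,
zip STRING,
age TINYINT)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf1:name,cf1:city,cf1:state,cf1:zip,cf1:age')
TBLPROPERTIES ('hbase.table.name' = 'hbase_users');
Back in the spark-shell, read the table through Impala's JDBC interface and insert it into Kudu, assuming the kuduContext created earlier:
val impalaURL = "jdbc:impala://10.0.1.101:21050;AuthMech=0"
val impalaProps = new java.util.Properties()
impalaProps.setProperty("driver", "com.cloudera.impala.jdbc41.Driver")
val hbaseDF = sqlContext.read.jdbc(impalaURL, "hbase_users", impalaProps)
kuduContext.insertRows(hbaseDF, "impala::default.users")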
Inserting from Solr into Kudu
You can use SolrJ to access Solr from Spark.
import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.HttpSolrServer
val solr = new HttpSolrServer("http://master02:8983/solr/mycollection");
val query = new SolrQuery();
query.setQuery("*:*");
query.addFilterQuery("userid:3");
query.setFields("userid","name","age","city");
query.setStart(0);
query.set("defType", "edismax");
val response = solr.query(query);
val results = response.getResults();
println(results);
A much better way to access Solr collections from Spark is by using the spark-solr package. Lucidworks started the spark-solr project to provide Spark-Solr integration.viii Using spark-solr is much easier and more powerful than SolrJ, allowing you to create DataFrames from Solr collections and use Spark SQL to interact with them. You can download the jar file from Lucidworks’s website.
Start by including the jar file when launching the spark-shell.
spark-shell --jars spark-solr-3.0.1-shaded.jar
Specify the collection and connection information.
val myOptions = Map("collection" -> "mycollection","zkhost" -> "{master02:8983/solr}")
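Create a DataFrame from the Solr collection. A sketch, assuming the spark-solr data source registered under the short name "solr":
val solrDF = sqlContext.read.format("solr").options(myOptions).load
solrDF.show
From here you can register solrDF as a temporary table and insert it into Kudu with the kuduContext, just like in the earlier examples.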
Inserting from Amazon S3 into Kudu
Amazon S3 is a popular object store frequently used as a data store for transient clusters. It’s also cost-effective storage for backups and cold data. Reading data from S3 is just like reading data from HDFS or any other file system.
Read a CSV file from Amazon S3. Make sure you’ve configured your S3 credentials.
val myCSV = sc.textFile("s3a://mydata/customers.csv")
Map CSV data to an RDD.
import org.apache.spark.sql.Row
val myRDD = myCSV.map(_.split(',')).map(r => Row(r(0).trim.toInt, r(1), r(2).trim.toInt, r(3)))
From here, the remaining steps are the same as in the earlier HDFS example: define a schema with StructType, create a DataFrame, and insert the rows with the Kudu context.
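Inserting from Kudu into SQL Server
First, create a DataFrame over the Kudu table. A sketch, assuming the same Kudu master and the users table used earlier:
import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu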
Register the DataFrame so we can run SQL queries against it.
kuduDF.registerTempTable("kudu_users")
Set up the JDBC URL and connection properties of the SQL Server database.
val jdbcURL = "jdbc:sqlserver://192.168.56.103;databaseName=salesdb;user=sa;password=cloudera"
val connectionProperties = new java.util.Properties()
import org.apache.spark.sql.SaveMode
To make sure our test is consistent, make sure the users table in SQL Server is empty using SQL Server Management Studio (Figure 6-7).
Insert the data from Kudu to SQL Server.
sqlContext.sql("select * from kudu_users").write.mode(SaveMode.Append).jdbc(jdbcURL, "users", connectionProperties)
Check the SQL Server table again and verify that the rows were added (Figure 6-8).
Congratulations! The data was successfully inserted.
Inserting from Kudu into Oracle
The first thing we need to do is set up the Oracle environment. We’ll create the user table in an existing pluggable database called EDWPDB. Log in as sysdba and start the instance. Consult the online Oracle documentation if you are unfamiliar with Oracle.
sqlplus / as sysdba
SQL*Plus: Release 12.2.0.1.0 Production on Sat May 6 18:12:45 2017
Copyright (c) 1982, 2016, Oracle. All rights reserved.
Connected to an idle instance.
SQL> startup
ORACLE instance started.
Total System Global Area 1845493760 bytes
Fixed Size 8793976 bytes
Variable Size 553648264 bytes
Database Buffers 1275068416 bytes
Redo Buffers 7983104 bytes
Database mounted.
Database opened.
SELECT name, open_mode FROM v$pdbs;
NAME OPEN_MODE
-------------------- ---------
PDB$SEED READ ONLY
ORCLPDB MOUNTED
EDWPDB MOUNTED
Open the EDWPDB pluggable database and set it as the current container.
ALTER PLUGGABLE DATABASE EDWPDB OPEN;
SELECT name, open_mode FROM v$pdbs;
NAME OPEN_MODE
-------------------- -----
PDB$SEED READ ONLY
ORCLPDB MOUNTED
EDWPDB READ WRITE
ALTER SESSION SET container = EDWPDB;
Create the Oracle table.
CREATE TABLE users (
userid NUMBER(19),
name VARCHAR(50),
city VARCHAR(50),
state VARCHAR (50),
zip VARCHAR(50),
age NUMBER(3));
Start the spark-shell. Don’t forget to include the oracle driver. I’m using ojdbc6.jar in this example.
Note
You might encounter the error “ORA-28040: No matching authentication protocol exception” when connecting to Oracle 12c R2 using the ojdbc6.jar driver. This is most likely caused by a bug in Oracle12c, Bug 14575666. The fix is to set SQLNET.ALLOWED_LOGON_VERSION=8 in the oracle/network/admin/sqlnet.ora file.
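A sketch of the launch and the Kudu read; the driver path is an assumption, and the Kudu master and users table are the ones used earlier:
spark-shell --driver-class-path ojdbc6.jar --jars ojdbc6.jar --packages org.apache.kudu:kudu-spark_2.10:1.1.0
import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu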
Register the DataFrame so we can run SQL queries against it.
kuduDF.registerTempTable("kudu_users")
Set up the JDBC URL and connection properties of the Oracle database.
val jdbcURL = "jdbc:oracle:thin:sales/cloudera@//192.168.56.30:1521/EDWPDB"
val connectionProperties = new java.util.Properties()
import org.apache.spark.sql.SaveMode
Make sure the users table in Oracle is empty using Oracle SQL Developer (Figure 6-9).
Insert the data from Kudu to Oracle.
sqlContext.sql("select * from kudu_users").write.mode(SaveMode.Append).jdbc(jdbcURL, "users", connectionProperties)
Check the Oracle table again and verify that the rows were added (Figure 6-10).
Congratulations! You’ve successfully copied rows from Kudu to Oracle.
Inserting from Kudu to HBase
We’re going to insert data into HBase via Impala so that we can use SQL. This is not the fastest method to write to HBase. If performance is critical, I suggest you use the saveAsHadoopDataset method or the HBase Java client API to write to HBase. There are various other ways of importing data into HBase.ix
Start the spark-shell and create a DataFrame from the Kudu table.
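A sketch, assuming the same Kudu master and users table as before:
import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu
kuduDF.registerTempTable("kudu_users")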
Go back to the spark-shell and set up the Impala connection.
val jdbcURL = s"jdbc:impala://10.0.1.101:21050;AuthMech=0"
val connectionProperties = new java.util.Properties()
Insert only the selected rows to the destination HBase table.
import org.apache.spark.sql.SaveMode
sqlContext.sql("select * from kudu_users where userid in (300,301,302,303)").write.mode(SaveMode.Append).jdbc(jdbcURL, "hbase_users", connectionProperties)
Go back to impala-shell and confirm that the rows were added to the destination HBase table.
You will discover that Spark generates dozens or hundreds of small files when writing to HDFS. This is known as the “small file” problem.x This will eventually cause all sorts of performance problems on your cluster. If that happens you might want to use coalesce or repartition to specify how many files to write to HDFS. For example, you might want Spark to write 1 parquet file to HDFS.
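A sketch; myDF and the output path are hypothetical:
myDF.coalesce(1).write.mode("append").parquet("/user/hive/warehouse/mytable")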
Using coalesce and repartition may cause performance issues since you are essentially reducing the degree of parallelism when writing data. Coalesce and repartition also trigger a shuffle that could cause performance issues depending on the amount of data that you’re processing. You need to balance the number of generated files with processing performance. You may still have to perform regular compaction of your parquet tables after a period of time. This is a problem that you will not have with Kudu.
Insert SQL Server and Oracle DataFrames into Kudu
We will join data from SQL Server and Oracle and insert it into Kudu.
Start the spark-shell. Don’t forget to include the necessary drivers and dependencies.
val jdbcURL = "jdbc:sqlserver://192.168.56.103;databaseName=salesdb;user=sa;password=cloudera"
val connectionProperties = new java.util.Properties()
Create a DataFrame from the SQL Server table.
val sqlDF = sqlContext.read.jdbc(jdbcURL, "userattributes", connectionProperties)
sqlDF.show
+------+------+------+------------------+
|userid|height|weight| occupation|
+------+------+------+------------------+
| 100| 175| 170| Electrician|
| 101| 180| 120| Librarian|
| 102| 180| 215| Data Scientist|
| 103| 178| 132|Software Developer|
+------+------+------+------------------+
Register the table so that we can join it to the Oracle table.
sqlDF.registerTempTable("sql_userattributes")
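We also need the Oracle side of the join. A sketch that reuses the Oracle JDBC URL from the earlier example; adjust the credentials and service name to your environment:
val oracleURL = "jdbc:oracle:thin:sales/cloudera@//192.168.56.30:1521/EDWPDB"
val oracleProperties = new java.util.Properties()
val oraDF = sqlContext.read.jdbc(oracleURL, "users", oracleProperties)
oraDF.registerTempTable("ora_users")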
Join both tables. We'll insert the results to a Kudu table.
val joinDF = sqlContext.sql("select ora_users.userid,ora_users.name,ora_users.city,ora_users.state,ora_users.zip,ora_users.age,sql_userattributes.height,sql_userattributes.weight,sql_userattributes.occupation from ora_users INNER JOIN sql_userattributes ON ora_users.userid=sql_userattributes.userid")
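Insert the joined rows through the Kudu context. A sketch; it assumes a destination Kudu table with a matching schema (the name users_attributes is hypothetical) was created beforehand, for example via impala-shell:
kuduContext.insertRows(joinDF, "impala::default.users_attributes")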
Insert Kudu and SQL Server DataFrames into Oracle
Register the Kudu DataFrame (the kuduDF created earlier from the users table) so we can run SQL queries against it.
kuduDF.registerTempTable("kudu_users")
val jdbcURL = "jdbc:sqlserver://192.168.56.103;databaseName=salesdb;user=sa;password=cloudera"
val connectionProperties = new java.util.Properties()
Create a DataFrame from the SQL Server table.
val sqlDF = sqlContext.read.jdbc(jdbcURL, "userattributes", connectionProperties)
sqlDF.show
+------+------+------+------------------+
|userid|height|weight| occupation|
+------+------+------+------------------+
| 100| 175| 170| Electrician|
| 101| 180| 120| Librarian|
| 102| 180| 215| Data Scientist|
| 103| 178| 132|Software Developer|
+------+------+------+------------------+
Register the DataFrame as a temp table.
sqlDF.registerTempTable("sql_userattributes")
Join both tables. We'll insert the results into an Oracle database.
val joinDF = sqlContext.sql("select kudu_users.userid,kudu_users.name,kudu_users.city,kudu_users.state,kudu_users.zip,kudu_users.age,sql_userattributes.height,sql_userattributes.weight,sql_userattributes.occupation from kudu_users INNER JOIN sql_userattributes ON kudu_users.userid=sql_userattributes.userid")
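Write the joined rows to Oracle. A sketch; the destination table name users2 is hypothetical, and the connection reuses the Oracle URL format shown earlier:
import org.apache.spark.sql.SaveMode
val oracleURL = "jdbc:oracle:thin:sales/cloudera@//192.168.56.30:1521/EDWPDB"
val oracleProperties = new java.util.Properties()
joinDF.write.mode(SaveMode.Append).jdbc(oracleURL, "users2", oracleProperties)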
Verify that the rows were successfully added to the Oracle database (Figure 6-12).
Spark Streaming and Kudu
There are use cases where you need to ingest data into Kudu in near real time. This is a requirement for Internet of Things (IoT) use cases, for example. In Listing 6-1, we show a sample Spark Streaming application that ingests sensor data from Flume. We perform basic event stream processing to tag the status of each event as NORMAL, WARNING, or CRITICAL depending on the temperature returned by the sensor. The status is then saved in a Kudu table together with the rest of the data.
Users can query the Kudu table as data is being inserted into it. In Chapter 9, we discuss a real-time data visualization tool called Zoomdata. You can use Zoomdata to visualize data stored in Kudu in real time.
After packaging the Spark application with sbt package, you can launch it using the spark-submit tool that comes with Spark. The parameters are for testing purposes only; change them based on your data ingestion requirements. Note that I’m including the kudu-spark dependency as a jar file. The parameters localhost and 9999 will be used as the Flume sink destination.
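A sketch of what the launch could look like; the class name, jar names, and paths are placeholders for whatever sbt produced for your project:
spark-submit \
--class FlumeStreamingKudu \
--jars kudu-spark_2.10-1.1.0.jar \
--master yarn \
--deploy-mode client \
target/scala-2.10/flume-streaming-example_2.10-1.0.jar \
localhost 9999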
Kudu makes it easier for data scientists to prepare and clean data. Kudu can serve as a fast, highly scalable and mutable feature store for machine learning applications. It also smoothly integrates with Spark SQL and DataFrame API.
Let’s work on an example. We'll use the Heart Disease Data Setxi from the UCI Machine Learning Repository to predict the presence of heart disease. The data was collected by Robert Detrano, MD, PhD, from the VA Medical Center, Long Beach and Cleveland Clinic Foundation. Historically, the Cleveland data set has been the subject of numerous studies, so we’ll use that data set. The original data set has 76 attributes, but only 14 of them are traditionally used by ML researchers (Table 6-1). We will simply perform binomial classification and determine if the patient has heart disease or not (Listing 6-2). This is the same example we used in Chapter 5, but this time we’ll use Kudu to store our features.
Table 6-1
Cleveland Heart Disease Data Set Attribute Information
Attribute    Description
age          Age
sex          Sex
cp           Chest pain type
trestbps     Resting blood pressure
chol         Serum cholesterol in mg/dl
fbs          Fasting blood sugar > 120 mg/dl
restecg      Resting electrocardiographic results
thalach      Maximum heart rate achieved
exang        Exercise induced angina
oldpeak      ST depression induced by exercise relative to rest
slope        The slope of the peak exercise ST segment
ca           Number of major vessels (0–3) colored by fluoroscopy
thal         Thallium stress test result
num          The predicted attribute – diagnosis of heart disease
Let’s start. We’ll need to download the file, copy it to HDFS, and create an external table on top of it. We will then copy the data to a Kudu table.
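The DDL might look something like the following sketch; the table names, file location, and column types are assumptions based on Table 6-1:
CREATE EXTERNAL TABLE heartdisease_csv (
age DOUBLE,
sex DOUBLE,
cp DOUBLE,
trestbps DOUBLE,
chol DOUBLE,
fbs DOUBLE,
restecg DOUBLE,
thalach DOUBLE,
exang DOUBLE,
oldpeak DOUBLE,
slope DOUBLE,
ca DOUBLE,
thal DOUBLE,
num DOUBLE)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/tmp/heartdisease';
CREATE TABLE heartdisease_kudu
PRIMARY KEY (id)
PARTITION BY HASH (id) PARTITIONS 4
STORED AS KUDU
AS SELECT uuid() AS id, heartdisease_csv.* FROM heartdisease_csv;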
Note that we used the Impala function uuid() to generate a unique primary key for our Kudu table. The Kudu table should now have some data in it. Let’s use the spark-shell to fit our model using Spark MLlib.
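A sketch of the steps that lead up to the pipeline below; the table name, master address, feature handling, and grid values are assumptions:
import org.apache.kudu.spark.kudu._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.functions._
// Read the features from the Kudu table.
val heartDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.heartdisease_kudu")).kudu
// Derive a binary label from the num column and assemble the feature vector.
val labeledDF = heartDF.withColumn("label", when(col("num") > 0, 1.0).otherwise(0.0))
val featureCols = Array("age","sex","cp","trestbps","chol","fbs","restecg","thalach","exang","oldpeak","slope","ca","thal")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val assembledDF = assembler.transform(labeledDF)
// Split the data and set up the classifier, evaluator, and parameter grid.
val Array(trainingData, testData) = assembledDF.randomSplit(Array(0.8, 0.2), 101)
val myRFclassifier = new RandomForestClassifier().setFeaturesCol("features").setLabelCol("label")
val myEvaluator = new BinaryClassificationEvaluator().setLabelCol("label")
val myParamGrid = new ParamGridBuilder()
.addGrid(myRFclassifier.maxDepth, Array(3, 5))
.addGrid(myRFclassifier.numTrees, Array(10, 20))
.build()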
val myPipeline = new Pipeline().setStages(Array(myRFclassifier))
val myCV = new CrossValidator()
.setEstimator(myPipeline)
.setEvaluator(myEvaluator)
.setEstimatorParamMaps(myParamGrid)
.setNumFolds(3)
Listing 6-2
Performing binary classification using Kudu as a feature store
We can now fit the model.
val myCrossValidatorModel = myCV.fit(trainingData)
Let’s make some predictions on the test data.
val myCrossValidatorPrediction = myCrossValidatorModel.transform(testData)
Let’s evaluate the model.
val myEvaluatorParamMap = ParamMap(myEvaluator.metricName -> "areaUnderROC")
val aucTestData = myEvaluator.evaluate(myCrossValidatorPrediction, myEvaluatorParamMap)
We used a very small data set (300+ observations) for our example, but imagine if the data set contains billions of rows. If you need to add or update your training data, you simply run the DML statement against the Kudu table. The ability to update a highly scalable storage engine such as Kudu greatly simplifies data preparation and feature engineering.
Note
Kudu allows up to a maximum of 300 columns per table. HBase is a more appropriate storage engine if you need to store more than 300 features. HBase tables can contain thousands or millions of columns. The downside of using HBase is that it is not as efficient in handling full-table scans compared to Kudu. There is discussion within the Apache Kudu community to address the 300-column limitation in future versions of Kudu.
Strictly speaking, you can bypass Kudu’s 300-column limit by setting an unsafe flag. For example, if you need the ability to create a Kudu table with 1000 columns, you can start the Kudu master with the following flags: --unlock_unsafe_flags --max_num_columns=1000. This has not been thoroughly tested by the Kudu development team and is therefore not recommended for production use.
Summary
Spark, Impala, and Kudu are perfect together. Spark provides a highly scalable data processing framework, while Kudu provides a highly scalable storage engine. With Impala providing a fast SQL interface, you have everything you need to implement a cost-effective enterprise data management and analytic platform. Of course, you are not limited to these three Apache open source projects. In subsequent chapters, I will cover other open source and commercial applications that can further enhance your big data platform.