
6. High Performance Data Processing with Spark and Kudu


Kudu is just a storage engine. You need a way to get data into and out of it. As Cloudera's default big data processing framework, Spark is the ideal data processing and ingestion tool for Kudu. Not only does Spark provide excellent scalability and performance, but Spark SQL and the DataFrame API also make it easy to interact with Kudu.

If you are coming from a data warehousing background, or if you are familiar with relational databases such as Oracle and SQL Server, you can think of Spark as a more powerful and versatile equivalent of procedural extensions to SQL such as PL/SQL and T-SQL.

Spark and Kudu

You use Spark with Kudu through the Data Source API. You can use the --packages option in spark-shell or spark-submit to include the kudu-spark dependency. You can also manually download the jar file from central.maven.org and include it with the --jars option. Consult the Apache Spark online documentation for more details on how to use sbt or Maven as your project build tool.
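
If you use sbt as your build tool instead of passing --packages on the command line, you can declare the dependency in your build file. Below is a minimal sketch assuming Scala 2.10 and Kudu 1.1.0, the versions used in the examples that follow; adjust the artifact and version to match your environment.

libraryDependencies += "org.apache.kudu" % "kudu-spark_2.10" % "1.1.0"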

Spark 1.6.x

Use the kudu-spark_2.10 artifact if you are using Spark with Scala 2.10. For example:

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0
spark-shell --jars kudu-spark_2.10-1.1.0.jar

Spark 2.x

Use the kudu-spark2_2.11 artifact if you are using Spark 2 with Scala 2.11. For example:

spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.1.0
spark-shell --jars kudu-spark2_2.11-1.1.0.jar

Kudu Context

You use a Kudu context in order to execute DML statements against a Kudu table. i You have to specify the Kudu master servers and the port. We only have one Kudu master in the example below. There are usually multiple Kudu masters in a production environment; in that case you have to specify a comma-separated list of the hostnames of all the Kudu masters. In this chapter, I present examples of how to use Spark with Kudu, as well as how to integrate Kudu with other data sources using Spark.

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")
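
In a production environment with multiple Kudu masters, pass the full comma-separated list to the constructor. The extra hostnames below are hypothetical; substitute your own masters.

val kuduContext = new KuduContext("kudumaster01:7051,kudumaster02:7051,kudumaster03:7051")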

We need to create our table before we can start.

impala-shell
CREATE TABLE customers
(
 id BIGINT PRIMARY KEY,
 name STRING,
 age SMALLINT
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU;

Create a case class that will provide a schema to our sample data.

case class CustomerData(id: Long, name: String, age: Short)

Inserting Data

Create some sample data.

val data = Array(CustomerData(101,"Lisa Kim",60), CustomerData(102,"Casey Fernandez",45))
val insertRDD = sc.parallelize(data)
val insertDF = sqlContext.createDataFrame(insertRDD)
insertDF.show
+---+---------------+---+
| id|           name|age|
+---+---------------+---+
|101|       Lisa Kim| 60|
|102|Casey Fernandez| 45|
+---+---------------+---+

Insert the DataFrame. Note the name of the table. This format is needed if the table was created in Impala. In this example, default is the name of the database and customers is the name of the table. This convention was adopted to prevent table name collision between tables created in Impala and tables created natively using the Kudu API or Spark.

kuduContext.insertRows(insertDF, "impala::default.customers")

Confirm that the data was successfully inserted.

val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+---------------+---+
| id|           name|age|
+---+---------------+---+
|102|Casey Fernandez| 45|
|101|       Lisa Kim| 60|
+---+---------------+---+

Updating a Kudu Table

Create an updated data set. Note that we changed the age of both customers and the last name of customer 102.

val data = Array(CustomerData(101,"Lisa Kim",120), CustomerData(102,"Casey Jones",90))
val updateRDD = sc.parallelize(data)
val updateDF = sqlContext.createDataFrame(updateRDD)
updateDF.show
+---+-----------+---+
| id|       name|age|
+---+-----------+---+
|101|   Lisa Kim|120|
|102|Casey Jones| 90|
+---+-----------+---+

Update the table.

kuduContext.updateRows(updateDF, "impala::default.customers");

Confirm that the table was successfully updated.

val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+-----------+---+
| id|       name|age|
+---+-----------+---+
|102|Casey Jones| 90|
|101|   Lisa Kim|120|
+---+-----------+---+

Upserting Data

Create some sample data.

val data = Array(CustomerData(101,"Lisa Kim",240), CustomerData(102,"Casey Cullen",90),CustomerData(103,"Byron Miller",25))
val upsertRDD = sc.parallelize(data)
val upsertDF = sqlContext.createDataFrame(upsertRDD)
upsertDF.show
+---+------------+---+
| id|        name|age|
+---+------------+---+
|101|    Lisa Kim|240|
|102|Casey Cullen| 90|
|103|Byron Miller| 25|
+---+------------+---+

Upsert the data: update all columns if the primary key exists, and insert the row if it doesn't.

kuduContext.upsertRows(upsertDF, "impala::default.customers")

Confirm that the data was successfully upserted.

val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|102|Casey Cullen| 90|
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+

Deleting Data

Inspect the data in the table.

val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|102|Casey Cullen| 90|
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+

Register the table so that we can use the table in our SQL query.

df.registerTempTable("customers")

Delete the data based on our query.

kuduContext.deleteRows(sqlContext.sql("select id from customers where name like 'Casey%'"), "impala::default.customers")

Confirm that the data was successfully deleted.

val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+

Selecting Data

Select the data in the table.

val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.select("id","name","age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+

You can also run your query by registering the table and using it in a SQL query. Note that you should use createOrReplaceTempView if you are using Spark 2.x.

df.registerTempTable("customers")
val df2 = sqlContext.sql("select * from customers where age=240")
df2.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|101|Lisa Kim|240|
+---+--------+---+
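
For reference, the same query under Spark 2.x would look like the following, assuming the spark-shell SparkSession is available as spark (the default).

df.createOrReplaceTempView("customers")
val df2 = spark.sql("select * from customers where age=240")
df2.show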

Creating a Kudu Table

Because this table is created through the Kudu API from Spark, it will not be visible to Impala. You have to create an external table in Impala that maps to this Kudu table in order to query it from Impala.

import org.apache.kudu.client.CreateTableOptions;
import scala.collection.JavaConverters._
// the DataFrame schema passed in must contain the customerid column used as the primary key
kuduContext.createTable("customer2", df.schema, Seq("customerid"), new CreateTableOptions().setRangePartitionColumns(List("customerid").asJava).setNumReplicas(1))
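
Below is a minimal sketch of the corresponding Impala DDL, assuming the table was created in Kudu under the name customer2; the exact external table syntax depends on your Impala version.

CREATE EXTERNAL TABLE customer2
STORED AS KUDU
TBLPROPERTIES ('kudu.table_name' = 'customer2');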

Inserting CSV into Kudu

Let’s insert multiple rows from a CSV file.

val dataRDD = sc.textFile("/sparkdata/customerdata.csv")
val parsedRDD = dataRDD.map{_.split(",")}
case class CustomerData(customerid: Long, name: String, age: Short)
val dataDF = parsedRDD.map{a => CustomerData (a(0).toLong, a(1), a(2).toShort) }.toDF
kuduContext.insertRows(dataDF, "customer2");

Because we created the customer2 table via Spark we only had to specify the table name (customer2) instead of impala::default.customer2.

Inserting CSV into Kudu Using the spark-csv Package

We can also use the spark-csv package to process the sample CSV data. The command below will automatically download the spark-csv dependencies so make sure you have an Internet connection in your cluster node. Note the use of the comma separating the list of packages.

spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,org.apache.kudu:kudu-spark_2.10:1.1.0
val dataDF = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/sparkdata/customerdata.csv")
kuduContext.insertRows(dataDF, "customer2");

Because we created the customer2 table via Spark, we only had to specify the table name (customer2) instead of impala::default.customer2.

Inserting CSV into Kudu by Programmatically Specifying the Schema

You can use StructType to define a schema for your data set. Programmatically specifying a schema is helpful when the schema cannot be determined ahead of time.

Read a CSV file from HDFS.

val myCSV = sc.textFile("/tmp/mydata/customers.csv")

Map CSV data to an RDD

import org.apache.spark.sql.Row
val myRDD = myCSV.map(_.split(',')).map(r => Row(r(0).trim.toInt, r(1), r(2).trim.toInt, r(3)))

Create a schema.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
val mySchema = StructType(Array(
StructField("customerid",IntegerType,false),
StructField("customername",StringType,false),
StructField("age",IntegerType,false),
StructField("city",StringType,false)))
val myDF = sqlContext.createDataFrame(myRDD, mySchema)

Insert DataFrame to Kudu.

kuduContext.insertRows(myDF, "impala::default.customers")

Remember that we created the customers table via Impala. Therefore, we need to use the format impala::database.table when referring to the customers table.

Inserting XML into Kudu Using the spark-xml Package

We'll create an XML file as sample data for this example and copy the file to HDFS.

cat users.xml
<user><userid>100</userid><name>Wendell Ryan</name><city>San Diego</city><state>CA</state><zip>92102</zip><age>24</age></user>
<user><userid>101</userid><name>Alicia Thompson</name><city>Berkeley</city><state>CA</state><zip>94705</zip><age>52</age></user>
<user><userid>102</userid><name>Felipe Drummond</name><city>Palo Alto</city><state>CA</state><zip>94301</zip><age>33</age></user>
<user><userid>103</userid><name>Teresa Levine</name><city>Walnut Creek</city><state>CA</state><zip>94507</zip><age>47</age></user>
hadoop fs -mkdir /xmldata
hadoop fs -put users.xml /xmldata

We’ll use the spark-xml package to process the sample XML data. This package works similarly to the spark-csv package. The command below will automatically download the spark-xml package so make sure you have an Internet connection in your cluster node. Include the kudu-spark dependency as well.

spark-shell --packages com.databricks:spark-xml_2.10:0.4.1,org.apache.kudu:kudu-spark_2.10:1.1.0

Create a DataFrame using Spark XML. In this example, we specify the row tag and the path in HDFS where the XML file is located.

val xmlDF = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "user").load("/xmldata/");
xmlDF: org.apache.spark.sql.DataFrame = [age: bigint, city: string, name: string, state: string, userid: bigint, zip: bigint]

Let’s also take a look at the data.

xmlDF.show
+---+------------+---------------+-----+------+-----+
|age|        city|           name|state|userid|  zip|
+---+------------+---------------+-----+------+-----+
| 24|   San Diego|   Wendell Ryan|   CA|   100|92102|
| 52|    Berkeley|Alicia Thompson|   CA|   101|94705|
| 33|   Palo Alto|Felipe Drummond|   CA|   102|94301|
| 47|Walnut Creek|  Teresa Levine|   CA|   103|94507|
+---+------------+---------------+-----+------+-----+

Let’s inspect the schema.

xmlDF.printSchema
root
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- userid: long (nullable = true)
 |-- zip: long (nullable = true)

Now let’s go back to the impala-shell and compare the schema with the structure of the users table. As you can see, the data type of the age and zip columns in the users table is different from the corresponding columns in the data frame. We’ll get an error message if we try to insert this DataFrame into the Kudu table.

describe users;
+--------+---------+---------+-------------+
| name   | type    | comment | primary_key |
+--------+---------+---------+-------------+
| userid | bigint  |         | true        |
| name   | string  |         | false       |
| city   | string  |         | false       |
| state  | string  |         | false       |
| zip    | string  |         | false       |
| age    | tinyint |         | false       |
+--------+---------+---------+-------------+

We’ll need to cast the data types before inserting the DataFrame to the Kudu table. We’re introducing the use of selectExpr method here to convert data types, but another option is to programmatically specify the schema using StructType.

val convertedDF = xmlDF.selectExpr("userid","name","city","state","cast(zip as string) zip","cast(age as tinyint) age");
convertedDF: org.apache.spark.sql.DataFrame = [userid: bigint, name: string, city: string, state: string, zip: string, age: tinyint]
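
As an alternative to casting with selectExpr, you could supply a schema when reading the XML so the columns already have the target types. This is a minimal sketch; userSchema and xmlDF2 are names introduced here for illustration.

import org.apache.spark.sql.types._
val userSchema = StructType(Array(
StructField("userid", LongType, true),
StructField("name", StringType, true),
StructField("city", StringType, true),
StructField("state", StringType, true),
StructField("zip", StringType, true),
StructField("age", ByteType, true)))
val xmlDF2 = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "user").schema(userSchema).load("/xmldata/")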

Create the kudu context and insert the DataFrame to the destination table.

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")
kuduContext.insertRows(convertedDF, "impala::default.users")

It looks like the DataFrame was successfully inserted into the Kudu table. Using the impala-shell, check the data in the table to confirm.

select * from users;
+--------+-----------------+--------------+-------+-------+-----+
| userid | name            | city         | state | zip   | age |
+--------+-----------------+--------------+-------+-------+-----+
| 102    | Felipe Drummond | Palo Alto    | CA    | 94301 | 33  |
| 103    | Teresa Levine   | Walnut Creek | CA    | 94507 | 47  |
| 100    | Wendell Ryan    | San Diego    | CA    | 92102 | 24  |
| 101    | Alicia Thompson | Berkeley     | CA    | 94705 | 52  |
+--------+-----------------+--------------+-------+-------+-----+

Inserting JSON into Kudu

We’ll create a JSON file as sample data for this example. Make sure the file is in a folder in HDFS called /jsondata.

cat users.json
{"userid": 200, "name": "Jonathan West", "city":"Frisco", "state":"TX", "zip": "75034", "age":35}
{"userid": 201, "name": "Andrea Foreman", "city":"Dallas", "state":"TX", "zip": "75001", "age":28}
{"userid": 202, "name": "Kirsten Jung", "city":"Plano", "state":"TX", "zip": "75025", "age":69}
{"userid": 203, "name": "Jessica Nguyen", "city":"Allen", "state":"TX", "zip": "75002", "age":52}

Create a DataFrame from the JSON file.

val jsonDF = sqlContext.read.json("/jsondata")
jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, city: string, name: string, state: string, userid: bigint, zip: string]

Check the data.

jsonDF.show
+---+------+--------------+-----+------+-----+
|age|  city|          name|state|userid|  zip|
+---+------+--------------+-----+------+-----+
| 35|Frisco| Jonathan West|   TX|   200|75034|
| 28|Dallas|Andrea Foreman|   TX|   201|75001|
| 69| Plano|  Kirsten Jung|   TX|   202|75025|
| 52| Allen|Jessica Nguyen|   TX|   203|75002|
+---+------+--------------+-----+------+-----+

Check the schema.

jsonDF.printSchema
root
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- userid: long (nullable = true)
 |-- zip: string (nullable = true)

Convert the data type of the age column to tinyint to match the table’s data type.

val convertedDF = jsonDF.selectExpr("userid","name","city","state","zip","cast(age as tinyint) age");
convertedDF: org.apache.spark.sql.DataFrame = [userid: bigint, name: string, city: string, state: string, zip: string, age: tinyint]

Create the kudu context and insert the DataFrame to the destination table.

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")
kuduContext.insertRows(convertedDF, "impala::default.users")

Using the impala-shell, check if the rows were successfully inserted.

select * from users order by userid;
+--------+-----------------+--------------+-------+-------+-----+
| userid | name            | city         | state | zip   | age |
+--------+-----------------+--------------+-------+-------+-----+
| 100    | Wendell Ryan    | San Diego    | CA    | 92102 | 24  |
| 101    | Alicia Thompson | Berkeley     | CA    | 94705 | 52  |
| 102    | Felipe Drummond | Palo Alto    | CA    | 94301 | 33  |
| 103    | Teresa Levine   | Walnut Creek | CA    | 94507 | 47  |
| 200    | Jonathan West   | Frisco       | TX    | 75034 | 35  |
| 201    | Andrea Foreman  | Dallas       | TX    | 75001 | 28  |
| 202    | Kirsten Jung    | Plano        | TX    | 75025 | 69  |
| 203    | Jessica Nguyen  | Allen        | TX    | 75002 | 52  |
+--------+-----------------+--------------+-------+-------+-----+

Inserting from MySQL into Kudu

Let's populate the users table in MySQL with some test data. Make sure the users table exists in the salesdb database. We will insert this data into a table in Kudu.

mysql -u root -pmypassword
use salesdb;
describe users;
+--------+--------------+------+-----+---------+-------+
| Field  | Type         | Null | Key | Default | Extra |
+--------+--------------+------+-----+---------+-------+
| userid | bigint(20)   | YES  |     | NULL    |       |
| name   | varchar(100) | YES  |     | NULL    |       |
| city   | varchar(100) | YES  |     | NULL    |       |
| state  | char(3)      | YES  |     | NULL    |       |
| zip    | char(5)      | YES  |     | NULL    |       |
| age    | tinyint(4)   | YES  |     | NULL    |       |
+--------+--------------+------+-----+---------+-------+
select * from users;
Empty set (0.00 sec)
insert into users values (300,'Fred Stevens','Torrance','CA',90503,23);
insert into users values (301,'Nancy Gibbs','Valencia','CA',91354,49);
insert into users values (302,'Randy Park','Manhattan Beach','CA',90267,21);
insert into users values (303,'Victoria Loma','Rolling Hills','CA',90274,75);
 select * from users;
+--------+---------------+-----------------+-------+-------+------+
| userid | name          | city            | state | zip   | age  |
+--------+---------------+-----------------+-------+-------+------+
|    300 | Fred Stevens  | Torrance        | CA    | 90503 |   23 |
|    301 | Nancy Gibbs   | Valencia        | CA    | 91354 |   49 |
|    302 | Randy Park    | Manhattan Beach | CA    | 90267 |   21 |
|    303 | Victoria Loma | Rolling Hills   | CA    | 90274 |   75 |
+--------+---------------+-----------------+-------+-------+------+

Note

In some versions of Spark, --jars does not add the JAR to the driver's class path. ii It is recommended that you include the JDBC driver in both --jars and the Spark driver class path. iii

Start the spark-shell. Take note that I had to include the MySQL driver as a parameter in both --driver-class-path and --jars.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar

Let's set up the JDBC URL and connection properties.

val jdbcURL = s"jdbc:mysql://10.0.1.101:3306/salesdb?user=root&password=cloudera"
val connectionProperties = new java.util.Properties()

We can create a DataFrame from an entire table.

val mysqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
mysqlDF.show
+------+-------------+---------------+-----+-----+---+
|userid|         name|           city|state|  zip|age|
+------+-------------+---------------+-----+-----+---+
|   300| Fred Stevens|       Torrance|   CA|90503| 23|
|   301|  Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|   Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|Victoria Loma|  Rolling Hills|   CA|90274| 75|
+------+-------------+---------------+-----+-----+---+

Let’s take advantage of pushdown optimization, running the query in the database and only returning the required results.

val mysqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties).select("userid", "city", "state","age").where("age < 25")
 mysqlDF.show
+------+---------------+-----+---+
|userid|           city|state|age|
+------+---------------+-----+---+
|   300|       Torrance|   CA| 23|
|   302|Manhattan Beach|   CA| 21|
+------+---------------+-----+---+

Let's specify an entire query. This is a more convenient and flexible method. Also, unlike the previous method, we are not required to specify the columns in our SELECT list if the column is specified in the WHERE clause.

val query = "(SELECT userid,name FROM users WHERE city IN ('Torrance','Rolling Hills')) as users"  
val mysqlDF = sqlContext.read.jdbc(jdbcURL, query, connectionProperties)
 mysqlDF.show
+------+-------------+
|userid|         name|
+------+-------------+
|   300| Fred Stevens|
|   303|Victoria Loma|
+------+-------------+

We've just tried different ways of selecting data from a MySQL table.

Let's just go ahead and insert the whole table into Kudu.

val mysqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
mysqlDF: org.apache.spark.sql.DataFrame = [userid: bigint, name: string, city: string, state: string, zip: string, age: int]
mysqlDF.show
+------+-------------+---------------+-----+-----+---+
|userid|         name|           city|state|  zip|age|
+------+-------------+---------------+-----+-----+---+
|   300| Fred Stevens|       Torrance|   CA|90503| 23|
|   301|  Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|   Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|Victoria Loma|  Rolling Hills|   CA|90274| 75|
+------+-------------+---------------+-----+-----+---+

Verify the schema.

mysqlDF.printSchema
root
 |-- userid: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- age: integer (nullable = true)

But first let’s convert age from integer to TINYINT. Otherwise you won’t be able to insert this DataFrame into Kudu. Again, we could’ve just defined a schema using StructType.

val convertedDF = mysqlDF.selectExpr("userid","name","city","state","zip","cast(age as tinyint) age");
convertedDF: org.apache.spark.sql.DataFrame = [userid: bigint, name: string, city: string, state: string, zip: string, age: tinyint]

As you can see, the data type of the age column is now TINYINT. Let’s go ahead and insert the data to Kudu.

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")
kuduContext.insertRows(convertedDF, "impala::default.users")

Now go to impala-shell and check if the data was successfully inserted.

select * from users order by userid;
+--------+-----------------+-----------------+-------+-------+-----+
| userid | name            | city            | state | zip   | age |
+--------+-----------------+-----------------+-------+-------+-----+
| 100    | Wendell Ryan    | San Diego       | CA    | 92102 | 24  |
| 101    | Alicia Thompson | Berkeley        | CA    | 94705 | 52  |
| 102    | Felipe Drummond | Palo Alto       | CA    | 94301 | 33  |
| 103    | Teresa Levine   | Walnut Creek    | CA    | 94507 | 47  |
| 200    | Jonathan West   | Frisco          | TX    | 75034 | 35  |
| 201    | Andrea Foreman  | Dallas          | TX    | 75001 | 28  |
| 202    | Kirsten Jung    | Plano           | TX    | 75025 | 69  |
| 203    | Jessica Nguyen  | Allen           | TX    | 75002 | 52  |
| 300    | Fred Stevens    | Torrance        | CA    | 90503 | 23  |
| 301    | Nancy Gibbs     | Valencia        | CA    | 91354 | 49  |
| 302    | Randy Park      | Manhattan Beach | CA    | 90267 | 21  |
| 303    | Victoria Loma   | Rolling Hills   | CA    | 90274 | 75  |
+--------+-----------------+-----------------+-------+-------+-----+

Alternatively, you can also check using Spark.

import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu
kuduDF.select("userid","name","city","state","zip","age").sort($"userid".asc).show()
+------+---------------+---------------+-----+-----+---+
|userid|           name|           city|state|  zip|age|
+------+---------------+---------------+-----+-----+---+
|   100|   Wendell Ryan|      San Diego|   CA|92102| 24|
|   101|Alicia Thompson|       Berkeley|   CA|94705| 52|
|   102|Felipe Drummond|      Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|   Walnut Creek|   CA|94507| 47|
|   200|  Jonathan West|         Frisco|   TX|75034| 35|
|   201| Andrea Foreman|         Dallas|   TX|75001| 28|
|   202|   Kirsten Jung|          Plano|   TX|75025| 69|
|   203| Jessica Nguyen|          Allen|   TX|75002| 52|
|   300|   Fred Stevens|       Torrance|   CA|90503| 23|
|   301|    Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|     Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|  Victoria Loma|  Rolling Hills|   CA|90274| 75|
+------+---------------+---------------+-----+-----+---+

Inserting from SQL Server into Kudu

The first thing you need to do is download the Microsoft JDBC driver for SQL Server. You can download the JDBC driver here: https://docs.microsoft.com/en-us/sql/connect/jdbc/microsoft-jdbc-driver-for-sql-server .

You should see a page similar to the one in Figure 6-1. Click the “Download JDBC Driver” link.

Figure 6-1 Microsoft JDBC Driver for SQL Server

Choose the language and click “Download.”

Extract the tarball. Choose the driver based on the version of your JRE.

tar zxvf sqljdbc_6.0.8112.100_enu.tar.gz
sqljdbc_6.0/enu/auth/x64/sqljdbc_auth.dll
sqljdbc_6.0/enu/auth/x86/sqljdbc_auth.dll
sqljdbc_6.0/enu/install.txt
sqljdbc_6.0/enu/jre7/sqljdbc41.jar
sqljdbc_6.0/enu/jre8/sqljdbc42.jar
sqljdbc_6.0/enu/license.txt
sqljdbc_6.0/enu/release.txt
sqljdbc_6.0/enu/samples/adaptive/executeStoredProcedure.java
sqljdbc_6.0/enu/samples/adaptive/readLargeData.java
sqljdbc_6.0/enu/samples/adaptive/updateLargeData.java
sqljdbc_6.0/enu/samples/alwaysencrypted/AlwaysEncrypted.java
sqljdbc_6.0/enu/samples/connections/connectDS.java
sqljdbc_6.0/enu/samples/connections/connectURL.java
sqljdbc_6.0/enu/samples/datatypes/basicDT.java
sqljdbc_6.0/enu/samples/datatypes/sqlxmlExample.java
sqljdbc_6.0/enu/samples/resultsets/cacheRS.java
sqljdbc_6.0/enu/samples/resultsets/retrieveRS.java
sqljdbc_6.0/enu/samples/resultsets/updateRS.java
sqljdbc_6.0/enu/samples/sparse/SparseColumns.java
sqljdbc_6.0/enu/xa/x64/sqljdbc_xa.dll
sqljdbc_6.0/enu/xa/x86/sqljdbc_xa.dll
sqljdbc_6.0/enu/xa/xa_install.sql
I’m going to use SQL Server 2016 throughout this book. You will also need to install SQL Server Management Studio separately (Figure 6-2).

Figure 6-2 SQL Server Management Studio

We'll create the salesdb database and the users table. In the Object Explorer, right-click the Databases node and click "New Database" (Figure 6-3). You will be shown a window where you can specify the database name and other database configuration options. Enter the database name "salesdb" and click OK. For testing purposes we'll leave the other options at their default values.

Figure 6-3 Create new database

Expand the salesdb node. Right-click “Tables,” click “New,” then “Table.” Fill in the column names and data types. To make it easier for you to follow the examples in the book, make sure you keep the column names and data types the same as the MySQL and Kudu tables. Click the “Save” icon near the upper-right corner of the window, and then enter the table name “users” (Figure 6-4).

Figure 6-4 Create new table

Let’s insert some test data into the users table we just created. Click “New Query” button on the standard toolbar to open a new editor window (Figure 6-5).

Figure 6-5 Insert test data

We’re going to copy these four rows from SQL Server to Kudu.

Start the spark-shell. Don't forget to pass the SQL Server driver as a parameter in both --driver-class-path and --jars.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path sqljdbc41.jar --jars sqljdbc41.jar

Let's set up the JDBC URL and connection properties.

val jdbcURL = "jdbc:sqlserver://192.168.56.102;databaseName=salesdb;user=salesuser;password=salespassword"
val connectionProperties = new java.util.Properties()

We can create a DataFrame from an entire table.

val sqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
sqlDF.show
+------+-------------+--------+-----+-----+---+
|userid|         name|    city|state|  zip|age|
+------+-------------+--------+-----+-----+---+
|   500|  Eric Steele| Seattle|  WA |98109| 91|
|   501|Brian Ambrose|Portland|  OR |97035| 53|
|   502|  Tim Parsons|  Tucson|  AZ |85704| 49|
|   503|   Lee Greene|   Miami|  FL |33018| 30|
+------+-------------+--------+-----+-----+---+

Let’s take advantage of pushdown optimization to run the query in the database and only return the results.

val sqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties).select("userid", "city", "state","age").where("age < 50")
sqlDF.show
+------+------+-----+---+
|userid|  city|state|age|
+------+------+-----+---+
|   502|Tucson|  AZ | 49|
|   503| Miami|  FL | 30|
+------+------+-----+---+

We can specify an entire query.

val query = "(SELECT userid,name FROM users WHERE city IN ('Seattle','Portland')) as users"  
val sqlDF = sqlContext.read.jdbc(jdbcURL, query, connectionProperties)
sqlDF.show
+------+-------------+
|userid|         name|
+------+-------------+
|   500|  Eric Steele|
|   501|Brian Ambrose|
+------+-------------+

Let’s just go ahead and insert the entire table to Kudu.

val sqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
sqlDF.show
+------+-------------+--------+-----+-----+---+
|userid|         name|    city|state|  zip|age|
+------+-------------+--------+-----+-----+---+
|   500|  Eric Steele| Seattle|  WA |98109| 91|
|   501|Brian Ambrose|Portland|  OR |97035| 53|
|   502|  Tim Parsons|  Tucson|  AZ |85704| 49|
|   503|   Lee Greene|   Miami|  FL |33018| 30|
+------+-------------+--------+-----+-----+---+

Checking the schema, it looks like the age was converted into an integer.

sqlDF.printSchema
root
 |-- userid: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- age: integer (nullable = true)

We need to convert age from integer to TINYINT. Otherwise you won't be able to insert this DataFrame into Kudu.

val convertedDF = sqlDF.selectExpr("userid","name","city","state","zip","cast(age as tinyint) age");
convertedDF: org.apache.spark.sql.DataFrame = [userid: bigint, name: string, city: string, state: string, zip: string, age: tinyint]

As you can see, the data type of the age column is now TINYINT. Let’s go ahead and insert the DataFrame to Kudu.

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")
kuduContext.insertRows(convertedDF, "impala::default.users")

Let's now go to impala-shell and confirm that the data was successfully inserted.

select * from users order by userid;
+--------+-------------------+-----------------+-------+-------+-----+
| userid | name              | city            | state | zip   | age |
+--------+-------------------+-----------------+-------+-------+-----+
| 100    | Wendell Ryan      | San Diego       | CA    | 92102 | 24  |
| 101    | Alicia Thompson   | Berkeley        | CA    | 94705 | 52  |
| 102    | Felipe Drummond   | Palo Alto       | CA    | 94301 | 33  |
| 103    | Teresa Levine     | Walnut Creek    | CA    | 94507 | 47  |
| 200    | Jonathan West     | Frisco          | TX    | 75034 | 35  |
| 201    | Andrea Foreman    | Dallas          | TX    | 75001 | 28  |
| 202    | Kirsten Jung      | Plano           | TX    | 75025 | 69  |
| 203    | Jessica Nguyen    | Allen           | TX    | 75002 | 52  |
| 300    | Fred Stevens      | Torrance        | CA    | 90503 | 23  |
| 301    | Nancy Gibbs       | Valencia        | CA    | 91354 | 49  |
| 302    | Randy Park        | Manhattan Beach | CA    | 90267 | 21  |
| 303    | Victoria Loma     | Rolling Hills   | CA    | 90274 | 75  |
| 400    | Patrick Montalban | Los Angeles     | CA    | 90010 | 71  |
| 401    | Jillian Collins   | Santa Monica    | CA    | 90402 | 45  |
| 402    | Robert Sarkisian  | Glendale        | CA    | 91204 | 29  |
| 403    | Warren Porcaro    | Burbank         | CA    | 91523 | 62  |
| 500    | Eric Steele       | Seattle         | WA    | 98109 | 91  |
| 501    | Brian Ambrose     | Portland        | OR    | 97035 | 53  |
| 502    | Tim Parsons       | Tucson          | AZ    | 85704 | 49  |
| 503    | Lee Greene        | Miami           | FL    | 33018 | 30  |
+--------+-------------------+-----------------+-------+-------+-----+

Alternatively, you can also check using Spark.

import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu
kuduDF.select("userid","name","city","state","zip","age").sort($"userid".asc).show()
+------+-----------------+---------------+-----+-----+---+
|userid|             name|           city|state|  zip|age|
+------+-----------------+---------------+-----+-----+---+
|   100|     Wendell Ryan|      San Diego|   CA|92102| 24|
|   101|  Alicia Thompson|       Berkeley|   CA|94705| 52|
|   102|  Felipe Drummond|      Palo Alto|   CA|94301| 33|
|   103|    Teresa Levine|   Walnut Creek|   CA|94507| 47|
|   200|    Jonathan West|         Frisco|   TX|75034| 35|
|   201|   Andrea Foreman|         Dallas|   TX|75001| 28|
|   202|     Kirsten Jung|          Plano|   TX|75025| 69|
|   203|   Jessica Nguyen|          Allen|   TX|75002| 52|
|   300|     Fred Stevens|       Torrance|   CA|90503| 23|
|   301|      Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|       Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|    Victoria Loma|  Rolling Hills|   CA|90274| 75|
|   400|Patrick Montalban|    Los Angeles|   CA|90010| 71|
|   401|  Jillian Collins|   Santa Monica|   CA|90402| 45|
|   402| Robert Sarkisian|       Glendale|   CA|91204| 29|
|   403|   Warren Porcaro|        Burbank|   CA|91523| 62|
|   500|      Eric Steele|        Seattle|  WA |98109| 91|
|   501|    Brian Ambrose|       Portland|  OR |97035| 53|
|   502|      Tim Parsons|         Tucson|  AZ |85704| 49|
|   503|       Lee Greene|          Miami|  FL |33018| 30|
+------+-----------------+---------------+-----+-----+---+

Inserting from HBase into Kudu

There are several ways to transfer data from HBase to Kudu. We can use the HBase client API. There's the Spark-HBase connector developed by Hortonworks. iv Astro, developed by Huawei, provides a SQL layer over HBase using Spark SQL. v The SparkOnHBase project from Cloudera was recently integrated into HBase, but it will probably take time before it makes it into an HBase release. vi

We’ll use the most straightforward way by using JDBC. This might not be the fastest way to move data from HBase to Kudu, but it should be adequate for most tasks. We’ll create a Hive table on top of the HBase table, and then we’ll create a Spark DataFrame using JDBC via Impala. Once we have the DataFrame, we can easily insert it to Kudu.

The first thing we need to do is download the Impala JDBC driver. Point your browser to https://www.cloudera.com/downloads.html (Figure 6-6).

Figure 6-6 Cloudera Enterprise download page

Clicking the Impala JDBC Driver Downloads link near the bottom right of the page will bring you to the download page of the latest version of the JDBC driver for Impala. Download and unzip the files.

ls -l
-rw-r--r-- 1 hadoop hadoop  693530 Mar  9 10:04 Cloudera-JDBC-Driver-for-Impala-Install-Guide.pdf
-rw-r--r-- 1 hadoop hadoop   43600 Mar  8 11:11 Cloudera-JDBC-Driver-for-Impala-Release-Notes.pdf
-rw-r--r-- 1 hadoop hadoop   46725 Mar  4 13:12 commons-codec-1.3.jar
-rw-r--r-- 1 hadoop hadoop   60686 Mar  4 13:12 commons-logging-1.1.1.jar
-rw-r--r-- 1 hadoop hadoop 7670596 Mar  4 13:16 hive_metastore.jar
-rw-r--r-- 1 hadoop hadoop  596600 Mar  4 13:16 hive_service.jar
-rw-r--r-- 1 hadoop hadoop  352585 Mar  4 13:12 httpclient-4.1.3.jar
-rw-r--r-- 1 hadoop hadoop  181201 Mar  4 13:12 httpcore-4.1.3.jar
-rw-r--r-- 1 hadoop hadoop 1562600 Mar  4 13:19 ImpalaJDBC41.jar
-rw-r--r-- 1 hadoop hadoop  275186 Mar  4 13:12 libfb303-0.9.0.jar
-rw-r--r-- 1 hadoop hadoop  347531 Mar  4 13:12 libthrift-0.9.0.jar
-rw-r--r-- 1 hadoop hadoop  367444 Mar  4 13:12 log4j-1.2.14.jar
-rw-r--r-- 1 hadoop hadoop  294796 Mar  4 13:16 ql.jar
-rw-r--r-- 1 hadoop hadoop   23671 Mar  4 13:12 slf4j-api-1.5.11.jar
-rw-r--r-- 1 hadoop hadoop    9693 Mar  4 13:12 slf4j-log4j12-1.5.11.jar
-rw-r--r-- 1 hadoop hadoop 1307923 Mar  4 13:16 TCLIServiceClient.jar
-rw-r--r-- 1 hadoop hadoop  792964 Mar  4 13:12 zookeeper-3.4.6.jar

Next, start the hbase shell and create the HBase table, then add some test data to it. Consult the Apache HBase Reference Guide online if you are unfamiliar with the HBase commands.

hbase shell
create 'hbase_users', 'cf1'
put 'hbase_users','400','cf1:name', 'Patrick Montalban'
put 'hbase_users','400','cf1:city', 'Los Angeles'
put 'hbase_users','400','cf1:state', 'CA'
put 'hbase_users','400','cf1:zip', '90010'
put 'hbase_users','400','cf1:age', '71'
put 'hbase_users','401','cf1:name', 'Jillian Collins'
put 'hbase_users','401','cf1:city', 'Santa Monica'
put 'hbase_users','401','cf1:state', 'CA'
put 'hbase_users','401','cf1:zip', '90402'
put 'hbase_users','401','cf1:age', '45'
put 'hbase_users','402','cf1:name', 'Robert Sarkisian'
put 'hbase_users','402','cf1:city', 'Glendale'
put 'hbase_users','402','cf1:state', 'CA'
put 'hbase_users','402','cf1:zip', '91204'
put 'hbase_users','402','cf1:age', '29'
put 'hbase_users','403','cf1:name', 'Warren Porcaro'
put 'hbase_users','403','cf1:city', 'Burbank'
put 'hbase_users','403','cf1:state', 'CA'
put 'hbase_users','403','cf1:zip', '91523'
put 'hbase_users','403','cf1:age', '62'

Create the Hive external table on top of the HBase table.

create external table hbase_users
(userid bigint,
name string,
city string,
state string,
zip string,
age tinyint)
stored by
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with
SERDEPROPERTIES ('hbase.columns.mapping'=':key,cf1:name,cf1:city,cf1:state,cf1:zip,cf1:age')
TBLPROPERTIES ('hbase.table.name'='hbase_users');

Using the impala-shell, verify that you can see the Hive external table.

show tables;
+-----------+
| name      |
+-----------+
| customers |
| sample_07 |
| sample_08 |
| users     |
| web_logs  |
+-----------+

It’s not showing up. We need to invalidate the metadata to refresh Impala’s memory.

invalidate metadata;
show tables;
+-------------+
| name        |
+-------------+
| customers   |
| hbase_users |
| sample_07   |
| sample_08   |
| users       |
| web_logs    |
+-------------+
select * from hbase_users;
+--------+-----+--------------+-------------------+-------+-------+
| userid | age | city         | name              | state | zip   |
+--------+-----+--------------+-------------------+-------+-------+
| 400    | 71  | Los Angeles  | Patrick Montalban | CA    | 90010 |
| 401    | 45  | Santa Monica | Jillian Collins   | CA    | 90402 |
| 402    | 29  | Glendale     | Robert Sarkisian  | CA    | 91204 |
| 403    | 62  | Burbank      | Warren Porcaro    | CA    | 91523 |
+--------+-----+--------------+-------------------+-------+-------+

Start the spark-shell.

spark-shell --driver-class-path ImpalaJDBC41.jar --jars ImpalaJDBC41.jar --packages org.apache.kudu:kudu-spark_2.10:1.1.0

Create a DataFrame from the HBase table.

val jdbcURL = s"jdbc:impala://10.0.1.101:21050;AuthMech=0"
val connectionProperties = new java.util.Properties()
val hbaseDF = sqlContext.read.jdbc(jdbcURL, "hbase_users", connectionProperties)
hbaseDF: org.apache.spark.sql.DataFrame = [userid: bigint, age: int, city: string, name: string, state: string, zip: string]
hbaseDF.show
+------+---+------------+-----------------+-----+-----+
|userid|age|        city|             name|state|  zip|
+------+---+------------+-----------------+-----+-----+
|   400| 71| Los Angeles|Patrick Montalban|   CA|90010|
|   401| 45|Santa Monica|  Jillian Collins|   CA|90402|
|   402| 29|    Glendale| Robert Sarkisian|   CA|91204|
|   403| 62|     Burbank|   Warren Porcaro|   CA|91523|
+------+---+------------+-----------------+-----+-----+

We still need to cast age to TINYINT before we can insert the data to the Kudu users table. Defining the schema using StructType is an option here.

val convertedDF = hbaseDF.selectExpr("userid","name","city","state","zip","cast(age as tinyint) age");
convertedDF: org.apache.spark.sql.DataFrame = [userid: bigint, name: string, city: string, state: string, zip: string, age: tinyint]

We can now insert the data to Kudu.

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")
kuduContext.insertRows(convertedDF, "impala::default.users")

Confirm that the data was successfully inserted.

val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu
kuduDF.select("userid","name","city","state","zip","age").sort($"userid".asc).show()
+------+-----------------+---------------+-----+-----+---+
|userid|             name|           city|state|  zip|age|
+------+-----------------+---------------+-----+-----+---+
|   100|     Wendell Ryan|      San Diego|   CA|92102| 24|
|   101|  Alicia Thompson|       Berkeley|   CA|94705| 52|
|   102|  Felipe Drummond|      Palo Alto|   CA|94301| 33|
|   103|    Teresa Levine|   Walnut Creek|   CA|94507| 47|
|   200|    Jonathan West|         Frisco|   TX|75034| 35|
|   201|   Andrea Foreman|         Dallas|   TX|75001| 28|
|   202|     Kirsten Jung|          Plano|   TX|75025| 69|
|   203|   Jessica Nguyen|          Allen|   TX|75002| 52|
|   300|     Fred Stevens|       Torrance|   CA|90503| 23|
|   301|      Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|       Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|    Victoria Loma|  Rolling Hills|   CA|90274| 75|
|   400|Patrick Montalban|    Los Angeles|   CA|90010| 71|
|   401|  Jillian Collins|   Santa Monica|   CA|90402| 45|
|   402| Robert Sarkisian|       Glendale|   CA|91204| 29|
|   403|   Warren Porcaro|        Burbank|   CA|91523| 62|
+------+-----------------+---------------+-----+-----+---+

The rows were successfully inserted.

Inserting from Solr into Kudu

As discussed in Chapter 5, you can access Solr from Spark using SolrJ. vii

import java.net.MalformedURLException;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocumentList;
val solr = new HttpSolrServer("http://master02:8983/solr/mycollection");
val query = new SolrQuery();
query.setQuery("*:*");
query.addFilterQuery("userid:3");
query.setFields("userid","name","age","city");
query.setStart(0);    
query.set("defType", "edismax");
val response = solr.query(query);
val results = response.getResults();
println(results);

A much better way to access Solr collections from Spark is the spark-solr package. Lucidworks started the spark-solr project to provide Spark-Solr integration. viii Using spark-solr is much easier and more powerful than SolrJ, allowing you to create DataFrames from Solr collections and use Spark SQL to interact with them. You can download the jar file from Lucidworks's website.

Start by including the jar file when launching spark-shell.

spark-shell --jars spark-solr-3.0.1-shaded.jar

Specify the collection and connection information.

val myOptions = Map("collection" -> "mycollection","zkhost" -> "{master02:8983/solr}")

Create a DataFrame.

val solrDF = spark.read.format("solr")
  .options(myOptions)
  .load
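
Once you have the DataFrame, you can register it and query it with Spark SQL before writing to Kudu. This is a minimal sketch; the userid, name, age, and city fields are assumed to exist in the collection.

solrDF.createOrReplaceTempView("solr_users")
val filteredDF = spark.sql("select userid, name, age, city from solr_users where age > 21")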

Insert the data to Kudu.

kuduContext.insertRows(solrDF, "impala::default.users")

Inserting from Amazon S3 into Kudu

Amazon S3 is a popular object store frequently used as data store for transient clusters. It’s also a cost-effective storage for backups and cold data. Reading data from S3 is just like reading data from HDFS or any other file system.

Read a CSV file from Amazon S3. Make sure you’ve configured your S3 credentials.
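
If the credentials are not already configured at the cluster level, one option is to set the S3A keys on the Hadoop configuration. The values below are placeholders.

sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")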

val myCSV = sc.textFile("s3a://mydata/customers.csv")

Map CSV data to an RDD.

import org.apache.spark.sql.Row
val myRDD = myCSV.map(_.split(',')).map(r => Row(r(0).trim.toInt, r(1), r(2).trim.toInt, r(3)))

Create a schema.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
val mySchema = StructType(Array(
StructField("customerid",IntegerType,false),
StructField("customername",StringType,false),
StructField("age",IntegerType,false),
StructField("city",StringType,false)))
val myDF = sqlContext.createDataFrame(myRDD, mySchema)

Insert DataFrame to Kudu.

kuduContext.insertRows(myDF, "impala::default.customers")

You’ve successfully inserted data from S3 into Kudu.

We’ve inserted data from different data sources into Kudu. Let’s now insert data from Kudu to different data sources.

Inserting from Kudu into MySQL

Start the spark-shell.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar

Connect to the Kudu master and check the data in the users table. We’re going to sync this Kudu table with a MySQL table.

import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu
kuduDF.select("userid","name","city","state","zip","age").sort($"userid".asc).show()
+------+---------------+---------------+-----+-----+---+
|userid|           name|           city|state|  zip|age|
+------+---------------+---------------+-----+-----+---+
|   100|   Wendell Ryan|      San Diego|   CA|92102| 24|
|   101|Alicia Thompson|       Berkeley|   CA|94705| 52|
|   102|Felipe Drummond|      Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|   Walnut Creek|   CA|94507| 47|
|   200|  Jonathan West|         Frisco|   TX|75034| 35|
|   201| Andrea Foreman|         Dallas|   TX|75001| 28|
|   202|   Kirsten Jung|          Plano|   TX|75025| 69|
|   203| Jessica Nguyen|          Allen|   TX|75002| 52|
|   300|   Fred Stevens|       Torrance|   CA|90503| 23|
|   301|    Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|     Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|  Victoria Loma|  Rolling Hills|   CA|90274| 75|
+------+---------------+---------------+-----+-----+---+

Register the DataFrame so we can run SQL queries against it.

kuduDF.registerTempTable("kudu_users")

Set up the JDBC URL and connection properties of the MySQL database.

val jdbcURL = s"jdbc:mysql://10.0.1.101:3306/salesdb?user=root&password=cloudera"
val connectionProperties = new java.util.Properties()
import org.apache.spark.sql.SaveMode

Check the data in the MySQL table using the MySQL command-line tool.

select * from users;
+--------+---------------+-----------------+-------+-------+------+
| userid | name          | city            | state | zip   | age  |
+--------+---------------+-----------------+-------+-------+------+
|    300 | Fred Stevens  | Torrance        | CA    | 90503 |   23 |
|    301 | Nancy Gibbs   | Valencia        | CA    | 91354 |   49 |
|    302 | Randy Park    | Manhattan Beach | CA    | 90267 |   21 |
|    303 | Victoria Loma | Rolling Hills   | CA    | 90274 |   75 |
+--------+---------------+-----------------+-------+-------+------+

Let’s sync both tables by inserting all the rows with userid < 300 from Kudu to MySQL.

sqlContext.sql("select * from kudu_users where userid < 300").write.mode(SaveMode.Append).jdbc(jdbcURL, "users", connectionProperties)

Check the MySQL table again and verify that the rows were added.

select * from users order by userid;
+--------+-----------------+-----------------+-------+-------+------+
| userid | name            | city            | state | zip   | age  |
+--------+-----------------+-----------------+-------+-------+------+
|    100 | Wendell Ryan    | San Diego       | CA    | 92102 |   24 |
|    101 | Alicia Thompson | Berkeley        | CA    | 94705 |   52 |
|    102 | Felipe Drummond | Palo Alto       | CA    | 94301 |   33 |
|    103 | Teresa Levine   | Walnut Creek    | CA    | 94507 |   47 |
|    200 | Jonathan West   | Frisco          | TX    | 75034 |   35 |
|    201 | Andrea Foreman  | Dallas          | TX    | 75001 |   28 |
|    202 | Kirsten Jung    | Plano           | TX    | 75025 |   69 |
|    203 | Jessica Nguyen  | Allen           | TX    | 75002 |   52 |
|    300 | Fred Stevens    | Torrance        | CA    | 90503 |   23 |
|    301 | Nancy Gibbs     | Valencia        | CA    | 91354 |   49 |
|    302 | Randy Park      | Manhattan Beach | CA    | 90267 |   21 |
|    303 | Victoria Loma   | Rolling Hills   | CA    | 90274 |   75 |
+--------+-----------------+-----------------+-------+-------+------+

It looks like the rows were successfully inserted.

Inserting from Kudu into SQL Server

Start spark-shell.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path sqljdbc41.jar --jars sqljdbc41.jar

Create a DataFrame from the users table in the default database.

import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu

Verify the contents of the DataFrame.

kuduDF.select("userid","name","city","state","zip","age").sort($"userid".asc).show()
+------+---------------+------------+-----+-----+---+
|userid|           name|        city|state|  zip|age|
+------+---------------+------------+-----+-----+---+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|
+------+---------------+------------+-----+-----+---+

Register the DataFrame so we can run SQL queries against it.

kuduDF.registerTempTable("kudu_users")

Set up the JDBC URL and connection properties of the SQL Server database.

val jdbcURL = "jdbc:sqlserver://192.168.56.103;databaseName=salesdb;user=sa;password=cloudera"
val connectionProperties = new java.util.Properties()
import org.apache.spark.sql.SaveMode

To make sure our test is consistent, make sure the users table in SQL Server is empty using SQL Server Management Studio (Figure 6-7).

Figure 6-7 Make sure the table is empty

Insert the data from Kudu to SQL Server.

sqlContext.sql("select * from kudu_users").write.mode(SaveMode.Append).jdbc(jdbcURL, "users", connectionProperties)

Check the SQL Server table again and verify that the rows were added (Figure 6-8).

Figure 6-8 Check the table

Congratulations! The data was successfully inserted.

Inserting from Kudu into Oracle

The first thing we need to do is set up the Oracle environment. We'll create the users table in an existing pluggable database called EDWPDB. Log in as sysdba and start the instance. Consult the online Oracle documentation if you are unfamiliar with Oracle.

sqlplus / as sysdba
SQL*Plus: Release 12.2.0.1.0 Production on Sat May 6 18:12:45 2017
Copyright (c) 1982, 2016, Oracle.  All rights reserved.
Connected to an idle instance.
SQL> startup
ORACLE instance started.
Total System Global Area 1845493760 bytes
Fixed Size                  8793976 bytes
Variable Size             553648264 bytes
Database Buffers         1275068416 bytes
Redo Buffers                7983104 bytes
Database mounted.
Database opened.
SELECT name, open_mode FROM v$pdbs;
NAME                 OPEN_MODE
-------------------- ---------
PDB$SEED             READ ONLY
ORCLPDB              MOUNTED
EDWPDB               MOUNTED

Open the EDWPDB pluggable database and set it as the current container.

ALTER PLUGGABLE DATABASE EDWPDB OPEN;
SELECT name, open_mode FROM v$pdbs;
NAME                 OPEN_MODE
-------------------- -----
PDB$SEED             READ ONLY
ORCLPDB              MOUNTED
EDWPDB               READ WRITE
ALTER SESSION SET container = EDWPDB;

Create the Oracle table.

CREATE TABLE users (
userid NUMBER(19),
name VARCHAR(50),
city VARCHAR(50),
state VARCHAR (50),
zip VARCHAR(50),
age NUMBER(3));

Start the spark-shell. Don't forget to include the Oracle driver. I'm using ojdbc6.jar in this example.

Note

You might encounter the error "ORA-28040: No matching authentication protocol exception" when connecting to Oracle 12c R2 using the ojdbc6.jar driver. This is most likely caused by an Oracle 12c bug, Bug 14575666. The fix is to set SQLNET.ALLOWED_LOGON_VERSION=8 in the oracle/network/admin/sqlnet.ora file.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path ojdbc6.jar --jars ojdbc6.jar

Create a DataFrame from the users table in the default database.

import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu

Verify the contents of the DataFrame.

kuduDF.select("userid","name","city","state","zip","age").sort($"userid".asc).show()
+------+---------------+------------+-----+-----+---+
|userid|           name|        city|state|  zip|age|
+------+---------------+------------+-----+-----+---+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|
+------+---------------+------------+-----+-----+---+

Register the DataFrame so we can run SQL queries against it.

kuduDF.registerTempTable("kudu_users")

Set up the JDBC URL and connection properties of the Oracle database.

val jdbcURL = "jdbc:oracle:thin:sales/cloudera@//192.168.56.30:1521/EDWPDB"
val connectionProperties = new java.util.Properties()
import org.apache.spark.sql.SaveMode

Make sure the users table in Oracle is empty using Oracle SQL Developer (Figure 6-9).

Figure 6-9 Make sure Oracle table is empty

Insert the data from Kudu to Oracle.

sqlContext.sql("select * from kudu_users").write.mode(SaveMode.Append).jdbc(jdbcURL, "users", connectionProperties)

Check the Oracle table again and verify that the rows were added (Figure 6-10).

Figure 6-10 Check Oracle table

Congratulations! You’ve successfully copied rows from Kudu to Oracle.

Inserting from Kudu to HBase

We're going to insert data into HBase via Impala so that we can use SQL. This is not the fastest way to write to HBase. If performance is critical, I suggest you use the saveAsHadoopDataset method (a sketch follows below) or the HBase Java client API to write to HBase. There are various other ways of importing data into HBase. ix
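
For reference, here is a minimal sketch of the saveAsHadoopDataset approach. It assumes a DataFrame named usersDF with the same columns as the users table, the hbase_users table and cf1 column family created earlier, and an HBase 1.x client on the classpath; only the name column is written, to keep the sketch short.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
// point the job at the destination HBase table
val jobConf = new JobConf(HBaseConfiguration.create())
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "hbase_users")
// convert each row into a Put keyed on userid
val hbasePuts = usersDF.rdd.map { row =>
  val put = new Put(Bytes.toBytes(row.getAs[Long]("userid").toString))
  put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(row.getAs[String]("name")))
  (new ImmutableBytesWritable, put)
}
hbasePuts.saveAsHadoopDataset(jobConf)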

Start the spark-shell and create a DataFrame from the Kudu table.

spark-shell --driver-class-path ImpalaJDBC41.jar --jars ImpalaJDBC41.jar --packages org.apache.kudu:kudu-spark_2.10:1.1.0
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu

Verify the contents of the table.

kuduDF.sort($"userid".asc).show()
+------+-----------------+---------------+-----+-----+---+
|userid|             name|           city|state|  zip|age|
+------+-----------------+---------------+-----+-----+---+
|   100|     Wendell Ryan|      San Diego|   CA|92102| 24|
|   101|  Alicia Thompson|       Berkeley|   CA|94705| 52|
|   102|  Felipe Drummond|      Palo Alto|   CA|94301| 33|
|   103|    Teresa Levine|   Walnut Creek|   CA|94507| 47|
|   200|    Jonathan West|         Frisco|   TX|75034| 35|
|   201|   Andrea Foreman|         Dallas|   TX|75001| 28|
|   202|     Kirsten Jung|          Plano|   TX|75025| 69|
|   203|   Jessica Nguyen|          Allen|   TX|75002| 52|
|   300|     Fred Stevens|       Torrance|   CA|90503| 23|
|   301|      Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|       Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|    Victoria Loma|  Rolling Hills|   CA|90274| 75|
|   400|Patrick Montalban|    Los Angeles|   CA|90010| 71|
|   401|  Jillian Collins|   Santa Monica|   CA|90402| 45|
|   402| Robert Sarkisian|       Glendale|   CA|91204| 29|
|   403|   Warren Porcaro|        Burbank|   CA|91523| 62|
+------+-----------------+---------------+-----+-----+---+

Let’s register the table so we can use it in a query.

kuduDF.registerTempTable("kudu_users")

Using impala-shell, verify the contents of the destination HBase table.

select * from hbase_users order by userid;
+--------+-----+--------------+-------------------+-------+-------+
| userid | age | city         | name              | state | zip   |
+--------+-----+--------------+-------------------+-------+-------+
| 400    | 71  | Los Angeles  | Patrick Montalban | CA    | 90010 |
| 401    | 45  | Santa Monica | Jillian Collins   | CA    | 90402 |
| 402    | 29  | Glendale     | Robert Sarkisian  | CA    | 91204 |
| 403    | 62  | Burbank      | Warren Porcaro    | CA    | 91523 |
+--------+-----+--------------+-------------------+-------+-------+

Go back to the spark-shell and set up the Impala connection.

val jdbcURL = s"jdbc:impala://10.0.1.101:21050;AuthMech=0"
val connectionProperties = new java.util.Properties()

Insert only the selected rows to the destination HBase table.

import org.apache.spark.sql.SaveMode
sqlContext.sql("select * from kudu_users where userid in (300,301,302,303)").write.mode(SaveMode.Append).jdbc(jdbcURL, "hbase_users", connectionProperties)

Go back to impala-shell and confirm that the rows were added to the destination HBase table.

select * from hbase_users order by userid;
+--------+-----+-----------------+-------------------+-------+-------+
| userid | age | city            | name              | state | zip   |
+--------+-----+-----------------+-------------------+-------+-------+
| 300    | 23  | Torrance        | Fred Stevens      | CA    | 90503 |
| 301    | 49  | Valencia        | Nancy Gibbs       | CA    | 91354 |
| 302    | 21  | Manhattan Beach | Randy Park        | CA    | 90267 |
| 303    | 75  | Rolling Hills   | Victoria Loma     | CA    | 90274 |
| 400    | 71  | Los Angeles     | Patrick Montalban | CA    | 90010 |
| 401    | 45  | Santa Monica    | Jillian Collins   | CA    | 90402 |
| 402    | 29  | Glendale        | Robert Sarkisian  | CA    | 91204 |
| 403    | 62  | Burbank         | Warren Porcaro    | CA    | 91523 |
+--------+-----+-----------------+-------------------+-------+-------+

The data was successfully inserted into the HBase table.

Inserting Rows from Kudu to Parquet

Read the table.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.spark.kudu._
val df = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.customers")).kudu
df.show
+---+------------+---+
| id|        name|age|
+---+------------+---+
|103|Byron Miller| 25|
|101|    Lisa Kim|240|
+---+------------+---+

Register the table and then create another DataFrame off of the result of a query.

df.registerTempTable("customers")
val df2 = sqlContext.sql("select * from customers where age=240")

Inspect the data.

df2.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|101|Lisa Kim|240|
+---+--------+---+

Append the DataFrame to the Parquet table. You can also overwrite the destination by using the keyword “overwrite” instead of “append.”

df2.write.mode("append").parquet("/user/hive/warehouse/Mytable")
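
For example, to replace the contents of the destination directory rather than append to it:

df2.write.mode("overwrite").parquet("/user/hive/warehouse/Mytable")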

You will discover that Spark can generate dozens or even hundreds of small files when writing to HDFS. This is known as the “small files” problem. x It will eventually cause all sorts of performance problems on your cluster. If that happens, you might want to use coalesce or repartition to control how many files are written to HDFS. For example, you might want Spark to write a single Parquet file.

df2.coalesce(1).write.mode("append").parquet("/user/hive/warehouse/Mytable")

Using coalesce and repartition can cause performance issues because you are reducing the degree of parallelism when writing the data. Repartition also triggers a full shuffle, which can be expensive depending on the amount of data you’re processing; coalesce avoids a shuffle by default but still limits write parallelism. You need to balance the number of generated files against processing performance, and you may still have to compact your Parquet tables periodically. This is a problem you will not have with Kudu.
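
If you would rather redistribute the data evenly before writing, repartition is the shuffle-based alternative; the target of eight output files below is only an illustrative assumption.

df2.repartition(8).write.mode("append").parquet("/user/hive/warehouse/Mytable")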

Insert SQL Server and Oracle DataFrames into Kudu

We will join data from SQL Server and Oracle and insert the result into Kudu.

Start the spark-shell. Don’t forget to include the necessary drivers and dependencies.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path ojdbc6.jar:sqljdbc41.jar --jars ojdbc6.jar,sqljdbc41.jar

Set up the Oracle connection.

val jdbcURL = "jdbc:oracle:thin:sales/cloudera@//192.168.56.30:1521/EDWPDB"
val connectionProperties = new java.util.Properties()

Create a DataFrame from the Oracle table.

val oraDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
oraDF.show
+------+---------------+------------+-----+-----+---+
|USERID|           NAME|        CITY|STATE|  ZIP|AGE|
+------+---------------+------------+-----+-----+---+
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|
+------+---------------+------------+-----+-----+---+

Register the table so we can run SQL against it.

oraDF.registerTempTable("ora_users")

Set up the SQL Server connection.

val jdbcURL = "jdbc:sqlserver://192.168.56.103;databaseName=salesdb;user=sa;password=cloudera"
val connectionProperties = new java.util.Properties()

Create a DataFrame from the SQL Server table.

val sqlDF = sqlContext.read.jdbc(jdbcURL, "userattributes", connectionProperties)
sqlDF.show
+------+------+------+------------------+
|userid|height|weight|        occupation|
+------+------+------+------------------+
|   100|   175|   170|       Electrician|
|   101|   180|   120|         Librarian|
|   102|   180|   215|    Data Scientist|
|   103|   178|   132|Software Developer|
+------+------+------+------------------+

Register the table so that we can join it to the Oracle table.

sqlDF.registerTempTable("sql_userattributes")

Join both tables. We'll insert the results to a Kudu table.

val joinDF = sqlContext.sql("select ora_users.userid,ora_users.name,ora_users.city,ora_users.state,ora_users.zip,ora_users.age,sql_userattributes.height,sql_userattributes.weight,sql_userattributes.occupation from ora_users  INNER JOIN sql_userattributes ON ora_users.userid=sql_userattributes.userid")
joinDF.show
+------+---------------+------------+-----+-----+---+------+------+------------------+
|userid|           name|        city|state|  zip|age|height|weight|        occupation|
+------+---------------+------------+-----+-----+---+------+------+------------------+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|   175|   170|       Electrician|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|   180|   120|         Librarian|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|   180|   215|    Data Scientist|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|   178|   132|Software Developer|
+------+---------------+------------+-----+-----+---+------+------+------------------+

You can also join both DataFrames using this method.

val joinDF2 = oraDF.join(sqlDF,"userid")
joinDF2.show
+------+---------------+------------+-----+-----+---+------+------+------------------+
|userid|           NAME|        CITY|STATE|  ZIP|AGE|height|weight|        occupation|
+------+---------------+------------+-----+-----+---+------+------+------------------+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|   175|   170|       Electrician|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|   180|   120|         Librarian|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|   180|   215|    Data Scientist|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|   178|   132|Software Developer|
+------+---------------+------------+-----+-----+---+------+------+------------------+

Create the destination Kudu table in Impala.

impala-shell
create table users2 (
userid BIGINT PRIMARY KEY,
name STRING,
city STRING,
state STRING,
zip STRING,
age STRING,
height STRING,
weight STRING,
occupation STRING
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

Go back to the spark-shell and set up the Kudu connection.

import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("kudumaster01:7051")

Insert the data to Kudu.

kuduContext.insertRows(joinDF, "impala::default.users2")

Confirm that the data was successfully inserted into the Kudu table.

impala-shell
select * from users2;
+--------+-----------------+--------------+-------+-------+-----+--------+--------+--------------------+
| userid | name            | city         | state | zip   | age | height | weight | occupation         |
+--------+-----------------+--------------+-------+-------+-----+--------+--------+--------------------+
| 102    | Felipe Drummond | Palo Alto    | CA    | 94301 | 33  | 180    | 215    | Data Scientist     |
| 103    | Teresa Levine   | Walnut Creek | CA    | 94507 | 47  | 178    | 132    | Software Developer |
| 100    | Wendell Ryan    | San Diego    | CA    | 92102 | 24  | 175    | 170    | Electrician        |
| 101    | Alicia Thompson | Berkeley     | CA    | 94705 | 52  | 180    | 120    | Librarian          |
+--------+-----------------+--------------+-------+-------+-----+--------+--------+--------------------+

Looks good to me.

Insert Kudu and SQL Server DataFrames into Oracle

Create the destination table in Oracle using Oracle SQL Developer (Figure 6-11).
Figure 6-11 Create an Oracle table

Start the spark-shell. Don’t forget to include the necessary drivers and dependencies.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 --driver-class-path ojdbc6.jar:sqljdbc41.jar --jars ojdbc6.jar,sqljdbc41.jar

Create a DataFrame from the Kudu users table in the default database.

import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.users")).kudu

Verify the contents of the DataFrame.

kuduDF.select("userid","name","city","state","zip","age").sort($"userid".asc).show()
+------+---------------+------------+-----+-----+---+
|userid|           name|        city|state|  zip|age|
+------+---------------+------------+-----+-----+---+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|
+------+---------------+------------+-----+-----+---+

Register the DataFrame so we can run SQL queries against it.

kuduDF.registerTempTable("kudu_users")
val jdbcURL = "jdbc:sqlserver://192.168.56.103;databaseName=salesdb;user=sa;password=cloudera"
val connectionProperties = new java.util.Properties()

Create a DataFrame from the SQL Server table.

val sqlDF = sqlContext.read.jdbc(jdbcURL, "userattributes", connectionProperties)
sqlDF.show
+------+------+------+------------------+
|userid|height|weight|        occupation|
+------+------+------+------------------+
|   100|   175|   170|       Electrician|
|   101|   180|   120|         Librarian|
|   102|   180|   215|    Data Scientist|
|   103|   178|   132|Software Developer|
+------+------+------+------------------+

Register the DataFrame as a temp table.

sqlDF.registerTempTable("sql_userattributes")

Join both tables. We'll insert the results to an Oracle database.

val joinDF = sqlContext.sql("select kudu_users.userid,kudu_users.name,kudu_users.city,kudu_users.state,kudu_users.zip,kudu_users.age,sql_userattributes.height,sql_userattributes.weight,sql_userattributes.occupation from kudu_users INNER JOIN sql_userattributes ON kudu_users.userid=sql_userattributes.userid")
joinDF.show
+------+---------------+------------+-----+-----+---+------+------+------------------+
|userid|           name|        city|state|  zip|age|height|weight|        occupation|
+------+---------------+------------+-----+-----+---+------+------+------------------+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|   175|   170|       Electrician|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|   180|   120|         Librarian|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|   180|   215|    Data Scientist|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|   178|   132|Software Developer|
+------+---------------+------------+-----+-----+---+------+------+------------------+

You can achieve the same result using this method.

val joinDF2 = kuduDF.join(sqlDF,"userid")
joinDF2.show
+------+---------------+------------+-----+-----+---+------+------+------------------+
|userid|           name|        city|state|  zip|age|height|weight|        occupation|
+------+---------------+------------+-----+-----+---+------+------+------------------+
|   100|   Wendell Ryan|   San Diego|   CA|92102| 24|   175|   170|       Electrician|
|   101|Alicia Thompson|    Berkeley|   CA|94705| 52|   180|   120|         Librarian|
|   102|Felipe Drummond|   Palo Alto|   CA|94301| 33|   180|   215|    Data Scientist|
|   103|  Teresa Levine|Walnut Creek|   CA|94507| 47|   178|   132|Software Developer|
+------+---------------+------------+-----+-----+---+------+------+------------------+

Set up the JDBC URL and connection properties of the Oracle database.

val jdbcURL = "jdbc:oracle:thin:sales/cloudera@//192.168.56.30:1521/EDWPDB"
val connectionProperties = new java.util.Properties()
import org.apache.spark.sql.SaveMode

Insert the DataFrame to Oracle.

joinDF.write.mode(SaveMode.Append).jdbc(jdbcURL, "users2", connectionProperties)
Verify that the rows were successfully added to the Oracle database (Figure 6-12).
Figure 6-12 Verify rows

Spark Streaming and Kudu

There are use cases where you need to ingest data into Kudu in near real time. This is a requirement for Internet of Things (IoT) use cases, for example. In Listing 6-1, we show a sample Spark Streaming application that ingests sensor data from Flume. We perform basic event stream processing to tag the status of each event as NORMAL, WARNING, or CRITICAL depending on the temperature returned by the sensor. The status is then saved in a Kudu table together with the rest of the data.

Users can query the Kudu table as data is being inserted into it. In Chapter 9, we discuss a real-time data visualization tool called Zoomdata. You can use Zoomdata to visualize data stored in Kudu in real time.

import java.io.IOException;
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.kudu.spark.kudu._
object FlumeStreaming {
   case class SensorData(tableid: String, deviceid: String, thedate: String, thetime: String, temp: Short, status: String)
    def parseSensorData(str: String): SensorData = {
      val myData = str.split(",")
      val myTableid = myData(0)
      val myDeviceid = myData(1)
      val myDate = myData(2)
      val myTime = myData(3)
      val myTemp = myData(4)
      val myStatus = myData(5)
      SensorData(myTableid, myDeviceid, myDate, myTime, myTemp.toShort, myStatus)
    }
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeStreaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))
    val flumeStream = FlumeUtils.createPollingStream(ssc,args(0),args(1).toInt)
    val sensorDStream = flumeStream.map (x => new String(x.event.getBody.array)).map(parseSensorData)
    sensorDStream.foreachRDD { rdd =>
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
             val kuduContext = new KuduContext("kudumaster01:7051")
// convert the RDD into a DataFrame and insert it into the Kudu table
             val DataDF = rdd.toDF
             kuduContext.insertRows(DataDF, "impala::default.sensortable")
             DataDF.registerTempTable("currentDF")
             // Update the table based on the thresholds
             val WarningFilteredDF = sqlContext.sql("select * from currentDF where temp > 50 and temp <= 60")
             WarningFilteredDF.registerTempTable("warningtable")
             val UpdatedWarningDF = sqlContext.sql("select tableid,deviceid,thedate,thetime,temp,'WARNING' as status from warningtable")
             kuduContext.updateRows(UpdatedWarningDF, "impala::default.sensortable")
             val CriticalFilteredDF = sqlContext.sql("select * from currentDF where temp > 60")
             CriticalFilteredDF.registerTempTable("criticaltable")
             val UpdatedCriticalDF = sqlContext.sql("select tableid,deviceid,thedate,thetime,temp,'CRITICAL' as status from criticaltable")
             kuduContext.updateRows(UpdatedCriticalDF, "impala::default.sensortable")
     }
    ssc.start()
    ssc.awaitTermination()
  }
}
Listing 6-1

Spark Streaming with Kudu

This is my SBT file. Consult Chapter 5 for more information on building applications. Consult Maven’s online documentation if you are using Maven.

name := "My Test App"
version := "1.0"
scalaVersion := "2.10.5"
resolvers ++= Seq(
  "Apache Repository" at "https://repository.apache.org/content/repositories/releases/",
  "Cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)
libraryDependencies ++= Seq (
        "org.apache.spark" % "spark-core_2.10" % "1.5.0",
        "org.apache.spark" % "spark-streaming_2.10" % "1.5.0",
        "org.apache.spark" % "spark-streaming-flume_2.10" % "1.5.0",
        "org.apache.spark" % "spark-sql_2.10" % "1.5.0"
)

After executing sbt package to package the Spark application, you can launch it using the spark-submit tool that comes with Spark. The parameters shown are for testing purposes only; change them based on your data ingestion requirements. Note that I’m including the kudu-spark dependency via the --jars option. The parameters localhost and 9999 specify the host and port of the Flume sink.

spark-submit \
--class FlumeStreaming \
--jars kudu-spark_2.10-0.10.0.jar \
--master yarn-client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 4 \
/mydir/spark/flume_streaming_kudu/target/scala-2.10/butch-app_2.10-1.0.jar localhost 9999

This is a sample flume.conf file using spooldir as a data source. The settings are adequate for testing purposes.

agent1.sources  = source1
agent1.channels = channel1
agent1.sinks = spark
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/streaming
agent1.sources.source1.channels = channel1
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
agent1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.spark.hostname = 127.0.0.1
agent1.sinks.spark.port =  9999
agent1.sinks.spark.channel = channel1
agent1.sinks.spark.batchSize=5

Kudu as a Feature Store for Spark MLlib

Kudu makes it easier for data scientists to prepare and clean data. It can serve as a fast, highly scalable, and mutable feature store for machine learning applications, and it integrates smoothly with Spark SQL and the DataFrame API.

Let’s work on an example. We'll use the Heart Disease Data Set xi from the UCI Machine Learning Repository to predict the presence of heart disease. The data was collected by Robert Detrano, MD, PhD, of the VA Medical Center, Long Beach and the Cleveland Clinic Foundation. Historically, the Cleveland data set has been the subject of numerous research studies, so we’ll use that data set. The original data set has 76 attributes, but only 14 of them are traditionally used by ML researchers (Table 6-1). We will perform binomial classification and determine whether or not the patient has heart disease (Listing 6-2). This is the same example we used in Chapter 5, but this time we’ll use Kudu to store our features.
Table 6-1 Cleveland Heart Disease Data Set Attribute Information

Attribute   Description
age         Age
sex         Sex
cp          Chest pain type
trestbps    Resting blood pressure
chol        Serum cholesterol in mg/dl
fbs         Fasting blood sugar > 120 mg/dl
restecg     Resting electrocardiographic results
thalach     Maximum heart rate achieved
exang       Exercise induced angina
oldpeak     ST depression induced by exercise relative to rest
slope       The slope of the peak exercise ST segment
ca          Number of major vessels (0–3) colored by fluoroscopy
thal        Thallium stress test result
num         The predicted attribute: diagnosis of heart disease

Let’s start. We’ll need to download the file, copy it to HDFS, and create an external table on top of it. We will then copy the data to a Kudu table.

wget http://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/processed.cleveland.data
head -n 10 processed.cleveland.data
63.0,1.0,1.0,145.0,233.0,1.0,2.0,150.0,0.0,2.3,3.0,0.0,6.0,0
67.0,1.0,4.0,160.0,286.0,0.0,2.0,108.0,1.0,1.5,2.0,3.0,3.0,2
67.0,1.0,4.0,120.0,229.0,0.0,2.0,129.0,1.0,2.6,2.0,2.0,7.0,1
37.0,1.0,3.0,130.0,250.0,0.0,0.0,187.0,0.0,3.5,3.0,0.0,3.0,0
41.0,0.0,2.0,130.0,204.0,0.0,2.0,172.0,0.0,1.4,1.0,0.0,3.0,0
56.0,1.0,2.0,120.0,236.0,0.0,0.0,178.0,0.0,0.8,1.0,0.0,3.0,0
62.0,0.0,4.0,140.0,268.0,0.0,2.0,160.0,0.0,3.6,3.0,2.0,3.0,3
57.0,0.0,4.0,120.0,354.0,0.0,0.0,163.0,1.0,0.6,1.0,0.0,3.0,0
63.0,1.0,4.0,130.0,254.0,0.0,2.0,147.0,0.0,1.4,2.0,1.0,7.0,2
53.0,1.0,4.0,140.0,203.0,1.0,2.0,155.0,1.0,3.1,3.0,0.0,7.0,1
hadoop fs -put processed.cleveland.data /tmp/data
impala-shell
CREATE EXTERNAL TABLE cleveland_csv (
               age float,
               sex float,
               cp float,
               trestbps float,
               chol float,
               fbs float,
               restecg float,
               thalach float,
               exang float,
               oldpeak float,
               slope float ,
               ca float,
               thal float,
               num float
               )
               ROW FORMAT
          DELIMITED FIELDS TERMINATED BY ','
          LINES TERMINATED BY '\n' STORED AS TEXTFILE
          LOCATION '/tmp/data';
CREATE TABLE cleveland_kudu (
               id string,
               age float,
               sex float,
               cp float,
               trestbps float,
               chol float,
               fbs float,
               restecg float,
               thalach float,
               exang float,
               oldpeak float,
               slope float ,
               ca float,
               thal float,
               num float,
               primary key(id)
               )
        PARTITION BY HASH PARTITIONS 4
        STORED AS KUDU
        TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
INSERT INTO cleveland_kudu
SELECT
         uuid(),
               age,
               sex,
               cp,
               trestbps,
               chol,
               fbs,
               restecg,
               thalach,
               exang,
               oldpeak,
               slope,
               ca,
               thal,
               num
FROM
cleveland_csv;

Note that we used the Impala function uuid() to generate a unique primary key for our Kudu table. The Kudu table should now have some data in it. Let’s use the spark-shell to fit our model using Spark MLlib.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.param.ParamMap
import org.apache.kudu.spark.kudu._
val kuduDF = sqlContext.read.options(Map("kudu.master" -> "kudumaster01:7051","kudu.table" -> "impala::default.cleveland_kudu")).kudu
kuduDF.printSchema
root
 |-- id: string (nullable = false)
 |-- age: float (nullable = true)
 |-- sex: float (nullable = true)
 |-- cp: float (nullable = true)
 |-- trestbps: float (nullable = true)
 |-- chol: float (nullable = true)
 |-- fbs: float (nullable = true)
 |-- restecg: float (nullable = true)
 |-- thalach: float (nullable = true)
 |-- exang: float (nullable = true)
 |-- oldpeak: float (nullable = true)
 |-- slope: float (nullable = true)
 |-- ca: float (nullable = true)
 |-- thal: float (nullable = true)
 |-- num: float (nullable = true)
val myFeatures = Array("age", "sex", "cp", "trestbps", "chol", "fbs",
      "restecg", "thalach", "exang", "oldpeak", "slope",
      "ca", "thal")
val myAssembler = new VectorAssembler().setInputCols(myFeatures).setOutputCol("features")
val kuduDF2 = myAssembler.transform(kuduDF)
val myLabelIndexer = new StringIndexer().setInputCol("num").setOutputCol("label")
val kuduDF3 = myLabelIndexer.fit(kuduDF2).transform(kuduDF2)
val kuduDF4 = kuduDF3.where(kuduDF3("ca").isNotNull).where(kuduDF3("thal").isNotNull).where(kuduDF3("num").isNotNull)
val Array(trainingData, testData) = kuduDF4.randomSplit(Array(0.8, 0.2), 101)
val myRFclassifier = new RandomForestClassifier().setFeatureSubsetStrategy("auto").setSeed(101)
val myEvaluator = new BinaryClassificationEvaluator().setLabelCol("label")
val myParamGrid = new ParamGridBuilder()
      .addGrid(myRFclassifier.maxBins, Array(10, 20, 30))
      .addGrid(myRFclassifier.maxDepth, Array(5, 10, 15))
      .addGrid(myRFclassifier.numTrees, Array(20, 30, 40))
      .addGrid(myRFclassifier.impurity, Array("gini", "entropy"))
      .build()
val myPipeline = new Pipeline().setStages(Array(myRFclassifier))
val myCV = new CrossValidator()
      .setEstimator(myPipeline)
      .setEvaluator(myEvaluator)
      .setEstimatorParamMaps(myParamGrid)
      .setNumFolds(3)
Listing 6-2

Performing binary classification using Kudu as a feature store

We can now fit the model.

val myCrossValidatorModel = myCV.fit(trainingData)

Let’s make predictions on the test data.

val myCrossValidatorPrediction = myCrossValidatorModel.transform(testData)

Now evaluate the model using area under the ROC curve as the metric.

val myEvaluatorParamMap = ParamMap(myEvaluator.metricName -> "areaUnderROC")
val aucTestData = myEvaluator.evaluate(myCrossValidatorPrediction, myEvaluatorParamMap)

We used a very small data set (300+ observations) in this example, but imagine a data set with billions of rows. If you need to add or update your training data, you simply run DML statements against the Kudu table. The ability to update a highly scalable storage engine such as Kudu greatly simplifies data preparation and feature engineering.
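
As a minimal sketch of what that looks like from Spark, the snippet below corrects a single feature value and pushes it back to the Kudu table created earlier; the id value and the corrected cholesterol figure are hypothetical placeholders.

import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.functions.lit

val kuduContext = new KuduContext("kudumaster01:7051")
val featuresDF = sqlContext.read.options(Map(
  "kudu.master" -> "kudumaster01:7051",
  "kudu.table" -> "impala::default.cleveland_kudu")).kudu

// updateRows matches rows on the primary key (id) and rewrites the remaining columns.
val correctedDF = featuresDF.
  filter(featuresDF("id") === "replace-with-an-actual-uuid").
  withColumn("chol", lit(210.0f))
kuduContext.updateRows(correctedDF, "impala::default.cleveland_kudu")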

Note

Kudu allows a maximum of 300 columns per table. HBase is a more appropriate storage engine if you need to store more than 300 features; HBase tables can contain thousands or even millions of columns. The downside of using HBase is that it is not as efficient at handling full-table scans as Kudu. There is discussion within the Apache Kudu community to address the 300-column limitation in future versions of Kudu.

Strictly speaking, you can bypass Kudu’s 300-column limit by setting an unsafe flag. For example, if you need to create a Kudu table with 1,000 columns, you can start the Kudu master with the following flags: --unlock_unsafe_flags --max_num_columns=1000. This has not been thoroughly tested by the Kudu development team and is therefore not recommended for production use.

Summary

Spark, Impala, and Kudu are perfect together. Spark provides a highly scalable data processing framework, while Kudu provides a highly scalable storage engine. With Impala providing a fast SQL interface, you have everything you need to implement a cost-effective enterprise data management and analytic platform. Of course, you are not limited to these three Apache open source projects. In subsequent chapters, I will cover other open source and commercial applications that can further enhance your big data platform.

References

i. Apache Kudu; “Kudu Integration with Spark,” Apache Kudu, 2018, https://kudu.apache.org/docs/developing.html#_kudu_integration_with_spark

ii. Holden Karau, Rachel Warren; “High Performance Spark,” O'Reilly, June 2017, https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/

iii. Apache Spark; “JDBC To Other Databases,” Apache Spark, 2018, http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

iv. Zhan Zhang; “SPARK-ON-HBASE: DATAFRAME BASED HBASE CONNECTOR,” Hortonworks, 2016, https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/

v. Huawei; “Astro: High performance SQL layer over HBase by using Spark SQL framework,” Huawei, 2018, http://huaweibigdata.github.io/astro/

vi. Ted Malaska; “Apache Spark Comes to Apache HBase with HBase-Spark Module,” Cloudera, 2018, https://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/

vii. Apache Lucene; “Using SolrJ,” Apache Lucene, 2018, https://lucene.apache.org/solr/guide/6_6/using-solrj.html

viii. Lucidworks; “Lucidworks Spark/Solr Integration,” Lucidworks, 2018, https://github.com/lucidworks/spark-solr

ix.

x. Tom White; “The Small Files Problem,” Cloudera, 2009, https://blog.cloudera.com/blog/2009/02/the-small-files-problem/

xi. David W. Aha; “Heart Disease Data Set,” University of California, Irvine, 1988, http://archive.ics.uci.edu/ml/datasets/heart+Disease