© Butch Quinto 2018
Butch Quinto, Next-Generation Big Data, https://doi.org/10.1007/978-1-4842-3147-0_5

5. Introduction to Spark

Butch Quinto, Plumpton, Victoria, Australia

Spark is the next-generation big data processing framework for processing and analyzing large data sets. Spark features a unified processing framework that provides high-level APIs in Scala, Python, Java, and R and powerful libraries including Spark SQL for SQL support, MLlib for machine learning, Spark Streaming for real-time streaming, and GraphX for graph processing. i Spark was created by Matei Zaharia at the University of California, Berkeley's AMPLab and was later donated to the Apache Software Foundation, becoming a top-level project on February 24, 2014. ii The first version was released on May 30, 2014. iii

Entire books have been written about Spark. This chapter will give you a quick introduction to Spark, enough to give you the skills needed to perform common data processing tasks. My goal is to make you productive as quickly as possible. For a more thorough treatment, Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia (O'Reilly, 2015) remains the best introduction to Spark. Advanced Analytics with Spark, 2nd edition, by Sandy Ryza, Uri Laserson, Sean Owen, and Josh Wills (O'Reilly, 2017) covers advanced Spark topics and is also highly recommended. I assume no previous knowledge of Spark. However, some knowledge of Scala is helpful. Learning Scala by Jason Swartz (O'Reilly, 2014) and Programming in Scala, 2nd edition, by Martin Odersky, Lex Spoon, and Bill Venners (Artima, 2011) are good books to help you learn Scala. For a primer on Hadoop and Hadoop components such as HDFS and YARN, visit the Apache Hadoop website. Integration with Kudu is discussed in Chapter 6.

Overview

Spark was developed to address the limitations of MapReduce, Hadoop's original data processing framework. Matei Zaharia saw many of MapReduce's limitations at UC Berkeley and Facebook (where he did his internship) and sought to create a faster, more generalized, multipurpose data processing framework that could handle iterative and interactive applications. iv Matei succeeded in his goal of making Spark better than MapReduce in almost every way. Spark is more accessible and easier to use due to its simple but powerful APIs. Spark provides a unified platform (Figure 5-1) that supports more types of workloads such as streaming, interactive, graph processing, machine learning, and batch. v Spark jobs can run 10-100x faster than equivalent MapReduce jobs due to its fast in-memory capabilities and advanced DAG (directed acyclic graph) execution engine. Data scientists and engineers are just generally more productive with Spark than with MapReduce.
Figure 5-1

Apache Spark Ecosystem

Cluster Managers

Cluster managers manage and allocate cluster resources to applications. Spark supports the standalone cluster manager that ships with Spark (the Standalone Scheduler), YARN, and Mesos. There is also an experimental project to let Spark use Kubernetes natively as a cluster manager. Check SPARK-18278 for more details.

Architecture

At a high level, Spark distributes the execution of Spark application tasks across the cluster nodes (Figure 5-2). Every Spark application has a SparkContext object within its driver program. The SparkContext represents a connection to your cluster manager, which provides computing resources to your Spark applications. After connecting to the cluster, Spark acquires executors on your worker nodes. Spark then sends your application code to the executors. An application will usually run one or more jobs in response to a Spark action. Each job is divided by Spark into a directed acyclic graph (DAG) of stages, and each stage into tasks. The tasks are then distributed to executors across the worker nodes for execution.
Figure 5-2

Apache Spark Architecture

Each Spark application gets its own set of executors. Because tasks from different applications run in different JVMs, a Spark application cannot interfere with another Spark application. This also means that it's difficult for Spark applications to share data without using a slow external data source such as HDFS or S3. Using an off-heap memory store such as Alluxio (formerly known as Tachyon) can make data sharing faster and easier. I discuss Alluxio in Chapter 10.

Executing Spark Applications

I assume you’re using Cloudera Enterprise as your big data platform. In CDH, Spark 1.x and 2.x can coexist on the same cluster without any issues. You use an interactive shell (spark-shell or pyspark) or submit an application (spark-submit) to execute Spark 1.x applications. You need to use different commands to access Spark 2.x (Table 5-1).
Table 5-1

Spark 1.x and Spark 2.x Commands

Spark 1.x       Spark 2.x

spark-submit    spark2-submit
spark-shell     spark2-shell
pyspark         pyspark2

Spark on YARN

YARN is the default cluster manager for most Hadoop-based platforms such as Cloudera and Hortonworks. There are two deploy modes that can be used to launch Spark applications in YARN.

Cluster Mode

In cluster mode, the driver program runs inside an application master managed by YARN. The client can exit without affecting the execution of the application. Note that the interactive shells (spark-shell and pyspark) cannot run in cluster mode; they always run their driver in client mode. To submit an application in cluster mode:

spark-submit --class mypath.myClass --master yarn --deploy-mode cluster

Client Mode

In client mode, the driver program runs on the client machine. The application master is only used for requesting resources from YARN. To launch applications or the spark-shell in client mode:

spark-shell --master yarn --deploy-mode client
spark-submit --class mypath.myClass --master yarn --deploy-mode client

Introduction to the Spark-Shell

You typically use an interactive shell for ad hoc data analysis or exploration. It’s also a good tool to learn the Spark API. Spark’s interactive shell is available in Scala or Python. In our example below, we’ll create a list of cities and convert them all to uppercase.

spark2-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.0.1.101:4040
Spark context available as 'sc' (master = yarn, app id = application_1513771857144_0002).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.cloudera1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val myCities = sc.parallelize(List("tokyo","new york","sydney","san francisco"))
myCities: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val uCities = myCities.map {x => x.toUpperCase}
uCities: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:26
scala> uCities.collect.foreach(println)
TOKYO
NEW YORK
SYDNEY
SAN FRANCISCO
Listing 5-1

Introduction to spark-shell

You will use the spark-shell throughout the chapter. A SparkSession named “spark” is automatically created when you start the spark2-shell as shown in Listing 5-1.

SparkSession

As you can see in Figure 5-2, SparkContext enables access to all Spark features and capabilities. The driver program uses the SparkContext to access other contexts such as StreamingContext, SQLContext, and HiveContext. Starting in Spark 2.0, SparkSession provides a single point of entry to interact with Spark. All features available through SparkContext such as SQLContext, HiveContext, and StreamingContext in Spark 1.x are now accessible via SparkSession. vi

In Spark 1.x you would write something like this.

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("MyApp").setMaster("local").set("spark.executor.cores", "4")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

In Spark 2.x, you don't have to explicitly create SparkConf, SparkContext, or SQLContext since their functionality is already included in SparkSession.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
builder().
appName("MyApp").
config("spark.executor.cores", "4").
getOrCreate()
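
Since the spark2-shell already provides a SparkSession named spark, you can reach the older entry points and run SQL directly from it. Here is a small sketch; the table name and values are made up.

spark.sparkContext                                   // the underlying SparkContext (same as the shell's sc)
val numRDD = spark.sparkContext.parallelize(1 to 10) // the RDD API is still available

val squaresDF = spark.range(1, 6).selectExpr("id", "id * id as square")
squaresDF.createOrReplaceTempView("squares")         // register a temporary view
spark.sql("SELECT * FROM squares WHERE square > 4").show()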

Accumulator

Accumulators are variables that are only “added” to. They are usually used to implement counters. In the example, I add up the elements of an array using an accumulator:

val accum = sc.longAccumulator("Accumulator 01")
sc.parallelize(Array(10, 20, 30, 40)).foreach(x => accum.add(x))
accum.value
res2: Long = 100

Broadcast Variables

Broadcast variables are read-only variables cached in memory on each executor node. Spark uses high-speed broadcast algorithms to reduce the network cost of distributing broadcast variables. Instead of repeatedly reading a dataset from a slow storage engine such as HDFS or S3, a broadcast variable gives each node a fast local copy of it.

val broadcastVar = sc.broadcast(Array(10, 20, 30))
broadcastVar.value
res0: Array[Int] = Array(10, 20, 30)
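
Beyond reading broadcastVar.value on the driver, the typical use is to reference the broadcast value inside a transformation so that each executor uses its local copy. A minimal sketch; the lookup map and city names are made up.

// Broadcast a small lookup table and use it inside a map transformation.
val countryCodes = Map("tokyo" -> "JP", "sydney" -> "AU", "new york" -> "US")
val bcCodes = sc.broadcast(countryCodes)

val cities = sc.parallelize(List("tokyo", "new york", "sydney"))
val cityCodes = cities.map(c => (c, bcCodes.value.getOrElse(c, "unknown")))
cityCodes.collect.foreach(println)
// (tokyo,JP)
// (new york,US)
// (sydney,AU)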

RDD

An RDD is a resilient, immutable, distributed collection of objects partitioned across one or more nodes in your cluster. RDDs can be processed in parallel using two types of operations: transformations and actions.

Note

The RDD was Spark's primary programming interface in Spark 1.x. The Dataset has replaced the RDD as the main API starting in Spark 2.0. Users are strongly encouraged to switch from RDDs to Datasets due to the richer programming interface and better performance. We discuss Datasets and DataFrames later in the chapter.

Creating an RDD

Creating an RDD is straightforward. You can create an RDD from an existing Scala collection or by reading an external file stored in HDFS or S3.

parallelize

Parallelize creates an RDD from a Scala collection.

val myList = (1 to 5).toList
val myRDD = sc.parallelize(myList)
val myCitiesRDD = sc.parallelize(List("tokyo","new york","sydney","san francisco"))
textFile

Textfile creates an RDD from a text file stored in HDFS or S3.

val myRDD = sc.textFile("hdfs://master01:9000/files/mydirectory")
val myRDD = sc.textFile("s3a://mybucket/files/mydata.csv")

Note that an RDD is immutable. Operations that need to perform any type of data transformation will need to create another RDD. RDD operations can be classified into two categories: transformation and action.

Transformations

A transformation is an operation that creates a new RDD. I describe some of the most common transformations. Refer to the online Spark documentation for a complete list.

Map

Map executes a function against each element in the RDD. It creates and returns a new RDD of the results. Map's return type doesn't necessarily have to be the same as the type of the original RDD.

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
val uCities = myCities.map {x => x.toUpperCase}
uCities.collect.foreach(println)
TOKYO
NEW YORK
PARIS
SAN FRANCISCO

Let’s show another example of map.

val lines = sc.parallelize(List("Michael Jordan", "iPhone"))
val words = lines.map(line => line.split(" "))
words.collect
res2: Array[Array[String]] = Array(Array(Michael, Jordan), Array(iPhone))  
Flatmap

Flatmap executes a function against each element in the RDD and then flattens the results.

val lines = sc.parallelize(List("Michael Jordan", "iPhone"))
val words = lines.flatMap(line => line.split(" "))
words.collect
res3: Array[String] = Array(Michael, Jordan, iPhone)
Filter

Returns an RDD that includes only the elements that match the specified condition.

val lines = sc.parallelize(List("Michael Jordan", "iPhone","Michael Corleone"))
val words = lines.map(line => line.split(" "))
val results = words.filter(w => w.contains("Michael"))
results.collect
res9: Array[Array[String]] = Array(Array(Michael, Jordan), Array(Michael, Corleone))
Distinct

Returns only distinct values.

val myCities1 = sc.parallelize(List("tokyo","tokyo","paris","sydney"))
val myCities2 = sc.parallelize(List("perth","tokyo","canberra","sydney"))

Combine results using union.

val citiesFromBoth = myCities1.union(myCities2)

Display only distinct values.

citiesFromBoth.distinct.collect.foreach(println)
sydney
perth
canberra
tokyo
paris
ReduceByKey

Combines values with the same key using the specified reduce function.

val pairRDD = sc.parallelize(List(("a", 1), ("b",2), ("c",3), ("a", 30), ("b",25), ("a",20)))
val sumRDD = pairRDD.reduceByKey((x,y) => x+y)
sumRDD.collect
res15: Array[(String, Int)] = Array((b,27), (a,51), (c,3))   
Keys

Returns an RDD containing just the keys.

val myRDD = sc.parallelize(List(("a", "Larry"), ("b", "Curly"), ("c", "Moe")))
val keysRDD = myRDD.keys
keysRDD.collect.foreach(println)
a
b
c
Values

Returns an RDD containing just the values.

val myRDD = sc.parallelize(List(("a", "Larry"), ("b", "Curly"), ("c", "Moe")))
val valRDD = myRDD.values
valRDD.collect.foreach(println)
Larry
Curly
Moe
Inner Join

Returns an RDD containing all pairs of elements with matching keys from both RDDs.

val employee = Array((100,"Jim Hernandez"), (101,"Shane King"))
val employeeRDD = sc.parallelize(employee)
val employeeCity = Array((100,"Glendale"), (101,"Burbank"))
val employeeCityRDD = sc.parallelize(employeeCity)
val employeeState = Array((100,"CA"), (101,"CA"), (102,"NY"))
val employeeStateRDD = sc.parallelize(employeeState)
val employeeRecordRDD = employeeRDD.join(employeeCityRDD).join(employeeStateRDD)
employeeRecordRDD.collect.foreach(println)
(100,((Jim Hernandez,Glendale),CA))
(101,((Shane King,Burbank),CA))
RightOuterJoin / LeftOuterJoin

Returns an RDD containing all elements from the right RDD, even if there are no matching keys in the left RDD; non-matching entries appear as None. A left outer join is the mirror image, keeping all elements from the left RDD.

val employeeRecordRDD = employeeRDD.join(employeeCityRDD).rightOuterJoin(employeeStateRDD)
employeeRecordRDD.collect.foreach(println)
(100,(Some((Jim Hernandez,Glendale)),CA))
(102,(None,NY))
(101,(Some((Shane King,Burbank)),CA))
Union

Returns an RDD that contains the combination of two or more RDDs.

val employee2 = Array((103,"Mark Choi","Torrance","CA"), (104,"Janet Reyes","Rolling Hills","CA"))
val employee2RDD = sc.parallelize(employee2)
val employee3 = Array((105,"Lester Cruz","Van Nuys","CA"), (106,"John White","Inglewood","CA"))
val employee3RDD = sc.parallelize(employee3)
val employeesRDD = employee2RDD.union(employee3RDD)
employeesRDD.collect.foreach(println)
(103,Mark Choi,Torrance,CA)
(104,Janet Reyes,Rolling Hills,CA)
(105,Lester Cruz,Van Nuys,CA)
(106,John White,Inglewood,CA)
Subtract

Returns an RDD that contains only the elements that are in the first RDD but not in the second RDD.

val  listEmployees = Array((103,"Mark Choi","Torrance","CA"), (104,"Janet Reyes","Rolling Hills","CA"),(105,"Lester Cruz","Van Nuys","CA"))
val listEmployeesRDD = sc.parallelize(listEmployees)
val  exEmployees = Array((103,"Mark Choi","Torrance","CA"))
val exEmployeesRDD = sc.parallelize(exEmployees)
val currentEmployeesRDD = listEmployeesRDD.subtract(exEmployeesRDD)
currentEmployeesRDD.collect.foreach(println)
(105,Lester Cruz,Van Nuys,CA)
(104,Janet Reyes,Rolling Hills,CA)
Coalesce

Coalesce reduces the number of partitions in an RDD. You might want to use coalesce after performing a filter on a large RDD. While filtering reduces the amount of data consumed by the new RDD, it inherits the number of partitions of the original RDD. If the new RDD is significantly smaller than the original RDD, it may have hundreds or thousands of small partitions, which could cause performance issues.

Using coalesce is also useful when you want to reduce the number of files generated by Spark when writing to HDFS, preventing the dreaded "small file" problem. Each partition is written as a separate file to HDFS. Note that you might run into performance issues when using coalesce since you are effectively reducing the degree of parallelism while writing to HDFS. Try increasing the number of partitions if that happens. This is applicable to DataFrames as well, which I will discuss later. In the example below, we're writing only one Parquet file to HDFS.

DF.coalesce(1).write.mode("append").parquet("/user/hive/warehouse/Mytable")
Repartition

Repartition can both decrease and increase the number of partitions in an RDD. You would generally use coalesce when reducing partitions since it's more efficient than repartition. Increasing the number of partitions can be useful for increasing the degree of parallelism when writing to HDFS. This is applicable to DataFrames as well, which I will discuss later. In the example below, we're writing 6 Parquet files to HDFS.

DF.repartition(6).write.mode("append").parquet("/user/hive/warehouse/Mytable")

Note

Coalesce is generally faster than repartition. Repartition will perform a full shuffle, creating new partitions and equally distributing data across worker nodes. Coalesce minimizes data movement and avoids a full shuffle by using existing partitions.
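
A quick way to see what coalesce and repartition do is to check getNumPartitions before and after. A small sketch; the sizes and partition counts are illustrative.

// Observe partition counts before and after coalesce and repartition.
val bigRDD = sc.parallelize(1 to 1000000, 100)
bigRDD.getNumPartitions            // 100
val filteredRDD = bigRDD.filter(_ % 1000 == 0)
filteredRDD.getNumPartitions       // still 100, even though the data is now much smaller
val compactRDD = filteredRDD.coalesce(4)
compactRDD.getNumPartitions        // 4, no full shuffle
val widerRDD = compactRDD.repartition(16)
widerRDD.getNumPartitions          // 16, at the cost of a full shuffle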

Actions

An action is an RDD operation that returns a value to the driver program. I list some of the most common actions. Refer to the online Spark documentation for a complete list of RDD actions.

Collect

Returns the entire content of the RDD to the driver program. Not advisable for large data sets.

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
myCities.collect
res2: Array[String] = Array(tokyo, new york, paris, san francisco)
Take

Returns a subset of the RDD to the driver program.

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
myCities.take(2)
res4: Array[String] = Array(tokyo, new york)
Count

Returns the number of items in the RDD.

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
myCities.count
res3: Long = 4   
Foreach

Executes the provided function on each item of the RDD.

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
myCities.collect.foreach(println)
tokyo
new york
paris
san francisco

Lazy Evaluation

Spark supports lazy evaluation, which is critical for big data processing. All transformations in Spark are lazily evaluated. Spark does not execute transformations immediately; you can continue to define more transformations. When you want the final results, you execute an action, which triggers execution of all the accumulated transformations.
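
A minimal illustration: the map and filter below only define the computation; no job runs until the collect at the end.

val numbers = sc.parallelize(1 to 10)
val doubled = numbers.map(_ * 2)          // transformation only, no job yet
val evens = doubled.filter(_ % 4 == 0)    // still no job
evens.collect                             // action: the whole chain executes now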

Caching

By default, each transformation is re-executed every time you run an action. You can cache an RDD in memory using the cache or persist method to avoid re-executing the transformations multiple times. There are several persistence levels to choose from, such as MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, and DISK_ONLY. Consult Apache Spark's online documentation for more details on caching. Off-heap caching with Alluxio (previously known as Tachyon) is discussed in Chapter 10.
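
Here is a minimal caching sketch, assuming a text file at an illustrative HDFS path; the RDD is persisted once and then reused by two actions.

import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("/sparkdata/logs")        // illustrative path
val errors = logs.filter(_.contains("ERROR"))
errors.persist(StorageLevel.MEMORY_AND_DISK)     // or simply errors.cache() for MEMORY_ONLY
errors.count                                     // first action materializes and caches the RDD
errors.take(10)                                  // served from the cache
errors.unpersist()                               // release the cached partitions when done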

Spark SQL, Dataset, and DataFrames API

While most of the excitement these days is focused on use cases that involve unstructured and semi-structured data (video analytics, image processing, and text mining to mention a few), a majority of the actual data analysis and processing is still done on structured, relational data. Most of the important business decisions by companies are still based on analysis done on relational data.

Spark SQL was developed to make it easier for data engineers and data scientists to process and analyze structured data. The Dataset is similar to an RDD in that it supports strong typing, but under the hood it has a much more efficient engine. Starting in Spark 2.0, the Dataset API is the primary programming interface. The DataFrame is just a Dataset with named columns, very similar to a relational table. Together, Spark SQL and DataFrames provide a powerful programming interface for processing and analyzing structured data. Here's a quick example of how to use the DataFrames API. I'll discuss the DataFrames API in more detail later.

val jsonDF = spark.read.json("/jsondata")
jsonDF.show
+---+------+--------------+-----+------+-----+
|age|  city|          name|state|userid|  zip|
+---+------+--------------+-----+------+-----+
| 35|Frisco| Jonathan West|   TX|   200|75034|
| 28|Dallas|Andrea Foreman|   TX|   201|75001|
| 69| Plano|  Kirsten Jung|   TX|   202|75025|
| 52| Allen|Jessica Nguyen|   TX|   203|75002|
+---+------+--------------+-----+------+-----+
jsonDF.select("age", "city").show
+---+------+
|age|  city|
+---+------+
| 35|Frisco|
| 28|Dallas|
| 69| Plano|
| 52| Allen|
+---+------+
jsonDF.filter($"userid" < 202).show()
+---+------+--------------+-----+------+-----+
|age|  city|          name|state|userid|  zip|
+---+------+--------------+-----+------+-----+
| 35|Frisco| Jonathan West|   TX|   200|75034|
| 28|Dallas|Andrea Foreman|   TX|   201|75001|
+---+------+--------------+-----+------+-----+
jsonDF.createOrReplaceTempView("jsonDF")
val uzDF = spark.sql("SELECT userid, zip FROM jsonDF")
uzDF.show
+------+-----+
|userid|  zip|
+------+-----+
|   200|75034|
|   201|75001|
|   202|75025|
|   203|75002|
+------+-----+

Note

The DataFrame and Dataset APIs were unified in Spark 2.0. The DataFrame is now just a type alias for a Dataset of Row, where a Row is a generic untyped object. In contrast, a Dataset is a collection of strongly typed objects, Dataset[T]. Scala supports both the strongly typed and untyped APIs, while in Java Dataset[T] is the main abstraction. The DataFrame is the main programming interface for R and Python since those languages lack compile-time type safety.
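
To make the distinction concrete, here is a small sketch in the spark2-shell contrasting the typed Dataset API with the untyped DataFrame API; the User case class and the sample rows are made up.

case class User(userid: Long, name: String, city: String)

import spark.implicits._
val usersDS = Seq(User(200, "Jonathan West", "Frisco"),
                  User(201, "Andrea Foreman", "Dallas")).toDS()

usersDS.filter(u => u.city == "Dallas").show()      // typed: checked at compile time
usersDS.toDF().filter($"city" === "Dallas").show()  // untyped DataFrame (Dataset[Row])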

Spark Data Sources

Some of the most common tasks in data processing are reading from and writing to different data sources. I provide several examples in the next few pages. I cover Spark and Kudu integration in Chapter 6.

CSV

Spark provides you with different ways to read data from CSV files. You can read the data into an RDD first and then convert it to a DataFrame.

val dataRDD = sc.textFile("/sparkdata/customerdata.csv")
val parsedRDD = dataRDD.map{_.split(",")}
case class CustomerData(customerid: Int, name: String, city: String, state: String, zip: String)
val dataDF = parsedRDD.map{ a => CustomerData (a(0).toInt, a(1).toString, a(2).toString,a(3).toString,a(4).toString) }.toDF

You can also use the Databricks CSV package. This method reads the data directly into a DataFrame.

spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val dataDF = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/sparkdata/customerdata.csv")

Starting in Spark 2.0, the CSV connector is built in, so there's no need to use the Databricks third-party package.

val dataDF = spark.read
              .option("header", "true")
              .option("inferSchema", "true")    
              .csv("/sparkdata/customerdata.csv")
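
If you would rather not pay for the extra pass over the data that inferSchema requires, you can supply the schema explicitly. A sketch, assuming the same five customer columns used above:

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

val customerSchema = StructType(Array(
  StructField("customerid", IntegerType, false),
  StructField("name", StringType, false),
  StructField("city", StringType, false),
  StructField("state", StringType, false),
  StructField("zip", StringType, false)))

val dataDF = spark.read
              .option("header", "true")
              .schema(customerSchema)
              .csv("/sparkdata/customerdata.csv")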

XML

Databricks has a spark-xml package that makes it easy to read XML data.

cat users.xml
<user><userid>100</userid><name>Wendell Ryan</name><city>San Diego</city><state>CA</state><zip>92102</zip></user>
<user><userid>101</userid><name>Alicia Thompson</name><city>Berkeley</city><state>CA</state><zip>94705</zip></user>
<user><userid>102</userid><name>Felipe Drummond</name><city>Palo Alto</city><state>CA</state><zip>94301</zip></user>
<user><userid>103</userid><name>Teresa Levine</name><city>Walnut Creek</city><state>CA</state><zip>94507</zip></user>
hadoop fs -mkdir /xmldata
hadoop fs -put users.xml /xmldata
spark-shell --packages com.databricks:spark-xml_2.10:0.4.1

Create a DataFrame using Spark XML. In this example, we specify the row tag and the path in HDFS where the XML file is located.

val xmlDF = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "user").load("/xmldata/");
xmlDF: org.apache.spark.sql.DataFrame = [city: string, name: string, state: string, userid: bigint, zip: bigint]

Let’s also take a look at the data.

xmlDF.show
+------------+---------------+-----+------+-----+
|        city|           name|state|userid|  zip|
+------------+---------------+-----+------+-----+
|   San Diego|   Wendell Ryan|   CA|   100|92102|
|    Berkeley|Alicia Thompson|   CA|   101|94705|
|   Palo Alto|Felipe Drummond|   CA|   102|94301|
|Walnut Creek|  Teresa Levine|   CA|   103|94507|
+------------+---------------+-----+------+-----+

JSON

We’ll create a JSON file as sample data for this example. Make sure the file is in a folder in HDFS called /jsondata.

cat users.json
{"userid": 200, "name": "Jonathan West", "city":"Frisco", "state":"TX", "zip": "75034", "age":35}
{"userid": 201, "name": "Andrea Foreman", "city":"Dallas", "state":"TX", "zip": "75001", "age":28}
{"userid": 202, "name": "Kirsten Jung", "city":"Plano", "state":"TX", "zip": "75025", "age":69}
{"userid": 203, "name": "Jessica Nguyen", "city":"Allen", "state":"TX", "zip": "75002", "age":52}

Create a DataFrame from the JSON file.

val jsonDF = sqlContext.read.json("/jsondata")
jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, city: string, name: string, state: string, userid: bigint, zip: string]

Check the data.

jsonDF.show
+---+------+--------------+-----+------+-----+
|age|  city|          name|state|userid|  zip|
+---+------+--------------+-----+------+-----+
| 35|Frisco| Jonathan West|   TX|   200|75034|
| 28|Dallas|Andrea Foreman|   TX|   201|75001|
| 69| Plano|  Kirsten Jung|   TX|   202|75025|
| 52| Allen|Jessica Nguyen|   TX|   203|75002|
+---+------+--------------+-----+------+-----+

Relational Databases Using JDBC

We use MySQL in this example, but other relational databases such as Oracle, SQL Server, Teradata, and PostgreSQL, to mention a few, are also supported. As long as the relational database has a JDBC driver, it should be accessible from Spark. Performance depends on your JDBC driver's support for batch operations. Please check your JDBC driver's documentation for more details.

mysql -u root -pmypassword
create database salesdb;
use salesdb;
create table customers (
customerid INT,
name VARCHAR(100),
city VARCHAR(100),
state CHAR(3),
zip  CHAR(5));
spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar

Note

In some versions of Spark, --jars does not add the JAR to the driver's class path. vii It is recommended that you include the JDBC driver in both --jars and the Spark classpath. viii

Start the spark-shell. Take note that I had to include the MySQL driver as a parameter in both --driver-class-path and --jars. You may not need to do this in newer versions of Spark.

Read the CSV file into a DataFrame.

val dataRDD = sc.textFile("/home/hadoop/test.csv")
val parsedRDD = dataRDD.map{_.split(",")}
case class CustomerData(customerid: Int, name: String, city: String, state: String, zip: String)
val dataDF = parsedRDD.map{ a => CustomerData (a(0).toInt, a(1).toString, a(2).toString,a(3).toString,a(4).toString)}.toDF

Register the data frame as a temp table so that we can run SQL queries against it. In Spark 2.x, use createOrReplaceTempView.

dataDF.registerTempTable("dataDF")

Let's set up the connection properties.

val jdbcUsername = "myuser"
val jdbcPassword = "mypass"
val jdbcHostname = "10.0.1.112"
val jdbcPort = 3306
val jdbcDatabase ="salesdb"
val jdbcrewriteBatchedStatements = "true"
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}&rewriteBatchedStatements=${jdbcrewriteBatchedStatements}"
val connectionProperties = new java.util.Properties()

This will allow us to specify the correct save mode - Append, Overwrite, etc.

import org.apache.spark.sql.SaveMode

Insert the data returned by the SELECT statement to the customer table stored in the MySQL salesdb database.

sqlContext.sql("select * from dataDF").write.mode(SaveMode.Append).jdbc(jdbcUrl, "customers", connectionProperties)

Now let's read a table using JDBC. First, populate the users table in MySQL with some test data. Make sure the users table exists in the salesdb database.

mysql -u root -pmypassword
use salesdb;
describe users;
+--------+--------------+------+-----+---------+-------+
| Field  | Type         | Null | Key | Default | Extra |
+--------+--------------+------+-----+---------+-------+
| userid | bigint(20)   | YES  |     | NULL    |       |
| name   | varchar(100) | YES  |     | NULL    |       |
| city   | varchar(100) | YES  |     | NULL    |       |
| state  | char(3)      | YES  |     | NULL    |       |
| zip    | char(5)      | YES  |     | NULL    |       |
| age    | tinyint(4)   | YES  |     | NULL    |       |
+--------+--------------+------+-----+---------+-------+
select * from users;
Empty set (0.00 sec)
insert into users values (300,'Fred Stevens','Torrance','CA',90503,23);
insert into users values (301,'Nancy Gibbs','Valencia','CA',91354,49);
insert into users values (302,'Randy Park','Manhattan Beach','CA',90267,21);
insert into users values (303,'Victoria Loma','Rolling Hills','CA',90274,75);
 select * from users;
+--------+---------------+-----------------+-------+-------+------+
| userid | name          | city            | state | zip   | age  |
+--------+---------------+-----------------+-------+-------+------+
|    300 | Fred Stevens  | Torrance        | CA    | 90503 |   23 |
|    301 | Nancy Gibbs   | Valencia        | CA    | 91354 |   49 |
|    302 | Randy Park    | Manhattan Beach | CA    | 90267 |   21 |
|    303 | Victoria Loma | Rolling Hills   | CA    | 90274 |   75 |
+--------+---------------+-----------------+-------+-------+------+
spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar

Let's set up the JDBC URL and connection properties.

val jdbcURL = s"jdbc:mysql://10.0.1.101:3306/salesdb?user=myuser&password=mypass"
val connectionProperties = new java.util.Properties()

We can create a DataFrame from an entire table.

val mysqlDF = sqlContext.read.jdbc(jdbcURL, "users", connectionProperties)
mysqlDF.show
+------+-------------+---------------+-----+-----+---+
|userid|         name|           city|state|  zip|age|
+------+-------------+---------------+-----+-----+---+
|   300| Fred Stevens|       Torrance|   CA|90503| 23|
|   301|  Nancy Gibbs|       Valencia|   CA|91354| 49|
|   302|   Randy Park|Manhattan Beach|   CA|90267| 21|
|   303|Victoria Loma|  Rolling Hills|   CA|90274| 75|
+------+-------------+---------------+-----+-----+---+
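
You don't have to pull the whole table. You can also push work down to MySQL by passing a subquery (with an alias) in place of the table name. A sketch using the same jdbcURL and connectionProperties, with an illustrative filter:

// MySQL executes the subquery; Spark only receives the matching rows and columns.
val caUsersDF = sqlContext.read.jdbc(
  jdbcURL,
  "(SELECT userid, name, city FROM users WHERE state = 'CA') AS ca_users",
  connectionProperties)
caUsersDF.show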

Parquet

Reading and writing to Parquet is straightforward.

val employeeDF = spark.read.load("/sparkdata/employees.parquet")
employeeDF.select("id","firstname","lastname","salary").write.format("parquet").save("/sparkdata/myData.parquet")

You can run SELECT statements on Parquet files directly.

val myDF = spark.sql("SELECT * FROM parquet.`/sparkdata/myData.parquet`")

HBase

There are different ways to access HBase from Spark. As discussed earlier, Scala has access to all Java libraries, including the HBase client APIs. This is not the preferred way to access HBase from Spark, but some developers might find it handy. Another way to access HBase is via Impala, which I discuss in Chapter 6.

Note

Spark on HBase is the preferred way of accessing HBase from Spark. However, it only works on Spark 1.x at the time of this writing. Spark on HBase is not supported on Spark 2.x.

You can use saveAsHadoopDataset to write data to HBase.

Start the HBase shell. Create an HBase table and populate it with test data.

hbase shell
create 'users', 'cf1'

Start the spark-shell.

spark-shell

Import all necessary packages.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf
import java.io.IOException;
val hconf = HBaseConfiguration.create()
val jobConf = new JobConf(hconf, this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"users")
val num = sc.parallelize(List(1,2,3,4,5,6))
val theRDD = num.map(x => {
        val rowkey = "row" + x
        val put = new Put(Bytes.toBytes(rowkey))
        put.add(Bytes.toBytes("cf1"), Bytes.toBytes("fname"), Bytes.toBytes("my fname" + x))
    (new ImmutableBytesWritable, put)
})
theRDD.saveAsHadoopDataset(jobConf)

You can also use the HBase client API from Spark to read and write data to HBase.

Start the HBase shell. Create another HBase table and populate it with test data.

hbase shell
create 'employees', 'cf1'
put 'employees','400','cf1:name', 'Patrick Montalban'
put 'employees','400','cf1:city', 'Los Angeles'
put 'employees','400','cf1:state', 'CA'
put 'employees','400','cf1:zip', '90010'
put 'employees','400','cf1:age', '71'
put 'employees','401','cf1:name', 'Jillian Collins'
put 'employees','401','cf1:city', 'Santa Monica'
put 'employees','401','cf1:state', 'CA'
put 'employees','401','cf1:zip', '90402'
put 'employees','401','cf1:age', '45'
put 'employees','402','cf1:name', 'Robert Sarkisian'
put 'employees','402','cf1:city', 'Glendale'
put 'employees','402','cf1:state', 'CA'
put 'employees','402','cf1:zip', '91204'
put 'employees','402','cf1:age', '29'
put 'employees','403','cf1:name', 'Warren Porcaro'
put 'employees','403','cf1:city', 'Burbank'
put 'employees','403','cf1:state', 'CA'
put 'employees','403','cf1:zip', '91523'
put 'employees','403','cf1:age', '62'

Let’s verify if the data was successfully inserted into our HBase table.

scan 'employees'
ROW                    COLUMN+CELL
 400                   column=cf1:age, timestamp=1493105325812, value=71
 400                   column=cf1:city, timestamp=1493105325691, value=Los Angeles
 400                   column=cf1:name, timestamp=1493105325644, value=Patrick Montalban
 400                   column=cf1:state, timestamp=1493105325738, value=CA
 400                   column=cf1:zip, timestamp=1493105325789, value=90010
 401                   column=cf1:age, timestamp=1493105334417, value=45
 401                   column=cf1:city, timestamp=1493105333126, value=Santa Monica
 401                   column=cf1:name, timestamp=1493105333050, value=Jillian Collins
 401                   column=cf1:state, timestamp=1493105333145, value=CA
 401                   column=cf1:zip, timestamp=1493105333165, value=90402
 402                   column=cf1:age, timestamp=1493105346254, value=29
 402                   column=cf1:city, timestamp=1493105345053, value=Glendale
 402                   column=cf1:name, timestamp=1493105344979, value=Robert Sarkisian
 402                   column=cf1:state, timestamp=1493105345074, value=CA
 402                   column=cf1:zip, timestamp=1493105345093, value=91204
 403                   column=cf1:age, timestamp=1493105353650, value=62
 403                   column=cf1:city, timestamp=1493105352467, value=Burbank
 403                   column=cf1:name, timestamp=1493105352445, value=Warren Porcaro
 403                   column=cf1:state, timestamp=1493105352513, value=CA
 403                   column=cf1:zip, timestamp=1493105352549, value=91523

Start the spark-shell.

spark-shell

Import all necessary packages.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val configuration = HBaseConfiguration.create()

Specify the HBase table and rowkey.

val table = new HTable(configuration, "employees");
val g = new Get(Bytes.toBytes("401"))
val result = table.get(g);

Extract the values from the table.

val val2 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("name"));
val val3 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("city"));
val val4 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("state"));
val val5 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("zip"));
val val6 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("age"));

Convert the values to the appropriate data types.

val id = Bytes.toString(result.getRow())
val name = Bytes.toString(val2);
val city = Bytes.toString(val3);
val state = Bytes.toString(val4);
val zip = Bytes.toString(val5);
val age = Bytes.toString(val6);

Print the values.

println("employee id: " + id + "name: " + name + "city: " + city + "state: " + state + "zip: " + zip + "age: " + age);
employee id: 401 name: Jillian Collins city: Santa Monica state: CA zip: 90402 age: 13365

Let’s write to HBase using the HBase API.

val configuration = HBaseConfiguration.create()
val table = new HTable(configuration, "employees");

Specify a new rowkey.

val p = new Put(new String("404").getBytes());

Populate the cells with the new values.

p.add("cf1".getBytes(), "name".getBytes(), new String("Denise Shulman").getBytes());
p.add("cf1".getBytes(), "city".getBytes(), new String("La Jolla").getBytes());
p.add("cf1".getBytes(), "state".getBytes(), new String("CA").getBytes());
p.add("cf1".getBytes(), "zip".getBytes(), new String("92093").getBytes());
p.add("cf1".getBytes(), "age".getBytes(), new String("56").getBytes());

Write to the HBase table.

table.put(p);
table.close();

Confirm that the values were successfully inserted into the HBase table.

Start the HBase shell.

hbase shell
scan 'employees'
ROW                COLUMN+CELL
 400               column=cf1:age, timestamp=1493105325812, value=71
 400               column=cf1:city, timestamp=1493105325691, value=Los Angeles
 400               column=cf1:name, timestamp=1493105325644, value=Patrick Montalban
 400               column=cf1:state, timestamp=1493105325738, value=CA
 400               column=cf1:zip, timestamp=1493105325789, value=90010
 401               column=cf1:age, timestamp=1493105334417, value=45
 401               column=cf1:city, timestamp=1493105333126, value=Santa Monica
 401               column=cf1:name, timestamp=1493105333050, value=Jillian Collins
 401               column=cf1:state, timestamp=1493105333145, value=CA
 401               column=cf1:zip, timestamp=1493105333165, value=90402
 402               column=cf1:age, timestamp=1493105346254, value=29
 402               column=cf1:city, timestamp=1493105345053, value=Glendale
 402               column=cf1:name, timestamp=1493105344979, value=Robert Sarkisian
 402               column=cf1:state, timestamp=1493105345074, value=CA
 402               column=cf1:zip, timestamp=1493105345093, value=91204
 403               column=cf1:age, timestamp=1493105353650, value=62
 403               column=cf1:city, timestamp=1493105352467, value=Burbank
 403               column=cf1:name, timestamp=1493105352445, value=Warren Porcaro
 403               column=cf1:state, timestamp=1493105352513, value=CA
 403               column=cf1:zip, timestamp=1493105352549, value=91523
 404               column=cf1:age, timestamp=1493123890714, value=56
 404               column=cf1:city, timestamp=1493123890714, value=La Jolla
 404               column=cf1:name, timestamp=1493123890714, value=Denise Shulman
 404               column=cf1:state, timestamp=1493123890714, value=CA
 404               column=cf1:zip, timestamp=1493123890714, value=92093

Amazon S3

Amazon S3 is a popular object store frequently used as data store for transient clusters. It’s also a cost-effective storage for backups and cold data. Reading data from S3 is just like reading data from HDFS or any other file system.

Read a CSV file from Amazon S3. Make sure you’ve configured your S3 credentials.

val myCSV = sc.textFile("s3a://mydata/customers.csv")

Map CSV data to an RDD.

import org.apache.spark.sql.Row
val myRDD = myCSV.map(_.split(',')).map(r => Row(r(0).trim.toInt, r(1), r(2).trim.toInt, r(3)))

Create a schema.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
val mySchema = StructType(Array(
StructField("customerid",IntegerType,false),
StructField("customername",StringType,false),
StructField("age",IntegerType,false),
StructField("city",StringType,false)))
val myDF = sqlContext.createDataFrame(myRDD, mySchema)
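
Writing back to S3 works the same way as writing to HDFS or any other supported file system. For example, to save the DataFrame created above as Parquet (the bucket and path are illustrative):

myDF.write.mode("overwrite").parquet("s3a://mydata/customers_parquet")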

Solr

Solr is a popular search platform that provides full-text search and real-time indexing capabilities. You can interact with Solr from Spark using SolrJ. ix

import java.net.MalformedURLException;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocumentList;
val solr = new HttpSolrServer("http://master02:8983/solr/mycollection");
val query = new SolrQuery();
query.setQuery("*:*");
query.addFilterQuery("userid:3");
query.setFields("userid","name","age","city");
query.setStart(0);    
query.set("defType", "edismax");
val response = solr.query(query);
val results = response.getResults();
println(results);

A much better way to access Solr collections from Spark is by using the spark-solr package. Lucidworks started the spark-solr project to provide Spark-Solr integration. x Using spark-solr is much easier and more powerful than SolrJ, allowing you to create DataFrames from Solr collections and use Spark SQL to interact with them.

Start by including the spark-solr jar file when launching the spark-shell. You can download the jar file from Lucidworks's website.

spark-shell --jars spark-solr-3.0.1-shaded.jar

Specify the collection and connection information.

val myOptions = Map("collection" -> "mycollection", "zkhost" -> "master02:2181/solr")

Create a DataFrame.

val solrDF = spark.read.format("solr")
  .options(myOptions)
  .load

Microsoft Excel

I’ve encountered several requests on how to access Excel worksheets from Spark. While this is not something that I would normally do, working with Excel is a reality in almost every corporate IT environment.

A company called Crealytics developed a Spark plug-in for interacting with Excel. The library requires Spark 2.x. The package can be added using the --packages command-line option. xi

spark-shell --packages com.crealytics:spark-excel_2.11:0.9.12

Create a DataFrame from an Excel worksheet.

val ExcelDF = spark.read
    .format("com.crealytics.spark.excel")
    .option("sheetName", "sheet1")
    .option("useHeader", "true")
    .option("inferSchema", "true")
    .option("treatEmptyValuesAsNulls", "true")
    .load("budget.xlsx")

Write a DataFrame to an Excel worksheet.

ExcelDF2.write
  .format("com.crealytics.spark.excel")
  .option("sheetName", "sheet1")
  .option("useHeader", "true")
  .mode("overwrite")
  .save("budget2.xlsx")

You can find more details on their GitHub page: github.com/crealytics.

Secure FTP

Downloading files from SFTP and writing DataFrames to an SFTP server is also a popular request. SpringML provides a Spark SFTP connector library. The library requires Spark 2.x and utilizes jsch, a Java implementation of SSH2. Reading from and writing to SFTP servers will be executed as a single process.

The package can be added using the --packages command-line option. xii

spark-shell --packages com.springml:spark-sftp_2.11:1.1.1

Create a DataFrame from the file in SFTP server.

val SftpDF = spark.read.
            format("com.springml.spark.sftp").
            option("host", "sftpserver.com").
            option("username", "myusername").
            option("password", "mypassword").
            option("inferSchema", "true").
            option("fileType", "csv").
            option("delimiter", ",").
            load("/myftp/myfile.csv")

Write DataFrame as CSV file to FTP server.

SftpDF2.write.
      format("com.springml.spark.sftp").
      option("host", "sftpserver.com").
      option("username", "myusername").
      option("password", "mypassword").
      option("fileType", "csv").
      option("delimiter", ",").
      save("/myftp/myfile.csv")

You can find more details on their GitHub page: github.com/springml/spark-sftp.

Spark MLlib (DataFrame-Based API)

Machine learning is one of Spark's main applications. The DataFrame-based API (Spark ML or Spark ML Pipelines) is now the primary API for Spark. The RDD-based API (Spark MLlib) is entering maintenance mode; we won't cover the old RDD-based API. Previously, the DataFrame-based API was informally referred to as Spark ML and Spark ML Pipelines (the spark.ml package) to differentiate it from the RDD-based API, which was named after the original spark.mllib package. The RDD-based API will be deprecated in Spark 2.3 once the DataFrame-based API reaches feature parity, xiii and it will be removed in Spark 3.0. For now, Spark MLlib includes both APIs.

The DataFrame-based API is faster and easier to use than the RDD-based API, allowing users to use SQL and take advantage of the Catalyst and Tungsten optimizations. It provides a higher-level abstraction for representing tabular data, similar to a relational database table, which makes feature transformation easy and makes it a natural choice for implementing pipelines.

A majority of the operations performed in a typical machine learning pipeline are feature transformations. As shown in Figure 5-3, DataFrames make it easy to perform feature transformations. xiv The tokenizer breaks the text into a bag of words, appending the words to a second, output DataFrame. TF-IDF takes the second DataFrame as input, converts the bag of words into feature vectors, and adds them to a third DataFrame. A minimal sketch of this transformation chain follows the figure.
Figure 5-3

Example Feature Transformation in a typical Spark ML pipeline
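
Here is a minimal sketch of the transformation chain described above, using Tokenizer, HashingTF, and IDF from the spark.ml feature package in the spark2-shell; the sample sentences are made up.

import org.apache.spark.ml.feature.{Tokenizer, HashingTF, IDF}

val sentencesDF = spark.createDataFrame(Seq(
  (0, "spark makes big data simple"),
  (1, "dataframes make feature transformation easy"))).toDF("id", "text")

// Transformer: text -> bag of words (adds a "words" column)
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val wordsDF = tokenizer.transform(sentencesDF)

// Transformer + Estimator: bag of words -> term frequencies -> TF-IDF features
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
val featurizedDF = hashingTF.transform(wordsDF)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val rescaledDF = idf.fit(featurizedDF).transform(featurizedDF)
rescaledDF.select("id", "features").show(false)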

Pipeline

A pipeline is just a sequence of connected stages used to create a machine learning workflow. A stage can be either a Transformer or an Estimator.

Transformer

A transformer takes a DataFrame as input and outputs a new DataFrame with additional columns appended. The new DataFrame includes the columns from the input DataFrame plus the additional columns.

Estimator

An estimator is a machine learning algorithm that fits a model on training data. Estimators accept training data and produce a machine learning model.

ParamGridBuilder

A ParamGridBuilder is used to build a parameter grid. The CrossValidator performs a grid search and trains models with a combination of user-specified hyperparameters in the parameter grid.

CrossValidator

A CrossValidator evaluates fitted machine learning models and outputs the best one by fitting the underlying estimator with user-specified combinations of hyperparameters.

Evaluator

An evaluator calculates the performance of your machine learning models. An evaluator outputs a metric such as precision or recall to measure how well a fitted model performs.

Example

Let's work on an example. We'll use the Heart Disease Data Set xv from the UCI Machine Learning Repository to predict the presence of heart disease. The data was collected by Robert Detrano, MD, PhD, from the VA Medical Center, Long Beach, and the Cleveland Clinic Foundation. Historically, the Cleveland data set has been the subject of numerous studies, so we'll use that data set. The original data set has 76 attributes, but only 14 of them are traditionally used by ML researchers (Table 5-2). We will simply perform binomial classification and determine whether the patient has heart disease or not.
Table 5-2

Cleveland Heart Disease Data Set Attribute Information

Attribute    Description

age          Age
sex          Sex
cp           Chest pain type
trestbps     Resting blood pressure
chol         Serum cholesterol in mg/dl
fbs          Fasting blood sugar > 120 mg/dl
restecg      Resting electrocardiographic results
thalach      Maximum heart rate achieved
exang        Exercise induced angina
oldpeak      ST depression induced by exercise relative to rest
slope        The slope of the peak exercise ST segment
ca           Number of major vessels (0–3) colored by fluoroscopy
thal         Thallium stress test result
num          The predicted attribute: diagnosis of heart disease

Let's start. Add the column names to the first line of the CSV file before loading it. We'll need to download the file and copy it to HDFS.

wget http://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/processed.cleveland.data
head -n 10 processed.cleveland.data
63.0,1.0,1.0,145.0,233.0,1.0,2.0,150.0,0.0,2.3,3.0,0.0,6.0,0
67.0,1.0,4.0,160.0,286.0,0.0,2.0,108.0,1.0,1.5,2.0,3.0,3.0,2
67.0,1.0,4.0,120.0,229.0,0.0,2.0,129.0,1.0,2.6,2.0,2.0,7.0,1
37.0,1.0,3.0,130.0,250.0,0.0,0.0,187.0,0.0,3.5,3.0,0.0,3.0,0
41.0,0.0,2.0,130.0,204.0,0.0,2.0,172.0,0.0,1.4,1.0,0.0,3.0,0
56.0,1.0,2.0,120.0,236.0,0.0,0.0,178.0,0.0,0.8,1.0,0.0,3.0,0
62.0,0.0,4.0,140.0,268.0,0.0,2.0,160.0,0.0,3.6,3.0,2.0,3.0,3
57.0,0.0,4.0,120.0,354.0,0.0,0.0,163.0,1.0,0.6,1.0,0.0,3.0,0
63.0,1.0,4.0,130.0,254.0,0.0,2.0,147.0,0.0,1.4,2.0,1.0,7.0,2
53.0,1.0,4.0,140.0,203.0,1.0,2.0,155.0,1.0,3.1,3.0,0.0,7.0,1
hadoop fs -put processed.cleveland.data /tmp/data

Then use the spark-shell to interactively create our model using Spark MLlib as shown in Listing 5-2.

spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.param.ParamMap
val dataDF = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/tmp/data/processed.cleveland.data")
dataDF.printSchema
root
 |-- id: string (nullable = false)
 |-- age: float (nullable = true)
 |-- sex: float (nullable = true)
 |-- cp: float (nullable = true)
 |-- trestbps: float (nullable = true)
 |-- chol: float (nullable = true)
 |-- fbs: float (nullable = true)
 |-- restecg: float (nullable = true)
 |-- thalach: float (nullable = true)
 |-- exang: float (nullable = true)
 |-- oldpeak: float (nullable = true)
 |-- slope: float (nullable = true)
 |-- ca: float (nullable = true)
 |-- thal: float (nullable = true)
 |-- num: float (nullable = true)
val myFeatures = Array("age", "sex", "cp", "trestbps", "chol", "fbs",
      "restecg", "thalach", "exang", "oldpeak", "slope",
      "ca", "thal")
val myAssembler = new VectorAssembler().setInputCols(myFeatures).setOutputCol("features")
val dataDF2 = myAssembler.transform(dataDF)
val myLabelIndexer = new StringIndexer().setInputCol("num").setOutputCol("label")
val dataDF3 = myLabelIndexer.fit(dataDF2).transform(dataDF2)
val dataDF4 = dataDF3.where(dataDF3("ca").isNotNull).where(dataDF3("thal").isNotNull).where(dataDF3("num").isNotNull)
val Array(trainingData, testData) = dataDF4.randomSplit(Array(0.8, 0.2), 101)
val myRFclassifier = new RandomForestClassifier().setFeatureSubsetStrategy("auto").setSeed(101)
val myEvaluator = new BinaryClassificationEvaluator().setLabelCol("label")
val myParamGrid = new ParamGridBuilder()
      .addGrid(myRFclassifier.maxBins, Array(10, 20, 30))
      .addGrid(myRFclassifier.maxDepth, Array(5, 10, 15))
      .addGrid(myRFclassifier.numTrees, Array(20, 30, 40))
      .addGrid(myRFclassifier.impurity, Array("gini", "entropy"))
      .build()
val myPipeline = new Pipeline().setStages(Array(myRFclassifier))
val myCV = new CrossValidator()
      .setEstimator(myPipeline)
      .setEvaluator(myEvaluator)
      .setEstimatorParamMaps(myParamGrid)
      .setNumFolds(3)
Listing 5-2

Performing binary classification using Random Forest

We can now fit the model.

val myCrossValidatorModel = myCV.fit(trainingData)

You can now make predictions on our test data.

val myCrossValidatorPrediction = myCrossValidatorModel.transform(testData)

Let's evaluate the model.

val myEvaluatorParamMap = ParamMap(myEvaluator.metricName -> "areaUnderROC")
val aucTestData = myEvaluator.evaluate(myCrossValidatorPrediction, myEvaluatorParamMap)

Spark MLlib provides features for building pipelines, featurization, and popular machine learning algorithms for regression, classification, clustering, and collaborative filtering. Advanced Analytics with Spark, 2nd edition, by Sandy Ryza, Uri Laserson, Sean Owen, and Josh Wills (O’Reilly, 2017) provides a more in-depth treatment of machine learning with Spark. We’ll use Kudu as a feature store in Chapter 6.

GraphX

Spark includes a graph processing framework called GraphX . There is a separate package called GraphFrames that is based on DataFrames. GraphFrames is currently not part of core Apache Spark. GraphX and GraphFrames are still considered immature and are not supported by Cloudera Enterprise at the time of this writing. xvi I won’t cover them in this book, but feel free to visit Spark’s online documentation for more details.

Spark Streaming

I cover Spark Streaming in Chapter 6. Spark 2.0 includes a new stream processing framework called Structured Streaming, a high-level streaming API built on top of Spark SQL. Structured Streaming is not supported by Cloudera at the time of this writing. xvii

Hive on Spark

Cloudera supports Hive on Spark for faster batch processing. Early benchmarks show an average of 3x faster performance than Hive on MapReduce. xviii Note that Hive on Spark is still mainly for batch processing and does not replace Impala for low-latency SQL queries. Hive on Spark is useful for organizations that want to take advantage of Spark's performance without having to learn Scala or Python. Some may find it non-trivial to refactor existing data processing pipelines due to the amount and complexity of their HiveQL queries. Hive on Spark is ideal for those scenarios.

Spark 1.x vs Spark 2.x

Although plenty of code out there still runs on Spark 1.x, most of your development should now be on Spark 2.x. Most of the Spark 2.x API is similar to 1.x, but there are some changes in 2.x that break API compatibility. Spark 2 is not compatible with Scala 2.10; only Scala 2.11 is supported. JDK 8 is also a requirement for Spark 2.2. Refer to Spark's online documentation for more details.

Monitoring and Configuration

There are several tools that you can use to monitor and configure Apache Spark. Cloudera Manager is the de facto administration tool for Cloudera Enterprise. Spark also includes system administration and monitoring capabilities.

Cloudera Manager

Cloudera Manager is the cluster administration tool that comes with Cloudera Enterprise. You can use Cloudera Manager to perform all sorts of administration tasks such as updating a configuration (Figure 5-4) and monitoring its performance (Figure 5-5).
Figure 5-4

Using Cloudera Manager to configure Spark

Figure 5-5

Monitoring Spark Jobs

Spark Web UI

Spark provides a couple of ways to monitor Spark applications. For a Spark application that is currently running, you can access its performance information through the web UI on port 4040. If multiple applications are running on the same node, successive applications use ports 4041, 4042, and so on.

The Spark History Server provides detailed information about Spark applications that have already completed execution. The Spark History Server can be accessed on port 18088 (Figure 5-6). Detailed performance metrics (Figure 5-7 and Figure 5-8) and information about the Spark environment (Figure 5-9) can be viewed to aid in monitoring and troubleshooting.
Figure 5-6

Spark History Server

Figure 5-7

Detailed performance information about a particular Spark job

Figure 5-8

Performance metrics on Spark executors

Figure 5-9

Information about the current Spark environment

Summary

Apache Spark has superseded MapReduce as the de facto big data processing framework. Data engineers and scientists appreciate Spark's simple and easy-to-use API, its ability to handle multiple workloads, and its fast performance. The focus on the Spark SQL, Dataset, and DataFrame APIs is a welcome improvement in making Spark more accessible and easier to use. Spark is the ideal data processing engine for Kudu. I discuss Spark and Kudu integration in Chapter 6.

References

  1. i.

    Apache Spark; “Spark Overview,” Apache Spark, 2018, https://spark.apache.org/docs/2.2.0/

     
  2. ii.

    Apache Software Foundation; “The Apache Software Foundation Announces Apache Spark as a Top-Level Project,” Apache Software Foundation, 2018, https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces50

     
  3. iii.

    Apache Spark; “Apache Spark News,” Apache Spark, 2018, https://spark.apache.org/news/

     
  4. iv.

    Matei Zaharia; “I’m Matei Zaharia, creator of Spark and CTO at Databricks. AMA!,” Reddit, 2018, https://www.reddit.com/r/IAmA/comments/31bkue/im_matei_zaharia_creator_of_spark_and_cto_at/?st=j1svbrx9&sh=a8b9698e

     
  5. v.

    Databricks; “What is Apache Spark?,” Databricks, 2018, https://databricks.com/spark/about

     
  6. vi.

    Jules Damji; “How to use SparkSession in Apache Spark 2.0,” Databricks, 2018, https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html

     
  7. vii.

    Holden Karau, Rachel Warren; “High Performance Spark,” O'Reilly, June 2017, https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/

     
  8. viii.

    Apache Spark; “JDBC To Other Databases,” Apache Spark, 2018, https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

     
  9. ix.

    Apache Lucene; “Using SolrJ,” Apache Lucene, 2018, https://lucene.apache.org/solr/guide/6_6/using-solrj.html

     
  10. x.

    Lucidworks; “Tools for reading data from Solr as a Spark RDD and indexing objects from Spark into Solr using SolrJ,” Lucidworks, 2018, https://github.com/lucidworks/spark-solr

     
  11. xi.

    Crealytics; “A Spark plugin for reading Excel files via Apache POI,” Crealytics, 2018, https://github.com/crealytics/spark-excel

     
  12. xii.

    SpringML; “Spark SFTP Connector Library,” SpringML, 2018, https://github.com/springml/spark-sftp

     
  13. xiii.

    Apache Spark; “Machine Learning Library (MLlib) Guide”, Apache Spark, 2018, https://spark.apache.org/docs/latest/ml-guide.html

     
  14. xiv.

    Xiangrui Meng, Joseph Bradley, Evan Sparks and Shivaram Venkataraman; “ML Pipelines: A New High-Level API for MLlib,” Databricks, 2018, https://databricks.com/blog/2015/01/07/ml-pipelines-a-new-high-level-api-for-mllib.html

     
  15. xv.

    David Aha; “Heart Disease Data Set,” UCI Machine Learning Repository, 1988, https://archive.ics.uci.edu/ml/datasets/heart%2BDisease

     
  16. xvi.
     
  17. xvii.
     
  18. xviii.

    Santosh Kumar; “Faster Batch Processing with Hive-on-Spark,” Cloudera, 2016, https://vision.cloudera.com/faster-batch-processing-with-hive-on-spark/

     