Spark is the next-generation big data framework for processing and analyzing large data sets. Spark features a unified processing engine that provides high-level APIs in Scala, Python, Java, and R and powerful libraries including Spark SQL for SQL support, MLlib for machine learning, Spark Streaming for real-time streaming, and GraphX for graph processing. i Spark was created by Matei Zaharia at the University of California, Berkeley’s AMPLab and was later donated to the Apache Software Foundation, becoming a top-level project on February 24, 2014. ii The first version, Spark 1.0, was released on May 30, 2014. iii
Entire books have been written about Spark. This chapter will give you a quick introduction to Spark, enough to give you the skills needed to perform common data processing tasks. My goal is to make you productive as quickly as possible. For a more thorough treatment, Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia (O’Reilly, 2015) remains the best introduction to Spark. Advanced Analytics with Spark, 2nd edition, by Sandy Ryza, Uri Laserson, Sean Owen, and Josh Wills (O’Reilly, 2017) covers advanced Spark topics and is also highly recommended. I assume no previous knowledge of Spark; however, some knowledge of Scala is helpful. Learning Scala by Jason Swartz (O’Reilly, 2014) and Programming in Scala, 2nd edition, by Martin Odersky, Lex Spoon, and Bill Venners (Artima, 2011) are good books to help you learn Scala. For a primer on Hadoop and Hadoop components such as HDFS and YARN, visit the Apache Hadoop website. Integration with Kudu is discussed in Chapter 6.
Overview
Cluster Managers
Cluster managers manage and allocate cluster resources to applications. Spark supports the standalone cluster manager that ships with Spark (the Standalone Scheduler), YARN, and Mesos. There is also an experimental project to bring native support for Kubernetes as a cluster manager. Check SPARK-18278 for more details.
Architecture
Each Spark application gets its own set of executors. Because tasks from different applications run in different JVMs, one Spark application cannot interfere with another. This also means that it’s difficult for Spark applications to share data without using a slow external data source such as HDFS or S3. Using an off-heap memory store such as Alluxio (formerly known as Tachyon) can make data sharing faster and easier. I discuss Alluxio in Chapter 10.
Executing Spark Applications
Spark 1.x and Spark 2.x Commands
Spark 1.x | Spark 2.x
---|---
spark-submit | spark2-submit
spark-shell | spark2-shell
pyspark | pyspark2
Spark on YARN
YARN is the default cluster manager for most Hadoop-based platforms such as Cloudera and Hortonworks. There are two deploy modes that can be used to launch Spark applications on YARN.
Cluster Mode
In cluster mode, the driver program runs inside an application master process managed by YARN. The client can exit without affecting the execution of the application. Note that the interactive shells cannot be used in cluster mode. To launch applications in cluster mode:
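As a sketch (the application JAR, class name, and resource settings below are placeholders):

```shell
# Submit an application in YARN cluster mode; the driver runs inside
# the YARN application master, so the client can disconnect.
spark2-submit \
  --class org.mycompany.MyApp \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 2g \
  --num-executors 4 \
  myapp.jar
```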
Client Mode
In client mode, the driver program runs in the client machine. The application master is only used for requesting resources from YARN. To launch applications or the spark-shell in client mode:
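A sketch of the equivalent client-mode launches (again, the JAR and class name are placeholders):

```shell
# In client mode the driver runs on the client machine; this is the
# mode the interactive shells use.
spark2-submit \
  --class org.mycompany.MyApp \
  --master yarn \
  --deploy-mode client \
  myapp.jar

spark2-shell --master yarn --deploy-mode client
```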
Introduction to the Spark-Shell
You typically use an interactive shell for ad hoc data analysis or exploration. It’s also a good tool to learn the Spark API. Spark’s interactive shell is available in Scala or Python. In our example below, we’ll create a list of cities and convert them all to uppercase.
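A minimal sketch of that session (shell output abridged):

```scala
// Create a list of cities and convert them all to uppercase.
val cities = List("tokyo", "new york", "paris", "san francisco")
val citiesRDD = sc.parallelize(cities)
citiesRDD.map(_.toUpperCase).collect
// res0: Array[String] = Array(TOKYO, NEW YORK, PARIS, SAN FRANCISCO)
```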
Introduction to spark-shell
You will use the spark-shell throughout the chapter. A SparkSession named “spark” is automatically created when you start the spark2-shell as shown in Listing 5-1.
SparkSession
As you can see in Figure 5-2, SparkContext enables access to all Spark features and capabilities. The driver program uses the SparkContext to access other contexts such as StreamingContext, SQLContext, and HiveContext. Starting in Spark 2.0, SparkSession provides a single point of entry to interact with Spark. All features available through SparkContext such as SQLContext, HiveContext, and StreamingContext in Spark 1.x are now accessible via SparkSession. vi
In Spark 1.x you would write something like this.
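A sketch of the Spark 1.x boilerplate (the application name is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

// Spark 1.x: create each context explicitly.
val sparkConf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val hiveContext = new HiveContext(sc)
```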
In Spark 2.x, you don’t have to explicitly create SparkConf, SparkContext, or SQLContext since all of their functionalities are already included in SparkSession.
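For contrast, a Spark 2.x sketch (in the spark2-shell this object is already created for you as "spark"):

```scala
import org.apache.spark.sql.SparkSession

// Spark 2.x: a single point of entry.
val spark = SparkSession.builder()
  .appName("MyApp")
  .enableHiveSupport()
  .getOrCreate()
```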
Accumulator
Accumulators are variables that are only “added” to. They are usually used to implement counters. In the example, I add up the elements of an array using an accumulator:
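A sketch of that example (longAccumulator is the Spark 2.x API; in Spark 1.x you would use sc.accumulator instead):

```scala
// Add up the elements of an array using an accumulator.
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4, 5)).foreach(x => accum.add(x))
accum.value
// res0: Long = 15
```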
Broadcast Variables
Broadcast variables are read-only variables cached in memory on each executor node. Spark uses efficient broadcast algorithms to reduce the network cost of distributing them. Instead of repeatedly shipping a copy of a dataset with every task or reading it from a slow storage engine such as HDFS or S3, broadcast variables let each node keep a local copy.
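A small sketch, using a broadcast lookup table (the map contents are made up for illustration):

```scala
// Broadcast a small lookup table to every executor.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
sc.parallelize(List("a", "b", "a")).map(x => lookup.value(x)).collect
// res0: Array[Int] = Array(1, 2, 1)
```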
RDD
An RDD (Resilient Distributed Dataset) is an immutable distributed collection of objects partitioned across one or more nodes in your cluster. RDDs can be processed and operated on in parallel by two types of operations: transformations and actions.
Note
The RDD was Spark’s primary programming interface in Spark 1.x. Dataset has replaced the RDD as the main API starting in Spark 2.0. Users are strongly encouraged to switch from RDDs to Datasets due to the richer programming interface and better performance. We discuss Dataset and DataFrame later in the chapter.
Creating an RDD
Creating an RDD is straightforward. You can create an RDD from an existing Scala collection or from reading from an external file stored in HDFS or S3.
parallelize
parallelize creates an RDD from a Scala collection.
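For example:

```scala
val data = List("tokyo", "paris", "berlin")
val rdd = sc.parallelize(data)
// rdd: org.apache.spark.rdd.RDD[String]
```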
textFile
textFile creates an RDD from a text file stored in HDFS or S3.
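For example (the path below is a placeholder):

```scala
// Each line of the file becomes one element of the RDD.
val rdd = sc.textFile("/data/sample.txt")
```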
Note that an RDD is immutable. Operations that need to perform any type of data transformation will need to create another RDD. RDD operations can be classified into two categories: transformations and actions.
Transformations
A transformation is an operation that creates a new RDD. I describe some of the most common transformations. Refer to the online Spark documentation for a complete list.
Map
map executes a function against each element in the RDD and returns a new RDD of the results. The return type of map doesn’t necessarily have to be the same as the element type of the original RDD.
Let’s show another example of map.
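A sketch of both cases, mapping to the same element type and mapping to a different one:

```scala
val words = sc.parallelize(List("spark", "kudu", "hdfs"))

// Same element type: String => String
words.map(_.toUpperCase).collect
// Array(SPARK, KUDU, HDFS)

// Different element type: String => Int
words.map(_.length).collect
// Array(5, 4, 4)
```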
Flatmap
flatMap executes a function against each element in the RDD and then flattens the results.
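For example:

```scala
val lines = sc.parallelize(List("the quick brown fox", "jumps over"))
lines.flatMap(_.split(" ")).collect
// Array(the, quick, brown, fox, jumps, over)
```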
Filter
Returns an RDD that includes only the elements that match the specified condition.
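For example:

```scala
val numbers = sc.parallelize(1 to 10)
numbers.filter(_ % 2 == 0).collect
// Array(2, 4, 6, 8, 10)
```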
Distinct
Returns only distinct values.
Combine results using union.
Display only distinct values.
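A sketch of the three steps above in one session:

```scala
val rdd1 = sc.parallelize(List(1, 2, 3, 3))
val rdd2 = sc.parallelize(List(3, 4, 5))

// Combine results using union.
val combined = rdd1.union(rdd2)
combined.collect
// Array(1, 2, 3, 3, 3, 4, 5)

// Display only distinct values (ordering may vary across partitions).
combined.distinct.collect
```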
ReduceByKey
Combines values with the same key using the specified reduce function.
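For example (the pair data is made up for illustration):

```scala
val sales = sc.parallelize(List(("dept1", 100), ("dept2", 50), ("dept1", 25)))
sales.reduceByKey(_ + _).collect
// Array((dept1,125), (dept2,50)) -- ordering may vary
```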
Keys
Return an RDD containing just the keys.
Values
Return an RDD containing just the values.
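A sketch covering both keys and values:

```scala
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3)))
pairs.keys.collect   // Array(a, b, c)
pairs.values.collect // Array(1, 2, 3)
```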
Inner Join
Returns an RDD of all elements from both RDDs based on the join predicate.
RightOuterJoin / LeftOuterJoin
Returns an RDD of all elements from the right RDD, together with matching elements from the left RDD (or None where there is no match). A left outer join is the mirror image, returning all elements from the left RDD instead.
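The join variants can be sketched with two pair RDDs (the data is made up for illustration):

```scala
val employees = sc.parallelize(List((1, "alice"), (2, "bob"), (3, "carol")))
val depts     = sc.parallelize(List((1, "sales"), (2, "hr"), (4, "it")))

// Inner join: only keys present in both RDDs.
employees.join(depts).collect
// e.g. Array((1,(alice,sales)), (2,(bob,hr)))

// Right outer join: all keys from the right RDD.
employees.rightOuterJoin(depts).collect
// e.g. Array((1,(Some(alice),sales)), (2,(Some(bob),hr)), (4,(None,it)))

// Left outer join: all keys from the left RDD.
employees.leftOuterJoin(depts).collect
// e.g. Array((1,(alice,Some(sales))), (2,(bob,Some(hr))), (3,(carol,None)))
```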
Union
Returns an RDD that contains the combination of two or more RDDs.
Subtract
Returns an RDD that contains only the elements that are in the first RDD but not in the second RDD.
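For example:

```scala
val a = sc.parallelize(List(1, 2, 3, 4))
val b = sc.parallelize(List(3, 4, 5))
a.subtract(b).collect
// Array(1, 2) -- ordering may vary
```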
Coalesce
Coalesce reduces the number of partitions in an RDD. You might want to use coalesce after performing a filter on a large RDD. While filtering reduces the amount of data consumed by the new RDD, it inherits the number of partitions of the original RDD. If the new RDD is significantly smaller than the original RDD, it may have hundreds or thousands of small partitions, which could cause performance issues.
Using coalesce is also useful when you want to reduce the number of files generated by Spark when writing to HDFS, preventing the dreaded “small file” problem. Each partition gets written as a separate file to HDFS. Note that you might run into performance issues when using coalesce since you are effectively reducing the degree of parallelism while writing to HDFS. Try increasing the number of partitions if that happens. This is applicable to DataFrames as well, which I will discuss later. In the example below, we’re writing only one Parquet file to HDFS.
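A sketch of that example (the paths are placeholders):

```scala
// coalesce(1) produces a single partition, and therefore a single
// Parquet file in the output directory.
val df = spark.read.parquet("/data/input")
df.coalesce(1).write.mode("overwrite").parquet("/data/output")
```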
Repartition
Repartition can both decrease and increase the number of partitions in an RDD. You would generally use coalesce when reducing partitions since it’s more efficient than repartition. Increasing the number of partitions can be useful to increase the degree of parallelism when writing to HDFS. This is applicable to DataFrames as well, which I will discuss later. In the example below, we’re writing 6 Parquet files to HDFS.
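A sketch of that example (the paths are placeholders):

```scala
// repartition(6) triggers a full shuffle and produces 6 partitions,
// and therefore 6 Parquet files in the output directory.
val df = spark.read.parquet("/data/input")
df.repartition(6).write.mode("overwrite").parquet("/data/output")
```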
Note
Coalesce is generally faster than repartition. Repartition will perform a full shuffle, creating new partitions and equally distributing data across worker nodes. Coalesce minimizes data movement and avoids a full shuffle by using existing partitions.
Actions
An action is an RDD operation that returns a value to the driver program. I list some of the most common actions. Refer to the online Spark documentation for a complete list of RDD actions.
Collect
Returns the entire content of the RDD to the driver program. Not advisable for large data sets.
Take
Returns a subset of the RDD to the driver program.
Count
Returns the number of items in the RDD.
Foreach
Executes the provided function against each item of the RDD.
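The four actions above can be sketched in one session:

```scala
val rdd = sc.parallelize(List(10, 20, 30, 40, 50))

rdd.collect          // Array(10, 20, 30, 40, 50) -- the entire RDD
rdd.take(2)          // Array(10, 20)             -- first 2 elements
rdd.count            // 5
rdd.foreach(println) // runs on the executors; in a cluster the output
                     // appears in the executor logs, not the driver
```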
Lazy Evaluation
Spark supports lazy evaluation, which is critical for big data processing. All transformations in Spark are lazily evaluated: Spark does not execute them immediately, so you can continue to define more transformations. When you need the final result, you execute an action, which triggers execution of all the pending transformations.
Caching
Each transformation is re-executed each time you run an action by default. You can cache an RDD in memory using the cache or persist method to avoid re-executing the transformations multiple times. There are several persistence levels to choose from, such as MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, and DISK_ONLY. Consult Apache Spark’s online documentation for more details on caching. Off-heap caching with Alluxio (previously known as Tachyon) is discussed in Chapter 10.
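A short sketch (the path is a placeholder):

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("/data/sample.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK) // cache() is shorthand for persist(MEMORY_ONLY)
rdd.count()                               // first action materializes the cache
rdd.unpersist()                           // release the cache when done
```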
Spark SQL, Dataset, and DataFrames API
While most of the excitement these days is focused on use cases that involve unstructured and semi-structured data (video analytics, image processing, and text mining to mention a few), a majority of the actual data analysis and processing is still done on structured, relational data. Most of the important business decisions by companies are still based on analysis done on relational data.
Spark SQL was developed to make it easier for data engineers and data scientists to process and analyze structured data. Dataset is similar to an RDD in that it supports strong typing, but under the hood it has a much more efficient engine. Starting in Spark 2.0, the Dataset API is the primary programming interface. The DataFrame is just a Dataset with named columns, very similar to a relational table. Together, Spark SQL and DataFrames provide a powerful programming interface for processing and analyzing structured data. Here’s a quick example of how to use the DataFrames API. I’ll discuss the DataFrames API in more detail later.
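A minimal sketch (the file path and column names are assumptions):

```scala
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/customers.csv")

df.select("name", "city").filter("city = 'tokyo'").show()
df.groupBy("city").count().show()
```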
Note
The DataFrame and Dataset APIs were unified in Spark 2.0. The DataFrame is now just a type alias for a Dataset of Row, where a Row is a generic untyped object. In contrast, a Dataset is a collection of strongly typed objects, Dataset[T]. Scala supports both the strongly typed and untyped APIs, while in Java, Dataset[T] is the main abstraction. DataFrames is the main programming interface for R and Python due to their lack of compile-time type safety.
Spark Data Sources
Some of the most common tasks in data processing are reading from and writing to different data sources. I provide several examples in the next few pages. I cover Spark and Kudu integration in Chapter 6.
CSV
Spark provides you with different ways to read data from CSV files. You can read the data into an RDD first and then convert it to DataFrame.
You can also use the Databricks CSV package. This method reads data directly in a DataFrame.
Starting in Spark 2.0, the CSV connector is built in, so there’s no need to use the Databricks third-party package.
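Sketches of the three approaches above (the file layout, case class, and package versions are assumptions):

```scala
// Option 1: read into an RDD, then convert to a DataFrame.
case class Customer(id: Int, name: String, city: String)

val rdd = sc.textFile("/csvdata/customers.csv")
val df1 = rdd.map(_.split(","))
  .map(c => Customer(c(0).toInt, c(1), c(2)))
  .toDF()

// Option 2 (Spark 1.x): the Databricks CSV package, launched with e.g.
//   spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val df2 = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/csvdata/customers.csv")

// Option 3 (Spark 2.0+): the built-in CSV reader.
val df3 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/csvdata/customers.csv")
```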
XML
Databricks has a spark-xml package that makes it easy to read XML data.
Create a DataFrame using Spark XML. In this example, we specify the row tag and the path in HDFS where the XML file is located.
Let’s also take a look at the data.
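A sketch of both steps (the package version, row tag, and path are assumptions):

```scala
// Launched with, e.g.:
//   spark2-shell --packages com.databricks:spark-xml_2.11:0.4.1
val df = spark.read.format("com.databricks.spark.xml")
  .option("rowTag", "customer")
  .load("/xmldata/")

// Take a look at the data.
df.show()
df.printSchema()
```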
JSON
We’ll create a JSON file as sample data for this example. Make sure the file is in a folder in HDFS called /jsondata.
Create a DataFrame from the JSON file.
Check the data.
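A sketch of these steps (the field names are assumptions; the file contains one JSON object per line):

```scala
// Sample contents of /jsondata, e.g.:
//   {"id":1,"name":"alice","city":"tokyo"}
val df = spark.read.json("/jsondata")

// Check the data.
df.show()
df.printSchema()
```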
Relational Databases Using JDBC
We use MySQL in this example, but other relational databases such as Oracle, SQL Server, Teradata, and PostgreSQL, to mention a few, are also supported. As long as the relational database has a JDBC/ODBC driver, it should be accessible from Spark. Performance is dependent on your JDBC/ODBC driver’s support for batch operations. Please check your JDBC driver’s documentation for more details.
Note
In some versions of Spark --jars does not add the JAR in the driver’s class path. vii It is recommended that you include the JDBC driver in your --jars and the Spark classpath. viii
Start the spark-shell. Take note that I had to include the MySQL driver as a parameter in both --driver-class-path and --jars. You may not need to do this in newer versions of Spark.
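For example (the driver JAR version and path are assumptions):

```shell
spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar \
  --jars mysql-connector-java-5.1.40-bin.jar
```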
Read the CSV file into a DataFrame.
Register the data frame as a temp table so that we can run SQL queries against it. In Spark 2.x, use createOrReplaceTempView.
Let's set up the connection properties.
This will allow us to specify the correct save mode: Append, Overwrite, and so on.
Insert the data returned by the SELECT statement to the customer table stored in the MySQL salesdb database.
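The whole write path can be sketched as follows (host, credentials, and file layout are assumptions):

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode

// Read the CSV file and register it as a temp view.
val df = spark.read.option("header", "true").csv("/csvdata/customers.csv")
df.createOrReplaceTempView("customer_stage")

// Connection properties.
val url = "jdbc:mysql://localhost:3306/salesdb"
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "mypassword")

// Insert the SELECT results into the MySQL customer table.
spark.sql("SELECT * FROM customer_stage")
  .write.mode(SaveMode.Append)
  .jdbc(url, "customer", prop)
```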
Let’s read a table using JDBC. Let’s populate the users table in MySQL with some test data. Make sure the users table exists in the salesdb database.
Let’s set up the jdbc url and connection properties.
We can create a DataFrame from an entire table.
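A sketch of the read path (host and credentials are assumptions):

```scala
import java.util.Properties

val url = "jdbc:mysql://localhost:3306/salesdb"
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "mypassword")

// Create a DataFrame from the entire users table.
val users = spark.read.jdbc(url, "users", prop)
users.show()
```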
Parquet
Reading and writing to Parquet is straightforward.
You can run SELECT statements on Parquet files directly.
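A sketch of both (the output path is a placeholder):

```scala
val df = spark.range(10).toDF("id")

// Write to and read from Parquet.
df.write.mode("overwrite").parquet("/parquetdata/ids")
val pdf = spark.read.parquet("/parquetdata/ids")

// SELECT directly against the Parquet files.
spark.sql("SELECT count(*) FROM parquet.`/parquetdata/ids`").show()
```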
HBase
There are different ways to access HBase from Spark. As discussed earlier, Scala has access to all Java libraries, including the HBase client APIs. This is not the preferred way to access HBase from Spark, but some developers might find it handy. Another way to access HBase is via Impala, which I discuss in Chapter 6.
Note
Spark on HBase is the preferred way of accessing HBase from Spark. However, it only works on Spark 1.x at the time of this writing. Spark on HBase is not supported on Spark 2.x.
You can use saveAsHadoopDataset to write data to HBase.
Start the HBase shell. Create an HBase table and populate it with test data.
Start the spark-shell.
Import all necessary packages.
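The steps above can be sketched as follows (the table name, column family, and sample rows are assumptions; the table would be created beforehand in the HBase shell with, e.g., create 'employees', 'cf1'):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "employees")

// Convert each (rowkey, name) pair into an HBase Put.
def toPut(employee: (String, String)) = {
  val put = new Put(Bytes.toBytes(employee._1))
  put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"),
    Bytes.toBytes(employee._2))
  (new ImmutableBytesWritable(Bytes.toBytes(employee._1)), put)
}

sc.parallelize(List(("row1", "alice"), ("row2", "bob")))
  .map(toPut)
  .saveAsHadoopDataset(jobConf)
```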
You can also use the HBase client API from Spark to read and write data to HBase.
Start the HBase shell. Create another HBase table and populate it with test data.
Let’s verify if the data was successfully inserted into our HBase table.
Start the spark-shell.
Import all necessary packages.
Specify the HBase table and rowkey.
Extract the values from the table.
Convert the values to the appropriate data types.
Print the values.
Let’s write to HBase using the HBase API.
Specify a new rowkey.
Populate the cells with the new values.
Write to the HBase table.
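The read and write steps above can be sketched with the plain HBase client API (table name, column family, and values are assumptions):

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("users"))

// Specify the rowkey and extract a value from the table.
val get = new Get(Bytes.toBytes("row1"))
val result = table.get(get)
val name = Bytes.toString(
  result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
println(name)

// Specify a new rowkey, populate the cells, and write to the table.
val put = new Put(Bytes.toBytes("row2"))
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"),
  Bytes.toBytes("bob"))
table.put(put)

table.close()
connection.close()
```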
Confirm that the values were successfully inserted into the HBase table.
Start the HBase shell.
Amazon S3
Amazon S3 is a popular object store frequently used as data store for transient clusters. It’s also a cost-effective storage for backups and cold data. Reading data from S3 is just like reading data from HDFS or any other file system.
Read a CSV file from Amazon S3. Make sure you’ve configured your S3 credentials.
Map CSV data to an RDD.
Create a schema.
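The three steps above can be sketched as follows (the bucket name and column layout are assumptions):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Read a CSV file from S3.
val csv = sc.textFile("s3a://my-bucket/data/customers.csv")

// Map the CSV data to an RDD of Rows.
val rows = csv.map(_.split(",")).map(c => Row(c(0).toInt, c(1)))

// Create a schema and build the DataFrame.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))
val df = spark.createDataFrame(rows, schema)
```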
Solr
Solr is a popular search platform that provides full-text search and real-time indexing capabilities. You can interact with Solr from Spark using SolrJ. ix
A much better way to access Solr collections from Spark is via the spark-solr package. Lucidworks started the spark-solr project to provide Spark-Solr integration. x spark-solr is much easier to use and more powerful than SolrJ, allowing you to create DataFrames from Solr collections and use Spark SQL to interact with them.
Start by importing the jar file from spark-shell. You can download the jar file from Lucidworks’s website.
Specify the collection and connection information.
Create a DataFrame.
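A sketch of both steps (the collection name and ZooKeeper connect string are assumptions):

```scala
// Specify the collection and connection information.
val options = Map(
  "collection" -> "mycollection",
  "zkhost" -> "server01:2181/solr")

// Create a DataFrame from the Solr collection.
val df = spark.read.format("solr").options(options).load
df.show()
```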
Microsoft Excel
I’ve encountered several requests on how to access Excel worksheets from Spark. While this is not something that I would normally do, working with Excel is a reality in almost every corporate IT environment.
A company called Crealytics developed a Spark plug-in for interacting with Excel. The library requires Spark 2.x. The package can be added using the --packages command-line option. xi
Create a DataFrame from an Excel worksheet.
Write a DataFrame to an Excel worksheet.
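Both directions can be sketched as follows (the package version, sheet name, paths, and option names follow the spark-excel documentation and are assumptions here):

```scala
// Launched with, e.g.:
//   spark2-shell --packages com.crealytics:spark-excel_2.11:0.9.12

// Create a DataFrame from an Excel worksheet.
val df = spark.read.format("com.crealytics.spark.excel")
  .option("sheetName", "Sheet1")
  .option("useHeader", "true")
  .option("inferSchema", "true")
  .load("/data/customers.xlsx")

// Write a DataFrame to an Excel worksheet.
df.write.format("com.crealytics.spark.excel")
  .option("sheetName", "Sheet1")
  .option("useHeader", "true")
  .mode("overwrite")
  .save("/data/customers_out.xlsx")
```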
You can find more details from their github page: github.com/crealytics.
Secure FTP
Downloading files from SFTP and writing DataFrames to an SFTP server is also a popular request. SpringML provides a Spark SFTP connector library. The library requires Spark 2.x and utilizes jsch, a Java implementation of SSH2. Reading from and writing to SFTP servers will be executed as a single process.
The package can be added using the --packages command-line option. xii
Create a DataFrame from the file in SFTP server.
Write DataFrame as CSV file to FTP server.
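Both directions can be sketched as follows (the package version, host, credentials, and paths are assumptions):

```scala
// Launched with, e.g.:
//   spark2-shell --packages com.springml:spark-sftp_2.11:1.1.0

// Create a DataFrame from a file on the SFTP server.
val df = spark.read.format("com.springml.spark.sftp")
  .option("host", "sftp.example.com")
  .option("username", "myuser")
  .option("password", "mypassword")
  .option("fileType", "csv")
  .option("header", "true")
  .load("/incoming/customers.csv")

// Write the DataFrame back as a CSV file on the SFTP server.
df.write.format("com.springml.spark.sftp")
  .option("host", "sftp.example.com")
  .option("username", "myuser")
  .option("password", "mypassword")
  .option("fileType", "csv")
  .save("/outgoing/customers_out.csv")
```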
You can find more details from their github page: github.com/springml/spark-sftp.
Spark MLlib (DataFrame-Based API)
Machine learning is one of Spark’s main applications. The DataFrame-based API (Spark ML or Spark ML Pipelines) is now the primary API for machine learning in Spark; the RDD-based API (Spark MLlib) is entering maintenance mode, and we won’t cover it. Previously, the DataFrame-based API was informally referred to as Spark ML and Spark ML Pipelines (the spark.ml package) to differentiate it from the RDD-based API, which was named after the original spark.mllib package. The RDD-based API will be deprecated in Spark 2.3, once the DataFrame-based API reaches feature parity, xiii and removed in Spark 3.0. For now, Spark MLlib includes both APIs.
The DataFrame-based API is faster and easier to use than the RDD-based API, allowing users to write SQL and take advantage of the Catalyst and Tungsten optimizations. It provides a higher-level abstraction for representing tabular data, similar to a relational database table, which makes it easy to transform features and a natural choice for implementing pipelines.
Pipeline
A pipeline is just a sequence of connected stages to create a machine learning workflow. A stage can be either a Transformer or Estimator.
Transformer
A transformer takes a DataFrame as input and outputs a new DataFrame that contains the columns of the input DataFrame plus additional appended columns.
Estimator
An estimator is a machine learning algorithm that fits a model on training data. Estimators accept training data and produce a machine learning model.
ParamGridBuilder
A ParamGridBuilder is used to build a parameter grid. The CrossValidator performs a grid search and trains models with a combination of user-specified hyperparameters in the parameter grid.
CrossValidator
A CrossValidator evaluates fitted machine learning models and outputs the best one, fitting the underlying estimator with each user-specified combination of hyperparameters.
Evaluator
An evaluator calculates the performance of your machine learning models. An evaluator outputs a metric such as precision or recall to measure how well a fitted model performs.
Example
Cleveland Heart Disease Data Set Attribute Information
Attribute | Description |
---|---|
age | Age |
sex | Sex |
cp | Chest pain type |
trestbps | Resting blood pressure |
chol | Serum cholesterol in mg/dl |
fbs | Fasting blood sugar > 120 mg/dl |
restecg | Resting electrocardiographic results |
thalach | Maximum heart rate achieved |
exang | Exercise induced angina |
oldpeak | ST depression induced by exercise relative to rest |
slope | The slope of the peak exercise ST segment |
ca | Number of major vessels (0–3) colored by flourosopy |
thal | Thalium stress test result |
num | The predicted attribute – diagnosis of heart disease |
Let’s start. We’ll need to download the file, add the column names to the CSV file, and copy it to HDFS.
Then use the spark-shell to interactively create our model using Spark MLlib as shown in Listing 5-2.
Performing binary classification using Random Forest
We can now fit the model.
Let’s evaluate the model.
You can now make some predictions on our data.
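The whole workflow can be sketched as follows; the HDFS path, the subset of feature columns, and the parameter-grid values are assumptions (in the spark2-shell, spark.implicits._ and the $ column syntax are already available):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.functions.when

// Read the data; derive a binary label from "num" (0 = no disease).
val data = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("/tmp/processed_cleveland.csv")
  .withColumn("label", when($"num" > 0, 1.0).otherwise(0.0))

// Assemble a feature vector from a subset of the columns.
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "trestbps", "chol", "thalach", "oldpeak"))
  .setOutputCol("features")

val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(assembler, rf))

val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)

// Fit the model.
val model = pipeline.fit(training)

// Evaluate with area under the ROC curve.
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
val predictions = model.transform(test)
println(evaluator.evaluate(predictions))

// Optional: grid search with cross-validation.
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(10, 50))
  .addGrid(rf.maxDepth, Array(3, 5))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(training)

// Make some predictions.
cvModel.transform(test).select("label", "prediction").show(5)
```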
Spark MLlib provides features for building pipelines, featurization, and popular machine learning algorithms for regression, classification, clustering, and collaborative filtering. Advanced Analytics with Spark, 2nd edition, by Sandy Ryza, Uri Laserson, Sean Owen, and Josh Wills (O’Reilly, 2017) provides a more in-depth treatment of machine learning with Spark. We’ll use Kudu as a feature store in Chapter 6.
GraphX
Spark includes a graph processing framework called GraphX. There is a separate package called GraphFrames that is based on DataFrames. GraphFrames is currently not part of core Apache Spark. GraphX and GraphFrames are still considered immature and are not supported by Cloudera Enterprise at the time of this writing. xvi I won’t cover them in this book, but feel free to visit Spark’s online documentation for more details.
Spark Streaming
I cover Spark Streaming in Chapter 6. Spark 2.0 includes a new stream processing framework called Structured Streaming, a high-level streaming API built on top of Spark SQL. Structured Streaming is not supported by Cloudera at the time of this writing. xvii
Hive on Spark
Cloudera supports Hive on Spark for faster batch processing. Early benchmarks show an average of 3x faster performance than Hive on MapReduce. xviii Note that Hive on Spark is still mainly for batch processing and does not replace Impala for low-latency SQL queries. Hive on Spark is useful for organizations that want to take advantage of Spark’s performance without having to learn Scala or Python. Some may find it non-trivial to refactor existing data processing pipelines due to the amount and complexity of HiveQL queries. Hive on Spark is ideal for those scenarios.
Spark 1.x vs Spark 2.x
Although plenty of code out there still runs on Spark 1.x, most of your development should now be on Spark 2.x. Most of the Spark 2.x API is similar to 1.x, but there are some changes in 2.x that break API compatibility. Spark 2 is not compatible with Scala 2.10; only Scala 2.11 is supported. JDK 8 is also a requirement for Spark 2.2. Refer to Spark’s online documentation for more details.
Monitoring and Configuration
There are several tools that you can use to monitor and configure Apache Spark. Cloudera Manager is the de facto administration tool for Cloudera Enterprise. Spark also includes system administration and monitoring capabilities.
Cloudera Manager
Spark Web UI
Spark provides a couple of ways to monitor Spark applications. For a Spark application that is currently running, you can access its performance information on port 4040. If multiple applications are running on the same node, their web UIs bind to successive ports: 4041, 4042, and so on.
Summary
Apache Spark has superseded MapReduce as the de facto big data processing framework. Data engineers and scientists appreciate Spark’s simple and easy-to-use API, its ability to handle multiple workloads, and its fast performance. Its focus on the Spark SQL, Dataset, and DataFrame APIs is a welcome improvement, making Spark more accessible and easier to use. Spark is the ideal data processing engine for Kudu. I discuss Spark and Kudu integration in Chapter 6.
References
- i.
Apache Spark; “Spark Overview,” Apache Spark, 2018, https://spark.apache.org/docs/2.2.0/
- ii.
Apache Software Foundation; “The Apache Software Foundation Announces Apache Spark as a Top-Level Project,” Apache Software Foundation, 2018, https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces50
- iii.
Apache Spark; “Apache Spark News,” Apache Spark, 2018, https://spark.apache.org/news/
- iv.
Matei Zaharia; “I’m Matei Zaharia, creator of Spark and CTO at Databricks. AMA!,” Reddit, 2018, https://www.reddit.com/r/IAmA/comments/31bkue/im_matei_zaharia_creator_of_spark_and_cto_at/?st=j1svbrx9&sh=a8b9698e
- v.
Databricks; “What is Apache Spark?,” Databricks, 2018, https://databricks.com/spark/about
- vi.
Jules Damji; “How to use SparkSession in Apache Spark 2.0,” Databricks, 2018, https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
- vii.
Holden Karau, Rachel Warren; “High Performance Spark,” O'Reilly, June 2017, https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/
- viii.
Apache Spark; “JDBC To Other Databases,” Apache Spark, 2018, https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
- ix.
Apache Lucene; “Using SolrJ,” Apache Lucene, 2018, https://lucene.apache.org/solr/guide/6_6/using-solrj.html
- x.
Lucidworks; “Tools for reading data from Solr as a Spark RDD and indexing objects from Spark into Solr using SolrJ,” Lucidworks, 2018, https://github.com/lucidworks/spark-solr
- xi.
Crealytics; “A Spark plugin for reading Excel files via Apache POI,” Crealytics, 2018, https://github.com/crealytics/spark-excel
- xii.
SpringML; “Spark SFTP Connector Library,” SpringML, 2018, https://github.com/springml/spark-sftp
- xiii.
Apache Spark; “Machine Learning Library (MLlib) Guide”, Apache Spark, 2018, https://spark.apache.org/docs/latest/ml-guide.html
- xiv.
Xiangrui Meng, Joseph Bradley, Evan Sparks and Shivaram Venkataraman; “ML Pipelines: A New High-Level API for MLlib,” Databricks, 2018, https://databricks.com/blog/2015/01/07/ml-pipelines-a-new-high-level-api-for-mllib.html
- xv.
David Aha; “Heart Disease Data Set,” UCI Machine Learning Repository, 1988, https://archive.ics.uci.edu/ml/datasets/heart%2BDisease
- xvi.
Cloudera; “GraphX is not Supported,” Cloudera, 2018, https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_graphx
- xvii.
Cloudera; “Structured Streaming is not Supported”, Cloudera, 2018, https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_structured_streaming
- xviii.
Santosh Kumar; “Faster Batch Processing with Hive-on-Spark,” Cloudera, 2016, https://vision.cloudera.com/faster-batch-processing-with-hive-on-spark/