Spark SQL and DataFrames

Before Apache Spark, Apache Hive was the go-to technology whenever anyone wanted to run an SQL-like query over a large amount of data. Apache Hive essentially translated SQL queries into MapReduce logic automatically, making it very easy to perform many kinds of analytics on big data without actually having to write complex code in Java or Scala.

With the advent of Apache Spark, there was a paradigm shift in how analysis can be performed at big data scale. Spark SQL provides an easy-to-use SQL-like layer on top of Apache Spark's distributed computation capabilities. In fact, Spark SQL can be used as an online analytical processing (OLAP) database.

Spark SQL works by parsing the SQL-like statement into an Abstract Syntax Tree (AST), converting the AST into a logical plan, optimizing the logical plan, and finally generating a physical plan that can be executed. The final execution uses the underlying DataFrame API, which means anyone can take advantage of the DataFrame APIs simply by working through an SQL-like interface rather than learning all the internals. Since this book dives into the technical details of the various APIs, we will primarily cover the DataFrame APIs, showing the Spark SQL API in some places to contrast the different ways of using them.

Thus, the DataFrame API is the underlying layer beneath Spark SQL. In this chapter, we will show you how to create DataFrames using various techniques, including SQL queries, and how to perform operations on them.
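As a brief taste of what is to come, here is a minimal sketch that creates a DataFrame from a local collection and runs a simple operation on it. It assumes you are running Spark locally; the application name, data, and column names are purely illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataFrameBasics")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Create a DataFrame from a local collection, naming the columns explicitly
val peopleDF = Seq(("Alice", 29), ("Bob", 35), ("Carol", 21)).toDF("name", "age")

// A simple DataFrame operation: filter and project
peopleDF.filter($"age" > 25).select("name", "age").show()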

A DataFrame is an abstraction on top of the Resilient Distributed Dataset (RDD) that exposes higher-level functions, is optimized by the Catalyst optimizer, and is highly performant thanks to the Tungsten initiative. You can think of a Dataset as an efficient, table-like version of an RDD with a heavily optimized binary representation of the data. The binary representation is achieved using encoders, which serialize objects into a binary structure that performs much better than the RDD representation. Since DataFrames use RDDs internally anyway, a DataFrame/Dataset is distributed exactly like an RDD and is thus also a distributed dataset. Obviously, this also means Datasets are immutable.

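To make the role of encoders a little more concrete, the following sketch uses the built-in Encoders factory to show how an encoder maps a Scala case class onto the flat schema that backs Spark's internal binary format; the Person class here is purely illustrative:

import org.apache.spark.sql.Encoders

// A case class representing one record; define it at the top level in a real application
case class Person(name: String, age: Int)

// The encoder knows how to serialize Person objects into Spark's internal binary format
val personEncoder = Encoders.product[Person]

// Print the schema the encoder derives from the case class fields (name as string, age as integer)
personEncoder.schema.printTreeString()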

Datasets were added in Spark 1.6 and provide the benefits of strong typing on top of DataFrames. In fact, since Spark 2.0, DataFrame is simply an alias for Dataset[Row].

The org.apache.spark.sql package defines DataFrame as Dataset[Row], which means that most of the APIs work well with both Datasets and DataFrames:
type DataFrame = Dataset[Row]
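The practical effect of this alias is easy to see in a sketch like the one below, which reuses the hypothetical Person case class and the spark session from the earlier snippets; the typed Dataset and the untyped DataFrame are built from the same data:

import spark.implicits._

// A typed Dataset[Person]: field names and types are checked at compile time
val peopleDS = Seq(Person("Alice", 29), Person("Bob", 35)).toDS()
peopleDS.filter(p => p.age > 30).show()

// An untyped DataFrame, which is just Dataset[Row]: columns are resolved at runtime
val peopleRows = peopleDS.toDF()
peopleRows.filter($"age" > 30).show()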

A DataFrame is conceptually similar to a table in a relational database. Hence, a DataFrame contains rows of data, with each row made up of several columns.

One of the first things we need to keep in mind is that, just like RDDs, DataFrames are immutable. This immutability means that every transformation produces a new DataFrame rather than modifying the existing one.
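A quick way to convince yourself of this is the following sketch, continuing with the peopleDF DataFrame from the earlier snippet (with spark.implicits._ in scope): the filter call returns a brand-new DataFrame and leaves the original untouched:

// filter() does not modify peopleDF; it returns a new DataFrame
val adultsDF = peopleDF.filter($"age" >= 30)

println(peopleDF.count())  // still reports all of the original rows
println(adultsDF.count())  // reports only the filtered rows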

Let's start looking more closely at DataFrames and how they differ from RDDs. RDDs, as seen before, represent the low-level API for data manipulation in Apache Spark. DataFrames were created on top of RDDs to abstract away their low-level inner workings and to expose high-level APIs that are easier to use and provide a lot of functionality out of the box. The DataFrame concept follows similar ideas found in the Python pandas package, the R language, the Julia language, and so on.

As we mentioned before, DataFrames translate SQL code and domain-specific language (DSL) expressions into optimized execution plans to be run on top of the Spark Core APIs, allowing the SQL statements to perform a wide variety of operations. DataFrames support many different types of input data sources and many types of operations, including all the usual SQL operations such as joins, group by, aggregations, and window functions, just as in most databases. Spark SQL is also quite similar to the Hive query language, and since Spark provides a natural adapter to Apache Hive, users who have been working in Apache Hive can easily transfer their knowledge to Spark SQL, thus minimizing the transition time.

DataFrames essentially depend on the concept of a table, as seen previously. The table can be operated on in a way very similar to how Apache Hive works; in fact, many of the operations on tables in Apache Spark resemble how Apache Hive handles and operates on tables. Once you have a table in the form of a DataFrame, it can be registered as a table (or view), and you can then operate on the data using Spark SQL statements in lieu of the DataFrame APIs.
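For example, assuming the peopleDF DataFrame from earlier, a sketch of registering it as a temporary view and querying it with Spark SQL, next to the equivalent DataFrame API expression, might look like this:

import org.apache.spark.sql.functions.avg

// Register the DataFrame as a temporary view so it can be queried with SQL
peopleDF.createOrReplaceTempView("people")

// Spark SQL statement
val sqlResult = spark.sql("SELECT name, AVG(age) AS avg_age FROM people GROUP BY name")

// Equivalent DataFrame API expression
val dfResult = peopleDF.groupBy("name").agg(avg("age").alias("avg_age"))

sqlResult.show()
dfResult.show()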

DataFrames depend on the Catalyst optimizer and the Tungsten performance improvements, so let's briefly examine how the Catalyst optimizer works. The Catalyst optimizer creates a parsed logical plan from the input SQL and then analyzes it by resolving all the attributes and columns used in the SQL statement. Once the analyzed logical plan is created, the Catalyst optimizer further tries to optimize the plan by combining several operations and rearranging the logic to get better performance.
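You can peek at these stages yourself through the queryExecution field that every Dataset and DataFrame exposes. The following sketch, using the people view registered above, prints the parsed, analyzed, optimized, and physical plans for a simple query:

val query = spark.sql("SELECT name FROM people WHERE age > 25")

// The individual planning stages produced by the Catalyst optimizer
println(query.queryExecution.logical)       // parsed logical plan
println(query.queryExecution.analyzed)      // analyzed logical plan
println(query.queryExecution.optimizedPlan) // optimized logical plan
println(query.queryExecution.executedPlan)  // physical plan selected for execution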

In order to understand the Catalyst optimizer, think of it as a common-sense logic optimizer that can reorder operations such as filters and transformations, sometimes grouping several operations into one, so as to minimize the amount of data that is shuffled across the worker nodes. For example, the Catalyst optimizer may decide to broadcast the smaller dataset when performing a join between two datasets. Use explain() to look at the execution plan of any DataFrame. The Catalyst optimizer also computes statistics on a DataFrame's columns and partitions, improving the speed of execution.
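As an illustration, here is a sketch of a join where the smaller side is explicitly marked for broadcast, followed by a call to explain(); both the orders and countries DataFrames are hypothetical, and the same spark session and implicits from the earlier snippets are assumed:

import org.apache.spark.sql.functions.broadcast

// A larger fact-like DataFrame and a small lookup DataFrame (both hypothetical)
val orders = Seq((1, "US", 100.0), (2, "IN", 250.0), (3, "US", 75.0))
  .toDF("order_id", "country_code", "amount")
val countries = Seq(("US", "United States"), ("IN", "India"))
  .toDF("code", "country")

// Hint to the Catalyst optimizer that the small side should be broadcast
val joined = orders.join(broadcast(countries), orders("country_code") === countries("code"))

// Inspect the physical plan that the optimizer chose
joined.explain()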

For example, if there are transformations and filters on the data partitions, then the order in which we filter the data and apply transformations matters a lot to the overall performance of the operations. As a result of all the optimizations, the optimized logical plan is generated, which is then converted into a physical plan. Obviously, several physical plans are possible for executing the same SQL statement and generating the same result; the cost-based optimization logic picks a good physical plan based on cost estimates.
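To see this in practice, consider the sketch below, which continues with the hypothetical orders DataFrame: the two queries place the filter in different positions, yet after Catalyst's optimizations the physical plans are usually equivalent, because the filter is pushed down as early as possible:

// Filter first, then project
val early = orders.filter($"amount" > 100).select("order_id", "amount")

// Project first, then filter; Catalyst typically pushes the filter below the projection
val late = orders.select("order_id", "amount").filter($"amount" > 100)

// Compare the plans chosen for each version
early.explain()
late.explain()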

Tungsten performance improvements are another key ingredient in the secret sauce behind the phenomenal performance gains of Spark 2.x compared to previous releases, such as Spark 1.6 and older. Tungsten implements a complete overhaul of memory management along with other performance improvements. The most important memory management improvement is the binary encoding of objects and referencing them in both off-heap and on-heap memory. Thus, Tungsten allows the use of off-heap memory by encoding all objects with this binary encoding mechanism, and binary-encoded objects take up much less memory. Project Tungsten also improves shuffle performance.
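Off-heap memory is not used by default; it is enabled through configuration. A sketch of turning it on when building the SparkSession might look like the following, where the 2 GB size is just an illustrative value:

import org.apache.spark.sql.SparkSession

val sparkOffHeap = SparkSession.builder()
  .appName("TungstenOffHeapExample")
  .master("local[*]")
  // Allow Tungsten to allocate memory outside the JVM heap
  .config("spark.memory.offHeap.enabled", "true")
  // Amount of off-heap memory, in bytes (2 GB here, purely illustrative)
  .config("spark.memory.offHeap.size", 2L * 1024 * 1024 * 1024)
  .getOrCreate()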

Data is typically loaded into a DataFrame through a DataFrameReader, and saved from a DataFrame through a DataFrameWriter.
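A minimal sketch of this round trip, assuming the spark session from the earlier snippets and hypothetical input and output paths, looks like the following:

// DataFrameReader: load CSV data with a header row and inferred column types
val salesDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/sales.csv")

// DataFrameWriter: save the result as Parquet, overwriting any previous output
salesDF.write
  .format("parquet")
  .mode("overwrite")
  .save("output/sales_parquet")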
