Spark DataFrame

For numerical and analytical tasks, Spark provides a convenient interface through the pyspark.sql module (also called SparkSQL). The module includes a pyspark.sql.DataFrame class that supports efficient SQL-style queries similar to those available in pandas. Access to the SQL interface is provided through the SparkSession class:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()

SparkSession can then be used to create a DataFrame through its createDataFrame method, which accepts an RDD, a list, or a pandas.DataFrame.

In the following example, we will create a pyspark.sql.DataFrame by converting an RDD, rows, which contains a collection of Row instances. A Row instance represents an association between a set of column names and a set of values, just like a row in a pandas.DataFrame. In this example, we have two columns, x and y, to which we will associate random numbers:

    from pyspark.sql import Row

    # We will use the rdd_x and rdd_y defined previously.
    rows = rdd_x.zip(rdd_y).map(lambda xy: Row(x=float(xy[0]), y=float(xy[1])))

    rows.first()  # Inspect the first element
    # Result:
    # Row(x=0.18432163061239137, y=0.632310101419016)
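
As a rough analogy that needs no Spark installation, the association between column names and values can be pictured with a named tuple from the Python standard library. This is only a sketch of the idea, not how Row is implemented; the name RowLike is hypothetical and is not part of PySpark. A real Row similarly allows access by attribute and by position, and exposes an asDict method.

```python
from collections import namedtuple

# Hypothetical stand-in for a Row with columns x and y (not a PySpark class).
RowLike = namedtuple("RowLike", ["x", "y"])

row = RowLike(x=0.18432163061239137, y=0.632310101419016)

print(row.x)          # access a value by column name
print(row[1])         # positional access works too, since it is a tuple
print(row._asdict())  # the column-name-to-value mapping
```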

After obtaining our collection of Row instances, we can combine them in a DataFrame, as follows. We can also inspect the DataFrame content using the show method:

    df = spark.createDataFrame(rows)
    df.show(5)
    # Output:
    # +-------------------+--------------------+
    # |                  x|                   y|
    # +-------------------+--------------------+
    # |0.18432163061239137|   0.632310101419016|
    # | 0.8159145525577987| -0.9578448778029829|
    # |-0.6565050226033042|  0.4644773453129496|
    # |-0.1566191476553318|-0.11542211978216432|
    # | 0.7536730082381564| 0.26953055476074717|
    # +-------------------+--------------------+
    # only showing top 5 rows

pyspark.sql.DataFrame supports transformations on the distributed dataset using a convenient SQL syntax. For example, you can use the selectExpr method to calculate a value using a SQL expression. In the following code, we compute the hit test using the x and y columns and the pow SQL function:

    hits_df = df.selectExpr("pow(x, 2) + pow(y, 2) < 1 as hits")
    hits_df.show(5)
    # Output:
    # +-----+
    # | hits|
    # +-----+
    # | true|
    # |false|
    # | true|
    # | true|
    # | true|
    # +-----+
    # only showing top 5 rows

To demonstrate the expressivity of SQL, we can also estimate pi using a single expression. The expression uses SQL functions such as sum, pow, cast, and count:

    result = df.selectExpr(
        "4 * sum(cast(pow(x, 2) + pow(y, 2) < 1 as int)) / count(x) as pi"
    )
    result.first()
    # Result:
    # Row(pi=3.13976)
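
To make the arithmetic in that expression explicit, here is a minimal plain-Python sketch of the same calculation on a local list of points. It is an illustration only and runs without Spark; the function name estimate_pi and the locally generated sample are assumptions, not part of the original example, and the sketch assumes both coordinates are drawn uniformly from [-1, 1].

```python
import random


def estimate_pi(points):
    """Local equivalent of
    4 * sum(cast(pow(x, 2) + pow(y, 2) < 1 as int)) / count(x).
    """
    # cast(... as int) maps true/false to 1/0, and sum adds up the hits.
    hits = sum(int(x ** 2 + y ** 2 < 1) for x, y in points)
    # count(x) counts the rows with a non-null x; here every point has one.
    return 4 * hits / len(points)


random.seed(0)  # fixed seed so that repeated runs use the same sample
points = [(random.uniform(-1, 1), random.uniform(-1, 1)) for _ in range(100_000)]
pi_estimate = estimate_pi(points)
print(pi_estimate)
```

The fraction of points that fall inside the unit circle approximates the ratio between the area of the circle and the area of the enclosing square, pi/4, which is why the result is multiplied by 4.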

Spark SQL follows the same syntax as Hive, a SQL engine for distributed datasets built on Hadoop. Refer to https://cwiki.apache.org/confluence/display/Hive/LanguageManual for a complete syntax reference.

DataFrames are a great way to leverage the power and optimization of Scala while using the Python interface. The main reason is that queries are interpreted symbolically by SparkSQL and the execution happens directly in Scala, without passing intermediate results through Python. This greatly reduces the serialization overhead and takes advantage of the query optimizations performed by SparkSQL. Optimizations and query planning allow the use of SQL operators, such as GROUP BY, without incurring performance penalties such as the one we experienced using groupBy directly on an RDD.
