
8. PySparkSQL

Raju Kumar Mishra
(1)
Bangalore, Karnataka, India
 
Most data that a data scientist deals with is either structured or semistructured, and nowadays the amount of unstructured data is also increasing rapidly. The PySparkSQL module is a higher-level abstraction over PySpark Core for processing structured and semistructured datasets. By using PySparkSQL, we can use SQL and HiveQL code too, which makes this module popular among database programmers and Apache Hive users. The APIs provided by PySparkSQL are optimized. PySparkSQL can read data from many sources, such as CSV files, JSON files, and external databases.
The DataFrame abstraction is similar to a table in a relational database management system. The DataFrame consists of named columns and is a collection of Row objects. Row objects are defined in PySparkSQL. Users are familiar with the schema of tabular forms, so it becomes easy to operate on DataFrames.
In Spark 1.6, a new Dataset interface was introduced. This interface is a hybrid of the DataFrame and the RDD, so it provides the benefits of both. The Dataset interface is not available in Spark with Python.
The GraphFrames library is used to process graphs. It is similar to the GraphX library, which is not available in Python. For PySpark users, the GraphFrames library is the most suitable choice for graph processing. It is built on top of the SparkSQL DataFrame, and we can run DataFrame queries on our graphs by using GraphFrames, which distinguishes it from GraphX.
You’ll find this chapter full of exciting topics, from PySparkSQL and DataFrames to graph analysis recipes.
This chapter covers the following recipes:
  • Recipe 8-1. Create a DataFrame
  • Recipe 8-2. Perform exploratory data analysis on DataFrames
  • Recipe 8-3. Perform aggregation operations on DataFrames
  • Recipe 8-4. Execute SQL and HiveQL queries on DataFrames
  • Recipe 8-5. Perform data joining on DataFrames
  • Recipe 8-6. Calculate breadth-first searches using GraphFrames
  • Recipe 8-7. Calculate page rank using GraphFrames
  • Recipe 8-8. Read data from Apache Hive

Recipe 8-1. Create a DataFrame

Problem

You want to create a DataFrame.

Solution

As you know, a DataFrame is a collection of named columns. You might remember the filament data from Chapter 5. You want to do the following on the filament data:
  • Create a DataFrame
  • Know the schema of a DataFrame
  • Print the content of a DataFrame
  • Filter out the data for 100W bulbs
  • Select data from a DataFrame for bulbs of 100W with a life greater than 650

How It Works

Step 8-1-1. Creating a Nested List of Filament Data

First we have to create a nested list of filament data. You’re already familiar with this data from Chapter 5:
>>> filamentData = [['filamentA','100W',605],
...  ['filamentB','100W',683],
...  ['filamentB','100W',691],
...  ['filamentB','200W',561],
...  ['filamentA','200W',530],
...  ['filamentA','100W',619],
...  ['filamentB','100W',686],
...  ['filamentB','200W',600],
...  ['filamentB','100W',696],
...  ['filamentA','200W',579],
...  ['filamentA','200W',520],
...  ['filamentA','100W',622],
...  ['filamentA','100W',668],
...  ['filamentB','200W',569],
...  ['filamentB','200W',555],
...  ['filamentA','200W',541]]
After creating this filamentData nested list, we are going to create an RDD of it. To create the RDD, we’ll use our parallelize() function:
>>> filamentDataRDD = sc.parallelize(filamentData, 4)
>>> filamentDataRDD.take(4)
Here is the output:
[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691],
 ['filamentB', '200W', 561]]
The filamentDataRDD RDD has four partitions. We have created our RDD successfully. The next step is to create a schema for our DataFrame.

Step 8-1-2. Creating a Schema of a DataFrame

In our DataFrame, we have three columns. First, we are going to define the columns. We define the columns by using the StructField() function. PySparkSQL has its own data types, and all of these are defined in the submodule pyspark.sql.types. We have to import everything from pyspark.sql.types:
>>> from pyspark.sql.types import *
After importing the required submodule, we define our first column of the DataFrame:
>>> FilamentTypeColumn = StructField("FilamentType",StringType(),True)
Let’s look at the arguments of StructField(). The first argument is the column name. In this example, the column name is FilamentType. The second argument is the data type of the elements in the column. In this example, the data type of the first column is StringType(). We know that some elements of a column might be null. So the last argument, which has the value True, indicates that this column might have null values or missing data.
>>> BulbPowerColumn = StructField("BulbPower",StringType(),True)
>>> LifeInHoursColumn = StructField("LifeInHours",StringType(),True)
We have created a StructField for each column. Now we have to create the full schema of our DataFrame by using the StructType object, as follows:
>>> FilamentDataFrameSchema = StructType([FilamentTypeColumn, BulbPowerColumn, LifeInHoursColumn])
FilamentDataFrameSchema is the full schema of our DataFrame.
>>> FilamentDataFrameSchema
Here is the output:
StructType(
       List(StructField(FilamentType,StringType,true),
       StructField(BulbPower,StringType,true),
       StructField(LifeInHours,StringType,true))
      )
The schema of the three columns of our DataFrame can be observed via the FilamentDataFrameSchema variable, as shown in the preceding code.
We know that a DataFrame is an RDD of Row objects. Therefore, we have to transform our filamentDataRDD RDD to the RDD of Row objects. In the RDD of Row objects, every row is a Row object. In the next recipe step, we are going to transform our RDD to an RDD of Row objects.

Step 8-1-3. Creating an RDD of Row Objects

The RDD map() function is best for transforming an RDD from one structure to another. We will use it to transform our filamentDataRDD into an RDD of Row objects, because a DataFrame is nothing but an RDD of Row objects. But in order to work with Row, we first have to import it. Row is defined in pyspark.sql. We can import Row as shown here:
>>> from pyspark.sql import Row
>>> filamentRDDofROWs = filamentDataRDD.map(lambda x :Row(str(x[0]), str(x[1]), str(x[2])))
>>> filamentRDDofROWs.take(4)
Here is the output:
[<Row(filamentA, 100W, 605)>,
 <Row(filamentB, 100W, 683)>,
 <Row(filamentB, 100W, 691)>,
 <Row(filamentB, 200W, 561)>]
You can see that we have created an RDD of rows, filamentRDDofROWs. We applied the take() function to the RDD and printed four of its Row elements.

Step 8-1-4. Creating a DataFrame

We have created the schema and RDD of rows. Therefore, we can create our DataFrame. In order to create a DataFrame, we need the SQLContext object. Let’s create the SQLContext object in the following line of code:
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
We have created our sqlContext object. Recall that sc, which is available in the PySpark console, is an object of SparkContext and is the entry point to PySpark Core. In a similar way, SQLContext is the entry point to PySparkSQL.
Using the createDataFrame() function, which is defined on SQLContext, we’ll create our DataFrame, filamentDataFrameRaw. We provide two arguments to the createDataFrame() function: the first one is the RDD of Row objects, filamentRDDofROWs, and the second one is the schema for our DataFrame, FilamentDataFrameSchema.
>>> filamentDataFrameRaw = sqlContext.createDataFrame(filamentRDDofROWs, FilamentDataFrameSchema)
We have created our DataFrame from our filament data and given it the reference filamentDataFrameRaw. Let’s print the records of our DataFrame on the console. Previously, we used the take() function to fetch data from the RDD. But now we are going to change the way we fetch data; we are going to use the show() function, which prints data in a nicely formatted table. We can provide the number of rows as input to the show() function. In the following line, four records are being fetched:
>>> filamentDataFrameRaw.show(4)
Here is the output, showing only the top four rows:
+------------+---------+-----------+
|FilamentType|BulbPower|LifeInHours|
+------------+---------+-----------+
|   filamentA|     100W|        605|
|   filamentB|     100W|        683|
|   filamentB|     100W|        691|
|   filamentB|     200W|        561|
+------------+---------+-----------+

Step 8-1-5. Printing a Schema of a DataFrame

We have created our DataFrame. Let’s check its schema. We can fetch the schema of a DataFrame by using the printSchema() function defined on the DataFrame:
>>> filamentDataFrameRaw.printSchema()
Here is the output:
root
 |-- FilamentType: string (nullable = true)
 |-- BulbPower: string (nullable = true)
 |-- LifeInHours: string (nullable = true)
The printSchema() function’s output shows that the DataFrame has three columns. For each column, it prints the column name, the data type, and whether the column is nullable. You are an observant reader if you have noticed that the data type of the LifeInHours column is string. It’s better to represent time in hours as either an integer data type or a floating-point type. Therefore, we have to change the data type of the third column.
We can typecast a column value from one data type to another by using the cast() function.

Step 8-1-6. Changing the Data Type of a Column

The withColumn() function returns a DataFrame by adding a new column to it. But if that column is already in the DataFrame, the withColumn() function will replace the existing column:
>>> filamentDataFrame = filamentDataFrameRaw.withColumn('LifeInHours',filamentDataFrameRaw.LifeInHours.cast(FloatType()))
Investigating the schema will now show float as the data type for the LifeInHours column:
>>> filamentDataFrame.printSchema()
Here is the output:
root
 |-- FilamentType: string (nullable = true)
 |-- BulbPower: string (nullable = true)
 |-- LifeInHours: float (nullable = true)
The data type of the LifeInHours column has been transformed from a string type to a float type. Let’s investigate the DataFrame by fetching some rows from it. We can display rows by using the following function:
>>> filamentDataFrame.show(5)
Here is the output, showing only the top five rows:
+--------------+----------+------------+
|FilamentType  |BulbPower |LifeInHours |
+--------------+----------+------------+
|   filamentA  |   100W   |   605.0    |
|   filamentB  |   100W   |   683.0    |
|   filamentB  |   100W   |   691.0    |
|   filamentB  |   200W   |   561.0    |
|   filamentA  |   200W   |   530.0    |
+--------------+----------+------------+
We can observe that the data type of the LifeInHours column has been changed to floating-point.
The column names can be fetched by using the columns attribute of the DataFrame object:
>>> filamentDataFrame.columns
Here is the output:
['FilamentType', 'BulbPower', 'LifeInHours']
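A related attribute is dtypes, which returns (column name, data type) pairs; this is an optional, quick way to confirm both names and types at once:
>>> filamentDataFrame.dtypes
Based on the schema above, this should return something like [('FilamentType', 'string'), ('BulbPower', 'string'), ('LifeInHours', 'float')].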

Step 8-1-7. Filtering Out Data Where BulbPower Is 100W

Filtering rows, based on particular logic, can be done by using the filter() function. This function takes a logical expression and returns a DataFrame of filtered data:
>>> filamentDataFrame100Watt = filamentDataFrame.filter(filamentDataFrame.BulbPower == '100W')
We need all the rows where BulbPower is equal to 100W. Therefore, we provide filamentDataFrame.BulbPower == '100W' as an argument to the filter() function. Let’s see what is inside the filamentDataFrame100Watt DataFrame:
>>> filamentDataFrame100Watt.show()
Here is the output:
+------------+---------+-----------+
|FilamentType|BulbPower|LifeInHours|
+------------+---------+-----------+
|   filamentA|     100W|      605.0|
|   filamentB|     100W|      683.0|
|   filamentB|     100W|      691.0|
|   filamentA|     100W|      619.0|
|   filamentB|     100W|      686.0|
|   filamentB|     100W|      696.0|
|   filamentA|     100W|      622.0|
|   filamentA|     100W|      668.0|
+------------+---------+-----------+
The filter() function has done its job accurately.

Step 8-1-8. Selecting Data from a DataFrame

A compound logical expression can also be used in the filter() function. In this step, we are going to use a compound logical expression with the & operator:
>>> filamentData100WGreater650 = filamentDataFrame.filter((filamentDataFrame.BulbPower == '100W') & (filamentDataFrame.LifeInHours > 650.0))
>>> filamentData100WGreater650.show()
Here is the output:
+--------------+-----------+------------+
|FilamentType  |BulbPower  |LifeInHours |
+--------------+-----------+------------+
|   filamentB  |   100W    |   683.0    |
|   filamentB  |   100W    |   691.0    |
|   filamentB  |   100W    |   686.0    |
|   filamentB  |   100W    |   696.0    |
|   filamentA  |   100W    |   668.0    |
+--------------+-----------+------------+
Finally, we have met our requirement. In the next recipe, we are going to do exploratory analysis on a DataFrame.

Recipe 8-2. Perform Exploratory Data Analysis on a DataFrame

Problem

You want to perform exploratory data analysis on a DataFrame.

Solution

In exploratory data analysis, we explore the given data. Exploring the data means counting the number of records and then looking for meaningful patterns. For data of numeric columns, we calculate the measures of central tendency and the spread in the data. The spread in the data is nothing but the variability in the data. You might know that measures of central tendency are the mean, median, and mode. But how is variability, or data spread, measured? We can measure it by using either variance or standard deviation.
A given dataset might have categorical columns. For categorical columns, we count the frequency for each value of that variable. A count of records gives us an idea of the number of data points we have. We calculate the minimum and maximum data points from a given numerical column. Knowing the minimum and maximum shows us the range of data.
PySparkSQL has a describe() function defined on the DataFrame. This function returns the number of records (count), mean, standard deviation (stddev), minimum (min), and maximum (max) for a column of numerical values in the DataFrame.
You have a file filamentData.csv. This time we have to read data from the CSV file and create a DataFrame. After creating the DataFrame, we have to do a summary analysis on the DataFrame’s numerical columns. Apart from summary statistics on the numerical columns, we have to know the frequency of distinct values in each categorical field.
You want to perform the following on the DataFrame of filament data:
  • Read data from the CSV file filamentData.csv
  • Create a DataFrame
  • Calculate summary statistics on a numerical column
  • Count the frequency of distinct values in the FilamentType categorical column
  • Count the frequency of distinct values in the BulbPower categorical column

How It Works

First we have to read the given file and transform the data into a DataFrame. In the preceding recipe, we started from a nested list and performed several steps to create a DataFrame. And in Chapter 6 we found that it took numerous steps to get a nested list from a CSV file. It would be good if there were a more direct way.
There is: PySpark can read a CSV file and transform the data directly into a DataFrame. This capability comes from the com.databricks.spark.csv package, developed by Databricks. Before Spark 2.x, the user had to add this package separately. But from Spark 2.x onward, the functionality is merged into PySpark, so you don’t need to include the JARs separately. Thanks to Databricks for this beautiful and very useful package.
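For comparison only, before Spark 2.x you launched PySpark with the spark-csv package (for example, with --packages com.databricks:spark-csv_2.10:1.5.0) and read through its format name. A rough sketch of that older style, shown here just so you can recognize it in legacy code:
>>> oldStyleFilamentDataFrame = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('file:///home/pysparkbook/pysparkBookData/filamentData.csv')
In Spark 2.x and later, the built-in spark.read.csv() used in the following steps is the preferred way.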

Step 8-2-1. Defining the DataFrame Schema

Here we are going to define the schema of our DataFrame. In our schema, there are three columns. The first column is FilamentType, which has the data type of string. The second column is BulbPower, which also has the data type of string. The last column is LifeInHours, which has the data type of double.
We need different data types defined in PySparkSQL. We can find all the data types in the pyspark.sql.types submodule:
>>> from pyspark.sql.types import *
>>> FilamentTypeColumn = StructField("FilamentType",StringType(),True)
>>> BulbPowerColumn = StructField("BulbPower",StringType(),True)
>>> LifeInHoursColumn = StructField("LifeInHours",DoubleType(),True)
We have created three StructFields:
>>> FilamentDataFrameSchema = StructType([FilamentTypeColumn, BulbPowerColumn, LifeInHoursColumn])
Using these StructFields, we have created a schema for our DataFrame. The name of our DataFrame schema is FilamentDataFrameSchema.

Step 8-2-2. Reading a CSV File and Creating a DataFrame

Let’s create a DataFrame. We are going to use the spark.read.csv() function to read the file and convert its data to a DataFrame:
>>> filamentDataFrame = spark.read.csv('file:///home/pysparkbook/pysparkBookData/filamentData.csv',header=True, schema = FilamentDataFrameSchema, mode="DROPMALFORMED")
Let’s discuss the arguments of the spark.read.csv() function. The first argument is the file path. The second argument indicates that our file has a header line. The third argument provides the schema of our DataFrame. What is the mode argument? This fourth argument, mode, provides a way to deal with corrupt records during parsing. The value of the mode argument is DROPMALFORMED, which tells the reader to drop all malformed records.
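DROPMALFORMED is not the only option; the CSV reader’s mode argument also accepts PERMISSIVE (the default, which keeps malformed records) and FAILFAST (which raises an error as soon as a malformed record is found). A stricter read might look like the following sketch:
>>> strictFilamentDataFrame = spark.read.csv('file:///home/pysparkbook/pysparkBookData/filamentData.csv', header=True, schema = FilamentDataFrameSchema, mode="FAILFAST")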
We have read our filamentData.csv data file. And the spark.read.csv() function has already transformed our CSV data into a DataFrame. Let’s check whether we have our DataFrame. Do you remember the functions on a DataFrame that can help you visualize it? The show() function will work for our purpose. Let’s fetch five rows from our DataFrame. The show() function prints records of the DataFrame on the console. It prints records in a tabular format, which is easier to read and understand. Let’s apply the show() function to print five records on the console:
>>> filamentDataFrame.show(5)
Here is the output, showing only the top five rows:
+------------+---------+-----------+
|FilamentType|BulbPower|LifeInHours|
+------------+---------+-----------+
|   filamentA|     100W|      605.0|
|   filamentB|     100W|      683.0|
|   filamentB|     100W|      691.0|
|   filamentB|     200W|      561.0|
|   filamentA|     200W|      530.0|
+------------+---------+-----------+
We have our DataFrame. Let’s check its schema:
>>> filamentDataFrame.printSchema()
Here is the output:
root
 |-- FilamentType: string (nullable = true)
 |-- BulbPower: string (nullable = true)
 |-- LifeInHours: double (nullable = true)
We have a proper schema too.
Note
To learn more about getting a CSV file into a Spark DataFrame, read this Stack Overflow discussion: https://stackoverflow.com/questions/29936156/get-csv-to-spark-dataframe .

Step 8-2-3. Calculating Summary Statistics

The describe() function, which is defined on a DataFrame, will give us the following:
>>> dataSummary = filamentDataFrame.describe()
>>> dataSummary.show()
Here is the output:
+---------+------------------------------+
|summary  |       LifeInHours            |
+---------+------------------------------+
|  count  |                16            |
|   mean  |          607.8125            |
| stddev  | 61.11652122517009            |
|    min  |             520.0            |
|    max  |             696.0            |
+---------+------------------------------+
We have our results: count, mean, stddev, min, and max. Next, we have to find the frequency of the values of our two categorical columns, FilamentType and BulbPower, and a combination of them.

Step 8-2-4. Counting the Frequency of Distinct Values in the FilamentType Categorical Column

A very naive method for finding the frequency of values is to filter the records by using the filter() function and then count them. Let’s perform these tasks one by one:
>>> filamentDataFrame.filter(filamentDataFrame.FilamentType == 'filamentA').count()
Here is the output:
8
In the preceding code, we filter out all the records where FilamentType is equal to filamentA. Eight rows have filamentA. Now let’s see how many rows have filamentB in the first column:
>>> filamentDataFrame.filter(filamentDataFrame.FilamentType == 'filamentB').count()
Here is the output:
8
We have filtered out all the records where FilamentType is equal to filamentB. Using the count() function on the filtered data returns the total number of rows that have filamentB in the first column. Eight rows have filamentB in the first column.

Step 8-2-5. Counting the Frequency of Distinct Values in the BulbPower Categorical Column

Now let’s filter data where BulbPower is equal to 100W. After filtering out the required rows, we have to count them:
>>> filamentDataFrame.filter(filamentDataFrame.BulbPower  == '100W').count()
Here is the output:
8
Eight rows have BulbPower values of 100W. Similarly, we can count other values and their combinations. Let’s compute the frequency of 200W bulbs:
>>> filamentDataFrame.filter(filamentDataFrame.BulbPower  == '200W').count()
Here is the output:
8
Our BulbPower column has eight 100W bulbs and eight 200W bulbs.

Step 8-2-6. Counting the Frequency of Distinct Values in a Combination of FilamentType and BulbPower Columns

In the following code, we are going to count rows on the basis of a compound logical expression:
>>> filamentDataFrame.filter((filamentDataFrame.FilamentType == 'filamentB') & (filamentDataFrame.BulbPower  == '100W')).count()
Here is the output:
4
>>> filamentDataFrame.filter((filamentDataFrame.FilamentType == 'filamentB') & (filamentDataFrame.BulbPower  == '200W')).count()
Here is the output:
4
>>> filamentDataFrame.filter((filamentDataFrame.FilamentType == 'filamentA') & (filamentDataFrame.BulbPower  == '200W')).count()
Here is the output:
4
>>> filamentDataFrame.filter((filamentDataFrame.FilamentType == 'filamentA') & (filamentDataFrame.BulbPower  == '100W')).count()
Here is the output:
4
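The repeated filter-and-count calls above work, but the same frequencies can be computed in a single pass by grouping on both categorical columns; a minimal sketch:
>>> filamentDataFrame.groupBy('FilamentType', 'BulbPower').count().show()
This returns one row per FilamentType and BulbPower combination along with its count (four for each combination in this data).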

Recipe 8-3. Perform Aggregation Operations on a DataFrame

Problem

You want to perform data aggregation on a DataFrame.

Solution

To get a summarized pattern of data, data scientists perform aggregation on a given dataset. Summarized patterns are easy to understand. Sometimes the summarization is done based on the key. To perform aggregation based on the key, we first need to group the data by key.
In PySparkSQL, grouping by key can be performed by using the groupBy() function. This function returns a pyspark.sql.group.GroupedData object. After this GroupedData object is created, we can apply many aggregation functions, such as avg(), sum(), count(), min(), and max(), on the grouped data.
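As a quick illustration using the filamentDataFrame from Recipe 8-2, several aggregations can be applied at once through the agg() function of GroupedData; this sketch assumes the aggregation functions are imported from pyspark.sql.functions:
>>> from pyspark.sql import functions as F
>>> filamentDataFrame.groupBy('BulbPower').agg(F.avg('LifeInHours'), F.max('LifeInHours'), F.count('LifeInHours')).show()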
We have a data file named adult.data. I obtained this data from the web site of the Bren School of Information and Computer Science at the University of California, Irvine (UCI). This is a simple CSV file with 15 columns. Table 8-1 describes all 15 columns.
You want to do the following:
  • Create a DataFrame from the adult.data file
  • Count the total number of records in the DataFrame
  • Count the number of times that a salary is greater than $50,000 and the number of times it’s less than $50,000
  • Perform summary statistics on the numeric columns age, capital-gain, capital-loss, and hours-per-week
  • Find out the mean age of male and female workers from the data
  • Find out whether a salary greater than $50,000 is more frequent for males or females
  • Find the highest-paid job
Table 8-1. Description of adult.data File

How It Works

First we will download the required data, and then we will perform all the required actions one by one.

Step 8-3-1. Creating a DataFrame from the adult.data File

Let’s download the file adult.data from the UCI Machine Learning Repository web site; we can fetch the data file by using the wget Linux command. After downloading the data file, we will read it by using the spark.read.csv() function:
>>> censusDataFrame = spark.read.csv('file:///home/pysparkbook/pysparkBookData/adult.data',header=True, inferSchema = True)
In the preceding code, the second argument of the spark.read.csv() function is header = True. This indicates that the adult.data file has a header. The inferSchema = True argument is to infer the schema from the data itself. We are not providing an explicit schema for our DataFrame.
>>> censusDataFrame.printSchema()
Here is the output:
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
From the schema of our DataFrame, it is clear that there are 15 columns. Some columns are numeric, and the rest are strings. Our dataset is a mixture of categorical and numerical fields.

Step 8-3-2. Counting the Total Number of Records in a DataFrame

Let’s count how many records we have in our DataFrame. Our simplest count() method fulfills that requirement:
>>> censusDataFrame.count()
Here is the output:
32561
Our DataFrame has 32,561 records. That’s a large number of records.

Step 8-3-3. Counting the Frequency of Salaries Greater Than and Less Than 50K

We can achieve our goal of counting the frequency of certain salaries by first grouping our data by the income column and then counting by using our count() function:
>>> groupedByIncome = censusDataFrame.groupBy('income').count()
>>> groupedByIncome.show()
Here is the output:
+----------+---------+
|income    |count    |
+----------+---------+
|  >50K    | 7841    |
| <=50K    |24720    |
+----------+---------+
It is evident from this table that salaries greater than $50,000 are less frequent than those less than or equal to that amount.

Step 8-3-4. Performing Summary Statistics on Numeric Columns

The describe() function, shown here, is very useful:
>>> censusDataFrame.describe('age').show()
Here is the output:
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             32561|
|   mean| 38.58164675532078|
| stddev|13.640432553581356|
|    min|                17|
|    max|                90|
+-------+------------------+
The maximum age of a working person is 90 years old, and the minimum age is 17 years. The mean age of working people is 38.58 years. From this mean value, it can be inferred that most working people are in their late 30s.
Similarly, we can find summary statistics for the capital-gain and capital-loss data:
>>> censusDataFrame.describe('capital-gain').show()
+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+
>>> censusDataFrame.describe('capital-loss').show()
Here is the output:
+-------+----------------+
|summary|    capital-loss|
+-------+----------------+
|  count|           32561|
|   mean| 87.303829734959|
| stddev|402.960218649002|
|    min|             0.0|
|    max|          4356.0|
+-------+----------------+
Let’s see the distribution of hours per week for workers:
>>> censusDataFrame.describe('hours-per-week').show()
Here is the output:
+-------+------------------+
|summary|    hours-per-week|
+-------+------------------+
|  count|             32561|
|   mean|40.437455852092995|
| stddev|12.347428681731838|
|    min|               1.0|
|    max|              99.0|
+-------+------------------+
It is clear that the maximum number of working hours per week is 99.0.

Step 8-3-5. Finding the Mean Age of Male and Female Workers from the Data

The average age of male and female workers can be found as follows:
>>> groupedByGender = censusDataFrame.groupBy('sex')
>>> type(groupedByGender)
Here is the output:
<class 'pyspark.sql.group.GroupedData'>
>>> groupedByGender.mean('age').show()
Here is the output:
+------+------------------+
|   sex|          avg(age)|
+------+------------------+
|  Male| 39.43354749885268|
|Female| 36.85823043357163|
+------+------------------+
Male workers are, on average, older than their female worker counterparts.
>>> groupedByGender.mean('hours-per-week').show()
Here is the output:
+------+-------------------+
|   sex|avg(hours-per-week)|
+------+-------------------+
|  Male|  42.42808627810923|
|Female| 36.410361154953115|
+------+-------------------+
Female workers, on average, work fewer hours than their male counterparts, according to our data.
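Both of these averages can also be computed in one call by passing a dictionary of column names and aggregation function names to agg(); a minimal sketch:
>>> groupedByGender.agg({'age': 'mean', 'hours-per-week': 'mean'}).show()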

Step 8-3-6. Finding Out Whether High Salaries Are More Frequent for Males or Females

Since our result depends on two fields, sex and income, we have to group our data by both:
>>> groupedByGenderIncome = censusDataFrame.groupBy(['income', 'sex'])
Now, on the grouped data, we can apply the count() function to get our desired result:
>>> groupedByGenderIncome.count().show()
Here is the output:
+------+-------+-----+
|income|    sex|count|
+------+-------+-----+
|  >50K|   Male| 6662|
|  >50K| Female| 1179|
| <=50K| Female| 9592|
| <=50K|   Male|15128|
+------+-------+-----+
We can see that a salary greater than $50,000 is more frequent for males.

Step 8-3-7. Finding the Highest-Paid Job

To find the job with highest income, we must group our DataFrame on the occupation and income fields:
>>> groupedByOccupationIncome = censusDataFrame.groupBy(['occupation', 'income'])
On the grouped data, we have to apply count(). We need the highest-paid occupation, so we need to sort the data:
>>> groupedByOccupationIncome.count().sort(['income','count'], ascending= 0).show(5)
Here is the output, showing only the top five rows:
+----------------+------+-----+
|      occupation|income|count|
+----------------+------+-----+
| Exec-managerial|  >50K| 1968|
|  Prof-specialty|  >50K| 1859|
|           Sales|  >50K|  983|
|    Craft-repair|  >50K|  929|
|    Adm-clerical|  >50K|  507|
+----------------+------+-----+
We can see that a high frequency of executive/managerial people have salaries greater than $50,000.

Recipe 8-4. Execute SQL and HiveQL Queries on a DataFrame

Problem

You want to run SQL and HiveQL queries on a DataFrame.

Solution

We can use createOrReplaceTempView(), which creates a temporary view. The DataFrame class provides this function. The life of this view is tied to the SparkSession that created the DataFrame.
There is another function, registerTempTable(), which creates a temporary table in memory. Using SQLContext, we can run SQL commands, and using HiveContext, we can run HiveQL queries on these temporary tables. In newer versions of PySpark, this method is deprecated, but if you are working with older PySpark code, you might still find registerTempTable().
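For completeness, the older, deprecated style looks like the following sketch; in new code, prefer createOrReplaceTempView():
>>> censusDataFrame.registerTempTable("censusDataTable")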
In the preceding recipe, we created a DataFrame named censusDataFrame. You want to perform the following actions on the DataFrame:
  • Create a temporary view
  • Select the age and income columns by using SQL commands
  • Compute the average hours worked per week, based on education level

How It Works

We will start with creating a temporary view of our DataFrame. After creating this temporary view, we will apply SQL commands to perform our tasks.

Step 8-4-1. Creating a Temporary View in Memory

Let’s create a temporary view first. Then we can run our SQL or HiveQL commands against it.
>>> censusDataFrame.createOrReplaceTempView("censusDataTable")
This creates the temporary view censusDataTable.

Step 8-4-2. Selecting Age and Income Columns Using a SQL Command

The select command is a highly used and very popular command in SQL. We need to select two columns, age and income:
>>> censusDataAgeIncome = spark.sql('select age, income from censusDataTable limit 5')
The SQL command in the preceding code is a general SQL select command to fetch two columns, age and income. The spark.sql() function can be used to run SQL commands.
In spark.sql(), spark denotes a SparkSession object. Whenever we start the PySpark shell, the SparkSession is available as spark. In the preceding SQL command, limit 5 returns only five records.
Running the SQL command by using the spark.sql() function will return a DataFrame. In our case, we have the DataFrame censusDataAgeIncome, which has only two columns, age and income. So we know that we can print our DataFrame columns by using the show() function. Let’s see the result:
>>> censusDataAgeIncome.show()
Here is the output:
+---+------+
|age|income|
+---+------+
| 39| <=50K|
| 50| <=50K|
| 38| <=50K|
| 53| <=50K|
| 28| <=50K|
+---+------+
The spark.sql() function returns a DataFrame. We can test this by using the type() function . The following code line is for testing the data type of censusDataAgeIncome:
>>> type(censusDataAgeIncome)
Here is the output:
<class 'pyspark.sql.dataframe.DataFrame'>
The type of censusDataAgeIncome is DataFrame.

Step 8-4-3. Computing Average Hours per Week Based on Education Level

Computing the average hours per week for workers, based on their education level, requires grouping data by education first. In SQL and HiveQL, we can use the group by clause to get the grouped data:
>>> avgHoursPerWeekByEducation = spark.sql("select education, round(avg(`hours-per-week`),2) as averageHoursPerWeek from censusDataTable group by education")
The SQL avg() function will find the mean, or average, value. The round() function has been used to round the averages to two decimal places. We fetch the average hours per week under the alias averageHoursPerWeek. At the end of the computation, we get the DataFrame avgHoursPerWeekByEducation, which consists of two columns. The first column is education, and the second column is averageHoursPerWeek. Let’s apply the show() function to see the content of the DataFrame:
>>> avgHoursPerWeekByEducation.show()
Here is the output:
+------------+-------------------+
|   education|averageHoursPerWeek|
+------------+-------------------+
| Prof-school|              47.43|
|        10th|              37.05|
|     7th-8th|              39.37|
|     5th-6th|              38.90|
|  Assoc-acdm|              40.50|
|   Assoc-voc|              41.61|
|     Masters|              43.84|
|        12th|              35.78|
|   Preschool|              36.65|
|         9th|              38.04|
|   Bachelors|              42.61|
|   Doctorate|              46.97|
|     HS-grad|              40.58|
|        11th|              33.93|
|Some-college|              38.85|
|     1st-4th|              38.26|
+------------+-------------------+
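The same aggregation can also be written with the DataFrame API instead of SQL; a minimal sketch, assuming the functions module is imported as F:
>>> from pyspark.sql import functions as F
>>> censusDataFrame.groupBy('education').agg(F.round(F.avg('hours-per-week'), 2).alias('averageHoursPerWeek')).show()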

Recipe 8-5. Perform Data Joining on DataFrames

Problem

You want to perform join operations on two DataFrames.

Solution

Often we’re required to combine information from two or more DataFrames or tables. To do this, we perform a join of DataFrames. Basically, table joining is a SQL term, where we join two or more tables to get denormalized tables. Join operations on two tables are very common in data science.
In PySparkSQL, we can perform the following types of joins:
  • Inner join
  • Left outer join
  • Right outer join
  • Full outer join
In Chapter 5, we discussed paired RDD joins. In this recipe, we’ll discuss joining DataFrames. You want to perform the following:
  • Read a student data table from a PostgreSQL database
  • Read subject data from a JSON file
  • Perform an inner join on DataFrames
  • Save an inner-joined DataFrame as a JSON file
  • Perform a left outer join
  • Save a left-outer-joined DataFrame into PostgreSQL
  • Perform a right outer join
  • Perform a full outer join
In order to join the two DataFrames, PySparkSQL provides the join() function, which works on DataFrames.

How It Works

Let’s start exploring the PostgreSQL database and the tables inside. We have to read data from the PostgreSQL database. Let’s create a table in the PostgreSQL database and put records into the table.
Let’s enter into the database server:
$ sudo -u postgres psql
For our recipe, we’ll create the database pysparkbookdb:
postgres=# create database pysparkbookdb;
Here is the output:
CREATE DATABASE
We can see whether our database has been created successfully. The following SQL select command fetches all the databases that exist on our database server:
postgres=# SELECT datname FROM pg_database;
Here is the output:
 datname    
---------------
 template1
 template0
 postgres
 metastore
 pymetastore
 pysparkbookdb
(6 rows)
We can see that our pysparkbookdb database has been created successfully.
After creating the pysparkbookdb database, we have to connect to that database. We can connect to a database by using the \c command, which stands for connect. After connecting to the database, we are going to create a table, studentTable. Then we will insert our student data into the table.
postgres=# \c pysparkbookdb
You are now connected to the pysparkbookdb database as the user postgres. Let’s create our required table too:
pysparkbookdb=# create table studentTable(
pysparkbookdb(# studentID char(50) not null,
pysparkbookdb(# name char(50) not  null,
pysparkbookdb(# gender char(5) not null
pysparkbookdb(# );
CREATE TABLE
We have created a studentTable table in the RDBMS. The \d command, if used with a table name, shows the schema of that table; used without an argument, it prints all the tables in the current database. In the following lines, we print the schema of the studentTable table:
pysparkbookdb=# \d studentTable
Here is the output:
 Table "public.studenttable"
  Column   |     Type      | Modifiers
-----------+---------------+-----------
 studentid | character(50) | not null
 name      | character(50) | not null
 gender    | character(5)  | not null
Let’s put some data into the table. We are going to put in five records of students:
insert into studentTable values ('si1', 'Robin', 'M');
insert into studentTable values ('si2', 'Maria', 'F');
insert into studentTable values ('si3', 'Julie',   'F');
insert into studentTable values ('si4', 'Bob',   'M');
insert into studentTable values ('si6','William','M');
Records have been inserted into the table. We can visualize table data by using the SQL select command.
pysparkbookdb=# select * from studentTable;
Here is the output:
      studentid         |               name                |  gender
------------------------+---------------------------------- +--------
           si1          |               Robin               |    M    
           si2          |               Maria               |    F    
           si3          |               Julie               |    F    
           si4          |               Bob                 |    M    
           si6          |               William             |    M    
(5 rows)
We have created a table and inserted records.

Step 8-5-1. Reading Student Data Table from PostgreSQL Database

We know that we have our student data in a table in a PostgreSQL server. We need to read data from studentTable, which is in the pysparkbookdb database. In order to connect PySpark to the PostgreSQL server, we need a database JDBC connector. We are going to start our PySpark shell by using the following command:
pyspark --driver-class-path  .ivy2/jars/org.postgresql_postgresql-9.4.1212.jar --packages org.postgresql:postgresql:9.4.1212
After starting the PySpark shell, including our connector JARs, we can read the table data by using the spark.read function:
>>> dbURL="jdbc:postgresql://localhost/pysparkbookdb?user=postgres&password=''"   
We have created our database URL. We connect to our pysparkbookdb database by using the postgres user. There is no password for the postgres user. The PostgreSQL server is running on the localhost machine. We are going to read data from the PostgreSQL server by using the spark.read function.
>>> studentsDataFrame = spark.read.format('jdbc').options(
...                         url = dbURL,
...                         database = 'pysparkbookdb',
...                         dbtable = 'studenttable'
...                     ).load()
In the options part, we provide the URL of the database, the database name for the database argument, and the table name for the dbtable argument. We read the table data, which has been transformed into the DataFrame studentsDataFrame. Let’s check our studentsDataFrame:
>>> studentsDataFrame.show()
Here is the output:
+--------------------+--------------------+------+
|           studentid|                name|gender|
+--------------------+--------------------+------+
|si1              ...|Robin            ...| M    |
|si2              ...|Maria            ...| F    |
|si3              ...|Julie            ...| F    |
|si4              ...|Bob              ...| M    |
|si6              ...|William          ...| M    |
+--------------------+--------------------+------+
We have our required student DataFrame. But do you see any problems with our DataFrame? Have another look. Do you see the ellipses (. . .) in our DataFrame? They appear because the char(50) columns are padded with trailing spaces, so show() truncates the values. We have to remove this padding. We can do it by applying the trim() function to the columns.
To trim strings, we have to import the trim() function. This function is in the submodule pyspark.sql.functions. After importing the trim() function, we can use it to remove the dots from our columns as follows:
>>> from pyspark.sql.functions import trim
>>> studentsDataFrame = studentsDataFrame.select(trim(studentsDataFrame.studentid),trim(studentsDataFrame.name),studentsDataFrame.gender)
Let’s print and see whether we got rid of the problem:
>>> studentsDataFrame.show()
Here is the output:
+---------------+----------+------+
|trim(studentid)|trim(name)|gender|
+---------------+----------+------+
|            si1|     Robin| M    |
|            si2|     Maria| F    |
|            si3|     Julie| F    |
|            si4|       Bob| M    |
|            si6|   William| M    |
+---------------+----------+------+
We got rid of the problem. But are you sure that we got rid of all the problems? How about the names of the DataFrame columns? Now we have to change the column names to be clearer:
>>> studentsDataFrame = studentsDataFrame.withColumnRenamed('trim(studentid)', 'studentID').withColumnRenamed('trim(name)','Name').withColumnRenamed('gender', 'Gender')
You can see the changed column names by printing the schema:
>>> studentsDataFrame.printSchema()
Here is the output:
root
 |-- studentID: string (nullable = false)
 |-- Name: string (nullable = false)
 |-- Gender: string (nullable = false)
We have our column names in a readable format. We should check that everything is appropriate by printing the DataFrame:
>>> studentsDataFrame.show()
Here is the output:
+---------+-------+------+
|studentID|   Name|Gender|
+---------+-------+------+
|      si1|  Robin| M    |
|      si2|  Maria| F    |
|      si3|  Julie| F    |
|      si4|    Bob| M    |
|      si6|William| M    |
+---------+-------+------+
Now we can move on to DataFrame joining.

Step 8-5-2. Reading Subject Data from a JSON File

Let’s read our subject data from the subjects.json file:
>>> subjectsDataFrame = sqlContext.read.format("json").load('/home/pysparkbook/pysparkBookData/subjects.json')
We have another DataFrame, subjectsDataFrame. Let’s investigate our subjectsDataFrame by using the show() function:
>>> subjectsDataFrame.show()
Here is the output:
+---------+-------+
|studentID|subject|
+---------+-------+
|      si1| Python|
|      si3|   Java|
|      si1|   Java|
|      si2| Python|
|      si3|   Ruby|
|      si4|    C++|
|      si5|      C|
|      si4| Python|
|      si2|   Java|
+---------+-------+
>>> subjectsDataFrame.printSchema()
Here is the output:
root
 |-- studentID: string (nullable = true)
 |-- subject: string (nullable = true)

Step 8-5-3. Performing an Inner Join on DataFrames

We have two DataFrames , subjectsDataFrame and studentsDataFrame. In both DataFrames, we have to perform a join on the studentID column. This column is common in both DataFrames. An inner join returns records that have key values that match. If we look at the values of the studentID column in both DataFrames, we will find that values si1, si2, si3, and si4 are common to both DataFrames. Therefore, an inner join will return records for only those values.
>>> joinedDataInner = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='inner')
>>> joinedDataInner.show()
Here is the output:
+---------+-------+---------+-----+------+
|studentID|subject|studentID| Name|Gender|
+---------+-------+---------+-----+------+
|      si1|   Java|      si1|Robin| M    |
|      si1| Python|      si1|Robin| M    |
|      si2|   Java|      si2|Maria| F    |
|      si2| Python|      si2|Maria| F    |
|      si3|   Ruby|      si3|Julie| F    |
|      si3|   Java|      si3|Julie| F    |
|      si4| Python|      si4|  Bob| M    |
|      si4|    C++|      si4|  Bob| M    |
+---------+-------+---------+-----+------+
In the resulting DataFrame joinedDataInner, it is easily observed that we have student IDs si1, si2, si3, and si4.

Step 8-5-4. Saving an Inner-Joined DataFrame as a JSON File

After doing analysis, we generally save the results somewhere. Here we are going to save our DataFrame joinedDataInner as a JSON file. Let’s have a look at the columns of joinedDataInner; we can see that the studentID column occurs twice. If we try to save the data with this duplicate column, PySpark will throw a pyspark.sql.utils.AnalysisException. Therefore, we first have to remove the duplicate column. For this, the select() function is going to be the most useful. The following code removes the duplicate studentID column:
>>> joinedDataInner = joinedDataInner.select(subjectsDataFrame.studentID,'subject', 'Name', 'Gender')
The columns of the DataFrame are as follows:
>>> joinedDataInner.columns
['studentID', 'subject', 'Name', 'Gender']
The duplicate studentID column has been removed. The following code saves the DataFrame as a JSON file inside the innerJoinedTable directory:
>>> joinedDataInner.write.format('json').save('/home/muser/innerJoinedTable')
We should see what has been saved under our innerJoinedTable directory:
innerJoinedTable$ ls
Here is the output:
part-r-00000-77838a67-4a1f-441a-bb42-4cd03be525a9.json  _SUCCESS
The ls command shows two files inside the directory. The JSON part file contains our data, and the _SUCCESS file tells us that the data has been written successfully. Now you want to know what is inside the JSON file. The cat command is best for this job:
innerJoinedTable$ cat part-r-00000-77838a67-4a1f-441a-bb42-4cd03be525a9.json
Here is the output:
{"studentID":"si1","subject":"Java","Name":"Robin","Gender":"M    "}
{"studentID":"si1","subject":"Python","Name":"Robin","Gender":"M    "}
{"studentID":"si2","subject":"Java","Name":"Maria","Gender":"F    "}
{"studentID":"si2","subject":"Python","Name":"Maria","Gender":"F    "}
{"studentID":"si3","subject":"Ruby","Name":"Julie","Gender":"F    "}
{"studentID":"si3","subject":"Java","Name":"Julie","Gender":"F    "}
{"studentID":"si4","subject":"Python","Name":"Bob","Gender":"M    "}
{"studentID":"si4","subject":"C++","Name":"Bob","Gender":"M    "}
We have done one more job successfully.
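To double-check the saved output from PySpark itself, the whole directory can be read back into a DataFrame; a minimal sketch:
>>> reloadedInnerJoin = spark.read.json('/home/muser/innerJoinedTable')
>>> reloadedInnerJoin.show(4)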

Step 8-5-5. Performing a Left Outer Join

Here, we are going to perform a left outer join. In a left outer join, every value from the studentID column of the subjectsDataFrame DataFrame will be included in the result, even if it has no matching counterpart in the studentID column of the studentsDataFrame DataFrame. For the left outer join, we have to provide left_outer as the value of the how argument of the join() function.
>>> joinedDataLeftOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='left_outer')
>>> joinedDataLeftOuter.show()
Here is the output:
 +---------+-------+---------+-----+------+
|studentID|subject|studentID| Name|Gender|
+---------+-------+---------+-----+------+
|      si5|      C|     null| null|  null|
|      si2| Python|      si2|Maria| F    |
|      si2|   Java|      si2|Maria| F    |
|      si4|    C++|      si4|  Bob| M    |
|      si4| Python|      si4|  Bob| M    |
|      si3|   Java|      si3|Julie| F    |
|      si3|   Ruby|      si3|Julie| F    |
|      si1| Python|      si1|Robin| M    |
|      si1|   Java|      si1|Robin| M    |
+---------+-------+---------+-----+------+
The left-outer-joined table shows that si5, which appears only in the studentID column of subjectsDataFrame, is still part of our joined table; its Name and Gender columns are null because there is no matching student record.

Step 8-5-6. Saving a Left-Outer-Joined DataFrame into PostgreSQL

Saving result data to a PostgreSQL database helps data analysts put results into safe hands. Other users can use the result data for many purposes. Let’s save our results in a PostgreSQL database.
Again, we have to remove the duplicate column before saving the data to a PostgreSQL database:
>>> joinedDataLeftOuter = joinedDataLeftOuter.select(subjectsDataFrame.studentID,'subject', 'Name', 'Gender')
It is a good idea to check which tables exist in the database before and after saving. The \d command prints all the existing tables in the current PostgreSQL database. We have already created the database pysparkbookdb on our server, and we are going to save our DataFrame into that same database:
pysparkbookdb=# \d
Here is the output:
            List of relations
 Schema |     Name      | Type  |  Owner   
--------+---------------+-------+----------
 public | studenttable  | table | postgres
(1 row)
The \d command shows that in the pysparkbookdb database, we have only one table, studenttable. Now let’s save the DataFrame:
>>> props = { 'user' : 'postgres', 'password' : '' }
>>> joinedDataLeftOuter.write.jdbc(
...                                 url   = dbURL,
...                                 table = 'joineddataleftoutertable',
...                                 mode  = 'overwrite',
...                                 properties = props
...                                )
The preceding code saves the DataFrame to the joineddataleftoutertable table, using the dbURL variable we defined earlier. The mode argument has the value overwrite, which means the table contents will be overwritten if the table already exists.
After saving data into PostgreSQL, we should check it once. Again, we are going to use the \d command to see all the tables in our pysparkbookdb database:
pysparkbookdb=# \d
Here is the output:
                  List of relations
 Schema |           Name           | Type  |  Owner   
--------+--------------------------+-------+----------
 public | joineddataleftoutertable | table | postgres
 public | studenttable             | table | postgres
(2 rows)
And we have our DataFrame saved in PostgreSQL as the joineddataleftoutertable table in the pysparkbookdb database. Let’s check the values in the table by using the select command:
pysparkbookdb=# select * from joineddataleftoutertable;
Here is the output:
 studentID | subject | Name  | Gender
-----------+---------+-------+--------
 si5       | C       |       |
 si2       | Python  | Maria | F    
 si2       | Java    | Maria | F    
 si4       | C++     | Bob   | M    
 si4       | Python  | Bob   | M    
 si3       | Java    | Julie | F    
 si3       | Ruby    | Julie | F    
 si1       | Python  | Robin | M    
 si1       | Java    | Robin | M    
(9 rows)
For further use of the result data, we have already saved it in the PostgreSQL database. After the left outer join, we are going to perform a right outer join on our DataFrames.

Step 8-5-7. Performing a Right Outer Join

In a right outer join, every value of the studentID column of studentsDataFrame will be included in the result, even if it has no matching counterpart in subjectsDataFrame. For the right outer join, we provide right_outer as the value of the how argument:
>>> joinedDataRightOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='right_outer')
>>> joinedDataRightOuter.show()
Here is the output:
+---------+-------+---------+-------+------+
|studentID|subject|studentID|   Name|Gender|
+---------+-------+---------+-------+------+
|      si1|   Java|      si1|  Robin| M    |
|      si1| Python|      si1|  Robin| M    |
|      si2|   Java|      si2|  Maria| F    |
|      si2| Python|      si2|  Maria| F    |
|      si3|   Ruby|      si3|  Julie| F    |
|      si3|   Java|      si3|  Julie| F    |
|      si4| Python|      si4|    Bob| M    |
|      si4|    C++|      si4|    Bob| M    |
|     null|   null|      si6|William| M    |
+---------+-------+---------+-------+------+

Step 8-5-8. Performing a Full Outer Join

A full outer join combines all key values from both DataFrames; rows without a match on either side are filled with nulls:
>>> joinedDataOuter = subjectsDataFrame.join(studentsDataFrame, subjectsDataFrame.studentID==studentsDataFrame.studentID, how='outer')
>>> joinedDataOuter.show()
Here is the output:
+---------+-------+---------+-------+------+
|studentID|subject|studentID|   Name|Gender|
+---------+-------+---------+-------+------+
|      si5|      C|     null|   null|  null|
|      si2| Python|      si2|  Maria| F    |
|      si2|   Java|      si2|  Maria| F    |
|      si4|    C++|      si4|    Bob| M    |
|      si4| Python|      si4|    Bob| M    |
|      si3|   Java|      si3|  Julie| F    |
|      si3|   Ruby|      si3|  Julie| F    |
|     null|   null|      si6|William| M    |
|      si1| Python|      si1|  Robin| M    |
|      si1|   Java|      si1|  Robin| M    |
+---------+-------+---------+-------+------+
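As a side note, if we pass the join column by name instead of an equality expression, PySparkSQL keeps only a single studentID column in the result, which avoids the duplicate-column cleanup we did in Step 8-5-4; a minimal sketch:
>>> joinedOnName = subjectsDataFrame.join(studentsDataFrame, on='studentID', how='outer')
>>> joinedOnName.columns
This should return ['studentID', 'subject', 'Name', 'Gender'].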

Recipe 8-6. Perform Breadth-First Search Using GraphFrames

Problem

You want to perform a breadth-first search using GraphFrames.

Solution

A breadth-first search (BFS) is a very popular algorithm that can be used to find the shortest path, in terms of the number of edges, between two given nodes. Figure 8-1 shows the graph we have been given. It has seven nodes: A, B, C, D, E, F, and G.
Figure 8-1. A graph
If we look at the connections between nodes, we find the following:
A – C – B
B – A – C – G – F
C – A – B – F – D
D – C – F – E
E – D – F
F – B – C – D – E – G
G – B – F
Let me explain this structure. Take a look at the first line. This line, A – C – B, tells us that node A is connected to B and C.
You want to perform a breadth-first search and find the shortest distance between nodes. To do this, we are going to use an external library, GraphFrames. We can use PySpark with GraphFrames very easily. GraphFrames provides DataFrame-based graphs.

How It Works

To use GraphFrames, we first have to include it as we did for the PostgreSQL connector JAR file. We are going to start the PySpark shell by using the GraphFrames JAR. We are using GraphFrames version 0.4.0.
$ pyspark --packages graphframes:graphframes:0.4.0-spark2.0-s_2.11
The GraphFrames package has been added. Now we are going to run a breadth-first search using GraphFrames.

Step 8-6-1. Creating DataFrames of Vertices of a Given Graph

We know that GraphFrames work on PySparkSQL DataFrames. Let’s create a DataFrame of vertices. We already know how to create a DataFrame. We are going to perform the same steps as we did in Recipe 8-1:
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
>>> verticesDataList = ['A', 'B', 'C', 'D', 'E', 'F', 'G']
>>> verticesSchema = StructType([StructField('id',StringType(),True)])
>>> verticesRDD = sc.parallelize(verticesDataList, 4)
>>> verticesRDDRows = verticesRDD.map(lambda data : Row(data[0]))
>>> verticesDataFrame = sqlContext.createDataFrame(verticesRDDRows, verticesSchema)
>>> verticesDataFrame.show(4)
Here is the output, showing only the top four rows:
+---+
| id|
+---+
|  A|
|  B|
|  C|
|  D|
+---+
We have created our vertices DataFrame. I am sure you have observed that the column name of our vertices DataFrame is id. Can you use another name for the column of the vertices DataFrame? The answer is a simple no; it is mandatory to name the column id. Let’s create a DataFrame of edges.

Step 8-6-2. Creating DataFrames of Edges of a Given Graph

We have to create a DataFrame of edges. We will first create a list of tuples; each tuple will have a source node and destination node of an edge:
>>> edgeDataList = [('A','C'),('A','B'),('B','A'),('B','C'),('B','G'),('B','F'),('C','A'),    
                   ('C','B'),('C','F'),('C','D'),('D','C'),('D','F'),('D','E'),('E','D'),  
                   ('E','F'),('F','B'),('F','C'),('F','D'),('F','E'),('F','G'),('G','B'),
                   ('G','F')]
After creating a list of edges, this list has to be parallelized by using the parallelize() function. We are parallelizing this data into four partitions:
>>> edgeRDD = sc.parallelize(edgeDataList, 4)
We have created an RDD of the edges list.
>>> edgeRDD.take(4)
Here is the output:
[('A', 'C'),
 ('A', 'B'),
 ('B', 'A'),
 ('B', 'C')]
After creating the RDD of edges, we have to turn it into an RDD of Row objects before the DataFrame can be built. The following line of code creates an RDD of Row objects:
>>> edgeRDDRows = edgeRDD.map( lambda data : Row(data[0], data[1]))
>>> edgeRDDRows.take(4)
Here is the output:
[<Row(A, C)>,
 <Row(A, B)>,
 <Row(B, A)>,
 <Row(B, C)>]
A schema is required for our edge DataFrame. We first define a StructField for the source node column and another for the destination node column, and then combine them into a schema by using StructType():
>>> sourceColumn = StructField('src', StringType(),True)
>>> destinationColumn = StructField('dst', StringType(), True)
>>> edgeSchema = StructType([sourceColumn, destinationColumn])
Notice that we have named sourceColumn src and named destinationColumn dst. These names are also mandatory: GraphFrames requires the edge columns to be called src and dst. The schema has been created, so the next step, obviously, is to create the DataFrame:
>>> edgeDataFrame = sqlContext.createDataFrame(edgeRDDRows, edgeSchema)
>>> edgeDataFrame.show(5)
Here is the output, showing only the top five rows:
+---+---+
|src|dst|
+---+---+
|  A|  C|
|  A|  B|
|  B|  A|
|  B|  C|
|  B|  G|
+---+---+
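As a side note, the same edge DataFrame can be built in a single step by handing the list of tuples and the column names directly to createDataFrame(). Here is a minimal sketch, assuming the edgeDataList created above:
>>> # one-step alternative to the RDD-of-Rows route above
>>> edgeDataFrameAlt = sqlContext.createDataFrame(edgeDataList, ['src', 'dst'])
>>> edgeDataFrameAlt.printSchema()
Here is the output:
root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)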

Step 8-6-3. Creating a GraphFrames Object

At this moment, we have verticesDataFrame, a DataFrame of vertices, and edgeDataFrame, a DataFrame of edges. Using these two, we can create our graph. In GraphFrames, we create a graph by using the following lines of code:
>>> import graphframes.graphframe  as gfm
>>> ourGraph = gfm.GraphFrame(verticesDataFrame, edgeDataFrame)
The GraphFrame Python class is defined in the graphframes.graphframe submodule. GraphFrame() takes the vertices and edges DataFrames and returns a GraphFrame object. We now have our graph object, ourGraph. We can fetch all the vertices as follows:
>>> ourGraph.vertices.show(5)
Here is the output, showing only the top five rows:
+---+
| id|
+---+
|  A|
|  B|
|  C|
|  D|
|  E|
+---+
We can fetch edges also; here are the top five rows:
>>> ourGraph.edges.show(5)
+---+---+
|src|dst|
+---+---+
|  A|  C|
|  A|  B|
|  B|  A|
|  B|  C|
|  B|  G|
+---+---+
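Before running any algorithm, it can be handy to glance at simple structural summaries of the graph. GraphFrames exposes degrees, inDegrees, and outDegrees as DataFrames on the graph object; the following is only a quick sketch, with output omitted because row ordering may vary:
>>> # each returns a DataFrame with an 'id' column and a degree count column
>>> ourGraph.degrees.show()
>>> ourGraph.inDegrees.show()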

Step 8-6-4. Running a Breadth-First Search Algorithm

We have created a graph from the required data, so now we can run a breadth-first search on the ourGraph graph. The bfs() function is the breadth-first search (BFS) implementation in GraphFrames, and it is defined on the GraphFrame object. We want to find the shortest path between node D and node G. The first argument, fromExpr, is an expression telling the search to start from node D; the second argument, toExpr, indicates that the destination node of our search is G.
>>> bfsPath = ourGraph.bfs(fromExpr="id='D'", toExpr = "id='G'")
>>> bfsPath.show()
Here is the output:
+----+-----+---+-----+---+                                                      
|from|   e0| v1|   e1| to|
+----+-----+---+-----+---+
| [D]|[D,F]|[F]|[F,G]|[G]|
+----+-----+---+-----+---+
The output of BFS is very clear. The [D] in the from column means that the start node for BFS is D. The [G] in the to column indicates that the destination node of BFS is G. The shortest path between D and G is from D to F and from F to G.
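The bfs() function also accepts optional arguments. For example, maxPathLength caps the number of edges allowed in a returned path, and edgeFilter restricts which edges may be traversed. The following is only a sketch of the maxPathLength option, reusing ourGraph:
>>> # allow at most two edges in the path; D -> F -> G still fits within this limit
>>> boundedPath = ourGraph.bfs(fromExpr="id='D'", toExpr="id='G'", maxPathLength=2)
>>> boundedPath.show()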

Recipe 8-7. Calculate Page Rank Using GraphFrames

Problem

You want to perform a page-rank algorithm using GraphFrames.

Solution

We already discussed page rank in Chapter 5. We have been given the same graph of web pages that we used there. You want to run the page-rank algorithm using GraphFrames. Figure 8-2 shows the network of web pages.
A430628_1_En_8_Fig2_HTML.gif
Figure 8-2.
Graph of web pages

How It Works

First, we have to create a DataFrame of vertices and a DataFrame of edges for the given graph. We then use these two DataFrames to create a GraphFrame object.

Step 8-7-1. Creating a DataFrame of Vertices

We have been given a graph of four nodes. Our four nodes are a, b, c, and d. First, we create a list of these four nodes:
>>> verticesList  = ['a', 'b', 'c', 'd']
>>> verticesListRDD = sc.parallelize(verticesList, 4)
>>> verticesListRowsRDD = verticesListRDD.map( lambda data : Row(data))
>>> verticesListRowsRDD.collect()
Here is the output:
[<Row(a)>,
<Row(b)>,
<Row(c)>,
<Row(d)>]
>>> verticesSchema = StructType([StructField('id', StringType(), True)])
>>> verticesDataFrame = sqlContext.createDataFrame(verticesListRowsRDD, verticesSchema)
>>> verticesDataFrame.show()
+---+
| id|
+---+
|  a|
|  b|
|  c|
|  d|
+---+
We have created a DataFrame of vertices. Now we have to create a DataFrame of edges.

Step 8-7-2. Creating a DataFrame of Edges

To create a DataFrame of edges, the steps are similar to those in previous recipes. First, we create a list of edges, where each edge is defined by a tuple. Then we create an RDD of edges and transform it into an RDD of Row objects. Finally, we create a schema and the DataFrame of edges. Let's perform these steps:
>>> edgeDataList = [('a','b'), ('a','c'), ('a','d'), ('b', 'c'),
                   ('b', 'd'),('c', 'b'), ('d', 'a'), ('d', 'c')]
>>> sourceColumn = StructField('src', StringType(),True)
>>> destinationColumn = StructField('dst', StringType(), True)
>>> edgeSchema = StructType([sourceColumn, destinationColumn])
>>> edgeRDD = sc.parallelize(edgeDataList, 4)
>>> edgeRDD.take(4)
Here is the output:
[('a', 'b'),
 ('a', 'c'),
 ('a', 'd'),
 ('b', 'c')]
>>> edgeRDDRows = edgeRDD.map( lambda data : Row(data[0], data[1]))
>>> edgeRDDRows.take(4)
Here is the output:
[<Row(a, b)>,
<Row(a, c)>,
<Row(a, d)>,
<Row(b, c)>]
>>> edgeDataFrame = sqlContext.createDataFrame(edgeRDDRows, edgeSchema)
>>> edgeDataFrame.show(5)
Here is the output, showing only the top five rows:
+---+---+
|src|dst|
+---+---+
|  a|  b|
|  a|  c|
|  a|  d|
|  b|  c|
|  b|  d|
+---+---+
We have created a DataFrame of edges. Let’s create a graph.

Step 8-7-3. Creating a Graph

The process of creating a graph follows the same path as in the preceding recipe:
>>> import graphframes.graphframe  as gfm
>>> ourGraph = gfm.GraphFrame(verticesDataFrame, edgeDataFrame)
>>> ourGraph.vertices.show(5)
Here is the output:
+---+
| id|
+---+
|  a|
|  b|
|  c|
|  d|
+---+
>>> ourGraph.edges.show(5)
Here is the output, showing only the top five rows:
+---+---+
|src|dst|
+---+---+
|  a|  b|
|  a|  c|
|  a|  d|
|  b|  c|
|  b|  d|
+---+---+

Step 8-7-4. Running a Page-Rank Algorithm

The page rank of each page can be found by using the pageRank() function, which is defined on the GraphFrame object:
>>> pageRanks = ourGraph.pageRank(resetProbability=0.15, tol=0.01)
You might be wondering about the return type of the pageRank() function. Let’s see by printing it:
>>> pageRanks
Here is the output:
GraphFrame(v:[id: string, pagerank: double], e:[src: string, dst: string ... 1 more field])
The pageRank() function returns a GraphFrame object, which has both vertices and edges. The vertices part holds the web pages and their corresponding page ranks. Let's first explore what is inside the edges part of pageRanks:
>>> pageRanks.edges.show()
Here is the output:
+---+---+------------------+                                                    
|src|dst|            weight|
+---+---+------------------+
|  d|  a|               0.5|
|  b|  d|               0.5|
|  a|  b|0.3333333333333333|
|  a|  d|0.3333333333333333|
|  d|  c|               0.5|
|  a|  c|0.3333333333333333|
|  b|  c|               0.5|
|  c|  b|               1.0|
+---+---+------------------+
It can be observed that the edges part of the pageRanks object consists of edges and corresponding weights:
>>> pageRanks.vertices.select('id','pagerank')
Here is the output:
DataFrame[id: string, pagerank: double]
>>> pageRanks.vertices.select('id','pagerank').show()
Here is the output:
+---+------------------+
| id|          pagerank|
+---+------------------+
|  a|0.4831888601952005|
|  b| 1.238562817904233|
|  d| 0.806940642367432|
|  c|1.1401295025626326|
+---+------------------+
Finally, we have the page rank for the given pages.
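To see the most important page first, we can sort the vertices by page rank in descending order:
>>> pageRanks.vertices.orderBy('pagerank', ascending=False).show()
Here is the output:
+---+------------------+
| id|          pagerank|
+---+------------------+
|  b| 1.238562817904233|
|  c|1.1401295025626326|
|  d| 0.806940642367432|
|  a|0.4831888601952005|
+---+------------------+
Page b comes out with the highest rank.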

Recipe 8-8. Read Data from Apache Hive

Problem

You want to read table data from Apache Hive .

Solution

We have a table, filamenttable, in Hive, which holds the same filament data we have used in many recipes. We have to read this data from Apache Hive by using PySparkSQL. Let's look at the whole process. First we are going to create the table in Hive and load data into it. We'll create our table in the apress database of Hive, which we created in Chapter 2 at installation time. Let's first verify that the database still exists; we can display all the databases in Hive by using show:
hive> show databases;
Here is the output:
OK
apress
default
Time taken: 3.275 seconds, Fetched: 2 row(s)
The apress database is there. Therefore, we switch to it by using the use command:
hive> use apress;
Here is the output:
OK
Time taken: 0.125 seconds
After switching to the database, we create a table named filamenttable by using the following command:
hive> create table filamenttable (
    > filamenttype string,
    > bulbpower string,
    > lifeinhours float
    > )
    > row format delimited
    > fields terminated by ',';
We have created a Hive table with three columns. The first column is filamenttype, with values of the string type. The second column is bulbpower, also of the string type. The third column is lifeinhours, of the float type. Now we can confirm that our table exists by using the show tables command:
hive> show tables;
Here is the output:
OK
filamenttable
Time taken: 0.118 seconds, Fetched: 1 row(s)
The required table has been created successfully. Let's load the data into it by using the load command. The local clause in the following command tells Hive that the data is being loaded from a file on the local file system, not from HDFS.
hive> load data local inpath '/home/pysparkbook/pysparkBookData/filamentData.csv' overwrite into table filamenttable;
Here is the output:
Loading data to table apress.filamenttable
OK
Time taken: 5.39 seconds
After the data loads, we can query the table. We can display a few rows by using select with a limit clause to restrict the number of rows returned:
hive> select * from filamenttable limit 5;
OK
filamentA       100W    605.0
filamentB       100W    683.0
filamentB       100W    691.0
filamentB       200W    561.0
filamentA       200W    530.0
Time taken: 0.532 seconds, Fetched: 5 row(s)
We have displayed some of the rows of our filamenttable table. We have to read this table data by using PySparkSQL.

How It Works

Step 8-8-1. Creating a HiveContext Object

The HiveContext class has been defined inside the pyspark.sql submodule. We can create the HiveContext object by using this class and providing sc as input to the HiveContext constructor:
>>> from pyspark.sql import HiveContext
>>> ourHiveContext = HiveContext(sc)
We have created the HiveContext object.
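In Spark 2.x, the same thing can be achieved with a Hive-enabled SparkSession, which has superseded HiveContext. Here is a minimal sketch, assuming your PySpark build was compiled with Hive support:
>>> # SparkSession with Hive support; its table() and sql() methods then work against Hive
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.enableHiveSupport().getOrCreate()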

Step 8-8-2. Reading Table Data from Hive

We can read the table by using the table() function, which is defined on the HiveContext object. In the table() function, we have to provide the name of the table in the format <databaseName>.<tableName>. In our case, the database name is apress, and the table name is filamenttable. Therefore, the argument value for the table() function will be apress.filamenttable.
>>> FilamentDataFrame = ourHiveContext.table('apress.filamenttable')
>>> FilamentDataFrame.show(5)
+------------+---------+-----------+
|filamenttype|bulbpower|lifeinhours|
+------------+---------+-----------+
|   filamentA|     100W|      605.0|
|   filamentB|     100W|      683.0|
|   filamentB|     100W|      691.0|
|   filamentB|     200W|      561.0|
|   filamentA|     200W|      530.0|
+------------+---------+-----------+
only showing top 5 rows
And, finally, we have created a DataFrame from the table in Apache Hive.
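HiveQL queries can also be run directly against the table through the sql() function on the HiveContext object. The following sketch keeps only the 100W bulbs (output omitted):
>>> # run HiveQL directly; returns a DataFrame of the matching rows
>>> bulbs100W = ourHiveContext.sql("select * from apress.filamenttable where bulbpower = '100W'")
>>> bulbs100W.show(5)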