Combining datasets

So, we have seen moving a data frame into Spark for analysis. This appears to be very close to SQL tables. Under SQL it is standard practice not to reproduce items in different tables. For example, a product table might have the price and an order table would just reference the product table by product identifier, so as not to duplicate data. So, then another SQL practice is to join or combine the tables to come up with the full set of information needed. Keeping with the order analogy, we combine all of the tables involved as each table has pieces of data that are needed for the order to be complete.

How difficult would it be to create a set of tables and join them using Spark? We will use example tables of Product, Order, and ProductOrder:

Table

Columns

Product

Product ID,

Description,

Price

Order

Order ID,

Order Date

ProductOrder

Order ID,

Product ID,

Quantity

 

So, an Order has a list of Product/Quantity values associated.

We can populate the data frames and move them into Spark:

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc) 

# load product set
productDF = spark.read.format("csv") 
        .option("header", "true") 
        .load("product.csv");
productDF.show()
productDF.createOrReplaceTempView("product")

# load order set
orderDF = spark.read.format("csv") 
        .option("header", "true") 
        .load("order.csv");
orderDF.show()
orderDF.createOrReplaceTempView("order")

# load order/product set
orderproductDF = spark.read.format("csv") 
        .option("header", "true") 
        .load("orderproduct.csv");
orderproductDF.show()
orderproductDF.createOrReplaceTempView("orderproduct")  

Now, we can attempt to perform an SQL-like JOIN operation among them:

# join the tables
joinedDF = spark.sql("SELECT * " 
      "FROM orderproduct " 
      "JOIN order ON order.orderid = orderproduct.orderid " 
      "ORDER BY order.orderid")
joinedDF.show()  

Doing all of this in Jupyter results in the display as follows:

Our standard imports obtain a SparkContext and initialize a SparkSession. Note, the getOrCreate of the SparkContext. If you were to run this code outside of Jupyter there would be no context and a new one would be created. Under Jupyter, the startup for Spark in Jupyter initializes a context for all scripts. We can use that context at will with any Spark script, rather than have to create one ourselves.

Load our product table:

Load the order table:

Load the orderproduct table. Note that at least one of the orders has multiple products:

We have the orderid column from order and orderproduct in the result set. We could be more selective in our query and specify the exact columns we want to be returned:

I had tried to use the Spark join() command with no luck.

The documentation and examples I found on the internet are old, sparse, and incorrect.

Using the command also presented the persistent error of a task not returning results in time. From the underlying Hadoop, I expect that processing tasks are normally broken up into separate tasks. I assume that Spark is breaking up functions into separate threads for completion similarly. It is not clear why such minor tasks are not completing as I was not asking it to perform anything extraordinary.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.222.163.31