Creating a DataFrame from Scala case classes

In this recipe, we'll see how to create a new DataFrame from Scala case classes.

How to do it...

  1. We create a new entity called Employee with the id and name fields, like this:
    case class Employee(id:Int, name:String)
    

    As in the previous recipe, we create a SparkContext and an SQLContext.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val conf = new SparkConf().setAppName("colRowDataFrame").setMaster("local[2]")
    
    //Initialize the Spark context with the Spark configuration. This is the core entry point for doing anything with Spark.
    val sc = new SparkContext(conf)
    
    //The easiest way to query data in Spark is to use SQL queries, and the SQLContext is the entry point for that.
    val sqlContext = new SQLContext(sc)
    
  2. We can source these employee objects from a variety of sources, such as an RDBMS data source, but for the sake of this example, we construct a list of employees, as follows:
      val listOfEmployees = List(Employee(1, "Arun"), Employee(2, "Jason"), Employee(3, "Abhi"))
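
    If we were sourcing these employees from an RDBMS instead, we could read them over JDBC. Here is a minimal sketch (Spark 1.4 or later); the connection URL, table name, and credentials are all hypothetical, and it assumes the JDBC driver is on the classpath:

      //Read the employees from a (hypothetical) relational table over JDBC.
      val employeesFromDb = sqlContext.read.format("jdbc").options(Map(
        "url" -> "jdbc:mysql://localhost:3306/hr",
        "dbtable" -> "employees",
        "user" -> "dbuser",
        "password" -> "dbpass"
      )).load()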
    
  3. The next step is to pass the listOfEmployees to the createDataFrame function of SQLContext. That's it! We now have a DataFrame. When we try to print the schema using the printSchema method of the DataFrame, we will see that the DataFrame has two columns, with names id and name, as defined in the case class:
      //Pass the list of employees into the `createDataFrame` function.
      val empFrame = sqlContext.createDataFrame(listOfEmployees)
      empFrame.printSchema
    

    Output:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
    

    As you might have guessed, the schema of the DataFrame is inferred from the case class using reflection.
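
    We can check this inference with a quick sketch of our own (the Contractor class is hypothetical, not part of this recipe): an Option field becomes a nullable column, while a plain Int does not:

      //Hypothetical entity: `agency` is optional, so its column becomes nullable.
      case class Contractor(id: Int, agency: Option[String])
      val contractorFrame = sqlContext.createDataFrame(List(Contractor(1, Some("Acme")), Contractor(2, None)))
      contractorFrame.printSchema
      //root
      // |-- id: integer (nullable = false)
      // |-- agency: string (nullable = true)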

  4. We can give a column a name different from the one specified in the case class by using the withColumnRenamed function, as shown here:
      val empFrameWithRenamedColumns = sqlContext.createDataFrame(listOfEmployees)
        .withColumnRenamed("id", "empId")

      empFrameWithRenamedColumns.printSchema
    

    Output:

    root
     |-- empId: integer (nullable = false)
     |-- name: string (nullable = true)
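
    If we needed to rename more than one column, we could either chain withColumnRenamed calls or replace all the column names at once with toDF. A short sketch (the new names here are purely illustrative):

      //Chain individual renames...
      val renamedTwice = sqlContext.createDataFrame(listOfEmployees)
        .withColumnRenamed("id", "empId")
        .withColumnRenamed("name", "empName")

      //...or supply every column name in one call.
      val renamedAtOnce = sqlContext.createDataFrame(listOfEmployees).toDF("empId", "empName")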
    
  5. Let's query the DataFrame using Spark's first-class SQL support. Before that, however, we have to register the DataFrame as a table. The registerTempTable method, which we saw in the previous recipe, helps us achieve this. With the following command, we register the DataFrame as a table named "employeeTable":
    
      empFrameWithRenamedColumns.registerTempTable("employeeTable")
    
  6. Now for the actual query: let's arrange the employees in descending order of name:
      val sortedByNameEmployees = sqlContext.sql("select * from employeeTable order by name desc")
    
      sortedByNameEmployees.show()
    

    Output:

    +-----+-----+
    |empId| name|
    +-----+-----+
    |    2|Jason|
    |    1| Arun|
    |    3| Abhi|
    +-----+-----+
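
    The same ordering can also be expressed through the DataFrame API, with no table registration at all. A minimal sketch using the desc function from org.apache.spark.sql.functions:

      //Equivalent query using the DataFrame API instead of SQL.
      import org.apache.spark.sql.functions.desc
      empFrameWithRenamedColumns.orderBy(desc("name")).show()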
    

How it works...

The createDataFrame function accepts a sequence of scala.Product. Scala case classes extend Product, and therefore fit the bill. That said, we can actually use a sequence of tuples to create a DataFrame, since tuples implement Product too:

val mobiles = sqlContext.createDataFrame(Seq((1, "Android"), (2, "iPhone")))
mobiles.printSchema
mobiles.show()

Output:

//Schema
root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)
//Data
+--+-------+
|_1|     _2|
+--+-------+
| 1|Android|
| 2| iPhone|
+--+-------+

Of course, you can rename the columns using withColumnRenamed.
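
For instance, the following sketch renames the auto-generated tuple columns to friendlier (purely illustrative) names:

val namedMobiles = mobiles
  .withColumnRenamed("_1", "id")
  .withColumnRenamed("_2", "os")
namedMobiles.printSchema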
