Manipulating DataFrames

In the previous recipe, we saw how to create a DataFrame. The natural next step is to work with the data inside it. Besides the numerous functions that transform data, DataFrames also offer functions that let us sample the data, print the schema, and so on. We'll take a look at them one by one in this recipe.

How to do it...

Now, let's see how we can manipulate DataFrames using the following subrecipes:

  • Printing the schema of the DataFrame
  • Sampling data in the DataFrame
  • Selecting specific columns in the DataFrame
  • Filtering data by condition
  • Sorting data in the frame
  • Renaming columns
  • Treating the DataFrame as a relational table to execute SQL queries
  • Saving the DataFrame as a file

Printing the schema of the DataFrame

After creating the DataFrame from various sources, we would obviously want to quickly check its schema. The printSchema function lets us do just that. It prints the column names and their data types to the default output stream:

  1. Let's load a sample DataFrame from the StudentData.csv file:
      //The csvFile method comes from the spark-csv implicits
      import com.databricks.spark.csv._

      //Now, let's load our pipe-separated file
      //students is of type org.apache.spark.sql.DataFrame
      val students=sqlContext.csvFile(filePath="StudentData.csv", useHeader=true, delimiter='|')
  2. Let's print the schema of this DataFrame:
    students.printSchema
    

    Output

    root
     |-- id: string (nullable = true)
     |-- studentName: string (nullable = true)
     |-- phone: string (nullable = true)
     |-- email: string (nullable = true)
    
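If you need the schema as a value rather than as printed text, the DataFrame also exposes it programmatically. A minimal sketch:

      //schema returns the full org.apache.spark.sql.types.StructType
      val studentSchema=students.schema

      //dtypes returns the column names and type names as an Array[(String, String)]
      students.dtypes.foreach { case (name, dataType) => println(s"$name -> $dataType") }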

Sampling the data in the DataFrame

The next logical thing that we would like to do is to check whether our data got loaded into the DataFrame correctly. There are a few ways of sampling the data in the newly created DataFrame:

  • Using the show method. This is the simplest way. There are two variants of the show method, as explained here:
    • One that takes an integer parameter, which specifies the number of rows to be sampled.
    • One without any parameter, in which case the number of rows defaults to 20.

    What distinguishes the show method from the other functions that sample data is that it displays the rows along with the headers and prints the output directly to the default output stream (console):

      //Sample n records along with headers
      students.show(3)

      //Sample 20 records along with headers
      students.show()
    
    //Output of show(3)
    
    +--+-----------+--------------+--------------------+
    |id|studentName|         phone|               email|
    +--+-----------+--------------+--------------------+
    | 1|      Burke|1-300-746-8446|ullamcorper.velit...|
    | 2|      Kamal|1-668-571-5046|pede.Suspendisse@...|
    | 3|       Olga|1-956-311-1686|Aenean.eget.metus...|
    +--+-----------+--------------+--------------------+
    
  • Using the head method. This method also accepts an integer parameter representing the number of rows to be fetched. The head method returns an array of rows. To print these rows, we can pass println to the foreach method of the array:
      //Sample the first 5 records
      students.head(5).foreach(println)
    

    If you are not a great fan of head, you can use the take function, which is common across all Scala sequences. The take method is just an alias of the head method and delegates all its calls to head. (Both bring the sampled rows to the driver as a local array; a couple of alternatives that return a DataFrame instead are sketched right after this list.)

      //Alias of head
      students.take(5).foreach(println)
    
    //Output
    [1,Burke,1-300-746-8446,[email protected]]
    [2,Kamal,1-668-571-5046,[email protected]]
    [3,Olga,1-956-311-1686,[email protected]]
    [4,Belle,1-246-894-6340,[email protected]]
    [5,Trevor,1-300-527-4967,[email protected]]
    
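As mentioned, head and take collect the sampled rows to the driver as a local Array[Row]. If you would rather keep the sample as a DataFrame, the limit and sample functions can be used instead. A minimal sketch (the 10 percent fraction here is arbitrary):

      //limit returns the first n rows as a new DataFrame
      students.limit(5).show()

      //sample returns a random fraction of the rows (roughly 10% here)
      students.sample(withReplacement=false, fraction=0.1).show(5)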

Selecting DataFrame columns

As you have seen, all DataFrame columns have names. The select function lets us pick specific columns from an existing DataFrame and form a completely new DataFrame out of them:

  • Selecting a single column: Let's say that you would like to select only the email column from a DataFrame. Since DataFrames are immutable, the selection returns a new DataFrame:
      val emailDataFrame:DataFrame=students.select("email")
    

    Now, we have a new DataFrame called emailDataFrame, which contains only the email column. Let's sample and check whether that is true:

      emailDataFrame.show(3)
    
    //Output
    +--------------------+
    |               email|
    +--------------------+
    |ullamcorper.velit...|
    |pede.Suspendisse@...|
    |Aenean.eget.metus...|
    +--------------------+
    
  • Selecting more than one column: The select function actually accepts an arbitrary number of column names, which means that you can easily select more than one column from your source DataFrame:
    val studentEmailDF=students.select("studentName", "email")
    

    Note

    The only requirement is that the strings passed as parameters must be valid column names. Otherwise, an org.apache.spark.sql.AnalysisException is thrown. The printSchema function serves as a quick reference for the column names.

    Let's sample and check whether we have indeed selected the studentName and email columns in the new DataFrame:

    studentEmailDF.show(3)
    

    Output

    +-----------+--------------------+
    |studentName|               email|
    +-----------+--------------------+
    |      Burke|ullamcorper.velit...|
    |      Kamal|pede.Suspendisse@...|
    |       Olga|Aenean.eget.metus...|
    +-----------+--------------------+
    
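Instead of plain strings, select also accepts Column objects, which we can obtain through the DataFrame's apply method. This variant becomes handy when a column needs to be transformed or renamed inline, as we'll see in the Renaming columns subrecipe. A minimal sketch of the equivalent selection:

      //Equivalent selection using Column objects
      val studentEmailDF2=students.select(students("studentName"), students("email"))
      studentEmailDF2.show(3)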

Filtering data by condition

Now that we have seen how to select columns from a DataFrame, let's see how to filter the rows of a DataFrame based on conditions. For row-based filtering, we can treat the DataFrame as a normal Scala collection and filter the data based on a condition. In all of these examples, I have added the show method at the end for clarity:

  1. Filtering based on a column value:
      //Show the first 7 records whose student id is greater than 5
      students.filter("id > 5").show(7)
    

    Output

    +--+-----------+--------------+--------------------+
    |id|studentName|         phone|               email|
    +--+-----------+--------------+--------------------+
    | 6|     Laurel|1-691-379-9921|adipiscing@consec...|
    | 7|       Sara|1-608-140-1995|Donec.nibh@enimEt...|
    | 8|     Kaseem|1-881-586-2689|cursus.et.magna@e...|
    | 9|        Lev|1-916-367-5608|Vivamus.nisi@ipsu...|
    |10|       Maya|1-271-683-2698|accumsan.convalli...|
    |11|        Emi|1-467-270-1337|        [email protected]|
    |12|      Caleb|1-683-212-0896|Suspendisse@Quisq...|
    +--+-----------+--------------+--------------------+
    

    Notice that even though the id field is inferred as a String type, the numerical comparison is still done correctly. On the other hand, students.filter("email > 'c'") would give back all the e-mail IDs that start with a character greater than 'c'. (These string conditions can also be written using Column objects; a sketch follows this list.)

  2. Filtering based on an empty column value. The following filter selects all students without names:
    students.filter("studentName = ''").show(7)
    

    Output

    +--+-----------+--------------+--------------------+
    |id|studentName|         phone|               email|
    +--+-----------+--------------+--------------------+
    |21|           |1-598-439-7549|consectetuer.adip...|
    |32|           |1-184-895-9602|accumsan.laoreet@...|
    |45|           |1-245-752-0481|Suspendisse.eleif...|
    |83|           |1-858-810-2204|sociis.natoque@eu...|
    |94|           |1-443-410-7878|Praesent.eu.nulla...|
    +--+-----------+--------------+--------------------+
    
  3. Filtering based on more than one condition. This filter shows all records whose studentName field is either empty or holds the string value 'NULL':
    students.filter("studentName = '' OR studentName = 'NULL'").show(7)
    

    Output

    +--+-----------+--------------+--------------------+
    |id|studentName|         phone|               email|
    +--+-----------+--------------+--------------------+
    |21|           |1-598-439-7549|consectetuer.adip...|
    |32|           |1-184-895-9602|accumsan.laoreet@...|
    |33|       NULL|1-105-503-0141|[email protected]|
    |45|           |1-245-752-0481|Suspendisse.eleif...|
    |83|           |1-858-810-2204|sociis.natoque@eu...|
    |94|           |1-443-410-7878|Praesent.eu.nulla...|
    +--+-----------+--------------+--------------------+
    

    We are just limiting the output to seven records using the show(7) function.

  4. Filtering based on SQL-like conditions.

    This filter gets the entries of all students whose names start with the letter 'M'.

    students.filter("SUBSTR(studentName,0,1) ='M'").show(7)
    

    Output

    +--+-----------+--------------+--------------------+
    |id|studentName|         phone|               email|
    +--+-----------+--------------+--------------------+
    |10|       Maya|1-271-683-2698|accumsan.convalli...|
    |19|    Malachi|1-608-637-2772|Proin.mi.Aliquam@...|
    |24|    Marsden|1-477-629-7528|Donec.dignissim.m...|
    |37|      Maggy|1-910-887-6777|facilisi.Sed.nequ...|
    |61|     Maxine|1-422-863-3041|aliquet.molestie....|
    |77|      Maggy|1-613-147-4380| [email protected]|
    |97|    Maxwell|1-607-205-1273|metus.In@musAenea...|
    +--+-----------+--------------+--------------------+
    
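    Each of the preceding string conditions can also be expressed with Column objects, obtained through the DataFrame's apply method. A minimal sketch of the equivalents:

      //Column-object equivalents of the string conditions above
      students.filter(students("id") > 5).show(7)
      students.filter(students("studentName") === "").show(7)

      //Multiple conditions combine with && and ||
      students.filter(students("studentName") === "" || students("studentName") === "NULL").show(7)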

Sorting data in the frame

Using the sort function, we can order the DataFrame by a particular column:

  1. Ordering by a column in descending order:
      students.sort(students("studentName").desc).show(10)
    

    Output

    +--+-----------+--------------+--------------------+
    |id|studentName|         phone|               email|
    +--+-----------+--------------+--------------------+
    |50|      Yasir|1-282-511-4445|eget.odio.Aliquam...|
    |52|       Xena|1-527-990-8606|in.faucibus.orci@...|
    |86|     Xandra|1-677-708-5691|libero@arcuVestib...|
    |43|     Wynter|1-440-544-1851|amet.risus.Donec@...|
    |31|    Wallace|1-144-220-8159| [email protected]|
    |66|      Vance|1-268-680-0857|pellentesque@netu...|
    |41|     Tyrone|1-907-383-5293|non.bibendum.sed@...|
    | 5|     Trevor|1-300-527-4967|dapibus.id@acturp...|
    |65|      Tiger|1-316-930-7880|nec@mollisnoncurs...|
    |15|      Tarik|1-398-171-2268|[email protected]|
    +--+-----------+--------------+--------------------+
    
  2. Ordering by more than one column (ascending):
    students.sort("studentName", "id").show(10)
    

    Output

    +--+-----------+--------------+--------------------+
    |id|studentName|         phone|               email|
    +--+-----------+--------------+--------------------+
    |21|           |1-598-439-7549|consectetuer.adip...|
    |32|           |1-184-895-9602|accumsan.laoreet@...|
    |45|           |1-245-752-0481|Suspendisse.eleif...|
    |83|           |1-858-810-2204|sociis.natoque@eu...|
    |94|           |1-443-410-7878|Praesent.eu.nulla...|
    |91|       Abel|1-530-527-7467|    [email protected]|
    |69|       Aiko|1-682-230-7013|turpis.vitae.puru...|
    |47|       Alma|1-747-382-6775|    [email protected]|
    |26|      Amela|1-526-909-2605| [email protected]|
    |16|      Amena|1-878-250-3129|lorem.luctus.ut@s...|
    +--+-----------+--------------+--------------------+
    

Alternatively, the orderBy alias of the sort function can be used to achieve this. Also, multiple sort orders can be specified using the DataFrame's apply method:

students.sort(students("studentName").desc, students("id").asc).show(10)
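
The asc and desc helpers from org.apache.spark.sql.functions give yet another way to express the same ordering. A minimal sketch:

      //Equivalent ordering using the asc/desc helper functions
      import org.apache.spark.sql.functions.{asc, desc}
      students.sort(desc("studentName"), asc("id")).show(10)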

Renaming columns

If we don't like the column names of the source DataFrame and wish to change them to something nice and meaningful, we can do that using the as function while selecting the columns.

In this example, we rename the "studentName" column to "name" and retain the "email" column's name as is:

val copyOfStudents=students.select(students("studentName").as("name"), students("email"))

copyOfStudents.show(7)

Output

+--------+--------------------+
|    name|               email|
+--------+--------------------+
|   Burke|ullamcorper.velit...|
|   Kamal|pede.Suspendisse@...|
|    Olga|Aenean.eget.metus...|
|   Belle|vitae.aliquet.nec...|
|  Trevor|dapibus.id@acturp...|
|  Laurel|adipiscing@consec...|
|    Sara|Donec.nibh@enimEt...|
+--------+--------------------+
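
If you want to rename a column while keeping all the remaining columns of the DataFrame, withColumnRenamed is a convenient alternative. A minimal sketch:

      //Returns a new DataFrame with studentName renamed to name,
      //leaving every other column untouched
      val renamedStudents=students.withColumnRenamed("studentName", "name")
      renamedStudents.printSchema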

Treating the DataFrame as a relational table

The real power of DataFrames lies in the fact that we can treat it like a relational table and use SQL to query. This involves two simple steps:

  1. Register the students DataFrame as a table with the name "students" (or any other name):
    students.registerTempTable("students")
    
  2. Query it using regular SQL:
    val dfFilteredBySQL=sqlContext.sql("select * from students where studentName!='' order by email desc")
    
      dfFilteredBySQL.show(7)
    
    Output

    +--+-----------+--------------+-----------------+
    |id|studentName|         phone|            email|
    +--+-----------+--------------+-----------------+
    |87|      Selma|1-601-330-4409|vulputate.velit@p|
    |96|   Channing|1-984-118-7533|viverra.Donec.tem|
    | 4|      Belle|1-246-894-6340|vitae.aliquet.nec|
    |78|       Finn|1-213-781-6969|vestibulum.massa@|
    |53|     Kasper|1-155-575-9346|velit.eget@pedeCu|
    |63|      Dylan|1-417-943-8961|vehicula.aliquet@|
    |35|     Cadman|1-443-642-5919|ut.lacus@adipisci|
    +--+-----------+--------------+-----------------+
    

    Note

    The lifetime of the temporary table is tied to the life of the SQLContext that was used to create the DataFrame.
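
    Any SQL that the SQLContext understands can be run against the registered table, including aggregations. A minimal sketch:

      //Count the students with missing or placeholder names
      val missingNamesDF=sqlContext.sql("select count(*) as missingNames from students where studentName = '' or studentName = 'NULL'")
      missingNamesDF.show()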

Joining two DataFrames

Now that we have seen how to register a DataFrame as a table, let's see how to perform SQL-like join operations on DataFrames.

Inner join

An inner join is the default join type; given a join condition, it returns only those rows that have a match in both DataFrames:

val students1=sqlContext.csvFile(filePath="StudentPrep1.csv", useHeader=true, delimiter='|')

val students2=sqlContext.csvFile(filePath="StudentPrep2.csv", useHeader=true, delimiter='|')

val studentsJoin=students1.join(students2, students1("id")===students2("id"))
studentsJoin.show(studentsJoin.count.toInt)

The output contains the columns of both DataFrames, restricted to the rows whose id values matched on both sides.
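
Since both DataFrames contribute their columns to the result, id (and every other column) appears twice. The originating DataFrame's apply method lets us pick the columns explicitly; a minimal sketch:

//Disambiguate the duplicated columns by referencing the source DataFrames
studentsJoin.select(students1("id"), students1("studentName"), students2("studentName")).show(5)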

Right outer join

A right outer join returns, in addition to the matching rows, all the unmatched rows of the right-hand-side DataFrame. When we run the following snippet, the entry with ID 999, which exists only in the right-hand-side DataFrame, shows up in the result:

val studentsRightOuterJoin=students1.join(students2, students1("id")===students2("id"), "right_outer")
studentsRightOuterJoin.show(studentsRightOuterJoin.count.toInt)

Left outer join

Similar to a right outer join, a left outer join returns, in addition to the matching rows, all the unmatched rows of the left-hand-side DataFrame:

val studentsLeftOuterJoin=students1.join(students2, students1("id")===students2("id"), "left_outer")
studentsLeftOuterJoin.show(studentsLeftOuterJoin.count.toInt)
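
For completeness, a full outer join keeps the unmatched rows of both sides. A minimal sketch:

//A full outer join retains unmatched rows from both DataFrames
val studentsFullOuterJoin=students1.join(students2, students1("id")===students2("id"), "outer")
studentsFullOuterJoin.show(studentsFullOuterJoin.count.toInt)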

Saving the DataFrame as a file

As the next step, let's save a DataFrame in a file store. The load function, which we used in an earlier recipe, has a similar-looking counterpart called save.

This involves two steps:

  1. Create a map containing the various options that you would like the save method to use. In this case, we specify the filename and ask it to have a header:
    val options=Map("header"->"true", "path"->"ModifiedStudent.csv")
    

    To keep it interesting, let's save only a subset of the columns of the source DataFrame. In this example, we pick the studentName and email columns and change the studentName column's name to just name.

    val copyOfStudents=students.select(students("studentName").as("name"), students("email"))
    
  2. Finally, save this new DataFrame with the headers in a file named ModifiedStudent.csv:
      //SaveMode lives in org.apache.spark.sql
      import org.apache.spark.sql.SaveMode

      copyOfStudents.save("com.databricks.spark.csv", SaveMode.Overwrite, options)
    

The second argument is a little interesting. We can choose Overwrite (as we did here), Append, Ignore, or ErrorIfExists. Overwrite, as the name implies, overwrites the file if it already exists; Ignore skips the write if the file exists; ErrorIfExists complains if the file already exists; and Append appends the new data to the end of any existing data. Throwing an error (ErrorIfExists) is the default behavior.
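
As an aside, from Spark 1.4 onwards the save method is deprecated in favor of the DataFrameWriter API exposed through write; an equivalent call would look roughly like this:

//DataFrameWriter equivalent of the save call above (Spark 1.4+)
copyOfStudents.write
  .format("com.databricks.spark.csv")
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .save("ModifiedStudent.csv")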
