Saving data in plain text format

In this section, we will learn how to save data in plain text format. The following topics will be covered:

  • Saving data in plain text format
  • Loading plain text data
  • Testing

We will save our data in plain text format and look at how it is laid out in the output directory. We will then load the plain text data back and test it against the saved values to check whether we get the same results. This is our SavePlainText.scala file:

package com.tomekl007.chapter_4

import java.io.File

import com.tomekl007.UserTransaction
import org.apache.spark.sql.SparkSession
import org.apache.spark.{Partitioner, SparkContext}
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import org.scalatest.Matchers._

import scala.reflect.io.Path

class SavePlainText extends FunSuite with BeforeAndAfterEach {
  val spark: SparkContext = SparkSession.builder().master("local[2]").getOrCreate().sparkContext

  private val FileName = "transactions.txt"

  override def afterEach() {
    val path = Path(FileName)
    path.deleteRecursively()
  }

  test("should save and load in plain text") {
    //given
    val rdd = spark.makeRDD(List(UserTransaction("a", 100), UserTransaction("b", 200)))

    //when
    rdd.coalesce(1).saveAsTextFile(FileName)

    val fromFile = spark.textFile(FileName)

    fromFile.collect().toList should contain theSameElementsAs List(
      "UserTransaction(a,100)", "UserTransaction(b,200)"
      //note - this is string!
    )
  }
}

We will need a FileName variable which, in our case, is actually a folder name; Spark will create a number of files inside it:

import java.io.File
import com.tomekl007.UserTransaction
import org.apache.spark.sql.SparkSession
import org.apache.spark.{Partitioner, SparkContext}
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import org.scalatest.Matchers._
import scala.reflect.io.Path
class SavePlainText extends FunSuite with BeforeAndAfterEach {
  val spark: SparkContext = SparkSession.builder().master("local[2]").getOrCreate().sparkContext

  private val FileName = "transactions.txt"

We use BeforeAndAfterEach in our test case to clean up the directory after every test by deleting the path recursively. Deleting the whole path after each test is required so that the tests can be rerun without failing. For the first run, however, we will comment the following code out so that we can investigate the structure of the saved text file:

//override def afterEach() {
//  val path = Path(FileName)
//  path.deleteRecursively()
//}

//test("should save and load in plain text") {

We will then create an RDD of two transactions, UserTransaction("a", 100) and UserTransaction("b", 200):

val rdd = spark.makeRDD(List(UserTransaction("a", 100), UserTransaction("b", 200)))

We then coalesce our data into one partition. The coalesce() call is an important detail: if we want to save our data to a single file, we need to coalesce it into one partition, but doing so has an important implication:

rdd.coalesce(1).saveAsTextFile(FileName)

If we coalesce to a single partition, then only one executor can write the data. This means that saving will be very slow and there is a risk of running out of memory, because all the data is sent to that one executor. Generally, in a production environment, we save with as many partitions as there are executors available, or even a multiple of that number; so, with 16 executors, we might save to 64 partitions, but that also results in 64 files. For test purposes, we will save to a single partition, as shown in the following code snippet:

rdd.coalesce(numPartitions = 1).saveAsTextFile(FileName)
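By contrast, a production job would usually keep many partitions so that multiple executors can write in parallel. The following is only a sketch of that idea, using the 64 output files mentioned above as an example figure; note that it uses repartition(), which shuffles the data and, unlike coalesce() without a shuffle, can also increase the number of partitions:

// Sketch only: write with 64 partitions so that up to 64 tasks can save in
// parallel, producing 64 part-* files inside the output folder. The value 64
// (16 executors multiplied by a factor of 4) is just the example from the text.
rdd.repartition(numPartitions = 64).saveAsTextFile(FileName)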

Now, we'll load the data back. We only need to pass the folder name to the textFile method and it will return an RDD of strings, fromFile:

    val fromFile = spark.textFile(FileName)

We then assert on our data, checking that it contains the same elements as the list of strings "UserTransaction(a,100)" and "UserTransaction(b,200)":

    fromFile.collect().toList should contain theSameElementsAs List(
      "UserTransaction(a,100)", "UserTransaction(b,200)"
      //note - this is string!
    )
  }
}

The important thing to note is that we are asserting against a list of strings: because the data was saved as plain text, Spark doesn't know anything about the schema of our data.

This is one of the drawbacks of saving in plain text: loading the data back is not easy, since we need to map every string to a UserTransaction manually. So, in real code we would have to parse every record ourselves; for test purposes, we will simply treat our transactions as strings.
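If we did want to get UserTransaction objects back, the parsing could look like the following minimal sketch. It assumes that UserTransaction is a simple case class with a String identifier and a numeric amount, so that its toString output has the shape UserTransaction(a,100), and it does no error handling:

// Sketch only: parse the plain-text lines back into UserTransaction objects.
// Assumes every line has the shape UserTransaction(<id>,<amount>).
val parsedTransactions = fromFile.map { line =>
  val content = line.stripPrefix("UserTransaction(").stripSuffix(")")
  val Array(id, amount) = content.split(",")
  UserTransaction(id, amount.toInt) // use toLong if the amount field is a Long
}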

Now, let's run the test and look at the structure of the folder that was created.

The test passes and Spark creates a transactions.txt folder. Inside it, there are four files. ._SUCCESS.crc and .part-00000.crc are checksum files used to validate that the write worked properly; _SUCCESS is a marker file indicating that the save succeeded; and part-00000 holds the actual data, the two records UserTransaction(a,100) and UserTransaction(b,200).
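Represented as a directory tree (the exact part-file names can vary between runs), the output folder looks roughly like this:

transactions.txt/
├── ._SUCCESS.crc
├── .part-00000.crc
├── _SUCCESS
└── part-00000      <- contains UserTransaction(a,100) and UserTransaction(b,200)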

In the next section, we will learn what will happen if we increment the number of partitions.
