Importing and saving data

I wanted to add this section about importing and saving data here, even though it is not purely about Spark SQL, so that I could introduce concepts such as the Parquet and JSON file formats. It also allows me to cover, conveniently in one place, how to access and save data in plain text, as well as in the CSV, Parquet, and JSON formats.

Processing the Text files

Using the Spark context, it is possible to load a text file into an RDD with the textFile method, while the wholeTextFiles method can read the contents of a directory into an RDD. The following examples show how a file based on the local file system (file://) or HDFS (hdfs://) can be read into a Spark RDD. Each call requests that the data be split into six partitions for increased performance. The first two examples are equivalent, as they both access a file on the Linux file system:

sc.textFile("/data/spark/tweets.txt",6)
sc.textFile("file:///data/spark/tweets.txt",6)
sc.textFile("hdfs://server1:4014/data/spark/tweets.txt",6)

Processing the JSON files

JSON (JavaScript Object Notation) is a data interchange format that grew out of JavaScript. Like XML, it is a text-based format that can be read and edited as plain text. The following example uses the SQL context method called jsonFile to load the HDFS-based JSON data file named device.json. The resulting data is created as a data frame:

val dframe = sqlContext.jsonFile("hdfs:///data/spark/device.json")
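
A quick check of the loaded data frame can be made at this point; the following is a minimal sketch using standard data frame methods:

dframe.printSchema()      // print the schema inferred from the JSON data
println(dframe.count())   // count the number of JSON records loaded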

Data can be saved in JSON format using the data frame toJSON method, as shown by the following example. First, the Apache Spark and Spark SQL classes are imported:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};

Next, an object class called sql1 is defined, as is a main method with parameters. A configuration object is created and used to build a Spark context. The master Spark URL is left at its default value, so Spark expects local mode on the local host, using port 7077:

object sql1 {

  def main(args: Array[String]) {

    val appName = "sql example 1"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc = new SparkContext(conf)
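
As an aside, if the example were to be run locally without a cluster manager, the master could have been set explicitly on the configuration object before the context was created; a minimal sketch:

    conf.setMaster("local[2]")  // hypothetical: run the example locally using two worker threads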

An SQL context is created from the Spark context, and a raw CSV-format text file called adult.test.data_1x is loaded using the textFile method. A schema string containing the data column names is then created, and the schema is built from it by splitting the string on spaces and using the StructType and StructField classes to define each schema column as a string value:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val rawRdd = sc.textFile("hdfs:///data/spark/sql/adult.test.data_1x")

    val schemaString = "age workclass fnlwgt education " +
                       "educational-num marital-status occupation relationship " +
                       "race gender capital-gain capital-loss hours-per-week " +
                       "native-country income"

    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
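
If required, the generated schema can be inspected at this point; a minimal check using the StructType printTreeString method:

    schema.printTreeString()  // print the generated column names and types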

Each data row is then created from the raw CSV data by splitting each line on commas and adding the resulting elements to a Row() structure. A data frame is created from the schema and the row data, which is then converted into JSON format using the toJSON method. Finally, the data is saved to HDFS using the saveAsTextFile method:

    val rowRDD = rawRdd.map(_.split(","))
      .map(p => Row( p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),
                      p(9),p(10),p(11),p(12),p(13),p(14) ))

    val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    val jsonData = adultDataFrame.toJSON

    jsonData.saveAsTextFile("hdfs:///data/spark/sql/adult.json")

  } // end main

} // end sql1
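
As a side note, Spark 1.4 and later also provide a data frame write interface that can save JSON directly, avoiding the toJSON and saveAsTextFile steps; the following is a minimal sketch, assuming Spark 1.4 or later and a hypothetical output path:

adultDataFrame.write.json("hdfs:///data/spark/sql/adult_write.json")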

The resulting data can be seen on HDFS; the Hadoop file system ls command below shows that the data resides in the target directory as a success file and two part files:

[hadoop@hc2nn sql]$ hdfs dfs -ls /data/spark/sql/adult.json

Found 3 items
-rw-r--r--   3 hadoop supergroup          0 2015-06-20 17:17 /data/spark/sql/adult.json/_SUCCESS
-rw-r--r--   3 hadoop supergroup       1731 2015-06-20 17:17 /data/spark/sql/adult.json/part-00000
-rw-r--r--   3 hadoop supergroup       1724 2015-06-20 17:17 /data/spark/sql/adult.json/part-00001

Using the Hadoop file system's cat command, it is possible to display the contents of the JSON data. I will just show a sample to save space:

[hadoop@hc2nn sql]$ hdfs dfs -cat /data/spark/sql/adult.json/part-00000 | more

{"age":"25","workclass":" Private","fnlwgt":" 226802","education":" 11th","educational-num":"
 7","marital-status":" Never-married","occupation":" Machine-op-inspct","relationship":" Own-
child","race":" Black","gender":" Male","capital-gain":" 0","capital-loss":" 0","hours-per-we
ek":" 40","native-country":" United-States","income":" <=50K"}

Processing the Parquet data is very similar, as I will show next.

Processing the Parquet files

Apache Parquet is a columnar data format used for file I/O by many tools in the Hadoop tool set, such as Hive, Pig, and Impala. It increases performance through efficient compression and encoding routines.

The Parquet processing example is very similar to the JSON Scala code. The data frame is created, and then saved in Parquet format using the save method with a source type of parquet:

    val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    adultDataFrame.save("hdfs:///data/spark/sql/adult.parquet","parquet")

  } // end main

} // end sql2

This results in an HDFS-based directory containing three items: a _common_metadata file, a _metadata file, and a _temporary directory:

[hadoop@hc2nn sql]$ hdfs dfs -ls /data/spark/sql/adult.parquet
Found 3 items
-rw-r--r--   3 hadoop supergroup       1412 2015-06-21 13:17 /data/spark/sql/adult.parquet/_common_metadata
-rw-r--r--   3 hadoop supergroup       1412 2015-06-21 13:17 /data/spark/sql/adult.parquet/_metadata
drwxr-xr-x   - hadoop supergroup          0 2015-06-21 13:17 /data/spark/sql/adult.parquet/_temporary

Listing the contents of the metadata file using the Hadoop file system's cat command gives an idea of the data format. However, the Parquet header is binary, so it does not display well with cat and more:

[hadoop@hc2nn sql]$ hdfs dfs -cat /data/spark/sql/adult.parquet/_metadata | more
s%
ct","fields":[{"name":"age","type":"string","nullable":true,"metadata":{}},{"name":"workclass
","type":"string","nullable":true,"metadata":{}},{"name":"fnlwgt","type":"string","nullable":
true,"metadata":{}},

For more information about the available Spark and SQL context methods, check the contents of the classes called org.apache.spark.SparkContext and org.apache.spark.sql.SQLContext in the Apache Spark API documentation, substituting the specific <version> of Spark that you are interested in:

spark.apache.org/docs/<version>/api/scala/index.html

In the next section, I will examine Apache Spark DataFrames, introduced in Spark 1.3.
