I am very excited to introduce you to this chapter. Unstructured data is what really makes big data different from the old data, and it is also what makes Scala a new paradigm for processing that data. To start with, unstructured data may seem at first sight like a derogatory term. Nevertheless, every sentence in this book is unstructured data: it does not have the traditional record / row / column semantics. For most people, however, this is much easier to read than the same book presented as a table or spreadsheet.
In practice, unstructured data means nested and complex data. An XML document or a photograph are good examples of unstructured data, even though they have a very rich structure to them. My guess is that the originators of the term meant that the new data, the data that engineers at social interaction companies such as Google, Facebook, and Twitter saw, had a different structure to it than the traditional flat tables everyone was used to. These data indeed did not fit the traditional RDBMS paradigm. Some of them can be flattened, but the underlying storage would be too inefficient, as RDBMSs were not optimized to handle them, and the result would also be hard to parse, not only for humans but for machines as well.
Many of the techniques introduced in this chapter were created as an emergency Band-Aid to deal with the need to simply process the data.
In this chapter, we will cover the following topics:
You have already seen unstructured data in the previous chapters: the data was an array of LabeledPoint, which is a tuple (label: Double, features: Vector). The label is just a number of type Double. Vector is a sealed trait with two subclasses: SparseVector and DenseVector. The class diagram is as follows:
Each observation is a tuple of a label and features, and the features can be sparse. Certainly, if there are no missing values, the whole row can be represented as a dense vector. A dense vector representation requires (8 x size + 8) bytes. If most of the elements are missing, or are equal to some default value, we can store only the non-default elements. In this case, we would require (12 x non_missing_size + 20) bytes, with small variations depending on the JVM implementation. So, from the storage point of view, the threshold for switching from one representation to the other is the point where 12 x non_missing_size + 20 exceeds 8 x size + 8, that is, when size is greater than 1.5 x (non_missing_size + 1); roughly, the sparse representation pays off when at least a third of the elements are default. While computer languages are good at representing complex structures via pointers, we need some convenient form to exchange these data between JVMs or machines. First, let's see how Spark/Scala does it, specifically by persisting the data in the Parquet format:
akozlov@Alexanders-MacBook-Pro$ bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1-SNAPSHOT
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val points = Array(
     |   LabeledPoint(0.0, Vectors.sparse(3, Array(1), Array(1.0))),
     |   LabeledPoint(1.0, Vectors.dense(0.0, 2.0, 0.0)),
     |   LabeledPoint(2.0, Vectors.sparse(3, Array((1, 3.0)))),
     |   LabeledPoint.parse("(3.0,[0.0,4.0,0.0])"))
points: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((0.0,(3,[1],[1.0])), (1.0,[0.0,2.0,0.0]), (2.0,(3,[1],[3.0])), (3.0,[0.0,4.0,0.0]))

scala> val rdd = sc.parallelize(points)
rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> val df = rdd.repartition(1).toDF
df: org.apache.spark.sql.DataFrame = [label: double, features: vector]

scala> df.write.parquet("points")
What we did was create a new RDD dataset from the command line (alternatively, we could have used org.apache.spark.mllib.util.MLUtils to load a text file, as sketched below), convert it to a DataFrame, and create a serialized representation of it in Parquet files under the points directory.
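As an aside, here is a minimal sketch of the MLUtils route under the assumption that you have a file in the LIBSVM text format (label index1:value1 index2:value2 ...); the path used here is just a placeholder, not a file shipped with this book:

import org.apache.spark.mllib.util.MLUtils

// loadLibSVMFile parses the LIBSVM text format into an RDD[LabeledPoint];
// "data/sample_libsvm_data.txt" is a hypothetical path
val loaded = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
loaded.take(2).foreach(println)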
What does Parquet stand for?
Apache Parquet is a columnar storage format, jointly developed by Cloudera and Twitter for big data. Columnar storage allows for better compression of the values in a dataset and is more efficient if only a subset of the columns needs to be retrieved from disk. Parquet was built from the ground up with complex nested data structures in mind and uses the record shredding and assembly algorithm described in the Dremel paper (https://blog.twitter.com/2013/dremel-made-simple-with-parquet). The Dremel/Parquet encoding uses definition/repetition fields to denote the level in the hierarchy the data comes from, which covers most of the immediate encoding needs, as it is sufficient to store optional fields, nested arrays, and maps. Parquet stores the data in chunks, hence probably the name Parquet, which means flooring composed of wooden blocks arranged in a geometric pattern. Parquet reads can be optimized to fetch only a subset of the blocks from disk, depending on the subset of columns to be read and the index used (although this very much depends on whether the specific implementation is aware of these features). The values in the columns can use dictionary and Run-Length Encoding (RLE), which provides exceptionally good compression for columns with many duplicate entries, a frequent use case in big data.
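To see the column-pruning benefit in Spark terms, here is a minimal sketch that reads back only the label column from the points directory we just wrote; with a columnar format, the blocks holding the features column never need to be fetched from disk:

// Read only the label column; thanks to columnar storage the features blocks are skipped
val labelsOnly = sqlContext.read.parquet("points").select("label")
labelsOnly.show()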
A Parquet file is in a binary format, but you can look at the information in it using parquet-tools, which can be downloaded from http://archive.cloudera.com/cdh5/cdh/5:
akozlov@Alexanders-MacBook-Pro$ wget -O - http://archive.cloudera.com/cdh5/cdh/5/parquet-1.5.0-cdh5.5.0.tar.gz | tar xzvf -
akozlov@Alexanders-MacBook-Pro$ cd parquet-1.5.0-cdh5.5.0/parquet-tools
akozlov@Alexanders-MacBook-Pro$ tar xvf parquet-1.5.0-cdh5.5.0/parquet-tools/target/parquet-tools-1.5.0-cdh5.5.0-bin.tar.gz
akozlov@Alexanders-MacBook-Pro$ cd parquet-tools-1.5.0-cdh5.5.0
akozlov@Alexanders-MacBook-Pro$ ./parquet-schema ~/points/*.parquet
message spark_schema {
  optional double label;
  optional group features {
    required int32 type (INT_8);
    optional int32 size;
    optional group indices (LIST) {
      repeated group list {
        required int32 element;
      }
    }
    optional group values (LIST) {
      repeated group list {
        required double element;
      }
    }
  }
}
Let's look at the schema, which is very close to the structure depicted in Figure 1: the first member is the label of type double, and the second and last one is features, of a composite type. The keyword optional is another way of saying that the value can be null (absent) in a record for one reason or another. Lists or arrays are encoded as a repeated field. As a whole array may be absent (it is possible for all features to be absent), it is wrapped into the optional groups (indices and values). Finally, the type field encodes whether this is a sparse or dense representation:
akozlov@Alexanders-MacBook-Pro$ ./parquet-dump ~/points/*.parquet
row group 0
------------------------------------------------------------------------------------------------
label:       DOUBLE GZIP DO:0 FPO:4 SZ:78/79/1.01 VC:4 ENC:BIT_PACKED,PLAIN,RLE
features:
.type:       INT32 GZIP DO:0 FPO:82 SZ:101/63/0.62 VC:4 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
.size:       INT32 GZIP DO:0 FPO:183 SZ:97/59/0.61 VC:4 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
.indices:
..list:
...element:  INT32 GZIP DO:0 FPO:280 SZ:100/65/0.65 VC:4 ENC:PLAIN_DICTIONARY,RLE
.values:
..list:
...element:  DOUBLE GZIP DO:0 FPO:380 SZ:125/111/0.89 VC:8 ENC:PLAIN_DICTIONARY,RLE

    label TV=4 RL=0 DL=1
    ----------------------------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN SZ:38 VC:4

    features.type TV=4 RL=0 DL=1 DS: 2 DE:PLAIN_DICTIONARY
    ----------------------------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN_DICTIONARY SZ:9 VC:4

    features.size TV=4 RL=0 DL=2 DS: 1 DE:PLAIN_DICTIONARY
    ----------------------------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN_DICTIONARY SZ:9 VC:4

    features.indices.list.element TV=4 RL=1 DL=3 DS: 1 DE:PLAIN_DICTIONARY
    ----------------------------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN_DICTIONARY SZ:15 VC:4

    features.values.list.element TV=8 RL=1 DL=3 DS: 5 DE:PLAIN_DICTIONARY
    ----------------------------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:RLE VLE:PLAIN_DICTIONARY SZ:17 VC:8

DOUBLE label
------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 ***
value 1: R:0 D:1 V:0.0
value 2: R:0 D:1 V:1.0
value 3: R:0 D:1 V:2.0
value 4: R:0 D:1 V:3.0

INT32 features.type
------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 ***
value 1: R:0 D:1 V:0
value 2: R:0 D:1 V:1
value 3: R:0 D:1 V:0
value 4: R:0 D:1 V:1

INT32 features.size
------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 ***
value 1: R:0 D:2 V:3
value 2: R:0 D:1 V:<null>
value 3: R:0 D:2 V:3
value 4: R:0 D:1 V:<null>

INT32 features.indices.list.element
------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 ***
value 1: R:0 D:3 V:1
value 2: R:0 D:1 V:<null>
value 3: R:0 D:3 V:1
value 4: R:0 D:1 V:<null>

DOUBLE features.values.list.element
------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 8 ***
value 1: R:0 D:3 V:1.0
value 2: R:0 D:3 V:0.0
value 3: R:1 D:3 V:2.0
value 4: R:1 D:3 V:0.0
value 5: R:0 D:3 V:3.0
value 6: R:0 D:3 V:0.0
value 7: R:1 D:3 V:4.0
value 8: R:1 D:3 V:0.0
You are probably a bit confused by the R: and D: entries in the output. These are the repetition and definition levels, as described in the Dremel paper, and they are necessary to efficiently encode values in nested structures. Only repeated fields increment the repetition level, and only non-required fields increment the definition level. A drop in R signifies the end of a list (array). For every non-required level in the hierarchy tree, one needs a new definition level. Repetition and definition level values are small by design and can be stored efficiently in a serialized form.
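To make this concrete, here is how the eight entries of the features.values.list.element column in the dump above map back to the values arrays of the four points we wrote; an entry with R:0 starts a new record's array, while R:1 continues the current one:

value 1: R:0 V:1.0  -> record 1, array [1.0]
value 2: R:0 V:0.0  -> record 2, array starts: [0.0,
value 3: R:1 V:2.0  ->   same array continues:  2.0,
value 4: R:1 V:0.0  ->   same array continues:  0.0]
value 5: R:0 V:3.0  -> record 3, array [3.0]
value 6: R:0 V:0.0  -> record 4, array starts: [0.0,
value 7: R:1 V:4.0  ->   same array continues:  4.0,
value 8: R:1 V:0.0  ->   same array continues:  0.0]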
Best of all, if there are many duplicate entries, they will all be placed together, which is exactly the case the compression algorithm (gzip, by default) is optimized for. Parquet also implements other encodings that exploit repeated values, such as dictionary encoding and RLE compression.
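If you want to experiment with the compression trade-off, the codec used for Parquet output can be switched through a SQL configuration property; a minimal sketch, assuming the df DataFrame from the earlier spark-shell session is still in scope and using a hypothetical output directory:

// gzip is the default Parquet codec in this Spark version; snappy trades
// compression ratio for speed, and "uncompressed" and "lzo" are also accepted
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
df.write.parquet("points_snappy")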
This is simple and efficient serialization out of the box. We have been able to write a set of complex objects to a file, with each column stored in a separate block that holds all the values of that field across the records, including the nested structures.
Let's now read the file back and recover the RDD. The Parquet format does not know anything about the LabeledPoint class, so we'll have to do some typecasting and trickery here. When we read the file, we'll see a collection of org.apache.spark.sql.Row:
akozlov@Alexanders-MacBook-Pro$ bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1-SNAPSHOT
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

scala> val df = sqlContext.read.parquet("points")
df: org.apache.spark.sql.DataFrame = [label: double, features: vector]

scala> df.collect
res11: Array[org.apache.spark.sql.Row] = Array([0.0,(3,[1],[1.0])], [1.0,[0.0,2.0,0.0]], [2.0,(3,[1],[3.0])], [3.0,[0.0,4.0,0.0]])

scala> val rdd = df.map(x => LabeledPoint(x(0).asInstanceOf[scala.Double], x(1).asInstanceOf[org.apache.spark.mllib.linalg.Vector]))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[16] at map at <console>:25

scala> rdd.collect
res12: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((0.0,(3,[1],[1.0])), (1.0,[0.0,2.0,0.0]), (2.0,(3,[1],[3.0])), (3.0,[0.0,4.0,0.0]))

scala> rdd.filter(_.features(1) <= 2).collect
res13: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((0.0,(3,[1],[1.0])), (1.0,[0.0,2.0,0.0]))
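If the asInstanceOf casts look heavy-handed, the same conversion can be written with Row.getAs; a minimal sketch, assuming the df DataFrame from the session above is still in scope:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint

// getAs looks the field up by name and casts it to the requested type
val rdd2 = df.map(row =>
  LabeledPoint(row.getAs[Double]("label"), row.getAs[Vector]("features")))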
Personally, I think this is pretty cool: without any compilation, we can encode and decode complex objects. One can easily create their own objects in the REPL. Let's say we want to track a user's behavior on the web:
akozlov@Alexanders-MacBook-Pro$ bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1-SNAPSHOT
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> case class Person(id: String, visits: Array[String]) {
     |   override def toString: String = {
     |     val vsts = visits.mkString(",")
     |     s"($id -> $vsts)"
     |   }
     | }
defined class Person

scala> val p1 = Person("Phil", Array("http://www.google.com", "http://www.facebook.com", "http://www.linkedin.com", "http://www.homedepot.com"))
p1: Person = (Phil -> http://www.google.com,http://www.facebook.com,http://www.linkedin.com,http://www.homedepot.com)

scala> val p2 = Person("Emily", Array("http://www.victoriassecret.com", "http://www.pacsun.com", "http://www.abercrombie.com/shop/us", "http://www.orvis.com"))
p2: Person = (Emily -> http://www.victoriassecret.com,http://www.pacsun.com,http://www.abercrombie.com/shop/us,http://www.orvis.com)

scala> sc.parallelize(Array(p1, p2)).repartition(1).toDF.write.parquet("history")

scala> import scala.collection.mutable.WrappedArray
import scala.collection.mutable.WrappedArray

scala> val df = sqlContext.read.parquet("history")
df: org.apache.spark.sql.DataFrame = [id: string, visits: array<string>]

scala> val rdd = df.map(x => Person(x(0).asInstanceOf[String], x(1).asInstanceOf[WrappedArray[String]].toArray[String]))
rdd: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[27] at map at <console>:28

scala> rdd.collect
res9: Array[Person] = Array((Phil -> http://www.google.com,http://www.facebook.com,http://www.linkedin.com,http://www.homedepot.com), (Emily -> http://www.victoriassecret.com,http://www.pacsun.com,http://www.abercrombie.com/shop/us,http://www.orvis.com))
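Since visits is stored as a nested array column, we can also query it directly with DataFrame operations instead of converting back to Person objects; a minimal sketch using the explode function, which emits one output row per visited URL:

import org.apache.spark.sql.functions.explode

// Unnest the visits array: one (id, visit) row per URL
val visitsDf = df.select(df("id"), explode(df("visits")).as("visit"))
visitsDf.show(false)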
As a matter of good practice, we need to register newly created classes with the Kryo serializer; Spark uses a separate serialization mechanism to pass objects between tasks and executors. If a class is not registered, Spark will fall back to the default Java serialization, which can be up to 10x slower:
scala> :paste
// Entering paste mode (ctrl-D to finish)

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Person])
  }
}

object MyKryoRegistrator {
  def register(conf: org.apache.spark.SparkConf) {
    conf.set("spark.serializer", classOf[KryoSerializer].getName)
    conf.set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)
  }
}
^D
// Exiting paste mode, now interpreting.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
defined class MyKryoRegistrator
defined module MyKryoRegistrator

scala>
If you are deploying the code on a cluster, the recommendation is to put this code in a jar on the classpath.
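For completeness, here is a minimal sketch of how the registrator above might be wired in when you construct the SparkContext yourself, for instance in a standalone application packaged into that jar (the application name is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

// MyKryoRegistrator.register sets spark.serializer and spark.kryo.registrator
val conf = new SparkConf().setAppName("kryo-registration-example")
MyKryoRegistrator.register(conf)
val sc = new SparkContext(conf)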
I've certainly seen examples of up to 10-level-deep nesting in production. Although this might be overkill for performance reasons, nesting is required in more and more production business use cases. Before we go into the specifics of constructing a nested object in the example of sessionization, let's get an overview of serialization in general.