Let's consider a few practical examples and libraries in Spark/Scala, starting with the very traditional problem of word counting.
Most modern machine learning algorithms require multiple passes over data. If the data fits in the memory of a single machine, the data is readily available and this does not present a performance bottleneck. However, if the data becomes too large to fit into RAM, one has a choice: either spill pieces of the data to disk (or to a database), which is about 100 times slower but has a much larger capacity, or split the dataset between multiple machines across the network and transfer the intermediate results. While there are still ongoing debates, for most practical systems analysis shows that storing the data over a set of network-connected nodes has a slight advantage over repeatedly writing it to and reading it from the hard disks of a single node, particularly if we can split the workload effectively between multiple CPUs.
An average disk has a bandwidth of about 100 MB/sec and transfers with a few milliseconds of latency, depending on the rotation speed and caching. This is about 100 times slower than reading the data from memory, again depending on the data size and caching implementation. A modern data bus can transfer data at over 10 GB/sec. While network speed still lags behind direct memory access, particularly with the overhead of the standard TCP/IP kernel networking layer, specialized hardware can reach tens of GB/sec and, if run in parallel, can potentially be as fast as reading from memory. In practice, network-transfer speeds are somewhere between 1 and 10 GB/sec, but still faster than the disk in most practical systems. Thus, we can potentially fit the data into the combined memory of all the cluster nodes and perform iterative machine learning algorithms across the system.
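To put these numbers into perspective, here is a back-of-the-envelope calculation in Scala of how long a full scan of a dataset would take under the rough bandwidth figures quoted above. The dataset size, node count, and bandwidth values are illustrative assumptions, not measurements:

// Rough, order-of-magnitude scan-time estimates (all figures are assumptions)
val datasetMB   = 1000000.0   // a 1 TB dataset
val diskMBps    = 100.0       // a single spinning disk
val networkMBps = 1000.0      // conservative end of the 1-10 GB/sec range
val memoryMBps  = 10000.0     // a modern memory bus
val nodes       = 100         // nodes scanning their share in parallel

val singleDiskSec     = datasetMB / diskMBps              // ~10,000 sec, close to 3 hours
val clusterNetworkSec = datasetMB / (networkMBps * nodes) // ~10 sec
val clusterMemorySec  = datasetMB / (memoryMBps * nodes)  // ~1 sec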
One problem with memory, however, is that it does not persist across node failures and reboots. A popular big data framework, Hadoop, made possible with the help of the original Dean/Ghemawat paper (Jeff Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters, OSDI, 2004), uses exactly this disk-layer persistence to guarantee fault tolerance and store intermediate results. A Hadoop MapReduce program would first run a map function on each row of a dataset, emitting one or more key/value pairs. These key/value pairs would then be sorted, grouped, and aggregated by key so that the records with the same key end up being processed together on the same reducer, which might be running on the same or another node. The reducer applies a reduce function that traverses all the values that were emitted for the same key and aggregates them accordingly. The persistence of intermediate results guarantees that if a reducer fails for one reason or another, the partial computations can be discarded and the reduce computation can be restarted from the checkpoint-saved results. Many simple ETL-like applications traverse the dataset only once, with very little information preserved as state from one record to another.
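To make the map, shuffle, and reduce phases concrete, here is a single-machine sketch of the data flow in plain Scala collections, using a maximum-per-key aggregation as an example. This only illustrates the semantics; it is not Hadoop code, and the sample records are made up:

// input records, one line per temperature reading: "city,temperature"
val records = Seq("moscow,-10", "berlin,3", "moscow,-4", "berlin,7")

// map: parse each record and emit a (key, value) pair
val mapped: Seq[(String, Int)] = records.map { line =>
  val Array(city, temp) = line.split(",")
  (city, temp.toInt)
}

// shuffle: group the values emitted for the same key together
val shuffled: Map[String, Seq[Int]] = mapped.groupBy(_._1).mapValues(_.map(_._2))

// reduce: traverse the values for each key and aggregate them (here, take the maximum)
val maxByCity: Map[String, Int] = shuffled.mapValues(_.max)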
For example, one of the traditional applications of MapReduce is word count. The program needs to count the number of occurrences of each word in a document consisting of lines of text. In Scala, the word count is readily expressed as an application of the foldLeft method on a sorted list of words:
val lines = scala.io.Source.fromFile("...").getLines.toSeq
val counts = lines.flatMap(line => line.split("\\W+")).sorted.
  foldLeft(List[(String,Int)]()) { (r, c) =>
    r match {
      case (key, count) :: tail =>
        if (key == c) (c, count + 1) :: tail
        else (c, 1) :: r
      case Nil => List((c, 1))
    }
  }
If I run this program, the output will be a list of (word, count) tuples. The program splits the lines into words, sorts the words, and then matches each word with the latest entry in the list of (word, count) tuples. The same computation in MapReduce would be expressed as follows:
val linesRdd = sc.textFile("hdfs://...")
val counts = linesRdd.flatMap(line => line.split("\\W+"))
  .map(_.toLowerCase)
  .map(word => (word, 1))
  .reduceByKey(_+_)
counts.collect
First, we need to process each line of the text by splitting it into words and generating (word, 1) pairs. This task is easily parallelized. Then, to parallelize the global count, we need to split the counting part by assigning a task to do the count for a subset of words. In Hadoop, we compute a hash of the word and divide the work based on the value of the hash.
Once the map task finds all the entries for a given hash, it can send the key/value pairs to the reducer; the sending part is usually called shuffle in MapReduce vernacular. A reducer waits until it receives all the key/value pairs from all the mappers, combines the values (a partial combine can also happen on the mapper, if possible), and computes the overall aggregate, which in this case is just a sum. A single reducer will see all the values for a given word.
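A toy version of this key-to-reducer assignment could look as follows. This is only an illustration of the idea; Hadoop's and Spark's actual hash partitioners handle the assignment internally, and the number of reducers here is an arbitrary assumption:

val numReducers = 4

// assign each word to a reducer based on the (non-negative) hash of the key
def reducerFor(word: String): Int = {
  val mod = word.hashCode % numReducers
  if (mod < 0) mod + numReducers else mod
}

Seq("happy", "families", "are", "all", "alike").map(w => (w, reducerFor(w)))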
Let's look at the log output of the word count operation in Spark (Spark is very verbose by default; you can manage the verbosity level by modifying the conf/log4j.properties file, replacing INFO with ERROR or FATAL):
$ wget http://mirrors.sonic.net/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
$ tar xvf spark-1.6.1-bin-hadoop2.6.tgz
$ cd spark-1.6.1-bin-hadoop2.6
$ mkdir leotolstoy
$ (cd leotolstoy; wget http://www.gutenberg.org/files/1399/1399-0.txt)
$ bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val linesRdd = sc.textFile("leotolstoy", minPartitions=10)
linesRdd: org.apache.spark.rdd.RDD[String] = leotolstoy MapPartitionsRDD[3] at textFile at <console>:27
At this stage, the only thing that has happened is metadata manipulation; Spark has not touched the data itself. Spark estimates the size of the dataset and the number of partitions. By default, this is the number of HDFS blocks, but we can specify the minimum number of partitions explicitly with the minPartitions parameter:
scala> val countsRdd = linesRdd.flatMap(line => line.split("\\W+")).
     | map(_.toLowerCase).
     | map(word => (word, 1)).
     | reduceByKey(_+_)
countsRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:31
We just defined another RDD derived from the original linesRdd:
scala> countsRdd.collect.filter(_._2 > 99)
res3: Array[(String, Int)] = Array((been,1061), (them,841), (found,141), (my,794), (often,105), (table,185), (this,1410), (here,364), (asked,320), (standing,132), ("",13514), (we,592), (myself,140), (is,1454), (carriage,181), (got,277), (won,153), (girl,117), (she,4403), (moment,201), (down,467), (me,1134), (even,355), (come,667), (new,319), (now,872), (upon,207), (sister,115), (veslovsky,110), (letter,125), (women,134), (between,138), (will,461), (almost,124), (thinking,159), (have,1277), (answer,146), (better,231), (men,199), (after,501), (only,654), (suddenly,173), (since,124), (own,359), (best,101), (their,703), (get,304), (end,110), (most,249), (but,3167), (was,5309), (do,846), (keep,107), (having,153), (betsy,111), (had,3857), (before,508), (saw,421), (once,334), (side,163), (ough...
Word count over about 2 MB of text data (40,291 lines and 353,087 words) took under a second to read, split, and group by words.
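Remember that up to the collect call Spark had only built a lineage of transformations; you can still inspect the partitioning metadata and the lineage without triggering any work. For example, in the same REPL session (the exact numbers depend on the input splits):

scala> linesRdd.partitions.size
scala> countsRdd.toDebugString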
With extended logging, you would see the detailed stage and task progress of the job; the same information is also available in the Spark web UI at http://localhost:4040.
The art of parallel performance tuning is to split the workload between different nodes or threads so that the overhead is relatively small and the workload is balanced.
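For instance, if one stage turns out to be skewed, or the number of partitions does not match the number of available cores, the partitioning can be adjusted explicitly. A small sketch using the countsRdd from the REPL session above (the partition counts are arbitrary):

scala> val rebalanced = countsRdd.repartition(8)   // full shuffle into 8 roughly equal partitions
scala> val narrowed = countsRdd.coalesce(2)        // merge partitions without a shuffle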
Spark supports listening on incoming streams, partitioning them, and computing aggregates in near real time. Currently supported sources include Kafka, Flume, HDFS/S3, Kinesis, and Twitter, as well as traditional MQs such as ZeroMQ and MQTT. In Spark, streaming is implemented as micro-batches: internally, Spark divides the input data into micro-batches, usually ranging from sub-second to minutes in size, and performs RDD aggregation operations on these micro-batches.
For example, let's extend the Flume example that we covered earlier. We'll need to modify the Flume configuration file to create a Spark polling sink; instead of HDFS, replace the sink section with the following:
# The sink is Spark
a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=4989
Now, instead of writing to HDFS, Flume will wait for Spark to poll for data:
package org.akozlov.chapter03

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeWordCount {
  def main(args: Array[String]) {
    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("/tmp/flume_check")
    val hostPort = args(0).split(":")
    System.out.println("Opening a sink at host: [" + hostPort(0) + "] port: [" + hostPort(1).toInt + "]")
    val lines = FlumeUtils.createPollingStream(ssc, hostPort(0), hostPort(1).toInt, StorageLevel.MEMORY_ONLY)
    val words = lines
      .map(e => new String(e.event.getBody.array)).map(_.toLowerCase).flatMap(_.split("\\W+"))
      .map(word => (word, 1L))
      .reduceByKeyAndWindow(_+_, _-_, Seconds(6), Seconds(2)).print
    ssc.start()
    ssc.awaitTermination()
  }
}
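The second argument to reduceByKeyAndWindow, (_-_), is an inverse reduce function: instead of recomputing the whole six-second window every two seconds, Spark subtracts the counts of the batch that slides out of the window, which is also why the checkpoint directory is set above. A less efficient but equivalent formulation without the inverse function could look like this (a sketch of the same pipeline; it does not require checkpointing):

val words = lines
  .map(e => new String(e.event.getBody.array)).map(_.toLowerCase).flatMap(_.split("\\W+"))
  .map(word => (word, 1L))
  // recompute the full window on every slide instead of subtracting the expired batch
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(6), Seconds(2)).print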
To run the program, start the Flume agent in one window:
$ ./bin/flume-ng agent -Dflume.log.level=DEBUG,console -n a1 -f ../chapter03/conf/flume-spark.conf
...
Then run the FlumeWordCount object in another:
$ cd ../chapter03
$ sbt "run-main org.akozlov.chapter03.FlumeWordCount localhost:4989"
...
Now, any text typed to the netcat connection will be split into words and counted every two seconds over a six-second sliding window:
$ echo "Happy families are all alike; every unhappy family is unhappy in its own way" | nc localhost 4987 ... ------------------------------------------- Time: 1464161488000 ms ------------------------------------------- (are,1) (is,1) (its,1) (family,1) (families,1) (alike,1) (own,1) (happy,1) (unhappy,2) (every,1) ... ------------------------------------------- Time: 1464161490000 ms ------------------------------------------- (are,1) (is,1) (its,1) (family,1) (families,1) (alike,1) (own,1) (happy,1) (unhappy,2) (every,1) ...
Spark/Scala allows you to switch seamlessly between streaming sources. For example, the same program for the Kafka publish/subscribe topic model looks similar to the following:
package org.akozlov.chapter03

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  def main(args: Array[String]) {
    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("/tmp/kafka_check")
    System.out.println("Opening a Kafka consumer at zk:[" + args(0) + "] for group group-1 and topic example")
    val lines = KafkaUtils.createStream(ssc, args(0), "group-1", Map("example" -> 1), StorageLevel.MEMORY_ONLY)
    val words = lines
      .flatMap(_._2.toLowerCase.split("\\W+"))
      .map(word => (word, 1L))
      .reduceByKeyAndWindow(_+_, _-_, Seconds(6), Seconds(2)).print
    ssc.start()
    ssc.awaitTermination()
  }
}
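Note that createStream uses the older receiver-based Kafka consumer, which tracks offsets in ZooKeeper. Spark 1.6 also provides a direct, receiver-less stream that reads from the brokers instead. A sketch of the equivalent input source, assuming a broker at localhost:9092 and the same ssc as above:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// read the "example" topic directly from the broker instead of going through ZooKeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("example"))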
To start the Kafka broker, first download the latest binary distribution and start ZooKeeper. ZooKeeper is a distributed-services coordinator and is required by Kafka even in a single-node deployment:
$ wget http://apache.cs.utah.edu/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
...
$ tar xf kafka_2.11-0.9.0.1.tgz
$ cd kafka_2.11-0.9.0.1
$ bin/zookeeper-server-start.sh config/zookeeper.properties
...
In another window, start the Kafka server:
$ bin/kafka-server-start.sh config/server.properties
...
Run the KafkaWordCount object:
$ sbt "run-main org.akozlov.chapter03.KafkaWordCount localhost:2181" ...
Now, publishing the stream of words into the Kafka topic will produce the window counts:
$ echo "Happy families are all alike; every unhappy family is unhappy in its own way" | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic example ... $ sbt "run-main org.akozlov.chapter03.FlumeWordCount localhost:4989 ... ------------------------------------------- Time: 1464162712000 ms ------------------------------------------- (are,1) (is,1) (its,1) (family,1) (families,1) (alike,1) (own,1) (happy,1) (unhappy,2) (every,1)
As you can see, the program outputs the counts every two seconds; this is why Spark streaming is sometimes called micro-batch processing. Streaming has many other applications (and frameworks), but it is too big a topic to be covered entirely here and needs to be treated separately. I'll cover some ML on streams of data in Chapter 5, Regression and Classification. Now, let's get back to more traditional SQL-like interfaces.
DataFrame is a relatively recent addition to Spark, introduced in version 1.3, allowing one to use the standard SQL language for data analysis. We already used some SQL commands for exploratory data analysis in Chapter 1, Exploratory Data Analysis. SQL is really great for simple exploratory analysis and data aggregations.
According to the latest poll results, about 70% of Spark users use DataFrame. Although DataFrame has recently become the most popular framework for working with tabular data, it is a relatively heavyweight object. The pipelines that use DataFrames may execute much more slowly than the ones based on Scala's vector or LabeledPoint, which will be discussed in the next chapter. The evidence from different developers is that query response times can be driven down to tens or hundreds of milliseconds, depending on the query, compared to sub-millisecond times on simpler objects.
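Before turning to the SQL shell, here is how an existing RDD can be exposed to SQL as a DataFrame. This is a sketch, assuming the countsRdd from the earlier word-count session and the sqlContext provided by the Spark shell; the column names word and cnt are arbitrary:

import sqlContext.implicits._

// turn the RDD of (word, count) pairs into a DataFrame with named columns
val countsDf = countsRdd.toDF("word", "cnt")
countsDf.registerTempTable("word_counts")

sqlContext.sql("select word, cnt from word_counts order by cnt desc limit 10").show()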
Spark implements its own SQL shell, which can be invoked in addition to the standard Scala REPL: ./bin/spark-sql can be used to access existing Hive/Impala or relational DB tables:
$ ./bin/spark-sql
…
spark-sql> select min(duration), max(duration), avg(duration) from kddcup;
…
0	58329	48.34243046395876
Time taken: 11.073 seconds, Fetched 1 row(s)
In the standard Spark REPL, the same query can be performed by running the following command:
$ ./bin/spark-shell
…
scala> val df = sqlContext.sql("select min(duration), max(duration), avg(duration) from kddcup")
16/05/12 13:35:34 INFO parse.ParseDriver: Parsing command: select min(duration), max(duration), avg(duration) from alex.kddcup_parquet
16/05/12 13:35:34 INFO parse.ParseDriver: Parse Completed
df: org.apache.spark.sql.DataFrame = [_c0: bigint, _c1: bigint, _c2: double]

scala> df.collect.foreach(println)
…
16/05/12 13:36:32 INFO scheduler.DAGScheduler: Job 2 finished: collect at <console>:22, took 4.593210 s
[0,58329,48.34243046395876]
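The same aggregation can also be expressed with the DataFrame DSL rather than a SQL string. A sketch, assuming the kddcup table is visible to the sqlContext, as in the query above:

import org.apache.spark.sql.functions.{avg, max, min}

// look up the registered table and aggregate with column expressions
val kddcup = sqlContext.table("kddcup")
kddcup.agg(min("duration"), max("duration"), avg("duration")).show()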