The Spark Streaming module

There is not enough time to give an in-depth introduction to Spark Streaming here, but we can, at the very least, touch on some of the key notions, provide some examples, and give some guidance to more advanced topics.

Spark Streaming is Spark's module for stream data processing, and it is indeed equipped with all the properties we explained in the preceding list: it is a highly fault-tolerant, scalable, and high-throughput system for processing and analyzing streams of live data. Its API is a natural extension of Spark itself and many of the tools available for RDDs and DataFrames carry over to Spark Streaming.

The core abstraction of Spark Streaming applications is the notion of DStream, which stands for discretized stream. To explain the nomenclature, we often think of data streams as a continuous flow of events, which is, of course, an idealization, since all we can ever measure are discrete events. Regardless, this continuous flow of data will hit our system, and for us to process it further, we discretize it into disjoint batches of data. This stream of discrete data batches is realized as DStream in Spark Streaming and is internally implemented as a sequence of RDDs.

The following diagram gives a high-level overview of the data flow and transformation with Spark Streaming:

Figure 2: Input data is fed into Spark Streaming, which discretises this stream as a so called DStream. These sequences of RDDs can then be further transformed and processed by Spark and any module thereof.

As the diagram shows, the data enters Spark Streaming through an input data stream. This data can be produced and ingested from many different sources, which we will discuss further later on. We speak of systems generating events that Spark Streaming can process as sources. Input DStreams take data from sources and do so via receivers for these sources. Once an input DStream has been created, it can be processed through a rich API that allows for many interesting transformations. It serves as a good mental model to think of DStreams as sequences or collections of RDDs, which we can operate on through an interface that is very close to that of RDDs in the Spark core. For instance, operations such as map-reduce, and filter are available for DStreams as well and simply carry over the respective functionality from the individual RDDs to sequences of RDDs. We will discuss all of this in more detail, but let's first turn to a basic example.

As the first example to get started with Spark Streaming, let's consider the following scenario. Assume that we have already loaded the MSNBC data set from earlier and have computed the prefix span model (psModel) from it. This model was fit with data from a single day of user activity, say, yesterday's data. Today, new events of user activity come in. We will create a simple Spark Streaming application with a basic source that generates user data in precisely the schema we had for the MSNBC data; that is, we are given space-separated strings containing numbers between 1 and 17. Our application will then pick up these events and create DStream from them. We can then apply our prefix span model to the data of DStream to find out if the new sequences fed into the system are indeed frequent sequences according to psModel.

To start with a Spark Streaming application in the first place, we need to create a so-called StreamingContext API, which, by convention, will be instantiated as ssc. Assuming that we start an application from scratch, we create the following context:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

val
conf = new SparkConf()
.setAppName("MSNBC data first streaming example")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, batchDuration = Seconds(10))

If you work with the Spark shell, all but the first and last lines are not necessary, since, in such a case, you will be provided with a Spark context (sc) already. We include the creation of the latter regardless, since we aim at a self-contained application. The creation of a new StreamingContext API takes two arguments, namely a SparkContext and an argument called batchDuration, which we set to 10 seconds. The batch duration is the value that tells us how to discretize data for a DStream, by specifying for how long the streaming data should be collected to form a batch within the DStream, that is, one of the RDDs in the sequence. Another detail we want to draw your attention to is that the Spark master is set to two cores by setting local[2]. Since we assume you are working locally, it is important to assign at least two cores to the application. The reason is that one thread will be used to receive input data, while the other will then be free to process it. Should you have more receivers in more advanced applications, you need to reserve one core for each.

Next, we essentially repeat parts of the prefix span model for the sake of completeness of this application. As before, the sequences are loaded from a local text file. Note that this time, we assume the file is in the resources folder of your project, but you can choose to store it anywhere you want:

val transactions: RDD[Array[Int]] = sc.textFile("src/main/resources/msnbc990928.seq") map { line =>
line.split(" ").map(_.toInt)
}
val trainSequences = transactions.map(_.map(Array(_))).cache()
val prefixSpan = new PrefixSpan().setMinSupport(0.005).setMaxPatternLength(15)
val psModel = prefixSpan.run(trainSequences)
val freqSequences = psModel.freqSequences.map(_.sequence).collect()

In the last step of the preceding computation, we collect all the frequent sequences on the master and store them as freqSequences. The reason we do this is that we want to compare this data against the incoming data to see if the sequences of the new data are frequent with respect to the current model (psModel). Unfortunately, unlike many of the algorithms from MLlib, none of the three available pattern mining models in Spark are built to take new data once trained, so we have to do this comparison on our own, using freqSequences.

Next, we can finally create a DStream object of the String type. To this end, we call socketTextStream on our streaming context, which will allow us to receive data from a server, running on port 8000 of localhost, listening on a TCP socket:

val rawSequences: DStream[String] = ssc.socketTextStream("localhost", 8000)

What we call rawSequences is the data received through that connection, discretized into 10-second intervals. Before we discuss how to actually send data, let's first continue with the example of processing it once we have received it. Recall that the input data will have the same format as before, so we need to preprocess it in exactly the same way, as follows:

val sequences: DStream[Array[Array[Int]]] = rawSequences
.map(line => line.split(" ").map(_.toInt))
.map(_.map(Array(_)))

The two map operations we use here are structurally the same as before on the original MSNBC data, but keep in mind that this time, map has a different context, since we are working with DStreams instead of RDDs. Having defined sequences, a sequence of RDDs of the Array[Array[Int]] type, we can use it to match against freqSequences. We do so by iterating over each RDD in sequences and then again over each array contained in these RDDs. Next, we count how often the respective array is found in freqSequences, and if it is found, we print that the sequence corresponding to array is indeed frequent:

print(">>> Analyzing new batch of data")
sequences.foreachRDD(
rdd => rdd.foreach(
array => {
println(">>> Sequence: ")
println(array.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]"))
freqSequences.count(_.deep == array.deep) match {
case count if count > 0 => println("is frequent!")
case _ => println("is not frequent.")
}
}
)
)
print(">>> done")

Note that in the preceding code, we need to compare deep copies of arrays, since nested arrays can't be compared on the nose. To be more precise, one can check them for equality, but the result will always be false.

Having done the transformation, the only thing we are left with on the receiving side of the application is to actually tell it to start listening to the incoming data:

ssc.start()
ssc.awaitTermination()

Through the streaming context, ssc, we tell the application to start and await its termination. Note that in our specific context and for most other applications of this fashion, we rarely want to terminate the program at all. By design, the application is intended as a long-running job, since, in principle, we want it to listen to and analyze new data indefinitely. Naturally, there will be cases of maintenance, but we may also want to regularly update (re-train) psModel with the newly acquired data.

We have already seen a few operations on DStreams and we recommend you to refer to the latest Spark Streaming documentation (http://spark.apache.org/docs/latest/streaming-programming-guide.html) for more details. Essentially, many of the (functional) programming functionalities available on basic Scala collections that we also know from RDDs carry over seamlessly to DStreams as well. To name a few, these are filter, flatMap, map, reduce, and reduceByKey. Other SQL-like functionalities, such as cogroup, count, countByValue, join, or union, are also at your disposal. We will see some of the more advanced functionalities later on in a second example. 

Now that we have covered the receiving end, let's briefly discuss how to create a data source for our app. One of the simplest ways to send input data from a command line over a TCP socket is to use Netcat, which is available for most operating systems, often preinstalled. To start Netcat locally on port 8000, run the following command in a terminal separate from your Spark application or shell:

nc -lk 8000

Assuming you already started the Spark Streaming application for receiving data from before, we can now type new sequences into the Netcat terminal window and confirm each by hitting Enter. For instance, type the following four sequences within 10 seconds:

You will see the following output:

If you are either really slow at typing or so unlucky that you start typing when the 10-second window is almost over, the output might be split into more parts. Looking at the actual output, you will see that the often discussed categories front page and news, represented by categories 1 and 2, are frequent. Also, since 23 is not a sequence item contained in the original data set, it can't be frequent. Lastly, the sequence <4, 5> is apparently also not frequent, which is something we didn't know before. 

Choosing Netcat for this example is a natural choice for the time and space given in this chapter, but you will never see it used for this purpose in serious production environments. In general, Spark Streaming has two types of sources available: basic and advanced. Basic sources can also be queues of RDDs and other custom sources apart from file streams, which the preceding example represents. On the side of advanced sources, Spark Streaming has a lot of interesting connectors to offer: Kafka, Kinesis, Flume, and advanced custom sources. This wide variety of advanced sources makes it attractive to incorporate Spark Streaming as a production component, integrating well with the other infrastructure components.

Taking a few steps back and considering what we have achieved by discussing this example, you may be inclined to say that apart from introducing Spark Streaming itself and working with data producers and receivers, the application itself did not solve many of our aforementioned concerns. This criticism is valid, and in a second example, we want to address the following remaining issues with our approach:

  • Input data for our DStreams had the same structure as our offline data, that is, it was already pre-aggregated with respect to users, which is not very realistic
  • Apart from the two calls to map and one to foreachRDD, we didn't see much in terms of functionality and added value in operating with DStreams
  • We did not do any analytics on data streams but only checked them against a list of precomputed patterns

To resolve these issues, let's slightly redefine our example setting. This time, let's assume that one event is represented by one user clicking on one site, where each such site falls under one of the categories 1-17, as before. Now, we cannot possibly simulate a complete production environment, so we make the simplifying assumption that each unique user has already been equipped with an ID. Given this information, let's say events come in as key-value pairs consisting of a user ID and a category of this click event.

With this setup, we have to think about how to aggregate these events to generate sequences from them. For this purpose, we need to collect data points for each user ID in a given windowIn the original data set, this window was obviously one full day, but depending on the application, it may make sense to choose a much smaller window. If we think about the scenario of a user browsing his favorite online shop, the click and other events that go back a few hours will unlikely influence his or her current desire to buy something. For this reason, a reasonable assumption made in online marketing and related fields is to limit the window of interest to about 20-30 minutes, a so-called user session. In order for us to see results much quicker, we will use an even smaller window of 20 seconds for our application. We call this the window length.

Now that we know how far back we want to analyze the data from a given point in time, we also have to define how often we want to carry out the aggregation step, which we will call the sliding interval. One natural choice would be to set both to the same amount of time, leading to disjoint windows of aggregation, that is, every 20 seconds. However, it might also be interesting to choose a shorter sliding window of 10 seconds, which would lead to aggregation data that overlaps 10 seconds each. The following diagram illustrates the concepts we just discussed:

Figure 3: Visualisation of a window operation transforming a DStream to another. In this example the batch duration of the Spark Streaming application has been set to 10 seconds. The window length for the transformation operating on batches of data is 40 seconds and we carry out the window operation every 20 seconds, leading to an overlap of 20 seconds each and a resulting DStream that is batched in 20-second blocks.

To turn this knowledge into a concrete example, we assume that the event data has the form key:value, that is, one such event could be 137: 2, meaning that the user with ID 137 clicked on a page with the category news. To process these events, we have to modify our preprocessing like this:

val rawEvents: DStream[String] = ssc.socketTextStream("localhost", 9999)
val
events: DStream[(Int, String)] = rawEvents.map(line => line.split(": "))
.map(kv => (kv(0).toInt, kv(1)))

With these key-value pairs, we can now aim to do the aggregation necessary to group events by the user ID. As outlined earlier, we do this by aggregating on a given window of 20 seconds with a sliding interval of 10 seconds:

val duration = Seconds(20)
val slide = Seconds(10)

val rawSequencesWithIds: DStream[(Int, String)] = events
.reduceByKeyAndWindow((v1: String, v2: String) => v1 + " " + v2, duration, slide)
val rawSequences = rawSequencesWithIds.map(_.2)
// remainder as in previous example

In the preceding code, we are using a more advanced operation on DStreams, namely reduceByKeyAndWindow, in which we specify an aggregation function on values of key-value pairs, as well as a window duration and sliding interval. In the last step of the computation, we strip the user IDs so that the structure of rawSequences is identical to the previous example. This means that we have successfully converted our example to work on unprocessed events, and it will still check against frequent sequences of our baseline model. We will not show more examples of how the output of this application looks, but we encourage you to play around with this application and see how the aggregation works on key-value pairs.

To wrap up this example, and the chapter, let's look at one more interesting way of aggregating event data. Let's say we want to dynamically count how often a certain ID occurs in the event stream, that is, how many page clicks a user generates. We already have our events DStream defined previously, so we could approach the count as follows:

val countIds = events.map(e => (e._1, 1))
val counts: DStream[(Int, Int)] = countIds.reduceByKey(_ + _)

In a way, this works as we intended; it counts events for IDs. However, note that what is returned is again a DStream, that is, we do not actually aggregate across streaming windows but just within the sequences of RDDs. To aggregate across the full stream of events, we need to keep track of count states since from the start. Spark Streaming offers a method on DStreams for precisely this purpose, namely updateStateByKey. It can be used by providing updateFunction, which takes the current state and new values as input and returns an updated state. Let's see how it works in practice for our event count:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
Some(runningCount.getOrElse(0) + newValues.sum)
}
val runningCounts = countIds.updateStateByKey[Int](updateFunction _)

We first define our update function itself. Note that the signature of updateStateByKey requires us to return an Option, but in essence, we just compute the running sum of state and incoming values. Next, we provide updateStateByKey with an Int type signature and the previously created updateFunction method. Doing so, we get precisely the aggregation we wanted in the first place.

Summarizing, we introduced event aggregation, two more complex operations on DStreams (reduceByKeyAndWindow and updateStateByKey), and counted events in a stream with this example. While the example is still simplistic in what it does, we hope to have provided the reader with a good entry point for more advanced applications. For instance, one could extend this example to calculate moving averages over the event stream or change it towards computing frequent patterns on a per-window basis.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.142.252.87