Window operations

Spark Streaming provides windowed processing, which allows you to apply transformations over a sliding window of events. The window is defined over a specified interval, and every time the window slides over the source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. There are two parameters that need to be specified for the window:

  • Window length: This specifies the length of the interval considered as the window
  • Sliding interval: This is the interval at which the window operation is performed
Both of these parameters must be multiples of the batch interval of the source DStream.

The following illustration shows a DStream with a sliding window operation, where the old window (dotted-line rectangle) slides by one interval to the right to become the new window (solid-line rectangle):
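
As a minimal sketch of how these two parameters are used (assuming a spark-shell session, so that sc is already available, and a hypothetical test source on localhost:9999 fed by nc -lk 9999), the following creates a 15-second window that slides every 10 seconds over a DStream with a 5-second batch interval and prints the element count of each window:

      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // Batch interval of 5 seconds; `sc` is provided by spark-shell
      val ssc = new StreamingContext(sc, Seconds(5))

      // Hypothetical test source: lines typed into `nc -lk 9999`
      val lines = ssc.socketTextStream("localhost", 9999)

      // Window length = 15 seconds, sliding interval = 10 seconds;
      // both are multiples of the 5-second batch interval
      val windowedCounts = lines.window(Seconds(15), Seconds(10)).count()
      windowedCounts.print()

      ssc.start()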

Some of the common window operations are as follows:

  • window(windowLength, slideInterval): This creates a sliding window on the source DStream and returns it as a new DStream.
  • countByWindow(windowLength, slideInterval): This returns the count of the elements in the DStream within the sliding window.
  • reduceByWindow(func, windowLength, slideInterval): This returns a new single-element DStream created by aggregating the elements of the source DStream over the sliding window using the function func.
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): This aggregates the data by key over the window applied to the source DStream's RDDs and returns a new DStream of (key, value) pairs. The aggregation is performed by the function func.
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): This is a more efficient version of the preceding operation. The key difference is the inverse function invFunc, which is used to "subtract" the data leaving the window as it slides, so that only the batches entering and leaving the window need to be processed. Using this variant requires checkpointing to be enabled.
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]): This computes the frequency of each distinct value within the sliding window and returns a new DStream of (value, long) pairs.
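
To make the difference between the two reduceByKeyAndWindow variants concrete, the following is a minimal sketch reusing the ssc and lines values from the earlier sketch (with both streams declared before ssc.start() is called); pairs is a hypothetical DStream of (word, 1) pairs:

      // Hypothetical DStream[(String, Int)] of (word, 1) pairs
      val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

      // Plain form: recomputes the sums over the entire 15-second window
      // every time the window slides
      val counts = pairs.reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b, Seconds(15), Seconds(10))

      // Incremental form: adds the counts of batches entering the window and
      // subtracts (invFunc) the counts of batches leaving it; this requires
      // a checkpoint directory to be set
      ssc.checkpoint("checkpoints")
      val incCounts = pairs.reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b,
        Seconds(15), Seconds(10))

      counts.print()
      incCounts.print()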

Let us look at the Twitter stream example in more detail. Our goal is to print the top five words (in this case, hashtags) used in tweets, streamed in batches of five seconds, using a window of length 15 seconds that slides every 10 seconds. Hence, every 10 seconds we get the top five hashtags seen over the preceding 15 seconds.

To run this code, follow these steps:

  1. First, open a terminal and change directory to spark-2.1.1-bin-hadoop2.7.
  2. Create a folder streamouts under the spark-2.1.1-bin-hadoop2.7 folder where you have Spark installed. When the application runs, the streamouts folder will collect the streamed tweets into text files.
  3. Download the following jars into the directory: twitter4j-stream-4.0.6.jar, twitter4j-core-4.0.6.jar, and spark-streaming-twitter_2.11-2.1.0.jar.
  4. Launch spark-shell with the jars needed for Twitter integration specified:
      ./bin/spark-shell --jars twitter4j-stream-4.0.6.jar,twitter4j-core-4.0.6.jar,spark-streaming-twitter_2.11-2.1.0.jar
  5. Now, we can write the code. The following code is used to test Twitter event processing:
      import org.apache.log4j.Logger
      import org.apache.log4j.Level
      Logger.getLogger("org").setLevel(Level.OFF)

      import java.util.Date
      import org.apache.spark._
      import org.apache.spark.streaming._
      import org.apache.spark.streaming.twitter._
      import twitter4j.auth.OAuthAuthorization
      import twitter4j.conf.ConfigurationBuilder

      // Twitter OAuth credentials, read by twitter4j from system properties
      System.setProperty("twitter4j.oauth.consumerKey",
        "8wVysSpBc0LGzbwKMRh8hldSm")
      System.setProperty("twitter4j.oauth.consumerSecret",
        "FpV5MUDWliR6sInqIYIdkKMQEKaAUHdGJkEb4MVhDkh7dXtXPZ")
      System.setProperty("twitter4j.oauth.accessToken",
        "817207925756358656-yR0JR92VBdA2rBbgJaF7PYREbiV8VZq")
      System.setProperty("twitter4j.oauth.accessTokenSecret",
        "JsiVkUItwWCGyOLQEtnRpEhbXyZS9jNSzcMtycn68aBaS")

      // Batch interval of 5 seconds
      val ssc = new StreamingContext(sc, Seconds(5))

      val twitterStream = TwitterUtils.createStream(ssc, None)

      // Split tweets into words, keep only hashtags, and count them over a
      // 15-second window that slides every 10 seconds
      val aggStream = twitterStream
        .flatMap(x => x.getText.split(" "))
        .filter(_.startsWith("#"))
        .map(x => (x, 1))
        .reduceByKeyAndWindow(_ + _, _ - _, Seconds(15), Seconds(10), 5)

      // Checkpointing is required when an inverse reduce function is used
      ssc.checkpoint("checkpoints")
      aggStream.checkpoint(Seconds(10))

      aggStream.foreachRDD((rdd, time) => {
        val count = rdd.count()

        if (count > 0) {
          val dt = new Date(time.milliseconds)
          println(s" $dt rddCount = $count Top 5 words ")
          val top5 = rdd.sortBy(_._2, ascending = false).take(5)
          top5.foreach {
            case (word, count) =>
              println(s"[$word] - $count")
          }
        }
      })

      ssc.start

      // wait about 60 seconds for a few windows to be processed, then stop
      ssc.stop(false)
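
If you prefer not to time the 60-second wait by hand, one option (not part of the original listing) is to block on the streaming context with a timeout and then stop it, keeping the underlying SparkContext alive:

      // Block for up to 60 seconds while the stream runs, then stop the
      // StreamingContext but keep the SparkContext (stopSparkContext = false)
      ssc.awaitTerminationOrTimeout(60 * 1000)
      ssc.stop(false)
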
  6. The output is displayed on the console every 10 seconds (the sliding interval) and looks something like the following:
      Mon May 29 02:44:50 EDT 2017 rddCount = 1453 Top 5 words
      [#RT] - 64
      [#de] - 24
      [#a] - 15
      [#to] - 15
      [#the] - 13

      Mon May 29 02:45:00 EDT 2017 rddCount = 3312 Top 5 words
      [#RT] - 161
      [#df] - 47
      [#a] - 35
      [#the] - 29
      [#to] - 29
