Driver failure recovery

Driver failure recovery can be accomplished by using StreamingContext.getOrCreate() to either initialize a StreamingContext from an existing checkpoint or create a new one.

A streaming application starts under one of two conditions:

  • When the program is being started for the first time, it needs to create a new StreamingContext, set up all the streams, and then call start()
  • When the program is being restarted after failure, it needs to initialize a StreamingContext from the checkpoint data in the checkpoint directory and then call start()
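
The two startup cases can be illustrated with a simplified model in plain Scala. This is only a sketch and not Spark's implementation: Context and GetOrCreateSketch are hypothetical names, and a directory containing at least one file stands in for usable checkpoint data.

```scala
import java.nio.file.{Files, Path}

// Hypothetical stand-in for a streaming context; this is not a Spark class.
final case class Context(origin: String)

object GetOrCreateSketch {
  // Simplified model of the restart decision (an assumption for illustration):
  // a directory with at least one file counts as "checkpoint data exists".
  def getOrCreate(checkpointDir: Path, create: () => Context): Context = {
    val hasCheckpoint = Files.isDirectory(checkpointDir) && {
      val entries = Files.list(checkpointDir)
      try entries.findAny().isPresent finally entries.close()
    }
    // Restart after failure: rebuild from the checkpoint.
    // First start: run the factory that sets up a new context.
    if (hasCheckpoint) Context("restored from checkpoint") else create()
  }
}
```

Called against an empty directory, getOrCreate runs the factory function. Once the directory holds a file, the same call returns the restored context and the factory is not run.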

We will implement a function, createStreamContext(), that creates the StreamingContext and sets up the DStreams that parse the tweets and compute the top five hashtags over a 15-second window that slides every 10 seconds. Instead of calling createStreamContext() and then ssc.start() directly, we call getOrCreate(). If checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (the application is running for the first time), createStreamContext() is called to create a new context and set up the DStreams:

val ssc = StreamingContext.getOrCreate(checkpointDirectory,
  createStreamContext _)

The following code shows the definition of the function and how getOrCreate() is called:

import java.util.Date

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

val checkpointDirectory = "checkpoints"

// Function to create and set up a new StreamingContext
// (sc is assumed to be an existing SparkContext, for example the one in spark-shell)
def createStreamContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(5))

  val twitterStream = TwitterUtils.createStream(ssc, None)

  val wordStream = twitterStream.flatMap(x => x.getText.split(" "))

  // Count hashtags over a 15-second window that slides every 10 seconds;
  // the last argument (5) is the number of partitions
  val aggStream = twitterStream
    .flatMap(x => x.getText.split(" "))
    .filter(_.startsWith("#"))
    .map(x => (x, 1))
    .reduceByKeyAndWindow(_ + _, _ - _, Seconds(15), Seconds(10), 5)

  // Write checkpoint data to the checkpoint directory
  ssc.checkpoint(checkpointDirectory)

  // Checkpoint both DStreams every 10 seconds
  aggStream.checkpoint(Seconds(10))
  wordStream.checkpoint(Seconds(10))

  aggStream.foreachRDD((rdd, time) => {
    val count = rdd.count()

    if (count > 0) {
      val dt = new Date(time.milliseconds)
      println(s" $dt rddCount = $count Top 5 hashtags ")
      val top5 = rdd.sortBy(_._2, ascending = false).take(5)
      top5.foreach {
        case (hashtag, n) => println(s"[$hashtag] - $n")
      }
    }
  })
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val ssc = StreamingContext.getOrCreate(checkpointDirectory, createStreamContext _)
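
The two functions passed to reduceByKeyAndWindow (_ + _ and _ - _) allow a sliding window to be updated incrementally instead of being recomputed from scratch. The following plain-Scala sketch does not use Spark and only illustrates the idea under stated assumptions: WindowSketch and its helpers are hypothetical names, each Map stands for the hashtag counts of one 5-second batch, and zero counts are dropped here for readability.

```scala
object WindowSketch {
  type Counts = Map[String, Int]

  // Merge two count maps with `op`, dropping keys whose count becomes zero.
  def combine(a: Counts, b: Counts)(op: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet).iterator
      .map(k => k -> op(a.getOrElse(k, 0), b.getOrElse(k, 0)))
      .filter(_._2 != 0)
      .toMap

  // Full recomputation: add up every batch that falls inside the window.
  def recompute(window: Seq[Counts]): Counts =
    window.foldLeft(Map.empty[String, Int])((acc, b) => combine(acc, b)(_ + _))

  // Incremental update: previous window + entering batches - leaving batches.
  def slide(previous: Counts, entering: Seq[Counts], leaving: Seq[Counts]): Counts = {
    val added = entering.foldLeft(previous)((acc, b) => combine(acc, b)(_ + _))
    leaving.foldLeft(added)((acc, b) => combine(acc, b)(_ - _))
  }

  // Highest counts first; ties are broken alphabetically to keep the order stable.
  def topN(counts: Counts, n: Int): Seq[(String, Int)] =
    counts.toSeq.sortBy { case (tag, c) => (-c, tag) }.take(n)
}
```

With a 15-second window and a 10-second slide over 5-second batches, each window covers three batches, and each slide adds two batches and removes two. Calling slide with the previous window's counts, the two entering batches, and the two leaving batches gives the same result as calling recompute on the three batches of the new window.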