Checkpointing

Real-time streaming applications are meant to be long running and resilient to failures of all sorts. Spark Streaming implements a checkpointing mechanism that maintains enough information to recover from failures.

There are two types of data that need to be checkpointed:

  • Metadata checkpointing — saves the information that defines the streaming computation (the configuration, the DStream operations, and any incomplete batches) so that the driver can recover after a failure.
  • Data checkpointing — saves generated RDDs to reliable storage, which truncates the ever-growing lineage of stateful transformations.
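Metadata checkpointing is what makes driver recovery possible: the application is rewritten to obtain its context through StreamingContext.getOrCreate, which rebuilds the context from the checkpoint on restart. A minimal sketch of this pattern follows; the checkpoint path and the body of the factory function are placeholders.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder path; in production this should live on a fault-tolerant
// file system such as HDFS.
val checkpointDir = "checkpoints"

// Factory invoked only on the first run. On restart after a failure,
// getOrCreate ignores it and reconstructs the context (and all DStream
// operations) from the checkpointed metadata instead.
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // ... define the DStream transformations here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```

Note that all DStream setup must happen inside the factory function; transformations defined outside it are not recovered from the checkpoint.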

Checkpointing can be enabled by calling the checkpoint() function on the StreamingContext as follows:

def checkpoint(directory: String)

Specifies the directory where the checkpoint data will be reliably stored.


Note that this must be a fault-tolerant file system like HDFS.

Once the checkpoint directory is set, any DStream can be checkpointed into it at a specified interval. Returning to the Twitter example, we can checkpoint each DStream into the checkpoints directory every 10 seconds:

val ssc = new StreamingContext(sc, Seconds(5))

val twitterStream = TwitterUtils.createStream(ssc, None)

val wordStream = twitterStream.flatMap(x => x.getText().split(" "))

val aggStream = twitterStream
  .flatMap(x => x.getText.split(" "))
  .filter(_.startsWith("#"))
  .map(x => (x, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(15), Seconds(10), 5)

ssc.checkpoint("checkpoints")

aggStream.checkpoint(Seconds(10))

wordStream.checkpoint(Seconds(10))
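Checkpointing is not optional here: reduceByKeyAndWindow with an inverse reduce function is a stateful operation, and Spark refuses to start the context unless a checkpoint directory has been set. To actually run the example, the context must be started after the checkpoint settings are in place; a minimal continuation might look like the following (the output action is illustrative).

```scala
// Attach an output action so the stateful window computation is executed;
// without at least one output operation, start() reports an error.
aggStream.print()

ssc.start()             // begin receiving and processing tweets
ssc.awaitTermination()  // block until the job is stopped or fails
```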

After a few seconds, the checkpoints directory contains the checkpointed metadata as well as the RDD data and the log files that are maintained as part of checkpointing.
