Errors and recovery

Generally, the question to ask of your application is this: is it critical that you receive and process all of the data? If not, then on failure you might simply restart the application and discard the missing or lost data. If it is critical, then you will need to use checkpointing, which is described in the next section.

It is also worth noting that your application's error management should be robust and self-sufficient. By this I mean that, if an exception is non-critical, you should handle it, perhaps log it, and continue processing. Otherwise, a task that keeps failing will eventually reach the maximum number of failures (specified by spark.task.maxFailures, which defaults to 4), at which point Spark aborts the job and processing terminates.
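As an illustration of handling a non-critical error locally, the following sketch parses records with scala.util.Try, logs any record that cannot be parsed, and drops it instead of letting the exception fail the task. It is plain Scala so that it runs without a cluster; the SafeParse object, the parseRecord helper, and the "name,count" record format are all hypothetical. In a streaming job, a function like this could be applied with flatMap on a DStream.

```scala
import scala.util.{Failure, Success, Try}

object SafeParse {

  // Hypothetical record format: "name,count", for example "spark,3"
  final case class Record(name: String, count: Int)

  // Parse one line. A malformed line is treated as a non-critical error:
  // it is logged and dropped (None) rather than being allowed to fail the task
  def parseRecord(line: String): Option[Record] =
    Try {
      line.split(",", 2) match {
        case Array(name, count) => Record(name.trim, count.trim.toInt)
        case _ => throw new IllegalArgumentException("expected name,count")
      }
    } match {
      case Success(rec) => Some(rec)
      case Failure(e) =>
        System.err.println(s"Skipping bad record '$line': ${e.getMessage}")
        None
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("spark,3", "bad-line", "hdfs,x", "twitter,7")
    // In a streaming job this would be applied as, e.g., dstream.flatMap(parseRecord(_))
    val records = lines.flatMap(parseRecord(_))
    println(records)
  }
}
```

Logging to System.err is only a placeholder here; when such a function runs inside Spark, the message would typically end up in the executor logs.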

Checkpointing

It is possible to set up an HDFS-based checkpoint directory to store Apache Spark-based streaming information. In this Scala example, data will be stored in HDFS under /data/spark/checkpoint. The following HDFS file system ls command shows that, before starting, the directory does not exist:

[hadoop@hc2nn stream]$ hdfs dfs -ls /data/spark/checkpoint
ls: `/data/spark/checkpoint': No such file or directory

The Twitter-based Scala code sample given next starts by importing Spark, streaming, context, and Twitter-based functionality. It then defines an application object named stream1:


import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._

object stream1 {

Next, a method called createContext is defined, which will be used to create both the Spark and streaming contexts. It also enables checkpointing to the HDFS-based directory using the streaming context's checkpoint method, which takes a directory path as a parameter; here, that path is the value (cpDir) passed into createContext:

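  def createContext( cpDir : String ) : StreamingContext = {

    val appName = "Stream example 1"
    val conf    = new SparkConf().setAppName(appName)

    val sc = new SparkContext(conf)

    // five-second batch interval
    val ssc    = new StreamingContext(sc, Seconds(5) )

    // checkpoint to the directory passed in; the Spark documentation
    // recommends also setting up DStreams here, so they can be recovered
    ssc.checkpoint( cpDir )

    ssc
  }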


Now, the main method is defined, along with the HDFS checkpoint directory and the Twitter access credentials. The Spark streaming context ssc is either retrieved or created from the HDFS checkpoint directory via the StreamingContext.getOrCreate method. If no checkpoint data exists in the directory, then the createContext method defined previously is called to create the context and set up checkpointing. Obviously, I have truncated my own Twitter auth keys in this example for security reasons:

  def main(args: Array[String]) {

    val hdfsDir = "/data/spark/checkpoint"

    val consumerKey       = "QQpxx"
    val consumerSecret    = "0HFzxx"
    val accessToken       = "323xx"
    val accessTokenSecret = "IlQxx"

    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val ssc = StreamingContext.getOrCreate(hdfsDir,
      () => { createContext( hdfsDir ) })

    val stream = TwitterUtils.createStream(ssc,None).window( Seconds(60) )

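    // do some processing here; before calling ssc.start() and
    // ssc.awaitTermination(), at least one output operation (for
    // example, stream.print()) must be registered

  } // end main

} // end stream1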

After running this code, which performs no actual processing, we can check the HDFS checkpoint directory again. This time, it is apparent that the checkpoint directory has been created, containing a uniquely named subdirectory for checkpoint data:

[hadoop@hc2nn stream]$ hdfs dfs -ls /data/spark/checkpoint
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2015-07-02 13:41 /data/spark/checkpoint/0fc3d94e-6f53-40fb-910d-1eef044b12e9

This example, taken from the Apache Spark website, shows how checkpoint storage can be set up and used. But how often is checkpointing carried out? Metadata is stored with every stream batch. The actual data, for DStreams that require it, is stored at a default interval that is a multiple of the batch interval and at least ten seconds. This might not be ideal for you, so you can reset the value using the method:
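The Spark Streaming programming guide describes the default data checkpoint interval as a multiple of the batch (slide) interval that is at least ten seconds. The small helper below computes that value so the rule can be checked for a few batch intervals. It is plain Scala written to mirror the documented rule, not a call into Spark, and the CheckpointInterval object and defaultIntervalMs name are hypothetical.

```scala
object CheckpointInterval {

  // Default data-checkpoint interval in milliseconds: the smallest multiple
  // of the batch (slide) interval that is at least minMs (ten seconds)
  def defaultIntervalMs(batchMs: Long, minMs: Long = 10000L): Long = {
    require(batchMs > 0, "batch interval must be positive")
    val multiples = (minMs + batchMs - 1) / batchMs // integer ceiling division
    batchMs * multiples
  }

  def main(args: Array[String]): Unit = {
    for (batchMs <- Seq(1000L, 3000L, 5000L, 15000L))
      println(s"batch $batchMs ms -> default checkpoint ${defaultIntervalMs(batchMs)} ms")
  }
}
```

Note that a three-second batch interval gives a twelve-second default rather than exactly ten seconds, because the result must be a whole number of batches.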

DStream.checkpoint( newRequiredInterval )

Here, newRequiredInterval is the checkpoint interval that you require. Generally, you should aim for a value that is five to ten times your batch interval.
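The five-to-ten-times rule of thumb is simple arithmetic, but a small check can help when choosing a value. The following is a plain Scala sketch; CheckpointGuide, suggestedRange, and isReasonable are hypothetical names. Requiring the interval to be a whole multiple of the batch interval is an assumption of this sketch, chosen so that checkpoints coincide with batch boundaries.

```scala
object CheckpointGuide {

  // Suggested checkpoint interval range in seconds: five to ten times
  // the batch interval, following the rule of thumb in the text
  def suggestedRange(batchSeconds: Int): (Int, Int) =
    (5 * batchSeconds, 10 * batchSeconds)

  // True if the proposed interval is a whole multiple of the batch interval
  // (an assumption of this sketch) and lies inside the suggested range
  def isReasonable(intervalSeconds: Int, batchSeconds: Int): Boolean = {
    val (lo, hi) = suggestedRange(batchSeconds)
    intervalSeconds % batchSeconds == 0 &&
      intervalSeconds >= lo && intervalSeconds <= hi
  }

  def main(args: Array[String]): Unit = {
    val batch = 5 // the five-second batch interval used in createContext
    println(s"suggested range: ${suggestedRange(batch)}")
    println(s"30 s reasonable? ${isReasonable(30, batch)}")
    println(s"12 s reasonable? ${isReasonable(12, batch)}")
  }
}
```

With the five-second batch interval used in createContext, the suggested range is 25 to 50 seconds, so a value such as Seconds(30) could then be passed to the checkpoint method of the DStream.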

Checkpointing saves both the stream batch data and the metadata (data about the data). If the application fails, the checkpointed data is used when it restarts. The batch that was being processed at the time of failure is reprocessed, along with any batches that have arrived since the failure.
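The restart behaviour can be illustrated with a toy model. The following plain Scala sketch is not how Spark implements recovery: RecoverySimulation, Checkpoint, and run are hypothetical, and a single running total stands in for both the metadata and the data that Spark would checkpoint. It saves a checkpoint after each completed batch, simulates a failure part-way through a batch, and then restarts from the last checkpoint, so the interrupted batch is processed again.

```scala
object RecoverySimulation {

  // Toy checkpoint: index of the last fully processed batch and the running
  // total at that point (stand-ins for Spark's metadata and data)
  final case class Checkpoint(lastBatch: Int, total: Int)

  // Process every batch after the checkpoint; throw when failAt is reached
  // to mimic the application dying part-way through that batch
  def run(batches: Vector[Seq[Int]], from: Checkpoint, failAt: Option[Int],
          store: Checkpoint => Unit): Checkpoint = {
    var cp = from
    for (i <- (from.lastBatch + 1) until batches.length) {
      if (failAt.contains(i)) throw new RuntimeException(s"failure during batch $i")
      cp = Checkpoint(i, cp.total + batches(i).sum)
      store(cp) // checkpoint after every completed batch
    }
    cp
  }

  def main(args: Array[String]): Unit = {
    val batches = Vector(Seq(1, 2), Seq(3), Seq(4, 5), Seq(6))
    var saved = Checkpoint(-1, 0) // nothing processed yet
    val save: Checkpoint => Unit = cp => saved = cp

    try run(batches, saved, Some(2), save)
    catch { case e: RuntimeException => println(s"crashed: ${e.getMessage}") }

    // restart from the last checkpoint; batch 2 is processed again
    println(s"restarting from $saved")
    val result = run(batches, saved, None, save)
    println(s"final total: ${result.total}")
  }
}
```

Because the interrupted batch is replayed, any side effects it had before the crash (such as writes to an external store) may happen twice, which is worth keeping in mind when designing output operations.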

Remember to monitor the HDFS disk space being used for checkpointing. In the next section, I will begin to examine the streaming sources, and will provide some examples of each type.

