Advanced streaming sources

For receiver-based sources, fault tolerance of a streaming job depends on two factors, which are failure scenario and type of receiver. A receiver receives the data and replicates them on two Spark executors to achieve fault tolerance and hence if one of the executors fails then the tasks and receivers are executed on another node by Spark without any data loss. However, if the driver fails then by design all the executors fail and the computed data and other meta information gets lost. To achieve fault tolerance on a driver node checkpointing of DStreams should be done, which saves the DAG of DStreams to fault-tolerant storage of the checkpoint. When the failed driver gets restarted it looks for meta information from the checkpoint location to launch new executors and restart the receivers. However, to restart a failed Driver, mentioning the checkpoint folder location is not enough. Automatic driver restart configuration needs to be used while submitting the job on a cluster, for example, in YARN the job should be run in cluster mode and number of retries set in configuration yarn.resourcemanager.am.max-attempts. Similarly in Spark standalone mode one should submit the Spark job with cluster mode and -- supervise while in Mesos marathon can restart applications or use the --supervise flag.

Apart from setting cluster configuration, the restarted job should be able to figure out if the streaming job has to recover the StreamingContext from the checkpoint or create a new one. This can be achieved by writing the code with following guidelines:

  • The setup code should be written in a function that returns a StreamingContext.
  • Within the same function, also mention the checkpoint directory as an argument to the method checkpoint() on StreamingContext.
  • Set up a context by using the factory method getOrCreate() on StreamingContext and passing the arguments as the checkpoint directory and the name of the function returning StreamingContext. When the streaming job gets restarted it first looks into the checkpoint directory first to create the StremaingContext and if it does not exist in the said folder then it creates a new one.

In the previous sections, we went through an example of writing a streaming word count job accepting data from a socket. We can extend the same example to make the streaming job recoverable in case of driver failure by refactoring the code and moving the streaming logic to a function that returns StreamingContext. We have also made the job stateful by adding a mapWithState function that holds a word and its count during the entire session:

protectedstatic JavaStreamingContext createContext(String ip, int port, String checkpointDirectory) {
SparkConf sparkConf = new SparkConf().setAppName("WordCountRecoverableEx");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
streamingContext.checkpoint(checkpointDirectory);
// Initial state RDD input to mapWithState
@SuppressWarnings("unchecked")
List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);
JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream(ip,port,
StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = StreamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str -> new Tuple2<>(str, 1)).reduceByKey((count1, count2) -> count1 + count2);

// Update the cumulative count function
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = newFunction3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
i
public Tuple2<String, Integer> call(String word, Optional<In@Overrideteger> one, State<Integer> state) {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tup;le2<>(word, sum);
state.update(sum);
return output;
}
};
// DStream made of get cumulative counts that get updated in every batch
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = wordCounts.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
stateDstream.print();
return streamingContext;
}

The createContext() method can now be called from the main method as follows:

Function0<JavaStreamingContext> createContextFunc = newFunction0<JavaStreamingContext>() {
@Override
public JavaStreamingContext call() {
returncreateContext(ip, port, checkpointDirectory);
}
};
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
ssc.start();
ssc.awaitTermination();

Although enabling checkpoints by the following previous guidelines does guarantee recovery of Spark driver, in-memory blocks of buffered data are lost on the driver restart. Reliable receivers send acknowledgements to the source hence in case if a receiver fails while buffering the data itself then no acknowledgement will be sent to the source. When the receiver recovers, the source will again send the data that will be consumed and acknowledged by the receivers. But a scenario can arise when the acknowledgement has been sent, but while the message is being processed, the driver fails and by design all other executors also get killed and hence the data being operated upon gets lost. On the other hand, unreliable receivers do not send acknowledgement in the first place and hence the data may be lost when the receiver recovers from failure. To overcome this problem Spark Streaming introduced a feature called write ahead log (WAL), which writes the data to the checkpoint directory first and then the streaming processing starts. In case of driver recovery with WAL enabled, the data is read from checkpoint directory and any loss of data by driver is avoided. Though this process slightly degrades the performance, it makes driver recovery data loss free. WAL can be enabled by setting the following Spark configuration to true:

sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true") 

WAL, along with checkpoints, ensures at least once semantics (no data loss), but not exactly once processing guarantee. Let's analyze a scenario where the receiver writes the data in WAL, but fails while trying to update Kafka offset. After a while the receiver recovers from failure and first processes the messages from WAL and then requests for a new set of data from Kafka, but since the offset was not updated in Kafka when the receiver failed hence Kafka will again send the same set of data and the message will be processed again. Though in some use cases this may not be of any concern while in others some mechanism has to be developed to process such duplicate records.

To overcome duplicity of data and ensure exactly once semantics, Spark Streaming introduced, Kafka Direct API, as described in the streaming sources section, which does away with WAL and treats Kafka as a filesystem while consuming data from the topic. Some salient features of Kafka Direct API are:

  • It ensures exactly once semantics
  • Data is not consumed using Kafka receivers, but a simple consumer API is used by executors to consume data directly from Kafka
  • WAL is not used in case of failure, data is re-consumed in case of recovering from any failure directly from Kafka
  • Spark driver calculates the offset range to be consumed in the next micro batch interval and then it is Spark executor, which uses simple consumer API to consume those offsets
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.118.139.224