Checkpointing

In Chapter 4, Understanding Spark Programming Model, we discussed various techniques for caching/persisting RDDs to avoid re-computation in case of failures. The same techniques can be applied to a DStream as well. However, as streaming applications are meant to run all the time, an application may also fail because of system or network failures. To make a Spark Streaming application capable of recovering from such failures, it should be checkpointed to an external location, most likely fault-tolerant storage such as HDFS.

Spark Streaming allows us to checkpoint the following types of information:

  • Metadata checkpointing: This checkpoints the metadata of the Spark Streaming application, such as the configuration provided when the application was submitted, the batches of data that are being processed, and the set of operations defined on the incoming DStreams.
  • Data checkpointing/generated DStream checkpointing: This checkpoints the generated DStreams of stateful operations. A generated DStream can be a combination of various transformed DStreams across multiple batches, and this dependency chain keeps growing as the streaming application runs, so it must be checkpointed.

Checkpointing generated DStreams is mandatory in Spark Streaming when stateful transformations are used. However, users can also create checkpoints with stateless transformations, either to checkpoint the metadata of the Spark Streaming application or to avoid re-computation. Even RDDs can be checkpointed.

To enable checkpointing in Spark Streaming applications, the following method can be called on the Spark Streaming context:

jssc.checkpoint("/path/of/the/checkpoint_directory");

It is recommended to keep checkpoints in a fault-tolerant filesystem, preferably HDFS.
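The idea behind checkpoint-based recovery can be illustrated without Spark. The following is a minimal plain-Java sketch, not the Spark API: the class name CheckpointSketch, the file name progress.txt, and the batch counter are all hypothetical and exist only for illustration. It persists progress after every batch and, on restart, resumes from the persisted value if one exists. In Spark Streaming itself, the comparable recover-or-create step is handled by JavaStreamingContext.getOrCreate.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Plain-Java illustration of checkpoint-and-recover; this is NOT the Spark API.
final class CheckpointSketch {
    private final Path checkpointFile;
    private long processedBatches;

    // On start-up, recover progress from the checkpoint file if it exists,
    // otherwise start from zero.
    CheckpointSketch(Path checkpointFile) {
        this.checkpointFile = checkpointFile;
        try {
            this.processedBatches = Files.exists(checkpointFile)
                    ? Long.parseLong(new String(
                            Files.readAllBytes(checkpointFile), StandardCharsets.UTF_8).trim())
                    : 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // "Process" one batch, then persist the new progress to the checkpoint file.
    void processBatch() {
        processedBatches++;
        try {
            Files.write(checkpointFile,
                    Long.toString(processedBatches).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    long getProcessedBatches() {
        return processedBatches;
    }

    // Simulates a run of two batches followed by a restart from the same checkpoint.
    static long demo() {
        try {
            Path file = Files.createTempDirectory("checkpoint-sketch").resolve("progress.txt");
            CheckpointSketch firstRun = new CheckpointSketch(file);
            firstRun.processBatch();
            firstRun.processBatch();
            CheckpointSketch restarted = new CheckpointSketch(file);
            return restarted.getProcessedBatches();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The restarted instance picks up where the first one stopped only because the progress was written to storage that outlives the process, which is why a fault-tolerant filesystem is suggested for real checkpoints.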

Two types of stateful transformation can be implemented in Spark Streaming: window-based and full-session-based. A full-session stateful transformation maintains state throughout the life cycle of a Spark Streaming job, and one of the most popular methods for doing so is updateStateByKey. However, Apache Spark 1.6 introduced an efficient alternative to updateStateByKey, mapWithState, which not only performs better but is also easier to code.

Let's calculate the average temperature of each flight over its entire journey. As different flights have different journey times, and flights may also be delayed, we will solve this use case using the mapWithState stateful stream processing.

The mapWithState transformation can be executed on the JavaPairDStream type, so let's convert our flightDetailsStream to a JavaPairDStream with flightId as the key and the FlightDetails object as the value:

JavaPairDStream<String, FlightDetails> flightDetailsPairStream = flightDetailsStream.mapToPair(f -> new Tuple2<String, FlightDetails>(f.getFlightId(), f)); 

The mapWithState transformation accepts an object of org.apache.spark.streaming.StateSpec, which is an abstract type. A StateSpec is created from an implementation of either the org.apache.spark.api.java.function.Function3 or the org.apache.spark.api.java.function.Function4 interface.

The following is the implementation of a Function3 interface that will be used to create a StateSpec object:

// Optional is assumed to be org.apache.spark.api.java.Optional (Spark 2.x)
Function3<String, Optional<FlightDetails>, State<List<FlightDetails>>, Tuple2<String, Double>> mappingFunc = (flightId, curFlightDetail, state) -> {
    // Fetch the existing state, or create a new list if there is none
    List<FlightDetails> details = state.exists() ? state.get() : new ArrayList<>();
    boolean isLanded = false;
    /* If the optional object is present, add it to the list and set the
       boolean flag 'isLanded' to true if the flight has landed */
    if (curFlightDetail.isPresent()) {
        details.add(curFlightDetail.get());
        if (curFlightDetail.get().isLanded()) {
            isLanded = true;
        }
    }
    // Calculate the average of all the temperature values
    Double avgTemperature = details.stream().mapToDouble(f -> f.getTemperature()).average().orElse(0.0);

    /* Remove the state if the flight has landed, else update the state
       with the updated list of FlightDetails objects */
    if (isLanded) {
        state.remove();
    } else {
        state.update(details);
    }
    // Return the current average
    return new Tuple2<String, Double>(flightId, avgTemperature);
};

As shown in the preceding code example, Function3 takes four type parameters, three for its arguments and one for its return value:

  • KeyType: Type of key of the JavaPairDStream, which is String in our case.
  • Optional<ValueType>: Optional object of ValueType, which is FlightDetails in our case. The value is wrapped in an Optional since a batch may or may not contain data for a particular key.
  • org.apache.spark.streaming.State<T>: The state maintained for the key. We are storing the state as List<FlightDetails>.
  • MappedType: This is the final type the user wants to output. As we want to emit the average temperature of each flight, we use Tuple2<String, Double>, which pairs the flight ID with the average.

Read the comments in the code to understand how the Function3 implementation works.

Now, we will execute the stateful transformation of flightDetailsPairStream as follows:

JavaMapWithStateDStream<String, FlightDetails, List<FlightDetails>, Tuple2<String, Double>> mapWithState = flightDetailsPairStream.mapWithState(StateSpec.function(mappingFunc)); 
 
mapWithState.print(); 

As defined in the previous section, the following statements need to be added at the end of every streaming application to start the streaming context:

jssc.start();
jssc.awaitTermination();

Every time a message is received for a flight, the average temperature up to that point is calculated and printed. Once the flight lands, the final average is calculated and the state is removed.
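To make this behavior concrete, the following plain-Java sketch simulates the per-key state handling with an ordinary HashMap. It is only an illustration under stated assumptions and does not use the Spark API: the class FlightAverageSketch, its methods, and the flight ID "tz302" are hypothetical, and only the temperature values are stored instead of full FlightDetails objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the per-key state logic; this is NOT the Spark API.
final class FlightAverageSketch {
    // Stands in for Spark's per-key state; only temperatures are kept here.
    private final Map<String, List<Double>> stateByFlight = new HashMap<>();

    // Handles one reading and returns the running average for that flight.
    double onReading(String flightId, double temperature, boolean landed) {
        // Fetch the existing state, or start a new list for an unseen flight
        List<Double> readings = stateByFlight.getOrDefault(flightId, new ArrayList<>());
        readings.add(temperature);
        double average = readings.stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
        if (landed) {
            stateByFlight.remove(flightId);      // journey complete: drop the state
        } else {
            stateByFlight.put(flightId, readings); // keep accumulating
        }
        return average;
    }

    boolean hasState(String flightId) {
        return stateByFlight.containsKey(flightId);
    }
}
```

Feeding the readings 20.0, 30.0, and 40.0 for the same flight, with the last one marked as landed, yields running averages of 20.0, 25.0, and 30.0, after which no state remains for that flight.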

In real-world scenarios, the devices that emit messages may fail for some reason. To handle such scenarios, a timeout can be configured to remove the state if no message is received within a specific duration.

Timeout can be configured while defining StateSpec objects as follows:

flightDetailsPairStream.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(5))); 

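This signifies that if a device does not send a message for five minutes, its state will be removed. The user can set the timeout based on the requirement. Note that when a key times out, the mapping function is invoked one last time with no value and with state.isTimingOut() returning true; the state can no longer be updated or removed in that call, so the function should check for this case.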
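The effect of an idle timeout can be sketched in plain Java as follows. This is a simplified illustration, not how Spark implements it internally: the class StateTimeoutSketch and its methods are hypothetical, timestamps are passed in explicitly as milliseconds, and a key is treated as idle once the timeout has fully elapsed since its last message.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Plain-Java illustration of idle-timeout eviction; this is NOT the Spark API.
final class StateTimeoutSketch {
    private final long timeoutMillis;
    // Last time a message was seen for each key
    private final Map<String, Long> lastSeenMillis = new HashMap<>();

    StateTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Records that a message for the key arrived at the given time.
    void onMessage(String key, long nowMillis) {
        lastSeenMillis.put(key, nowMillis);
    }

    // Removes every key that has been idle for at least the timeout
    // and returns how many keys were removed.
    int evictIdle(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeenMillis.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= timeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean hasState(String key) {
        return lastSeenMillis.containsKey(key);
    }
}
```

With a five-minute timeout, a key last seen six minutes ago is evicted while a key last seen two minutes ago keeps its state.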
