Discretized streams

Spark Streaming is built on an abstraction called Discretized Streams, referred to as DStreams. A DStream is represented as a sequence of RDDs, with one RDD created at each time interval. DStreams can be processed in a similar fashion to regular RDDs, using similar concepts such as a directed acyclic graph (DAG)-based execution plan. Just like regular RDD processing, the transformations and actions that are part of the execution plan are handled for the DStreams.

A DStream essentially divides a never-ending stream of data into smaller chunks known as micro-batches based on a time interval, materializing each individual micro-batch as an RDD, which can then be processed as a regular RDD. Each such micro-batch is processed independently, and no state is maintained between micro-batches, making the processing stateless by nature. Say the batch interval is 5 seconds: while events are being consumed in real time, a micro-batch is created every 5 seconds and handed over for further processing as an RDD. One of the main advantages of Spark Streaming is that the API calls used to process the micro-batch of events are very tightly integrated with the Spark core APIs, providing seamless integration with the rest of the architecture. When a micro-batch is created, it is turned into an RDD, which makes processing it with the Spark APIs seamless.
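As a quick sketch of this behavior (the socket text source on localhost:9999 is assumed purely for illustration, and sc is the SparkContext available in the spark-shell), foreachRDD hands each micro-batch over as an ordinary RDD together with its batch time:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// a 5-second batch interval: one RDD is materialized per interval
val ssc = new StreamingContext(sc, Seconds(5))

// each micro-batch arrives as a plain RDD along with its batch Time
ssc.socketTextStream("localhost", 9999).foreachRDD { (rdd, time) =>
  println(s"Micro-batch at $time contains ${rdd.count()} events")
}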

The DStream class looks like the following in the source code, showing the most important variable, a HashMap of (Time, RDD) pairs:

class DStream[T: ClassTag](var ssc: StreamingContext) {

  // HashMap of the RDDs generated in the DStream, keyed by batch Time
  var generatedRDDs = new HashMap[Time, RDD[T]]()
}

Shown in the following is an illustration of a DStream comprising RDDs, one created every T seconds:

In the following example, a StreamingContext is created with a batch interval of 5 seconds, so a micro-batch is created every 5 seconds as an RDD, which is just like a Spark core API RDD. The RDDs in the DStream can be processed just like any other RDD.

The steps involved in building a streaming application are as follows:

  1. Create a StreamingContext from the SparkContext.
  2. Create a DStream from StreamingContext.
  3. Provide transformations and actions that can be applied to each RDD.
  4. Finally, the streaming application is started by calling start() on the StreamingContext. This starts the entire process of consuming and processing real-time events.

Once the Spark Streaming application has started, no further operations can be added. A stopped context cannot be restarted; you have to create a new StreamingContext if such a need arises.
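Shown in the following is a minimal sketch of these four steps as a word count (the socket text source on localhost:9999 is an assumption for illustration; any input DStream works the same way):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Step 1: create a StreamingContext from the SparkContext (sc), 5-second batches
val ssc = new StreamingContext(sc, Seconds(5))

// Step 2: create a DStream; a socket text source is assumed here
val lines = ssc.socketTextStream("localhost", 9999)

// Step 3: transformations and an action applied to each RDD in the DStream
lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .print()

// Step 4: start consuming events and block until the application is stopped
ssc.start()
ssc.awaitTermination()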

Shown in the following is an example of how to create a simple streaming job accessing Twitter:

  1. Create a StreamingContext from the SparkContext:
      scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext =
org.apache.spark.streaming.StreamingContext@8ea5756
  2. Create a DStream from StreamingContext:
      scala> val twitterStream = TwitterUtils.createStream(ssc, None)
twitterStream: org.apache.spark.streaming.dstream
.ReceiverInputDStream[twitter4j.Status] =
org.apache.spark.streaming.twitter.TwitterInputDStream@46219d14
  3. Provide transformations and actions that can be applied to each RDD:
      val aggStream = twitterStream
        .flatMap(x => x.getText.split(" "))
        .filter(_.startsWith("#"))
        .map(x => (x, 1))
        .reduceByKey(_ + _)
  4. Finally, the streaming application is started by calling start() on the StreamingContext. This starts the entire process of consuming and processing real-time events:
      ssc.start()

      // to stop, call stop() on the StreamingContext;
      // passing false keeps the underlying SparkContext alive
      ssc.stop(false)
  5. The DStream we created is of type ReceiverInputDStream, which is defined as an abstract class for any InputDStream that has to start a receiver on the worker nodes to receive external data. Here, we are receiving from the Twitter stream:
      class InputDStream[T: ClassTag](_ssc: StreamingContext)
        extends DStream[T](_ssc)

      class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
        extends InputDStream[T](_ssc)
  6. If you run a flatMap() transformation on the twitterStream, you get a FlatMappedDStream, as shown in the following:
      scala> val wordStream = twitterStream.flatMap(x => x.getText()
.split(" "))

wordStream: org.apache.spark.streaming.dstream.DStream[String] =
org.apache.spark.streaming.dstream.FlatMappedDStream@1ed2dbd5
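Putting the steps together, a complete version of the job might look like the following sketch. It assumes the external spark-streaming-twitter library is on the classpath and that Twitter OAuth credentials have been set as twitter4j.oauth.* system properties; neither ships with Spark core:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// StreamingContext with 5-second micro-batches (sc is the SparkContext)
val ssc = new StreamingContext(sc, Seconds(5))

// None: use the OAuth credentials from the twitter4j.oauth.* system properties
val twitterStream = TwitterUtils.createStream(ssc, None)

// count hashtags within each 5-second micro-batch
val aggStream = twitterStream
  .flatMap(status => status.getText.split(" "))
  .filter(_.startsWith("#"))
  .map(tag => (tag, 1))
  .reduceByKey(_ + _)

// action: print the first elements of each batch's counts
aggStream.print()

ssc.start()
ssc.awaitTermination()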