Handling Event-time and late data

Event time is the time inside the data itself. Traditional Spark Streaming only handled time as the received time for the DStream purposes, but this is not enough for many applications where we need the event time. For example, if you want to get the number of times hashtag appears in a tweet every minute, then you should want to use the time when the data was generated, not when Spark receives the event. To get event time into the mix, it is very easy to do so in structured streaming by considering the event time as a column in the row/event. This allows window-based aggregations to be run using the event time rather than the received time. Furthermore, this model naturally handles data that has arrived later than expected based on its event time. Since Spark is updating the result table, it has full control over updating old aggregates when there is late data as well as cleaning up old aggregates to limit the size of intermediate state data. There is also support for watermarking event streams, which allows the user to specify the threshold of late data and allows the engine to accordingly clean up the old state.

Watermarks enable the engine to track the current event times and determine whether the event needs to be processed or has been already processed by checking the threshold of how late data can be received. For instance, if the event time is denoted by eventTime and the threshold interval of late arriving data is lateThreshold, then by checking the difference between the max(eventTime) - lateThreshold and comparing with the specific window starting at time T, the engine can determine if the event can be considered for processing in this window or not.

Shown in the following is an extension of the preceding example on structured streaming listening on port 9999. Here we are enabling Timestamp as part of the input data so that we can do Window operations on the unbounded table to generate results:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Create DataFrame representing the stream of input lines from connection to host:port
val inputLines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.option("includeTimestamp", true)
.load()

// Split the lines into words, retaining timestamps
val words = inputLines.as[(String, Timestamp)].flatMap(line =>
line._1.split(" ").map(word => (word, line._2))
).toDF("word", "timestamp")

// Group the data by window and word and compute the count of each group
val windowedCounts = words.withWatermark("timestamp", "10 seconds")
.groupBy(

window($"timestamp", "10 seconds", "10 seconds"), $"word"
).count().orderBy("window")

// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "false")

query.start()
query.awaitTermination()
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.142.43.216