Structured streaming

Structured streaming is a scalable and fault-tolerant stream processing engine built on top of the Spark SQL engine. It brings stream processing and computation closer to batch processing, in contrast to the DStream paradigm and the challenges involved with the Spark streaming APIs. The structured streaming engine takes care of several hard problems, such as exactly-once stream processing, incremental updates to the results of processing, and aggregations.

The structured streaming API also provides the means to tackle a big challenge of Spark streaming: Spark streaming processes incoming data in micro-batches and splits the data by the time it was received, without considering the actual event time of the data. Structured streaming allows you to specify an event-time column in the data being received, so that any late-arriving data is handled automatically.
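To make the event-time handling concrete, the following is a minimal sketch using the `withWatermark` and `window` APIs, which let the engine group records by when they actually occurred and drop state for data that arrives too late. It assumes a streaming Dataset named `events` with `timestamp` and `word` columns (both names are illustrative):

```scala
import org.apache.spark.sql.functions.window
import spark.implicits._   // for the $"..." column syntax in the shell

// Hypothetical sketch: `events` is assumed to be a streaming DataFrame
// with columns (timestamp: Timestamp, word: String).
val windowedCounts = events
  .withWatermark("timestamp", "10 minutes")   // accept data up to 10 minutes late
  .groupBy(
    window($"timestamp", "5 minutes"),        // 5-minute event-time windows
    $"word")
  .count()
```

Records arriving more than 10 minutes behind the watermark are dropped, so the engine can discard old window state instead of keeping it forever.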

Structured streaming became generally available (GA) in Spark 2.2, and the APIs are marked GA accordingly. Refer to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.

The key idea behind structured streaming is to treat a live data stream as an unbounded table that is continuously appended to as events arrive from the stream. You can then run computations and SQL queries on this unbounded table just as you would on batch data. A Spark SQL query, for instance, will process the unbounded table:
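As a sketch of this idea, a streaming DataFrame can be registered as a temporary view and queried with regular Spark SQL; the view name `lines` and the query itself are illustrative here:

```scala
// Create a streaming DataFrame reading from a socket source
val inputLines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Register the stream as a view named "lines" (illustrative name)
inputLines.createOrReplaceTempView("lines")

// An ordinary SQL query over the unbounded "lines" table
val lineLengths = spark.sql("SELECT value, length(value) AS len FROM lines")
```

The query looks exactly like a batch query; the engine runs it incrementally as new rows are appended to the unbounded table.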

As the stream keeps changing with time, more and more data is processed to generate the results. Hence, the unbounded input table is used to generate a result table. The result table can then be written to an external sink, known as the output.

The output is what gets written to external storage, and it can be defined in one of the following modes:

  • Complete mode: The entire updated result table is written to the external storage. It is up to the storage connector to decide how to handle writing the entire table.
  • Append mode: Only the new rows appended to the result table since the last trigger are written to the external storage. This is applicable only to queries where the existing rows in the result table are not expected to change.
  • Update mode: Only the rows that were updated in the result table since the last trigger are written to the external storage. Note that this differs from complete mode in that it outputs only the rows that have changed since the last trigger. If the query doesn't contain aggregations, it is equivalent to append mode.
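The mode is selected per query with the outputMode API on the DataStreamWriter. As a brief sketch, assuming an aggregated streaming DataFrame such as the wordCounts built later in this section:

```scala
// Complete mode: rewrite the whole result table on every trigger
val completeQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")

// Update mode: emit only the rows that changed since the last trigger
val updateQuery = wordCounts.writeStream
  .outputMode("update")
  .format("console")
```

A non-aggregating query (a plain projection or filter) would instead use outputMode("append"), since its existing result rows never change.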

The following is an illustration of the output generated from the unbounded table:

We will show an example of creating a structured streaming query that listens for input on localhost port 9999.

If you are using Linux or macOS, it's easy to start a simple server on port 9999: nc -lk 9999.

The following is an example where we start by creating inputLines by calling SparkSession's readStream API and then extracting the words from the lines. We then group the words and count the occurrences before finally writing the results to the output stream:

//create stream reading from localhost 9999
val inputLines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
inputLines: org.apache.spark.sql.DataFrame = [value: string]

// Split the inputLines into words
val words = inputLines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

// Generate running word count
val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
query: org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.streaming.DataStreamWriter@4823f4d0

query.start()

As you keep typing words in the terminal, the query keeps updating and generating results which are printed on the console:

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| dog| 1|
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| dog| 1|
| cat| 1|
+-----+-----+

scala> -------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| dog| 2|
| cat| 1|
+-----+-----+
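Outside the interactive shell, a streaming query normally needs the driver to block until the query is stopped; otherwise, the application would exit immediately. A minimal sketch, reusing the query writer from the example above:

```scala
// start() returns a StreamingQuery handle
val runningQuery = query.start()

// Block the driver until the query terminates...
runningQuery.awaitTermination()

// ...or stop it explicitly from another thread:
// runningQuery.stop()
```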