Writing to a topic

So far, we have processed the data and printed the results in real time. To send these results to another topic instead, we use the CREATE STREAM ... AS SELECT statement, which creates a stream whose contents are defined by a SELECT.

Let's start by writing the uptime as a string, in comma-delimited format (remember that KSQL supports the DELIMITED, JSON, and Avro value formats). For the moment, this is enough because we are only writing one value:

ksql> CREATE STREAM uptimes WITH (kafka_topic='uptimes', value_format='delimited') AS SELECT CAST((STRINGTOTIMESTAMP('2017-11-18','yyyy-MM-dd')-STRINGTOTIMESTAMP(lastStartedAt,'yyyy-MM-dd''T''HH:mm:ss.SSSZ'))/86400/1000 AS string) AS uptime FROM healthchecks;

The output is similar to this:

Message
----------------------------
Stream created and running
----------------------------
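The uptime expression above simply converts both timestamps to epoch milliseconds, subtracts them, and divides by 86400 and then 1000 to obtain whole days. The same arithmetic can be sketched in Python (the helper name and sample lastStartedAt value are illustrative, assuming an ISO-8601 timestamp):

```python
from datetime import datetime, timezone

def uptime_days(last_started_at: str, reference: str = "2017-11-18") -> str:
    """Mirror of the KSQL expression:
    (reference_ms - started_ms) / 86400 / 1000, cast to a string."""
    # lastStartedAt is assumed to look like '2017-10-09T00:00:00.000+0000'
    started = datetime.strptime(last_started_at, "%Y-%m-%dT%H:%M:%S.%f%z")
    ref = datetime.strptime(reference, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    delta_ms = (ref - started).total_seconds() * 1000
    # 86,400,000 milliseconds per day
    return str(int(delta_ms // 86_400_000))

print(uptime_days("2017-10-09T00:00:00.000+0000"))  # prints 40
```

Note that the division truncates toward whole days, just as the CAST to string in the KSQL query discards any fractional remainder.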

Our query is now running in the background. To verify that it is running, we can start a console consumer on the uptimes topic, shown as follows:

$ ./kafka-console-consumer --bootstrap-server localhost:9092 --topic uptimes --property print.key=true

The output is similar to this:

null    39
null    42
null    21

The results are correct; however, we forgot to use the machine serial number as the message key. To do this, we have to rebuild our query and our stream.

The first step is to use the show queries command, shown here:

ksql> show queries;

The output is similar to this:

 Query ID       | Kafka Topic | Query String
-------------------------------------------------------------------------------
CSAS_UPTIMES_0 | UPTIMES | CREATE STREAM uptimes WITH (kafka_topic='uptimes', value_format='delimited') AS SELECT CAST((STRINGTOTIMESTAMP('2017-11-18','yyyy-MM-dd')-STRINGTOTIMESTAMP(lastStartedAt,'yyyy-MM-dd''T''HH:mm:ss.SSSZ'))/86400/1000 AS string) AS uptime FROM healthchecks;
-------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

With the Query ID, use the terminate <ID> command, shown as follows:

ksql> terminate CSAS_UPTIMES_0;

The output is similar to this:

Message
-------------------
Query terminated.
-------------------

To delete the stream, use the DROP STREAM command, shown as follows:

ksql> DROP STREAM uptimes;

The output is similar to this:

Message
------------------------------
Source UPTIMES was dropped.
------------------------------

To write the event keys correctly, we must use the PARTITION BY clause. First, we recreate our stream with the partial calculation, partitioned by serial number, shown as follows:

ksql> CREATE STREAM healthchecks_processed AS SELECT serialNumber, CAST((STRINGTOTIMESTAMP('2017-11-18','yyyy-MM-dd')-STRINGTOTIMESTAMP(lastStartedAt,'yyyy-MM-dd''T''HH:mm:ss.SSSZ'))/86400/1000 AS string) AS uptime FROM healthchecks PARTITION BY serialNumber;

The output is similar to this:

Message
----------------------------
Stream created and running
----------------------------
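The reason PARTITION BY matters: Kafka assigns each record to a partition by hashing its key, so all messages with the same serialNumber land on the same partition and keep their relative order. A rough illustration of that idea (using CRC32 as a stand-in for Kafka's actual murmur2 partitioner):

```python
import zlib

def partition_for(key: bytes, num_partitions: int = 3) -> int:
    # Kafka's default partitioner uses murmur2; CRC32 is a stand-in
    # to show the idea: hash the key, take it modulo the partition count.
    return zlib.crc32(key) % num_partitions

# The same serial number always maps to the same partition,
# so per-machine ordering is preserved.
serial = b"EW05-HV36"
assert partition_for(serial) == partition_for(serial)
```

Records with a null key, by contrast, are spread across partitions with no ordering guarantee per machine, which is why the first version of the stream was not good enough.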

This stream has two fields, serialNumber and uptime. To write these calculated values to a topic, we use CREATE STREAM ... AS SELECT, as follows:

ksql> CREATE STREAM uptimes WITH (kafka_topic='uptimes', value_format='delimited') AS SELECT * FROM healthchecks_processed;

The output is similar to this:

Message
----------------------------
Stream created and running
----------------------------

Finally, run a console consumer to show the results, demonstrated as follows:

$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic uptimes --property print.key=true

The output is similar to this:

EW05-HV36    33
BO58-SB28    20
DV03-ZT93    46
...
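With print.key=true, the console consumer prints the key, then a separator (a tab by default), then the delimited value, so the output is straightforward to post-process. A small parsing sketch (the function name is illustrative):

```python
def parse_consumer_line(line: str) -> tuple[str, str]:
    # kafka-console-consumer separates key and value with a tab
    # by default when print.key=true is set
    key, _, value = line.rstrip("\n").partition("\t")
    return key, value

print(parse_consumer_line("EW05-HV36\t33"))  # ('EW05-HV36', '33')
```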

Now, close the KSQL CLI (Ctrl + C, then close the command window). Because the queries keep running inside KSQL, you will continue to see output in the console consumer window.

Congratulations, you have built a Kafka Streams application with a few KSQL commands.

To unveil all the power of KSQL, it is important to review the official documentation at the following address: 

https://docs.confluent.io/current/ksql/docs/tutorials/index.html
