Using the KSQL CLI

The KSQL CLI is a command prompt to interact with KSQL; it is very similar to the one that comes with relational databases such as MariaDB or MySQL. To see all the possible commands, type help and a list with the options will be displayed.

At the moment, we have not informed KSQL of anything. We must declare that something is a table or a stream. We will use the information produced from previous chapters with the producers that write JSON information to the healthchecks topic.

If you remember, the data looks like this:

{"event":"HEALTH_CHECK","factory":"Lake Anyaport","serialNumber":"EW05-HV36","type":"WIND","status":"STARTING","lastStartedAt":"2018-09-17T11:05:26.094+0000","temperature":62.0,"ipAddress":"15.185.195.90"}
{"event":"HEALTH_CHECK","factory":"Candelariohaven","serialNumber":"BO58-SB28","type":"SOLAR","status":"STARTING","lastStartedAt":"2018-08-16T04:00:00.179+0000","temperature":75.0,"ipAddress":"151.157.164.162"}
{"event":"HEALTH_CHECK","factory":"Ramonaview","serialNumber":"DV03-ZT93","type":"SOLAR","status":"RUNNING","lastStartedAt":"2018-07-12T10:16:39.091+0000","temperature":70.0,"ipAddress":"173.141.90.85"}
...

KSQL can read JSON data, and can also read data in Avro format. To declare a stream from the healthchecks topic, we use the following command:

ksql>  CREATE STREAM healthchecks (event string, factory string, serialNumber string, type string, status string, lastStartedAt string, temperature double, ipAddress string) WITH (kafka_topic='healthchecks', value_format='json');

The output is similar to this:

Message
----------------------------
Stream created and running
----------------------------

To review the structure of an existing STREAM, we can use the DESCRIBE command, which is shown here and that tells us the data types and their structure:

ksql> DESCRIBE healthchecks;

The output is similar to the following:

Name          : HEALTHCHECKS
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
EVENT | VARCHAR(STRING)
FACTORY | VARCHAR(STRING)
SERIALNUMBER | VARCHAR(STRING)
TYPE | VARCHAR(STRING)
STATUS | VARCHAR(STRING)
LASTSTARTEDAT | VARCHAR(STRING)
TEMPERATURE | DOUBLE
IPADDRESS | VARCHAR(STRING)

Note that at the beginning, two extra fields are shown: ROWTIME (the message timestamp) and ROWKEY (the message key).

When we created the stream, we declared that the Kafka topic is healthchecks. So, if we execute the SELECT command, we obtain a list of the events that are in the topic to which our stream points in real time (remember to run a producer to obtain fresh data). The command is as follows:

ksql> select * from healthchecks;

The output is similar to this:

1532598615943 | null | HEALTH_CHECK | Carliefort | FM41-RE80 | WIND | STARTING | 2017-08-13T09:37:21.681+0000 | 46.0 | 228.247.233.14
1532598616454 | null | HEALTH_CHECK | East Waldo | HN72-EB29 | WIND | RUNNING | 2017-10-31T14:20:13.929+0000 | 3.0 | 223.5.127.146
1532598616961 | null | HEALTH_CHECK | New Cooper | MM04-TZ21 | SOLAR | SHUTTING_DOWN | 2017-08-21T21:10:31.190+0000 | 23.0 | 233.143.140.46
1532598617463 | null | HEALTH_CHECK | Mannmouth | XM02-PQ43 | GEOTHERMAL | RUNNING | 2017-09-08T10:44:56.005+0000 | 73.0 | 221.96.17.237
1532598617968 | null | HEALTH_CHECK | Elvisfort | WP70-RY81 | NUCLEAR | RUNNING | 2017-09-07T02:40:18.917+0000 | 49.0 | 182.94.17.58
1532598618475 | null | HEALTH_CHECK | Larkinstad | XD75-FY56 | GEOTHERMAL | STARTING | 2017-09-06T08:48:14.139+0000 | 35.0 | 105.236.9.137
1532598618979 | null | HEALTH_CHECK | Nakiaton | BA85-FY32 | SOLAR | RUNNING | 2017-08-15T04:10:02.590+0000 | 32.0 | 185.210.26.215
1532598619483 | null | HEALTH_CHECK | North Brady | NO31-LM78 | HYDROELECTRIC | RUNNING | 2017-10-05T12:12:52.940+0000 | 5.0 | 17.48.190.21
1532598619989 | null | HEALTH_CHECK | North Josianemouth | GT17-TZ11 | SOLAR | SHUTTING_DOWN | 2017-08-29T16:57:23.000+0000 | 6.0 | 99.202.136.163

The SELECT command shows the data from the Kafka topic declared in the stream. The query never stops, so it will run till you stop it. New records are printed as new lines, as new events are produced in the topic. To stop a query, type Ctrl + C.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.16.66.156