Example - connection to an MQTT message broker

So, let's start with a sample use case: connecting to an Internet of Things (IoT) sensor data stream. As we haven't covered machine learning yet, we won't analyze the data; we'll just showcase the concept.

We are using the IBM Watson IoT Platform as the streaming source. At its core, the Watson IoT Platform is backed by an MQTT (Message Queue Telemetry Transport) message broker. MQTT is a lightweight telemetry protocol that was invented by IBM in 1999 and became an OASIS standard in 2013 (OASIS, the Organization for the Advancement of Structured Information Standards, is a global nonprofit consortium that works on the development, convergence, and adoption of standards for security, the Internet of Things, energy, content technologies, emergency management, and other areas). Today, MQTT is the de facto standard for IoT data integration.

Messaging between applications can be backed by a message queue, which is a middleware system supporting asynchronous point-to-point channels in various delivery modes, such as first-in, first-out (FIFO), last-in, first-out (LIFO), or priority queue (where messages are re-ordered by certain criteria).
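To make these delivery modes more concrete, here is a small Scala illustration using standard library collections; the message payloads and priorities are invented purely for this sketch:

import scala.collection.mutable

// FIFO: messages leave the queue in the order they arrived
val fifo = mutable.Queue[String]()
fifo.enqueue("m1"); fifo.enqueue("m2"); fifo.enqueue("m3")
println(fifo.dequeue()) // prints m1

// LIFO: the most recently added message leaves first
val lifo = mutable.Stack[String]()
lifo.push("m1"); lifo.push("m2"); lifo.push("m3")
println(lifo.pop()) // prints m3

// Priority queue: messages are re-ordered by a criterion, here an explicit priority field
case class Message(payload: String, priority: Int)
val prio = mutable.PriorityQueue.empty[Message](Ordering.by((m: Message) => m.priority))
prio.enqueue(Message("m1", 1)); prio.enqueue(Message("m2", 5))
println(prio.dequeue()) // prints Message(m2,5), the highest priority first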

Such a message queue is already a very nice feature, but it still couples applications in a certain way because, once a message is read, it becomes unavailable to other consumers.

This makes N-to-N communication hard (but not impossible) to achieve. In a publish/subscribe model, applications are completely decoupled. There are no queues anymore; instead, the notion of topics is introduced. Data providers publish messages on specific topics and data consumers subscribe to those topics. This way, N-to-N communication is very straightforward to achieve, since it is reflected by the underlying message delivery model. Such middleware is called a message broker, in contrast to a message queue.

As cloud services tend to change constantly, and cloud in general is introduced later in this book, the following tutorial explains how to set up the test data generator in the cloud and connect to the remote MQTT message broker. In this example, we will use the IBM Watson IoT Platform, which is an MQTT message broker available in the cloud. Alternatively, you can install an open source message broker such as Mosquitto, which also provides a publicly available test installation at the following URL: http://test.mosquitto.org/.

In order to replicate the example, steps (1) and (2) of the following tutorial are necessary: https://www.ibm.com/developerworks/library/iot-cognitive-iot-app-machine-learning/index.html. Please make sure to note down http_host, org, apiKey, and apiToken during execution of the tutorial. These are needed later in order to subscribe to data using Apache Spark Structured Streaming.

As the IBM Watson IoT Platform uses the open MQTT standard, no special IBM component is necessary to connect to the platform. Instead, we use Apache Bahir as the connector between MQTT and Apache Spark Structured Streaming.

The goal of the Apache Bahir project is to provide a set of source and sink connectors for various data processing engines, including Apache Spark and Apache Flink, since these engines lack such connectors out of the box. In this case, we will use the Apache Bahir data source for MQTT.

In order to use Apache Bahir, we need to add two dependencies to our local Maven repository. A complete pom.xml file is provided in the download section of this chapter. Let's have a look at the dependency section of pom.xml:
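As a sketch, the two entries look roughly as follows; the artifact names are those published by Apache Bahir and Eclipse Paho, while the version numbers and the Scala suffix (_2.11) are assumptions that have to match your Spark installation:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.0</version>
</dependency>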

We are basically getting the Apache Spark Structured Streaming MQTT adapter of Apache Bahir and a dependent package for low-level MQTT processing. A simple mvn dependency:resolve command in the directory of the pom.xml file pulls the required dependencies into our local Maven repository, where they can be accessed by the Apache Spark driver and transferred to the Apache Spark workers automatically.

Another way of resolving the dependencies is to pass them to the following command when starting a spark-shell (spark-submit works the same way); the necessary dependencies are then automatically distributed to the workers:
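A minimal sketch of such a command, using the same Bahir coordinates as above (the version and Scala suffix are again assumptions), would be:

spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.0

The --packages option resolves the given Maven coordinates, downloads the jars including their transitive dependencies (such as the low-level MQTT client), and makes them available to the driver and the workers.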

Now we need the MQTT credentials that we've obtained earlier. Let's set the values here:

val mqtt_host = "pcoyha.messaging.internetofthings.ibmcloud.com"
val org = "pcoyha"
val apiKey = "a-pcoyha-oaigc1k8ub"
val apiToken = "&wuypVX2yNgVLAcLr8"
var randomSessionId = scala.util.Random.nextInt(10000)

Now we can start creating a stream connecting to an MQTT message broker. We are telling Apache Spark to use the Apache Bahir MQTT streaming source:

val df = spark.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")

We need to specify credentials such as username, password, and clientId in order to pull data from the MQTT message broker; the tutorial mentioned earlier explains how to obtain these:

    .option("username",apiKey)
.option("password",apiToken)
.option("clientId","a:"+org+":"+apiKey)

As we are using a publish/subscribe messaging model, we have to provide the topic that we are subscribing to. This topic is used by the test data generator that you've deployed to the cloud before:

.option("topic", "iot-2/type/WashingMachine/id/Washer01/evt/voltage/fmt/json")

Once everything is set on the configuration side, we have to provide the endpoint host and port in order to create the stream:

   .load("tcp://"+mqtt_host+":1883")

Interestingly, this leads to the creation of a DataFrame.

Note that the schema is fixed to [String, Timestamp] and cannot be changed during stream creation; this is a limitation of the Apache Bahir library. However, using the rich DataFrame API, you can parse the value, a JSON string for example, and create new columns.
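As a minimal sketch of such parsing, assuming the string column is called value and the timestamp column timestamp (as in the Bahir MQTT source), and assuming the test data generator emits a JSON payload with fields such as voltage and frequency (these field names are an assumption), one could write:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StructType, DoubleType}

// assumed payload schema of the test data generator; adapt it to the actual JSON messages
val payloadSchema = new StructType().add("voltage", DoubleType).add("frequency", DoubleType)

// parse the JSON string in the value column into typed, named columns
val parsed = df.select(from_json($"value", payloadSchema).alias("payload"), $"timestamp")
  .select("payload.voltage", "payload.frequency", "timestamp")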

As discussed before, this is one of the powerful features of Apache Spark Structured Streaming, as the very same DataFrame (and Dataset) API can now be used to process both historic and real-time data. So let's take a look at the contents of this stream by writing it to the console:

val query = df.writeStream.
outputMode("append").
format("console").
start()

As the output mode, we choose append to enforce incremental display and to avoid the complete contents of the historic stream being written to the console again and again. As the format, we specify console, as we just want to debug what's happening on the stream.

Finally, the start method initiates query processing; from then on, the incoming messages are continuously written to the console.
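Note that in the interactive spark-shell the query keeps running in the background after start returns. In a standalone application submitted with spark-submit, you would typically block the main thread until the query is stopped or fails, for example:

query.awaitTermination()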
