InfluxDB for recording sensor readings

Recording the sensor readings, and later the statistics read from the coffee machines, was a priority from the beginning. A time series database is the natural fit for this kind of data, and InfluxDB is a commonly used one. Its biggest drawback for this project is that it does not support MQTT, offering only its HTTP and native interfaces.

To fix this, a simple MQTT-to-Influx HTTP line protocol bridge was written, again using the Mosquitto client library as well as the POCO framework's HTTP functionality:

#include "mth.h"

#include <iostream>

using namespace std;

// Session headers for the HTTP and HTTPS clients created in the constructor.
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPSClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/StringTokenizer.h>
#include <Poco/String.h>

using namespace Poco;


MtH::MtH(string clientId, string host, int port, string topics, string influxHost,
            int influxPort, string influxDb, string influx_sec) : mosquittopp(clientId.c_str()) {
    this->topics = topics;
    this->influxDb = influxDb;

    // Pick the session type based on the 'secure' setting from the configuration.
    if (influx_sec == "true") {
        cout << "Connecting with HTTPS..." << std::endl;
        influxClient = new Net::HTTPSClientSession(influxHost, influxPort);
        secure = true;
    }
    else {
        cout << "Connecting with HTTP..." << std::endl;
        influxClient = new Net::HTTPClientSession(influxHost, influxPort);
        secure = false;
    }

    // Connect to the MQTT broker with a keepalive interval of 60 seconds.
    int keepalive = 60;
    connect(host.c_str(), port, keepalive);
}

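In the constructor, we create either an HTTP or HTTPS client session, depending on which protocol has been set in the configuration file, and then connect to the MQTT broker. The destructor deletes the client session again, and the connection callback takes care of the topic subscriptions: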


MtH::~MtH() {
    delete influxClient;
}


void MtH::on_connect(int rc) {
    if (rc == 0) {
        cout << "Connected. Subscribing to topics...\n";

        // The topics arrive as a single comma-separated string.
        StringTokenizer st(topics, ",", StringTokenizer::TOK_TRIM | StringTokenizer::TOK_IGNORE_EMPTY);
        for (StringTokenizer::Iterator it = st.begin(); it != st.end(); ++it) {
            string topic = string(*it);
            cout << "Subscribing to: " << topic << "\n";
            subscribe(0, topic.c_str());

            // Add name of the series to the 'series' map.
            StringTokenizer st1(topic, "/", StringTokenizer::TOK_TRIM | StringTokenizer::TOK_IGNORE_EMPTY);
            string s = st1[st1.count() - 1]; // Get last item.
            series.insert(std::pair<string, string>(topic, s));
        }
    }
    else {
        cerr << "Connection failed. Aborting subscribing.\n";
    }
}

Instead of subscribing to a fixed set of MQTT topics, we use the topics defined in the configuration file, which are provided to us as a single string with the topics separated by commas.

We also create an STL map containing the name of the time series to record for each topic, taking the final part of the MQTT topic after the last slash. One could make this further configurable, but for the topics used in the BMaC system this limitation was not a concern, as more complex topics are not necessary.
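The topic handling described above can be sketched using only the standard library. This is a minimal illustration rather than the bridge's own code: the helper names `splitTopics`, `seriesNameFor` and `buildSeriesMap` are hypothetical, and unlike the POCO tokenizer, a topic ending in a slash would yield an empty series name here.

```cpp
#include <cstddef>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: split a comma-separated topic list, trimming
// surrounding spaces and tabs and skipping empty entries.
std::vector<std::string> splitTopics(const std::string& topics) {
    std::vector<std::string> result;
    std::istringstream stream(topics);
    std::string item;
    while (std::getline(stream, item, ',')) {
        const std::size_t first = item.find_first_not_of(" \t");
        if (first == std::string::npos) { continue; } // Empty or blank entry.
        const std::size_t last = item.find_last_not_of(" \t");
        result.push_back(item.substr(first, last - first + 1));
    }
    return result;
}

// Hypothetical helper: the series name is the text after the last slash,
// or the whole topic when it contains no slash.
std::string seriesNameFor(const std::string& topic) {
    const std::size_t slash = topic.find_last_of('/');
    return (slash == std::string::npos) ? topic : topic.substr(slash + 1);
}

// Build the topic-to-series map from the configured topic string.
std::map<std::string, std::string> buildSeriesMap(const std::string& topics) {
    std::map<std::string, std::string> series;
    for (const std::string& topic : splitTopics(topics)) {
        series.emplace(topic, seriesNameFor(topic));
    }
    return series;
}
```

With a made-up configuration string such as `"site/room/temperature, site/room/humidity"`, the map would hold `temperature` and `humidity` as the series names for the two topics.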

void MtH::on_message(const struct mosquitto_message* message) {
    // Only handle topics for which a series name was registered.
    string topic = message->topic;
    map<string, string>::iterator it = series.find(topic);
    if (it == series.end()) {
        cerr << "Topic not found: " << topic << "\n";
        return;
    }

    if (message->payloadlen < 1) {
        cerr << "No payload found. Returning...\n";
        return;
    }

    // The payload is expected to look like '<uid>;<value>'.
    string payload = string((const char*) message->payload, message->payloadlen);
    size_t pos = payload.find(";");
    if (pos == string::npos || pos == 0) {
        cerr << "Invalid payload: " << payload << ". Reject.\n";
        return;
    }

    // Build the line protocol string: <series>,location=<uid> value=<value>
    string uid = payload.substr(0, pos);
    string value = payload.substr(pos + 1);
    string influxMsg;
    influxMsg = it->second;
    influxMsg += ",location=" + uid;
    influxMsg += " value=" + value;
    try {
        Net::HTTPRequest request(Net::HTTPRequest::HTTP_POST,
                "/write?db=" + influxDb, Net::HTTPMessage::HTTP_1_1);
        request.setContentLength(influxMsg.length());
        request.setContentType("application/x-www-form-urlencoded");
        influxClient->sendRequest(request) << influxMsg;

        Net::HTTPResponse response;
        influxClient->receiveResponse(response);
    }
    catch (Exception& exc) {
        cout << "Exception caught while attempting to connect." << std::endl;
        cerr << exc.displayText() << std::endl;
        return;
    }
}

When a new MQTT message comes in, we look up the name of the Influx time series for its topic, then create a string to send to the InfluxDB server. The assumption here is that the payload consists of the MAC address of the node which sent the message, followed by a semi-colon and then the value.

We simply get the part after the semi-colon to set it as the value, and use the MAC as the location. This we then send to the database server.
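To make the message format concrete, the following sketch isolates the payload-to-line-protocol step in a standalone function. It is an illustration under stated assumptions, not the bridge's own code: the name `toLineProtocol` is hypothetical, and rejecting an empty value is an extra check that the listing above does not perform.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: convert a '<uid>;<value>' payload into an Influx
// line protocol string of the form '<series>,location=<uid> value=<value>'.
// Returns false and leaves 'out' untouched if the payload is malformed.
bool toLineProtocol(const std::string& series, const std::string& payload,
                    std::string& out) {
    const std::size_t pos = payload.find(';');
    if (pos == std::string::npos || pos == 0) {
        return false; // No separator, or nothing before it.
    }

    if (pos + 1 >= payload.size()) {
        return false; // Assumption: an empty value is not worth sending.
    }

    const std::string uid = payload.substr(0, pos);
    const std::string value = payload.substr(pos + 1);
    out = series + ",location=" + uid + " value=" + value;
    return true;
}
```

For a made-up payload of `AA:BB:CC:DD:EE:FF;21.5` on a topic whose series name is `temperature`, this produces `temperature,location=AA:BB:CC:DD:EE:FF value=21.5`. Tag values containing spaces or commas would need escaping in the line protocol, which this sketch does not attempt, since a MAC address contains neither.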
