Command and control server

As alluded to earlier in this chapter, a so-called command and control (C&C) server is essentially a database containing information on individual nodes and their configuration, for use by the nodes themselves and administration tools like the one in the next section.

It also includes an HTTP server, for use with HTTP-based over-the-air (OTA) updates. Since the BMaC system is MQTT-based, this server is also written as an MQTT client:

#include "listener.h"  
#include <iostream> 
#include <string> 
 
using namespace std; 
 
#include <Poco/Util/IniFileConfiguration.h> 
#include <Poco/AutoPtr.h> 
#include <Poco/Net/HTTPServer.h> 
 
using namespace Poco::Util; 
using namespace Poco; 
using namespace Poco::Net; 
 
#include "httprequestfactory.h" 
  
int main(int argc, char* argv[]) { 
   cout << "Starting MQTT BMaC Command & Control server...
"; 
    
   int rc; 
   mosqpp::lib_init(); 
    
   cout << "Initialised C++ Mosquitto library.
"; 
    
   string configFile; 
   if (argc > 1) { configFile = argv[1]; } 
   else { configFile = "config.ini"; } 
    
   AutoPtr<IniFileConfiguration> config(new IniFileConfiguration(configFile)); 
   string mqtt_host = config->getString("MQTT.host", "localhost"); 
   int mqtt_port = config->getInt("MQTT.port", 1883); 
   string defaultFirmware = config->getString("Firmware.default", "ota_unified.bin"); 
    
   Listener listener("Command_and_Control", mqtt_host, mqtt_port, defaultFirmware); 
    
   UInt16 port = config->getInt("HTTP.port", 8080); 
   HTTPServerParams* params = new HTTPServerParams; 
   params->setMaxQueued(100); 
   params->setMaxThreads(10); 
   HTTPServer httpd(new RequestHandlerFactory, port, params); 
   httpd.start(); 
    
   cout << "Created listener, entering loop...
"; 
    
   while(1) { 
         rc = listener.loop(); 
         if (rc){ 
               cout << "Disconnected. Trying to reconnect...
"; 
               listener.reconnect(); 
         } 
   } 
    
   cout << "Cleanup...
"; 
 
   mosqpp::lib_cleanup(); 
 
   return 0; 
} 

We're using the Mosquitto C++ MQTT client along with the POCO framework to provide us with the required functionality.

The Listener class is next:

#include <mosquittopp.h> 
#include <string> 
 
using namespace std; 
 
#include <Poco/Data/Session.h> 
#include <Poco/Data/SQLite/Connector.h> 
 
using namespace Poco; 
 
 
class Listener : public mosqpp::mosquittopp { 
   Data::Session* session; 
   string defaultFirmware; 
    
public: 
   Listener(string clientId, string host, int port, string defaultFirmware); 
   ~Listener(); 
    
   void on_connect(int rc); 
   void on_message(const struct mosquitto_message* message); 
   void on_subscribe(int mid, int qos_count, const int* granted_qos); 
}; 

We include the headers from POCO for the SQLite database functionality, which forms the database backend for this application. The class itself derives from the Mosquitto C++ class, providing us with all the basic MQTT functionalities along with a few function stubs, which we still have to implement in a moment:

#include "listener.h" 
 
#include <iostream> 
#include <fstream> 
#include <sstream> 
 
using namespace std; 
 
#include <Poco/StringTokenizer.h> 
#include <Poco/String.h> 
#include <Poco/Net/HTTPSClientSession.h> 
#include <Poco/Net/HTTPRequest.h> 
#include <Poco/Net/HTTPResponse.h> 
#include <Poco/File.h> 
 
using namespace Poco::Data::Keywords; 

// Configuration record of a single node, mirroring one row of the `nodes` table.
struct Node {
   string uid;        // Unique ID of the node (used as the lookup key).
   string location;   // Location string of the node.
   UInt32 modules;    // Module value, sent to the node as raw bytes -- presumably a bit mask; confirm.
   float posx;        // X position -- presumably on the building map; confirm.
   float posy;        // Y position -- presumably on the building map; confirm.
};

The Node structure above describes a single node. Next comes the constructor of the Listener class:

/**
 * Connects to the MQTT broker and opens the SQLite database.
 *
 * @param clientId        MQTT client ID handed to the Mosquitto base class.
 * @param host            Host name of the MQTT broker.
 * @param port            Port of the MQTT broker.
 * @param defaultFirmware Firmware file name assigned to newly added nodes.
 *
 * Creates the `nodes` and `firmware` tables in "nodes.db" if they do not
 * exist yet. A failed connect() is only reported, not treated as fatal.
 */
Listener::Listener(string clientId, string host, int port, string defaultFirmware) : mosquittopp(clientId.c_str()) {
   this->defaultFirmware = defaultFirmware;

   int keepalive = 60;
   if (connect(host.c_str(), port, keepalive) != MOSQ_ERR_SUCCESS) {
      cerr << "Failed to connect to MQTT broker at " << host << ":" << port << "\n";
   }

   Data::SQLite::Connector::registerConnector();
   session = new Poco::Data::Session("SQLite", "nodes.db");

   // Adjacent string literals are concatenated by the compiler, which keeps
   // each SQL statement a single valid literal.
   (*session) << "CREATE TABLE IF NOT EXISTS nodes (uid TEXT UNIQUE, "
         "location TEXT, "
         "modules INT, "
         "posx FLOAT, "
         "posy FLOAT)", now;

   (*session) << "CREATE TABLE IF NOT EXISTS firmware (uid TEXT UNIQUE, "
         "file TEXT)", now;
}

In the constructor, we attempt to connect to the MQTT broker, using the provided host and port. We also set up a connection with the SQLite database, and ensure that both the nodes table and the firmware table exist:

/**
 * Releases the database session that the constructor allocated with new.
 */
Listener::~Listener() {
   delete session;
}
 
 
void Listener::on_connect(int rc) { 
   cout << "Connected. Subscribing to topics...
"; 
    
   if (rc == 0) { 
         string topic = "cc/config";   // announce by nodes coming online. 
         subscribe(0, topic.c_str()); 
         topic = "cc/ui/config";       // C&C client requesting configuration. 
         subscribe(0, topic.c_str()); 
         topic = "cc/nodes/new";       // C&C client adding new node. 
         subscribe(0, topic.c_str()); 
         topic = "cc/nodes/update";    // C&C client updating node. 
         subscribe(0, topic.c_str()); 
         topic = "nsa/events/CO2";     // CO2-related events. 
         subscribe(0, topic.c_str()); 
         topic = "cc/firmware";  // C&C client firmware command. 
         subscribe(0, topic.c_str()); 
   } 
   else { 
         cerr << "Connection failed. Aborting subscribing.
"; 
   } 
} 

We reimplement the callback for when a connection has been established with the MQTT broker. In this method, we subscribe to all the MQTT topics in which we are interested.

The next method is called whenever we receive an MQTT message on one of the topics which we subscribed to:

void Listener::on_message(const struct mosquitto_message* message) { 
   string topic = message->topic; 
   string payload = string((const char*) message->payload, message->payloadlen); 
    
   if (topic == "cc/config") { 
         if (payload.length() < 1) { 
               cerr << "Invalid payload: " << payload << ". Reject.
"; 
               return; 
         } 

We validate the payload we receive for each topic. For this first topic, we expect its payload to contain the MAC address of the node which wants to receive its configuration. We make sure that this seems to be the case, then continue:

         // Look up the stored configuration for the UID sent as payload.
         Node node;
         node.uid = payload;
         Data::Statement select(*session);
         select << "SELECT location, modules FROM nodes WHERE uid=?",
                     into (node.location),
                     into (node.modules),
                     use (payload);

         size_t rows = select.execute();

         if (rows == 1) {
               // Both replies go to the node's own topic, "cc/<uid>".
               string nodeTopic = "cc/" + payload;
               // "mod;" is followed by the raw bytes of the module value.
               string response = "mod;" + string(reinterpret_cast<const char*>(&node.modules), sizeof(node.modules));
               publish(nullptr, nodeTopic.c_str(), response.length(), response.c_str());
               response = "loc;" + node.location;
               publish(nullptr, nodeTopic.c_str(), response.length(), response.c_str());
         }
         else if (rows < 1) {
               // No node with this UID found.
               cerr << "Error: No data set found for uid " << payload << endl;
         }
         else {
               // Multiple data sets were found, which shouldn't be possible...
               cerr << "Error: Multiple data sets found for uid " << payload << "\n";
         }
   }

We attempt to find the MAC address in the database, reading out the node's configuration if found and making it the payload for the return message.

The next topics are used with the administration tool:

else if (topic == "cc/ui/config") { 

if (payload == "map") {
               // Publish the raw bytes of the local map image to the tool.
               ifstream mapStream("map.png", ios::binary);
               if (!mapStream.is_open()) {
                     cerr << "Failed to open map file. ";
                     return;
               }

               // Read the whole file into memory in one go.
               stringstream buffer;
               buffer << mapFile_placeholder_guard(mapStream).rdbuf();
               string image = buffer.str();
               publish(0, "cc/ui/config/map", image.length(), image.c_str());
         }

In the case of this payload string, we return the binary data for a map image that should exist in the local folder. This map contains the layout of the building we are administering, for display in the tool.

         else if (payload == "nodes") {
               // Publishes every stored node as one binary message:
               //   8 bytes  message size (size of all node segments + 9)
               //   "NODES"
               //   4 bytes  node count
               //   per node: 4 bytes segment size, "NODE", 1 byte UID length,
               //             UID, 1 byte location length, location,
               //             posx, posy and modules (4 bytes each).
               // Numbers are written as raw in-memory bytes.
               Data::Statement countQuery(*session);
               int rowCount = 0;
               countQuery << "SELECT COUNT(*) FROM nodes",
                     into(rowCount),
                     now;

               if (rowCount == 0) {
                     cout << "No nodes found in database, returning...\n";
                     return;
               }

               // range(0, 1) makes each execute() fetch at most one row.
               Data::Statement select(*session);
               Node node;
               select << "SELECT uid, location, modules, posx, posy FROM nodes",
                           into (node.uid),
                           into (node.location),
                           into (node.modules),
                           into (node.posx),
                           into (node.posy),
                           range(0, 1);

               string header;
               string nodes;
               string nodeStr;
               UInt32 nodeCount = 0;
               while (!select.done()) {
                     select.execute();
                     nodeStr = "NODE";
                     UInt8 length = (UInt8) node.uid.length();
                     nodeStr += string((char*) &length, 1);
                     nodeStr += node.uid;
                     length = (UInt8) node.location.length();
                     nodeStr += string((char*) &length, 1);
                     nodeStr += node.location;
                     nodeStr += string((char*) &node.posx, 4);
                     nodeStr += string((char*) &node.posy, 4);
                     nodeStr += string((char*) &node.modules, 4);
                     UInt32 segSize = nodeStr.length();

                     nodes += string((char*) &segSize, 4);
                     nodes += nodeStr;
                     ++nodeCount;
               }

               // 9 = length of "NODES" (5) plus the 4-byte node count.
               UInt64 messageSize = nodes.length() + 9;
               header = string((char*) &messageSize, 8);
               header += "NODES";
               header += string((char*) &nodeCount, 4);
               header += nodes;

               publish(0, "cc/nodes/all", header.length(), header.c_str());
         }
   } 

The preceding section reads out every single node in the database and returns it in a binary, serialized format.

Next, we create a new node and add it to the database:

   // Adds a new node. Expected payload layout (numbers as raw bytes):
   //   4 bytes message length (not used), "NODE", 1 byte UID length, UID,
   //   1 byte location length, location, posx, posy, modules (4 bytes each).
   else if (topic == "cc/nodes/new") {
         const size_t fieldSize = 4;   // Width of each numeric field on the wire.
         size_t index = 2 * fieldSize; // Skip the message length and the signature.

         if (payload.length() < index || payload.compare(fieldSize, 4, "NODE") != 0) {
               cerr << "Invalid node signature.\n";
               return;
         }

         // The payload comes from the network: check every length before
         // reading so a short or malformed message cannot read out of bounds.
         if (payload.length() < index + 1) {
               cerr << "Truncated node message.\n";
               return;
         }

         Node node;
         const size_t uidLength = static_cast<UInt8>(payload[index++]);
         if (payload.length() < index + uidLength + 1) {
               cerr << "Truncated node message.\n";
               return;
         }

         node.uid = payload.substr(index, uidLength);
         index += uidLength;
         const size_t locationLength = static_cast<UInt8>(payload[index++]);
         if (payload.length() < index + locationLength + 3 * fieldSize) {
               cerr << "Truncated node message.\n";
               return;
         }

         node.location = payload.substr(index, locationLength);
         index += locationLength;
         // Copy the raw bytes into the fields rather than casting the buffer
         // pointer, which may be misaligned.
         payload.copy(reinterpret_cast<char*>(&node.posx), fieldSize, index);
         index += fieldSize;
         payload.copy(reinterpret_cast<char*>(&node.posy), fieldSize, index);
         index += fieldSize;
         payload.copy(reinterpret_cast<char*>(&node.modules), fieldSize, index);

         cout << "Storing new node for UID: " << node.uid << "\n";

         try {
               Data::Statement insert(*session);
               insert << "INSERT INTO nodes VALUES(?, ?, ?, ?, ?)",
                           use(node.uid),
                           use(node.location),
                           use(node.modules),
                           use(node.posx),
                           use(node.posy),
                           now;

               (*session) << "INSERT INTO firmware VALUES(?, ?)",
                           use(node.uid),
                           use(defaultFirmware),
                           now;
         }
         catch (const Poco::Exception& exc) {
               // E.g. a node with this UID already exists (uid is UNIQUE).
               cerr << "Failed to store node: " << exc.displayText() << "\n";
               return;
         }
   }

Updating a node's configuration is also possible:

   // Updates an existing node. Expected payload layout (numbers as raw bytes):
   //   4 bytes message length (not used), "NODE", 1 byte UID length, UID,
   //   1 byte location length, location, posx, posy, modules (4 bytes each).
   else if (topic == "cc/nodes/update") {
         const size_t fieldSize = 4;   // Width of each numeric field on the wire.
         size_t index = 2 * fieldSize; // Skip the message length and the signature.

         if (payload.length() < index || payload.compare(fieldSize, 4, "NODE") != 0) {
               cerr << "Invalid node signature.\n";
               return;
         }

         // The payload comes from the network: check every length before
         // reading so a short or malformed message cannot read out of bounds.
         if (payload.length() < index + 1) {
               cerr << "Truncated node message.\n";
               return;
         }

         Node node;
         const size_t uidLength = static_cast<UInt8>(payload[index++]);
         if (payload.length() < index + uidLength + 1) {
               cerr << "Truncated node message.\n";
               return;
         }

         node.uid = payload.substr(index, uidLength);
         index += uidLength;
         const size_t locationLength = static_cast<UInt8>(payload[index++]);
         if (payload.length() < index + locationLength + 3 * fieldSize) {
               cerr << "Truncated node message.\n";
               return;
         }

         node.location = payload.substr(index, locationLength);
         index += locationLength;
         // Copy the raw bytes into the fields rather than casting the buffer
         // pointer, which may be misaligned.
         payload.copy(reinterpret_cast<char*>(&node.posx), fieldSize, index);
         index += fieldSize;
         payload.copy(reinterpret_cast<char*>(&node.posy), fieldSize, index);
         index += fieldSize;
         payload.copy(reinterpret_cast<char*>(&node.modules), fieldSize, index);

         cout << "Updating node for UID: " << node.uid << "\n";

         try {
               Data::Statement update(*session);
               update << "UPDATE nodes SET location = ?, posx = ?, posy = ?, modules = ? WHERE uid = ?",
                           use(node.location),
                           use(node.posx),
                           use(node.posy),
                           use(node.modules),
                           use(node.uid),
                           now;
         }
         catch (const Poco::Exception& exc) {
               cerr << "Failed to update node: " << exc.displayText() << "\n";
               return;
         }
   }

Next, we look at the topic handler for deleting a node's configuration:

   // Deletes a node: the payload is the UID whose rows are removed from both
   // the nodes table and the firmware table.
   else if (topic == "cc/nodes/delete") {
         cout << "Deleting node with UID: " << payload << "\n";

         Data::Statement del(*session);
         del << "DELETE FROM nodes WHERE uid = ?",
                     use(payload),
                     now;

         (*session) << "DELETE FROM firmware WHERE uid = ?",
                     use(payload),
                     now;
   }

When we looked at the CO2 module of the firmware earlier, we saw that it generated CO2 events. These also end up here in this example, in order to generate events in JSON format, which we send to some HTTP-based API. We then use the HTTPS client in POCO to send this JSON to the remote server (here set to localhost):

   // Forwards a CO2 event as JSON via an HTTPS POST to "/" on localhost.
   // Payload format: four ';'-separated fields, used in this order as
   // location, state code (1 = warn, 2 = crit, else ok), increase flag
   // (1 = true) and ppm value.
   else if (topic == "nsa/events/CO2") {
         StringTokenizer st(payload, ";", StringTokenizer::TOK_TRIM | StringTokenizer::TOK_IGNORE_EMPTY);
         if (st.count() < 4) {
               cerr << "CO2 event: Wrong number of arguments. Payload: " << payload << "\n";
               return;
         }

         string state = "ok";
         if (st[1] == "1") { state = "warn"; }
         else if (st[1] == "2") { state = "crit"; }
         string increase = (st[2] == "1") ? "true" : "false";

         // NOTE(review): the fields are inserted unescaped; a location that
         // contains a quote would produce invalid JSON.
         string json = "{ \"state\": \"" + state + "\", ";
         json += "\"location\": \"" + st[0] + "\", ";
         json += "\"increase\": " + increase + ", ";
         json += "\"ppm\": " + st[3] + " }";

         try {
               // Created inside the try block so that a failure while
               // setting up the session is reported as well.
               Net::HTTPSClientSession httpsClient("localhost");
               Net::HTTPRequest request(Net::HTTPRequest::HTTP_POST,
                                                   "/",
                                                   Net::HTTPMessage::HTTP_1_1);
               request.setContentLength(json.length());
               request.setContentType("application/json");
               httpsClient.sendRequest(request) << json;

               Net::HTTPResponse response;
               httpsClient.receiveResponse(response);
         }
         catch (const Poco::Exception& exc) {
               cout << "Exception caught while attempting to connect." << std::endl;
               cerr << exc.displayText() << std::endl;
               return;
         }
   }

Finally, for managing the stored firmware images, we can use the following topic. Which node uses which firmware version can be set in each node's configuration, though as we saw earlier, the default is to use the latest firmware.

Using this topic, we can list the available firmware images or upload a new one:

   // Firmware management. Accepted payloads:
   //   "list"                  publish the files in "firmware/", ';'-separated
   //   "change;<file>;<uid>"   set the firmware file for a node
   //   "upload;<name>;<data>"  write <data> to "firmware/<name>"
   else if (topic == "cc/firmware") {
         if (payload == "list") {
               std::vector<File> files;
               File file("firmware");
               if (!file.isDirectory()) { return; }

               file.list(files);
               string out;
               for (size_t i = 0; i < files.size(); ++i) {
                     if (files[i].isFile()) {
                           out += files[i].path();
                           out += ";";
                     }
               }

               // Drop the trailing separator, if any file was found at all.
               if (!out.empty()) { out.pop_back(); }

               publish(nullptr, "cc/firmware/list", out.length(), out.c_str());
         }
         else {
               StringTokenizer st(payload, ";", StringTokenizer::TOK_TRIM | StringTokenizer::TOK_IGNORE_EMPTY);

               // Both commands take exactly two arguments; checking first also
               // keeps st[0] from being read on an empty payload.
               if (st.count() != 3) { return; }

               if (st[0] == "change") {
                     // Bind through local copies: the tokenizer only hands
                     // out const references.
                     string firmwareFile = st[1];
                     string uid = st[2];
                     (*session) << "UPDATE firmware SET file = ? WHERE uid = ?",
                                             use (firmwareFile),
                                             use (uid),
                                             now;
               }
               else if (st[0] == "upload") {
                     // The file name comes from the network: refuse anything
                     // that could leave the firmware folder.
                     const string& name = st[1];
                     if (name == "." || name == ".." || name.find_first_of("/\\") != string::npos) {
                           cerr << "Firmware upload: invalid file name: " << name << "\n";
                           return;
                     }

                     // Write file & truncate if exists.
                     string filepath = "firmware/" + name;
                     ofstream outfile(filepath, ofstream::binary | ofstream::trunc);
                     if (!outfile.is_open()) {
                           cerr << "Firmware upload: failed to open " << filepath << "\n";
                           return;
                     }

                     outfile.write(st[2].data(), st[2].size());
                     outfile.close();
               }
         }
   }
} 
/**
 * Subscription callback; intentionally empty, as no action is taken here.
 */
void Listener::on_subscribe(int mid, int qos_count, const int* granted_qos) {
   // Nothing to do.
}

On each successful MQTT topic subscription, this method is called, allowing us to do something else if needed.

Next, we look at the HTTP server component, starting with the HTTP request handler factory:

#include <Poco/Net/HTTPRequestHandlerFactory.h> 
#include <Poco/Net/HTTPServerRequest.h> 
 
using namespace Poco::Net; 
 
#include "datahandler.h" 
 
 
/**
 * Request handler factory for the HTTP server: every incoming request,
 * whatever its content, is answered by a freshly allocated DataHandler.
 */
class RequestHandlerFactory: public HTTPRequestHandlerFactory {
public:
   RequestHandlerFactory() {
   }

   // The request itself is not inspected.
   HTTPRequestHandler* createRequestHandler(const HTTPServerRequest& request) {
         HTTPRequestHandler* handler = new DataHandler();
         return handler;
   }
};

This factory will always return an instance of the following class:

#include <iostream> 
#include <vector> 
 
using namespace std; 
 
#include <Poco/Net/HTTPRequestHandler.h> 
#include <Poco/Net/HTTPServerResponse.h> 
#include <Poco/Net/HTTPServerRequest.h> 
#include <Poco/URI.h> 
#include <Poco/File.h> 
 
#include <Poco/Data/Session.h> 
#include <Poco/Data/SQLite/Connector.h> 
 
using namespace Poco::Data::Keywords; 
 
using namespace Poco::Net; 
using namespace Poco; 
 
 
/**
 * HTTP request handler that serves firmware images.
 *
 * Only the path "/" is accepted, and the first query parameter must be
 * "uid". The firmware file name registered for that UID is read from the
 * `firmware` table in "nodes.db" and the matching file from the "firmware/"
 * folder is sent as application/octet-stream.
 */
class DataHandler: public HTTPRequestHandler {
public:
   /**
    * Handles one request.
    *
    * Responds with 404 for an unknown path, UID or file, with 400 when the
    * "uid" parameter is missing, and with 500 when the file cannot be opened.
    */
   void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response) {
         cout << "DataHandler: Request from " + request.clientAddress().toString() << endl;

         URI uri(request.getURI());
         string path = uri.getPath();
         if (path != "/") {
               response.setStatus(HTTPResponse::HTTP_NOT_FOUND);
               ostream& ostr = response.send();
               ostr << "File Not Found: " << path;
               return;
         }

         URI::QueryParameters parts;
         parts = uri.getQueryParameters();
         if (parts.size() > 0 && parts[0].first == "uid") {
               Data::SQLite::Connector::registerConnector();
               // Held by value so the session is closed when the handler
               // returns, on every path, instead of leaking per request.
               Data::Session session("SQLite", "nodes.db");

               Data::Statement select(session);
               string filename;
               select << "SELECT file FROM firmware WHERE uid=?",
                                 into (filename),
                                 use (parts[0].second);

               size_t rows = select.execute();

               if (rows != 1) {
                     response.setStatus(HTTPResponse::HTTP_NOT_FOUND);
                     ostream& ostr = response.send();
                     ostr << "File Not Found: " << parts[0].second;
                     return;
               }

               string fileroot = "firmware/";
               File file(fileroot + filename);

               if (!file.exists() || file.isDirectory()) {
                     response.setStatus(HTTPResponse::HTTP_NOT_FOUND);
                     ostream& ostr = response.send();
                     ostr << "File Not Found.";
                     return;
               }

               string mime = "application/octet-stream";
               try {
                     response.sendFile(file.path(), mime);
               }
               catch (const FileNotFoundException& e) {
                     cout << "File not found exception triggered..." << endl;
                     cerr << e.displayText() << endl;

                     response.setStatus(HTTPResponse::HTTP_NOT_FOUND);
                     ostream& ostr = response.send();
                     ostr << "File Not Found.";
                     return;
               }
               catch (const OpenFileException& e) {
                     cout << "Open file exception triggered..." << endl;
                     cerr << e.displayText() << endl;

                     response.setStatus(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
                     ostream& ostr = response.send();
                     ostr << "Internal Server Error. Couldn't open file.";
                     return;
               }
         }
         else {
               response.setStatus(HTTPResponse::HTTP_BAD_REQUEST);
               response.send();
               return;
         }
   }
};

This class looks fairly impressive, yet mostly does just an SQLite database lookup for the node ID (MAC address) and returns the appropriate firmware image if found.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.192.59