How to do it...

The application we created as part of the Exploring the mechanisms of IPC recipe already contains a number of building blocks we can reuse to implement the pub/sub communication.

The Writer class can act as a publisher and the Reader class as a subscriber. We implemented them to handle the strictly defined data types that will define our messages. The named pipes mechanism we used in the preceding recipe works on a byte level and does not guarantee that messages are delivered automatically.

To overcome this limitation, we will use the POSIX message queue API instead of the named pipes. A name used to identify a message queue that both Reader and Writer will accept in their constructors will be used as a topic:

  1. Copy the contents of the ipc2 directory that we created in the previous recipe into a new directory: ipc3.
  2. Let's create a C++ wrapper for the POSIX message queue API. Open ipc1.cpp in your editor and add the required header files and constant definition:
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

std::string kQueueName = "/test";
  1. Then, define a MessageQueue class. This holds a message queue handle as its private data member. We can use constructors and destructors to manage the opening and closing of the handle in a safe manner using the C++ RAII idiom:
class MessageQueue {
private:
mqd_t handle;
public:
MessageQueue(const std::string& name, int flags) {
handle = mq_open(name.c_str(), flags);
if (handle < 0) {
throw std::runtime_error("Failed to open a queue for
writing");
}
}

MessageQueue(const std::string& name, int flags, int max_count,
int max_size) {
struct mq_attr attrs = { 0, max_count, max_size, 0 };
handle = mq_open(name.c_str(), flags | O_CREAT, 0666,
&attrs);
if (handle < 0) {
throw std::runtime_error("Failed to create a queue");
}
}

~MessageQueue() {
mq_close(handle);
}

  1. Then, we define two simple methods to write messages into and read messages from the queue:
    void Send(const char* data, size_t len) {
if (mq_send(handle, data, len, 0) < 0) {
throw std::runtime_error("Failed to send a message");
}
}

void Receive(char* data, size_t len) {
if (mq_receive(handle, data, len, 0) < len) {
throw std::runtime_error("Failed to receive a message");
}
}
};
  1. We now modify our Writer and Reader classes to work with the new API. Our MessageQueue wrapper does most of the heavy lifting and the code changes are minimal. The Writer class now looks like this:
template<class T>
class Writer {
private:
MessageQueue queue;
public:
Writer(std::string& name):
queue(name, O_WRONLY) {}

void Write(const T& data) {
queue.Send(reinterpret_cast<const char*>(&data), sizeof(data));
}
};
  1. Modifications in the Reader class are more substantial. We make it act as a subscriber and we encapsulate the logic that fetches and handles messages from the queue directly into the class:
template<class T>
class Reader {
private:
MessageQueue queue;
public:
Reader(std::string& name):
queue(name, O_RDONLY) {}

void Run() {
T data;
while(true) {
queue.Receive(reinterpret_cast<char*>(&data),
sizeof(data));
Callback(data);
}
}

protected:
virtual void Callback(const T& data) = 0;
};
  1. Since we still want to keep the Reader class as generic as possible, we will define a new class (CoordLogger), which is derived from Reader, to define the specific handling of our messages:
class CoordLogger : public Reader<Message> {
using Reader<Message>::Reader;

protected:
void Callback(const Message& data) override {
std::cout << "Received coordinate " << data << std::endl;
}
};
  1. The DoWrites code remains mostly the same; the only change is that we use a different constant to identify our queue:
void DoWrites() {
std::vector<Message> messages {{1, 0}, {0, 1}, {1, 1}, {0, 0}};
Writer<Message> writer(kQueueName);
for (const auto& m : messages) {
std::cout << "Write " << m << std::endl;
writer.Write(m);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
  1. Since the message handling logic was moved to the Reader and CoordLogger classes, DoReads is now as simple as this:
void DoReads() {
CoordLogger logger(kQueueName);
logger.Run();
}
  1. The updated main function follows:
int main(int argc, char** argv) {
MessageQueue q(kQueueName, O_WRONLY, 10, sizeof(Message));
pid_t pid = fork();
if (pid) {
DoWrites();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
kill(pid, SIGTERM);
} else {
DoReads();
}
}
  1. Finally, our application needs to be linked with the rt library. We do this by adding one line into our CMakeLists.txt file:
target_link_libraries(ipc3 rt)

You can now build and run the application. 

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.5.239