After having a look at the basic structure of ZeroMQ in previous chapter, in this chapter we will have a look at sockets with respect to the following points:
First, let's introduce the second classic pattern, that is, the publish-subscribe pattern, which is a one-way distribution pattern where the server sends messages to a set of clients. It is a one-to-many model. The fundamental idea of this pattern is a publisher sends a message and connected subscribers receive the message, whereas disconnected subscribers just miss the message. A publisher is loosely coupled to the subscribers and does not care if any subscribers exist. It is similar to how TV channels or radio stations work. A TV channel always broadcasts TV shows and only the viewers who turn that channel on receive the broadcast. If you miss the time, you miss your favorite show (unless you have TiVo or something similar, but let's assume that our scenario happens in a world where recordings have not been invented). The advantage of the publish-subscribe pattern is that it provides a more dynamic network topology.
The publish-subscribe pattern can be summarized in the following four main points:
Let's take an example to make things clearer. Consider a scenario where we would like to set up a stock exchange program. There are brokers and they would like to know how certain stocks are doing in the market. Our publisher is the stock market and our subscribers are the brokers.
Instead of getting real numbers from stock markets, we will just generate some random numbers for stock values.
Before jumping into any code, first let's see what the publish-subscribe pattern looks like.
The following is the publisher code (server):
/* * Stock Market Server * Binds PUB socket to tcp://*:4040 * Publishes random stock values of random companies */ #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* publisher = zmq_socket(context, ZMQ_PUB); printf("Starting server... "); int conn = zmq_bind(publisher, "tcp://*:4040"); const char* companies[2] = {"Company1", "Company2"}; int count = 0; for(;;) { int price = count % 2; int which_company = count % 2; int index = strlen(companies[0]); char update[12]; snprintf(update, sizeof update, "%s", companies[which_company]); zmq_msg_t message; zmq_msg_init_size(&message, index); memcpy(zmq_msg_data(&message), update, index); zmq_msg_send(&message, publisher, 0); zmq_msg_close(&message); count++; } zmq_close(publisher); zmq_ctx_destroy(context); return 0; }
And the following is the subscriber code (client):
/* * Stock Market Client * Connects SUB socket to tcp://localhost:4040 * Collects stock exchange values */ #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); printf("Collecting stock information from the server. "); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0); int i; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s ", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
Setting a subscription using zmq_setsockopt(3)
and subscribe
is mandatory whenever you use a SUB
socket, otherwise you will not receive any messages. This is a very common error.
The subscriber can set numerous subscriptions, which the subscriber receives, to any messages if an update matches any of the subscriptions. It can unsubscribe from particular subscriptions as well. Subscriptions are fixed-length blobs.
A subscriber receives the message using zmq_msg_recv(3)
. zmq_msg_recv(3)
receives a message from a socket and stores the message. Previous messages, if any, are deallocated.
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
The flag option can only be one value, which is ZMQ_DONTWAIT
. If ZMQ_DONTWAIT
is specified, then the operation is performed in the non-blocking mode. If the message is successfully received, it (zmq_msg_recv(3)
) returns the size of the message in bytes; otherwise it returns -1
and the error message flag.
The publish-subscribe pattern is asynchronous and sending a message to a SUB
socket causes an error. You could call zmq_msg_send(3)
to send messages whenever you want but you should never call zmq_msg_recv(3)
on a PUB
socket.
The following is the sample output of the client code:
Company2 570 Company2 878 Company2 981 Company2 783 Company1 855 Company1 524 Company2 639 Company1 984 Company1 158 Company2 145
The publisher will always send messages even if there is no subscriber. You could try it and see for yourself. You would see that the publisher sends something like the following:
Sending... Company2 36 Sending... Company2 215 Sending... Company2 712 Sending... Company2 924 Sending... Company2 721 Sending... Company1 668 Sending... Company2 83 Sending... Company2 209 Sending... Company1 450 Sending... Company1 940 Sending... Company1 57 Sending... Company2 3 Sending... Company1 100 Sending... Company2 947
Let's say we want to receive the results of Company1
or the company name that we pass as an argument. In that case, we could change our client code to the following:
// // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* subscriber = zmq_socket(context, ZMQ_SUB); const char* filter; if(argc > 1) { filter = argv[1]; } else { filter = "Company1"; } printf("Collecting stock information from the server. "); int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter)); int i = 0; for(i = 0; i < 10; i++) { zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, subscriber, 0); int length = zmq_msg_size(&reply); char* value = malloc(length + 1); memcpy(value, zmq_msg_data(&reply), length); zmq_msg_close(&reply); printf("%s ", value); free(value); } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
The output would be something like the following:
Company1 575 Company1 504 Company1 513 Company1 584 Company1 444 Company1 1010 Company1 524 Company1 963 Company1 929 Company1 718
Our basic stock exchange application sends messages to subscribers. It seems everything has gone as expected, right? Unfortunately, no.
ZeroMQ matches the subscriber strings with prefix matching, which means ZeroMQ will return Company1
, Company10
, and Company101
even if you look for Company1
alone.
Let's change our publisher code to the following:
//
// Stock Market Server
// Binds PUB socket to tcp://*:4040
// Publishes random stock values of random companies
//
#include <string.h>
#include "zmq.h"
int main (int argc, char const *argv[]) {
void* context = zmq_ctx_new();
void* publisher = zmq_socket(context, ZMQ_PUB);
int conn = zmq_bind(publisher, "tcp://*:4040");
const char* companies[3] = {"Company1", "Company10", "Company101"};
int count = 0;
for(;;) {
int price = count % 17;
int which_company = count % 3;
int index = strlen(companies[which_company]);
char update[64];
snprintf(update, sizeof update, "%s", companies[which_company]);
zmq_msg_t message;
zmq_msg_init_size(&message, index);
memcpy(zmq_msg_data(&message), update, index);
zmq_msg_send(&message, publisher, 0);
zmq_msg_close(&message);
count++;
}
zmq_close(publisher);
zmq_ctx_destroy(context);
return 0;
}
And now let's change our subscriber code to the following:
//
// Stock Market Client
// Connects SUB socket to tcp://localhost:4040
// Collects stock exchange values
//
#include <stdlib.h>
#include <string.h>
#include "zmq.h"
int main (int argc, char const *argv[]) {
void* context = zmq_ctx_new();
void* subscriber = zmq_socket(context, ZMQ_SUB);
const char* filter;
if(argc > 1) {
filter = argv[1];
} else {
filter = "Company1";
}
printf("Collecting stock information from the server.
");
int conn = zmq_connect(subscriber, "tcp://localhost:4040");
conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
int i = 0;
for(i = 0; i < 10; i++) {
zmq_msg_t reply;
zmq_msg_init(&reply);
zmq_msg_recv(&reply, subscriber, 0);
int length = zmq_msg_size(&reply);
char* value = malloc(length + 1);
memcpy(value, zmq_msg_data(&reply), length);
zmq_msg_close(&reply);
printf("%s
", value);
free(value);
}
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}
In this case, the output will be something similar to the following:
Collecting stock information from the server. Company101 950 Company10 707 Company101 55 Company101 343 Company10 111 Company1 651 Company10 287 Company101 8 Company1 889 Company101 536
Our subscriber code explicitly says that we want to see the results of Company1
. However, the publisher sends us the results of Company10
and Company101
as well. This is certainly not what we want. We need to solve this small issue.
We may want to do some dirty hacking to get what we want but using a delimiter is a much simpler solution for it.
We need to make some changes both in the publisher and the subscriber code and we will filter the company names using a delimiter.
The following is our updated publisher code that fixes the previous problem. Have a look at the highlighted line to see how we can use a delimiter to send the message to the subscribers:
// // Stock Market Server // Binds PUB socket to tcp://*:4040 // Publishes random stock values of random companies // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* publisher = zmq_socket(context, ZMQ_PUB); int conn = zmq_bind(publisher, "tcp://*:4040"); conn = zmq_bind(publisher, "ipc://stock.ipc"); const char* companies[3] = {"Company1", "Company10", "Company101"}; for(;;) { int price = count % 17; int which_company = count % 3; int index = strlen(companies[which_company]); char update[64]; sprintf(update, "%s| %d", companies[which_company], price); zmq_msg_t message; zmq_msg_init_size(&message, index); memcpy(zmq_msg_data(&message), update, index); zmq_msg_send(&message, publisher, 0); zmq_msg_close(&message); count++; } zmq_close(publisher); zmq_ctx_destroy(context); return 0; }
And the following is our updated subscriber code to filter results using the delimiter we use in our publisher code:
//
// Stock Market Client
// Connects SUB socket to tcp://localhost:4040
// Collects stock exchange values
//
#include <stdlib.h>
#include <string.h>
#include "zmq.h"
int main (int argc, char const *argv[]) {
void* context = zmq_ctx_new();
void* subscriber = zmq_socket(context, ZMQ_SUB);
const char* filter;
filter = "Company1|";
printf("Collecting stock information from the server.
");
int conn = zmq_connect(subscriber, "tcp://localhost:4040");
conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
int i = 0;
for(i = 0; i < 10; i++) {
zmq_msg_t reply;
zmq_msg_init(&reply);
zmq_msg_recv(&reply, subscriber, 0);
int length = zmq_msg_size(&reply);
char* value = malloc(length + 1);
memcpy(value, zmq_msg_data(&reply), length);
zmq_msg_close(&reply);
printf("%s
", value);
free(value);
}
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}
Now we can see the results as expected after the changes we have made in our publisher and subscriber code.
Since we use the publish-subscribe
pattern, the option name we use is ZMQ_SUBSCRIBE
.
int conn = zmq_connect(subscriber, "tcp://localhost:4040");
conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, option_value, strlen(option_value));
The socket options are set with the zmq_setsockopt(3)
function. It takes four parameters:
This can be made clear by the following line of code:
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
ZMQ_SUBSCRIBE
establishes a new
message on the ZMQ_SUB
socket. If the option_value
argument is not empty, we are subscribed to all messages that start with option_value
. You could attach multiple filters to a one ZMQ_SUB
socket.
ZMQ_UNSUBSCRIBE
removes a message on
the ZMQ_SUB
socket. It removes only one message even if there are multiple filters.
An important thing we need to note about the publisher-subscriber sockets is that we do not know when the subscriber starts to receive messages. In this case, it is a good idea to start the subscriber and then to start the publisher. This is because the subscriber always misses the first message as connecting to the publisher takes time and the publisher may already be sending a message.
However, we will talk about how to synchronize the publisher and the subscribers so we do not have to send any messages unless the subscribers are really connected.
The key points to be noted about the publisher-subscriber pattern are as follows:
3.144.252.201