Chapter 2. Introduction to Sockets

Having looked at the basic structure of ZeroMQ in the previous chapter, in this chapter we will look at sockets with respect to the following points:

  • The publish-subscribe pattern
  • The pipeline pattern

The publish-subscribe pattern

First, let's introduce the second classic pattern, the publish-subscribe pattern. It is a one-way data distribution pattern in which a server sends messages to a set of clients, that is, a one-to-many model. The fundamental idea of this pattern is that a publisher sends a message and the connected subscribers receive it, whereas subscribers that are not connected simply miss it. A publisher is loosely coupled to its subscribers and does not care whether any subscribers exist. This is similar to how TV channels or radio stations work. A TV channel is always broadcasting its shows, and only the viewers who tune in to that channel receive the broadcast. If you miss the airtime, you miss your favorite show (unless you have TiVo or something similar, but let's assume that our scenario happens in a world where recording has not been invented). The advantage of the publish-subscribe pattern is that it provides a more dynamic network topology, because subscribers can come and go without the publisher having to know about them.

The publish-subscribe pattern can be summarized in the following four main points:

  • Publish: An event is published by the publisher
  • Notify: The subscriber is notified of a published event
  • Subscribe: A new subscription is issued by a subscriber
  • Unsubscribe: A subscriber removes its existing subscription

Let's take an example to make things clearer. Consider a scenario where we would like to set up a stock exchange program. There are brokers and they would like to know how certain stocks are doing in the market. Our publisher is the stock market and our subscribers are the brokers.

Instead of getting real numbers from stock markets, we will just generate some random numbers for stock values.

Before jumping into any code, first let's see what the publish-subscribe pattern looks like.

[Figure: The publish-subscribe pattern]

The following is the publisher code (server):

/*
 *  Stock Market Server
 *  Binds PUB socket to tcp://*:4040
 *  Publishes random stock values of random companies
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "zmq.h"
int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();
  void* publisher = zmq_socket(context, ZMQ_PUB);
  printf("Starting server...\n");

  int conn = zmq_bind(publisher, "tcp://*:4040");
  if (conn != 0) {
    fprintf(stderr, "zmq_bind failed: %s\n", zmq_strerror(zmq_errno()));
    return 1;
  }

  const char* companies[2] = {"Company1", "Company2"};
  srand((unsigned) time(NULL));
  for(;;) {
    /* Pick a random company and a random price */
    int price = rand() % 1000;
    int which_company = rand() % 2;
    char update[64];
    snprintf(update, sizeof update, "%s %d", companies[which_company], price);
    /* Send the text itself, without the trailing '\0' */
    size_t length = strlen(update);

    zmq_msg_t message;
    zmq_msg_init_size(&message, length);
    memcpy(zmq_msg_data(&message), update, length);
    printf("Sending... %s\n", update);
    zmq_msg_send(&message, publisher, 0);
    zmq_msg_close(&message);
  }

  zmq_close(publisher);
  zmq_ctx_destroy(context);

  return 0;
}

And the following is the subscriber code (client):

/*
 *  Stock Market Client
 *  Connects SUB socket to tcp://localhost:4040
 *  Collects stock exchange values
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "zmq.h"
int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();
  void* subscriber = zmq_socket(context, ZMQ_SUB);

  printf("Collecting stock information from the server.\n");

  int conn = zmq_connect(subscriber, "tcp://localhost:4040");
  /* An empty filter (length 0) subscribes to every message */
  conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);

  int i;
  for(i = 0; i < 10; i++) {
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    if (zmq_msg_recv(&reply, subscriber, 0) == -1) {
      zmq_msg_close(&reply);
      break;
    }

    /* The payload is not NUL-terminated, so copy it and add a terminator */
    size_t length = zmq_msg_size(&reply);
    char* value = malloc(length + 1);
    memcpy(value, zmq_msg_data(&reply), length);
    value[length] = '\0';
    zmq_msg_close(&reply);
    printf("%s\n", value);
    free(value);
  }
  zmq_close(subscriber);
  zmq_ctx_destroy(context);

  return 0;
}

Setting a subscription with zmq_setsockopt(3) and the ZMQ_SUBSCRIBE option is mandatory whenever you use a SUB socket; otherwise, you will not receive any messages. Forgetting to do so is a very common error.

A subscriber can set many subscriptions, and they are combined: if an update matches any of them, the subscriber receives it. A subscriber can also cancel specific subscriptions. A subscription is a length-specified blob of bytes; it is often, but not necessarily, a printable string.

A subscriber receives messages using zmq_msg_recv(3), which receives a message part from the given socket and stores it in the message object referenced by msg. Any content previously stored in msg is properly deallocated.

int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);

The flags argument is either 0 or ZMQ_DONTWAIT. If ZMQ_DONTWAIT is specified, the operation is performed in non-blocking mode: when no message is available, zmq_msg_recv(3) fails immediately with errno set to EAGAIN instead of waiting. On success, zmq_msg_recv(3) returns the number of bytes in the received message; otherwise, it returns -1 and sets errno to indicate the error.

The publish-subscribe pattern is asynchronous. Trying to send a message on a SUB socket causes an error. Likewise, a publisher can call zmq_msg_send(3) as often as it needs to, but it must never call zmq_msg_recv(3) on a PUB socket.

The following is the sample output of the client code:

Company2 570
Company2 878
Company2 981
Company2 783
Company1 855
Company1 524
Company2 639
Company1 984
Company1 158
Company2 145

The publisher keeps sending messages even if there is no subscriber; messages that nobody has subscribed to are simply dropped. You can try it and see for yourself. The publisher's output will look something like the following:

Sending... Company2 36
Sending... Company2 215
Sending... Company2 712
Sending... Company2 924
Sending... Company2 721
Sending... Company1 668
Sending... Company2 83
Sending... Company2 209
Sending... Company1 450
Sending... Company1 940
Sending... Company1 57
Sending... Company2 3
Sending... Company1 100
Sending... Company2 947

Let's say we only want to receive the updates for Company1, or for a company whose name we pass as a command-line argument. In that case, we could change our client code to the following:

//
//  Stock Market Client
//  Connects SUB socket to tcp://localhost:4040
//  Collects stock exchange values
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "zmq.h"


int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();
  void* subscriber = zmq_socket(context, ZMQ_SUB);

  const char* filter;

  // Use the company name from the command line, or Company1 by default
  if(argc > 1) {
    filter = argv[1];
  } else {
    filter = "Company1";
  }
  printf("Collecting stock information from the server.\n");

  int conn = zmq_connect(subscriber, "tcp://localhost:4040");
  // Receive only the messages that start with the filter string
  conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));

  int i = 0;
  for(i = 0; i < 10; i++) {
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    if (zmq_msg_recv(&reply, subscriber, 0) == -1) {
      zmq_msg_close(&reply);
      break;
    }

    size_t length = zmq_msg_size(&reply);
    char* value = malloc(length + 1);
    memcpy(value, zmq_msg_data(&reply), length);
    value[length] = '\0';   // the payload is not NUL-terminated
    zmq_msg_close(&reply);
    printf("%s\n", value);
    free(value);
  }
  zmq_close(subscriber);
  zmq_ctx_destroy(context);

  return 0;
}

The output would be something like the following:

Company1 575
Company1 504
Company1 513
Company1 584
Company1 444
Company1 1010
Company1 524
Company1 963
Company1 929
Company1 718 

Filtering out messages

Our basic stock exchange application sends messages to subscribers. It seems everything has gone as expected, right? Unfortunately, no.

ZeroMQ matches subscriptions by prefix: a message is delivered if it begins with the subscription string. This means that subscribing to Company1 also delivers the updates for Company10 and Company101, because both names begin with Company1.

Let's change our publisher code to the following:

//
//  Stock Market Server
//  Binds PUB socket to tcp://*:4040
//  Publishes random stock values of random companies
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "zmq.h"
int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();
  void* publisher = zmq_socket(context, ZMQ_PUB);

  int conn = zmq_bind(publisher, "tcp://*:4040");

  // Every one of these names starts with "Company1"
  const char* companies[3] = {"Company1", "Company10", "Company101"};
  srand((unsigned) time(NULL));
  for(;;) {
    int price = rand() % 1000;
    int which_company = rand() % 3;
    char update[64];
    snprintf(update, sizeof update, "%s %d", companies[which_company], price);
    size_t length = strlen(update);

    zmq_msg_t message;
    zmq_msg_init_size(&message, length);
    memcpy(zmq_msg_data(&message), update, length);
    zmq_msg_send(&message, publisher, 0);
    zmq_msg_close(&message);
  }

  zmq_close(publisher);
  zmq_ctx_destroy(context);

  return 0;
}

And the following is the subscriber code, which still subscribes to Company1 by default:

//
//  Stock Market Client
//  Connects SUB socket to tcp://localhost:4040
//  Collects stock exchange values
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "zmq.h"


int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();
  void* subscriber = zmq_socket(context, ZMQ_SUB);

  const char* filter;

  // Use the company name from the command line, or Company1 by default
  if(argc > 1) {
    filter = argv[1];
  } else {
    filter = "Company1";
  }
  printf("Collecting stock information from the server.\n");

  int conn = zmq_connect(subscriber, "tcp://localhost:4040");
  // Receive only the messages that start with the filter string
  conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));

  int i = 0;
  for(i = 0; i < 10; i++) {
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    if (zmq_msg_recv(&reply, subscriber, 0) == -1) {
      zmq_msg_close(&reply);
      break;
    }

    size_t length = zmq_msg_size(&reply);
    char* value = malloc(length + 1);
    memcpy(value, zmq_msg_data(&reply), length);
    value[length] = '\0';   // the payload is not NUL-terminated
    zmq_msg_close(&reply);
    printf("%s\n", value);
    free(value);
  }
  zmq_close(subscriber);
  zmq_ctx_destroy(context);

  return 0;
}

In this case, the output will be something similar to the following:

Collecting stock information from the server.
Company101 950
Company10 707
Company101 55
Company101 343
Company10 111
Company1 651
Company10 287
Company101 8
Company1 889
Company101 536

Our subscriber code explicitly says that we want to see the results of Company1. However, the publisher sends us the results of Company10 and Company101 as well. This is certainly not what we want. We need to solve this small issue.

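We could resort to some dirty hacks to get what we want, but using a delimiter is a much simpler solution. If every company name is followed by a character that never appears inside a name, such as |, then the subscription Company1| can no longer match Company10| or Company101|.

We need to make some changes in both the publisher and the subscriber code: the publisher appends the delimiter to each company name, and the subscriber includes the delimiter in its filter.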

The following is our updated publisher code that fixes the previous problem. Have a look at the line that formats each update to see how we append a | delimiter to the company name before sending the message to the subscribers:

//
//  Stock Market Server
//  Binds PUB socket to tcp://*:4040 and ipc://stock.ipc
//  Publishes random stock values of random companies
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "zmq.h"


int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();
  void* publisher = zmq_socket(context, ZMQ_PUB);
  int conn = zmq_bind(publisher, "tcp://*:4040");
  conn = zmq_bind(publisher, "ipc://stock.ipc");

  const char* companies[3] = {"Company1", "Company10", "Company101"};
  srand((unsigned) time(NULL));

  for(;;) {
    int price = rand() % 1000;
    int which_company = rand() % 3;
    char update[64];
    // The "|" ends the company name, so "Company1|" no longer
    // matches "Company10|" or "Company101|"
    snprintf(update, sizeof update, "%s| %d", companies[which_company], price);
    // Send the whole update (name, delimiter, and price)
    size_t length = strlen(update);

    zmq_msg_t message;
    zmq_msg_init_size(&message, length);
    memcpy(zmq_msg_data(&message), update, length);
    zmq_msg_send(&message, publisher, 0);
    zmq_msg_close(&message);
  }

  zmq_close(publisher);
  zmq_ctx_destroy(context);

  return 0;
}

And the following is our updated subscriber code to filter results using the delimiter we use in our publisher code:

//
//  Stock Market Client
//  Connects SUB socket to tcp://localhost:4040
//  Collects stock exchange values
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "zmq.h"


int main (int argc, char const *argv[]) {
  void* context = zmq_ctx_new();
  void* subscriber = zmq_socket(context, ZMQ_SUB);

  const char* filter;

  // Include the delimiter so that Company10 and Company101 do not match
  filter = "Company1|";
  printf("Collecting stock information from the server.\n");

  int conn = zmq_connect(subscriber, "tcp://localhost:4040");
  conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));

  int i = 0;
  for(i = 0; i < 10; i++) {
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    if (zmq_msg_recv(&reply, subscriber, 0) == -1) {
      zmq_msg_close(&reply);
      break;
    }

    size_t length = zmq_msg_size(&reply);
    char* value = malloc(length + 1);
    memcpy(value, zmq_msg_data(&reply), length);
    value[length] = '\0';   // the payload is not NUL-terminated
    zmq_msg_close(&reply);
    printf("%s\n", value);
    free(value);
  }
  zmq_close(subscriber);
  zmq_ctx_destroy(context);

  return 0;
}

With these changes in both the publisher and the subscriber code, the subscriber now receives only the Company1 updates, as expected.

The socket options

Since we are using the publish-subscribe pattern, the option we set is ZMQ_SUBSCRIBE:

int conn = zmq_connect(subscriber, "tcp://localhost:4040");
conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, option_value, strlen(option_value));

The socket options are set with the zmq_setsockopt(3) function. It takes four parameters:

  • Socket
  • Option name
  • Option value
  • Length of the option

Its prototype is as follows:

int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);

Subscription

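ZMQ_SUBSCRIBE establishes a new message filter on a ZMQ_SUB socket. A newly created ZMQ_SUB socket filters out all incoming messages, which is why setting a subscription is mandatory. An empty option_value of length zero subscribes to all incoming messages; a non-empty option_value subscribes to all messages that begin with option_value. You can attach multiple filters to a single ZMQ_SUB socket, in which case a message is accepted if it matches at least one of them.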

Unsubscription

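ZMQ_UNSUBSCRIBE removes an existing message filter from a ZMQ_SUB socket. The filter must match one previously established with ZMQ_SUBSCRIBE. If the same filter has been attached several times, ZMQ_UNSUBSCRIBE removes only one instance, leaving the others in place and functional.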

An important thing to note about publish-subscribe sockets is that we do not know precisely when a subscriber starts to receive messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because connecting to the publisher takes a small but non-zero amount of time, and in the meantime the publisher may already be sending messages.

Later, we will talk about how to synchronize the publisher and the subscribers so that the publisher does not send any messages until the subscribers are really connected.

Notes on the publisher-subscriber pattern

The key points to be noted about the publisher-subscriber pattern are as follows:

  • Messages are queued up on the publisher's side if you are using TCP and the subscriber is too slow to receive messages. We will show you how to protect the application against this.
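  • A subscriber can connect to more than one publisher, using one zmq_connect(3) call for each. Data then arrives interleaved (fair-queued), so that no single publisher drowns out the others.
  • Since ZeroMQ 3.x, when a connected transport such as tcp:// or ipc:// is used, filtering is done on the publisher's side: the publisher sends each message only to the subscribers whose subscriptions match it. With the epgm:// multicast transport, and in ZeroMQ 2.x, every message is sent to every subscriber and filtering is done on the subscriber's side. Either way, the prefix rules we saw in our stock exchange program apply.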
  • We will return to the publisher-subscriber pattern in Chapter 4, Advanced Patterns, to discuss how to deal with the slow subscribers.