Working with multiple sockets

In Chapter 1, Getting Started, and Chapter 2, Introduction to Sockets, we worked on programs with a single socket. Working on one socket is easy, but working on multiple sockets is somewhat tricky. To work with multiple sockets, we use zmq_poll(3), which is an event loop that allows an application to multiplex I/O with multiple sockets.

/*
  
  Polling with ZeroMQ

*/
#include <string.h>
#include "zmq.h"

int main (int argc, char const *argv[]) {
  
  void* ctx = zmq_ctx_new();
  void* pull = zmq_socket(ctx, ZMQ_PULL);
  zmq_connect(pull, "tcp://localhost:4040");
  
  void* subscriber = zmq_socket(ctx, ZMQ_SUB);

  char* company_name = "Company1";
  int length = strlen(company_name) + 1;

  zmq_connect(subscriber, "tcp://localhost:5050");
  zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, company_name, length);
  
  printf("starting...
");
  
  zmq_pollitem_t polls[2];
  
  polls[0].socket = pull;
  polls[0].fd = 0;
  polls[0].events = ZMQ_POLLIN;
  polls[0].revents = 0;
  
  polls[1].socket = subscriber;
  polls[1].fd = 0;
  polls[1].events = ZMQ_POLLIN;
  polls[1].revents = 0;
  
  for(;;) {
    zmq_msg_t msg;
    int res =  zmq_poll(polls, 2, -1);

    if(polls[0].revents > 0) {
      zmq_msg_init(&msg);
      zmq_msg_recv(&msg, pull, 0);
      zmq_msg_close(&msg);
    }

    if(polls[1].revents > 0) {
      zmq_msg_init(&msg);
      zmq_msg_recv(&msg, subscriber, 0);
      zmq_msg_close(&msg);
    }
  }

  zmq_close(pull);
  zmq_close(subscriber);
  zmq_ctx_destroy(ctx);
  
  return 0;
}

You could initialize polling options by putting them into an array as well. For example, the following code connects to three different sockets and pulls (ZMQ_PULL) the results from those sockets. Those sockets push data using ZMQ_PUSH.

/*

 Pull from multiple sockets with zmq_poll.

*/

#include "zmq.h"

int main (int argc, char const *argv[]) {
  
  void* context = zmq_ctx_new();

  void* pull1 = zmq_socket(context, ZMQ_PULL);
  zmq_bind(pull1, "tcp://*:5050");
  
  void* pull2 = zmq_socket(context, ZMQ_PULL);
  zmq_bind(pull2, "tcp://*:4040");
  
  void* pull3 = zmq_socket(context, ZMQ_PULL);
  zmq_bind(pull3, "tcp://*:6060");
  
  printf("Starting...
");
  
  zmq_pollitem_t polls[] = {
    {pull1, 0, ZMQ_POLLIN, 0},
    {pull2, 0, ZMQ_POLLIN, 0},
    {pull3, 0, ZMQ_POLLIN, 0}
  };
  
  int length = sizeof(polls) / sizeof(zmq_pollitem_t);

  for(;;) {
    zmq_msg_t msg;
    zmq_poll(polls, length, -1);

    if(polls[0].revents & ZMQ_POLLIN) {
      zmq_msg_init(&msg);
      zmq_msg_recv(&msg, pull1, 0);
      zmq_msg_close(&msg);
    }
    if(polls[1].revents > 0) {
      zmq_msg_init(&msg);
      zmq_msg_recv(&msg, pull2, 0);
      zmq_msg_close(&msg);
    }
    if(polls[2].revents > 0) {
      zmq_msg_init(&msg);
      zmq_msg_recv(&msg, pull3, 0);
      zmq_msg_close(&msg);
    }
  }
  
  zmq_close(pull1);
  zmq_close(pull2);
  zmq_close(pull3);
  zmq_ctx_destroy(context);
  
  return 0;
}

Tip

Never access the zmq_msg_t variables directly.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.129.23.30