In Chapter 1, Getting Started, and Chapter 2, Introduction to Sockets, we worked on programs with a single socket. Working with one socket is easy, but working with multiple sockets is somewhat trickier. To work with multiple sockets, we use zmq_poll(3), which allows an application to multiplex I/O over multiple sockets: it blocks until an event is pending on at least one of the registered sockets, and is typically called from the application's event loop.
/* Polling with ZeroMQ */ #include <string.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* ctx = zmq_ctx_new(); void* pull = zmq_socket(ctx, ZMQ_PULL); zmq_connect(pull, "tcp://localhost:4040"); void* subscriber = zmq_socket(ctx, ZMQ_SUB); char* company_name = "Company1"; int length = strlen(company_name) + 1; zmq_connect(subscriber, "tcp://localhost:5050"); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, company_name, length); printf("starting... "); zmq_pollitem_t polls[2]; polls[0].socket = pull; polls[0].fd = 0; polls[0].events = ZMQ_POLLIN; polls[0].revents = 0; polls[1].socket = subscriber; polls[1].fd = 0; polls[1].events = ZMQ_POLLIN; polls[1].revents = 0; for(;;) { zmq_msg_t msg; int res = zmq_poll(polls, 2, -1); if(polls[0].revents > 0) { zmq_msg_init(&msg); zmq_msg_recv(&msg, pull, 0); zmq_msg_close(&msg); } if(polls[1].revents > 0) { zmq_msg_init(&msg); zmq_msg_recv(&msg, subscriber, 0); zmq_msg_close(&msg); } } zmq_close(pull); zmq_close(subscriber); zmq_ctx_destroy(ctx); return 0; }
You can also initialize the poll items with an array initializer. For example, the following code binds three ZMQ_PULL sockets to different ports and pulls the messages that remote peers push to them using ZMQ_PUSH sockets.
/* Pull from multiple sockets with zmq_poll. */
#include <stdio.h>
#include <stdlib.h>
#include "zmq.h"

/*
 * Create a PULL socket in `context` and bind it to `endpoint`.
 *
 * Returns the socket, or NULL after printing the ZeroMQ error if the socket
 * could not be created or bound (for example, the port is already in use).
 */
static void *bind_pull(void *context, const char *endpoint)
{
    void *socket = zmq_socket(context, ZMQ_PULL);
    if (socket == NULL) {
        fprintf(stderr, "zmq_socket: %s\n", zmq_strerror(zmq_errno()));
        return NULL;
    }
    if (zmq_bind(socket, endpoint) != 0) {
        fprintf(stderr, "zmq_bind(%s): %s\n", endpoint,
                zmq_strerror(zmq_errno()));
        zmq_close(socket);
        return NULL;
    }
    return socket;
}

/*
 * Receive a single message from `socket` and throw it away.
 * Returns 0 on success, -1 on failure.
 */
static int receive_and_discard(void *socket)
{
    zmq_msg_t msg;

    if (zmq_msg_init(&msg) != 0) {
        return -1;
    }
    int received = zmq_msg_recv(&msg, socket, 0);
    zmq_msg_close(&msg);
    return received < 0 ? -1 : 0;
}

/*
 * Bind three PULL sockets (ports 5050, 4040 and 6060), register them in a
 * zmq_pollitem_t array and drain whichever sockets report ZMQ_POLLIN.
 *
 * The loop only ends when a ZeroMQ call fails; the sockets and the context
 * are then released and EXIT_FAILURE is returned.
 */
int main(int argc, char const *argv[])
{
    (void)argc;
    (void)argv;

    void *context = zmq_ctx_new();
    if (context == NULL) {
        fprintf(stderr, "zmq_ctx_new: %s\n", zmq_strerror(zmq_errno()));
        return EXIT_FAILURE;
    }

    void *pull1 = bind_pull(context, "tcp://*:5050");
    void *pull2 = bind_pull(context, "tcp://*:4040");
    void *pull3 = bind_pull(context, "tcp://*:6060");

    if (pull1 != NULL && pull2 != NULL && pull3 != NULL) {
        printf("Starting... ");
        /* No newline is printed and the loop below blocks, so flush explicitly. */
        fflush(stdout);

        zmq_pollitem_t polls[] = {
            {pull1, 0, ZMQ_POLLIN, 0},
            {pull2, 0, ZMQ_POLLIN, 0},
            {pull3, 0, ZMQ_POLLIN, 0}
        };
        int length = (int)(sizeof polls / sizeof polls[0]);
        int failed = 0;

        while (!failed) {
            /* On failure revents is not meaningful, so stop instead of reading it. */
            if (zmq_poll(polls, length, -1) < 0) {
                fprintf(stderr, "zmq_poll: %s\n", zmq_strerror(zmq_errno()));
                break;
            }
            for (int i = 0; i < length; i++) {
                if (!(polls[i].revents & ZMQ_POLLIN)) {
                    continue;
                }
                if (receive_and_discard(polls[i].socket) != 0) {
                    fprintf(stderr, "zmq_msg_recv: %s\n",
                            zmq_strerror(zmq_errno()));
                    failed = 1;
                    break;
                }
            }
        }
    }

    if (pull1 != NULL) {
        zmq_close(pull1);
    }
    if (pull2 != NULL) {
        zmq_close(pull2);
    }
    if (pull3 != NULL) {
        zmq_close(pull3);
    }
    zmq_ctx_destroy(context);
    return EXIT_FAILURE;
}
3.129.23.30