Sending messages to many hosts at once

There may be cases where you'd like to send a single message to multiple hosts at the same time. A typical example is a chat application. The ZeroMQ library solves this problem with the Publisher-Subscriber model, which resembles multicast. Messages can be divided into topics, each identified by a simple string value.

Do note that with the Publisher-Subscriber model, you can send messages only from the publisher to the subscriber, not the other way round.

Getting ready

For this recipe, you'll be using one publisher and at least one subscriber. You can run several subscriber instances to see how this network model behaves.

How to do it…

This recipe will be divided into two parts, one for the publisher and the other for the subscriber.

The publisher part

Let's take a look at the publisher:

  1. First you'll need to prepare the ZeroMQ socket for the publisher:
    local zmq = require 'zmq'
    local context = assert(zmq.context())
    local socket = assert(context.socket(zmq.ZMQ_PUB))
    assert(socket.bind("tcp://*:12345"))
  2. Notice that now you're using a different type of socket identified by the zmq.ZMQ_PUB constant.
  3. In the next step, you need to set up message polling for sending. In the publisher mode, you need to send the topic name first:
    local poll = zmq.poll()
    local topic = 'demo'
    poll.add(socket, zmq.ZMQ_POLLOUT, function(socket)
      assert(socket.sendMultipart({topic, 'Hello everyone!'}))
    end)
  4. This time, you poll the socket for output readiness, so messages are sent only as fast as the socket can accept them.
  5. In the final part of the publisher, you'll be sending messages over and over in a loop. Left unchecked, the loop would send hundreds or thousands of messages per second, so it's better to limit the output to a reasonable one message per second:
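    -- os.clock() reports CPU time used by the program; in this busy
    -- loop it advances at roughly the same rate as wall-clock time
    local lastTime = os.clock()
    while true do
      local newTime = os.clock()
      if (newTime-lastTime) >= 1 then
        lastTime = newTime
        poll.start()
      end
    end
    -- not reached while the loop above runs forever
    socket.close()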
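
The rate limiting in step 5 can be isolated into a small helper, which makes it easier to reason about and to test. The following is only a sketch in plain Lua: makeThrottle and the simulated clock are hypothetical names introduced for illustration and are not part of the zmq binding. In the recipe, you would pass os.clock as the clock and call poll.start() whenever the helper returns true.

```lua
-- Hypothetical helper (not part of the zmq binding): returns a function
-- that reports true at most once per `interval` seconds of `clock` time.
local function makeThrottle(interval, clock)
  local last = clock()
  return function()
    local now = clock()
    if now - last >= interval then
      last = now
      return true
    end
    return false
  end
end

-- Simulated clock, so the sketch runs instantly instead of waiting.
local fakeTime = 0
local ready = makeThrottle(1, function() return fakeTime end)

local sent = 0
for step = 1, 30 do
  fakeTime = fakeTime + 0.25   -- pretend each iteration takes 0.25 s
  if ready() then
    sent = sent + 1            -- the recipe would call poll.start() here
  end
end
print(sent)                    -- 7 sends in 7.5 simulated seconds
```

Injecting the clock as a parameter is a design choice made for this sketch: it lets you swap os.clock for another time source without touching the loop.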

The subscriber part

In this part, you'll be using a socket with the zmq.ZMQ_SUB type identifier. The rest of the setup is almost identical to the publisher's, except that the subscriber connects to the publisher's address instead of binding to a port:

local zmq = require 'zmq'
local context = assert(zmq.context())
local socket = assert(context.socket(zmq.ZMQ_SUB))
assert(socket.connect("tcp://localhost:12345"))
  1. The interesting part is using socket options to subscribe to specific topics:
    local topic = 'demo'
    socket.options.subscribe = topic
  2. You can subscribe to additional topics by using the subscribe socket option multiple times. Refer to the following code, for instance:
    socket.options.subscribe = 'topic1'
    socket.options.subscribe = 'topic2'
  3. There is an option to unsubscribe as well:
    socket.options.unsubscribe = 'topic2'
  4. If you subscribe with an empty string as the topic, you subscribe to all topics.
  5. After this, you need to prepare polling functions to receive messages from all the subscribed topics:
    local poll = zmq.poll()
    poll.add(socket, zmq.ZMQ_POLLIN, function(s)
      local topic = assert(socket.recv())
      local result = assert(socket.recvAll())
      print(topic, result)
    end)

    Tip

    Do note that you need to call the socket.recv function first to obtain only the topic name; the socket.recvAll function then returns the rest of the message. If you call socket.recvAll alone, it merges the topic name and the message into one string. Reading the topic separately lets you differentiate messages from different topics.

  6. In the final step, you'll need to prepare a loop for message polling and cleanup.
    while (true) do
      poll.start()
    end
    
    socket.disconnect()
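
To see why the order of the two receive calls matters, here is a sketch in plain Lua that imitates the behavior described in the tip. The fakeSocket function is a hypothetical stand-in written for this illustration; it is not the real socket object, and it only models a single multipart message held as a table of frames.

```lua
-- Hypothetical stand-in for a SUB socket holding one multipart message.
local function fakeSocket(frames)
  local index = 0
  local s = {}
  -- Return the next single frame of the message.
  function s.recv()
    index = index + 1
    return frames[index]
  end
  -- Return all remaining frames joined into one string.
  function s.recvAll()
    local rest = {}
    for i = index + 1, #frames do
      rest[#rest + 1] = frames[i]
    end
    index = #frames
    return table.concat(rest)
  end
  return s
end

-- Reading the topic first keeps it separate from the payload.
local a = fakeSocket({'demo', 'Hello everyone!'})
local topic = a.recv()
local body = a.recvAll()
print(topic, body)

-- Skipping recv merges the topic and the payload into one string.
local b = fakeSocket({'demo', 'Hello everyone!'})
local merged = b.recvAll()
print(merged)
```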

How it works…

This recipe uses a one-directional pattern that's very close to radio transmission. It's as if you were sending messages at different frequencies, and people can tune in to the specific kinds of messages they like. Messages are transmitted to connected peers only, so no bandwidth is wasted. However, if you connect to the publisher late, you'll miss the messages that were sent before you connected. It's just like a radio: you won't get your winning lottery numbers if you tune in too late.

Similarly, if there are too many messages in transmission, you'll miss some of them. This behavior is controlled by the so-called high-water mark, or HWM for short, which is the maximum number of messages that can be queued. You can adjust the HWM values for sending and receiving using socket options, as shown in the following code:

socket.options.rcvhwm = 1000
socket.options.sndhwm = 1000
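
The effect of the high-water mark can be pictured as a bounded queue that discards new messages once it is full. The following plain Lua sketch is a simplified model only; makeQueue and its fields are hypothetical names, and the real library manages its queues internally.

```lua
-- Hypothetical model of a message queue limited by a high-water mark.
local function makeQueue(hwm)
  local q = { items = {}, dropped = 0 }
  -- Queue a message, or discard it when the limit has been reached.
  function q.push(msg)
    if #q.items >= hwm then
      q.dropped = q.dropped + 1
      return false
    end
    q.items[#q.items + 1] = msg
    return true
  end
  -- Take the oldest queued message, as a receiver draining the queue would.
  function q.pop()
    return table.remove(q.items, 1)
  end
  return q
end

local queue = makeQueue(3)
for i = 1, 5 do
  queue.push('message ' .. i)
end
print(#queue.items, queue.dropped)  -- 3 queued, 2 dropped

local first = queue.pop()
print(first)                        -- message 1
```

A larger HWM tolerates longer bursts at the cost of more memory, while a smaller one drops messages sooner when the receiver can't keep up.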

Each message sent from the publisher must begin with a topic name; otherwise, a subscriber that filters by topic will not receive it.

A subscriber can also be connected to many publishers with different topics. The topic name arrives as the first part of a multipart message.

If you subscribe with an empty string as the topic, the socket receives messages on all available topics.
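
ZeroMQ matches subscriptions as prefixes of the first message part, which is why an empty string matches everything. The plain Lua sketch below models that filtering rule; makeFilter and startsWith are hypothetical helpers for illustration and do not reflect how the library implements it internally.

```lua
-- True when `text` begins with `prefix`; an empty prefix always matches.
local function startsWith(text, prefix)
  return text:sub(1, #prefix) == prefix
end

-- Hypothetical model of a subscriber's topic filter.
local function makeFilter()
  local f = { topics = {} }
  function f.subscribe(topic) f.topics[topic] = true end
  function f.unsubscribe(topic) f.topics[topic] = nil end
  -- A message is accepted if any subscription is a prefix of its first part.
  function f.accepts(firstPart)
    for topic in pairs(f.topics) do
      if startsWith(firstPart, topic) then
        return true
      end
    end
    return false
  end
  return f
end

local filter = makeFilter()
filter.subscribe('demo')
print(filter.accepts('demo'))     -- true
print(filter.accepts('demo.eu'))  -- true, because 'demo' is a prefix
print(filter.accepts('other'))    -- false

filter.subscribe('')              -- the empty string matches every topic
print(filter.accepts('other'))    -- true
```

Because matching is by prefix, it's worth choosing topic names so that one is not accidentally a prefix of another.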
