Publish-subscribe communication

The following diagram provides an overview of the scenario that we will implement:

[Figure: Publish-subscribe communication]

For publish-subscribe communication, we can use a fanout exchange and bind any number of queues to it; a fanout exchange ignores the binding key and routes every message to all bound queues. The PublishSubscribeReceiver class binds a specified queue to a fanout exchange and receives messages from it:

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class PublishSubscribeReceiver {

    private final static String EXCHANGE_NAME = "pubsub_exchange";
    private final static Logger LOGGER = LoggerFactory.getLogger(PublishSubscribeReceiver.class);
    private Channel channel = null;
    private Connection connection = null;

    public void initialize() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
    . . .
}

The receive() method retrieves a message from a queue that is bound to the pubsub_exchange fanout exchange. It does the following:

  • Creates the pubsub_exchange, if not already created
  • Creates the specified queue if not already created
  • Binds the queue to the pubsub_exchange using the queueBind() method of the Channel instance that represents the receiver's AMQP channel; because a fanout exchange ignores the binding key, we pass the empty string
  • Creates a new QueueingConsumer instance, registers it on the AMQP channel, and calls its nextDelivery() method, which blocks until a message arrives from the channel:
        public String receive(String queue) {
    
            if (channel == null) {
                initialize();
            }
    
            String message = null;
            try {
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                channel.queueDeclare(queue, false, false, false, null);
                channel.queueBind(queue, EXCHANGE_NAME, "");
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(queue, true, consumer);
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                message = new String(delivery.getBody());
                LOGGER.info("Message received: " + message);
                return message;
    
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            } catch (ShutdownSignalException e) {
                LOGGER.error(e.getMessage(), e);
            } catch (ConsumerCancelledException e) {
                LOGGER.error(e.getMessage(), e);
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            }
            return message;
        }
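
Because nextDelivery() blocks the calling thread until a message is available, receive() does not return until something has been delivered. The following minimal sketch models that behaviour with a plain java.util.concurrent.BlockingQueue. The BlockingHandoff class and its method names are hypothetical and are not part of the RabbitMQ client; the sketch only illustrates the blocking hand-off between a delivering thread and a consuming thread, and, unlike the real method, it handles interruption internally and returns null.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a blocking consumer; not the RabbitMQ client API. */
final class BlockingHandoff {

    // Messages handed over by the delivering side and not yet consumed.
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();

    /** Called by the delivering side when a message arrives. */
    void deliver(String message) {
        deliveries.add(message);
    }

    /** Blocks until a message is available; returns null only if interrupted. */
    String nextDelivery() {
        try {
            return deliveries.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return null;
        }
    }

    /** Waits at most timeoutMillis; returns null if nothing arrived in time. */
    String nextDelivery(long timeoutMillis) {
        try {
            return deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A blocking call of this kind is usually run on its own thread, so that several receivers can wait for messages at the same time.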

The class also provides a destroy() method that closes the connection:

    public void destroy() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            LOGGER.warn(e.getMessage(), e);
        }
    }
}

To demonstrate the use of QueueingConsumer in a publish-subscribe communication channel, we use the FanoutExchangeSenderDemo class to send a message to the pubsub_exchange fanout exchange:

public class FanoutExchangeSenderDemo {

    private static final String FANOUT_EXCHANGE_TYPE = "fanout";

    public static void sendToFanoutExchange(String exchange) {
        Sender sender = new Sender();
        sender.initialize();
        sender.send(exchange, FANOUT_EXCHANGE_TYPE, "Test message.");
        sender.destroy();
    }

    public static void main(String[] args) {
        sendToFanoutExchange("pubsub_exchange");
    }
}

When you invoke the main() method of the FanoutExchangeSenderDemo class, you can see in the management console that the pubsub_exchange exchange has been created in the RabbitMQ server instance alongside the predefined exchanges:

[Figure: management console showing the pubsub_exchange exchange]

If you restart the RabbitMQ instance, the pubsub_exchange no longer appears in the management console, because the exchange is not marked as durable. To mark a queue or an exchange as durable, pass true for the durable parameter of the queueDeclare() or exchangeDeclare() method of the Channel class; the receive() method shown earlier passes false for that parameter in its queueDeclare() call. For stronger delivery guarantees on the broker side, you can use the publisher confirms extension of RabbitMQ.

The PublishSubscribeReceiverDemo class provides a demonstration of the PublishSubscribeReceiver class for the establishment of a publish-subscribe channel:

public class PublishSubscribeReceiverDemo {

    public static void main(String[] args) throws InterruptedException {
        final PublishSubscribeReceiver receiver1 = new PublishSubscribeReceiver();
        receiver1.initialize();
        final PublishSubscribeReceiver receiver2 = new PublishSubscribeReceiver();
        receiver2.initialize();
        Thread t1 = new Thread(new Runnable() {
            public void run() {
                receiver1.receive("pubsub_queue1");
            }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                receiver2.receive("pubsub_queue2");
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        receiver1.destroy();
        receiver2.destroy();
    }
}

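The main() method creates two receivers that bind to two different queues: pubsub_queue1 and pubsub_queue2. A message sent to the pubsub_exchange exchange once both queues are bound is delivered to both queues and thus to both consumers. A message published while no queue is bound to the exchange is discarded, so start the receivers before running FanoutExchangeSenderDemo.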
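
A fanout exchange copies each message to every queue that is bound at the moment of publishing and ignores the binding key. The sketch below is a small in-memory model of that routing rule, written only to make the behaviour concrete. The FanoutModel class and its bind(), publish(), and poll() methods are hypothetical names chosen for illustration; they are not the RabbitMQ client API and nothing here talks to a broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** In-memory model of fanout routing; illustrative only, not the RabbitMQ client API. */
final class FanoutModel {

    // Queue name -> messages waiting in that queue.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();
    // Names of the queues currently bound to the exchange.
    private final List<String> bindings = new ArrayList<>();

    /** Declares the queue if needed and binds it; the binding key is accepted but ignored. */
    void bind(String queue, String bindingKey) {
        queues.putIfAbsent(queue, new ArrayDeque<>());
        if (!bindings.contains(queue)) {
            bindings.add(queue);
        }
    }

    /** Copies the message to every bound queue and returns how many queues received it. */
    int publish(String message) {
        for (String queue : bindings) {
            queues.get(queue).add(message);
        }
        return bindings.size();
    }

    /** Returns the next message in the queue, or null if the queue is empty or unknown. */
    String poll(String queue) {
        Queue<String> q = queues.get(queue);
        return q == null ? null : q.poll();
    }
}
```

In this model, publishing before any bind() call reaches zero queues, while publishing after binding pubsub_queue1 and pubsub_queue2 (with any binding keys) places one copy of the message in each of them.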
