The following diagram provides an overview of the scenario that we will implement:
For publish-subscribe, we can use a fanout exchange and bind any number of queues to it; a fanout exchange ignores the binding key. The PublishSubscribeReceiver class can be used to bind a specified queue to a fanout exchange and receive messages from it:
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class PublishSubscribeReceiver {

    private final static String EXCHANGE_NAME = "pubsub_exchange";
    private final static Logger LOGGER =
            LoggerFactory.getLogger(PublishSubscribeReceiver.class);

    private Channel channel = null;
    private Connection connection = null;

    // Opens a connection to the local broker and creates a channel on it.
    public void initialize() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    . . .
}
The receive() method retrieves a message from a queue that is bound to the pubsub_exchange fanout exchange. It does the following:
- Creates the pubsub_exchange exchange, if not already created
- Creates the specified queue, if not already created
- Binds the queue to pubsub_exchange using the queueBind() method of the Channel instance that represents the AMQP channel for the receiver; notice that in this case we don't specify any particular binding key, and for that reason we use the empty string
- Creates a QueueingConsumer instance, registered using the AMQP channel, and calls its nextDelivery() method to receive a message from the channel
public String receive(String queue) {
    if (channel == null) {
        initialize();
    }
    String message = null;
    try {
        // Declare the exchange and the queue; both calls are no-ops if they already exist.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        channel.queueDeclare(queue, false, false, false, null);
        // A fanout exchange ignores the binding key, so the empty string is used.
        channel.queueBind(queue, EXCHANGE_NAME, "");

        // Register the consumer with automatic acknowledgement and wait for one message.
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queue, true, consumer);
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        message = new String(delivery.getBody());
        LOGGER.info("Message received: " + message);
    } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
    } catch (ShutdownSignalException e) {
        LOGGER.error(e.getMessage(), e);
    } catch (ConsumerCancelledException e) {
        LOGGER.error(e.getMessage(), e);
    } catch (InterruptedException e) {
        LOGGER.error(e.getMessage(), e);
    }
    // Null if no message could be received.
    return message;
}
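Because nextDelivery() waits for the next delivery, receive() does not return until a message reaches the queue. The following is a minimal sketch of that blocking behaviour using only the JDK; it does not use the RabbitMQ client. The BlockingReceiveSketch class and its handleDelivery() method are hypothetical stand-ins, and decoding the body as UTF-8 is an assumption made for the example (the receive() method above uses the platform default charset).

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a buffering consumer; it does not use the RabbitMQ client.
class BlockingReceiveSketch {
    // Deliveries pushed by the "broker" side wait here until the application asks for them.
    private final BlockingQueue<byte[]> deliveries = new LinkedBlockingQueue<>();

    // Simulates the broker handing a message body to the consumer.
    void handleDelivery(byte[] body) {
        deliveries.add(body);
    }

    // Blocks the calling thread until a delivery is available.
    String nextDelivery() throws InterruptedException {
        return new String(deliveries.take(), StandardCharsets.UTF_8);
    }

    // Waits at most timeoutMillis and returns null if nothing arrived in that time.
    String nextDelivery(long timeoutMillis) throws InterruptedException {
        byte[] body = deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        return body == null ? null : new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingReceiveSketch consumer = new BlockingReceiveSketch();
        System.out.println(consumer.nextDelivery(50));  // nothing buffered yet: prints "null"

        // Deliver from another thread, as a broker connection thread would.
        Thread broker = new Thread(() ->
                consumer.handleDelivery("Test message.".getBytes(StandardCharsets.UTF_8)));
        broker.start();
        System.out.println(consumer.nextDelivery());    // blocks until the delivery arrives
        broker.join();
    }
}
```

A timed variant like the second method is useful when a receiver should give up after a while instead of waiting indefinitely.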
The class also has a destroy() method, which closes the connection:
    public void destroy() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            LOGGER.warn(e.getMessage(), e);
        }
    }
} // closes the PublishSubscribeReceiver class
To demonstrate the usage of QueueingConsumer for establishing a publish-subscribe communication channel, we will use the FanoutExchangeSenderDemo class to send a message to the pubsub_exchange fanout exchange:
public class FanoutExchangeSenderDemo {

    private static final String FANOUT_EXCHANGE_TYPE = "fanout";

    public static void sendToFanoutExchange(String exchange) {
        Sender sender = new Sender();
        sender.initialize();
        sender.send(exchange, FANOUT_EXCHANGE_TYPE, "Test message.");
        sender.destroy();
    }

    public static void main(String[] args) {
        sendToFanoutExchange("pubsub_exchange");
    }
}
When you invoke the main() method of the FanoutExchangeSenderDemo class, you can see in the management console that the pubsub_exchange exchange has been created in the RabbitMQ server instance, alongside the predefined exchanges.
If you restart the RabbitMQ instance, you will no longer see pubsub_exchange in the management console, because the exchange is not marked as durable. To mark a queue or exchange as durable, pass true for the durable parameter of the queueDeclare() or exchangeDeclare() method of the Channel class, for example exchangeDeclare(EXCHANGE_NAME, "fanout", true). To provide further message delivery guarantees on the broker, you can use the publisher confirms extension.
The PublishSubscribeReceiverDemo class demonstrates how the PublishSubscribeReceiver class is used to establish a publish-subscribe channel:
public class PublishSubscribeReceiverDemo {

    public static void main(String[] args) throws InterruptedException {
        final PublishSubscribeReceiver receiver1 = new PublishSubscribeReceiver();
        receiver1.initialize();
        final PublishSubscribeReceiver receiver2 = new PublishSubscribeReceiver();
        receiver2.initialize();

        // Each receiver waits for a message on its own queue, in its own thread.
        Thread t1 = new Thread(new Runnable() {
            public void run() {
                receiver1.receive("pubsub_queue1");
            }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                receiver2.receive("pubsub_queue2");
            }
        });

        t1.start();
        t2.start();

        // Wait until both receivers have received a message, then clean up.
        t1.join();
        t2.join();
        receiver1.destroy();
        receiver2.destroy();
    }
}
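The main() method creates two receivers that bind to two different queues: pubsub_queue1 and pubsub_queue2. A message published to the pubsub_exchange exchange while both queues are bound is copied to each queue and thus delivered to both consumers. A fanout exchange routes only to queues that are bound at the moment of publishing, so if the message was sent before the queues were first declared and bound, run FanoutExchangeSenderDemo again after starting the receivers.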
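The routing rule behind this can be sketched with a toy in-memory model. It is only an illustration under simplifying assumptions: it does not use the RabbitMQ client, the FanoutModel class and its methods are hypothetical names, and it models a single fanout exchange with none of the broker's optional features. It shows that the binding key plays no role and that every queue bound at publish time receives its own copy of the message.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Toy in-memory model of fanout routing; it does not use the RabbitMQ client.
class FanoutModel {
    // Queue name -> messages waiting in that queue.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();
    // Names of the queues currently bound to the (single) fanout exchange.
    private final Set<String> bindings = new LinkedHashSet<>();

    // Declares the queue if needed and binds it; the binding key is accepted but ignored.
    void bind(String queue, String bindingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayDeque<>());
        bindings.add(queue);
    }

    // Copies the message into every bound queue and returns the number of copies made.
    int publish(String message) {
        for (String queue : bindings) {
            queues.get(queue).add(message);
        }
        return bindings.size();
    }

    // Removes and returns the next message in the queue, or null if none is waiting.
    String poll(String queue) {
        Queue<String> q = queues.get(queue);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        int early = exchange.publish("too early");       // no queue bound yet: dropped
        exchange.bind("pubsub_queue1", "");
        exchange.bind("pubsub_queue2", "ignored.key");   // key has no effect on routing
        int copies = exchange.publish("Test message.");
        System.out.println(early + " " + copies);             // prints "0 2"
        System.out.println(exchange.poll("pubsub_queue1"));   // prints "Test message."
        System.out.println(exchange.poll("pubsub_queue2"));   // prints "Test message."
    }
}
```

In this model the message published before any queue was bound never reaches a consumer, which is the reason for starting the receivers before the sender on a fresh broker.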