Chapter 2. Design Patterns with RabbitMQ

As a robust messaging solution, RabbitMQ provides different utilities for distributing messages between endpoints in the communication channel. These utilities provide an implementation of best practices and design patterns that apply to messaging solutions and form the backbone of a messaging broker such as RabbitMQ.

Topics covered in the chapter:

  • Messaging patterns in RabbitMQ
  • Point-to-point communication
  • Publish-subscribe communication
  • Request-reply communication
  • Message router

Messaging patterns in RabbitMQ

Messaging patterns in RabbitMQ are implemented based on exchanges, queues, and the bindings between them. We can distinguish between the different approaches for implementing a design pattern with RabbitMQ:

  • For point-to-point communication between the publisher and the broker you can use a default or a direct exchange in order to deliver a message to a single queue. However, note that there might be multiple subscribers to this single queue, thus implementing publish-subscribe between the broker and the message receivers bound to that queue.
  • For publish-subscribe, we can use a fanout exchange, which will deliver a message from an exchange to all queues that are bound to this exchange; in this manner, we may have a queue-per-subscriber strategy for implementing publish-subscribe.
  • For request-response communication, we can use two separate exchanges and two queues; the publisher sets a message identifier in the message header and sends the request message to the request exchange, which in turn delivers the message to the request queue. The subscriber retrieves the message from the request queue, processes it, and sends a response message to the response exchange by also setting the same message identifier found in the request message to the response message header. The response exchange then delivers the message to a response queue. The publisher is subscribed to a response queue in order to retrieve response messages and uses the message identifier from the response message header to map the response message to the corresponding request message.
  • For message routing we can use a topic exchange in order to deliver messages based on a binding key pattern or a headers exchange based on one or more headers.

It is important to remember that AMQP 0-9-1 protocol messages are load-balanced between consumers in a round-robin fashion. In this case, if there are multiple consumers on a message queue (bound using the basic.consume AMQP protocol command) then only one of them will receive the message, signifying that we have competing consumers. The same applies for the basic.get AMQP protocol command that retrieves a message from a queue on-demand (pull style) rather than by consumption (push style). If a message arrives on a queue that has no subscribers then the message will stay in the queue until a new subscriber is bound to the queue or the message is explicitly requested using basic.get. A message can also be rejected using the basic.reject AMQP protocol command. We will illustrate each of the preceding message patterns with concrete examples in subsequent sections. Before trying out the examples, you have to include the AMQP client library for Java. If you are using Maven, you can include the following dependencies for the client library along with the slf4j dependencies that provide the slf4j logging features used to provide logging capabilities in the examples:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.4.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
</dependency>

In order to send messages to RabbitMQ, the Sender class will be used:

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Sender {

    private final static String QUEUE_NAME = "event_queue";
    private final static Logger LOGGER =     
      LoggerFactory.getLogger(Sender.class);
    private static final String DEFAULT_EXCHANGE = "";
    private Channel channel;
    private Connection connection;
}

The initialize() method is used to initialize the message sender by doing the following:

  • Creating a ConnectionFactory that is used to create AMQP connections to a running RabbitMQ server instance; in this case, this is an instance running on localhost and accepting connections on the default port (5672)
  • Creating a new connection using the connection factory
  • Creating a new channel for sending messages in the created connection:
        public void initialize() {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                connection = factory.newConnection();
                channel = connection.createChannel();
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }

The send() method has two variants: one that accepts a message and sends it to the default queue and a second one that accepts an exchange name, exchange type, and the message to send. The first variant is appropriate for point-to-point communication and does the following:

  • Declares a queue in the message broker using the queueDeclare() method; if the queue is already created then it is not recreated by the method
  • Publishes a message on the default exchange that is delivered to that queue

The second variant of send() is appropriate for the publish-subscribe type of communication and does the following:

  • Declares the specified exchange along with its type on the message bus using the exchangeDeclare() method; the exchange is not recreated if it exists on the message bus
  • Sends a message to this exchange with a routing key equal to the empty string (we are indicating that we will not use the routing key with this variant of the method):
        public void send(String message) {
            try {
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                channel.basicPublish(DEFAULT_EXCHANGE, QUEUE_NAME, null,
                        message.getBytes());
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        
        public void send(String exchange, String type, String message) {
            try {
                channel.exchangeDeclare(exchange, type);
                channel.basicPublish(exchange, "", null,
                        message.getBytes());
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }

The destroy() method is used to close the connection and all connection channels to the message broker:

        public void destroy() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            LOGGER.warn(e.getMessage(), e);
        }
    }
}
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.143.235.23