As a robust messaging solution, RabbitMQ provides different utilities for distributing messages between endpoints in the communication channel. These utilities provide an implementation of best practices and design patterns that apply to messaging solutions and form the backbone of a messaging broker such as RabbitMQ.
Topics covered in the chapter:
Messaging patterns in RabbitMQ are implemented based on exchanges, queues, and the bindings between them. We can distinguish between the different approaches for implementing a design pattern with RabbitMQ:
topic exchange in order to deliver messages based on a binding key pattern or a headers exchange based on one or more headers.

It is important to remember that in the AMQP 0-9-1 protocol, messages are load-balanced between consumers in a round-robin fashion. If there are multiple consumers on a message queue (registered using the basic.consume AMQP protocol command), then only one of them will receive a given message, which means that we have competing consumers. The same applies to the basic.get AMQP protocol command, which retrieves a message from a queue on demand (pull style) rather than by consumption (push style). If a message arrives on a queue that has no subscribers, then the message will stay in the queue until a new subscriber is registered on the queue or the message is explicitly requested using basic.get. A message can also be rejected using the basic.reject AMQP protocol command.

We will illustrate each of the preceding message patterns with concrete examples in subsequent sections. Before trying out the examples, you have to include the AMQP client library for Java. If you are using Maven, you can include the following dependencies for the client library, along with the slf4j dependencies that provide the logging capabilities used in the examples:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.4.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
</dependency>
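The competing-consumers behaviour described earlier can be illustrated without a running broker. The following is a minimal in-memory sketch, not RabbitMQ code: the RoundRobinDemo class and its dispatch() method are hypothetical names introduced here, and the model ignores acknowledgements and prefetch settings, which influence delivery in a real broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only: no broker and no AMQP client are involved.
class RoundRobinDemo {

    // Hands each message to exactly one consumer, cycling through the consumers in order.
    static Map<String, List<String>> dispatch(List<String> consumers, List<String> messages) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        for (String consumer : consumers) {
            delivered.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            // With no consumers, this model delivers nothing (the messages would stay queued).
            return delivered;
        }
        int next = 0;
        for (String message : messages) {
            delivered.get(consumers.get(next)).add(message);
            next = (next + 1) % consumers.size();
        }
        return delivered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = dispatch(
                Arrays.asList("consumer-1", "consumer-2"),
                Arrays.asList("m1", "m2", "m3", "m4", "m5"));
        // prints {consumer-1=[m1, m3, m5], consumer-2=[m2, m4]}
        System.out.println(result);
    }
}
```

Each message ends up with exactly one consumer, which is the point of the competing-consumers pattern: adding consumers spreads the load rather than duplicating the messages.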
In order to send messages to RabbitMQ, the Sender class will be used:
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Sender {

    // Queue used by the point-to-point variant of send()
    private final static String QUEUE_NAME = "event_queue";
    private final static Logger LOGGER = LoggerFactory.getLogger(Sender.class);
    // The default exchange is identified by the empty string
    private static final String DEFAULT_EXCHANGE = "";

    private Channel channel;
    private Connection connection;

    // The initialize(), send(), and destroy() methods of the class follow
The initialize() method is used to initialize the message sender by doing the following: it creates a ConnectionFactory that is used to create AMQP connections to a running RabbitMQ server instance (in this case, an instance running on localhost and accepting connections on the default port, 5672), opens a connection through that factory, and creates a channel on the connection:

public void initialize() {
    try {
        ConnectionFactory factory = new ConnectionFactory();
        // Only the host is set, so the default port is used
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
    } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
    }
}
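The host and port used by initialize() can also be written as an AMQP URI such as amqp://localhost, where an omitted port falls back to 5672. The sketch below uses only java.net.URI to show that fallback; AmqpEndpoint and parse() are hypothetical names introduced for illustration and are not part of the RabbitMQ client library. It handles the plain amqp scheme only.

```java
import java.net.URI;

// Hypothetical helper for illustration; not part of the RabbitMQ Java client.
class AmqpEndpoint {

    static final int DEFAULT_AMQP_PORT = 5672;

    final String host;
    final int port;

    AmqpEndpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Extracts host and port from an amqp:// URI, applying the defaults when they are absent.
    static AmqpEndpoint parse(String uri) {
        URI parsed = URI.create(uri);
        if (!"amqp".equalsIgnoreCase(parsed.getScheme())) {
            throw new IllegalArgumentException("Only the amqp scheme is handled in this sketch: " + uri);
        }
        String host = parsed.getHost() != null ? parsed.getHost() : "localhost";
        // java.net.URI reports -1 when the URI carries no explicit port
        int port = parsed.getPort() != -1 ? parsed.getPort() : DEFAULT_AMQP_PORT;
        return new AmqpEndpoint(host, port);
    }

    public static void main(String[] args) {
        AmqpEndpoint local = parse("amqp://localhost");
        System.out.println(local.host + ":" + local.port); // prints localhost:5672
    }
}
```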
The send() method has two variants: one that accepts a message and sends it to the default queue, and a second one that accepts an exchange name, an exchange type, and the message to send. The first variant is appropriate for point-to-point communication: it declares the queue in the message broker using the queueDeclare() method (if the queue is already created then it is not recreated by the method) and then publishes the message to the default exchange, using the queue name as the routing key.

The second variant of send() is appropriate for the publish-subscribe type of communication: it declares the specified exchange along with its type using the exchangeDeclare() method (the exchange is not recreated if it exists on the message bus) and then publishes the message to that exchange with an empty routing key:

public void send(String message) {
    try {
        // Non-durable, non-exclusive queue that is not deleted automatically
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // On the default exchange, the routing key is the name of the target queue
        channel.basicPublish(DEFAULT_EXCHANGE, QUEUE_NAME, null, message.getBytes());
    } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
    }
}

public void send(String exchange, String type, String message) {
    try {
        channel.exchangeDeclare(exchange, type);
        // The message is published with an empty routing key
        channel.basicPublish(exchange, "", null, message.getBytes());
    } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
    }
}
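The difference between the two send() variants can be sketched with a small in-memory model. This is an illustration under simplifying assumptions, not broker code: RoutingSketch and all of its methods are hypothetical names, and only the default exchange and fanout-style exchanges (which ignore the routing key) are modelled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model; it does not talk to RabbitMQ.
class RoutingSketch {

    // queue name -> messages held by that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    // fanout exchange name -> names of the queues bound to it
    private final Map<String, Set<String>> fanoutBindings = new LinkedHashMap<>();

    // Declaring is idempotent in this model: an existing queue or exchange is left untouched.
    void declareQueue(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    void declareFanout(String exchange) {
        fanoutBindings.putIfAbsent(exchange, new LinkedHashSet<>());
    }

    void bind(String queue, String exchange) {
        declareQueue(queue);
        declareFanout(exchange);
        fanoutBindings.get(exchange).add(queue);
    }

    void publish(String exchange, String routingKey, String message) {
        if (exchange.isEmpty()) {
            // Default exchange: the routing key is treated as the queue name.
            List<String> target = queues.get(routingKey);
            if (target != null) {
                target.add(message);
            }
            return;
        }
        Set<String> bound = fanoutBindings.get(exchange);
        if (bound == null) {
            return; // unknown exchange: this sketch simply drops the message
        }
        // Fanout: every bound queue gets a copy and the routing key is ignored.
        for (String queue : bound) {
            queues.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        RoutingSketch broker = new RoutingSketch();
        broker.declareQueue("event_queue");
        broker.publish("", "event_queue", "point-to-point message");

        broker.bind("subscriber_a", "events");
        broker.bind("subscriber_b", "events");
        broker.publish("events", "", "broadcast message");

        System.out.println(broker.messagesIn("event_queue"));
        System.out.println(broker.messagesIn("subscriber_a"));
        System.out.println(broker.messagesIn("subscriber_b"));
    }
}
```

In this model a message published to the default exchange reaches one named queue, while a message published to the fanout exchange is copied to every bound queue. An empty routing key therefore suits fanout exchanges; exchange types that route on the key would need a key that matches their bindings.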
The destroy() method is used to close the connection and all connection channels to the message broker:

public void destroy() {
    try {
        if (connection != null) {
            connection.close();
        }
    } catch (IOException e) {
        LOGGER.warn(e.getMessage(), e);
    }
}
}