Chapter 2. Creating an Application Inbox

Applications that need to use RabbitMQ must establish a permanent connection to it. Once this connection is established, logical channels can be created and message-oriented interactions, such as publishing and getting messages, can be performed. After learning these fundamentals, you'll learn how exchange-routing strategies determine how messages are delivered to queues. In particular, you will learn about the direct exchange, which delivers a message to the queue (or queues) whose binding key exactly matches the message's routing key, and the topic exchange, which can deliver a message to multiple queues by pattern-matching its routing key.

In this chapter, we will discuss the following topics:

  • Establishing a solid connection to RabbitMQ
  • Working with channels
  • Publishing messages to RabbitMQ
  • Getting messages from RabbitMQ
  • Direct and topic exchanges

Connecting to RabbitMQ

Before delving into the code, let's quickly summarize what Clever Coney Media (CCM) wants to achieve with RabbitMQ. As mentioned in the previous chapter, it wants to add an application inbox to allow users of its web application to send messages to each other. The expected user experience is closer to that of e-mail than of instant messaging, though messages will be transient by definition; once received, they cannot be read again. Thus, message queuing is a perfect match: each user will have a dedicated message queue where messages will wait until retrieval.

The following diagram illustrates the architecture CCM has in place and where RabbitMQ will fit in:

CCM's main application architecture

From what you've learned in Chapter 1, A Rabbit Springs to Life, you need to establish a physical (network) connection between the application servers and RabbitMQ, which will multiplex many logical channels. Unlike creating channels, creating connections is a costly operation, very much like it is with database connections. Typically, database connections are pooled, where each instance of the pool is used by a single execution thread. AMQP is different in the sense that a single connection can be used by many threads through many multiplexed channels. Thus, establishing a single long-lived connection between each application server and RabbitMQ should be enough to support the needs of this new feature, until it's proven that it gets saturated by traffic and multiple connections become a necessity.

So for now, CCM will start with a single connection. Since the Rich Internet Application is written in Java, we will discover this client API first. According to the documentation, connecting to RabbitMQ is as simple as the following:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection connection = factory.newConnection();

This seems easy enough, but CCM is worried about writing production-grade code, that is, code that can gracefully handle failures. What if RabbitMQ is not running? Clearly, CCM does not want its whole application to go down if this happens. What if RabbitMQ needs a restart? The application should then recover gracefully. In fact, CCM wants its application to keep functioning whether or not the messaging subsystem is working; only the user experience will be altered accordingly.

In summary, the behavior CCM aims at is the following:

  • The application should start whether a connection to RabbitMQ can be established or not
  • If the connection to RabbitMQ is lost, it should reconnect by itself
  • If the connection is down, sending or fetching messages should fail gracefully

Note

There are ready-made libraries that wrap the RabbitMQ client and make all this possible, such as Spring AMQP (http://projects.spring.io/spring-amqp), Mule AMQP (http://www.mulesoft.org/connectors/amqp-connector), or Beetle (https://github.com/xing/beetle). CCM wants to learn the basics and underlying mechanisms for itself and so do we; hence, we will not use any of them in this book. Consider using them in your projects.

Let's now detail the implementation of the RabbitMqManager class, created to reify this behavior. You will discover it piece by piece, with commentary along the way, as follows:

public class RabbitMqManager implements ShutdownListener
{
    private final static Logger LOGGER = Logger.getLogger(RabbitMqManager.class.getName());

    private final ConnectionFactory factory;
    private final ScheduledExecutorService executor;
    private volatile Connection connection;

    public RabbitMqManager(final ConnectionFactory factory)
    {
        this.factory = factory;
        executor = Executors.newSingleThreadScheduledExecutor();
        connection = null;
    }

The goal of the RabbitMqManager class is to babysit a single connection to RabbitMQ. Therefore, it keeps a single reference to a Connection instance; a null value means that it is not connected. Because reconnection attempts will be made asynchronously, so as not to tie up a thread of the main application, an executor is created to run these asynchronous tasks. The connection field is declared volatile so that its latest value is visible to all threads at all times.

This manager attempts connecting only when its start method is called, so let's now look into the following code:

public void start()
{
    try
    {
        connection = factory.newConnection();
        connection.addShutdownListener(this);
        LOGGER.info("Connected to " + factory.getHost() + ":" + factory.getPort());
    }
    catch (final Exception e)
    {
        LOGGER.log(Level.SEVERE, "Failed to connect to " + factory.getHost() + ":" + factory.getPort(), e);
        asyncWaitAndReconnect();
    }
}

What's notable about this code is that it registers the RabbitMqManager class itself as a listener for connection shutdown events so that its shutdownCompleted method (which we'll discuss in a moment) is called when something bad happens to the connection. It also deals with a connection failure on start by calling asyncWaitAndReconnect, a method that we'll look at right now in the following code:

private void asyncWaitAndReconnect()
{
    executor.schedule(new Runnable()
    {
        @Override
        public void run()
        {
            start();
        }
    }, 15, TimeUnit.SECONDS);
}

As you can see, this method simply schedules a new call to start, 15 seconds from now. Why the wait? The main reason is that you want to avoid thrashing on reconnection attempts; there's no point in retrying a reconnection too fast. In fact, a simple exponential back-off strategy could easily be bolted onto this code. Let's now look at the following method, which the RabbitMQ Java client calls when something goes sour with the connection:

@Override
public void shutdownCompleted(final ShutdownSignalException cause)
{
    // reconnect only on unexpected errors
    if (!cause.isInitiatedByApplication())
    {
        LOGGER.log(Level.SEVERE, "Lost connection to " + factory.getHost() + ":" + factory.getPort(),
            cause);

        connection = null;
        asyncWaitAndReconnect();
    }
}

The important aspects here are that we only try to reconnect if the connection shutdown was not initiated by the application (as it is on a normal application termination), and that we reconnect asynchronously to avoid tying up the RabbitMQ client thread that called the shutdownCompleted method. What's left to look at is the stop method, which is used to cleanly terminate RabbitMqManager, as follows:

public void stop()
{
    executor.shutdownNow();

    if (connection == null)
    {
        return;
    }

    try
    {
        connection.close();
    }
    catch (final Exception e)
    {
        LOGGER.log(Level.SEVERE, "Failed to close connection", e);
    }
    finally
    {
        connection = null;
    }
}

Again, nothing complex here. After issuing a termination of the executor in charge of running the reconnection attempts, the connection itself is cleanly disposed of; all this in the context of Java's verbose but mandatory exception-handling mechanism. With this in place, connecting to RabbitMQ still looks very much like the example from the documentation, but now with robustness mixed in as follows:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("ccm-dev");
factory.setPassword("coney123");
factory.setVirtualHost("ccm-dev-vhost");
factory.setHost("localhost");
factory.setPort(5672);

RabbitMqManager connectionManager = new RabbitMqManager(factory);
connectionManager.start();
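The reconnection delay in asyncWaitAndReconnect is a fixed 15 seconds; as mentioned earlier, an exponential back-off could be bolted on. The following is a minimal sketch of such a policy, assuming a hypothetical BackoffPolicy class (not part of the RabbitMQ client or of CCM's code) that doubles the delay after each failed attempt, up to a cap:

```java
/**
 * Hypothetical helper, not part of the RabbitMQ Java client: computes
 * exponentially growing reconnection delays, capped at a maximum.
 */
class BackoffPolicy
{
    private final long initialDelaySeconds;
    private final long maxDelaySeconds;
    private long currentDelaySeconds;

    BackoffPolicy(final long initialDelaySeconds, final long maxDelaySeconds)
    {
        if (initialDelaySeconds <= 0 || maxDelaySeconds < initialDelaySeconds)
        {
            throw new IllegalArgumentException("need 0 < initialDelaySeconds <= maxDelaySeconds");
        }
        this.initialDelaySeconds = initialDelaySeconds;
        this.maxDelaySeconds = maxDelaySeconds;
        this.currentDelaySeconds = initialDelaySeconds;
    }

    /** Returns the delay before the next attempt, then doubles it (up to the cap) for the attempt after. */
    synchronized long nextDelaySeconds()
    {
        final long delay = currentDelaySeconds;
        currentDelaySeconds = Math.min(currentDelaySeconds * 2, maxDelaySeconds);
        return delay;
    }

    /** To be called after a successful connection, so the next outage starts with a short delay again. */
    synchronized void reset()
    {
        currentDelaySeconds = initialDelaySeconds;
    }
}
```

In RabbitMqManager, asyncWaitAndReconnect could then pass backoff.nextDelaySeconds() to executor.schedule instead of the hard-coded 15, and start could call backoff.reset() once newConnection succeeds.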

Establishing a connection is the basis for doing anything with RabbitMQ; however, the real work happens in channels. Let's see what CCM came up with in that matter.

Working with channels

The Channel instances are created by the Connection object; therefore, the logical location to place the channel creation logic is in RabbitMqManager, as follows:

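public Channel createChannel()
{
    // read the volatile field once: another thread may null it out between two reads
    final Connection currentConnection = connection;

    try
    {
        return currentConnection == null ? null : currentConnection.createChannel();
    }
    catch (final Exception e)
    {
        LOGGER.log(Level.SEVERE, "Failed to create channel", e);
        return null;
    }
}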

Again, this is quite simple; if anything goes awry when creating a channel, the method returns null. This is in line with what CCM desires, to shield the application from any RabbitMQ-related failures. Instead of dealing with exceptions coming from the messaging subsystem, it will just have to deal with potential null values. In the same spirit, the disposal of channels is delegated to a method that takes care of potential exceptions as follows:

public void closeChannel(final Channel channel)
{
    // isOpen is not fully trustable!
    if ((channel == null) || (!channel.isOpen()))
    {
        return;
    }

    try
    {
        channel.close();
    }
    catch (final Exception e)
    {
        LOGGER.log(Level.SEVERE, "Failed to close channel: " + channel, e);
    }
}

Note that the isOpen method can't be fully trusted; another thread may close the channel after this check is done. So, the call to the close method could still fail because the channel might have closed already.

Tip

Though channel instances are technically thread safe, it is strongly recommended that you avoid having several threads using the same channel concurrently.

Realizing that the "open channel, do something with the channel, close channel" scenario may occur regularly in the code, CCM decides to support it by creating some code artifacts. It first creates an interface that defines what the contract for this pattern should be, as follows:

public interface ChannelCallable<T>
{
    String getDescription();

    T call(Channel channel) throws IOException;
}

Then, it adds a method to RabbitMqManager in order to execute such a ChannelCallable instance, as follows:

public <T> T call(final ChannelCallable<T> callable)
{
    final Channel channel = createChannel();
 
    if (channel != null)
    {
        try
        {
            return callable.call(channel);
        }
        catch (final Exception e)
        {
            LOGGER.log(Level.SEVERE, "Failed to run: " + callable.getDescription() + " on channel: " + channel, e);
        }
        finally
        {
            closeChannel(channel);
        }
    }

    return null;
}

Again, the invoker of this call method will be shielded from any error that could stem from the messaging layer; it will just receive null if something goes wrong. Notice how the ChannelCallable description is used in the log message. The guiding principle here is that you should always provide as much contextual information as possible when something goes wrong.
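As a side note on the design, ChannelCallable declares two abstract methods, so it can't be implemented with a Java 8 lambda. The following is a minimal sketch of a variant of the same open/use/close pattern that passes the description as a parameter instead, so the action can be a lambda. ResourceAction, FakeChannel and SafeCaller are hypothetical types, not part of the RabbitMQ client; FakeChannel merely stands in for a channel so the example runs without a broker:

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical action on a resource that may throw; usable as a lambda. */
@FunctionalInterface
interface ResourceAction<R, T>
{
    T apply(R resource) throws Exception;
}

/** Hypothetical stand-in for a channel; not the RabbitMQ Channel class. */
class FakeChannel implements AutoCloseable
{
    boolean closed;

    @Override
    public void close()
    {
        closed = true;
    }
}

final class SafeCaller
{
    private static final Logger LOGGER = Logger.getLogger(SafeCaller.class.getName());

    private SafeCaller()
    {
    }

    /**
     * Opens a resource, applies the action and always closes the resource.
     * Returns null if the opener yields null or the action throws; a failure
     * to close is only logged.
     */
    static <R extends AutoCloseable, T> T callSafely(final String description,
        final Supplier<R> opener, final ResourceAction<R, T> action)
    {
        final R resource = opener.get();
        if (resource == null)
        {
            return null;
        }
        try
        {
            return action.apply(resource);
        }
        catch (final Exception e)
        {
            LOGGER.log(Level.SEVERE, "Failed to run: " + description, e);
            return null;
        }
        finally
        {
            try
            {
                resource.close();
            }
            catch (final Exception e)
            {
                LOGGER.log(Level.SEVERE, "Failed to close resource after: " + description, e);
            }
        }
    }
}
```

The trade-off is that the description is no longer bundled with the action, but callers get much terser code, for example callSafely("Fetching messages", this::createChannel, channel -> ...).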

CCM is quite happy with its core infrastructure code. It is now able to connect to a RabbitMQ broker, open a channel, and issue a series of commands, all in a thread-safe and exception-safe manner. It's now time to build on this foundation!

Building the inbox

If you remember the discussion about AMQP in Chapter 1, A Rabbit Springs to Life, messages are published to exchanges, from which they get routed to queues, ready to be consumed. A routing strategy determines which queue (or queues) the message will be routed to. The routing strategy bases its decision on a routing key (a free-form string) and potentially on message meta-information. In the case of the user-to-user messaging system considered here, each message needs to be routed to the queue acting as the inbox of the addressee. Therefore, the exchange-routing strategy to use is the direct one, which delivers a message to the queues whose binding key exactly matches the routing key used when the message is published. Here, each inbox queue is bound with its own name as the binding key, so the routing key is simply the destination queue name, as illustrated in the following figure:

The direct exchange routes messages to specific queues
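To make the direct routing rule concrete, the following is a toy, in-memory model; DirectExchangeModel is a hypothetical class, not RabbitMQ code or its API. Bindings map a binding key to queues, a published message is copied to every queue whose binding key equals the routing key exactly, and a message with no match is dropped, much as RabbitMQ discards unroutable messages by default. The queue names follow CCM's "user-inbox." + userId convention:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Toy, in-memory model of direct-exchange routing; it does not talk to RabbitMQ. */
class DirectExchangeModel
{
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Creates the queue if needed; declaring it again is a no-op. */
    void declareQueue(final String queue)
    {
        queues.computeIfAbsent(queue, name -> new ArrayDeque<>());
    }

    /** Binds an existing queue; repeating the same binding has no extra effect. */
    void bind(final String queue, final String bindingKey)
    {
        if (!queues.containsKey(queue))
        {
            throw new IllegalStateException("no such queue: " + queue);
        }
        bindings.computeIfAbsent(bindingKey, key -> new LinkedHashSet<>()).add(queue);
    }

    /** Copies the message to every queue whose binding key equals the routing key; returns the copy count. */
    int publish(final String routingKey, final String message)
    {
        final Set<String> targets = bindings.getOrDefault(routingKey, Collections.emptySet());
        for (final String queue : targets)
        {
            queues.get(queue).add(message);
        }
        return targets.size(); // 0 means the message was silently dropped
    }

    /** Removes and returns the next message, or null if the queue is empty. */
    String get(final String queue)
    {
        return queues.get(queue).poll();
    }
}
```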

To tie the messaging logic into its application, CCM will piggyback on an existing polling mechanism that's already in place between the JavaScript frontend and the Java backend. This is not the most efficient approach, and in fact it will be revisited, as you'll soon find out, but it's the easiest way for CCM to get started and roll out the feature quickly. The following figure shows how the frontend poll will be used to fetch messages from the user's inbox and how a regular AJAX call will be used to send a new message. Messages themselves will be represented as JSON objects (refer to Appendix, Message Schemas, for the formal specification of these JSON messages).

They will contain meta-information such as timestamp, sender, and receiver IDs on top of the text contents of the message itself, as shown in the following diagram:

The frontend/backend interactions of CCM's main application

Let's follow the code CCM has written to roll this feature out and learn about the different concepts at the same time. CCM has created a UserMessageManager class to encapsulate all the operations related to this particular feature:

public class UserMessageManager
{
    static final String USER_INBOXES_EXCHANGE = "user-inboxes";

    @Inject
    RabbitMqManager rabbitMqManager;

    public void onApplicationStart()
    {
        rabbitMqManager.call(new ChannelCallable<DeclareOk>()
        {
            @Override
            public String getDescription()
            {
                return "Declaring direct exchange: " + USER_INBOXES_EXCHANGE;
            }

            @Override
            public DeclareOk call(final Channel channel) throws IOException
            {
                String exchange = USER_INBOXES_EXCHANGE;
                String type = "direct";
                // survive a server restart
                boolean durable = true;
                // keep it even if nobody is using it
                boolean autoDelete = false;
                // no special arguments
                Map<String, Object> arguments = null;

                return channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments);
            }
        });
    }

After receiving the RabbitMqManager instance via dependency injection, the class defines an onApplicationStart method that, as its name suggests, gets called every time the application server starts. All this method does is declare the exchange to which user-to-user messages are published. Why do we do this on start? Because the exchange is a fundamental requirement of the user-to-user messaging subsystem; if it doesn't exist, attempts to publish messages to it will raise exceptions.

Tip

Channels are killed by exceptions. In our case, publishing to a nonexistent exchange would not only raise an exception, it would also terminate the channel where the error occurred. Any subsequent code that tries to use the terminated channel will fail too. Thus, do not be surprised to see cascades of failures when something goes wrong.

Notice that the method to create the exchange is called declare, not create. This suggests that if the exchange already exists, the call will do nothing; otherwise, it will actually create it. This is why it's safe to declare it every time the application starts. Doing it every time a message is sent would be overkill, so application start is the best time to do it.

Besides using the direct type, we also configure the durable, autoDelete, and arguments properties of the exchange. We do not want this exchange to go away after a restart of RabbitMQ, nor when it's not being used anymore; hence, the values we've used.

Tip

An exchange declaration is idempotent only if the exchange properties are the same. Trying to declare an already existing exchange with different properties will fail. Always use consistent properties in your exchange declaration. If you need to change the properties, you'll need to delete the exchange before declaring it with the new properties. The same rule applies to a queue declaration.
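These declare rules can be summarized in a toy model. The following sketch assumes hypothetical ExchangeSpec and ExchangeRegistry classes (not part of the RabbitMQ client) and only compares the type, durable and autoDelete properties; the real broker also compares other properties, such as the arguments, and reports a mismatch as a PRECONDITION_FAILED error that closes the channel:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** The exchange properties compared on re-declaration (toy model). */
final class ExchangeSpec
{
    final String type;
    final boolean durable;
    final boolean autoDelete;

    ExchangeSpec(final String type, final boolean durable, final boolean autoDelete)
    {
        this.type = type;
        this.durable = durable;
        this.autoDelete = autoDelete;
    }

    @Override
    public boolean equals(final Object other)
    {
        if (!(other instanceof ExchangeSpec))
        {
            return false;
        }
        final ExchangeSpec that = (ExchangeSpec) other;
        return type.equals(that.type) && durable == that.durable && autoDelete == that.autoDelete;
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(type, durable, autoDelete);
    }
}

/** Toy model of declare semantics: create if absent, no-op if identical, fail if different. */
class ExchangeRegistry
{
    private final Map<String, ExchangeSpec> exchanges = new HashMap<>();

    /** Returns true if the exchange was created, false if an identical one already existed. */
    boolean declare(final String name, final ExchangeSpec spec)
    {
        final ExchangeSpec existing = exchanges.putIfAbsent(name, spec);
        if (existing == null)
        {
            return true;
        }
        if (!existing.equals(spec))
        {
            // RabbitMQ reports this case as a PRECONDITION_FAILED channel error
            throw new IllegalStateException("inequivalent re-declaration of exchange: " + name);
        }
        return false;
    }
}
```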

After creating the exchange, the next thing we want to do is to have the user inbox queue created and bound to the exchange. The following is how we do it:

public void onUserLogin(final long userId)
{
    final String queue = getUserInboxQueue(userId);

    rabbitMqManager.call(new ChannelCallable<BindOk>()
    {
        @Override
        public String getDescription()
        {
            return "Declaring user queue: " + queue + ", binding it to exchange: "
                   + USER_INBOXES_EXCHANGE;
        }
 
        @Override
        public BindOk call(final Channel channel) throws IOException
        {
            return declareUserMessageQueue(queue, channel);
        }
    });
}

private BindOk declareUserMessageQueue(final String queue, final Channel channel) throws IOException
{
    // survive a server restart
    boolean durable = true;
    // keep the queue
    boolean autoDelete = false;
    // can be consumed by another connection
    boolean exclusive = false;
    // no special arguments
    Map<String, Object> arguments = null;
    channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);

    // bind the addressee's queue to the direct exchange
    String routingKey = queue;
    return channel.queueBind(queue, USER_INBOXES_EXCHANGE, routingKey);
}

Every time a user logs into the system, the application calls onUserLogin. After getting the addressee's queue name from getUserInboxQueue (merely "user-inbox." + userId), it calls declareUserMessageQueue (you'll soon understand why this logic lives in its own method). In this method, the queue is declared in much the same way as the exchange, but with slightly different properties, as follows:

  • durable: This is true because you want the queue to stay declared even after a broker restart
  • autoDelete: This is false because you want to keep the queue even if it's not being consumed anymore
  • exclusive: This is false because you want this queue to be consumable by other connections (remember we have several application servers connected to RabbitMQ; hence, the queue will be accessed from different connections)
  • arguments: This is null because you don't need to custom configure the queue

Then the queue is bound to the exchange using its own name as the routing key so that the direct routing strategy can route messages to it. When this is done, publishing messages to the user-inboxes exchange will actually deliver messages to the user queue whose name matches the published routing key.

Tip

If no queue is bound to an exchange or if the routing strategy can't find a matching destination queue, the message published to the exchange will be discarded silently. It's possible to optionally be notified when unroutable messages are discarded, as we will see in subsequent chapters.

Again, when the same properties are used, these operations are idempotent, so we can safely declare the queue and bind it to the exchange again and again, each time a user logs in.

Sending user messages

Now let's look at the method of UserMessageManager that's in charge of sending messages:

static final String MESSAGE_CONTENT_TYPE = "application/vnd.ccm.pmsg.v1+json";
static final String MESSAGE_ENCODING = "UTF-8";

public String sendUserMessage(final long userId, final String jsonMessage)
{
    return rabbitMqManager.call(new ChannelCallable<String>()
    {
        @Override
        public String getDescription()
        {
            return "Sending message to user: " + userId;
        }

        @Override
        public String call(final Channel channel) throws IOException
        {
            String queue = getUserInboxQueue(userId);

            // it may not exist so declare it
            declareUserMessageQueue(queue, channel);

            String messageId = UUID.randomUUID().toString();

            BasicProperties props = new BasicProperties.Builder()
                .contentType(MESSAGE_CONTENT_TYPE)
                .contentEncoding(MESSAGE_ENCODING)
                .messageId(messageId)
                .deliveryMode(2)
                .build();

            String routingKey = queue;

            // publish the message to the direct exchange
            channel.basicPublish(USER_INBOXES_EXCHANGE, routingKey, props,
                jsonMessage.getBytes(MESSAGE_ENCODING));

            return messageId;
        }
    });
}

Now, you should understand why the declareUserMessageQueue method was extracted from onUserLogin: we call it in sendUserMessage every time one user sends a message to another. Why on earth are we doing that? Haven't we already declared and bound the user queue on login? Well, maybe and maybe not; there is no guarantee that the addressee has ever logged into the system, so as far as the sender is concerned, it's impossible to be sure the destination queue exists. Thus, the safest path is to declare it on every message sent, bearing in mind that this declare operation is idempotent, so it will not do anything if the queue already exists. It may seem strange at first, but it's the sender's responsibility to ensure the addressee's queue exists if they want to be sure the message will not be lost.

Note

This is a common pattern with AMQP; when there is no strong happens-before relationship between events, idempotent re-declaration is the way to go. Conversely, the check-then-act pattern is discouraged; checking for the pre-existence of an exchange or a queue can't guarantee success in the typical distributed environment where AMQP is used, because the situation may change between the check and the action.

The method for publishing a message is very simple. You call basicPublish with the user-inboxes exchange, the queue name as the routing key (as per the direct routing), some optional message properties, and an array of bytes that represents the actual message payload. Let's detail the message properties we've associated with the message as follows:

  • contentType: Because a message is published, and thus consumed, as a byte array, nothing really says what these bytes represent. Sure, in our current situation, publishers and consumers are in the same system, even in the same class, so you could implicitly assume the content type is what we expect. That said, the reason we always specify a content type is that we want messages to be self-contained; whichever system ends up receiving or introspecting a message will know for sure what its byte array represents. Moreover, by embedding a version number in the content type (application/vnd.ccm.pmsg.v1+json), we future-proof the system in case we later decide to alter the JSON representation of messages.
  • contentEncoding: You use a specific encoding (UTF-8) when you serialize string messages into byte arrays so that they can be published. Again, in order for the messages to be self-describing, we provide all the meta-information necessary to read them.
  • messageId: As you will see later in the book, message identifiers are an important aspect of traceability in messaging and distributed applications. For now, let's just say that you want each message to have a unique identifier, hence the use of a UUID to generate it.
  • deliveryMode: This is probably the most mysterious parameter, as it is set to 2. The AMQP specification defines the values of this property as follows: 1 for non-persistent and 2 for persistent. Now it's clearer! Indeed, you want the RabbitMQ broker to write the message to disk so that, as long as it sits in a durable queue, it will not be lost if the broker restarts.
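The following is a minimal sketch of how a consumer could take advantage of these self-describing properties, assuming a hypothetical MessageDecoder class (not part of CCM's code or of the RabbitMQ client). It extracts the schema version from the versioned content type and decodes the body with the declared content encoding, falling back to UTF-8 (an assumption) when the property is missing:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical consumer-side helper that relies on the self-describing message properties. */
final class MessageDecoder
{
    // matches CCM's versioned content type, e.g. application/vnd.ccm.pmsg.v1+json
    private static final Pattern USER_MESSAGE_TYPE =
        Pattern.compile("application/vnd\\.ccm\\.pmsg\\.v(\\d{1,9})\\+json");

    private MessageDecoder()
    {
    }

    /** Returns the schema version carried by the content type, or -1 if it is not a CCM user message. */
    static int schemaVersion(final String contentType)
    {
        if (contentType == null)
        {
            return -1;
        }
        final Matcher matcher = USER_MESSAGE_TYPE.matcher(contentType);
        return matcher.matches() ? Integer.parseInt(matcher.group(1)) : -1;
    }

    /** Decodes the body with the declared content encoding, assuming UTF-8 when none was set. */
    static String decode(final byte[] body, final String contentEncoding)
    {
        final Charset charset = contentEncoding == null
            ? StandardCharsets.UTF_8
            : Charset.forName(contentEncoding);
        return new String(body, charset);
    }
}
```

A consumer could then reject, or hand to a dedicated handler, any message whose version it does not know, instead of misparsing it.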

Tip

Do not confuse exchange and queue durability with message persistence; non-persistent messages stored in a durable queue will be gone after a broker restart, leaving you with an empty queue.

But what would happen if sending the user message fails, for example, because the connection with RabbitMQ is broken? In that case, the sendUserMessage method will return null and it will be up to the caller to deal with the issue. In CCM's case, the application will simply inform the end user that the messaging subsystem is currently experiencing issues.

Note

Why would you ever use a non-persistent delivery mode? Isn't the whole point of a message broker such as RabbitMQ to guarantee that messages aren't lost? This is true, but there are circumstances where this guarantee can be relaxed. Consider a scenario where a firehose-like publisher bombards the broker with a deluge of noncritical messages. In that case, using non-persistent delivery would spare the broker from writing to the host machine's disk, resulting in better performance.

Before going any further, let's take a look at the structure of an AMQP message.

AMQP message structure

The following figure illustrates the structure of an AMQP message where you will recognize the four properties we've just used and discover a few more. Note that this figure uses the specification name of the fields; each language implementation renames them slightly so they can be valid names, for example, content-type becomes contentType in Java.

Structure of an AMQP message

Except for the reserved one, all these properties are free to use and, unless otherwise specified, are ignored by the AMQP broker. The delivery-mode field, as we've just seen, is one such exception; in the case of RabbitMQ, another notable one is the user-id field, which is validated to ensure it matches the name of the broker user that established the connection. Notice how the headers property allows you to add extra key-value pairs in case none of the standard properties fit your needs.

Fetching user messages

We can now turn our attention to the method in UserMessageManager that's in charge of retrieving messages. Remember that you're piggybacking a poll request that the frontend regularly sends to the application; therefore, you will retrieve the messages from the user inbox queue in a synchronous manner, holding the application thread in charge of dealing with the poll requests until you've removed all the pending messages from the queue. The channel method to use for this is called basicGet. Let's see it in action as follows:

public List<String> fetchUserMessages(final long userId)
{
    return rabbitMqManager.call(new ChannelCallable<List<String>>()
    {
        @Override
        public String getDescription()
        {
            return "Fetching messages for user: " + userId;
        }
        @Override
        public List<String> call(final Channel channel) 
          throws IOException
        {
            List<String> messages = new ArrayList<>();

            String queue = getUserInboxQueue(userId);
            boolean autoAck = true;

            GetResponse getResponse;

            while ((getResponse = channel.basicGet(queue, autoAck)) != null)
            {
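                // fall back to the encoding used by sendUserMessage if the property is missing
                final String contentEncoding = getResponse.getProps().getContentEncoding();
                messages.add(new String(getResponse.getBody(),
                    contentEncoding == null ? MESSAGE_ENCODING : contentEncoding));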
            }

            return messages;
        }
    });
}

In the preceding method, you can assume that the user queue exists, and thus, that you can safely get messages from it. This is a reasonable assumption as this method will always be called after onUserLogin has been called for the user, leading to the pre-existence of the queue. Notice how basicGet is called repeatedly until a null response is received, which means the queue is empty. Notice also how you use the content encoding from the received message properties to build a string out of the body's byte array.
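The while loop in fetchUserMessages holds the poll thread until the inbox is empty. If a user had a large backlog, one option (not something the chapter's code does) would be to cap the number of messages fetched per poll. The following sketch assumes a hypothetical BoundedDrain helper, with a Supplier standing in for channel.basicGet so that the example runs without a broker. Note that the size check comes before the fetch: because fetchUserMessages gets messages with autoAck set to true, a message pulled and then discarded would be lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper: a bounded version of the basicGet drain loop. */
final class BoundedDrain
{
    private BoundedDrain()
    {
    }

    /**
     * Pulls at most maxMessages items from the source, stopping early when it
     * returns null (the equivalent of basicGet on an empty queue).
     */
    static <T> List<T> drain(final Supplier<T> source, final int maxMessages)
    {
        final List<T> batch = new ArrayList<>();
        T item;
        // check the cap BEFORE fetching: with autoAck, a fetched-but-dropped message is lost
        while (batch.size() < maxMessages && (item = source.get()) != null)
        {
            batch.add(item);
        }
        return batch;
    }
}
```

Since channel.basicGet(queue, autoAck) throws IOException, a real version would need a throwing functional interface or an inline loop rather than a plain Supplier; any messages left over would simply be picked up by the next poll.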

One thing remains unclear: what is this autoAck flag about? AMQP brokers rely on client-side acknowledgement to be certain that a message has been correctly received and can now be permanently removed from the broker storage. It is therefore up to the consumer to acknowledge a message if and only if it is done processing it, or if it is certain that there is no risk of losing it when processing it asynchronously. In our case, since the risk of losing a message is acceptable, you do not manually acknowledge messages. Instead, you tell the broker to consider them acknowledged as soon as they are delivered (you'll look into manual acknowledgement later in the book).
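The difference between automatic and manual acknowledgement can be illustrated with a toy model of the broker-side bookkeeping. The following sketch assumes a hypothetical AckModel class (not RabbitMQ code): with autoAck, a message is forgotten as soon as it is delivered; without it, the message is held as unacknowledged until ack is called, and put back in the queue if the channel closes first. As a simplification, requeued messages go back to the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of acknowledgement bookkeeping for one queue and one channel; not RabbitMQ code. */
class AckModel
{
    /** What a get returns: a delivery tag plus the message body. */
    static final class Delivery
    {
        final long tag;
        final String body;

        Delivery(final long tag, final String body)
        {
            this.tag = tag;
            this.body = body;
        }
    }

    private final Deque<String> ready = new ArrayDeque<>();
    // delivery tag -> message delivered but not yet acknowledged, in delivery order
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextDeliveryTag = 1;

    void publish(final String message)
    {
        ready.addLast(message);
    }

    /** Like basicGet: returns null when the queue is empty. */
    Delivery get(final boolean autoAck)
    {
        final String body = ready.pollFirst();
        if (body == null)
        {
            return null;
        }
        final long tag = nextDeliveryTag++;
        if (!autoAck)
        {
            unacked.put(tag, body); // kept until ack() or channel closure
        }
        return new Delivery(tag, body);
    }

    void ack(final long tag)
    {
        unacked.remove(tag);
    }

    /** Closing the channel requeues unacknowledged messages (here: at the head, in delivery order). */
    void closeChannel()
    {
        final List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--)
        {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount()
    {
        return ready.size();
    }
}
```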

And that is it! You now have a working user inbox ready to be tested. Sure, it is not extremely fast (we rely on polls) and is wasteful in resources (a channel is created and closed on each poll). However, all in all it works, it doesn't leak resources, and it can recover gracefully from a RabbitMQ broker restart. Let's take a look at the management console when running the application with a dozen simulated users.

Seeing it run

With the application running on a pair of servers connected to RabbitMQ, you can see the following established connections from the management console:

The management console provides connection information

As expected, one connection per application server has been established. Notice how the upstream and downstream network throughputs are clearly represented. What about channels? Because they get opened and closed very quickly, it's actually hard to see any from the management console. With our current architecture, they just don't stay long enough to be rendered on the user interface of the console. So let's look at the following exchanges:

The user-inboxes direct exchange shows up in the management console

You can see the user-inboxes exchange and the rate of messages coming in and out of it. The fact that messages are consumed as fast as they come in is a good sign, as it means the current architecture is sufficient for our needs and messages are not piling up. However, what are all these other exchanges that we can see here? Clearly, we haven't created them in code, so they must be coming from somewhere. Indeed, the nameless exchange represented as (AMQP default) and all the exchanges whose names start with amq. are defined by the AMQP specification and, as such, must be provided by default by RabbitMQ. Now, what about queues? Let's have a look at them:

Each user-to-user inbox queue is visible in the management console

As expected, you see one queue per user and some nifty usage statistics. Notice how the ack column is empty. This should be no surprise if you remember what we said about message acknowledgement: messages are fetched with autoAck enabled, which lets RabbitMQ know they won't be explicitly acknowledged; thus, there's no acknowledgement activity to show!

Note

Do not fear the multiplication of queues; with enough RAM, a RabbitMQ broker can deal with hundreds of thousands of queues and bindings without flinching.

Confident about its architecture and implementation, CCM rolls out the user-to-user messaging subsystem to production. It's an immediate success. Users actually want a new feature added to it: the capacity to send a message to a group of users. Let's see how we're going to implement this new feature with RabbitMQ.
