Chapter 5. Tweaking Message Delivery

While reading the previous chapters, you may have wondered about the fate of messages that get stuck in queues forever. You may even have decided to test the basic message property named expiration. It's now time to tackle the notion of message time-to-live thoroughly. You may also have wondered whether there is an option to prevent messages that target nonexistent queues from being silently dropped. That's another important question we'll discuss in this chapter.

In this chapter, we will discuss the following topics:

  • Message time-to-live
  • Dead-letter exchanges and queues
  • Mandatory delivery
  • Returned message handling

Handling dead letters

Things are going very well at Clever Coney Media. The user messaging feature gets traction as more and more users learn how to use it. After a few months of activity, one thing becomes clear though: some users don't log in to the application often, which leads to messages piling up in their inbox queues. Though the amount of data is not detrimental (yet), the idea of having messages lying around in queues, potentially forever, is not satisfactory. Imagine users logging in after a couple of weeks of vacation and being flooded with obsolete messages—this is the negative type of user experience that CCM is keen on avoiding.

CCM decides to address this by specifying a new rule: after one week, any user message not delivered will be either:

  • E-mailed to the user if it's a direct user-to-user message and if the user has opted for an e-mail fallback
  • Discarded if it's a topic or a public address message

So, you turn to the RabbitMQ documentation to find out what's offered in terms of message expiration. It appears that the following options are possible:

  • Using the standard AMQP message expiration property for published messages
  • Using a custom RabbitMQ extension that allows users to define a message time-to-live (TTL) per queue
  • Using a custom RabbitMQ extension that allows users to define a TTL for the queue itself

The first option is interesting because it is a standard AMQP property; however, after reading more about how it is supported in RabbitMQ, it turns out that such messages are only discarded when they reach the head of the queue and are about to be consumed. Even when expired, they would still sit in the queue, which would defeat the purpose of what you're trying to achieve. CCM rules out the last option because it doesn't want the queues themselves to be deleted. This leaves the second option: you will configure each user inbox queue with a TTL, which will be enforced by RabbitMQ whether the queue is being consumed or not.
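For the record, the rejected first option would have looked like the following minimal sketch, where channel, userId, and message are hypothetical placeholders for the publishing code of the previous chapters, and where the standard AMQP expiration property is expressed as a string of milliseconds:

// the standard AMQP per-message expiration, set at publication time
final BasicProperties properties = new BasicProperties.Builder()
    .expiration(String.valueOf(TimeUnit.DAYS.toMillis(7L)))
    .build();

channel.basicPublish("user-inboxes", "user-inbox." + userId, properties, message.getBytes("UTF-8"));

Because such a message would keep sitting in an unconsumed queue even after expiring, the per-queue TTL remains the better fit for CCM's needs.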

This is all fine and dandy, but what actually happens to messages when they expire? Remember that you want to consume these messages in order to e-mail them. So, how can you achieve this? This is where RabbitMQ's Dead Letter Exchange (DLX) comes in handy. In messaging parlance, a dead letter is a message that can't be delivered, either because it cannot reach its intended destination or because it expires (typically, a message property indicates the exact failure reason). Thus, in your case, messages that reach their TTL will become dead letters. RabbitMQ offers the option to automatically route these dead letters to a specific exchange, a so-called dead letter exchange. Since you want to receive the messages sent to this exchange, you will have to bind a queue to it, consume this queue, and handle the received messages. This queue will act as what's known as a Dead Letter Queue (DLQ), the ultimate destination of dead letters. The following diagram illustrates the overall architecture that CCM intends to roll out.

Dead letter handling architecture

What's notable in this diagram is that, when messages expire, they are published to the DLX with the original routing key they had when they were delivered to a user inbox queue. This behavior can be modified, as RabbitMQ allows the definition of a specific routing key to be used when messages are published to the DLX. You're happy with the default behavior; the original routing key is an interesting bit of information you'd like to use in order to find out the ID of the concerned user. Therefore, you've made the DLX a fanout exchange in order to have all messages routed to the DLQ, whatever their original routing key may have been.
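Should you ever need to override that routing key instead, a dedicated queue argument exists for this purpose. The following is just a sketch for reference (CCM keeps the default behavior); the "discarded" routing key is a made-up example and arguments is the queue-declaration arguments map discussed in the next section:

// not used by CCM: dead letters from this queue would be republished with the
// fixed "discarded" routing key instead of their original one
arguments.put("x-dead-letter-routing-key", "discarded");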

The battle plan is ready. It's now time to roll it out!

Refactoring queues

The first step to roll out this architecture consists of configuring the user inbox queues with the desired TTL of one week and a DLX equal to "user-dlx". Using the RabbitMQ extensions to AMQP, this can be achieved by respectively defining the "x-message-ttl" and "x-dead-letter-exchange" arguments when declaring the queue.

You could be tempted to jump right to your code editor and modify the declareUserMessageQueue method to use the following arguments:

arguments.put("x-message-ttl", TimeUnit.DAYS.toMillis(7L));
arguments.put("x-dead-letter-exchange", USER_DL_EXCHANGE);

However, this would be wrong on several levels. The main issue is that you would be changing the declaration from a queue with no arguments to one with two arguments. Remember our discussion in Chapter 2, Creating an Application Inbox: a queue (or exchange) declaration is idempotent only if all the parameters used are the same. Any discrepancy in the declaration will yield an exception and will be punished with an immediate channel termination!
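To make the risk concrete, here is a minimal sketch, with hypothetical variable names, of what a mismatched re-declaration looks like from the client's point of view when the queue already exists without arguments:

try
{
    // re-declaring an existing queue with different arguments makes the broker
    // close the channel with a PRECONDITION_FAILED (406) error
    channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
}
catch (final IOException ioe)
{
    // the cause is a ShutdownSignalException and the channel is now unusable
    LOGGER.severe("Queue re-declaration failed: " + ioe.getMessage());
}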

Tip

Cultivating a "Spidey sense" for breaking changes in queues and exchange declarations will save you the unpleasant experience of repeated errors and the mass extinction of channels.

The other problem is that this change would only apply when a user logs in, since that is when the user inbox queue gets declared. This would not fulfill the requirement to apply the expiration rule to all existing queues, independently of user actions. Finally, another thing to consider is that if these properties were configured at the queue declaration level, any change to one of them would require deleting and recreating all the queues. Clearly, the TTL and DLX configurations are cross-cutting concerns and should be configured in a more global fashion. Is that even possible?

The answer is yes! RabbitMQ has a simple and elegant solution to this problem called policies. RabbitMQ supports policies that define specific behaviors and that can be applied to queues or exchanges. Policies are applied not only when a queue or exchange is declared, but also to existing ones. Both the queue message TTL and the dead letter exchange are configurable via policies, and since only a single policy can apply to a given queue or exchange, you will craft one policy that combines the TTL and DLX settings and apply it to all user inbox queues. This cannot be achieved via the AMQP protocol, so you can't do it with the RabbitMQ client. Instead, you'll use the powerful command-line tools provided with RabbitMQ (should you want to do it from code, the management REST API would be your friend; a sketch of that alternative follows the command breakdown below). Refactoring the existing queues boils down to the following single command-line operation:

$ sudo rabbitmqctl set_policy -p ccm-dev-vhost Q_TTL_DLX "user-inbox\.\d+" '{"message-ttl":604800000, "dead-letter-exchange":"user-dlx"}' --apply-to queues

Let's take some time to dissect the preceding command:

  • sudo rabbitmqctl set_policy: This part of the command uses the set_policy control command
  • -p ccm-dev-vhost: This part of the command applies the policy to the development virtual host
  • Q_TTL_DLX: This part of the command names the policy so that we understand it pertains to queue time-to-live and dead letter exchange
  • "user-inbox\.\d+": This part of the command uses some regex fu to apply the policy to the user inbox queues only, by selecting them by name
  • '{"message-ttl":604800000, "dead-letter-exchange":"user-dlx"}': This part of the command uses a policy definition composed of a TTL of seven days in milliseconds and the name of the DLX
  • --apply-to queues: This part of the command ensures that this policy is only applied to queues, which is somewhat redundant with the regex, but acts as a safety net because it selects RabbitMQ entities by type instead of name
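As mentioned earlier, the same policy could be created from code via the management REST API instead. The following is a minimal sketch, not production code: it assumes the management plugin listens on localhost:15672, reuses the ccm-admin credentials used elsewhere in this chapter, and assumes a surrounding method that declares throws IOException; error and resource handling is left out for brevity:

// PUT the policy definition to the management REST API
final URL url = new URL("http://localhost:15672/api/policies/ccm-dev-vhost/Q_TTL_DLX");
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Authorization",
    "Basic " + Base64.encodeBase64String("ccm-admin:hare123".getBytes("UTF-8")));

// same pattern and definition as the rabbitmqctl command, with backslashes escaped for JSON
final String policyJson = "{\"pattern\":\"user-inbox\\\\.\\\\d+\","
    + "\"definition\":{\"message-ttl\":604800000,\"dead-letter-exchange\":\"user-dlx\"},"
    + "\"apply-to\":\"queues\"}";
connection.getOutputStream().write(policyJson.getBytes("UTF-8"));

// any 2xx response code means the policy has been created or updated
LOGGER.info("Policy creation response code: " + connection.getResponseCode());

In the rest of this chapter, though, you'll stick to the command-line approach.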

So here we go! You can run this command and life will be peachy. Wait a second! At this time, you haven't created the "user-dlx" exchange and you haven't bound the "user-dlq" queue to it yet. If you apply this policy right now, you will have seven days to roll out the missing exchange and queue. Sure, this is plenty of time, but smart developers don't like to work against the clock if they can avoid it.

Since you're smart, you're not going to run this command right now. Instead, you'll first create the infrastructure in charge of dealing with the dead letters and roll it out to our application. Then and only then will you apply the "Q_TTL_DLX" policy.

Undertaking messages

You need to create the necessary infrastructure to deal with expired messages, which means you need to do the following:

  1. Declare the user-dlx fanout exchange.
  2. Declare the user-dlq queue and bind it to the user-dlx fanout.
  3. Create a subscriber of the user-dlq queue that consumes and e-mails the dead letters.

To implement this behavior, you will add extra code to the onApplicationStart method of the UserMessageManager class. First, you'll add the following code to create the exchange and bind the queue to it:

static final String USER_DL_EXCHANGE = "user-dlx";
static final String USER_DL_QUEUE = "user-dlq";

rabbitMqManager.call(new ChannelCallable<BindOk>()
{
    @Override
    public String getDescription()
    {
        return "Declaring dead-letter exchange: " + USER_DL_EXCHANGE + " and queue: " + USER_DL_QUEUE;
    }

    @Override
    public BindOk call(final Channel channel) throws IOException
    {
        final boolean durable = true;
        final boolean autoDelete = false;

        final String exchange = USER_DL_EXCHANGE;
        final String type = "fanout";
        final Map<String, Object> arguments = null;

        channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments);

        final String queue = USER_DL_QUEUE;
        final boolean exclusive = false;
        channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);

        final String routingKey = "";
        return channel.queueBind(queue, exchange, routingKey);
    }
});

As you can see, this is just a standard fanout exchange declaration and the related queue declaration and binding. You used the same logic while implementing the public address system in Chapter 3, Switching to Server-push. Now let's look at the following consumer code for this queue, the code that you're also adding to the onApplicationStart method:

rabbitMqManager.createSubscription(USER_DL_QUEUE, new SubscriptionDeliveryHandler()
{
    @Override
    public void handleDelivery(final Channel channel,
                               final Envelope envelope,
                               final BasicProperties properties,
                               final byte[] body)
    {
        @SuppressWarnings("unchecked")
        final List<Map<String, LongString>> deathInfo = (List<Map<String, LongString>>) properties.getHeaders().get("x-death");

        if (deathInfo.get(0).get("exchange").toString().equals("user-inboxes"))
        {
            final long userId = Long.valueOf(StringUtils.substringAfter(envelope.getRoutingKey(), "user-inbox."));

            final String contentEncoding = properties.getContentEncoding();

            try
            {
                final String jsonMessage = new String(body, contentEncoding);
                userManager.handleDeadMessage(userId, jsonMessage);
            }
            catch (final UnsupportedEncodingException uee)
            {
                LOGGER.severe("Failed to handle dead message: " + envelope.getRoutingKey() + ", encoding: " + contentEncoding + ", entry: " + Base64.encodeBase64(body));
            }
        }

        try
        {
            final boolean multiple = false;
            channel.basicAck(envelope.getDeliveryTag(), multiple);
        }
        catch (final IOException ioe)
        {
            LOGGER.severe("Failed to acknowledge: " + ToStringBuilder.reflectionToString(envelope, ToStringStyle.SHORT_PREFIX_STYLE));
        }
    }
});

There's a lot happening here, so let's take some time to focus on the important aspects. The overall structure of the method should look familiar. Indeed, you're reusing the same subscription management feature you created to consume user messages over the WebSocket (refer to Chapter 3, Switching to Server-push). Hurray for code reuse!

You may be puzzled by the very first line of code in the handleDelivery method. We create a deathInfo variable by fetching a message header named x-death. Do you remember we said messages sent to the DLX can retain their original routing key? Well, there's something else that happens to them: RabbitMQ injects a custom header named x-death, which contains extra contextual information about the cause of death. This extra header is a key-value map with the following entries:

  • queue: This indicates the queue name where the message was stored before it expired
  • exchange: This indicates the exchange that this message was sent to
  • reason: This indicates why the message was dead lettered: it was rejected, its TTL expired, or the queue length limit was exceeded
  • time: This indicates the date and time when the message was dead lettered
  • routing-keys: This indicates all the routing keys associated with the message (RabbitMQ supports multiple routing keys in an extension to AMQP known as sender-selected distribution, which is beyond the scope of this book and is fully documented at http://www.rabbitmq.com/sender-selected.html)

With this map in hand, you can get the original exchange and compare it to see whether it's the user-inboxes one. In this way, you will only trigger the user-specific dead message logic for user-to-user messages. All other messages are just acknowledged after being consumed, effectively draining the DLQ. The user ID is extracted from the routing key in order to call the userManager.handleDeadMessage method, which is in charge of e-mailing the message to the user if he or she has opted for it.

Note that the reason of death could be used to filter messages further, as sketched below. Here, you've assumed that only expired messages will hit the DLQ; however, in the future, you may roll out new policies that make messages die for other reasons, such as the inability to be delivered.
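The following is a minimal sketch of such a filter, reusing the deathInfo variable from the preceding consumer; "expired" is the reason RabbitMQ reports for TTL-related deaths:

// only e-mail messages that actually died of old age
final String reason = deathInfo.get(0).get("reason").toString();

if ("expired".equals(reason)
    && deathInfo.get(0).get("exchange").toString().equals("user-inboxes"))
{
    // user-to-user dead message handling, as shown in the preceding code
}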

Tip

Extracting the user ID from the routing key is borderline hackish. A cleaner approach would consist of adding the target user ID in a custom header for user-to-user messages.
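The following is a minimal sketch of this cleaner approach; the addressee-id header name and the surrounding variables are hypothetical and would live in the user-to-user publishing code:

// carry the addressee's ID explicitly instead of parsing it out of the routing key
final Map<String, Object> headers = new HashMap<String, Object>();
headers.put("addressee-id", userId);

final BasicProperties properties = new BasicProperties.Builder()
    .headers(headers)
    .build();

channel.basicPublish("user-inboxes", "user-inbox." + userId, properties, body);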

Finally, pay attention to how the message bytes get logged when they can't be properly decoded to a string. They're encoded in Base64, which is always possible, and logged alongside the encoding, providing you with enough information to understand the issue.

Tip

Make your life easier and log enough contextual data when an exception occurs. Always consider what information you'd need to perform forensics on a particular exception.

After rolling out this code to your application servers, you will see that the dead letter exchange and queue have been correctly created. Now you can set the "Q_TTL_DLX" policy, as shown in the following code:

$ sudo rabbitmqctl set_policy -p ccm-dev-vhost Q_TTL_DLX "user-inbox\.\d+" '{"message-ttl":604800000, "dead-letter-exchange":"user-dlx"}' --apply-to queues
Setting policy "Q_TTL_DLX" for pattern "user-inbox\.\d+" to "{"message-ttl":604800000, "dead-letter-exchange":"user-dlx"}" with priority "0" ...
...done.

After running this command, you can use the management console to see what's changed in the user inbox queue definitions. The following screenshot shows a few of these queues:

The Q_TTL_DLX policy is applied to all user inbox queues

As you can see in the preceding screenshot, the Q_TTL_DLX policy has been applied to the user inbox queues, while other queues, such as user-dlq, haven't been affected. In the management interface, let's click on the Admin tab and then the Policies tab (on the right). Notice how the custom policy is visible in the following screenshot:

The Q_TTL_DLX details are visible in the management console

At this point, any message that stays for more than seven days in a user inbox queue will be mercilessly moved to the DLQ, consumed, potentially e-mailed, and buried for good! But what should be done with the existing messages that were created before you rolled out the policy? There is, unfortunately, no out-of-the-box solution to this problem, so you will have to take a somewhat drastic measure: you will purge all the queues that are not empty and have no active subscribers. This is rough, but it's your only way out of the current conundrum. Moreover, it's a solution you can easily implement with a simple script.

So far, we've been using the rabbitmqctl script to manage our RabbitMQ broker. You now need to install a new script that comes bundled with the management console you installed in Chapter 1, A Rabbit Springs to Life. This script, called rabbitmqadmin, can be downloaded by simply browsing to a particular URL of the management interface, namely http://localhost:15672/cli/. After following the displayed download instructions, install the script in a location that makes it available to all users (typically, /usr/local/bin on a Linux machine).

Note

More information on the rabbitmqadmin script can be found at http://www.rabbitmq.com/management-cli.html.

You can now create a script that will drop all consumerless queues that are not empty, as shown in the following code:

#!/bin/bash

queues_to_purge=`rabbitmqctl list_queues -p ccm-dev-vhost name messages_ready consumers | grep -E "user-inbox\.[[:digit:]]+[[:space:]]+[1-9][[:digit:]]*[[:space:]]+0" | awk '{ print $1 }'`

for queue in $queues_to_purge ; do
    echo -n "Purging $queue ... "
    rabbitmqadmin -V ccm-dev-vhost -u ccm-admin -p hare123 purge queue name=$queue
done

Notice that you used both rabbitmqctl and rabbitmqadmin to achieve your goal: the former can list specific attributes of queues in a way that's easy to parse, while the latter can purge queues. After executing this script as a superuser, the state of the RabbitMQ broker is where you want it, and your TTL and DLX policy will keep it that way in the long run!

Sending these messages to the e-mail bridge gives the customer support team a new idea.
