Chapter 3. Switching to Server-push

In the previous chapter, you learned how to connect to and get messages from RabbitMQ. Though receiving messages synchronously works perfectly, messages can be "pushed" from RabbitMQ directly to an application consumer for greater efficiency, as you'll discover in this chapter. In the process, you'll also learn how message consumers can either manually acknowledge messages or receive the messages without acknowledgements, the former allowing a zero-message loss design. Finally, you'll be acquainted with the fanout exchange, which routes messages to all queues bound to it, irrespective of the routing keys.

In this chapter, you will learn about the following topics:

  • Consuming messages from queues
  • Manually acknowledging messages
  • The fanout exchange

Moving beyond polling

Clever Coney Media is enjoying the application inbox feature that was rolled out in the previous chapter. Users enjoy it very much as well. Everything works fine except for the fact that the frontend regularly polling the backend for messages is starting to take its toll in terms of load, meaning it has begun to suffer performance degradation. Granted, the slow polling mechanism that was initially in place between the frontend and the backend was not designed to perform more than a basic ping. A better approach is needed.

CCM decides to re-architect the solution in favor of a server-push approach. The idea is to server-push messages to the users' browsers whenever a message is available, instead of regularly polling to fetch a message or, more often than not, nothing. The good news is that there is a technology perfect for this use case: WebSocket. Well supported by modern browsers, this protocol has the advantage of being full-duplex, which means that messages will be able to flow in both directions. Therefore, each frontend to backend WebSocket connection will be used both to server-push messages back to users and also for users to send messages to the server. This is illustrated in the following figure:

Moving beyond polling

WebSocket-based server-push architecture

It is important to note that both server-push and poller-based mechanisms can coexist pacifically. Indeed, CCM will keep the AJAX endpoint currently used by the poller mechanism in order to support older or less capable browsers. The idea is that if a WebSocket connection can't be established, the frontend will revert to the polling mechanism.

Note

You can learn more about the WebSocket protocol at http://tools.ietf.org/html/rfc6455.

Let's now follow CCM as it rolls out server-push.

Consuming queues

The following diagram illustrates the interactions between the client and server WebSocket peers and the RabbitMQ exchanges and queues:

Consuming queues

The client and server WebSockets connecting

In the previous diagram, the publication of messages towards the direct exchange for user-to-user messages, and the topic exchange for group messages, is not different than before. What is different is that instead of getting the messages from RabbitMQ queues, you will consume them. What's the difference? When you consume messages, you register a listener that new messages arriving in the queue will be automatically delivered to. So, unlike the synchronous basicGet operations you were performing before, you will now be using an asynchronous consumer to receive the queued messages.

Since you're still working on the CCM's application server that is in Java, you will look at RabbitMQ's Java API. Registering a queue consumer is as simple as follows:

channel.basicConsume(queue, autoAck, consumer)

Here, the consumer is an implementation of the com.rabbitmq.client.Consumer interface. The interface defines the contract between a queue consumer and RabbitMQ. It sports several methods, some related to receiving error notifications, but the main method you will focus on is the following:

void handleDelivery(String consumerTag,
                    Envelope envelope,
                    AMQP.BasicProperties properties,
                    byte[] body)
    throws IOException;

The handleDelivery method is called whenever a message is received from the queue. Hence, when this method is called, you will want to push the message back to the frontend via the WebSocket server.

Tip

The RabbitMQ Java client comes with a handy default implementation of the com.rabbitmq.client.Consumer interface named com.rabbitmq.client.DefaultConsumer. Use it and override only the methods you are interested in.

The consumer is bound to the channel that was used to consume a particular queue. If this channel is closed, the consumer will stop receiving messages. Since a channel cannot be reopened and has to be recreated from scratch, the implication is that both the channel and its consumer must be re-established in case of problems. CCM decides to tackle this problem by wrapping the consumer in a class that supports the reconnection mechanism.

Creating a consumer subscription wrapper

CCM decides to create a Subscription class to represent a user subscription to its own queue while supporting the possibility of being reconnected. Let's now progressively unfold this class and comment it as we go:

public class Subscription
{
    private final static Logger LOGGER = Logger.getLogger(Subscription.class.getName());

    private final String queue;
    private final SubscriptionDeliverHandler handler;

    private volatile DefaultConsumer consumer;

    public Subscription(final String queue, final SubscriptionDeliverHandler handler)
    {
        this.queue = queue;
        this.handler = handler;
    }

The state that the Subscription class will encapsulate consists of the following:

  • queue: This is the queue name that will be consumed to receive user messages
  • handler: This is the callback that will be called when a message arrives
  • consumer: This is the instance of the consumer, when connected to a channel, declared volatile so it can safely be re-created by another thread

At this point, you're probably wondering what the SubscriptionDeliveryHandler interface look like. It is as follows:

public interface SubscriptionDeliverHandler
{
    void handleDelivery(Channel channel,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body);
}

As you can see, the preceding code exposes only a handleDelivery method, which is very similar to the one from RabbitMQ's Consumer interface (see http://bit.ly/rmqconsumer), but provides channel instead of consumerTag. For now, let's just say that CCM doesn't need consumerTag but needs the current channel. You'll soon find out why. For now, let's keep on with our discovery of the Subscription class. First, let's look at what happens when it's starting:

public String start(final Channel channel) throws IOException
{
    consumer = null;

    if (channel != null)
    {
        try
        {
            consumer = new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(final String consumerTag,
                    final Envelope envelope,
                    final BasicProperties properties,
                    final byte[] body) throws IOException
                {
                  handler.handleDelivery(channel, envelope, properties, body);
                }

            };

            final boolean autoAck = false;
            final String consumerTag = channel.basicConsume(queue, autoAck, consumer);
            LOGGER.info("Consuming queue: " + queue + ": with tag: " + consumerTag + " on channel: "
                        + channel);

            return consumerTag;
        }
        catch (final Exception e)
        {
            LOGGER.log(Level.SEVERE, "Failed to start consuming queue: " + queue, e);
            consumer = null;
        }
    }

    return null;
}

The notable bits, hidden in Java's typical error-handling drama, are as follows:

  • On start, a fresh channel instance is provided
  • RabbitMQ's DefaultConsumer keeps a reference to this channel
  • Its handleDelivery method is directly wired to CCM's own version of handleDelivery in the configured handler
  • The automatic acknowledgment of messages is turned off (we'll discuss why soon)
  • basicConsume is the channel method in charge of establishing a consumer instance as the listener of a queue's messages
  • The consumer field is nullified if the subscription hasn't been activated

Let's delve into the stop method right away, as follows:

public void stop()
{
    final Channel channel = getChannel();
    if (channel == null)
    {
        return;
    }

    LOGGER.log(Level.INFO, "Stopping subscription: " + this);

    try
    {
        channel.basicCancel(consumer.getConsumerTag());
    }
    catch (final Exception e)
    {
        LOGGER.log(Level.SEVERE, "Failed to cancel subscription: " + this, e);
    }

    try
    {
        channel.close();
    }
    catch (final Exception e)
    {
        LOGGER.log(Level.SEVERE, "Failed to close channel: " + channel, e);
    }
    finally
    {
        consumer = null;
    }
}

There's not much to it really. With lots of fail-safe mechanisms, this method firstly cancels the active consumer so RabbitMQ stops delivering messages to it, before closing the current channel and nullifying the consumer field. Note that the current channel is extracted from the current consumer via the getChannel method as follows:

public Channel getChannel()
{
    return consumer == null ? null : consumer.getChannel();
}

You're almost done with this class. Let's take a look at the last two methods:

@Override
protected void finalize() throws Throwable
{
    stop();
}

@Override
public String toString()
{
    final ToStringHelper tsh = Objects.toStringHelper(this).addValue(hashCode()).add("queue", queue);
    if (consumer != null)
    {
        tsh.add("channel", getChannel());
        tsh.add("consumerTag", consumer.getConsumerTag());
    }
    return tsh.toString();
}

The finalize method is overridden to ensure that the subscription is closed if for any reason, the class gets garbage collected before stop is properly called. Because the stop method is idempotent, it's fine to call it several times. The toString method is overridden to provide a nice textual rendering of the Subscription class.

Tip

Good production-grade systems produce meaningful log entries; strive to give enough contexts when you log events in order to simplify forensics when something goes wrong, or during development to allow you to trace the execution across multiple classes and threads.

The Subscription class by itself is not enough to ensure the robustness of the system because it doesn't contain any reconnection logic. Therefore, its instances must be "babysat" by an external entity. Let's see how this is done.

Babysitting subscriptions

Being the channel factory of your application, the RabbitMqManager class is the natural factory for Subscription instances. Because it creates the subscription and because it takes care of handling connection issues and reconnections, the RabbitMqManager class is the most appropriate entity for babysitting Subscription instances. The following is how subscriptions are created:

private final Set<Subscription> subscriptions;
 
public RabbitMqManager(final ConnectionFactory factory)
{
    // ... existing code omitted

    subscriptions = synchronizedSet(new HashSet<Subscription>());
}

public Subscription createSubscription(final String queue, final SubscriptionDeliverHandler handler)
{
    final Subscription subscription = new Subscription(queue, handler);
    subscriptions.add(subscription);
    startSubscription(subscription);
    return subscription;
}

private void startSubscription(final Subscription subscription)
{
    final Channel channel = createChannel();

    if (channel != null)
    {
        try
        {
            subscription.start(channel);
        }
        catch (final Exception e)
        {
            LOGGER.log(Level.SEVERE, "Failed to start subscription: " + subscription + " on channel: "
                                     + channel, e);
        }
    }
}

What is interesting to note is that whether the start operation succeeds or not, a Subscription instance will be provided to the caller of createSubscription. This opens the door for graceful and transparent reconnections. So how does reconnection actually work? If you remember from the previous chapter, it's the start method of the RabbitMqManager that gets called when a reconnection attempt occurs. The only change that was needed to this method was to add a call to restartSubscriptions, which is reproduced after the following:

private void restartSubscriptions()
{
    LOGGER.info("Restarting " + subscriptions.size() + " subscriptions");

    for (final Subscription subscription : subscriptions)
    {
        startSubscription(subscription);
    }
}

That's it. You can now tie the WebSocket server endpoint with the subscription mechanism.

Tying into the WebSocket endpoint

You first need to refactor the UserMessageManager class to expose variants of sendUserMessage and sendTopicMessage that take a channel argument. Indeed, since you will have an active channel associated with a subscription, you will use it not only to consume messages, but also to produce them.

Tip

Channels are full duplex, which means that one channel can be used for both publishing and consuming messages.

On top of these basic refactorings, you also need to add the following to the UserMessageManager method to allow creating a subscription for a particular user inbox:

public Subscription subscribeToUserInbox(final long userId, final SubscriptionDeliverHandler handler)
{
    final String queue = getUserInboxQueue(userId);
    return rabbitMqManager.createSubscription(queue, handler);
}

CCM uses a JSR-356-compliant implementation of server-side WebSocket in its Java application backend. In this model, an application has to expose WebSocket endpoints, so CCM will create one endpoint dedicated to user messaging.

Note

Here is a good introduction to JSR-356, the Java API for WebSocket: http://www.oracle.com/technetwork/articles/java/jsr356-1937161.html.

We will look at the main methods of UserMessageServerEndpoint, keeping in mind that WebSocket authentication will not be discussed. Let's first look at what happens when a user browser connects to the WebSocket server:

@OnOpen
public void startSubscription(@PathParam("user-id") final long userId, final Session session)
{
    session.setMaxIdleTimeout(0);

    final Subscription subscription = userMessageManager.subscribeToUserInbox(userId,
        new SubscriptionDeliverHandler()
        {
            @Override
            public void handleDelivery(final Channel channel,
              final Envelope envelope,
              final BasicProperties properties,
              final byte[] body)
            {
              try
              {
                final String contentEncoding = properties.getContentEncoding();
                session.getBasicRemote().sendText(new String(body, contentEncoding));
                channel.basicAck(envelope.getDeliveryTag(), false);
                }
                catch (final Exception e)
                {
                  LOGGER.log(Level.SEVERE,
                    "Failed to push over websocket message ID: " + properties.getMessageId(), e);

                  try
                  {
                      final boolean requeue = true;
                      channel.basicReject(envelope.getDeliveryTag(), requeue);
                  }
                  catch (final Exception e2)
                  {
                      LOGGER.log(Level.SEVERE,
                        "Failed to reject and requeue message ID: " + properties.getMessageId(), e);
                  }
                }
            }
        });

    session.getUserProperties().put(RABBITMQ_SUBSCRIPTION, subscription);
}

The important aspect of this method is that it uses userMessageManager received by the dependency injection, to subscribe SubscriptionDeliverHandler that is in charge of sending the messages consumed from the user-specific queue on WebSocket. Do you see how the channel instance passed in CCM's custom handleDelivery method comes handy? It is required needed to perform manual message acknowledgement with the basicAck channel action. You perform this single message acknowledgment if and only if the sendText WebSocket operation has succeeded (that is, it didn't throw an exception). Otherwise, you use basicReject to actively reject and requeue the delivered message. If you don't acknowledge and fail to reject a message, the RabbitMQ broker will eventually redeliver the message once the subscription is re-established with a new channel.

Tip

Use manual acknowledgment if there is a risk that the processing of a message may fail and you want the broker to eventually redeliver it. Redelivery of unacknowledged messages doesn't happen immediately unless the basicReject or basicRecover channel actions are used. With the automatic acknowledgment mode, it's impossible to reject messages or recover channels.

Also, note how the session's user properties are used to store the subscription so that it can be used in other methods. Indeed, you are required to gracefully terminate the subscription in case of disconnection of the WebSocket, as shown in the following code:

@OnClose
public void stopSubscription(final Session session)
{
    final Subscription subscription = (Subscription) session.getUserProperties().get(
        RABBITMQ_SUBSCRIPTION);

    if (subscription != null)
    {
        subscription.stop();
    }
}

Of course, the Subscription instance is also required to publish messages because, as you may remember, it acts as a channel provider, as shown in the following code:

@OnMessage
public void publishMessage(final String jsonMessage, final Session session)
    throws IOException, EncodeException
{
    final Subscription subscription = (Subscription) session.getUserProperties().get(
        RABBITMQ_SUBSCRIPTION);

    final Channel channel = subscription == null ? null : subscription.getChannel();
    if (channel == null)
    {
        LOGGER.log(Level.SEVERE, "No active channel to dispatch message: " + jsonMessage);
        return;
    }

    // inspect the message to find out where to route it
    final Message message = OBJECT_MAPPER.readValue(jsonMessage, Message.class);
    if (message.getAddresseeId() != null)
    {
        userMessageManager.sendUserMessage(message.getAddresseeId(), jsonMessage, channel);
    }
    else if (!Strings.isNullOrEmpty(message.getTopic()))
    {
        userMessageManager.sendTopicMessage(message.getTopic(), jsonMessage, channel);
    }
    else
    {
        LOGGER.log(Level.SEVERE, "Received unroutable message: " + jsonMessage);
    }
}

Did you see how you now use the sendUserMessage and sendTopicMessage method variants that take a channel as a third argument? There is now no reason to keep using the somewhat wasteful method of creating and closing the channel each time since you now have access to a channel, which itself benefits from the reconnection mechanism you've created.

CCM is now ready to activate server-push for its user messaging feature!

Running the application

CCM has tested whether the server-push mechanism successfully withstands connection issues with the RabbitMQ broker. Its application can start even if it can't connect to the broker, and it can recover if the broker is restarted.

Tip

The robustness of a distributed system isn't the sole responsibility of one actor but a combination of the effort of all its members. No matter how highly available one of its members is, accounting for its potential failure in other members that depend on it will ensure a smooth ride and avoid the proverbial wake-up call.

Let's take a glance at the management console to see how switching to server-push has affected things. The following figure shows the channel's view:

Running the application

Active consumers keeping channels open

Remember how with the polling approach, where channels were open and closed very quickly, no channel was visible on the view. Now, because each consumer keeps its channel open, you can see active channels in the management console. You can also see the associated usage rates.

Note

There is no logical limit to the number of channels a RabbitMQ broker can handle; the limiting factors are the available memory on the broker as each channel mobilizes memory, and the actual network bandwidth available for pushing messages on all these channels.

Now, let's take a look at the queue view of the management console:

Running the application

Manual acknowledgements showing up in rates

Did you see how the ack column shows a non-zero rate? This is because you are now using manual acknowledgment. Thus, the RabbitMQ client now sends ack messages over the wire to the broker. This definitely has a cost in terms of bandwidth usage and general performance; however, if in your case, you value the guarantee of successful message processing over speed, it is perfectly acceptable.

After hearing about the resounding success of server-pushed user messages, the customer support team at Clever Coney Media came up with a new requirement: being able to message all users in the system. Let's see how this new feature can be implemented!

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.147.6.243