Chapter 7. Taking RabbitMQ to Production

Who wants to have all their eggs in the same basket? No one, of course, but this is basically what Clever Coney Media has been doing so far, as it has been running a single instance of RabbitMQ in production. In this chapter, you'll learn how to address this concern using the clustering and federation features of RabbitMQ. You'll also learn how to check the pulse of the brokers and get alerts if things start turning sour.

In this chapter, you will learn about:

  • Broker clustering
  • High-availability queues
  • The federation plugin
  • Monitoring RabbitMQ

Tackling the broker SPOF

So far, Clever Coney Media has been running a single instance of RabbitMQ for all its production needs. Things have been running smoothly, but it's just a matter of time until something bad happens. Though RabbitMQ brokers are extremely stable, a crash is always possible. And if you're running in the cloud, losing an instance altogether due to a virtual instance glitch is a possibility that can't be ignored. Therefore, it is essential to tackle the broker single point of failure (SPOF) before something bad happens, in order to avoid losing data, annoying users, and receiving the dreaded 2 a.m. phone calls.

The good news is that RabbitMQ provides all the necessary features to deal with this issue out of the box. Indeed, RabbitMQ can easily be configured to run in an active/active deployment, where several brokers are engaged in a cluster to act as a single highly-available AMQP middleware. The active/active aspect is essential, because it means that no manual fail-over operation is needed if one broker goes down, again sparing you a 2 a.m. phone call.

Therefore, CCM decides to roll out a second RabbitMQ broker (named rmq-prod-2) and cluster it with the one it already has (named rmq-prod-1). This would lead to the architecture represented in the following diagram:

A high-availability cluster of two RabbitMQ brokers

When the second instance of RabbitMQ is ready, CCM informs you that it needs to be clustered with the already existing one. CCM has made sure that the content of the /var/lib/rabbitmq/.erlang.cookie file is the same as on the first instance. This is required because RabbitMQ relies on Erlang's clustering feature, which allows several Erlang nodes to communicate with each other locally or over the network. The Erlang cluster requires a so-called security cookie as a means of cross-node authentication.
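
For instance, here is a minimal sketch of propagating the cookie, assuming the default file location, root SSH access between the instances, and that RabbitMQ isn't running yet on the second node:

# run on rmq-prod-1: copy the cookie to the second instance
$ sudo scp /var/lib/rabbitmq/.erlang.cookie root@rmq-prod-2:/var/lib/rabbitmq/.erlang.cookie
# run on rmq-prod-2: restore the ownership and permissions Erlang expects
$ sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
$ sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie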

Tip

If your RabbitMQ instances are firewalled from each other, you'll need to open specific ports on top of the one used by AMQP (5672); otherwise, the cluster will not work. You can get more information at http://www.rabbitmq.com/clustering.html#firewall.
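
As a rough sketch, assuming an iptables-based firewall: the Erlang Port Mapper Daemon (epmd) listens on port 4369, while inter-node traffic uses a separate distribution port that you may need to pin to a fixed value (25672 is assumed here; see the preceding URL for details):

# epmd (port 4369) lets nodes discover each other's distribution ports
$ sudo iptables -A INPUT -p tcp --dport 4369 -j ACCEPT
# the Erlang distribution port itself, assumed here to be pinned to 25672
$ sudo iptables -A INPUT -p tcp --dport 25672 -j ACCEPT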

You do not need to configure any user or virtual host on the second node, as you did in Chapter 1, A Rabbit Springs to Life. Instead, you just have to join the cluster, and its configuration will automatically be synchronized with the existing RabbitMQ instance, including users, virtual hosts, exchanges, queues, and policies.

Tip

Keep in mind that when a node joins a cluster, it will be completely reset. All its configuration and data will be deleted before it synchronizes with the other members of the cluster.

For this, you run the following commands on the second node:

$ sudo rabbitmqctl stop_app
Stopping node rabbit@rmq-prod-2 ...
...done.
$ sudo rabbitmqctl join_cluster rabbit@rmq-prod-1
Clustering node rabbit@rmq-prod-2 with rabbit@rmq-prod-1 ...
...done.
$ sudo rabbitmqctl start_app
Starting node rabbit@rmq-prod-2 ...
...done.

Tip

Make sure the same version of Erlang is used by all the RabbitMQ nodes that engage in a cluster; otherwise, the join_cluster command will fail with an OTP version mismatch error.

Similarly, the same major/minor version of RabbitMQ should be used across nodes, but patch versions can differ; this means that versions 3.2.1 and 3.2.0 can be used in the same cluster, but not 3.2.1 and 3.1.0.
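
You can quickly verify both versions on each node before joining; here is a sketch using rabbitmqctl:

# print the Erlang/OTP release the broker is running on
$ sudo rabbitmqctl eval 'erlang:system_info(otp_release).'
# the RabbitMQ version is listed under running_applications
$ sudo rabbitmqctl status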

After running these commands, you can check whether the cluster is active by running the cluster_status command on any node. Here, you run it on the first node:

$ sudo rabbitmqctl cluster_status
Cluster status of node rabbit@rmq-prod-1 ...
[{nodes,[{disc,[rabbit@rmq-prod-2,rabbit@rmq-prod-1]}]},
 {running_nodes,[rabbit@rmq-prod-2,rabbit@rmq-prod-1]},
 {partitions,[]}]
...done.

Notice how two lists of nodes are given in the status message: the one named nodes is the list of configured nodes in the cluster, while the one named running_nodes lists the nodes that are actually active. The list of configured nodes is persistent, so it will survive a restart of the brokers. On restart, each broker will automatically re-engage with the cluster.
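
You can verify this behavior by restarting the second broker; a sketch, assuming a sysvinit-style service installation (adapt to your init system):

$ sudo service rabbitmq-server restart
# once the broker is back up, it reappears under running_nodes
$ sudo rabbitmqctl cluster_status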

Tip

Spend some time getting acquainted with RabbitMQ's behavior in a split-brain (also known as network partition) situation at http://www.rabbitmq.com/partitions.html.

We've said that the entire configuration gets synchronized when a new node joins the cluster. You can confirm this by connecting to the management console on the second node. You can use the ccm-admin user to log in because it too has been synchronized. As you can see in the following screenshot of the Queues view of the management console, the configuration has indeed been synchronized:

All configurations are synchronized after joining the cluster

Note

If you want to add more nodes, you would only need to have each new node join one of the other nodes in the cluster. It would then discover all the other nodes in the cluster automatically (a neat feature provided by the underlying Erlang clustering mechanism).
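
For instance, a hypothetical third node (rmq-prod-3, with its .erlang.cookie already aligned) could join via the second node and would still discover the first:

# run on the hypothetical rmq-prod-3
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl join_cluster rabbit@rmq-prod-2
$ sudo rabbitmqctl start_app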

In the management console of the first node, the Overview tab shows all the nodes that are in the cluster, as illustrated in the following screenshot:

The management console overview shows all cluster members

As you can see, all the members of the cluster are listed, including their statistics and the ports they've opened (both for AMQP and the management console itself). You may be wondering what the different values shown in the Type column are. Disc means that the node persists its data to the filesystem, which is the default behavior. It's also possible to start a node as a ram node, in which case it keeps its metadata in memory only; this is an interesting approach for creating high-performance members in a cluster. Stats means the node is the one that contains the management statistics database, which is not spread across the cluster. Finally, the * indicates the node you're connected to.
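
If you wanted a ram node, you would ask for it at join time; a sketch, run on the joining node:

$ sudo rabbitmqctl stop_app
# --ram makes the node join as a ram member instead of the default disc one
$ sudo rabbitmqctl join_cluster --ram rabbit@rmq-prod-1
$ sudo rabbitmqctl start_app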

Note

Nodes can be removed from the cluster, as explained in this tutorial at http://www.rabbitmq.com/clustering.html#breakup.

At this point, you're probably thinking you're done with clustering. In fact, there's one more step to perform to ensure the high availability of your queues' data.

Mirroring queues

With clustering, you ensured that the configuration gets synchronized across all RabbitMQ nodes. This means that clients can now connect to one node or the other and find the exchanges and queues they're expecting. However, there is one thing that is not carried over the cluster by default: the messages themselves. By default, queue data is local to a particular node; so if this node goes down, consumers will have to wait until it comes back to access it. This may sound strange, but it can be a perfectly acceptable scenario for messages used to track long-running tasks, for example, where having to wait for a while would not be tragic.

In your case, you want the data in the users' queues to be highly available. This can be achieved with mirrored queues. When a queue is mirrored, its instances across the network organize themselves around one master and several slaves. All interaction (message queuing and dequeuing) happens with the master; the slaves receive the updates via synchronization over the cluster. If you interact with a node that hosts a slave queue, the interaction would actually be forwarded across the cluster to the master and then synchronized back to the slave.

Activating queue mirroring is done via a policy that is applied to each queue concerned. Since only one policy at a time is allowed on a queue (or exchange), you will first have to clear the Q_TTL_DLX policy you created in Chapter 5, Tweaking Message Delivery, and then apply a new policy that combines the Q_TTL_DLX rules with the queue mirroring ones (that is, the high-availability settings). This sounds more complicated than it is, as you can see by running the following commands:

$ sudo rabbitmqctl clear_policy -p ccm-prod-vhost Q_TTL_DLX
Clearing policy "Q_TTL_DLX" ...
...done.
$ sudo rabbitmqctl set_policy -p ccm-prod-vhost HA_Q_TTL_DLX "user-.+" '{"message-ttl":604800000, "dead-letter-exchange":"user-dlx", "ha-mode":"all", "ha-sync-mode":"automatic"}' --apply-to queues
Setting policy "HA_Q_TTL_DLX" for pattern "user-.+" to "{"message-ttl":604800000, "dead-letter-exchange":"user-dlx", "ha-mode":"all", "ha-sync-mode":"automatic"}" with priority "0" ...
...done.

As you can see, you just added "ha-mode":"all" and "ha-sync-mode":"automatic" to the existing TTL and DLX policy rules. The all value for ha-mode means that the queues will be mirrored across all nodes in the cluster, which is exactly what you want for your two-node cluster. The other options are exactly and nodes, which allow you to specify a number of nodes or a list of node names, respectively, via an extra ha-params parameter.
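
For illustration only (the policy names here are hypothetical, and remember that only one policy at a time applies to a given queue), this is what those two variants could look like:

# mirror each matching queue to exactly two nodes, chosen by RabbitMQ
$ sudo rabbitmqctl set_policy -p ccm-prod-vhost HA_EXACTLY "user-.+" '{"ha-mode":"exactly", "ha-params":2, "ha-sync-mode":"automatic"}' --apply-to queues
# mirror each matching queue to an explicit list of nodes
$ sudo rabbitmqctl set_policy -p ccm-prod-vhost HA_NODES "user-.+" '{"ha-mode":"nodes", "ha-params":["rabbit@rmq-prod-1","rabbit@rmq-prod-2"]}' --apply-to queues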

The ha-sync-mode parameter is unsurprisingly used to specify the synchronization mode for the mirrored queue, and can be either manual or automatic. In the manual mode, a newly mirrored slave queue will not receive any of the existing messages, but will eventually become consistent with the master queue, as old messages get consumed. In your case, you want immediate synchronization of the queues so that any existing messages become visible across all nodes, and are fine with the initial unresponsiveness this will create, as performance is not critical for user messages.

Tip

It is possible to manually synchronize a mirrored queue with rabbitmqctl sync_queue <queue_name>. The manual synchronization can be canceled with rabbitmqctl cancel_sync_queue <queue_name>.
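
You can also check which mirrors are in sync; a quick sketch using list_queues:

# slave_pids lists all mirrors; synchronised_slave_pids only the in-sync ones
$ sudo rabbitmqctl list_queues -p ccm-prod-vhost name slave_pids synchronised_slave_pids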

You have certainly noticed that we apply this policy only to the user inboxes and the dead-letter queue. You're most likely wondering about the log and service queues. For the log queues, we will be looking at another high-availability option, because it does not make sense to mirror the high traffic that goes through them across the cluster. For the temporary service response queues, there is no need to make them highly available; if something goes wrong with a broker, the synchronous interaction will break and the client will have to back off and retry. However, the service request queues do need to be mirrored to allow providers and consumers to be connected to different RabbitMQ brokers. This is done with the following command:

$ sudo rabbitmqctl set_policy -p ccm-prod-vhost HA_Q ".+-service" '{"ha-mode":"all", "ha-sync-mode":"automatic"}' --apply-to queues
Setting policy "HA_Q" for pattern ".+-service" to "{"ha-mode":"all", "ha-sync-mode":"automatic"}" with priority "0" ...
...done.

As you can see, you opted for the .+-service pattern, so any new service that you could develop alongside the authentication one will have its request queue automatically mirrored, as long as its name ends with -service.

If you take a look at the Queues tab of the management console after running the above command, you'll see that the HA_Q_TTL_DLX and HA_Q policies have been applied to the intended queues, as visible in the following screenshot:

Mirrored queues with the HA policies applied

Notice how the mirrored queues have a +1 next to them. It's not an option for sharing them with your friends on Google Plus; instead, it denotes the fact that the queues are mirrored to one other node in the cluster. Staying in the management console, if you look at Details of any mirrored queue, you will see something similar to the next image. As you can see, the master node (rabbit@rmq-prod-1) and the slave nodes (only rabbit@rmq-prod-2, in your case) are clearly detailed:

Master and slave nodes are detailed for each mirrored queue

At this point, the RabbitMQ brokers are clustered and user queues are mirrored. However, the client applications are not yet able to benefit from this highly-available deployment. Let's fix this right away.

Connecting to the cluster

The applications that connect to RabbitMQ need to be modified a little so that they can benefit from the cluster. Currently, they connect to a single node and thus should be modified to be able to connect to both nodes, trying one and failing over to the other in case of trouble. Besides this modification, no other change is required from the client applications. They will continue to interact with the exchanges and queues they know about in the same way as before.

Let's first modify the main Java application. All you need to do is edit the RabbitMqManager class, so it receives by injection both a com.rabbitmq.client.ConnectionFactory and an array of com.rabbitmq.client.Address instances, one for each RabbitMQ node. Then you can modify the start() method as shown in the following code:

public void start()
{
    try
    {
        // try each address in turn and connect to the first responsive broker
        connection = factory.newConnection(addresses);
        connection.addShutdownListener(this);
        LOGGER.info("Connected to " + connection.getAddress().getHostName() + ":" + connection.getPort());
        restartSubscriptions();
    }
    catch (final Exception e)
    {
        // no broker could be reached: log every attempted address and
        // schedule an asynchronous reconnection attempt
        LOGGER.log(Level.SEVERE, "Failed to connect to " + Arrays.toString(addresses), e);
        asyncWaitAndReconnect();
    }
}

Basically, the list of broker addresses is passed to the connection factory and the actual connection is used in the success log statement, while the list of addresses is used in the failure log statement. With this in place, the RabbitMQ Java client will connect to the first responsive node in the address list and will try each of the provided broker addresses until it can establish a connection, or eventually fail. In case of failure, the overall reconnect mechanism you've already put in place will kick in and the addresses will once again be attempted for connection. The following code illustrates how connection factory and the list of addresses are created before being passed on to RabbitMqManager:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("ccm-prod");
factory.setPassword("******");
factory.setVirtualHost("ccm-prod-vhost");

// one Address per cluster member; the first responsive one wins
Address[] addresses = new Address[]{
    new Address("rmq-prod-1", 5672),
    new Address("rmq-prod-2", 5672)};

With this in place, the main Java application is able to benefit from the cluster. Let's turn our attention to the Ruby on Rails back office. Things are a little simpler here because it doesn't maintain a permanent connection to RabbitMQ. Therefore, all that is needed is a mechanism to attempt connecting to the first broker, then the second, and run a block provided on the first successfully established connection.

You can achieve this very elegantly, thanks to the on_tcp_connection_failure mechanism provided by the amqp gem, as follows:

def run_with_connection(settings, &action)
  # take the next candidate broker off the list; give up when none are left
  broker = settings[:brokers].shift

  raise "Impossible to connect to any broker" if broker.nil?

  # merge the current broker's host and port into the connection settings
  settings.merge!(broker)

  # on TCP connection failure, recurse to try the next broker in the list
  settings.merge!({
    :on_tcp_connection_failure => Proc.new {
      run_with_connection(settings, &action)
    }
  })

  EventMachine.run do
    AMQP.connect(settings) do |connection|
      action.call(connection)
    end
  end
end

settings = {
  :brokers  => [
                {:host => 'rmq-prod-1', :port=> 5672},
                {:host => 'rmq-prod-2', :port=> 5672}
               ],
  :vhost    => "ccm-prod-vhost",
  :user     => "ccm-prod",
  :password => "******"
}

Notice how each connection attempt mutates the settings hash with the current broker's host and port information. With this in place, calling run_with_connection(settings) will create a valid connection to RabbitMQ and pass it to the block provided.

At this point, you've taken care of all the systems concerned with user queues. But what about the log aggregation mechanism? It's indeed time to address this concern.

Federating brokers

So far, you've followed an approach to high availability that most developers should be very familiar with. The way you created a cluster of two RabbitMQ brokers is really similar to what is typically done when making a relational database highly available. The database remains a centralized resource that offers high guarantees of availability. But RabbitMQ is not a one-trick rabbit when it comes to high availability. Remember, you left the log queues out of the equation for a reason; you did not want to mirror such a highly-trafficked queue. What could you do in order for CCM to enjoy the same guarantees for log aggregation? Enter the notion of messaging topologies.

If you think beyond the notion of a single centralized enterprise resource and instead think in terms of distributed components, the idea of creating a topology of RabbitMQ brokers will emerge. RabbitMQ offers the following two plugins that allow the connection of brokers:

  • The shovel plugin, which connects queues in one broker to exchanges in another broker
  • The federation plugin, which connects queues to queues or exchanges to exchanges across brokers

Both plugins ensure a reliable delivery of messages across brokers; if messages can't be routed to the target broker, they'll remain safely accumulated. Neither requires brokers to be clustered, which simplifies setup and management (RabbitMQ and Erlang versions can even mismatch). Moreover, both plugins work fine over WAN connections, something clustering doesn't do well.

Note

In a federation, only the node where messages converge needs to be manually configured; its upstream nodes get automatically configured for the topology. Conversely, with shovels, each source node needs to be manually configured to send to a destination node, which itself is unaware of the fact that it's engaged in a particular topology.

In your case, the ideal topology consists of running a RabbitMQ node collocated with each application that emits logs to the app-logs topic exchange (refer to Chapter 4, Handling Application Logs), and have this exchange forward all messages to a centralized single RabbitMQ node where the app-logs-archiver and app-logs-error-reporter queues will be bound. This topology is illustrated in the following diagram:

A topology that federates log messages to a central broker

In this topology, all applications will write to a local RabbitMQ node, which will act as a store-and-forward broker, pushing all logs to a centralized RabbitMQ node. If this central node is down, the log entries will remain locally accumulated until it comes back up. Obviously, the assumption here is that the local RabbitMQ nodes are extremely stable. Your experience with running RabbitMQ in the past few months will help you with this approach. Moreover, logs are considered important but not critical data for CCM, so a best-effort approach is acceptable. Knowing this, you chose to use the federation plugin, as it's the one that supports exchange to exchange connectivity (with shovel, messages would have to be accumulated in a local queue on each node).

Note

More information on the shovel plugin can be found at http://www.rabbitmq.com/shovel.html.

The federation plugin needs to be installed on all RabbitMQ nodes that will engage in the topology. Therefore, you install it by running the following commands on each node:

$ sudo rabbitmq-plugins enable rabbitmq_federation
The following plugins have been enabled:
  rabbitmq_federation
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
$ sudo rabbitmq-plugins enable rabbitmq_federation_management
The following plugins have been enabled:
  rabbitmq_federation_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.

Moreover, unlike with clustering, each node needs to be manually set up with the desired user and virtual host. Therefore, you need to run the necessary commands, as discussed in Chapter 1, A Rabbit Springs to Life. Next, you need to configure the app-logs exchange federation itself. This involves multiple steps (which we will detail hereafter) that are all run on the central broker, that is, the one towards which all logs will converge. First, you need to configure what are called upstreams, which are the RabbitMQ nodes that will send data to the central broker.
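
But first, here is a sketch of the per-node user and virtual host setup just mentioned, assuming the same names as in Chapter 1 (password elided):

# run on each node engaged in the topology
$ sudo rabbitmqctl add_user ccm-prod '******'
$ sudo rabbitmqctl add_vhost ccm-prod-vhost
$ sudo rabbitmqctl set_permissions -p ccm-prod-vhost ccm-prod ".*" ".*" ".*"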

Five upstreams are needed, since there are five servers that will send logs over; however, we will only consider two in the following examples for brevity's sake. What you're about to do for two upstreams will be done the same way for the other three:

$ sudo rabbitmqctl set_parameter -p ccm-prod-vhost federation-upstream app-prod-1-logs '{"uri":"amqp://ccm-prod:******@app-prod-1:5672/ccm-prod-vhost"}'
Setting runtime parameter "app-prod-1-logs" for component "federation-upstream" to "{"uri":"amqp://ccm-prod:******@app-prod-1:5672/ccm-prod-vhost"}" ...
...done.
$ sudo rabbitmqctl set_parameter -p ccm-prod-vhost federation-upstream app-prod-2-logs '{"uri":"amqp://ccm-prod:******@app-prod-2:5672/ccm-prod-vhost"}'
Setting runtime parameter "app-prod-2-logs" for component "federation-upstream" to "{"uri":"amqp://ccm-prod:******@app-prod-2:5672/ccm-prod-vhost"}" ...
...done.

The next step consists of creating an upstream set, which is a logical group of upstreams referred to by their names. You run the following command to create an upstream set named app-prod-logs that contains the app-prod-1-logs and app-prod-2-logs upstreams:

$ sudo rabbitmqctl set_parameter -p ccm-prod-vhost federation-upstream-set app-prod-logs '[{"upstream": "app-prod-1-logs"},{"upstream": "app-prod-2-logs"}]'
Setting runtime parameter "app-prod-logs" for component "federation-upstream-set" to "[{"upstream": "app-prod-1-logs"},{"upstream": "app-prod-2-logs"}]" ...
...done.

Tip

If you know that you'll never have more than one logical group of upstreams, you can skip the creation of an upstream set and use the implicit set named all, which automatically contains all the upstreams in a virtual host.
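
In that case, the federation policy you'll create later in this section would simply reference the implicit set, as in this sketch:

$ sudo rabbitmqctl set_policy -p ccm-prod-vhost --apply-to exchanges LOGS_UPSTREAM "app-logs" '{"federation-upstream-set":"all"}'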

After this, you need to configure the user that the federation plugin will use in the central broker to interact with the federated exchange, with the following command:

$ sudo rabbitmqctl set_parameter federation local-username '"ccm-prod"'
Setting runtime parameter "local-username" for component "federation" to ""ccm-prod"" ...
...done.

If you browse the Federation Upstreams tab in the Admin section of the management console, you'll see that the two upstreams have been correctly configured (as shown in the following screenshot):

Upstream nodes are configured in a federation

If you switch to Federation Status, you'll see that it's empty, meaning that the federation is inactive. Why is that? After all, you've just created the topology. The reason is that no exchange or queue is yet actively engaged in the topology. Because of its dynamic nature, the federation is inactive until then. To bring it to life, you need to create a policy applied to the app-logs exchange that configures it to be federated with the app-prod-logs upstream set you've just created. You decide on naming this policy LOGS_UPSTREAM and run the following command:

$ sudo rabbitmqctl set_policy -p ccm-prod-vhost --apply-to exchanges LOGS_UPSTREAM "app-logs" '{"federation-upstream-set":"app-prod-logs"}'
Setting policy "LOGS_UPSTREAM" for pattern "app-logs" to "{"federation-upstream-set":"app-prod-logs"}" with priority "0" ...
...done.

After running this command, if you come back to the Federation Status tab, you'll see that the federation is now running links for the app-logs exchange from the two upstream nodes of the configured set (as shown in the following screenshot):

Running upstream links for a federated exchange

If you look at the app-logs exchange on this node, you'll see that there's nothing special to it, except that it has the LOGS_UPSTREAM policy applied to it (as represented in the following screenshot):

An exchange gets federated via a specific policy

Tip

It's also possible to get the status of the federation from the command line by running sudo rabbitmqctl eval 'rabbit_federation_status:status().' on the downstream node.

Now, if you connect to the management console of any of the upstream nodes and look at the same exchange, you'll see what's represented in the following image. Now we're talking! The downstream node has clearly informed the upstream nodes of the federation, because the link established for the app-logs exchange is visible (albeit grayed out; you may need to squint to be able to read it).

In an upstream node, federation links are visible in the management console

If you look at the Connections and Channels tabs of the management console, you'll see that the downstream node is connected to the upstream node over the AMQP protocol. Except for the setup of the topology itself, there's nothing magical about the federation. It's been built on top of AMQP and thus benefits from the same advantages offered by the protocol. Hence, if your RabbitMQ instances are firewalled, no special port besides the one used by AMQP (5672 by default) needs to be opened.

Note

You can read more about the federation plugin at http://www.rabbitmq.com/federation.html and http://www.rabbitmq.com/federation-reference.html.

From now on, you'll sleep better at night. You've clustered the nodes that were required to be highly available and deployed the others in a reliable topology. But what if things are going really bad with a broker? How will you know? It's time to review some monitoring strategies.
