RabbitMQ can be used for application log processing, thanks to its high performance. You're about to learn how to route logs between the applications that publish them and custom scripts that consume them. You'll also use the AMQP plugin for JMeter to find the performance capacity of consumers. You'll discover that performance can be improved with message prefetching, a quality-of-service property of channels. Finally, you'll see how expressive routing keys can open the door to unexpected new features.
In this chapter, we will discuss the following topics:
So far, Clever Coney Media has used RabbitMQ only in the context of its main user-facing application. However, others in the company are interested in benefiting from message queuing. If you remember the overview of the architecture that was introduced in Chapter 1, A Rabbit Springs to Life, CCM uses Python to perform data analysis of the user data stored in the company's various databases. The team in charge of internal analytics has been looking for an elegant solution to aggregate logs from different applications in order to roll out new statistics, for both internal and end-user consumption.
Taking its interoperable nature into account, CCM thinks that AMQP is the perfect fit for this need; the idea is to publish logs from all applications to RabbitMQ and then use Python to consume, persist, slice, and then dice this data. The following diagram illustrates the architecture it has in mind:
There are two main sources of logs to be dealt with: Log4j for the Java application and syslog for the Apache2-based applications. The team quickly identifies the following two libraries that will facilitate the rolling out of this architecture:
Both these libraries publish logs to topic exchanges and use a configurable routing key that is composed of the level and the source of the log. Let's get to know these routing keys:

- The syslog logs use severity.facility, where severity is a number between 0 and 7 (the lower the number, the more critical) and facility is a number between 0 and 23 (http://tools.ietf.org/html/rfc5424). Bevis translates these numbers to human-readable values in the routing key. For example, 3.18 gets translated to err.local2.
- The Log4j logs use level.name, where level is a string such as INFO or ERROR, and name is either a fully qualified classname for application-level logs (for instance, com.ccm.MyClass), or access for access logs.

With such rich routing keys, there is no strong rationale to use a different exchange for each of these two log sources. We will, therefore, configure the libraries to publish to a single topic exchange. It's time to take a look at the implementation!
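To make these routing keys concrete, here is a small, broker-free sketch of AMQP topic matching, where * matches exactly one dot-delimited word and # matches zero or more. The matches function is a hypothetical illustration for experimentation only, not part of any library used in this chapter; the broker performs this matching for you.

```python
def matches(pattern, routing_key):
    """Hypothetical illustration of AMQP topic matching:
    '*' matches exactly one dot-delimited word, '#' matches zero or more."""
    def match(p_words, k_words):
        if not p_words:
            return not k_words  # pattern exhausted: the key must be too
        head, rest = p_words[0], p_words[1:]
        if head == '#':
            # '#' can absorb zero or more words: try every split point
            return any(match(rest, k_words[i:]) for i in range(len(k_words) + 1))
        if not k_words:
            return False
        if head == '*' or head == k_words[0]:
            return match(rest, k_words[1:])
        return False
    return match(pattern.split('.'), routing_key.split('.'))

# '#' catches everything, which is why a single binding can archive all logs:
print(matches('#', 'err.local2'))                        # True
print(matches('err.*', 'err.local2'))                    # True: err-level syslog entries only
print(matches('*.com.ccm.#', 'ERROR.com.ccm.MyClass'))   # True: one class, any level
```

This also hints at why rich routing keys pay off later: a new consumer can bind with a pattern such as err.* or *.access and receive only the slice of logs it cares about, without any change to the publishers.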
Let's start working on the Python script that processes log messages. This script will be in charge of archiving the logs in HDF5, a file format that is well suited for efficiently storing, retrieving, and analyzing swarms of data.
Discussing HDF5 is beyond the scope of this book. You can get more information at http://www.hdfgroup.org/HDF5/whatishdf5.html.
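Although HDF5 itself is out of scope, a minimal sketch can give a feel for appending log records to an HDF5 file with the h5py library. The function name, file layout, and single two-column dataset are illustrative assumptions, not the book's actual implementation:

```python
import h5py

def append_log_record(path, routing_key, body):
    """Append one (routing_key, body) pair to a resizable string dataset.

    Hypothetical schema: a single two-column dataset named 'logs'.
    """
    dt = h5py.string_dtype()  # variable-length UTF-8 strings
    with h5py.File(path, 'a') as f:
        if 'logs' not in f:
            f.create_dataset('logs', shape=(0, 2), maxshape=(None, 2), dtype=dt)
        ds = f['logs']
        ds.resize(ds.shape[0] + 1, axis=0)  # grow by one row
        ds[-1] = [routing_key, body]        # last row holds the new record
```

A real archiver would likely batch writes and use a richer schema (timestamps, one table per source), but the resizable-dataset pattern shown here is the core idea.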
The logs' archive script will consume messages from a single queue that will bind to the topic exchange with an all-matching routing key (#), as shown in the following diagram:
If you remember our previous discussion about the notion of happens before in the context of declaring exchanges and queues, you should be wondering what program (or programs) will be in charge of these declarations in our current case.
After investigating the syslog and Log4j publishers, it turns out that the former doesn't perform any declarations, while the latter declares the exchange as durable (non-auto-delete), but doesn't declare or bind any queue. Consequently, the Python script will have to use the same exchange declaration (which is fine with us, as these settings are what we want) and will have to create and bind the app-logs-archiver queue. To ensure that no message gets lost, you will start the Python script before rolling out the syslog and Log4j publishers.
Let's look at the logs-archiver.py script, for which we use the amqp library (online documentation available at http://amqp.readthedocs.org). Note that in the following code, the store_log_data function has been elided for brevity:
#!/usr/bin/env python
import amqp

connection = amqp.Connection(host='ccm-dev-rabbit', userid='ccm-dev',
                             password='coney123', virtual_host='ccm-dev-vhost')
channel = connection.channel()

EXCHANGE = 'app-logs'
QUEUE = 'app-logs-archiver'

channel.exchange_declare(exchange=EXCHANGE, type='topic',
                         durable=True, auto_delete=False)
channel.queue_declare(queue=QUEUE, durable=True, auto_delete=False)
channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='#')

def handle_message(message):
    store_log_data(message)
    message.channel.basic_ack(delivery_tag=message.delivery_tag)

channel.basic_consume(callback=handle_message, queue=QUEUE, no_ack=False)

print ' [*] Waiting for messages. To exit press CTRL+C'
while channel.callbacks:
    channel.wait()

channel.close()
connection.close()
Thanks to the AMQP specification, this code should look familiar. Indeed, the same concepts are (thankfully) named identically across all the client libraries out there. As you can see, we perform the following tasks:

- Connect to the broker and create a channel
- Declare the app-logs topic exchange as durable and non-auto-delete
- Declare the app-logs-archiver queue with the same properties and bind it to the exchange with the # routing key
- Consume messages from the queue, manually acknowledging each message only after it has been successfully stored
The latter point is important; we don't want to risk losing any log message, so a message is acknowledged only after store_log_data has succeeded. Should the store_log_data function throw an exception, the unacknowledged message will eventually be re-presented for processing. If the error condition is temporary, it will clear up upon redelivery. Otherwise, we will need to address the issue in the code.
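To see why acknowledging only after successful processing yields at-least-once behavior, here is a broker-free simulation in plain Python (it does not use the amqp library; the function and handler names are illustrative). A message is dropped from the queue only once the handler succeeds; a failed message is requeued, mimicking redelivery of unacknowledged messages:

```python
from collections import deque

def process_with_redelivery(messages, handler, max_attempts=3):
    """Simulate manual acknowledgments: a message is 'acked' (removed)
    only after handler succeeds; otherwise it is requeued for redelivery."""
    queue = deque((msg, 1) for msg in messages)
    processed, dead = [], []
    while queue:
        msg, attempt = queue.popleft()
        try:
            handler(msg)
            processed.append(msg)  # success: acknowledge
        except Exception:
            if attempt < max_attempts:
                queue.append((msg, attempt + 1))  # no ack: redeliver later
            else:
                dead.append(msg)  # persistent failure: needs a code fix
    return processed, dead

# A handler whose error condition is temporary clears up upon redelivery:
attempts = {}
def flaky_store(msg):
    attempts[msg] = attempts.get(msg, 0) + 1
    if msg == 'bad' and attempts[msg] == 1:
        raise IOError('temporary failure')

processed, dead = process_with_redelivery(['ok', 'bad'], flaky_store)
print(processed, dead)  # ['ok', 'bad'] []
```

Note that the max_attempts cap is a convenience of this simulation: a real broker will keep redelivering an unacknowledged message, which is why persistent failures must be fixed in code (or routed away, for example via dead lettering).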
The question you're probably burning to have answered now is: how will this code behave under load? After all, you're about to concentrate all access and application logs of CCM on it! Let's find out with a round of load testing.