Messaging serendipity

One of the advantages of messaging is that new behavior that wasn't initially envisioned can easily be grafted onto a system because of its loosely coupled nature. In our case, the fact that all application logs are now being published to a single topic exchange allows us to create a specific consumer that will receive only error messages and report them to the operations team.

If you remember our discussion about the routing keys used by the application logs' publishers, all we need to do is receive messages whose routing key's first component (that is, the string before the first period of the routing key) indicates an error. These components are as follows:

  • For the syslog publisher: err, crit, alert, and emerg
  • For the Log4j publisher: ERROR and FATAL
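
To make the rule concrete, here is a minimal sketch of the check described above, written in plain Python with no broker involved. The helper names first_component and is_error_routing_key are hypothetical and are not part of the reporter script; the sample routing keys in the usage note are only illustrations.

```python
# Severity components that indicate an error, as listed above.
SYSLOG_ERROR_SEVERITIES = {'err', 'crit', 'alert', 'emerg'}
LOG4J_ERROR_LEVELS = {'ERROR', 'FATAL'}
ERROR_COMPONENTS = SYSLOG_ERROR_SEVERITIES | LOG4J_ERROR_LEVELS


def first_component(routing_key):
    """Return the string before the first period of the routing key."""
    return routing_key.split('.', 1)[0]


def is_error_routing_key(routing_key):
    """Hypothetical helper: True if the first component indicates an error."""
    # The comparison is case-sensitive, so 'error' is not the same as 'ERROR'.
    return first_component(routing_key) in ERROR_COMPONENTS
```

With these definitions, is_error_routing_key('err.local2') and is_error_routing_key('ERROR.com.ccm.Tests') are true, while is_error_routing_key('info.local2') is false. In the actual system this filtering is done by the broker through bindings rather than in consumer code.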

Now that we know this, we can create a Python script that will create a queue and bind it to the app-logs topic exchange, using the one-binding-per-error-message-routing-key pattern. The following code shows the logs-error-reporter.py script without the body of the report_error function (elided for brevity):

#!/usr/bin/env python
import amqp

connection = amqp.Connection(host='ccm-dev-rabbit', userid='ccm-dev', password='coney123', virtual_host='ccm-dev-vhost')
channel = connection.channel()

EXCHANGE = 'app-logs'
QUEUE = 'app-logs-error-reporter'

channel.exchange_declare(exchange=EXCHANGE, type='topic', durable=True, auto_delete=False)
channel.queue_declare(queue=QUEUE, durable=True, auto_delete=False)

# bind syslog severities:
channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='err.#')
channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='crit.#')
channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='alert.#')
channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='emerg.#')

# bind log4j levels
channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='ERROR.#')
channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='FATAL.#')

channel.basic_qos(prefetch_count=50, prefetch_size=0, a_global=False)

def handle_message(message):
    report_error(message)
    message.channel.basic_ack(delivery_tag=message.delivery_tag)

channel.basic_consume(callback=handle_message, queue=QUEUE, no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
while channel.callbacks:
    channel.wait()

channel.close()
connection.close()

Notice how you've leveraged the # wildcard in the queue-binding operations that are highlighted in the preceding script. This allows you to match only the first part of the routing key (the severity) and accept anything else after it.
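
To see why these six bindings select exactly the error messages, the following sketch models topic matching in plain Python. It is a simplified illustration, not RabbitMQ's implementation: the function names topic_matches and is_reported are hypothetical, and the model only encodes the standard topic rules, where * stands for exactly one word and # stands for zero or more words.

```python
# The six binding keys used by the reporter script.
BINDING_KEYS = ['err.#', 'crit.#', 'alert.#', 'emerg.#', 'ERROR.#', 'FATAL.#']


def topic_matches(binding_key, routing_key):
    """Simplified model of topic-exchange matching (not RabbitMQ's code)."""
    def match(pattern, words):
        if not pattern:
            # Pattern exhausted: it matches only if no words are left.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == '#':
            # '#' may absorb anywhere from zero to all remaining words.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == '*' or head == words[0]:
            return match(rest, words[1:])
        return False

    return match(binding_key.split('.'), routing_key.split('.'))


def is_reported(routing_key):
    """True if at least one binding would route the message to the queue."""
    return any(topic_matches(key, routing_key) for key in BINDING_KEYS)
```

Under this model, is_reported('err.local2') and is_reported('ERROR.com.ccm.Tests') are true, while is_reported('info.local2') is false. Because # also matches zero words, a routing key made of the severity alone, such as 'crit', would be routed as well.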

With this script running, let's browse to the Exchanges tab of the RabbitMQ management console once more and click on the app-logs exchange. The bindings shown in the following screenshot should be visible:


The application logs exchange's multiple bindings

Since you're in the management console, let's do something we haven't done yet. Let's use it to send test messages to the app-logs exchange. Scroll down a little below the Bindings section shown in the preceding screenshot until you reach the Publish message section. In this interface, fill in the routing key and payload as shown in the following screenshot:


Using the management console to send test log messages

After clicking on Publish message, the error gets correctly reported by the Python script. We test further by changing the routing key to info.local2 or ERROR.com.ccm.Tests to see what is reported and what isn't. Everything works as expected, so you're very glad for this last-minute idea and the capacity to roll it out cleanly, thanks to message queuing with RabbitMQ.
