One of the advantages of messaging is that the new behavior that wasn't initially envisioned can easily be grafted to a system because of its lowly coupled nature. In our case, the fact that all application logs are not being published to a single topic exchange allows us to create a specific consumer that will receive only error messages and report them to the operations team.
If you remember our discussion about the routing keys used by the application logs' publishers, all we need to do is to receive messages whose routing first component (that is, the string before the first period of the routing key) indicates an error. These components are as follows:
err
, crit
, alert,
and emerg
ERROR
and FATAL
Now we know this, we can create a Python script that will create and bind a queue to the app-logs
topic exchange, using the one-binding-per-error-message-routing-key pattern. The following code shows the logs-error-reporter.py
script without the body of the report_error
function (eluded for brevity):
#!/usr/bin/env python import amqp connection = amqp.Connection(host='ccm-dev-rabbit', userid='ccm-dev', password='coney123', virtual_host='ccm-dev-vhost') channel = connection.channel() EXCHANGE = 'app-logs' QUEUE = 'app-logs-error-reporter' channel.exchange_declare(exchange=EXCHANGE, type='topic', durable=True, auto_delete=False) channel.queue_declare(queue=QUEUE, durable=True, auto_delete=False) # bind syslog severities: channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='err.#') channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='crit.#') channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='alert.#') channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='emerg.#') # bind log4j levels channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='ERROR.#') channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key='FATAL.#') channel.basic_qos(prefetch_count=50, prefetch_size=0, a_global=False) def handle_message(message): report_error(message) message.channel.basic_ack(delivery_tag=message.delivery_tag) channel.basic_consume(callback=handle_message, queue=QUEUE, no_ack=False) print ' [*] Waiting for messages. To exit press CTRL+C' while channel.callbacks: channel.wait() channel.close() connection.close()
Notice how you've leveraged the #
wildcard in the queue-binding operations that are highlighted in the preceding script. This allows you to match only the first part of the routing key (the severity) and accept anything else after it.
With this script running, let's browse to the Exchanges tab of the RabbitMQ management console once more and click on the apps-log exchange. The bindings shown in the following screenshot should be visible:
Since you're in the management console, let's do something we haven't done yet. Let's use it to send test messages to the apps-log
exchange. Scroll down a little below the Bindings section shown in the preceding screenshot until you reach the Publish message section. In this interface, fill in the routing key and payload as shown in the following screenshot:
After clicking on Publish message, the error gets correctly reported by the Python script. We further test by changing the routing key to info.local2
or ERROR.com.ccm.Tests
to see what is reported and what isn't. Everything works as expected, so you're very glad for this last minute idea and the capacity to roll it out cleanly message queuing thanks to RabbitMQ.
3.135.190.182