Publishing to all queues

CCM's journey with RabbitMQ is just getting more exciting; a new application now wants to integrate with the user's messaging platform! Indeed, the customer support team wants to be able to send messages to all users directly from their back-office application. They are fine with the fact that this public address system will only be able to reach users who have already used the messaging system. This means there's no need to forcefully create queues and bindings for all the existing users of the system; only the really active ones who log in regularly will be reachable.

With this specification in hand, you can start planning and come up with the new overall messaging architecture shown in the following diagram. There's no fundamental change; the only addition is the Ruby on Rails back-office application that will be connected to the RabbitMQ in order to publish messages.

Publishing to all queues

The new architecture with the back-office public address

To roll this out, you can use the topic's messaging that's already in place and create a special cs-pa topic to which all users would be subscribed. But in fact, there's a cleaner and simpler approach offered by the AMQP protocol: the fanout exchange. As shown in the following diagram, the fanout exchange routes a copy of each message it receives to all the queues bound to it. This model fits perfectly with the public-address behavior that CCM aims for.

Publishing to all queues

The fanout exchange routes to all bound queues.

With this said, let's wire the fanout exchange in the main Java application.

Binding to the fanout

To start using this new exchange in the main application, you need to perform two steps: declare the fanout exchange when the application starts and bind the user inbox queue to it when a user logs in. So, let's do just that. You will first extend the onApplicationStart method of the UserMessageManager class with the following code:

public static final String USER_FANOUT_EXCHANGE = "user-fanout";

rabbitMqManager.call(new ChannelCallable<DeclareOk>()
{
    @Override
    public String getDescription()
    {
        return "Declaring fanout exchange: " + USER_FANOUT_EXCHANGE;
    }

    @Override
    public DeclareOk call(final Channel channel) throws IOException
    {
        final String exchange = USER_FANOUT_EXCHANGE;
        final String type = "fanout";
        // survive a server restart
        final boolean durable = true;
        // keep it even if not in user
        final boolean autoDelete = false;
        // no special arguments
        final Map<String, Object> arguments = null;

        return channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments);
    }
});

Once again, you'll use the same set of properties except that this time the exchange type is fanout. You then add the following to the onUserLogin method:

rabbitMqManager.call(new ChannelCallable<BindOk>()
{
    @Override
    public String getDescription()
    {
        return "Binding user queue: " + queue + " to exchange: " + USER_FANOUT_EXCHANGE;
    }

    @Override
    public BindOk call(final Channel channel) throws IOException
    {
        // bind the addressee's queue to the fanout exchange
        final String routingKey = "";
        return channel.queueBind(queue, USER_FANOUT_EXCHANGE, routingKey);
    }
});

Did you notice how you used an empty string as the routing key when binding the queue? The value doesn't really matter because the fanout exchange doesn't care about routing keys; however, you can't use null so you settled for "".

Now you're done; there's nothing more to do as the existing server-push infrastructure will remain the same, especially because users can't publish messages to this fanout exchange. So, let's now turn our attention to the code added to the back office to publish messages on this new exchange.

Publishing to all

CCM's back-office system is a Ruby on Rails application. After looking around, it found several AMQP clients that could be used to connect from Ruby to RabbitMQ. They've selected the one called Ruby AMQP (accessible at http://rubyamqp.info) because of its capacity to integrate well with the Rails application and its support for a range of processing models, including Phusion Passenger, which is what they're currently using.

Because this new public address system will be rarely used, you're not concerned about efficient connection management as you are with the main application. In fact, you're fine with connecting and disconnecting for each interaction with the fanout exchange because, if you're having temporary issues with the RabbitMQ broker, retrying to publish from the back-office application will eventually end up working. So, the following is the Ruby code used to publish a message on the public address system:

AMQP.connect(:host => '127.0.0.1',
           :username => 'ccm-dev',
           :password => 'coney123',
           :vhost => 'ccm-dev-vhost') do |connection|

    channel  = AMQP::Channel.new(connection)
    exchange = channel.fanout(
         'user-fanout',
         :durable => true,
         :auto_delete => false)

    message_id = SecureRandom.uuid
    message_json = JSON.generate({
         :time_sent => (Time.now.to_f*1000).to_i,
         :sender_id => -1, # special value for support
         :subject   => pa_subject,
         :content   => pa_content })

    exchange.publish(
         message_json,
         :routing_key      => '',
         :content_type     => 'application/vnd.ccm.pmsg.v1+json',
         :content_encoding => 'UTF-8',
         :message_id       => message_id,
         :persistent       => true,
         :nowait           => false) do

        connection.close
    end

end

The logic in this code should feel familiar; connect to RabbitMQ, get a channel, perform a few channel actions, and then close the connection. You don't need to close the channel before closing the connection; closing the latter closes the former and all other active channels that could have been created on this connection.

Notice that you declare the user-fanout exchange right before using it. You do not want to rely on the implicit pre-existence of the exchange as this would necessarily mean the main application would have to run once to create the exchange before the back office can use it. Since exchange declaration is idempotent, you should therefore declare it at all times.

Tip

Unless there is a strong guarantee that an exchange or a queue will pre-exist, assume it doesn't exist and declare it. Creating a happens before time coupling between two different applications is a recipe for disaster and will blow up at the worst moment. Better to be safe than sorry, especially when AMQP encourages you and provides the necessary means to do so!

Again, you've paid extra attention to make sure the same configuration parameters were used in the exchange declaration than in the Java application.

Tip

Be particularly careful with AMQP client libraries that may use different default values for exchange and queue parameters; it's better to be explicit and to specify all values.

With this code in place, the back-office application can now send public-address messages to all users! This is again a great success, one that again reinforces CCM in its decision to deploy RabbitMQ and build on it.

Running the application

There's nothing spectacular to notice when running the application; messages from the back office successfully flow to the user inbox queues and the only visible change is the newly created user-fanout exchange, visible in the management console shown in the following figure:

Running the application

The fanout exchange for user queues is visible in the management console

At this point, it is very interesting to take a look at the bindings of any particular queue. For this, click on the Queues tab and then scroll down and click on Bindings to unfold the hidden pane. You should see what's reproduced in the following figure where each queue has multiple bindings, one for the user-to-user messaging feature, several for the topics' messages, and a final one for the public-address fanout feature:

Running the application

Each user queue has multiple bindings

Before concluding, let's pause for a second and relish the fact that you now have a successful message integration that works across platforms. This may not seem evident to anyone with a little experience with messaging systems, since it is not short of a small miracle. Messaging systems are the realm of platform-specific implementations at best, vendor-locked ones at worst. Thanks to AMQP and RabbitMQ, these Java and Ruby applications can engage in messaging interactions without having to even think about their heterogeneity.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.223.171.168