Publish Subscribe (Advanced)

The Publish Subscribe pattern lets a single producer distribute a message to multiple consumer applications, so that one message can trigger several independent responses. This task demonstrates how we can use the Publish Subscribe pattern to improve sHop.

Getting ready

Our sHop application now has a single Work Queue, which we can scale out using multiple consumers. But we can go a step further. The functions UpdateInventory, SendEmail, and UpdateReporting do not depend on each other, so they can be executed asynchronously. Let's split up these functions and host them on separate consumers.

How to do it...

Let's navigate to our source code examples folder and locate the folder Publish-Subscribe. Here you will find our e-commerce application; it consists of seven files:

  • consumer.js
  • orderService.js
  • producer.js
  • updateInventory.js
  • updateRecommendations.js
  • updateReporting.js
  • sendEmail.js

Let's take a look at the orderService.js script and the ProcessOrder function in it.

We have simplified ProcessOrder; the calls to UpdateInventory, SendEmail, and UpdateReporting have been removed as follows:

this.ProcessOrder = function() {
  this.PaymentGateway();
  this.UpdateStatus();
  console.log('INFO, Thank you for placing your order...');
  return this.Status;
};

We will now call each of the removed functions individually, each in its own consumer script.

Our producer.js script has not changed, so let's take a look at the consumer.js script, specifically the q.subscribe function call:

  1. We create a new OrderService and call ProcessOrder, which now returns the order's Status.
  2. If the order status equals OrderComplete, we declare a fanout exchange called shop.fanout.exchange and publish the order to it.
  3. We then call q.shift, which acknowledges the message so that it is removed from the queue:
       q.subscribe({ack: true}, function(message) {
         var service = new orderService(unescape(message.data));
         var status = service.ProcessOrder();
         if (status === 'OrderComplete') {
           // Declare the fanout exchange and publish the completed order to it
           var exf = connect.exchange('shop.fanout.exchange', {type: 'fanout'});
           exf.setMaxListeners(0);
           exf.publish('', service.Order);
         }
         // Acknowledge the message so that it is removed from the queue
         q.shift();
         console.log('INFO, Remove order from queue.');
       });
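
The control flow of this callback (process, publish only on success, always acknowledge) can be isolated into a small function that runs without a broker. The following is a minimal sketch rather than the book's code: handleOrder, makeStubs, and createService are hypothetical names, and the exchange and queue are passed in as stand-in objects instead of being created through the AMQP connection.

```javascript
// Hypothetical helper: process one order, fan it out if complete, then acknowledge.
function handleOrder(message, createService, exchange, queue) {
  const service = createService(unescape(message.data));
  const status = service.ProcessOrder();
  if (status === 'OrderComplete') {
    // A fanout exchange ignores the routing key, so it is left empty.
    exchange.publish('', service.Order);
  }
  // Acknowledge whatever the outcome, so the next message can be delivered.
  queue.shift();
  return status;
}

// Stubs standing in for OrderService, the exchange, and the queue (assumptions).
function makeStubs(status) {
  const published = [];
  let acked = 0;
  return {
    published,
    acks: () => acked,
    createService: (order) => ({ Order: order, ProcessOrder: () => status }),
    exchange: { publish: (key, body) => published.push({ key, body }) },
    queue: { shift: () => { acked += 1; } },
  };
}

const ok = makeStubs('OrderComplete');
handleOrder({ data: 'order-1' }, ok.createService, ok.exchange, ok.queue);
console.log(ok.published.length, ok.acks()); // prints: 1 1
```

Passing the collaborators in as arguments is only a convenience for this sketch; consumer.js itself creates them from the connection as shown above.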

Our scripts for updateInventory.js, updateReporting.js, and sendEmail.js are nearly identical; each one calls a different function on OrderService. Let's take a look at the sendEmail.js script.

  1. We declare the fanout exchange named shop.fanout.exchange, the same exchange that consumer.js publishes to.
  2. We then create a queue named shop.email.queue, bind it to the exchange once the queue has been declared, and wait for the queueBindOk event to fire.
  3. We call the q.subscribe function and create a new OrderService.
  4. We then call SendEmail as follows:
    connect.on('ready', function() {
        var ex = connect.exchange('shop.fanout.exchange',  {type: 'fanout'});
        var q = connect.queue('shop.email.queue');
        q.on('queueDeclareOk', function(args) {
            // Bind the queue to the fanout exchange with an empty routing key
            q.bind('shop.fanout.exchange', '');
            q.on('queueBindOk', function() {
                q.subscribe(function(message) {
                    var service = new orderService(message.data);
                    service.SendEmail();
                });
            });
        });
    });
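
Because the three scripts differ only in the queue name and the function they call, the declare, bind, and subscribe sequence could be factored into one helper. The following is a minimal sketch under that assumption: startFanoutConsumer and fakeConnection are hypothetical names that do not appear in the example code, and the fake connection is an in-memory stand-in so the snippet runs without RabbitMQ. It imitates only the calls used above (exchange, queue, bind, subscribe, and the two events).

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper wrapping the declare -> bind -> subscribe sequence.
function startFanoutConsumer(connect, queueName, onMessage) {
  // Declare the exchange so that it exists before the queue binds to it.
  connect.exchange('shop.fanout.exchange', { type: 'fanout' });
  const q = connect.queue(queueName);
  q.on('queueDeclareOk', function () {
    // Register the listener before binding so the event cannot be missed.
    q.on('queueBindOk', function () {
      q.subscribe(onMessage);
    });
    q.bind('shop.fanout.exchange', '');
  });
  return q;
}

// In-memory stand-in for the connection object (an assumption for this sketch).
function fakeConnection() {
  const calls = [];
  return {
    calls,
    exchange: (name, opts) => { calls.push('exchange:' + name + ':' + opts.type); },
    queue: (name) => {
      const q = new EventEmitter();
      q.bind = (exchange) => {
        calls.push('bind:' + name + '->' + exchange);
        q.emit('queueBindOk');
      };
      q.subscribe = (fn) => {
        calls.push('subscribe:' + name);
        q.deliver = fn; // lets the example simulate a delivery
      };
      return q;
    },
  };
}

const connect = fakeConnection();
const received = [];
const q = startFanoutConsumer(connect, 'shop.email.queue', (message) => {
  received.push(message.data);
});
q.emit('queueDeclareOk');        // simulate the broker confirming the declaration
q.deliver({ data: 'order-1' });  // simulate one delivered message
console.log(connect.calls, received);
```

With a helper like this, each consumer script would reduce to one call that supplies its own queue name and a handler invoking SendEmail, UpdateInventory, or UpdateReporting.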

Let's demonstrate this concept.

  1. Open a command-line console and start RabbitMQ:
    rabbitmq-server
    
  2. Open a command-line console, navigate to our source code examples folder, and locate the folder Publish-Subscribe. Execute each of the following commands in a new command-line console:
    Publish-Subscribe> node updateInventory
    Publish-Subscribe> node sendEmail
    Publish-Subscribe> node updateReporting
    Publish-Subscribe> node consumer
    Publish-Subscribe> node producer
    

We now have a producer publishing messages to a consumer, which processes messages/orders. The consumer itself publishes a single message to a fanout exchange. All three queues bound to the exchange (shop.inventory.queue, shop.reporting.queue, and shop.email.queue) receive the message and react to it in their own way.

How it works...

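The following diagram demonstrates the concept. A publisher (in our example, the order consumer) publishes a message to the Fanout Exchange with an empty routing key. The Fanout Exchange ignores the routing key and simply forwards a copy of the message to every queue bound to it, so in this example all three consumers receive the message. Because it performs no routing-key matching, a Fanout Exchange has the simplest routing algorithm of the AMQP exchange types, which generally makes it the fastest:

[Diagram: a publisher sends one message to the Fanout Exchange, which forwards a copy to each bound queue.]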
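
The fanout behavior described in this section can also be modeled in a few lines of plain JavaScript. This is a toy in-memory sketch, not how RabbitMQ is implemented: the FanoutExchange class is hypothetical, and arrays stand in for queues. It only illustrates that every bound queue receives a copy of each message and that the routing key plays no part.

```javascript
// Toy in-memory model of a fanout exchange (an illustration, not RabbitMQ itself).
class FanoutExchange {
  constructor() {
    this.queues = new Map(); // queue name -> array of delivered messages
  }

  bind(queueName) {
    if (!this.queues.has(queueName)) {
      this.queues.set(queueName, []);
    }
  }

  publish(routingKey, body) {
    // The routing key is accepted but never consulted.
    for (const messages of this.queues.values()) {
      messages.push(body);
    }
  }
}

const exchange = new FanoutExchange();
['shop.inventory.queue', 'shop.reporting.queue', 'shop.email.queue']
  .forEach((name) => exchange.bind(name));

exchange.publish('', 'order-1');
exchange.publish('ignored.key', 'order-2');

for (const [name, messages] of exchange.queues) {
  console.log(name, messages);
}
```

Binding a fourth queue name to the model before publishing would give it the same copies, without any change to the publisher.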

There's more...

Our product owners at sHop have decided to add a recommendation service. When an order is placed, our order processor will need to update the recommendations database. Let's add this new function to OrderService:

this.UpdateRecommendations = function() {
  console.log('INFO, Updated recommendations');
};

Now we create a new consumer, which is bound to shop.fanout.exchange and calls UpdateRecommendations:

connect.on('ready', function() {
    var ex = connect.exchange('shop.fanout.exchange',  {type: 'fanout'});
    var q = connect.queue('shop.recommendations.queue');
    q.on('queueDeclareOk', function(args) {
        // Bind the new queue to the existing fanout exchange
        q.bind('shop.fanout.exchange', '');
        q.on('queueBindOk', function() {
            q.subscribe(function(message) {
                var service = new orderService(message.data);
                service.UpdateRecommendations();
            });
        });
    });
});

And there we have it: we have extended our sHop application simply by adding a new consumer, without changing the producer or the existing consumers. Follow the preceding steps to run this example, then open another command-line console to run updateRecommendations:

Publish-Subscribe> node updateRecommendations