The Publish Subscribe pattern lets a single producer distribute a message to multiple consumer applications, so that we can respond to one message in several ways. This task demonstrates how we can use the Publish Subscribe pattern to improve sHop.
Our sHop application now has a single Work Queue, which we can scale out using multiple consumers. But we can go a step further. The functions UpdateInventory, SendEmail, and UpdateReporting do not depend on each other, so they can be executed asynchronously. Let's split up these functions and host them on separate consumers.
Let's navigate to our source code examples folder and locate the folder Publish-Subscribe. Here you will find our e-commerce application; it consists of seven files:
consumer.js
orderService.js
producer.js
updateInventory.js
updateRecommendations.js
updateReporting.js
sendEmail.js
Let's take a look at the orderService.js script and the ProcessOrder function in it.
We have simplified ProcessOrder; the calls to UpdateInventory, SendEmail, and UpdateReporting have been removed as follows:
this.ProcessOrder = function() {
  this.PaymentGateway();
  this.UpdateStatus();
  console.log('INFO, Thank you for placing your order...');
  return this.Status;
};
We will now call each of the preceding functions individually in separate consumer scripts.
Our producer.js script has not changed, so let's take a look at the consumer.js script, specifically the q.subscribe function call:
We create a new instance of OrderService and call ProcessOrder, which now returns the order's Status.
If the status is OrderComplete, we create a new fanout exchange called shop.fanout.exchange, which we use to publish a message.
Finally, we call q.shift, removing the message from the queue:
q.subscribe({ack: true}, function(message) {
  var service = new orderService(unescape(message.data));
  var status = service.ProcessOrder();
  if (status === 'OrderComplete') {
    var exf = connect.exchange('shop.fanout.exchange', {type: 'fanout'});
    exf.setMaxListeners(0);
    exf.publish('', service.Order);
  }
  q.shift();
  console.log('INFO, Remove order from queue.');
});
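The decision made inside the subscribe callback can be looked at in isolation. The following is a minimal sketch, not the book's code: handleOrder and its deps argument (createService, publish, shift) are hypothetical names introduced so the logic can run without a broker. It shows that the order is published to the fanout exchange only when the status is OrderComplete, while the message is removed from the queue in every case.

```javascript
// Sketch only: handleOrder and `deps` are hypothetical names used to isolate the
// decision logic; the sample consumer.js inlines this inside q.subscribe.
function handleOrder(message, deps) {
  var service = deps.createService(unescape(message.data));
  var status = service.ProcessOrder();

  // Fan the order out only when processing succeeded.
  if (status === 'OrderComplete') {
    deps.publish('', service.Order); // empty routing key; a fanout exchange ignores it
  }

  // Remove the message from the queue whether or not it was published.
  deps.shift();
  return status;
}
```

In the real consumer, deps.publish would correspond to exf.publish and deps.shift to q.shift; passing them in as arguments is only a convenience for trying the logic out with stand-ins.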
Our scripts for updateInventory.js, updateReporting.js, and sendEmail.js are basically the same, except that each calls a different function on OrderService. Let's take a look at the sendEmail.js script.
We declare the fanout exchange named shop.fanout.exchange, which consumer.js uses to publish a message.
We declare a queue named shop.email.queue, bind it to the exchange, and wait for the queueBindOk event to fire.
We then call the q.subscribe function and create a new OrderService.
Finally, we call SendEmail as follows:
connect.on('ready', function() {
  var ex = connect.exchange('shop.fanout.exchange', {type: 'fanout'});
  var q = connect.queue('shop.email.queue');
  q.on('queueDeclareOk', function(args) {
    q.bind('shop.fanout.exchange', '');
    q.on('queueBindOk', function() {
      q.subscribe(function(message) {
        var service = new orderService(message.data);
        service.SendEmail();
      });
    });
  });
});
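Because the three subscriber scripts differ only in the queue name and the function they call, the shared steps could be factored into one helper. The following is a sketch under that assumption: bindToFanout is a hypothetical name that does not appear in the sample code, and it uses only the calls already shown above (connect.exchange, connect.queue, q.bind, q.subscribe, and the queueDeclareOk and queueBindOk events).

```javascript
// Sketch only: bindToFanout is a hypothetical helper, not part of the sample code.
// It repeats the exchange/queue/bind/subscribe sequence for any queue name.
function bindToFanout(connect, queueName, handler) {
  // Declare the fanout exchange so it exists before the queue is bound to it.
  connect.exchange('shop.fanout.exchange', {type: 'fanout'});
  var q = connect.queue(queueName);
  q.on('queueDeclareOk', function () {
    q.bind('shop.fanout.exchange', '');
    q.on('queueBindOk', function () {
      // Hand every delivered message to the caller-supplied handler.
      q.subscribe(function (message) {
        handler(message);
      });
    });
  });
  return q;
}

// Possible usage, assuming `connect` and `orderService` are set up as in the scripts above:
// connect.on('ready', function () {
//   bindToFanout(connect, 'shop.email.queue', function (message) {
//     new orderService(message.data).SendEmail();
//   });
// });
```

With a helper like this, each subscriber script would reduce to a queue name and a one-line handler, which is the only part that differs between them.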
Let's demonstrate this concept.
Start rabbitmq-server.
Navigate to the Publish-Subscribe folder. Execute each of the following commands in a new command-line console:
Publish-Subscribe> node updateInventory
Publish-Subscribe> node sendEmail
Publish-Subscribe> node updateReporting
Publish-Subscribe> node consumer
Publish-Subscribe> node producer
We now have a producer publishing messages to a consumer, which processes messages/orders. The consumer itself publishes a single message to a fanout exchange. All three queues bound to the exchange (shop.inventory.queue, shop.reporting.queue, and shop.email.queue) receive the message and react to it in their own way.
The following diagram demonstrates the concept. The producer publishes a message to the Fanout Exchange with an empty routing key. The Fanout Exchange ignores the routing key and simply forwards the message to all queues bound to the exchange. In this example, all consumers receive a copy of the message. A Fanout Exchange has the simplest routing algorithm of the AMQP exchanges, which generally makes it the fastest.
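The routing rule described here can also be modelled in a few lines. The following is a toy in-memory sketch, not the RabbitMQ or node-amqp API: FanoutExchange, bind, and publish are hypothetical names, and each queue is modelled as a plain array. It illustrates that every bound queue receives a copy of each message, whatever the routing key.

```javascript
// Toy in-memory model of a fanout exchange; NOT the RabbitMQ or node-amqp API.
// FanoutExchange, bind and publish are hypothetical names used for illustration.
function FanoutExchange(name) {
  this.name = name;
  this.queues = []; // queues bound to this exchange
}

// Bind a queue (modelled as a plain array of messages) to the exchange.
FanoutExchange.prototype.bind = function (queue) {
  this.queues.push(queue);
};

// A fanout exchange ignores the routing key and copies the message to every bound queue.
FanoutExchange.prototype.publish = function (routingKey, message) {
  this.queues.forEach(function (queue) {
    queue.push(message);
  });
};

var exchange = new FanoutExchange('shop.fanout.exchange');
var inventoryQueue = [];
var reportingQueue = [];
var emailQueue = [];

exchange.bind(inventoryQueue);
exchange.bind(reportingQueue);
exchange.bind(emailQueue);

exchange.publish('', 'order-1');            // empty routing key, as in consumer.js
exchange.publish('ignored.key', 'order-2'); // the key makes no difference

console.log(inventoryQueue, reportingQueue, emailQueue);
```

Binding one more array to the exchange is all it takes for a new subscriber to start receiving copies; the publishing side does not change.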
Our product owners at sHop have decided to add a recommendation service to sHop. When an order is placed, our order processor will need to update the recommendations database. Let's add this new function to OrderService:
this.UpdateRecommendations = function() {
  console.log('INFO, Updated recommendations');
};
Now we create a new consumer, which is bound to shop.fanout.exchange and calls UpdateRecommendations:
connect.on('ready', function() {
  var ex = connect.exchange('shop.fanout.exchange', {type: 'fanout'});
  var q = connect.queue('shop.recommendations.queue');
  q.on('queueDeclareOk', function(args) {
    q.bind('shop.fanout.exchange', '');
    q.on('queueBindOk', function() {
      q.subscribe(function(message) {
        var service = new orderService(message.data);
        service.UpdateRecommendations();
      });
    });
  });
});
And there we have it; we have extended our sHop application by simply adding a new consumer. Follow the preceding steps to run this example. Open another command-line console to run updateRecommendations:
Publish-Subscribe> node updateRecommendations