Publish-subscribe

I've alluded to the publish-subscribe model elsewhere in this chapter. Publish-subscribe is a powerful tool for decoupling events from processing code.

At the crux of the pattern is the idea that, as a message publisher, my responsibility for the message should end as soon as I send it. I should not know who is listening to messages or what they will do with the messages. So long as I am fulfilling a contract to produce correctly formatted messages, the rest shouldn't matter.

It is the responsibility of the listener to register its interest in the message type. You will, of course, want some form of security to prevent rogue services from registering.
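One simple form of that security is an allow-list checked at subscription time. The sketch below is hypothetical: the `GuardedBus` class, its `allowedSubscribers` set and the `subscriberId` parameter are illustrations rather than part of the bus built in this chapter, and a real system would authenticate a subscriber rather than trust a self-declared ID.

```javascript
// Hypothetical bus that only accepts subscriptions from known subscriber IDs.
class GuardedBus {
  constructor(allowedSubscribers) {
    // IDs permitted to register; anything else is rejected.
    this.allowedSubscribers = new Set(allowedSubscribers);
    this.responders = [];
  }

  Subscribe(messageName, subscriber, subscriberId) {
    if (!this.allowedSubscribers.has(subscriberId)) {
      throw new Error("Subscriber '" + subscriberId + "' is not allowed to register");
    }
    this.responders.push({ messageName: messageName, subscriber: subscriber });
  }
}

const guarded = new GuardedBus(["ravenWatch"]);
guarded.Subscribe("KingdomInvaded", { processMessage: () => {} }, "ravenWatch");
try {
  guarded.Subscribe("KingdomInvaded", { processMessage: () => {} }, "rogueService");
} catch (e) {
  console.log(e.message); // Subscriber 'rogueService' is not allowed to register
}
```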

We can extend our service bus to do a more complete job of routing, delivering a single message to every party that has asked for it. Let's call our new method Publish instead of Send. We'll keep Send around for the existing sending functionality:

[Figure: Publish-subscribe]

The crow mail analogy we used in the previous section starts to fall apart here, as there is no way to broadcast a message using crows. Crows are too small to carry large banners and it is very difficult to train them to do skywriting. I'm unwilling to abandon the idea of crows entirely, so let's assume that there exists a sort of crow broadcast centre. Sending a message here allows it to be fanned out to numerous interested parties who have signed up for updates. This centre will be more or less synonymous with a bus.

We'll write our router so that it routes purely on the name of the message. One could route a message using any of its attributes; for instance, a listener could subscribe to all the messages called invoicePaid whose amount field is greater than $10,000. Adding this sort of logic to the bus would slow it down and make it far harder to debug. This is really the domain of business process orchestration engines rather than a bus, so we'll continue on without that complexity.
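Keeping the bus name-based does not rule out the $10,000 rule: the filtering can live in the subscriber instead. A minimal sketch, assuming a hypothetical `LargeInvoiceAuditor` subscriber and an `amount` field on the message `body` (neither is defined elsewhere in this chapter):

```javascript
// Hypothetical subscriber that registers for every invoicePaid message
// by name and applies the content-based condition itself.
class LargeInvoiceAuditor {
  constructor(threshold) {
    this.threshold = threshold;
    this.flagged = [];
  }

  processMessage(message) {
    // The bus only matched on the name; the amount check happens here.
    if (message.body.amount > this.threshold) {
      this.flagged.push(message.body);
    }
  }
}

const auditor = new LargeInvoiceAuditor(10000);
auditor.processMessage({ __messageName: "invoicePaid", body: { id: 1, amount: 500 } });
auditor.processMessage({ __messageName: "invoicePaid", body: { id: 2, amount: 25000 } });
console.log(auditor.flagged); // [ { id: 2, amount: 25000 } ]
```

The bus stays fast and easy to debug because it only compares names; each subscriber pays for its own filtering.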

The first thing to set up is the ability to subscribe to published messages:

CrowMailBus.prototype.Subscribe = function (messageName, subscriber) {
  this.responders.push({ messageName: messageName, subscriber: subscriber });
};

The Subscribe function simply records the name of a message together with the handler that wants to consume it. The responders array is a list of these name and handler pairs.

When a message is published we loop over the array and fire each of the handlers that have registered for messages with that name:

Publish(message) {
  for (let i = 0; i < this.responders.length; i++) {
    if (this.responders[i].messageName === message.__messageName) {
      // Capture the current responder and defer delivery to the next tick
      (function (b) {
        process.nextTick(() => b.subscriber.processMessage(message));
      })(this.responders[i]);
    }
  }
}

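Each handler's execution is deferred to the next tick with process.nextTick, so Publish returns before any subscriber runs. The immediately invoked function creates a closure that captures the current responder, ensuring that each deferred callback works with the correct one. We can now change our CrowMailResponder to use the new Publish method instead of Send: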

processMessage(message) {
  const response = {
    __messageDate: new Date(),
    __from: "responder",
    __corrolationId: message.__corrolationId,
    __messageName: "SquareRootFound",
    body: "Pretty sure it is 3."
  };
  this.bus.Publish(response);
  console.log("Reply published");
}
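The deferred delivery that Publish relies on can be seen in isolation with a self-contained Node.js sketch. The `MiniBus` name and its constructor that initialises `responders` are assumptions (the chapter's full bus is not reproduced here), and Send is omitted. Because the loop uses `for...of` with `const`, each iteration already has its own binding, so this version hands the responder straight to the arrow function.

```javascript
// Minimal, self-contained bus: Subscribe and Publish only.
class MiniBus {
  constructor() {
    // Each entry pairs a message name with the object that handles it.
    this.responders = [];
  }

  Subscribe(messageName, subscriber) {
    this.responders.push({ messageName: messageName, subscriber: subscriber });
  }

  Publish(message) {
    for (const responder of this.responders) {
      if (responder.messageName === message.__messageName) {
        // Defer delivery so the publisher is never blocked by a subscriber.
        process.nextTick(() => responder.subscriber.processMessage(message));
      }
    }
  }
}

const log = [];
const miniBus = new MiniBus();
miniBus.Subscribe("KingdomInvaded", { processMessage: () => log.push("handler 1") });
miniBus.Subscribe("KingdomInvaded", { processMessage: () => log.push("handler 2") });
miniBus.Subscribe("HarvestFailed", { processMessage: () => log.push("not called") });

miniBus.Publish({ __messageName: "KingdomInvaded", body: "The north is under attack" });
log.push("publisher continued");
// nextTick callbacks run in FIFO order, so this logs after both handlers:
process.nextTick(() => console.log(log)); // [ 'publisher continued', 'handler 1', 'handler 2' ]
```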

Instead of letting CrowMailRequestor create its own bus, as it did earlier, we modify it to accept a bus instance from outside through its constructor and simply store it in a field. Similarly, CrowMailResponder should also take in an instance of the bus.
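This is ordinary constructor injection. A minimal sketch of a hypothetical requestor written this way: the `Send(message)` signature, the "SquareRoot" message name and the body text are assumptions for illustration, not the chapter's actual requestor. A side benefit is that any object with a Send method can stand in for the real bus, for example in a test.

```javascript
// Hypothetical requestor that stores the bus it is given instead of creating one.
class CrowMailRequestor {
  constructor(bus) {
    this.bus = bus; // injected from outside
  }

  Request() {
    // Field names follow the chapter's message format; values are illustrative.
    this.bus.Send({
      __messageDate: new Date(),
      __from: "requestor",
      __messageName: "SquareRoot",
      body: "What is the square root of 9?"
    });
  }
}

// A stub bus that just records what was sent.
const sent = [];
const stubBus = { Send: (message) => sent.push(message) };
new CrowMailRequestor(stubBus).Request();
console.log(sent[0].__messageName); // SquareRoot
```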

In order to make use of this we simply need to create a new bus instance and pass it into the requestor:

var bus = new CrowMailBus();
bus.Subscribe("KingdomInvaded", new TestResponder1());
bus.Subscribe("KingdomInvaded", new TestResponder2());
var requestor = new CrowMailRequestor(bus);
requestor.Request();

Here we have also subscribed two other responders that are interested in knowing about KingdomInvaded messages. TestResponder1 looks like the following; TestResponder2 is the same apart from its name and log message:

var TestResponder1 = (function () {
  function TestResponder1() {}
  TestResponder1.prototype.processMessage = function (message) {
    console.log("Test responder 1: got a message");
  };
  return TestResponder1;
})();

Running this code now produces the following output:

Message sent!
Reply published
Test responder 1: got a message
Test responder 2: got a message
Crow mail responder: got a message

You can see that the initial message is sent using Send. The responder or handler does its work and publishes a message that the bus passes on to each of the subscribers.

There are some great JavaScript libraries that make publish and subscribe even easier. One of my favorites is Radio.js. It has no external dependencies and its name is an excellent metaphor for publish-subscribe. We could rewrite our preceding subscribe example like so:

radio("KingdomInvaded").subscribe(new TestResponder1().processMessage);
radio("KingdomInvaded").subscribe(new TestResponder2().processMessage);

Then publish a message using the following:

radio("KingdomInvaded").broadcast(message);
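To make the radio metaphor concrete, here is a tiny stand-in with the same `radio(name).subscribe(...)` and `.broadcast(...)` shape. It is a hypothetical sketch of the idea, not Radio.js's implementation, and unlike the bus above it delivers synchronously.

```javascript
// Hypothetical radio-style helper: one list of listeners per channel name.
const channels = new Map();

function radio(name) {
  if (!channels.has(name)) {
    channels.set(name, []);
  }
  const listeners = channels.get(name);
  return {
    subscribe(listener) {
      listeners.push(listener);
      return this; // allow chaining
    },
    broadcast(...args) {
      // Call every listener tuned to this channel with the broadcast arguments.
      for (const listener of listeners) {
        listener(...args);
      }
      return this;
    }
  };
}

const heard = [];
radio("KingdomInvaded").subscribe((m) => heard.push("watch: " + m.body));
radio("KingdomInvaded").subscribe((m) => heard.push("council: " + m.body));
radio("KingdomInvaded").broadcast({ body: "Ships sighted" });
console.log(heard); // [ 'watch: Ships sighted', 'council: Ships sighted' ]
```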

Fan out and in

One fantastic use of the publish-subscribe pattern is fanning a problem out to a number of different nodes. Moore's law has always been about the doubling of the number of transistors per unit area, not about clock speed. If you've been paying attention to processor clock speeds, you may have noticed that there hasn't really been any significant change in them for a decade. In fact, clock speeds are now lower than they were in 2005.

This is not to say that processors are "slower" than they once were. The work performed in each clock tick has increased, and the number of cores has also jumped up. It is now unusual to see a single-core processor; even in cellular phones, dual-core processors are becoming common. It is the rule, rather than the exception, to have computers that are capable of doing more than one thing at a time.

At the same time, cloud computing is taking off. The computers you purchase outright are faster than the ones available to rent from the cloud, but the advantage of cloud computing is that you can scale out easily. It is trivial to provision a hundred or even a thousand computers from a cloud provider.

Writing software that can take advantage of multiple cores is the great computing problem of our time. Dealing directly with threads is a recipe for disaster: locking and contention are far too difficult for most developers, me included! Fortunately, a certain class of problems can easily be divided into subproblems and distributed. Some call this class of problems "embarrassingly parallel".

Messaging provides a mechanism for communicating the inputs and outputs of a problem. If we had one of these easily parallelized problems, such as searching, then we would bundle up the inputs into messages. In this case, each message would contain our search terms, and it might also identify the set of documents to search. If we had 10,000 documents, we could divide the search space into, say, four collections of 2,500 documents and publish four messages, each carrying the search terms and the range of documents to search, as can be seen here:

[Figure: Fan out and in]
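The partitioning step is simple arithmetic. A sketch, assuming documents are addressed by index and that each message carries `searchTerms`, `start` and `end` fields; those field names, the "SearchRequested" message name and the `buildSearchMessages` helper are all hypothetical.

```javascript
// Split a range of documents into roughly equal slices, one message per slice.
function buildSearchMessages(searchTerms, documentCount, partitions) {
  const size = Math.ceil(documentCount / partitions);
  const messages = [];
  for (let start = 0; start < documentCount; start += size) {
    messages.push({
      __messageName: "SearchRequested",
      searchTerms: searchTerms,
      start: start,                              // inclusive
      end: Math.min(start + size, documentCount) // exclusive
    });
  }
  return messages;
}

const searchMessages = buildSearchMessages(["dragon", "egg"], 10000, 4);
console.log(searchMessages.map((m) => [m.start, m.end]));
// [ [ 0, 2500 ], [ 2500, 5000 ], [ 5000, 7500 ], [ 7500, 10000 ] ]
```

Each of these messages would then be published on the bus for whichever search nodes are listening.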

Different search nodes will pick up the messages and perform the search. The results will then be sent back to a node that will collect the messages and combine them into one. This is what will be returned to the client.
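On the collecting side, the node only needs to know how many partial results to expect for each request. A minimal sketch, assuming each result message carries a `__corrolationId` (as the chapter's messages do) and a `matches` array; the `ResultCollector` class and its `onComplete` callback are hypothetical.

```javascript
// Gathers partial search results per request and combines them once all arrive.
class ResultCollector {
  constructor(expectedParts, onComplete) {
    this.expectedParts = expectedParts;
    this.onComplete = onComplete;
    this.pending = new Map(); // correlation id -> array of partial match lists
  }

  processMessage(message) {
    const id = message.__corrolationId;
    const parts = this.pending.get(id) || [];
    parts.push(message.matches);
    if (parts.length === this.expectedParts) {
      this.pending.delete(id);
      // Flatten the partial match lists into one combined result.
      this.onComplete(id, parts.flat());
    } else {
      this.pending.set(id, parts);
    }
  }
}

let combined = null;
const collector = new ResultCollector(2, (id, matches) => { combined = matches; });
collector.processMessage({ __corrolationId: "q1", matches: ["doc12"] });
collector.processMessage({ __corrolationId: "q1", matches: ["doc3107", "doc9001"] });
console.log(combined); // [ 'doc12', 'doc3107', 'doc9001' ]
```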

Of course, this is a bit of an oversimplification. It is likely that the receiving nodes would themselves maintain a list of the documents for which they are responsible. This would free the original publishing node from having to know anything about the documents being searched. The search results could even be returned directly to the client, which would do the assembling.

Even in a browser, the fan out and in approach can be used to distribute a calculation over a number of cores through the use of web workers. A simple example might take the form of creating a potion. A potion might contain a number of ingredients that can be combined to create a final product. Combining ingredients is computationally expensive, so we would like to farm the process out to a number of workers.

We start with a combiner that contains a combine() method, which fans the work out, as well as a complete() method that is called as each worker reports back and notices when all the distributed ingredients have been combined:

class Combiner {
  constructor() {
    // Number of worker results still outstanding
    this.waitingForChunks = 0;
  }
  combine(ingredients) {
    console.log("Starting combination");
    if (ingredients.length > 10) {
      // Fan out: one worker per chunk of two ingredients
      for (let i = 0; i < Math.ceil(ingredients.length / 2); i++) {
        this.waitingForChunks++;
        console.log("Dispatched chunks count at: " + this.waitingForChunks);
        const worker = new Worker("FanOutInWebWorker.js");
        worker.addEventListener('message', (message) => this.complete(message));
        worker.postMessage({ ingredients: ingredients.slice(i * 2, i * 2 + 2) });
      }
    }
  }
  complete(message) {
    // Fan in: count down as each worker reports back
    this.waitingForChunks--;
    console.log("Outstanding chunks count at: " + this.waitingForChunks);
    if (this.waitingForChunks === 0)
      console.log("All chunks received");
  }
}

To keep track of the number of outstanding workers, we use a simple counter. Because the code on the main thread runs one event at a time, the message handlers can never update the counter concurrently, so there is no risk of race conditions. Once the counter shows no remaining workers, we can take whatever steps are necessary. The web worker looks like the following:

// Assumes Westeros.Potion.CombinedIngredient has already been loaded into
// this worker's scope (for example, via importScripts).
self.addEventListener('message', function (e) {
  const ingredients = e.data.ingredients;
  const combinedIngredient = new Westeros.Potion.CombinedIngredient();
  for (let i = 0; i < ingredients.length; i++) {
    combinedIngredient.Add(ingredients[i]);
  }
  console.log("calculating combination");
  // Simulate a slow calculation before reporting back to the main thread
  setTimeout(() => combinationComplete(combinedIngredient), 2000);
}, false);

function combinationComplete(combinedIngredient) {
  console.log("combination complete");
  self.postMessage({ event: 'combinationComplete', result: combinedIngredient });
}

In this case we simply put in a timeout to simulate the complex calculation needed to combine ingredients.
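The counter is one way to detect fan-in. Where each unit of work can be wrapped in a promise, `Promise.all` does the same bookkeeping. The sketch below swaps the Web Workers for a hypothetical `combineChunk` function that resolves after a timer, so it runs outside a browser; it illustrates the technique, not the chapter's worker code.

```javascript
// Stand-in for a worker: "combines" a chunk after a short delay.
function combineChunk(chunk) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(chunk.join("+")), 10);
  });
}

// Fan out one promise per chunk, then fan in once every promise has resolved.
async function combineAll(ingredients, chunkSize) {
  const chunks = [];
  for (let i = 0; i < ingredients.length; i += chunkSize) {
    chunks.push(ingredients.slice(i, i + chunkSize));
  }
  const results = await Promise.all(chunks.map(combineChunk));
  console.log("All chunks received");
  return results;
}

combineAll(["newt", "nightshade", "wolfsbane", "mandrake", "ash"], 2)
  .then((results) => console.log(results));
// [ 'newt+nightshade', 'wolfsbane+mandrake', 'ash' ]
```

Promise.all also keeps results in chunk order and rejects as soon as any chunk fails, which the bare counter does not handle.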

The sub problems that are farmed out to a number of nodes don't have to be identical problems. However, they should be sufficiently complicated that the cost savings of farming them out are not consumed by the overhead of sending out a message.
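One way to act on this is to limit the number of chunks so that each carries enough work to pay for its message. A sketch, where the `planChunks` helper and its `minItemsPerChunk` and `maxWorkers` parameters are hypothetical tuning knobs you would measure for your own workload:

```javascript
// Decide how to split work so each chunk is worth the messaging overhead.
function planChunks(itemCount, minItemsPerChunk, maxWorkers) {
  if (itemCount === 0) {
    return { chunkCount: 0, chunkSize: 0 };
  }
  // How many chunks could each hold at least minItemsPerChunk items.
  const worthwhile = Math.max(1, Math.floor(itemCount / minItemsPerChunk));
  const targetChunks = Math.min(worthwhile, maxWorkers);
  const chunkSize = Math.ceil(itemCount / targetChunks);
  // Rounding up the size can leave fewer chunks than targeted.
  const chunkCount = Math.ceil(itemCount / chunkSize);
  return { chunkCount, chunkSize };
}

console.log(planChunks(12, 5, 4));   // { chunkCount: 2, chunkSize: 6 }
console.log(planChunks(1000, 5, 4)); // { chunkCount: 4, chunkSize: 250 }
console.log(planChunks(3, 5, 4));    // { chunkCount: 1, chunkSize: 3 }
```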
