The totalSales() API, even though expensive in terms of resources, did not affect the ability of the server to accept concurrent requests. What we learned about the event loop in Chapter 1, Welcome to the Node.js Platform, explains this behavior: invoking an asynchronous operation causes the stack to unwind back to the event loop, leaving it free to handle other requests.
However, what happens when we run a long, synchronous task that never gives back the control to the event loop? This kind of task is also known as CPU-bound, because its main characteristic is that it is heavy on CPU utilization rather than being heavy on I/O operations.
Let's work on an example right away to see how these types of tasks behave in Node.js. First, we need to choose a computationally expensive problem to use as the base for our experiment. A good candidate is the subset sum problem, which consists of deciding whether a set (or multiset) of integers contains a non-empty subset whose sum is equal to zero. For example, if we had as input the set [1, 2, -4, 5, -3], the subsets satisfying the problem would be [1, 2, -3] and [2, -4, 5, -3].
The simplest algorithm is the one that checks every possible combination of subsets of any size; it has a computational cost of O(2^n), or in other words, it grows exponentially with the size of the input. This means that a set of 20 integers would require up to 1,048,576 combinations to be checked, not bad for testing our assumptions. Of course, the solution might be found a lot sooner than that; so, to make things harder, we are going to consider the following variation of the subset sum problem: given a set of integers, we want to calculate all the possible combinations whose sum is equal to a given arbitrary integer.
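To get a feel for these numbers, here is a quick sketch (not part of the module we are about to build) that computes the subset counts just mentioned:

```javascript
// The number of possible subsets of a set of n elements is 2^n
// (2^n - 1 if we exclude the empty subset), which is why the
// brute-force approach grows exponentially with the input size.
const subsetCount = n => Math.pow(2, n);

console.log(subsetCount(20));     // 1048576
console.log(subsetCount(17) - 1); // 131071 non-empty subsets
```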
Let's then work on building such an algorithm by creating a new module called subsetSum.js. We will start by creating a class called SubsetSum:

const EventEmitter = require('events').EventEmitter;

class SubsetSum extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }
  //...
The SubsetSum class extends EventEmitter; this allows us to produce an event every time we find a new subset matching the sum received as input. As we will see, this will give us a lot of flexibility.
Next, let's see how we can generate all the possible combinations of subsets:
_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combine(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}
We will not go into too much detail about the algorithm, but there are two important things to notice:

- The _combine() method is completely synchronous; it recursively generates every possible subset without ever giving back control to the event loop. If we think about it, this is perfectly normal for an algorithm not requiring any I/O.
- Every time a new combination is generated, it is passed to the _processSubset() method for further processing.

The _processSubset() method is responsible for verifying that the sum of the elements of the given subset is equal to the number we are looking for:
_processSubset(subset) {
  console.log('Subset', ++this.totalSubsets, subset);
  const res = subset.reduce((prev, item) => (prev + item), 0);
  if(res == this.sum) {
    this.emit('match', subset);
  }
}
Trivially, the _processSubset() method applies a reduce operation to the subset in order to calculate the sum of its elements. Then, it emits an event of type 'match' when the resulting sum is equal to the one we are interested in finding (this.sum).
Finally, the start() method puts all the preceding pieces together:

start() {
  this._combine(this.set, []);
  this.emit('end');
}
The preceding method triggers the generation of all the combinations by invoking _combine() and, lastly, emits an 'end' event, signaling that all the combinations were checked and any possible match has already been emitted. This is possible because _combine() is synchronous; therefore, the 'end' event will be emitted as soon as the function returns, which means that all the combinations have been calculated.
Next, we have to expose the algorithm we just created over the network. As always, we can use a simple HTTP server for the task. In particular, we want to create an endpoint in the format /subsetSum?data=<Array>&sum=<Integer> that invokes the SubsetSum algorithm with the given array of integers and the sum to match.
Let's then implement this simple server in a module named app.js:
const http = require('http');
const SubsetSum = require('./subsetSum');
http.createServer((req, res) => {
const url = require('url').parse(req.url, true);
if(url.pathname === '/subsetSum') {
const data = JSON.parse(url.query.data);
res.writeHead(200);
const subsetSum = new SubsetSum(url.query.sum, data);
subsetSum.on('match', match => {
res.write('Match: ' + JSON.stringify(match) + '\n');
});
subsetSum.on('end', () => res.end());
subsetSum.start();
} else {
res.writeHead(200);
res.end('I\'m alive!\n');
}
}).listen(8000, () => console.log('Started'));
Thanks to the fact that the SubsetSum object returns its results using events, we can stream the matching subsets as soon as they are generated by the algorithm, in real time. Another detail to mention is that our server responds with the text I'm alive! every time we hit a URL different from /subsetSum. We will use this to check the responsiveness of our server, as we will see in a moment.
We are now ready to try our subset sum algorithm. Curious to know how our server will handle it? Let's fire it up then:
node app
As soon as the server starts, we are ready to send our first request; let's try with a set of 17 random numbers, which will result in the generation of 131,071 combinations, a nice amount to keep our server busy for a while:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116, 119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"
We will start to see the results streaming live from the server, but if we try the following command in another terminal while the first request is still running, we will spot a huge problem:
curl -G http://localhost:8000
We will immediately see that this last request hangs until the subset sum algorithm of the first request has finished; the server is unresponsive! This was kind of what we expected. The Node.js event loop runs in a single thread, and if this thread is blocked by a long synchronous computation, it will be unable to execute even one more cycle in order to respond with a simple I'm alive!
We quickly understand that this behavior is not acceptable for any kind of application meant to serve multiple requests. But don't despair; in Node.js, we can tackle this type of situation in several ways. Let's analyze the two most important ones.
Usually, a CPU-bound algorithm is built upon a set of steps. It can be a set of recursive invocations, a loop, or any variation/combination of those. So, a simple solution to our problem would be to give back the control to the event loop after each of these steps completes (or after a certain number of them). This way, any pending I/O can still be processed by the event loop in those intervals where the long-running algorithm yields the CPU. A simple way to achieve this is to schedule the next step of the algorithm to run after any pending I/O requests. This sounds like the perfect use case for the setImmediate() function (we already introduced this API in Chapter 2, Node.js Essential Patterns).
Let's now see how this pattern applies to the subset sum algorithm. All we have to do is slightly modify the subsetSum.js module. For convenience, we are going to create a new module called subsetSumDefer.js, taking the code of the original SubsetSum class as a starting point.
The first change we are going to make is to add a new method called _combineInterleaved(), which is the core of the pattern we are implementing:

_combineInterleaved(set, subset) {
  this.runningCombine++;
  setImmediate(() => {
    this._combine(set, subset);
    if(--this.runningCombine === 0) {
      this.emit('end');
    }
  });
}
As we can see, all we had to do was defer the invocation of the original (synchronous) _combine() method with setImmediate(). However, now it becomes more difficult to know when the function has finished generating all the combinations, because the algorithm is not synchronous anymore. To fix this, we have to keep track of all the running instances of the _combine() method using a pattern very similar to the asynchronous parallel execution we have seen in Chapter 3, Asynchronous Control Flow Patterns with Callbacks. When all the instances of the _combine() method have finished running, we can emit the 'end' event, notifying any listener that the process has completed.
To finalize the refactoring of the subset sum algorithm, we need a couple more tweaks. First, we need to replace the recursive step in the _combine() method with its deferred counterpart:
_combine(set, subset) {
for(let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combineInterleaved(set.slice(i + 1), newSubset);
this._processSubset(newSubset);
}
}
With the preceding change, we make sure that each step of the algorithm will be queued in the event loop using setImmediate(), and therefore executed after any pending I/O request, instead of running synchronously.
The other small tweak is in the start() method:

start() {
  this.runningCombine = 0;
  this._combineInterleaved(this.set, []);
}
In the preceding code, we initialize the number of running instances of the _combine() method to 0. We also replaced the call to _combine() with a call to _combineInterleaved() and removed the emission of the 'end' event, because now this is handled asynchronously in _combineInterleaved().
With this last change, our subset sum algorithm should now be able to run its CPU-bound code in steps interleaved by intervals where the event loop can run and process any other pending request.
The last missing bit is updating the app.js module so that it uses the new version of the SubsetSum API. This is actually a trivial change:
const http = require('http');
//const SubsetSum = require('./subsetSum');
const SubsetSum = require('./subsetSumDefer');
http.createServer(function(req, res) {
// ...
We are now ready to try this new version of the subset sum server. Let's start the app module using the following command:
node app
Then, try to send a request again to calculate all the subsets matching a given sum:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116, 119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"
While the request is running, we might now want to see whether the server is responsive:
curl -G http://localhost:8000
Cool! The second request should now return immediately, even while a SubsetSum task is running, confirming that our pattern is working well.
As we saw, running a CPU-bound task while preserving the responsiveness of an application is not that complicated; it just requires the use of setImmediate() to schedule the next step of the algorithm after any pending I/O. However, this is not the best pattern in terms of efficiency; in fact, deferring a task introduces a small overhead that, multiplied by all the steps the algorithm has to run, can have a significant impact. This is usually the last thing we want when running a CPU-bound task, especially if we have to return the result directly to the user, which should happen in a reasonable amount of time. A possible solution to mitigate the problem would be using setImmediate() only after a certain number of steps, instead of at every single step, but this still would not solve the root of the problem.
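As a sketch of that mitigation, here is what batched deferral could look like on a generic loop. The names used here (batchedSum, BATCH_SIZE) are illustrative assumptions, not part of the book's modules:

```javascript
// Batched deferral: yield to the event loop only every BATCH_SIZE
// iterations, instead of paying the setImmediate() overhead at every
// single step of the algorithm.
const BATCH_SIZE = 10000;

function batchedSum(items, callback) {
  let total = 0;
  let index = 0;

  function step() {
    // Run up to BATCH_SIZE synchronous iterations...
    const end = Math.min(index + BATCH_SIZE, items.length);
    while(index < end) {
      total += items[index++];
    }
    if(index < items.length) {
      // ...then give back control to the event loop before continuing
      setImmediate(step);
    } else {
      callback(null, total);
    }
  }

  step();
}
```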
Bear in mind that this does not mean the pattern we have just seen should be avoided at all costs; in fact, if we look at the bigger picture, a synchronous task does not necessarily have to be extremely long and complex to create trouble. In a busy server, even a task that blocks the event loop for 200 milliseconds can create undesirable delays. In those situations where the task is executed sporadically or in the background and does not have to run for too long, using setImmediate() to interleave its execution is probably the simplest and most effective way to avoid blocking the event loop.
process.nextTick() cannot be used to interleave a long-running task. As we saw in Chapter 1, Welcome to the Node.js Platform, nextTick() schedules an operation to run before any pending I/O, and this can eventually cause I/O starvation in the case of repeated calls. You can verify this yourself by replacing setImmediate() with process.nextTick() in the previous sample. You might also want to know that this behavior was introduced with Node.js 0.10; with Node.js 0.8, process.nextTick() could still be used as an interleaving mechanism. Take a look at this GitHub issue to learn more about the history and motivations of this change: https://github.com/joyent/node/issues/3335.
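The ordering difference can be observed with a tiny self-contained experiment (not part of the book's code): callbacks scheduled with process.nextTick() always run before setImmediate() callbacks scheduled in the same tick, which is why a recursive chain of nextTick() calls never lets the event loop reach its I/O phases:

```javascript
// process.nextTick callbacks are drained before the event loop moves
// on to the check phase, where setImmediate callbacks are executed.
const order = [];

setImmediate(() => {
  order.push('setImmediate');
  console.log(order.join(' -> ')); // nextTick -> setImmediate
});

process.nextTick(() => order.push('nextTick'));
```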
Deferring the steps of an algorithm is not the only option we have for running CPU-bound tasks; another pattern for preventing the event loop from blocking is the use of child processes. We already know that Node.js gives its best when running I/O-intensive applications such as web servers, where its asynchronous architecture allows us to optimize resource utilization.
So, the best way we have to maintain the responsiveness of an application is to not run expensive CPU-bound tasks in the context of the main application and instead use separate processes. This has three main advantages:

- The synchronous task can run at full speed, without the need to interleave its execution steps
- Working with processes in Node.js is simple, probably easier than modifying an algorithm to use setImmediate(), and allows us to easily use multiple processors without the need to scale the main application itself
- If we really need maximum performance, the external process could be created in a lower-level language, such as good old C

Node.js has an ample tool belt of APIs for interacting with external processes. We can find all we need in the child_process module. Moreover, when the external process is just another Node.js program, connecting it to the main application is extremely easy, and we don't even feel like we are running something external to the local application. The magic happens thanks to the child_process.fork() function, which creates a new child Node.js process and also automatically creates a communication channel with it, allowing us to exchange information using an interface very similar to an EventEmitter. Let's see how this works by refactoring our subset sum server again.
The goal for the refactoring of the SubsetSum task is to create a separate child process responsible for handling the synchronous processing, leaving the event loop of the server free to handle requests coming from the network. This is the recipe we are going to follow to make this possible:

1. We will create a new module named processPool.js that will allow us to create a pool of running processes. Starting a new process is expensive and requires time, so keeping them constantly running and ready to handle requests allows us to save time and CPU. Also, the pool will help us limit the number of processes running at the same time, to avoid exposing the application to denial-of-service (DoS) attacks.
2. Next, we will create a module called subsetSumFork.js, responsible for abstracting a SubsetSum task running in a child process. Its role will be communicating with the child process and exposing the results of the task as if they were coming from the current application.
3. Finally, we will need a worker (our child process), a new Node.js program with the only goal of running the subset sum algorithm and forwarding its results to the parent process.

Let's start by building the processPool.js module piece by piece:
const fork = require('child_process').fork;

class ProcessPool {
  constructor(file, poolMax) {
    this.file = file;
    this.poolMax = poolMax;
    this.pool = [];
    this.active = [];
    this.waiting = [];
  }
  //...
In the first part of the module, we import the child_process.fork() function that we will use to create new processes. Then, we define the ProcessPool constructor, which accepts a file parameter representing the Node.js program to run, and the maximum number of running instances in the pool (poolMax). We then define three instance variables:

- pool is the set of running processes ready to be used
- active contains the list of the processes currently being used
- waiting contains a queue of callbacks for all those requests that could not be fulfilled immediately because of the lack of an available process

The next piece of the ProcessPool class is the acquire() method, which is responsible for returning a process ready to be used:
acquire(callback) {
  let worker;
  if(this.pool.length > 0) {                                   // [1]
    worker = this.pool.pop();
    this.active.push(worker);
    return process.nextTick(callback.bind(null, null, worker));
  }

  if(this.active.length >= this.poolMax) {                     // [2]
    return this.waiting.push(callback);
  }

  worker = fork(this.file);                                    // [3]
  this.active.push(worker);
  process.nextTick(callback.bind(null, null, worker));
}
Its logic is very simple and is explained as follows:

1. If we have a process in pool ready to be used, we simply move it to the active list and then return it by invoking callback in a deferred fashion (remember Zalgo?).
2. If there are no available processes in the pool and we have already reached the maximum number of running processes, we have to wait for one to become available. We achieve this by queuing the current callback in the waiting list.
3. If we have not yet reached the maximum number of running processes, we create a new one using child_process.fork(), add it to the active list, and then return it to the caller using the callback.
class is release()
, whose purpose is to put a process back in pool
:
release(worker) {
  if(this.waiting.length > 0) {                         // [1]
    const waitingCallback = this.waiting.shift();
    // Hand the worker straight to the waiting request; it stays active
    return waitingCallback(null, worker);
  }
  this.active = this.active.filter(w => worker !== w);  // [2]
  this.pool.push(worker);
}
The preceding code is also very simple and its explanation is as follows:

1. If there is a request in the waiting list, we simply reassign the worker being released by passing it to the callback at the head of the waiting queue.
2. Otherwise, we remove the worker from the active list and put it back into pool.

As we can see, the processes are never stopped, just reassigned, allowing us to save time by not restarting them at each request. However, it's important to observe that this might not always be the best choice, and it greatly depends on the requirements of our application. Possible tweaks for reducing long-term memory usage and adding robustness to our process pool include terminating idle processes after a certain period of inactivity to free memory, and adding a mechanism to kill or restart processes that have crashed or stopped responding.
But in this example, we will keep the implementation of our process pool simple, as the details we might want to add are really endless.
Now that our ProcessPool class is ready, we can use it to implement the SubsetSumFork wrapper, whose role is to communicate with the worker and expose the results it produces. As we said, starting a process with child_process.fork() also gives us a simple message-based communication channel, so let's see how this works by implementing the subsetSumFork.js module:
const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);

class SubsetSumFork extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
  }

  start() {
    workers.acquire((err, worker) => {            // [1]
      worker.send({sum: this.sum, set: this.set});

      const onMessage = msg => {
        if (msg.event === 'end') {                // [3]
          worker.removeListener('message', onMessage);
          workers.release(worker);
        }

        this.emit(msg.event, msg.data);           // [4]
      };

      worker.on('message', onMessage);            // [2]
    });
  }
}

module.exports = SubsetSumFork;
The first thing to notice is that we initialized a ProcessPool object using a file named subsetSumWorker.js as the target, which represents our child worker. We also set the maximum capacity of the pool to 2.
Another point worth mentioning is that we tried to maintain the same public API of the original SubsetSum class. In fact, SubsetSumFork is an EventEmitter whose constructor accepts sum and set, while the start() method triggers the execution of the algorithm, which this time runs in a separate process. This is what happens when the start() method is invoked:
1. We try to acquire a new child process from the pool. When the operation completes, we immediately use the worker handle to send the child process a message with the input of the job to run. The send() API is provided automatically by Node.js to all processes started with child_process.fork(); this is essentially the communication channel we were talking about.
2. We then start listening for messages returned by the child process, attaching a new listener with the on() method (this is also a part of the communication channel provided by all processes started with child_process.fork()).
3. In the onMessage listener, we first check whether we received an end event, which means that the SubsetSum task has finished; in that case, we remove the onMessage listener and release the worker, putting it back into the pool.
4. The worker produces its messages in the format {event, data}, allowing us to seamlessly re-emit any event produced by the child process.

That's it for the SubsetSumFork wrapper; let's now implement the worker application.
It is good to know that the send() method available on a child process instance can also be used to propagate a socket handle from the main application to a child process (take a look at the documentation at http://nodejs.org/api/child_process.html#child_process_child_send_message_sendhandle). This is actually the technique used by the cluster module to distribute the load of an HTTP server across multiple processes (as of Node.js 0.10). We will see this in more detail in the next chapter.
Let's now create the subsetSumWorker.js module, our worker application. The entire content of this module will run in a separate process:
const SubsetSum = require('./subsetSum');

process.on('message', msg => {                  // [1]
  const subsetSum = new SubsetSum(msg.sum, msg.set);

  subsetSum.on('match', data => {               // [2]
    process.send({event: 'match', data: data});
  });

  subsetSum.on('end', data => {
    process.send({event: 'end', data: data});
  });

  subsetSum.start();
});
We can immediately see that we are reusing the original (and synchronous) SubsetSum as it is. Now that we are in a separate process, we don't have to worry about blocking the event loop anymore; all the HTTP requests will continue to be handled by the event loop of the main application, without disruptions.
When the worker is started as a child process, this is what happens:
1. The child process immediately starts listening for messages coming from the parent process, using the process.on() function (also a part of the communication API provided when the process is started with child_process.fork()). The only message we expect from the parent process is the one providing the input to a new SubsetSum task. As soon as such a message is received, we create a new instance of the SubsetSum class and register the listeners for the match and end events. Lastly, we start the computation with subsetSum.start().
2. Every event produced by the running algorithm is wrapped into an object in the format {event, data} and sent to the parent process. These messages are then handled in the subsetSumFork.js module, as we have seen in the previous section.

As we can see, we just had to wrap the algorithm we already built, without modifying its internals. This clearly shows that any portion of an application can easily be put in an external process by simply using the pattern we have just seen.
When the child process is not a Node.js program, the simple communication channel we just described is not available. In these situations, we can still establish an interface with the child process by implementing our own protocol on top of the standard input and standard output streams, which are exposed to the parent process.
To find out more about all the capabilities of the child_process API, you can refer to the official Node.js documentation at http://nodejs.org/api/child_process.html.
As always, to try this new version of the subset sum algorithm, we simply have to replace the module used by the HTTP server (file app.js):
const http = require('http');
//const SubsetSum = require('./subsetSum');
//const SubsetSum = require('./subsetSumDefer');
const SubsetSum = require('./subsetSumFork');
//...
We can now start the server again and try to send a sample request:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"
Similar to the interleaving pattern we have seen before, with this new version of the subsetSum module, the event loop is not blocked while running the CPU-bound task. This can be confirmed by sending another concurrent request as follows:
curl -G http://localhost:8000
The preceding command line should immediately return a string as follows:
I'm alive!
More interestingly, we can also try to start two subsetSum tasks concurrently; we will see that they use the full power of two different processors in order to run (if our system has at least two processors, of course). Instead, if we try to run three subsetSum tasks concurrently, the result should be that the last one to start will hang. This is not because the event loop of the main process is blocked, but because we set a concurrency limit of two processes for the subsetSum task, which means that the third request will be handled as soon as at least one of the two processes in the pool becomes available again.
As we saw, the multiprocess pattern is definitely more powerful and flexible than the interleaving pattern; however, it is not infinitely scalable, as the amount of resources offered by a single machine remains a hard limit. The answer in this case is to distribute the load across multiple machines, but this is another story and falls under the category of distributed architectural patterns, which we will explore in the next chapters.
It is worth mentioning that threads can be a possible alternative to processes for running CPU-bound tasks. Currently, there are a few npm packages that expose an API for working with threads to userland modules; one of the most popular is webworker-threads (https://npmjs.org/package/webworker-threads). However, even if threads are more lightweight, fully fledged processes can offer more flexibility and a better level of isolation in the case of problems such as freezing or crashing.