Running CPU-bound tasks

The totalSales() API, even though expensive in terms of resources, was not affecting the ability of the server to accept concurrent requests. What we learned about the event loop in Chapter 1, Welcome to the Node.js Platform, should provide an explanation for this behavior: invoking an asynchronous operation causes the stack to unwind back to the event loop, leaving it free to handle other requests.

However, what happens when we run a long, synchronous task that never gives back the control to the event loop? This kind of task is also known as CPU-bound, because its main characteristic is that it is heavy on CPU utilization rather than being heavy on I/O operations.

Let's work immediately on an example to see how this type of task behaves in Node.js.

Solving the subset sum problem

Let's now choose a computationally expensive problem to use as a base for our experiment. A good candidate is the subset sum problem that consists of deciding whether a set (or multiset) of integers contains a non-empty subset that has a sum equal to zero. For example, if we had as input the set [1, 2, -4, 5, -3], the subsets satisfying the problem are [1, 2, -3] and [2, -4, 5, -3].

The simplest algorithm is the one that checks every possible combination of subsets of any size, and it has a computational cost of O(2^n); in other words, it grows exponentially with the size of the input. This means that a set of 20 integers would require up to 1,048,576 combinations to be checked, not bad for testing our assumptions. Of course, the solution might be found a lot sooner than that; so, to make things harder, we are going to consider the following variation of the subset sum problem: given a set of integers, we want to calculate all the possible combinations whose sum is equal to a given arbitrary integer.

Let's then work to build such an algorithm; let's create a new module called subsetSum.js. We will start by creating a class called SubsetSum:

const EventEmitter = require('events').EventEmitter; 
 
class SubsetSum extends EventEmitter { 
  constructor(sum, set) { 
    super(); 
    this.sum = sum; 
    this.set = set; 
    this.totalSubsets = 0; 
  } 
//... 

The SubsetSum class is extending from the EventEmitter class; this allows us to produce an event every time we find a new subset matching the sum received as input. As we will see, this will give us a lot of flexibility.

Next, let's see how we can generate all the possible combinations of subsets:

  _combine(set, subset) { 
    for(let i = 0; i < set.length; i++) { 
      let newSubset = subset.concat(set[i]); 
      this._combine(set.slice(i + 1), newSubset); 
      this._processSubset(newSubset); 
    } 
  } 

We will not go into too much detail about the algorithm, but there are two important things to notice:

  • The _combine() method is completely synchronous; it recursively generates every possible subset without ever giving back the control to the event loop. If we think about it, this is perfectly normal for an algorithm not requiring any I/O.
  • Every time a new combination is generated, we provide it to the _processSubset() method for further processing.

The _processSubset() method is responsible for verifying that the sum of the elements of the given subset is equal to the number we are looking for:

  _processSubset(subset) { 
    console.log('Subset', ++this.totalSubsets, subset); 
    const res = subset.reduce((prev, item) => (prev + item), 0); 
    if(res == this.sum) { 
      this.emit('match', subset); 
    } 
  } 

Trivially, the _processSubset() method applies a reduce operation to the subset in order to calculate the sum of its elements. Then, it emits an event of type 'match' when the resulting sum is equal to the one we are interested in finding (this.sum).

Finally, the start() method puts all the preceding pieces together:

  start() { 
    this._combine(this.set, []); 
    this.emit('end'); 
  } 

The preceding method triggers the generation of all the combinations by invoking _combine(), and lastly, emits an 'end' event signaling that all the combinations were checked and any possible match has already been emitted. This is possible because _combine() is synchronous; therefore, the 'end' event will be emitted as soon as the function returns, which means that all the combinations were calculated.

Next, we have to expose the algorithm we just created over the network; as always, we can use a simple HTTP server for the task. In particular, we want to create an endpoint in the format /subsetSum?data=<Array>&sum=<Integer> that invokes the SubsetSum algorithm with the given array of integers and sum to match.

Let's then implement this simple server in a module named app.js:

const http = require('http'); 
const SubsetSum = require('./subsetSum'); 
 
http.createServer((req, res) => { 
  const url = require('url').parse(req.url, true); 
  if(url.pathname === '/subsetSum') { 
    const data = JSON.parse(url.query.data); 
    res.writeHead(200); 
    const subsetSum = new SubsetSum(url.query.sum, data); 
    subsetSum.on('match', match => { 
      res.write('Match: ' + JSON.stringify(match) + '\n');
    }); 
    subsetSum.on('end', () => res.end()); 
    subsetSum.start(); 
  } else { 
    res.writeHead(200); 
    res.end('I\'m alive!\n');
  } 
}).listen(8000, () => console.log('Started')); 

Thanks to the fact that the SubsetSum object returns its results using events, we can stream the matching subsets as soon as they are generated by the algorithm, in real time. Another detail to mention is that our server responds with the text I'm alive! every time we hit a URL different from /subsetSum. We will use this for checking the responsiveness of our server, as we will see in a moment.

We are now ready to try our subset sum algorithm. Curious to know how our server will handle it? Let's fire it up then:

node app

As soon as the server starts, we are ready to send our first request; let's try with a set of 17 random numbers, which will result in the generation of 131,071 combinations, a nice amount to keep our server busy for a while:

curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116, 119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"

We will start to see the results streaming live from the server, but if we try the following command in another terminal while the first request is still running, we will spot a huge problem:

curl -G http://localhost:8000

We will immediately see that this last request hangs until the subset sum algorithm of the first request has finished; the server is unresponsive! This was kind of what we expected. The Node.js event loop runs in a single thread, and if this thread is blocked by a long synchronous computation, it will be unable to execute even one more cycle in order to respond with a simple I'm alive!
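The effect is easy to reproduce in isolation. In the following sketch (with illustrative timings), a timer scheduled to fire after 10 milliseconds cannot run until a synchronous busy loop gives the event loop back:

```javascript
// A timer cannot fire while a synchronous computation holds the event loop
const start = Date.now();
let firedAfter;

setTimeout(() => {
  firedAfter = Date.now() - start;
  // Scheduled for 10ms, but observed only after the busy loop ends
  console.log('Timer fired after', firedAfter, 'ms');
}, 10);

// Simulate a CPU-bound task that blocks the thread for about 200ms
while (Date.now() - start < 200) {
  // Busy wait, never yielding back to the event loop
}
```

The timer callback is observed only after roughly 200 milliseconds, exactly like our second curl request, which has to wait for the subset sum computation to finish.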

We quickly understand that this behavior does not work for any kind of application meant to serve multiple requests. But don't despair; in Node.js, we can tackle this type of situation in several ways. Let's analyze the two most important ones.

Interleaving with setImmediate

Usually, a CPU-bound algorithm is built upon a set of steps. It can be a set of recursive invocations, a loop, or any variation/combination of those. So, a simple solution to our problem would be to give back the control to the event loop after each one of these steps completes (or after a certain number of them). This way, any pending I/O can still be processed by the event loop in those intervals where the long-running algorithm yields the CPU. A simple way to achieve this is to schedule the next step of the algorithm to run after any pending I/O requests. This sounds like the perfect use case for the setImmediate() function (we already introduced this API in Chapter 2, Node.js Essential Patterns).

Note

Pattern

Interleave the execution of a long-running synchronous task with setImmediate().
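Before applying the pattern to the subset sum algorithm, it can be illustrated on its own. The following sketch (with a made-up computation) runs one step at a time, rescheduling itself with setImmediate() so that the event loop can process pending I/O between steps:

```javascript
// Interleave the steps of a long-running computation with setImmediate()
function interleavedSum(n, callback) {
  let total = 0;
  let i = 1;
  function step() {
    total += i++;
    if (i > n) {
      return callback(null, total);
    }
    // Yield back to the event loop before running the next step
    setImmediate(step);
  }
  step();
}

let result;
interleavedSum(1000, (err, total) => {
  result = total;
  console.log('Sum of 1..1000 =', total); // 500500
});
```

Each step is tiny, so any request arriving while the computation is in progress is served between two steps instead of waiting for the whole task to complete.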

Interleaving the steps of the subset sum algorithm

Let's now see how this pattern applies to the subset sum algorithm. All we have to do is slightly modify the subsetSum.js module. For convenience, we are going to create a new module called subsetSumDefer.js, taking the code of the original subsetSum class as a starting point.

The first change we are going to make is to add a new method called _combineInterleaved(), which is the core of the pattern we are implementing:

_combineInterleaved(set, subset) { 
  this.runningCombine++; 
  setImmediate(() => { 
    this._combine(set, subset); 
    if(--this.runningCombine === 0) { 
      this.emit('end'); 
    } 
  }); 
}   

As we can see, all we had to do is defer the invocation of the original (synchronous) _combine() method with setImmediate(). However, now it becomes more difficult to know when the function has finished generating all the combinations because the algorithm is not synchronous anymore. To fix this, we have to keep track of all the running instances of the _combine() method using a pattern very similar to the asynchronous parallel execution we have seen in Chapter 3, Asynchronous Control Flow Patterns with Callbacks. When all the instances of the _combine() method have finished running, we can emit the end event notifying any listener that the process has completed.

To finalize the refactoring of the subset sum algorithm, we need a couple more tweaks. First, we need to replace the recursive step in the _combine() method with its deferred counterpart:

  _combine(set, subset) { 
    for(let i = 0; i < set.length; i++) { 
      let newSubset = subset.concat(set[i]); 
      this._combineInterleaved(set.slice(i + 1), newSubset); 
      this._processSubset(newSubset); 
    } 
  } 

With the preceding change, we make sure that each step of the algorithm will be queued in the event loop using setImmediate() and therefore executed after any pending I/O request instead of running synchronously.

The other small tweak is in the start() method:

  start() { 
    this.runningCombine = 0; 
    this._combineInterleaved(this.set, []); 
  } 

In the preceding code, we initialize the number of running instances of the _combine() method to 0. We also replaced the call to _combine() with a call to _combineInterleaved() and removed the emission of the 'end' event, because now this is handled asynchronously in _combineInterleaved().

With this last change, our subset sum algorithm should now be able to run its CPU-bound code in steps interleaved by intervals where the event loop can run and process any other pending request.

The last missing bit is updating the app.js module so that it can use the new version of the SubsetSum API. This is actually a trivial change:

const http = require('http'); 
//const SubsetSum = require('./subsetSum'); 
const SubsetSum = require('./subsetSumDefer'); 
 
http.createServer(function(req, res) { 
  // ... 

We are now ready to try this new version of the subset sum server. Let's start the app module by using the following command:

node app

Then, try to send a request again to calculate all the subsets matching a given sum:

curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116, 119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"

While the request is running, we might now want to see whether the server is responsive:

curl -G http://localhost:8000

Cool! The second request now should return immediately, even while a SubsetSum task is running, confirming that our pattern is working well.

Considerations on the interleaving pattern

As we saw, running a CPU-bound task while preserving the responsiveness of an application is not that complicated; it just requires the use of setImmediate() to schedule the next step of an algorithm after any pending I/O. However, this is not the best pattern in terms of efficiency; in fact, deferring a task introduces a small overhead that, multiplied by all the steps that an algorithm has to run, can have a significant impact. This is usually the last thing we want when running a CPU-bound task, especially if we have to return the result directly to the user, which should happen in a reasonable amount of time. A possible solution to mitigate the problem would be using setImmediate() only after a certain number of steps, instead of using it at every single step, but still this would not solve the root of the problem.

Bear in mind that this does not mean that the pattern we have just seen should be avoided at all costs; in fact, if we look at the bigger picture, a synchronous task does not necessarily have to be extremely long and complex to create troubles. In a busy server, even a task that blocks the event loop for 200 milliseconds can create undesirable delays. In those situations where the task is executed sporadically or in the background and does not have to run for too long, using setImmediate() to interleave its execution is probably the simplest and most effective way to avoid blocking the event loop.

Note

process.nextTick() cannot be used to interleave a long-running task. As we saw in Chapter 1, Welcome to the Node.js Platform, nextTick() schedules an operation before any pending I/O, and this can eventually cause I/O starvation in the case of repeated calls. You can verify that by yourself by replacing setImmediate() with process.nextTick() in the previous sample. You might also want to know that this behavior was introduced with Node.js 0.10; in fact, with Node.js 0.8, process.nextTick() can still be used as an interleaving mechanism. Take a look at the GitHub issue at https://github.com/joyent/node/issues/3335 to learn more about the history and motivation of this change.
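The difference in scheduling can be observed directly. In this small sketch, a callback registered with process.nextTick() runs before one registered with setImmediate(), even though it was scheduled later; this is why repeated nextTick() calls can starve I/O:

```javascript
// process.nextTick() callbacks run before the event loop phases,
// so they always beat setImmediate() callbacks scheduled in the same tick
const order = [];

setImmediate(() => order.push('setImmediate'));
process.nextTick(() => order.push('nextTick'));

setImmediate(() => {
  // By now both callbacks above have run
  console.log(order); // [ 'nextTick', 'setImmediate' ]
});
```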

Using multiple processes

Deferring the steps of an algorithm is not the only option we have for running CPU-bound tasks; another pattern for preventing the event loop from blocking is using child processes. We already know that Node.js gives its best when running I/O-intensive applications such as web servers, which allows us to optimize resource utilization thanks to its asynchronous architecture.

So, the best way we have to maintain the responsiveness of an application is to not run expensive CPU-bound tasks in the context of the main application and instead, use separate processes. This has three main advantages:

  • The synchronous task can run at full speed, without the need to interleave the steps of its execution
  • Working with processes in Node.js is simple, probably easier than modifying an algorithm to use setImmediate(), and allows us to easily use multiple processors without the need to scale the main application itself
  • If we really need maximum performance, the external process might be created in lower-level languages, such as good old C (always use the best tool for the job!)

Node.js has an ample tool belt of APIs for interacting with external processes. We can find all we need in the child_process module. Moreover, when the external process is just another Node.js program, connecting it to the main application is extremely easy and we don't even feel like we are running something external to the local application. The magic happens thanks to the child_process.fork() function, which creates a new child Node.js process and also automatically creates a communication channel with it, allowing us to exchange information using an interface very similar to an EventEmitter. Let's see how this works by refactoring our subset sum server again.

Delegating the subset sum task to other processes

The goal for the refactoring of the SubsetSum task is to create a separate child process responsible for handling the synchronous processing, leaving the event loop of the server free to handle requests coming from the network. This is the recipe we are going to follow to make this possible:

  1. We will create a new module named processPool.js that will allow us to create a pool of running processes. Starting a new process is expensive and requires time, so keeping them constantly running and ready to handle requests allows us to save time and CPU. Also, the pool will help us limit the number of processes running at the same time to avoid exposing the application to denial-of-service (DoS) attacks.
  2. Next, we will create a module called subsetSumFork.js responsible for abstracting a SubsetSum task running in a child process. Its role will be communicating with the child process and exposing the results of the task as if they were coming from the current application.
  3. At last, we need a worker (our child process), a new Node.js program with the only goal of running the subset sum algorithm and forwarding its results to the parent process.

Note

A DoS attack is an attempt to make a machine or network resource unavailable to its intended users, such as to temporarily or indefinitely interrupt or suspend services of a host connected to the Internet.

Implementing a process pool

Let's start by building the processPool.js module piece by piece:

const fork = require('child_process').fork; 
 
class ProcessPool { 
  constructor(file, poolMax) { 
    this.file = file; 
    this.poolMax = poolMax; 
    this.pool = []; 
    this.active = []; 
    this.waiting = []; 
  } 
  //... 

In the first part of the module, we import the child_process.fork() function that we will use to create new processes. Then, we define the ProcessPool constructor that accepts a file parameter representing the Node.js program to run and the maximum number of running instances in the pool (poolMax). We then define three instance variables:

  • pool is the set of running processes ready to be used
  • active contains the list of the processes currently being used
  • waiting contains a queue of callbacks for all those requests that could not be fulfilled immediately because of the lack of an available process

The next piece of the ProcessPool class is the acquire() method, which is responsible for returning a process ready to be used:

acquire(callback) { 
  let worker; 
  if(this.pool.length > 0) {                // [1] 
    worker = this.pool.pop(); 
    this.active.push(worker); 
    return process.nextTick(callback.bind(null, null, worker)); 
  } 
 
  if(this.active.length >= this.poolMax) {  // [2] 
    return this.waiting.push(callback); 
  } 
 
  worker = fork(this.file);                 // [3] 
  this.active.push(worker); 
  process.nextTick(callback.bind(null, null, worker)); 
} 

Its logic is very simple and is explained as follows:

  1. If we have a process in pool ready to be used, we simply move it to the active list and then return it by invoking callback (in a deferred fashion... remember Zalgo?).
  2. If there are no available processes in pool and we already have reached the maximum number of running processes, we have to wait for one to be available. We achieve this by queuing the current callback in the waiting list.
  3. If we haven't yet reached the maximum number of running processes, we will create a new one using child_process.fork(), add it to the active list, and then return it to the caller using the callback.

The last method of the ProcessPool class is release(), whose purpose is to put a process back in pool:

  release(worker) { 
    if(this.waiting.length > 0) {                          // [1] 
      const waitingCallback = this.waiting.shift(); 
      return waitingCallback(null, worker); 
    } 
    this.active = this.active.filter(w => worker !== w);   // [2] 
    this.pool.push(worker); 
  } 

The preceding code is also very simple and its explanation is as follows:

  • If there is a request in the waiting list, we simply reassign the worker being released by passing it to the callback at the head of the waiting queue.
  • Otherwise, we remove the worker from the active list and put it back into pool.

As we can see, the processes are never stopped but just reassigned, allowing us to save time by not restarting them at each request. However, it's important to observe that this might not always be the best choice and this greatly depends on the requirements of our application. Possible tweaks for reducing long-term memory usage and adding robustness to our process pool are as follows:

  • Terminate idle processes to free memory after a certain time of inactivity
  • Add a mechanism to kill non-responsive processes or restart those that have simply crashed

But in this example, we will keep the implementation of our process pool simple, as the details we might want to add are really endless.
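The acquire/release logic is independent of child processes. The following standalone sketch (with plain objects standing in for forked workers, and hypothetical names) reproduces the same pooling behavior: the third acquisition has to wait until a resource is released:

```javascript
// Same acquire/release logic as ProcessPool, but with plain objects
// standing in for forked processes (a sketch, not the book's module)
class ResourcePool {
  constructor(factory, poolMax) {
    this.factory = factory;
    this.poolMax = poolMax;
    this.pool = [];
    this.active = [];
    this.waiting = [];
  }
  acquire(callback) {
    let resource;
    if (this.pool.length > 0) {
      resource = this.pool.pop();
      this.active.push(resource);
      return process.nextTick(callback.bind(null, null, resource));
    }
    if (this.active.length >= this.poolMax) {
      return this.waiting.push(callback);
    }
    resource = this.factory();
    this.active.push(resource);
    process.nextTick(callback.bind(null, null, resource));
  }
  release(resource) {
    if (this.waiting.length > 0) {
      const waitingCallback = this.waiting.shift();
      return waitingCallback(null, resource);
    }
    this.active = this.active.filter(r => r !== resource);
    this.pool.push(resource);
  }
}

let created = 0;
let thirdId;
const pool = new ResourcePool(() => ({id: ++created}), 2);

pool.acquire((err, a) => {
  pool.acquire((err, b) => {
    pool.acquire((err, c) => {
      thirdId = c.id;       // served only after a resource is released
      console.log('Third acquisition got resource', c.id);
    });
    pool.release(a);        // hands "a" straight to the waiting callback
  });
});
```

Note that only two resources are ever created; the third request is satisfied by recycling the first one.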

Communicating with a child process

Now that our ProcessPool class is ready, we can use it to implement the SubsetSumFork wrapper whose role is to communicate with the worker and expose the results it produces. As we said, starting a process with child_process.fork() also gives us a simple message-based communication channel, so let's see how this works by implementing the subsetSumFork.js module:

const EventEmitter = require('events').EventEmitter; 
const ProcessPool = require('./processPool'); 
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2); 
 
class SubsetSumFork extends EventEmitter { 
  constructor(sum, set) { 
    super(); 
    this.sum = sum; 
    this.set = set; 
  } 
 
  start() { 
    workers.acquire((err, worker) => {               // [1] 
      worker.send({sum: this.sum, set: this.set}); 
 
      const onMessage = msg => { 
        if (msg.event === 'end') {                   // [3] 
          worker.removeListener('message', onMessage); 
          workers.release(worker); 
        } 
 
        this.emit(msg.event, msg.data);             // [4] 
      }; 
 
      worker.on('message', onMessage);             // [2] 
    }); 
  } 
} 
module.exports = SubsetSumFork; 

The first thing to notice is that we initialized a ProcessPool object using a file named subsetSumWorker.js as the target which represents our child worker. We also set the maximum capacity of the pool to 2.

Another point worth mentioning is that we tried to maintain the same public API of the original SubsetSum class. In fact, SubsetSumFork is an EventEmitter whose constructor accepts sum and set, while the start() method triggers the execution of the algorithm, which runs on a separate process this time. This is what happens when the start() method is invoked:

  1. We try to acquire a new child process from the pool. When this happens, we immediately use the worker handle to send the child process a message with the input of the job to run. The send() API is provided automatically by Node.js to all processes that start with child_process.fork(); this is essentially the communication channel we were talking about.
  2. We then start listening for any message returned from the worker process, using the on() method to attach a new listener (this is also a part of the communication channel provided by all processes that start with child_process.fork()).
  3. In the listener, we first check whether we received an end event, which means that the SubsetSum task has finished, in which case we remove the onMessage listener and release the worker, putting it back into the pool.
  4. The worker produces messages in the format {event, data} allowing us to seamlessly re-emit any event produced by the child process.

That's it for the SubsetSumFork wrapper; let's now implement the worker application.

Note

It is good to know that the send() method available on a child process instance can also be used to propagate a socket handle from the main application to a child process (look at the documentation at: http://nodejs.org/api/child_process.html#child_process_child_send_message_sendhandle). This is actually the technique used by the cluster module to distribute the load of an HTTP server across multiple processes (as of Node.js 0.10). We will see this in more detail in the next chapter.

Communicating with the parent process

Let's now create the subsetSumWorker.js module, our worker application; the entire content of this module will run in a separate process:

const SubsetSum = require('./subsetSum'); 
 
process.on('message', msg => {                   // [1] 
  const subsetSum = new SubsetSum(msg.sum, msg.set); 
 
  subsetSum.on('match', data => {                // [2] 
    process.send({event: 'match', data: data}); 
  }); 
 
  subsetSum.on('end', data => { 
    process.send({event: 'end', data: data}); 
  }); 
 
  subsetSum.start(); 
}); 

We can immediately see that we are reusing the original (and synchronous) SubsetSum as it is. Now that we are in a separate process, we don't have to worry about blocking the event loop anymore, all the HTTP requests will continue to be handled by the event loop of the main application, without disruptions.

When the worker is started as a child process, this is what happens:

  1. It immediately starts listening for messages coming from the parent process. This can be easily done with the process.on() function (also, a part of the communication API provided when the process starts using child_process.fork()). The only message we expect from the parent process is the one providing the input to a new SubsetSum task. As soon as such a message is received, we create a new instance of a SubsetSum class and register the listeners for the match and end events. Lastly, we start the computation with subsetSum.start().
  2. Every time an event is received from the running algorithm, we wrap it in an object with the format, {event, data}, and send it to the parent process. These messages are then handled in the subsetSumFork.js module, as we have seen in the previous section.

As we can see, we just had to wrap the algorithm we already built, without modifying its internals. This clearly shows that any portion of an application can be easily put in an external process by simply using the pattern we have just seen.

Tip

When the child process is not a Node.js program, the simple communication channel we just described is not available. In these situations, we can still establish an interface with the child process by implementing our own protocol on top of the standard input and standard output streams, which are exposed to the parent process.

To find out more about all the capabilities of the child_process API, you can refer to the official Node.js documentation at http://nodejs.org/api/child_process.html.

Considerations on the multiprocess pattern

As always, to try this new version of the subset sum algorithm, we simply have to replace the module used by the HTTP server (file app.js):

const http = require('http'); 
//const SubsetSum = require('./subsetSum'); 
//const SubsetSum = require('./subsetSumDefer'); 
const SubsetSum = require('./subsetSumFork'); 
//... 

We can now start the server again and try to send a sample request:

curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"

Similar to the interleaving pattern we have seen before, with this new version of the subsetSum module the event loop is not blocked while running the CPU-bound task. This can be confirmed by sending another concurrent request as follows:

curl -G http://localhost:8000

The preceding command line should immediately return a string as follows:

I'm alive!

More interestingly, we can also try to start two subsetSum tasks concurrently; we will see that they use the full power of two different processors in order to run (if our system has more than one processor, of course). Instead, if we try to run three subsetSum tasks concurrently, the result should be that the last one to start will hang. This is not because the event loop of the main process is blocked, but because we set a concurrency limit of two processes for the subsetSum task, which means that the third request will be handled as soon as at least one of the two processes in the pool becomes available again.

As we saw, the multiprocess pattern is definitely more powerful and flexible than the interleaving pattern; however, it's still not infinitely scalable, as the amount of resources offered by a single machine remains a hard limit. The answer in this case is to distribute the load across multiple machines, but this is another story and falls under the category of distributed architectural patterns, which we will explore in the next chapters.

Tip

It is worth mentioning that threads can be a possible alternative to processes when running CPU-bound tasks. Currently, there are a few npm packages that expose an API for working with threads to userland modules; one of the most popular is webworker-threads (https://npmjs.org/package/webworker-threads). However, even if threads are more lightweight, fully-fledged processes can offer more flexibility and a better level of isolation in the case of problems such as freezing or crashing.
