Using plain JavaScript

Now that we have met our first example of callback hell, we know what we should definitely avoid; however, that's not the only concern when writing asynchronous code. In fact, there are several situations where controlling the flow of a set of asynchronous tasks requires the use of specific patterns and techniques, especially if we are only using plain JavaScript without the aid of any external library. For example, iterating over a collection by applying an asynchronous operation in sequence is not as easy as invoking forEach() over an array; it actually requires a technique similar to recursion.

In this section, we will learn not only about how to avoid callback hell but also how to implement some of the most common control flow patterns using only simple and plain JavaScript.

Callback discipline

When writing asynchronous code, the first rule to keep in mind is to not abuse closures when defining callbacks. It can be tempting to do so, because it does not require any additional thinking for problems such as modularization and reusability; however, we have seen how this can have more disadvantages than advantages. Most of the time, fixing the callback hell problem does not require any libraries, fancy techniques, or change of paradigm, but just some common sense.

These are some basic principles that can help us keep the nesting level low and improve the organization of our code in general:

  • Exit as soon as possible. Use return, continue, or break, depending on the context, to immediately exit the current statement instead of writing (and nesting) complete if...else statements. This will help keep our code shallow.
  • Create named functions for callbacks, keeping them out of closures and passing intermediate results as arguments. Naming our functions will also make them look better in stack traces.
  • Modularize the code. Split it into smaller, reusable functions whenever possible.

Applying the callback discipline

To demonstrate the power of the earlier mentioned principles, let's apply them to fix the callback hell problem in our web spider application.

For the first step, we can refactor our error-checking pattern by removing the else statement. This is made possible by returning from the function immediately after we receive an error. So, instead of having code such as the following:

if(err) { 
  callback(err); 
} else { 
  //code to execute when there are no errors 
} 

We can improve the organization of our code by writing the following one instead:

if(err) { 
  return callback(err); 
} 
//code to execute when there are no errors 

With this simple trick, we immediately have a reduction of the nesting level of our functions; it is easy and doesn't require any complex refactoring.

Note

A common mistake when executing the optimization we just described is forgetting to terminate the function after the callback is invoked. For the error-handling scenario, the following code is a typical source of defects:

if(err) { 
  callback(err);
} //code to execute when there are no errors.

We should never forget that the execution of our function will continue even after we invoke the callback. It is then important to insert a return instruction to block the execution of the rest of the function. Also note that it doesn't really matter what output is returned by the function; the real result (or error) is produced asynchronously and passed to the callback. The return value of the asynchronous function is usually ignored. This property allows us to write shortcuts such as the following:

return callback(...)

Otherwise we'd have to write slightly more verbose ones such as the following:

callback(...)
return; 

As a second optimization for our spider() function, we can try to identify reusable pieces of code. For example, the functionality that writes a given string to a file can be easily factored out into a separate function as follows:

function saveFile(filename, contents, callback) { 
  mkdirp(path.dirname(filename), err => { 
    if(err) { 
      return callback(err); 
    } 
    fs.writeFile(filename, contents, callback); 
  }); 
} 

Following the same principle, we can create a generic function named download() which takes a URL and a filename as input, and downloads the URL into the given file. Internally, we can use the saveFile() function we created earlier.

function download(url, filename, callback) { 
  console.log(`Downloading ${url}`); 
  request(url, (err, response, body) => { 
    if(err) { 
      return callback(err); 
    } 
    saveFile(filename, body, err => { 
      if(err) { 
        return callback(err); 
      } 
      console.log(`Downloaded and saved: ${url}`); 
      callback(null, body); 
    }); 
  });   
} 

For the last step, we modify the spider() function, which, thanks to our changes, will now look like the following:

function spider(url, callback) { 
  const filename = utilities.urlToFilename(url); 
  fs.exists(filename, exists => { 
    if(exists) { 
      return callback(null, filename, false); 
    } 
    download(url, filename, err => { 
      if(err) { 
        return callback(err); 
      } 
      callback(null, filename, true); 
    }) 
  }); 
} 

The functionality and the interface of the spider() function remain exactly the same; what changed is only the way the code was organized. By applying the basic principles that we discussed, we were able to drastically reduce the nesting of our code and at the same time increase its reusability and testability. In fact, we could think of exporting both saveFile() and download(), so that we can reuse them in other modules. This also allows us to test their functionality more easily.
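
For instance, assuming the spider code lives in a single module (a hypothetical spider.js), exporting those helpers would only require something like the following at the end of the file:

module.exports = { 
  spider, 
  saveFile, 
  download 
}; 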

The refactoring we carried out in this section clearly demonstrates that most of the time, all we need is just some discipline to make sure we do not abuse closures and anonymous functions. It works brilliantly, requires minimal effort, and just uses plain JavaScript.

Sequential execution

We now begin our exploration of the asynchronous control flow patterns. We will start by analyzing the sequential execution flow.

Executing a set of tasks in sequence means running them one at a time, one after the other. The order of execution matters and must be preserved, because the result of a task in the list may affect the execution of the next. The following image illustrates this concept:

Sequential execution

There are different variations of this flow:

  • Executing a set of known tasks in sequence, without chaining or propagating results
  • Using the output of a task as the input for the next (also known as chain, pipeline, or waterfall)
  • Iterating over a collection while running an asynchronous task on each element, one after the other

Sequential execution, despite being trivial when implemented using the direct style blocking API, is usually the main cause of the callback hell problem when using asynchronous CPS.

Executing a known set of tasks in sequence

We already met a sequential execution while implementing the spider() function in the previous section. By applying the simple rules that we studied, we were able to organize a set of known tasks in a sequential execution flow. Taking that code as a guideline, we can then generalize the solution with the following pattern:

function task1(callback) { 
  asyncOperation(() => { 
    task2(callback); 
  }); 
} 
 
function task2(callback) { 
  asyncOperation(() => { 
    task3(callback); 
  }); 
} 
 
function task3(callback) { 
  asyncOperation(() => { 
    callback(); //finally executes the callback 
  }); 
} 
 
task1(() => { 
  //executed when task1, task2 and task3 are completed 
  console.log('tasks 1, 2 and 3 executed');
}); 

The preceding pattern shows how each task invokes the next upon the completion of a generic asynchronous operation. The pattern puts the emphasis on the modularization of tasks, showing how closures are not always necessary to handle asynchronous code.

Sequential iteration

The pattern we described earlier works perfectly if we know in advance what and how many tasks are to be executed. This allows us to hardcode the invocation of the next task in the sequence; but what happens if we want to execute an asynchronous operation for each item in a collection? In cases such as this, we cannot hardcode the task sequence anymore; instead, we have to build it dynamically.

Web spider version 2

To show an example of sequential iteration, let's introduce a new feature to the web spider application. We now want to download all the links contained in a web page recursively. To do that, we are going to extract all the links from the page and then trigger our web spider on each one of them recursively and in sequence.

The first step is modifying our spider() function so that it triggers a recursive download of all the links of a page by using a function named spiderLinks(), which we are going to create shortly.

Also, instead of checking if the file already exists, we now try to read it, and start spidering its links; this way, we are able to resume interrupted downloads. As a final change, we make sure we propagate a new parameter, nesting, which helps us limit the recursion depth. The resultant code is as follows:

function spider(url, nesting, callback) { 
  const filename = utilities.urlToFilename(url); 
  fs.readFile(filename, 'utf8', (err, body) => { 
    if(err) { 
      if(err.code !== 'ENOENT') { 
        return callback(err); 
      } 
 
      return download(url, filename, (err, body) => { 
        if(err) { 
          return callback(err); 
        } 
        spiderLinks(url, body, nesting, callback); 
      }); 
    } 
 
    spiderLinks(url, body, nesting, callback); 
  }); 
} 

Sequential crawling of links

Now we can create the core of this new version of our web spider application, the spiderLinks() function, which downloads all the links of an HTML page using a sequential asynchronous iteration algorithm. Pay attention to the way we are going to define that in the following code block:

function spiderLinks(currentUrl, body, nesting, callback) { 
  if(nesting === 0) { 
    return process.nextTick(callback); 
  } 
  const links = utilities.getPageLinks(currentUrl, body);  //[1] 
  function iterate(index) {                                //[2] 
    if(index === links.length) { 
      return callback(); 
    } 
 
    spider(links[index], nesting - 1, err => {             //[3] 
      if(err) { 
        return callback(err); 
      } 
      iterate(index + 1); 
    }); 
  } 
  iterate(0);                                              //[4] 
} 

The important steps to understand from this new function are as follows:

  1. We obtain the list of all the links contained in the page using the utilities.getPageLinks() function. This function returns only the links pointing to an internal destination (same hostname).
  2. We iterate over the links using a local function called iterate(), which takes the index of the next link to analyze. In this function, the first thing we do is check if the index is equal to the length of the links array, in which case we immediately invoke the callback() function, as it means we processed all the items.
  3. At this point, everything should be ready for processing the link. We invoke the spider() function by decreasing the nesting level and invoking the next step of the iteration when the operation completes.
  4. As the last step in the spiderLinks() function, we bootstrap the iteration by invoking iterate(0).

The algorithm we just presented allows us to iterate over an array by executing an asynchronous operation in sequence, which in our case is the spider() function.

We can now try this new version of the spider application and watch it download all the links of a web page recursively, one after the other. To interrupt the process, which can take a while if there are many links, remember that we can always use Ctrl + C. If we then decide to resume it, we can do so by launching the spider application and providing the same URL we used for the first run.

Note

Now that our web spider application might potentially trigger the download of an entire website, please consider using it carefully. For example, do not set a high nesting level or leave the spider running for more than a few seconds. It is not polite to overload a server with thousands of requests. In some circumstances this can also be considered illegal. Spider responsibly!

The pattern

The code of the spiderLinks() function that we showed previously is a clear example of how it's possible to iterate over a collection while applying an asynchronous operation. We can also notice that it's a pattern that can be adapted to any other situation where we have the need to iterate asynchronously in sequence over the elements of a collection or in general over a list of tasks. This pattern can be generalized as follows:

function iterate(index) { 
  if(index === tasks.length)  { 
    return finish(); 
  } 
  const task = tasks[index]; 
  task(function() { 
    iterate(index + 1); 
  }); 
} 
 
function finish() { 
  //iteration completed 
} 
 
iterate(0); 

Note

It's important to notice that these types of algorithms become really recursive if task() is a synchronous operation. In such a case, the stack will not unwind at every cycle and there might be a risk of hitting the maximum call stack size limit.

The pattern we just presented is very powerful as it can adapt to several situations; for example, we can map the values of an array or we can pass the results of an operation to the next one in the iteration to implement a reduce algorithm, we can quit the loop prematurely if a particular condition is met, or we can even iterate over an infinite number of elements.
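
To give an idea of how small these adaptations are, the following is a minimal sketch (our own, not part of the spider code) of a sequential map built on the same iterator; it assumes that every task accepts a Node.js-style callback receiving (err, result), collects each result, and finally delivers the whole array:

function mapSeries(tasks, finish) { 
  const results = []; 
  function iterate(index) { 
    if(index === tasks.length) { 
      return finish(null, results);        //all tasks completed, deliver the collected results 
    } 
    tasks[index]((err, result) => {        //run the current task 
      if(err) { 
        return finish(err);                //fail fast on the first error 
      } 
      results.push(result);                //accumulate the result 
      iterate(index + 1);                  //move to the next task in the sequence 
    }); 
  } 
  iterate(0); 
} 

Passing an accumulated value from one task to the next instead of collecting results would similarly turn the same skeleton into a sequential reduce.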

We could also choose to generalize the solution even further by wrapping it into a function having a signature such as the following:

iterateSeries(collection, iteratorCallback, finalCallback) 

We leave this to you as an exercise.

Note

Pattern (sequential iterator)

Execute a list of tasks in sequence by creating a function named iterator, which invokes the next available task in the collection and makes sure to invoke the next step of the iteration when the current task completes.

Parallel execution

There are some situations where the order of execution of a set of asynchronous tasks is not important and all we want is just to be notified when all those running tasks are completed. Such situations are better handled using a parallel execution flow, as shown in the following diagram:

Parallel execution

This may sound strange if we consider that Node.js is single threaded, but if we remember what we discussed in Chapter 1, Welcome to the Node.js Platform, we realize that even though we have just one thread, we can still achieve concurrency, thanks to the non-blocking nature of Node.js. In fact, the word parallel is used improperly in this case, as it does not mean that the tasks run simultaneously, but rather that their execution is carried out by an underlying non-blocking API and interleaved by the event loop.

As we know, a task gives control back to the event loop when it requests a new asynchronous operation, allowing the event loop to execute another task. The proper word to use for this kind of flow is concurrency, but we will still use parallel for simplicity.

The following diagram shows how two asynchronous tasks can run in parallel in a Node.js program:

Parallel execution

In the previous image, we have a Main function that executes two asynchronous tasks:

  1. The Main function triggers the execution of Task 1 and Task 2. As these trigger an asynchronous operation, they immediately return the control back to the Main function, which then returns it to the event loop.
  2. When the asynchronous operation of Task 1 is completed, the event loop gives control to it. When Task 1 completes its internal synchronous processing as well, it notifies the Main function.
  3. When the asynchronous operation triggered by Task 2 is completed, the event loop invokes its callback, giving the control back to Task 2. At the end of Task 2, the Main function is again notified. At this point, the Main function knows that both Task 1 and Task 2 are complete, so it can continue its execution or return the results of the operations to another callback.

In short, this means that in Node.js we can execute in parallel only asynchronous operations, because their concurrency is handled internally by the non-blocking APIs. In Node.js, synchronous (blocking) operations cannot run concurrently unless their execution is interleaved with an asynchronous operation, or deferred with setTimeout() or setImmediate(). We will see this in more detail in Chapter 9, Advanced Asynchronous Recipes.
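
To make this interleaving concrete, here is a minimal, self-contained sketch of the scenario described above, using setTimeout() as a stand-in for any non-blocking operation (the task() and done() functions are ours, purely for illustration):

function task(name, callback) { 
  setTimeout(() => {                       //the async operation; control goes back to the event loop 
    console.log(`${name} completed`); 
    callback(); 
  }, 100); 
} 
 
let pending = 2; 
function done() { 
  if(--pending === 0) { 
    console.log('Task 1 and Task 2 completed');  //Main is notified once both are done 
  } 
} 
 
task('Task 1', done);                      //both tasks start immediately... 
task('Task 2', done);                      //...and their callbacks are interleaved by the event loop 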

Web spider version 3

Our web spider application seems like a perfect candidate to apply the concept of parallel execution. So far, our application is executing the recursive download of the linked pages in a sequential fashion. We can easily improve the performance of this process by downloading all the linked pages in parallel.

To do that, we just need to modify the spiderLinks() function to make sure to spawn all the spider() tasks at once, and then invoke the final callback only when all of them have completed their execution. So let's modify our spiderLinks() function as follows:

function spiderLinks(currentUrl, body, nesting, callback) { 
  if(nesting === 0) { 
    return process.nextTick(callback); 
  } 
  const links = utilities.getPageLinks(currentUrl, body); 
  if(links.length === 0) { 
    return process.nextTick(callback); 
  } 
 
  let completed = 0, hasErrors = false; 
 
  function done(err) { 
    if(err) { 
      hasErrors = true; 
      return callback(err); 
    } 
    if(++completed === links.length && !hasErrors) { 
      return callback(); 
    } 
  } 
 
  links.forEach(link => { 
    spider(link, nesting - 1, done); 
  }); 
} 

Let's explain what we changed. As we mentioned earlier, the spider() tasks are now started all at once. This is possible by simply iterating over the links array and starting each task without waiting for the previous one to finish:

  links.forEach(link => { 
    spider(link, nesting - 1, done); 
  }); 

Then, the trick to make our application wait for all the tasks to complete is to provide the spider() function with a special callback, which we call done(). The done() function increases a counter when a spider task completes. When the number of completed downloads reaches the size of the links array, the final callback is invoked:

function done(err) { 
  if(err) { 
    hasErrors = true; 
    return callback(err); 
  } 
  if(++completed === links.length && !hasErrors) { 
    callback(); 
  } 
} 

With these changes in place, if we now try to run our spider against a web page, we will notice a huge improvement in the speed of the overall process, as every download is carried out in parallel without waiting for the previous link to be processed.

The pattern

Also, for the parallel execution flow, we can extract our nice little pattern, which we can adapt and reuse for different situations. We can represent a generic version of the pattern with the following code:

const tasks = [ /* ... */ ]; 
let completed = 0; 
tasks.forEach(task => { 
  task(() => { 
    if(++completed === tasks.length) { 
      finish(); 
    } 
  }); 
}); 
 
function finish() { 
  //all the tasks completed 
} 

With small modifications, we can adapt the pattern to accumulate the results of each task into a collection, to filter or map the elements of an array, or to invoke the finish() callback as soon as one or a given number of tasks complete (this last situation in particular is called competitive race).
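
As a hint of how small those modifications can be, here is a sketch (ours, not taken from the spider code) that collects the result of each task, assuming every task invokes its callback with (err, result); error handling is omitted for brevity:

const tasks = [ /* ... */ ]; 
const results = []; 
let completed = 0; 
tasks.forEach((task, index) => { 
  task((err, result) => { 
    results[index] = result;               //store the result at the task's original position 
    if(++completed === tasks.length) { 
      finish(results);                     //invoked exactly once, when every task has called back 
    } 
  }); 
}); 
 
function finish(results) { 
  //all the tasks completed, results collected in order 
} 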

Note

Pattern (unlimited parallel execution)

Run a set of asynchronous tasks in parallel by spawning them all at once, and then wait for all of them to complete by counting the number of times their callbacks are invoked.

Fixing race conditions with concurrent tasks

Running a set of tasks in parallel can cause issues when using blocking I/O in combination with multiple threads. However, we have just seen that in Node.js this is a totally different story; running multiple asynchronous tasks in parallel is in fact straightforward and cheap in terms of resources. This is one of the most important strengths of Node.js, because it makes parallelization a common practice rather than a complex technique to use only when strictly necessary.

Another important characteristic of the concurrency model of Node.js is the way we deal with task synchronization and race conditions. In multithreaded programming, this is usually done using constructs such as locks, mutexes, semaphores, and monitors, and it can be one of the most complex aspects of parallelization, with a considerable impact on performance as well. In Node.js, we usually don't need a fancy synchronization mechanism, as everything runs on a single thread! However, this doesn't mean that we can't have race conditions; on the contrary, they can be quite common. The root of the problem is the delay between the invocation of an asynchronous operation and the notification of its result. To give a concrete example, we can refer again to our web spider application, in particular, the last version we created, which actually contains a race condition (can you spot it?).

The problem we are talking about lies in the spider() function, where we check if a file already exists, before starting to download the corresponding URL:

function spider(url, nesting, callback) { 
  const filename = utilities.urlToFilename(url); 
  fs.readFile(filename, 'utf8', (err, body) => { 
    if(err) { 
      if(err.code !== 'ENOENT') { 
        return callback(err); 
      } 
 
      return download(url, filename, function(err, body) { 
//... 

The problem is that two spider tasks operating on the same URL might invoke fs.readFile() on the same file before one of them completes the download and creates the file, causing both tasks to start a download. This situation is shown in the following diagram:

Fixing race conditions with concurrent tasks

The preceding diagram shows how Task 1 and Task 2 are interleaved in the single thread of Node.js and how an asynchronous operation can actually introduce a race condition. In our case, the two spider tasks end up downloading the same file.

How can we fix that? The answer is much simpler than we might think. In fact, all we need is a variable to mutually exclude multiple spider() tasks running on the same URL. This can be achieved with some code such as the following:

const spidering = new Map(); 
function spider(url, nesting, callback) { 
  if(spidering.has(url)) { 
    return process.nextTick(callback); 
  } 
  spidering.set(url, true); 
 
//... 

The fix does not require many comments. We simply exit the function immediately if the flag for the given url is set in the spidering map; otherwise, we set the flag and continue with the download. For our case, we don't need to release the lock, as we are not interested in downloading a URL twice, even if the spider tasks are executed at two completely different points in time.

Race conditions can cause many problems, even if we are in a single-threaded environment. In some circumstances, they can lead to data corruption and are usually very hard to debug because of their ephemeral nature. So, it's always good practice to double check for this type of situation when running tasks in parallel.

Limited parallel execution

Often, spawning parallel tasks without control can lead to an excessive load. Imagine having thousands of files to read, URLs to access, or database queries to run in parallel. A common problem in such situations is running out of resources, for example, by utilizing all the file descriptors available for an application when trying to open too many files at once. In a web application, it may also create a vulnerability that is exploitable with Denial of Service (DoS) attacks. In all such situations, it is a good idea to limit the number of tasks that can run at the same time. This way, we can add some predictability to the load of our server and also make sure that our application will not run out of resources. The following diagram describes a situation where we have five tasks that run in parallel with a concurrency limit of 2:

Limited parallel execution

From the preceding figure, it should be clear how our algorithm will work:

  1. Initially, we spawn as many tasks as we can without exceeding the concurrency limit.
  2. Then, every time a task completes, we spawn one or more new tasks until we reach the limit again.

Limiting the concurrency

We now present a pattern to execute a set of given tasks in parallel with limited concurrency:

const tasks = [ /* ... */ ]; 
let concurrency = 2, running = 0, completed = 0, index = 0; 
function next() {                                             //[1] 
  while(running < concurrency && index < tasks.length) { 
    const task = tasks[index++]; 
    task(() => {                                              //[2] 
      if(++completed === tasks.length) { 
        return finish(); 
      } 
      running--; 
      next(); 
    }); 
    running++; 
  } 
} 
next(); 
 
function finish() { 
  //all tasks finished 
} 

This algorithm can be considered a mix between a sequential execution and a parallel execution. In fact, we might notice similarities with both the patterns we presented earlier in the chapter:

  1. We have an iterator function, which we called next(), and then an inner loop that spawns in parallel as many tasks as possible while staying within the concurrency limit.
  2. The next important part is the callback we pass to each task, which checks if we completed all the tasks in the list. If there are still tasks to run, it invokes next() to spawn another bunch of tasks.

Pretty simple, isn't it?

Globally limiting the concurrency

Our web spider application is perfect for applying what we learned about limiting the concurrency of a set of tasks. In fact, to avoid the situation in which we have thousands of links crawled at the same time, we can enforce a limit on the concurrency of this process, adding some predictability to the number of concurrent downloads.

Note

Node.js versions before 0.11 limit the number of concurrent HTTP connections per host to 5. This can, however, be changed to accommodate our needs. Find out more in the official docs at http://nodejs.org/docs/v0.10.0/api/http.html#http_agent_maxsockets. Starting from Node.js 0.11, there is no default limit on the number of concurrent connections.

We could apply the pattern we just learned to our spiderLinks() function, but what we would obtain is only a limit on the concurrency of the links found within a single page. If we chose, for example, a concurrency of 2, we would have at most two links downloaded in parallel for each page. However, since every downloaded page would in turn spawn up to two more downloads, the grand total of download operations would still grow exponentially.

Queues to the rescue

What we really want, then, is to limit the global number of download operations we can have running in parallel. We could slightly modify the pattern shown before, but we prefer to leave this as an exercise for you, as we want to take this opportunity to introduce another mechanism, which makes use of queues to limit the concurrency of multiple tasks. Let's see how this works.

We are now going to implement a simple class named TaskQueue, which will combine a queue with the algorithm we presented before. Let's create a new module named taskQueue.js:

class TaskQueue { 
  constructor(concurrency) { 
    this.concurrency = concurrency; 
    this.running = 0; 
    this.queue = []; 
  } 
 
  pushTask(task) { 
    this.queue.push(task); 
    this.next(); 
  } 
 
  next() { 
    while(this.running < this.concurrency && this.queue.length) { 
      const task = this.queue.shift(); 
      task(() => { 
        this.running--; 
        this.next(); 
      }); 
      this.running++; 
    } 
  } 
} 
 
module.exports = TaskQueue; 

The constructor of this class takes as input only the concurrency limit, but besides that, it initializes the variables running and queue. The former variable is a counter used to keep track of all the running tasks, while the latter is the array that will be used as a queue to store the pending tasks.

The pushTask() method simply adds a new task to the queue and then bootstraps the execution of the worker by invoking this.next().

The next() method spawns a set of tasks from the queue, ensuring that it does not exceed the concurrency limit.

We might notice that this method has some similarities with the pattern that limits the concurrency we presented earlier. It essentially starts as many tasks from the queue as possible, without exceeding the concurrency limit. When each task is complete, it updates the count of running tasks and then starts another round of tasks by invoking next() again. The interesting property of the TaskQueue class is that it allows us to dynamically add new items to the queue. The other advantage is that now we have a central entity responsible for the limitation of the concurrency of our tasks, which can be shared across all the instances of a function's execution. In our case, it's the spider() function, as we will see in a moment.
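
To illustrate the contract of a task outside the spider, here is a minimal, hypothetical usage of the class: each task is a function receiving a done callback that it must invoke when finished, so that the queue can start the next pending task (this assumes taskQueue.js exports the class with module.exports = TaskQueue):

const TaskQueue = require('./taskQueue'); 
const queue = new TaskQueue(2);            //at most two tasks run at the same time 
 
for(let i = 0; i < 5; i++) { 
  queue.pushTask(done => { 
    setTimeout(() => {                     //stand-in for any asynchronous operation 
      console.log(`task ${i} completed`); 
      done();                              //tell the queue this task has finished 
    }, 100); 
  }); 
} 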

Web spider version 4

Now that we have our generic queue to execute tasks in a limited parallel flow, let's use it straightaway in our web spider application. Let's first load the new dependency and create a new instance of the TaskQueue class by setting the concurrency limit to 2:

const TaskQueue = require('./taskQueue'); 
const downloadQueue = new TaskQueue(2); 

Next, we need to update the spiderLinks() function so that it can use the newly created downloadQueue:

function spiderLinks(currentUrl, body, nesting, callback) { 
  if(nesting === 0) { 
    return process.nextTick(callback); 
  } 
 
  const links = utilities.getPageLinks(currentUrl, body); 
  if(links.length === 0) { 
    return process.nextTick(callback); 
  } 
 
  let completed = 0, hasErrors = false; 
  links.forEach(link => { 
    downloadQueue.pushTask(done => { 
      spider(link, nesting - 1, err => { 
        if(err) { 
          hasErrors = true; 
          return callback(err); 
        } 
        if(++completed === links.length && !hasErrors) { 
          callback(); 
        } 
        done(); 
      }); 
    }); 
  }); 
} 

This new implementation of the function is extremely simple, and it's very similar to the algorithm for unlimited parallel execution, which we presented earlier in the chapter. This is because we are delegating the concurrency control to the TaskQueue object, and the only thing we have to do is check when all our tasks are complete. The only interesting part in the preceding code is how we defined our tasks:

  • We run the spider() function by providing a custom callback.
  • In the callback, we check if all the tasks relative to this execution of the spiderLinks() function are completed. When this condition is true, we invoke the final callback of the spiderLinks() function.
  • At the end of our task, we invoke the done() callback so that the queue can continue its execution.

After we have applied these small changes, we can now try to run the spider module again. This time, we should notice that no more than two downloads will be active at the same time.
