5

Asynchronous Control Flow Patterns with Promises and Async/Await

Callbacks are the low-level building blocks of asynchronous programming in Node.js, but they are far from being developer-friendly. In fact, in the last chapter, we learned techniques to implement different control flow constructs using callbacks, and we can say that they are quite complex and verbose compared to the (low) level of complexity of the tasks they try to accomplish. In particular, serial execution flow, which is the predominant control flow structure in most of the code we write, can easily lead an untrained developer to write code affected by the callback hell problem. On top of that, even if properly implemented, a serial execution flow seems needlessly complicated and error-prone. Let's also remember how fragile error management with callbacks is; if we forget to forward an error, then it just gets lost, and if we forget to catch any exception thrown by some synchronous code, then the program crashes. And all of this without considering that Zalgo is always breathing down our necks.

Node.js and JavaScript have been criticized for many years for the lack of a native solution to a problem so common and ubiquitous. Luckily, over the years, the community has worked on new solutions to the problem and finally, after many iterations, discussions, and years of waiting, today we have a proper solution to the "callback issue."

The first step toward a better asynchronous code experience is the promise, an object that "carries" the status and the eventual result of an asynchronous operation. A promise can be easily chained to implement serial execution flows and can be moved around like any other object. Promises simplify asynchronous code a lot; however, there was still room for improvement. So, in an attempt to make the ubiquitous serial execution flow as simple as possible, a new construct was introduced, called async/await, which can finally make asynchronous code look like synchronous code.

In today's modern Node.js programming, async/await is the preferred construct to use when dealing with asynchronous code. However, async/await is built on top of promises, as much as promises are built on top of callbacks. So, it's important that we know and master all of them in order to tackle our asynchronous programming problems with the right approach.

In this chapter, you will learn the following:

  • How promises work and how to use them effectively to implement the main control flow constructs we already know about.
  • The async/await syntax, which will become our main tool for dealing with asynchronous code in Node.js.

By the end of the chapter, you will have learned about the two most important components that we have in JavaScript for taming asynchronous code. So, let's get started by discovering promises.

Promises

Promises are part of the ECMAScript 2015 standard (or ES6, which is why they are also called ES6 promises) and have been natively available in Node.js since version 4. But the history of promises goes back a few years earlier, when there were dozens of implementations around, initially with different features and behavior. Eventually, the majority of those implementations settled on a standard called Promises/A+.

Promises represent a big step ahead toward providing a robust alternative to continuation-passing style callbacks for propagating an asynchronous result. As we will see, the use of promises will make all the major asynchronous control flow constructs easier to read, less verbose, and more robust compared to their callback-based alternatives.

What is a promise?

A Promise is an object that embodies the eventual result (or error) of an asynchronous operation. In promises jargon, we say that a Promise is pending when the asynchronous operation is not yet complete, it's fulfilled when the operation successfully completes, and rejected when the operation terminates with an error. Once a Promise is either fulfilled or rejected, it's considered settled.

To receive the fulfillment value or the error (reason) associated with the rejection, we can use the then() method of a Promise instance. The following is its signature:

promise.then(onFulfilled, onRejected)

In the preceding signature, onFulfilled is a callback that will eventually receive the fulfillment value of the Promise, and onRejected is another callback that will receive the reason for the rejection (if any). Both are optional.

To have an idea of how promises can transform our code, let's consider the following callback-based code:

asyncOperation(arg, (err, result) => {
  if(err) {
    // handle the error
  }
  // do stuff with the result
})

Promises allow us to transform this typical continuation-passing style code into a better structured and more elegant code, such as the following:

asyncOperationPromise(arg)
  .then(result => {
    // do stuff with result
  }, err => {
    // handle the error
  })

In the code above, asyncOperationPromise() is returning a Promise, which we can then use to receive the fulfillment value or the rejection reason of the eventual result of the function. So far, it seems that there is nothing major going on, but one crucial property of the then() method is that it synchronously returns another Promise.

Moreover, if any of the onFulfilled or onRejected functions return a value x, the Promise returned by the then() method will:

  • Fulfill with x if x is a value
  • Fulfill with the fulfillment value of x if x is a Promise
  • Reject with the eventual rejection reason of x if x is a Promise

This behavior allows us to build chains of promises, allowing easy aggregation and arrangement of asynchronous operations into several configurations. Moreover, if we don't specify an onFulfilled or onRejected handler, the fulfillment value or rejection reason is automatically forwarded to the next promise in the chain. This allows us, for example, to automatically propagate errors across the whole chain until they are caught by an onRejected handler. With a Promise chain, the sequential execution of tasks suddenly becomes a trivial operation:

asyncOperationPromise(arg)
  .then(result1 => {
    // returns another promise
    return asyncOperationPromise(arg2)
  })
  .then(result2 => {
    // returns a value
    return 'done'
  })
  .then(undefined, err => {
    // any error in the chain is caught here
  })

The following diagram provides another perspective on how a Promise chain works:

Figure 5.1: Promise chain execution flow

Figure 5.1 shows how our program flows when we use a chain of promises. When we invoke then() on Promise A we synchronously receive Promise B as a result and when we invoke then() on Promise B we synchronously receive Promise C as a result. Eventually, when Promise A settles, it will either fulfill or reject, which results in the invocation of either the onFulfilled() or the onRejected() callback respectively. The result of the execution of such a callback will then fulfill or reject Promise B and such a result is, in turn, propagated to the onFulfilled() or the onRejected() callback passed to the then() invocation on Promise B. The execution continues similarly for Promise C and any other promise that follows in the chain.

An important property of promises is that the onFulfilled() and onRejected() callbacks are guaranteed to be invoked asynchronously and at most once, even if we resolve the Promise synchronously with a value. Not only that, the onFulfilled() and onRejected() callbacks will be invoked asynchronously even if the Promise object is already settled at the moment in which then() is called. This behavior shields our code against all those situations where we could unintentionally release Zalgo (see Chapter 3, Callbacks and Events), making our asynchronous code more consistent and robust without any extra effort.
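We can see this guarantee in action with a minimal standalone snippet (not part of the web spider code). Even though the Promise is already settled when then() is invoked, its callback still runs asynchronously:

```javascript
// The Promise is settled synchronously...
const promise = Promise.resolve('hello')

// ...but the onFulfilled callback is still deferred
promise.then(value => {
  console.log(`Asynchronous: ${value}`)
})

console.log('Synchronous: this line always prints first')
```

Running this snippet always prints the "Synchronous" line first, which proves that the onFulfilled callback was not invoked in the same tick in which then() was called.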

Now comes the best part. If an exception is thrown (using the throw statement) in the onFulfilled() or onRejected() handler, the Promise returned by the then() method will automatically reject, with the exception that was thrown provided as the rejection reason. This is a tremendous advantage over CPS, as it means that with promises, exceptions will propagate automatically across the chain, and the throw statement becomes finally usable.
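For example, in the following contrived chain, the exception thrown in the first onFulfilled handler skips the next then() entirely and is caught by the final onRejected handler:

```javascript
Promise.resolve(1)
  .then(value => {
    // throwing here rejects the Promise returned by this then()
    throw new Error('Something went wrong')
  })
  .then(value => {
    // never invoked: the previous handler threw
    console.log('This will not print')
  })
  .catch(err => {
    console.log(`Caught: ${err.message}`)
  })
```

The snippet prints only "Caught: Something went wrong", showing how the thrown exception travels down the chain as a rejection reason.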

Promises/A+ and thenables

Historically, there have been many different implementations of promises, and most of them were not compatible with each other, meaning that it was not possible to create chains between Promise objects coming from libraries that were using different Promise implementations.

The JavaScript community worked very hard to address this limitation and those efforts led to the creation of the Promises/A+ specification. This specification details the behavior of the then() method, providing an interoperable base, which makes Promise objects from different libraries able to work with each other out of the box. Today, the majority of Promise implementations use this standard, including the native Promise object of JavaScript and Node.js.

For a detailed overview of the Promises/A+ specification, you can refer to the official website at nodejsdp.link/promises-aplus.

As a result of the adoption of the Promises/A+ standard, many Promise implementations, including the native JavaScript Promise API, will consider any object with a then() method a Promise-like object, also called thenable. This behavior allows different Promise implementations to interact with each other seamlessly.

The technique of recognizing (or typing) objects based on their external behavior, rather than their actual type, is called duck typing and is widely used in JavaScript.
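As a quick illustration of duck typing in action, the following sketch defines a hypothetical thenable object (a plain object exposing a then() method) and lets the native Promise implementation assimilate it through Promise.resolve():

```javascript
// Any object with a then() method is treated as Promise-like (a thenable)
const thenable = {
  then (onFulfilled, onRejected) {
    setTimeout(() => onFulfilled('I am a thenable'), 100)
  }
}

// Promise.resolve() converts the thenable into a native Promise
Promise.resolve(thenable)
  .then(value => console.log(value))  // prints "I am a thenable"
```

This is the mechanism that lets promises coming from different libraries participate in the same chain.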

The promise API

Let's now take a quick look at the API of the native JavaScript Promise. This is just an overview to give you an idea of what we can do with promises, so don't worry if things are not so clear at this point yet; we will have the chance to use most of these APIs throughout the book.

The Promise constructor (new Promise((resolve, reject) => {})) creates a new Promise instance that fulfills or rejects based on the behavior of the function provided as an argument. The function provided to the constructor will receive two arguments:

  • resolve(obj): This is a function that, when invoked, will fulfill the Promise with the provided fulfillment value, which will be obj if obj is a value. It will be the fulfillment value of obj if obj is a Promise or a thenable.
  • reject(err): This rejects the Promise with the reason err. It is a convention for err to be an instance of Error.

Now, let's take a look at the most important static methods of the Promise object:

  • Promise.resolve(obj): This method creates a new Promise from another Promise, a thenable, or a value. If a Promise is passed, then that Promise is returned as it is. If a thenable is provided, then it's converted to the Promise implementation in use. If a value is provided, then the Promise will be fulfilled with that value.
  • Promise.reject(err): This method creates a Promise that rejects with err as the reason.
  • Promise.all(iterable): This method creates a Promise that fulfills with an array of fulfillment values when every item in the input iterable (such as an Array) object fulfills. If any Promise in the iterable object rejects, then the Promise returned by Promise.all() will reject with the first rejection reason. Each item in the iterable object can be a Promise, a generic thenable, or a value.
  • Promise.allSettled(iterable): This method waits for all the input promises to fulfill or reject and then returns an array of objects containing the fulfillment value or the rejection reason for each input Promise. Each output object has a status property, which can be equal to 'fulfilled' or 'rejected', and a value property containing the fulfillment value, or a reason property containing the rejection reason. The difference with Promise.all() is that Promise.allSettled() will always wait for each Promise to either fulfill or reject, instead of immediately rejecting when one of the promises rejects.
  • Promise.race(iterable): This method returns a Promise that is equivalent to the first Promise in iterable that settles.
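As a small illustrative sketch (with made-up values), the following compares the output shape of Promise.allSettled() against the fail-fast behavior of Promise.all():

```javascript
const promises = [
  Promise.resolve('one'),
  Promise.reject(new Error('boom')),
  Promise.resolve('three')
]

// allSettled() waits for every Promise and never rejects
Promise.allSettled(promises)
  .then(results => {
    for (const result of results) {
      if (result.status === 'fulfilled') {
        console.log(`Fulfilled with: ${result.value}`)
      } else {
        console.log(`Rejected with: ${result.reason.message}`)
      }
    }
  })

// Promise.all() on the same input would instead reject
// immediately with the Error('boom') as the reason
```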

Finally, the following are the main methods available on a Promise instance:

  • promise.then(onFulfilled, onRejected): This is the essential method of a Promise. Its behavior is compatible with the Promises/A+ standard that we mentioned before.
  • promise.catch(onRejected): This method is just syntactic sugar (nodejsdp.link/syntactic-sugar) for promise.then(undefined, onRejected).
  • promise.finally(onFinally): This method allows us to set up an onFinally callback, which is invoked when the Promise is settled (either fulfilled or rejected). Unlike onFulfilled and onRejected, the onFinally callback will not receive any argument as input and any value returned from it will be ignored. The Promise returned by finally will settle with the same fulfillment value or rejection reason as the current Promise instance. There is only one exception to all this, which is the case in which we throw inside the onFinally callback or return a rejected Promise. In this case, the returned Promise will reject with the error that is thrown or with the rejection reason of the rejected Promise we returned.
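As a minimal sketch of the pass-through behavior of finally(), the following contrived snippet shows that the onFinally callback runs even on rejection, and that the rejection reason is preserved for the next catch() in the chain:

```javascript
Promise.reject(new Error('boom'))
  .finally(() => {
    // runs on both fulfillment and rejection, receives no argument
    console.log('cleanup runs anyway')
  })
  .catch(err => {
    // the original rejection reason passes through finally() untouched
    console.log(`Still rejected: ${err.message}`)
  })
```

This makes finally() a natural place for cleanup logic, such as releasing a resource regardless of the outcome of an asynchronous operation.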

Let's now see an example of how we can create a Promise from scratch using its constructor.

Creating a promise

Let's now see how we can create a Promise using its constructor. Creating a Promise from scratch is a low-level operation and it's usually required when we need to convert an API that uses another asynchronous style (such as a callback-based style). Most of the time we—as developers—are consumers of promises produced by other libraries and most of the promises we create will come from the then() method. Nonetheless, in some advanced scenarios, we need to manually create a Promise using its constructor.

To demonstrate how to use the Promise constructor, let's create a function that returns a Promise that fulfills with the current date after a specified number of milliseconds. Let's take a look at it:

function delay (milliseconds) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(new Date())
    }, milliseconds)
  })
}

As you probably already guessed, we used setTimeout() to invoke the resolve() function of the Promise constructor. We can notice how the entire body of the function is wrapped by the Promise constructor; this is a frequent code pattern you will see when creating a Promise from scratch.

The delay() function we just created can then be used with some code like the following:

console.log(`Delaying...${new Date().getSeconds()}s`)
delay(1000)
  .then(newDate => {
    console.log(`Done ${newDate.getSeconds()}s`)
  })

The console.log() within the then() handler will be executed approximately after 1 second from the invocation of delay().

The Promises/A+ specification states that the onFulfilled and onRejected callbacks of the then() method have to be invoked only once and exclusively (only one or the other is invoked). A compliant promises implementation makes sure that even if we call resolve or reject multiple times, the Promise is either fulfilled or rejected only once.
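We can verify this guarantee with a small contrived example: only the first call to resolve() (or reject()) has any effect, and every subsequent call is silently ignored:

```javascript
const promise = new Promise((resolve, reject) => {
  resolve('first')
  resolve('second')          // ignored: the Promise is already settled
  reject(new Error('nope'))  // ignored as well
})

promise.then(value => console.log(value))  // prints "first"
```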

Promisification

When some characteristics of a callback-based function are known in advance, it's possible to create a function that transforms such a callback-based function into an equivalent function returning a Promise. This transformation is called promisification.

For example, let's consider the conventions used in Node.js-style callback-based functions:

  • The callback is the last argument of the function
  • The error (if any) is always the first argument passed to the callback
  • Any return value is passed after the error to the callback

Based on these rules, we can easily create a generic function that promisifies a Node.js-style callback-based function. Let's see what this function looks like:

function promisify (callbackBasedApi) {
  return function promisified (...args) {
    return new Promise((resolve, reject) => {              // (1)
      const newArgs = [
        ...args,
        function (err, result) {                           // (2)
          if (err) {
            return reject(err)
          }
          resolve(result)
        }
      ]
      callbackBasedApi(...newArgs)                         // (3)
    })
  }
}

The preceding function returns another function called promisified(), which represents the promisified version of the callbackBasedApi given as the input. This is how it works:

  1. The promisified() function creates a new Promise using the Promise constructor and immediately returns it to the caller.
  2. In the function passed to the Promise constructor, we make sure to pass to callbackBasedApi a special callback. Since we know that the callback always comes last, we simply append it to the arguments list (args) provided to the promisified() function. In the special callback, if we receive an error, we immediately reject the Promise; otherwise, we resolve it with the given result.
  3. Finally, we simply invoke callbackBasedApi with the list of arguments we have built.

Now, let's promisify a Node.js function using our newly created promisify() function. We can use the randomBytes() function of the core crypto module, which produces a buffer containing the specified number of random bytes. The randomBytes() function accepts a callback as the last argument and it follows the conventions we already know very well. Let's see what this looks like:

import { randomBytes } from 'crypto'
const randomBytesP = promisify(randomBytes)
randomBytesP(32)
  .then(buffer => {
    console.log(`Random bytes: ${buffer.toString()}`)
  })

The previous code should print some gibberish to the console; that's because not all generated bytes have a corresponding printable character.

The promisification function we created here is just for educational purposes and it's missing a few features, such as the ability to deal with callbacks returning more than one result. In real life, we would use the promisify() function of the util core module to promisify our Node.js-style callback-based functions. You can take a look at its documentation at nodejsdp.link/promisify.
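For reference, this is a sketch of how the previous example could look using util.promisify() instead of our educational implementation. Here we print the bytes in hexadecimal, which avoids the gibberish we saw with the default toString():

```javascript
import { promisify } from 'util'
import { randomBytes } from 'crypto'

// util.promisify() handles Node.js-style callback conventions for us
const randomBytesP = promisify(randomBytes)

randomBytesP(32)
  .then(buffer => {
    console.log(`Random bytes: ${buffer.toString('hex')}`)
  })
```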

Sequential execution and iteration

We now know enough to convert the web spider application that we created in the previous chapter to use promises. Let's start directly from version 2, the one downloading the links of a webpage in sequence.

We can access an already promisified version of the core fs API through the promises object of the fs module. For example: import { promises } from 'fs'.

In the spider.js module, the very first step required is to import our dependencies and promisify any callback-based function that we are going to use:

import { promises as fsPromises } from 'fs'                // (1)
import { dirname } from 'path'
import superagent from 'superagent'
import mkdirp from 'mkdirp'
import { urlToFilename, getPageLinks } from './utils.js'
import { promisify } from 'util'
const mkdirpPromises = promisify(mkdirp)                   // (2)

There are two main differences here compared to the spider.js module of the previous chapter:

  1. We import the promises object of the fs module to get access to all the fs functions already promisified.
  2. We manually promisify the mkdirp() function.

Now, we can start converting the download() function:

function download (url, filename) {
  console.log(`Downloading ${url}`)
  let content
  return superagent.get(url)                               // (1)
    .then((res) => {
      content = res.text                                   // (2)
      return mkdirpPromises(dirname(filename))
    })
    .then(() => fsPromises.writeFile(filename, content))
    .then(() => {
      console.log(`Downloaded and saved: ${url}`)
      return content                                       // (3)
    })
}

We can straightaway appreciate the elegance of implementing sequential asynchronous operations with promises. We simply have a clean and very intuitive chain of then() invocations.

Compared to the previous version of the function, this time we are leveraging the out-of-the-box support for promises of the superagent package. Instead of invoking end() on the request object returned by superagent.get(), we simply invoke then() to send the request (1) and receive a Promise that fulfills/rejects with the result of the request.

The final return value of the download() function is the Promise returned by the last then() call in the chain, which fulfills with the content of the webpage (3), which we initialized in the onFulfilled handler of the first then() call (2). This makes sure that the caller receives a Promise that fulfills with content only after all operations (get, mkdirp, and writeFile) have completed.

In the download() function that we've just seen, we have executed a known set of asynchronous operations in sequence. However, in the spiderLinks() function, we will have to deal with a sequential iteration over a dynamic set of asynchronous tasks. Let's see how we can achieve that:

function spiderLinks (currentUrl, content, nesting) {
  let promise = Promise.resolve()                            // (1)
  if (nesting === 0) {
    return promise
  }
  const links = getPageLinks(currentUrl, content)
  for (const link of links) {
    promise = promise.then(() => spider(link, nesting - 1))  // (2)
  }
  return promise
}

To iterate over all the links of a webpage asynchronously, we had to dynamically build a chain of promises as follows:

  1. First, we defined an "empty" Promise, which resolves to undefined. This Promise is used just as the starting point for our chain.
  2. Then, in a loop, we update the promise variable with a new Promise obtained by invoking then() on the previous promise in the chain. This is actually our asynchronous iteration pattern using promises.

At the end of the for loop, the promise variable will contain the promise of the last then() invocation, so it will resolve only when all the promises in the chain have been resolved.

Pattern (sequential iteration with promises)

Dynamically build a chain of promises using a loop.

Now, we can finally convert the spider() function:

export function spider (url, nesting) {
  const filename = urlToFilename(url)
  return fsPromises.readFile(filename, 'utf8')
    .catch((err) => {
      if (err.code !== 'ENOENT') {
        throw err
      }
      // The file doesn't exist, so let's download it
      return download(url, filename)
    })
    .then(content => spiderLinks(url, content, nesting))
}

In this new spider() function, we are using catch() to handle any error produced by readFile(). In particular, if the error has code 'ENOENT', it means that the file doesn't exist yet and therefore we need to download the corresponding URL. The Promise returned from download(), if fulfilled, will return the content at the URL. On the other hand, if the Promise produced by readFile() fulfills, it will skip the catch() handler and go straight to the next then(). In both cases, the onFulfilled handler of the last then() call will always receive the content of the webpage, either coming from the local file or from a fresh download.

Now that we have converted our spider() function as well, we can finally modify the spider-cli.js module:

spider(url, nesting)
  .then(() => console.log('Download complete'))
  .catch(err => console.error(err))

The catch() handler here will intercept any error originating from the entire spider() process.

If we look again at all the code we have written so far, we will be pleasantly surprised by the fact that we haven't included any error propagation logic (as we would be forced to do when using callbacks). This is clearly an enormous advantage, as it greatly reduces the boilerplate in our code and the chances of missing any asynchronous errors.

This completes the implementation of version 2 of our web spider application with promises.

An alternative to the sequential iteration pattern with promises makes use of the reduce() function, resulting in an even more compact implementation:

const promise = tasks.reduce((prev, task) => {
  return prev.then(() => {
    return task()
  })
}, Promise.resolve())
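To illustrate the reduce()-based pattern in isolation, here is a hypothetical usage with a delay() helper similar to the one we defined earlier. Note how, despite the different delays, the tasks always complete in order:

```javascript
function delay (milliseconds) {
  return new Promise(resolve => setTimeout(resolve, milliseconds))
}

const results = []
const tasks = [
  () => delay(50).then(() => results.push(1)),
  () => delay(10).then(() => results.push(2)),
  () => delay(30).then(() => results.push(3))
]

// each task starts only when the previous one has completed
const promise = tasks.reduce((prev, task) => {
  return prev.then(() => task())
}, Promise.resolve())

promise.then(() => {
  console.log(`Execution order: ${results.join(', ')}`)  // prints "Execution order: 1, 2, 3"
})
```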

Parallel execution

Another execution flow that becomes trivial with promises is the parallel execution flow. In fact, all that we need to do is use the built-in Promise.all() method. This helper function creates another Promise that fulfills only when all the promises received as input are fulfilled. If there is no causal relationship between those promises (for example, they are not part of the same chain of promises), then they will be executed in parallel.

To demonstrate this, let's consider version 3 of our web spider application, which downloads all the links of a page in parallel. Let's just update the spiderLinks() function again to implement a parallel execution flow using promises:

function spiderLinks (currentUrl, content, nesting) {
  if (nesting === 0) {
    return Promise.resolve()
  }
  const links = getPageLinks(currentUrl, content)
  const promises = links.map(link => spider(link, nesting - 1))
  return Promise.all(promises)
}

The pattern here consists of starting the spider() tasks all at once in the links.map() loop. At the same time, each Promise returned by invoking spider() is collected in the final promises array. The critical difference in this loop—as compared to the sequential iteration loop—is that we are not waiting for the previous spider() task in the list to complete before starting a new one. All the spider() tasks are started in the loop at once, in the same event loop cycle.

Once we have all the promises, we pass them to the Promise.all() method, which returns a new Promise that will be fulfilled when all the promises in the array are fulfilled. In other words, it fulfills when all the download tasks have completed. In addition to that, the Promise returned by Promise.all() will reject immediately if any of the promises in the input array reject. This is exactly what we wanted for this version of our web spider.
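The parallel behavior can be observed with a small standalone experiment (unrelated to the spider code) using a delay() helper: the total time is roughly that of the longest delay, not the sum of all of them:

```javascript
function delay (milliseconds) {
  return new Promise(resolve => setTimeout(resolve, milliseconds))
}

const start = Date.now()
// the three timers are all started in the same event loop cycle
Promise.all([delay(200), delay(200), delay(200)])
  .then(() => {
    // roughly 200ms, not 600ms: the delays ran in parallel
    console.log(`Completed in ~${Date.now() - start}ms`)
  })
```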

Limited parallel execution

So far, promises have not disappointed our expectations. We were able to greatly improve our code for both serial and parallel execution. Now, with limited parallel execution, things should not be that different, considering that this flow is just a combination of serial and parallel execution.

In this section, we will go straight to implementing a solution that allows us to globally limit the concurrency of our web spider tasks. In other words, we are going to implement our solution in a class that we can use to instantiate objects that we can pass around to different functions of the same application. If you are just interested in a simple solution to locally limit the parallel execution of a set of tasks, you can still apply the same principles that we will see in this section to implement a special asynchronous version of Array.map(). We leave this to you as an exercise; you can find more details and hints at the end of this chapter.

For a ready-to-use, production-ready implementation of a map() function supporting promises and limited concurrency, you can rely on the p-map package. Find out more at nodejsdp.link/p-map.

Implementing the TaskQueue class with promises

To globally limit the concurrency of our spider download tasks, we are going to reuse the TaskQueue class we implemented in the previous chapter. Let's start with the next() method, where we trigger the execution of a set of tasks until we reach the concurrency limit:

next () {
  while (this.running < this.concurrency && this.queue.length) {
    const task = this.queue.shift()
    task().finally(() => {
      this.running--
      this.next()
    })
    this.running++
  }
}

The core change in the next() method is where we invoke task(). In fact, now we expect that task() returns a Promise, so all we have to do is invoke finally() on that Promise so we can reset the count of running tasks if it either fulfills or rejects.

Now, we implement a new method called runTask(). This method is responsible for queueing a special wrapper function and also for returning a newly built Promise. Such a Promise will essentially forward the result (fulfillment or rejection) of the Promise eventually returned by task(). Let's see what this method looks like:

runTask (task) {
  return new Promise((resolve, reject) => {                // (1)
    this.queue.push(() => {                                // (2)
      return task().then(resolve, reject)                  // (4)
    })
    process.nextTick(this.next.bind(this))                 // (3)
  })
}

In the method we have just seen:

  1. We create a new Promise using its constructor.
  2. We add a special wrapper function to the tasks queue. This function is going to be executed at a later next() run, when there are enough concurrency slots left.
  3. We invoke next() to trigger a new set of tasks to be run. We defer this to a subsequent run of the event loop to guarantee that task is always invoked asynchronously with respect to when runTask() is invoked. This prevents the problems we described in Chapter 3, Callbacks and Events (for example, Zalgo). In fact, we can notice that in the next() method there is another invocation of next() itself, in the finally() handler, that is always asynchronous.
  4. When the wrapper function we queued is finally run, we execute the task we have received as the input, and we forward its result—fulfillment value or rejection reason—to the outer Promise, the one we return from the runTask() method.

With this, we have completed the implementation of our new TaskQueue class using promises. Next, we'll use this new version of the TaskQueue class to implement version 4 of our web spider.
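For convenience, here is a sketch of the complete class put together. The constructor shown here is an assumption based on the TaskQueue of the previous chapter (a concurrency limit, a counter of running tasks, and a queue of pending tasks):

```javascript
export class TaskQueue {
  constructor (concurrency) {
    this.concurrency = concurrency
    this.running = 0
    this.queue = []
  }

  runTask (task) {
    return new Promise((resolve, reject) => {
      // queue a wrapper that forwards the task's result to the outer Promise
      this.queue.push(() => {
        return task().then(resolve, reject)
      })
      // defer next() to guarantee asynchronous task invocation (no Zalgo)
      process.nextTick(this.next.bind(this))
    })
  }

  next () {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift()
      // finally() decrements the counter on fulfillment or rejection alike
      task().finally(() => {
        this.running--
        this.next()
      })
      this.running++
    }
  }
}
```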

Updating the web spider

Now it's time to adapt our web spider to implement a limited parallel execution flow using the TaskQueue class we have just created.

First, we need to split the spider() function into two functions, one simply initializing a new TaskQueue object and another actually executing the spidering task, which we will call spiderTask(). Then, we need to update the spiderLinks() function to invoke the newly created spiderTask() function and forward the task queue instance received as an input. Let's see what all this looks like:

function spiderLinks (currentUrl, content, nesting, queue) {
  if (nesting === 0) {
    return Promise.resolve()
  }
  const links = getPageLinks(currentUrl, content)
  const promises = links
    .map(link => spiderTask(link, nesting - 1, queue))
  return Promise.all(promises)                             // (2)
}
const spidering = new Set()
function spiderTask (url, nesting, queue) {
  if (spidering.has(url)) {
    return Promise.resolve()
  }
  spidering.add(url)
  const filename = urlToFilename(url)
  return queue
    .runTask(() => {                                       // (1)
      return fsPromises.readFile(filename, 'utf8')
        .catch((err) => {
          if (err.code !== 'ENOENT') {
            throw err
          }
          // The file doesn't exist, so let's download it
          return download(url, filename)
        })
    })
    .then(content => spiderLinks(url, content, nesting, queue))
}
export function spider (url, nesting, concurrency) {
  const queue = new TaskQueue(concurrency)
  return spiderTask(url, nesting, queue)
}

The crucial instruction in the code we have just seen is where we invoke queue.runTask() (1). Here, the task that we are queuing (and therefore limiting) comprises just the retrieval of the contents of the URL from either the local filesystem or the remote URL location. Only after this task has been run by the queue can we continue to spider the links of the webpage. Note that we are intentionally keeping spiderLinks() outside of the task that we want to limit. This is because spiderLinks() can trigger more spiderTask() invocations and that would create a deadlock if the depth of the spidering process is higher than the concurrency limit of the queue.

We can also notice how in spiderLinks() we simply continue to use Promise.all() (2) to download all the links of a webpage in parallel. This is because it's the responsibility of our queue to limit the concurrency of the tasks.

In production code, you can use the package p-limit (available at nodejsdp.link/p-limit) to limit the concurrency of a set of tasks. The package essentially implements the pattern we have just shown but wrapped in a slightly different API.

This concludes our exploration of JavaScript promises. Next, we are going to learn about the async/await pair, which will completely revolutionize the way we deal with asynchronous code.

Async/await

As we have just seen, promises are a quantum leap ahead of callbacks. They allow us to write clean and readable asynchronous code and provide a set of safeguards that can only be achieved with boilerplate code when working with callback-based asynchronous code. However, promises are still suboptimal when it comes to writing sequential asynchronous code. The Promise chain is indeed much better than having callback hell, but still, we have to invoke a then() and create a new function for each task in the chain. This is still too much for a control flow that is definitely the most commonly used in everyday programming. JavaScript needed a proper way to deal with the ubiquitous asynchronous sequential execution flow, and the answer arrived with the introduction in the ECMAScript standard of async functions and the await expression (async/await for short).

The async/await pair allows us to write functions that appear to block at each asynchronous operation, waiting for the results before continuing with the following statement. As we will see, any asynchronous code using async/await has a readability comparable to traditional synchronous code.

Today, async/await is the recommended construct for dealing with asynchronous code in both Node.js and JavaScript. However, async/await does not replace all that we have learned so far about asynchronous control flow patterns; on the contrary, as we will see, async/await piggybacks heavily onto promises.

Async functions and the await expression

An async function is a special type of function in which it's possible to use the await expression to "pause" the execution on a given Promise until it resolves. Let's consider a simple example and use the delay() function we implemented in the Creating a promise subsection. The Promise returned by delay() resolves with the current date as the value after the given number of milliseconds. Let's use this function with the async/await pair:

async function playingWithDelays () {
  console.log('Delaying...', new Date())
  const dateAfterOneSecond = await delay(1000)
  console.log(dateAfterOneSecond)
  const dateAfterThreeSeconds = await delay(3000)
  console.log(dateAfterThreeSeconds)
  return 'done'
}

As we can see from the previous function, async/await seems to work like magic. The code doesn't even look like it contains any asynchronous operation. However, don't be mistaken; this function does not run synchronously (they are called async functions for a reason!). At each await expression, the execution of the function is put on hold, its state saved, and the control returned to the event loop. Once the Promise that has been awaited resolves, the control is given back to the async function, returning the fulfilment value of the Promise.

The await expression works with any value, not just promises. If a value other than a Promise is provided, then its behavior is similar to awaiting a value that is first passed to Promise.resolve().
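As a quick illustration (the function name here is just for the example), awaiting a plain value or a thenable works just like awaiting a real Promise:

```javascript
async function awaitAnything () {
  // A non-Promise value is wrapped as if by Promise.resolve()
  const fromNumber = await 42
  // A thenable is unwrapped too, since await follows the Promise protocol
  const fromThenable = await { then: onFulfilled => onFulfilled('hello') }
  return [fromNumber, fromThenable]
}

awaitAnything()
  .then(results => console.log(results)) // prints [ 42, 'hello' ]
```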

Let's now see how we can invoke our new async function:

playingWithDelays()
  .then(result => {
    console.log(`After 4 seconds: ${result}`)
  })

From the preceding code, it's clear that async functions can be invoked just like any other function. However, the most observant of you may have already spotted another important property of async functions: they always return a Promise. It's as if the return value of an async function were passed to Promise.resolve() and then returned to the caller.

Invoking an async function is instantaneous, like any other asynchronous operation. In other words, async functions return a Promise synchronously. That Promise will then eventually settle based on the result or error produced by the function.
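We can verify both properties with a minimal sketch (the names here are purely illustrative):

```javascript
async function quickAnswer () {
  return 42
}

const promise = quickAnswer()             // returns a Promise immediately
console.log(promise instanceof Promise)   // prints true
promise.then(value => console.log(value)) // prints 42, in a later microtask
console.log('This line runs before the then() callback above')
```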

From this first encounter with async/await, we can see how dominant promises still are in our discussion. In fact, we can consider async/await just syntactic sugar for a simpler consumption of promises. As we will see, all the asynchronous control flow patterns with async/await use promises and their API for most of the heavy-lifting operations.

Error handling with async/await

Async/await doesn't just improve the readability of asynchronous code under standard conditions, but it also helps when handling errors. In fact, one of the biggest gains of async/await is the ability to normalize the behavior of the try...catch block, to make it work seamlessly with both synchronous throws and asynchronous Promise rejections. Let's demonstrate that with an example.

A unified try...catch experience

Let's define a function that returns a Promise that rejects with an error after a given number of milliseconds. This is very similar to the delay() function that we already know very well:

function delayError (milliseconds) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      reject(new Error(`Error after ${milliseconds}ms`))
    }, milliseconds)
  })
}

Next, let's implement an async function that can throw an error synchronously or await a Promise that will reject. This function demonstrates how both the synchronous throw and the Promise rejection are caught by the same catch block:

async function playingWithErrors (throwSyncError) {
  try {
    if (throwSyncError) {
      throw new Error('This is a synchronous error')
    }
    await delayError(1000)
  } catch (err) {
    console.error(`We have an error: ${err.message}`)
  } finally {
    console.log('Done')
  }
}

Now, invoking the function like this:

playingWithErrors(true)

Will print to the console the following:

We have an error: This is a synchronous error
Done

While invoking the function with false as the input, like this:

playingWithErrors(false)

Will produce the following output:

We have an error: Error after 1000ms
Done

If we remember how we had to deal with errors in Chapter 4, Asynchronous Control Flow Patterns with Callbacks, we will surely appreciate the giant improvements introduced by both promises and async/await. Now, error handling is just as it should be: simple, readable, and most importantly, supporting both synchronous and asynchronous errors.

The "return" versus "return await" trap

One common antipattern when dealing with errors with async/await is returning a Promise that rejects to the caller and expecting the error to be caught by the local try...catch block of the function that is returning the Promise.

For example, consider the following code:

async function errorNotCaught () {
  try {
    return delayError(1000)
  } catch (err) {
    console.error('Error caught by the async function: ' +
      err.message)
  }
}
errorNotCaught()
  .catch(err => console.error('Error caught by the caller: ' +
    err.message))

The Promise returned by delayError() is not awaited locally, which means that it's returned as it is to the caller. As a consequence, the local catch block will never be invoked. In fact, the previous code will output:

Error caught by the caller: Error after 1000ms

If our intention is catching locally any error generated by the asynchronous operation that produces the value that we want to return to the caller, then we have to use the await expression on that Promise before we return the value to the caller. The following code demonstrates this:

async function errorCaught () {
  try {
    return await delayError(1000)
  } catch (err) {
    console.error('Error caught by the async function: ' +
      err.message)
  }
}
errorCaught()
  .catch(err => console.error('Error caught by the caller: ' +
    err.message))

All we did was add an await after the return keyword. This is enough to cause the async function to "deal" with the Promise locally and therefore also catch any rejection locally. As a confirmation, when we run the previous code, we should see the following output:

Error caught by the async function: Error after 1000ms

Sequential execution and iteration

Our exploration of control flow patterns with async/await starts with sequential execution and iteration. We already mentioned a few times that the core strength of async/await lies in its ability to make asynchronous serial execution easy to write and straightforward to read. This was already apparent in all the code samples we have written so far; however, it will become even more obvious now that we start converting our web spider version 2 to async/await. Async/await is so simple to use and understand that there are really no patterns here to study. We will get straight to the code, without any preamble.

So, let's start with the download() function of our web spider; this is how it looks with async/await:

async function download (url, filename) {
  console.log(`Downloading ${url}`)
  const { text: content } = await superagent.get(url)
  await mkdirpPromises(dirname(filename))
  await fsPromises.writeFile(filename, content)
  console.log(`Downloaded and saved: ${url}`)
  return content
}

Let's appreciate for a moment how simple and compact the download() function has become. Let's just consider that the same functionality was implemented with callbacks in two different functions using a total of 19 lines of code. Now we just have seven. Plus, the code is now completely flat, with no nesting at all. This tells us a lot about the enormous positive impact that async/await has on our code.

Now, let's see how we can iterate asynchronously over an array using async/await. This is exemplified in the spiderLinks() function:

async function spiderLinks (currentUrl, content, nesting) {
  if (nesting === 0) {
    return
  }
  const links = getPageLinks(currentUrl, content)
  for (const link of links) {
    await spider(link, nesting - 1)
  }
}

Even here there is no pattern to learn. We just have a simple iteration over a list of links and for each item, we await on the Promise returned by spider().

The next code fragment shows the spider() function implemented using async/await. The aspect to notice here is how errors are easily dealt with using just a try...catch statement, making everything easier to read:

export async function spider (url, nesting) {
  const filename = urlToFilename(url)
  let content
  try {
    content = await fsPromises.readFile(filename, 'utf8')
  } catch (err) {
    if (err.code !== 'ENOENT') {
      throw err
    }
    content = await download(url, filename)
  }
  return spiderLinks(url, content, nesting)
}

And with the spider() function, we have completed the conversion of our web spider application to async/await. As you can see, it has been quite a smooth process, and the results are impressive.

Antipattern – using async/await with Array.forEach for serial execution

It's worth mentioning that there is a common antipattern whereby developers will try to use Array.forEach() or Array.map() to implement a sequential asynchronous iteration with async/await, which, of course, won't work as expected.

To see why, let's take a look at the following alternate implementation (which is wrong!) of the asynchronous iteration in the spiderLinks() function:

links.forEach(async function iteration(link) {
  await spider(link, nesting - 1)
})

In the previous code, the iteration function is invoked once for each element of the links array. Then, in the iteration function, we use the await expression on the Promise returned by spider(). However, the Promise returned by the iteration function is just ignored by forEach(). The result is that all the spider() functions are invoked in the same round of the event loop, which means they are started in parallel, and the execution continues immediately after invoking forEach(), without waiting for all the spider() operations to complete.
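We can observe this behavior in isolation with the following sketch, which uses an illustrative delay() helper: the final console.log() runs immediately, before any of the awaited operations has completed, so it prints an empty array:

```javascript
function delay (milliseconds) {
  return new Promise(resolve => setTimeout(resolve, milliseconds))
}

const order = []
const links = [1, 2, 3]
links.forEach(async n => {
  await delay(10 * n)
  order.push(n)
})
// Runs before any iteration completes, because forEach() ignores
// the promises returned by the async iteration function
console.log('Done iterating', order) // prints "Done iterating []"
```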

Parallel execution

There are mainly two ways to run a set of tasks in parallel using async/await; one purely uses the await expression and the other relies on Promise.all(). They are both very simple to implement; however, be advised that the method relying on Promise.all() is the recommended (and optimal) one to use.

Let's see an example of both. Let's consider the spiderLinks() function of our web spider. If we wanted to purely use the await expression to implement an unlimited parallel asynchronous execution flow, we would do it with some code like the following:

async function spiderLinks (currentUrl, content, nesting) {
  if (nesting === 0) {
    return
  }
  const links = getPageLinks(currentUrl, content)
  const promises = links.map(link => spider(link, nesting - 1))
  for (const promise of promises) {
    await promise
  }
}

That's it—very simple. In the previous code, we first start all the spider() tasks in parallel, collecting their promises with a map(). Then, we loop, and we await on each one of those promises.

At first, it seems neat and functional; however, it has a small undesired effect. If a Promise in the array rejects, we have to wait for all the preceding promises in the array to resolve before the Promise returned by spiderLinks() will also reject. This is not optimal in most situations, as we usually want to know if an operation has failed as soon as possible. Luckily, we already have a built-in function that behaves exactly the way we want, and that's Promise.all(). In fact, Promise.all() will reject as soon as any of the promises provided in the input array reject. Therefore, we can simply rely on this method even for all our async/await code. And, since Promise.all() returns just another Promise, we can simply invoke an await on it to get the results from multiple asynchronous operations. The following code shows an example:

const results = await Promise.all(promises)
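To see the fail-fast behavior of Promise.all() in isolation, consider the following sketch (delay() is redefined here just to keep the example self-contained): the catch block runs as soon as the rejected promise settles, without waiting for the 500ms task:

```javascript
function delay (milliseconds) {
  return new Promise(resolve => setTimeout(resolve, milliseconds))
}

async function failFast () {
  const promises = [
    delay(500).then(() => 'slow'),
    Promise.reject(new Error('boom')),
    delay(100).then(() => 'quick')
  ]
  try {
    await Promise.all(promises)
  } catch (err) {
    // Runs right away, without waiting for the 500ms task to finish
    console.log(`Rejected immediately: ${err.message}`)
  }
}

failFast() // prints "Rejected immediately: boom"
```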

So, to wrap up, our recommended implementation of the spiderLinks() function with parallel execution and async/await will look almost identical to that using promises. The only visible difference is the fact that we are now using an async function, which always returns a Promise:

async function spiderLinks (currentUrl, content, nesting) {
  if (nesting === 0) {
    return
  }
  const links = getPageLinks(currentUrl, content)
  const promises = links.map(link => spider(link, nesting - 1))
  return Promise.all(promises)
}

What we just learned about parallel execution and async/await simply reiterates the fact that async/await is inseparable from promises. Most of the utilities that work with promises will also seamlessly work with async/await and we should never hesitate to take advantage of them in our async functions.

Limited parallel execution

To implement a limited parallel execution pattern with async/await, we can simply reuse the TaskQueue class that we created in the Limited parallel execution subsection within the Promises section. We can either use it as it is or convert its internals to async/await. Converting the TaskQueue class to async/await is a trivial operation and we'll leave this to you as an exercise. Either way, the TaskQueue external interface shouldn't change; both implementations will have a runTask() method that returns a Promise that settles when the task has been run by the queue.

Starting from this assumption, converting the web spider v4 from promises to async/await is also a trivial task and we won't show all the steps here as we wouldn't be learning anything new. Instead, what we'll do in this section is examine a third variation of the TaskQueue class that uses async/await and a producer-consumer approach.

The general idea to apply this approach to our problem goes as follows:

  • On one side, we have an unknown set of producers adding tasks into a queue.
  • On the other side, we have a predefined set of consumers, responsible for extracting and executing the tasks from the queue, one at a time.

The following diagram should help us understand the setup:

Figure 5.2: Using the Producer-Consumer pattern to implement limited parallel execution

The number of consumers will determine the concurrency with which the tasks will be executed. The challenge here is to put the consumers to "sleep" when the queue is empty and "wake them up" again when there are new tasks to run. But we are lucky, since Node.js is single-threaded, so putting a task to "sleep" just means giving back control to the event loop, while "resuming" a task is equivalent to invoking a callback.

With this in mind, let's then take a look at some code. We will create a new class called TaskQueuePC with a public interface similar to that of the TaskQueue classes we implemented previously in this chapter. Taking a top-down approach, let's see how we can implement the constructor:

export class TaskQueuePC {
  constructor (concurrency) {
    this.taskQueue = []
    this.consumerQueue = []
    // spawn consumers
    for (let i = 0; i < concurrency; i++) {
      this.consumer()
    }
  }
  // ...

First of all, we can notice that we now have two queues, one to hold our tasks (taskQueue) and the other to store our sleeping consumers (consumerQueue). It will be clearer in a while how these queues will be used. In the second part of our constructor, we spawn as many consumers as the concurrency we want to attain. Let's see what a consumer looks like:

async consumer () {
  while (true) {                                           // (1)
    try {
      const task = await this.getNextTask()                // (2)
      await task()                                         // (3)
    } catch (err) {
      console.error(err)                                   // (4)
    }
  }
}

Our consumer is an infinite while loop (1). At each iteration, we try to retrieve a new task from the queue using getNextTask() (2). As we will see, this will cause the current consumer to sleep if the queue is empty. When a new task is eventually available, we just execute it (3). Any error thrown from the above operation should not cause the consumer to stop, so we simply log it (4) and continue with the next iteration.

By the look of it, it may seem that each consumer in TaskQueuePC is an actual thread. In fact, our consumer() function has an infinite loop and it can "pause" until awakened by some other "thread." In reality, we should not forget that each consumer is an async function, which is nothing more than a nice syntax built around promises and callbacks. The while loop may seem to be spinning continuously consuming CPU cycles, but under the hood, the loop is more similar to an asynchronous recursion than a traditional while loop.

With the next code fragment, we should start to get an idea of what's going on. Let's take a look at the implementation of getNextTask():

async getNextTask () {
  return new Promise((resolve) => {
    if (this.taskQueue.length !== 0) {
      return resolve(this.taskQueue.shift())               // (1)
    }
    this.consumerQueue.push(resolve)                       // (2)
  })
}

The getNextTask() method returns a new Promise that resolves with the first task in the queue if the queue is not empty. The first task is removed from taskQueue and used as an argument to invoke resolve (1). If the queue is instead empty, we postpone the resolution of the Promise by queuing the resolve callback into the consumerQueue. This will effectively put the Promise—and the consumer that is awaiting the Promise—to sleep.

Now comes the "gluing" part of the whole TaskQueuePC class, which corresponds to the producer side of the algorithm. That's implemented in the runTask() method:

runTask (task) {
  return new Promise((resolve, reject) => {
    const taskWrapper = () => {                            // (1)
      const taskPromise = task()
      taskPromise.then(resolve, reject)
      return taskPromise
    }
    if (this.consumerQueue.length !== 0) {                 // (2)
      const consumer = this.consumerQueue.shift()
      consumer(taskWrapper)
    } else {                                               // (3)
      this.taskQueue.push(taskWrapper)
    }
  })
}

First, we create a taskWrapper function (1) that, when executed, has the responsibility for running the input task and forwarding the status of the Promise returned by task() to the outer Promise returned by runTask(). Next, if the consumerQueue is not empty (2), it means that there is at least one consumer that is asleep, waiting for new tasks to run. We then extract the first consumer from the queue (remember, that's essentially the resolve callback of the Promise returned by getNextTask()) and we invoke it immediately by passing our taskWrapper. If, instead, all the consumers are already busy (3), we push taskWrapper into the taskQueue.

This concludes the implementation of our TaskQueuePC class. The public interface of the TaskQueuePC class is identical to that of the TaskQueue class that we implemented in the Promises section, so migrating the code of our web spider to the new algorithm will be a trivial task.
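For instance, here is a sketch of how the class can be exercised on its own (the class body is reproduced from this section to keep the example self-contained, and the task names are just placeholders): three dummy tasks run with a concurrency of 2, and each runTask() call gives us back a Promise that we can aggregate with Promise.all():

```javascript
class TaskQueuePC {
  constructor (concurrency) {
    this.taskQueue = []
    this.consumerQueue = []
    // spawn consumers
    for (let i = 0; i < concurrency; i++) {
      this.consumer()
    }
  }

  async consumer () {
    while (true) {
      try {
        const task = await this.getNextTask()
        await task()
      } catch (err) {
        console.error(err)
      }
    }
  }

  async getNextTask () {
    return new Promise((resolve) => {
      if (this.taskQueue.length !== 0) {
        return resolve(this.taskQueue.shift())
      }
      this.consumerQueue.push(resolve)
    })
  }

  runTask (task) {
    return new Promise((resolve, reject) => {
      const taskWrapper = () => {
        const taskPromise = task()
        taskPromise.then(resolve, reject)
        return taskPromise
      }
      if (this.consumerQueue.length !== 0) {
        const consumer = this.consumerQueue.shift()
        consumer(taskWrapper)
      } else {
        this.taskQueue.push(taskWrapper)
      }
    })
  }
}

// Illustrative usage: three dummy tasks, at most two running at a time
const queue = new TaskQueuePC(2)
const tasks = ['a.txt', 'b.txt', 'c.txt'].map(name =>
  queue.runTask(async () => {
    // here we would read or download the resource identified by `name`
    return name.toUpperCase()
  })
)

Promise.all(tasks)
  .then(results => console.log(results)) // prints [ 'A.TXT', 'B.TXT', 'C.TXT' ]
```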

This also concludes our exploration of the async/await construct. But, before we wrap up the chapter, we'll dive into a subtle problem affecting promises.

The problem with infinite recursive promise resolution chains

At this point in the chapter, you should have a strong understanding of how promises work and how to use them to implement the most common control flow constructs. This is therefore the right time to discuss an advanced topic that every professional Node.js developer should know and understand. This advanced topic is about a memory leak caused by infinite Promise resolution chains. The bug seems to affect the actual Promises/A+ specification, so no compliant implementation is immune.

It is quite common in programming to have tasks that don't have a predefined ending or take as an input a potentially infinite array of data. We can include in this category things like the encoding/decoding of live audio/video streams, the processing of live cryptocurrency market data, and the monitoring of IoT sensors. But we can have much more trivial situations than those, for example, when making heavy use of functional programming.

To take a simple example, let's consider the following code, which defines a simple infinite operation using promises:

function leakingLoop () {
  return delay(1)
    .then(() => {
      console.log(`Tick ${Date.now()}`)
      return leakingLoop()
    })
}

The leakingLoop() function that we just defined uses the delay() function (which we created at the beginning of this chapter) to simulate an asynchronous operation. When the given number of milliseconds has elapsed, we print the current timestamp and we invoke leakingLoop() recursively to start the operation over again. The interesting part is that the Promise returned by leakingLoop() never resolves because its status depends on the next invocation of leakingLoop(), which in turn depends on the next invocation of leakingLoop() and so on. This situation creates a chain of promises that never settle, and it will cause a memory leak in Promise implementations that strictly follow the Promises/A+ specification, including JavaScript ES6 promises.

To demonstrate the leak, we can try running the leakingLoop() function many times to accentuate the effects of the leak:

for (let i = 0; i < 1e6; i++) {
  leakingLoop()
}

Then we can take a look at the memory footprint of the process using our favorite process inspector and notice how it grows indefinitely until (after a few minutes) the process crashes entirely.
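If you don't have a process inspector at hand, a rough alternative (a sketch, not part of the original example) is to sample process.memoryUsage() on a timer while the leaking loop runs:

```javascript
// Print the heap usage once per second, in megabytes.
// unref() makes sure this monitor alone doesn't keep the process alive.
const monitor = setInterval(() => {
  const { heapUsed } = process.memoryUsage()
  console.log(`Heap used: ${(heapUsed / (1024 * 1024)).toFixed(1)} MB`)
}, 1000)
monitor.unref()
```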

The solution to the problem is to break the chain of Promise resolution. We can do that by making sure that the status of the Promise returned by leakingLoop() does not depend on the promise returned by the next invocation of leakingLoop().

We can ensure that by simply removing a return instruction:

function nonLeakingLoop () {
  delay(1)
    .then(() => {
      console.log(`Tick ${Date.now()}`)
      nonLeakingLoop()
    })
}

Now, if we use this new function in our sample program, we should see that the memory footprint of the process will go up and down, following the schedule of the various runs of the garbage collector, which means that there is no memory leak.

However, the solution we have just proposed radically changes the behavior of the original leakingLoop() function. In particular, this new function won't propagate any errors produced deep within the recursion, since there is no link between the status of the various promises. This inconvenience may be mitigated by adding some extra logging within the function. But sometimes the new behavior itself may not be acceptable. In that case, a possible solution involves wrapping the recursive function with a Promise constructor, as in the following code sample:

function nonLeakingLoopWithErrors () {
  return new Promise((resolve, reject) => {
    (function internalLoop () {
      delay(1)
        .then(() => {
          console.log(`Tick ${Date.now()}`)
          internalLoop()
        })
        .catch(err => {
          reject(err)
        })
    })()
  })
}

In this case, we still don't have any link between the promises created at the various stages of the recursion; however, the Promise returned by the nonLeakingLoopWithErrors() function will still reject if any asynchronous operation fails, no matter at what depth in the recursion that happens.

A third solution makes use of async/await. In fact, with async/await we can simulate a recursive Promise chain with a simple infinite while loop, such as the following:

async function nonLeakingLoopAsync () {
  while (true) {
    await delay(1)
    console.log(`Tick ${Date.now()}`)
  }
}

In this function too, we preserve the behavior of the original recursive function, whereby any error thrown by the asynchronous task (in this case delay()) is propagated to the original function caller.

We should note that we would still have a memory leak if instead of a while loop, we chose to implement the async/await solution with an actual asynchronous recursive step, such as the following:

async function leakingLoopAsync () {
  await delay(1)
  console.log(`Tick ${Date.now()}`)
  return leakingLoopAsync()
}

The code above would still create an infinite chain of promises that never resolve and, therefore, it would still be affected by the same memory leak issue as the equivalent promise-based implementation.

If you are interested in knowing more about the memory leak discussed in this section, you can check the related Node.js issue at nodejsdp.link/node-6673 or the related issue on the Promises/A+ GitHub repository at nodejsdp.link/promisesaplus-memleak.

So, the next time you are building an infinite promise chain, remember to double-check whether the conditions for a memory leak exist, as you learned in this section. If that's the case, you can apply one of the proposed solutions, making sure to choose the one that is best suited to your context.

Summary

In this chapter, we've learned how to use promises and async/await syntax to write asynchronous code that is more concise, cleaner, and easier to read.

As we've seen, promises and async/await greatly simplify the serial execution flow, which is the most commonly used control flow. In fact, with async/await, writing a sequence of asynchronous operations is almost as easy as writing synchronous code. Running some asynchronous operations in parallel is also very easy thanks to the Promise.all() utility.

But the advantages of using promises and async/await don't stop here. We've learned that they provide a transparent shield against tricky situations such as code with mixed synchronous/asynchronous behavior (a.k.a. Zalgo, which we discussed in Chapter 3, Callbacks and Events). On top of that, error management with promises and async/await is much more intuitive and leaves less room for mistakes (such as forgetting to forward errors, which is a serious source of bugs in code using callbacks).

In terms of patterns and techniques, we should definitely keep in mind the chain of promises (to run tasks in series), promisification, and the Producer-Consumer pattern. Also, pay attention when using Array.forEach() with async/await (you are probably doing it wrong) and keep in mind the difference between a simple return and return await in async functions.

Callbacks are still widely used in the Node.js and JavaScript world. We find them in legacy APIs, in code that interacts with native libraries, or when there is the need to micro-optimize particular routines. That's why they are still relevant to us, Node.js developers; however, for most of our day-to-day programming tasks, promises and async/await are a huge step ahead compared to callbacks and therefore they are now the de facto standard for dealing with asynchronous code in Node.js. That's why we will be using promises and async/await throughout the rest of the book too to write our asynchronous code.

In the next chapter, we will explore another fascinating topic relative to asynchronous code execution, which is also another fundamental building block in the whole Node.js ecosystem, that is, streams.

Exercises

  • 5.1 Dissecting Promise.all(): Implement your own version of Promise.all() leveraging promises, async/await, or a combination of the two. The function must be functionally equivalent to its original counterpart.
  • 5.2 TaskQueue with promises: Migrate the TaskQueue class internals from promises to async/await where possible. Hint: you won't be able to use async/await everywhere.
  • 5.3 Producer-consumer with promises: Update the TaskQueuePC class internal methods so that they use just promises, removing any use of the async/await syntax. Hint: the infinite loop must become an asynchronous recursion. Beware of the recursive Promise resolution memory leak!
  • 5.4 An asynchronous map(): Implement a parallel asynchronous version of Array.map() that supports promises and a concurrency limit. The function should not directly leverage the TaskQueue or TaskQueuePC classes we presented in this chapter, but it can use the underlying patterns. The function, which we will define as mapAsync(iterable, callback, concurrency), will accept the following as inputs:
    • An iterable, such as an array.
    • A callback, which will receive as the input each item of the iterable (exactly like in the original Array.map()) and can return either a Promise or a simple value.
    • A concurrency, which defines how many items in the iterable can be processed by callback in parallel at each given time.