Generators

The ES2015 specification introduces another mechanism that, among other things, can be used to simplify the asynchronous control flow of our Node.js applications. We are talking about generators, also known as semi-coroutines. They are a generalization of subroutines with multiple entry points; a normal function, in fact, has only one entry point, which corresponds to the invocation of the function itself. A generator is similar to a function, but in addition, it can be suspended (using the yield statement) and then resumed at a later time. Generators are particularly useful when implementing iterators, and this should ring a bell, as we already discussed how iterators can be used to implement important asynchronous control flow patterns, such as sequential and limited parallel execution.

The basics of generators

Before we explore the use of generators for asynchronous control flow, it's important that we learn some basic concepts. Let's start with the syntax: a generator function can be declared by appending the * (asterisk) operator after the function keyword:

function* makeGenerator() { 
  //body 
} 

Inside the makeGenerator() function, we can pause the execution using the yield keyword and return to the caller the value passed to it:

function* makeGenerator() { 
  yield 'Hello World'; 
  console.log('Re-entered'); 
} 

In the preceding code, the generator yields a string, Hello World, by putting the execution of the function on pause. When the generator is resumed, the execution will start from console.log('Re-entered').

The makeGenerator() function is essentially a factory that, when invoked, returns a new generator object:

const gen = makeGenerator(); 

The most important method of the generator object is next(), which is used to start/resume the execution of the generator and returns an object in the following form:

{ 
  value: <yielded value>, 
  done: <true if the execution reached the end> 
} 

This object contains the value yielded by the generator (value) and a flag to indicate if the generator has completed its execution (done).

A simple example

To demonstrate generators, let's create a new module called fruitGenerator.js:

function* fruitGenerator() { 
    yield 'apple'; 
    yield 'orange'; 
    return 'watermelon'; 
} 
 
const newFruitGenerator = fruitGenerator(); 
console.log(newFruitGenerator.next());    //[1] 
console.log(newFruitGenerator.next());    //[2] 
console.log(newFruitGenerator.next());    //[3] 

The preceding code will print the following output:

{ value: 'apple', done: false }
{ value: 'orange', done: false }
{ value: 'watermelon', done: true }

This is a short explanation of what happened:

  • The first time newFruitGenerator.next() was invoked, the generator started its execution until it reached the first yield command, which put the generator on pause and returned the value apple to the caller.
  • At the second invocation of newFruitGenerator.next(), the generator resumed, starting from the second yield command, which in turn put the execution on pause again, while returning the value orange to the caller.
  • The last invocation of newFruitGenerator.next() caused the execution of the generator to resume from its last instruction, a return statement, which terminates the generator, returns the value watermelon, and sets the done property to true in the result object.
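
Note that, once a generator has terminated, any further invocation of next() simply returns an object with value set to undefined and done set to true. We can verify this by adding a fourth call:

console.log(newFruitGenerator.next());    //[4]

This last line would print { value: undefined, done: true }.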

Generators as iterators

To better understand why generators are so useful for implementing iterators, let's build one. In a new module, which we will call iteratorGenerator.js, let's write the following code:

function* iteratorGenerator(arr) { 
  for(let i = 0; i < arr.length; i++) { 
    yield arr[i]; 
  } 
} 
 
const iterator = iteratorGenerator(['apple', 'orange', 'watermelon']); 
let currentItem = iterator.next(); 
while(!currentItem.done) { 
  console.log(currentItem.value); 
  currentItem = iterator.next(); 
} 

This code should print the list of the items in the array as follows:

apple
orange
watermelon

In this example, each time we call iterator.next(), we resume the for loop of the generator, which runs another cycle by yielding the next item in the array. This demonstrates how the state of the generator is maintained across invocations. When resumed, the loop and all the variables are exactly the same as when the execution was put on pause.
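
It is also worth mentioning that generator objects implement the ES2015 iterable protocol, which means that the same list can be printed more concisely with a for...of loop:

for(const fruit of iteratorGenerator(['apple', 'orange', 'watermelon'])) { 
  console.log(fruit); 
} 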

Passing values back to a generator

To conclude our exploration of the basic functionality of generators, we will now learn how to pass values back to a generator. This is actually very simple; what we need to do is just provide an argument to the next() method, and that value will be provided as the return value of the yield statement inside the generator.

To show this, let's create a new simple module:

function* twoWayGenerator() { 
  const what = yield null; 
  console.log('Hello ' + what); 
} 
 
const twoWay = twoWayGenerator(); 
twoWay.next(); 
twoWay.next('world'); 

When executed, the preceding code will print Hello world. This means that the following has happened:

  • The first time the next() method is invoked, the generator reaches the first yield statement and is then put on pause.
  • When next('world') is invoked, the generator resumes from the point where it was put on pause, which is on the yield instruction, but this time we have a value that is passed back to the generator. This value will then be set into the what variable. The generator then executes the console.log() instruction and terminates.

In a similar way, we can force a generator to throw an exception. This is possible by using the throw method of the generator, as shown in the following example:

const twoWay = twoWayGenerator(); 
twoWay.next(); 
twoWay.throw(new Error()); 

Using this last code snippet, the twoWayGenerator() function will receive an exception the moment it is resumed at the yield instruction. This works exactly as if an exception was thrown from inside the generator, and this means that it can be caught and handled like any other exception using a try...catch block.
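
For example, we could write a variation of twoWayGenerator() that handles such an error internally:

function* twoWayGenerator() { 
  try { 
    const what = yield null; 
    console.log('Hello ' + what); 
  } catch(err) { 
    console.log('Error received: ' + err.message); 
  } 
} 

With this version, the exception thrown with twoWay.throw() is caught by the catch block at the point of the paused yield instruction.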

Asynchronous control flow with generators

You must be wondering how generators can help us with handling asynchronous operations. We can demonstrate this by creating a special function that accepts a generator as an argument and allows us to use asynchronous code inside it. This function takes care of resuming the execution of the generator when an asynchronous operation completes. We will call this function asyncFlow():

function asyncFlow(generatorFunction) { 
  function callback(err) { 
    if(err) { 
      return generator.throw(err); 
    } 
    const results = [].slice.call(arguments, 1); 
    generator.next(results.length > 1 ? results : results[0]); 
  } 
  const generator = generatorFunction(callback); 
  generator.next(); 
} 

The preceding function takes a generator as input, instantiates it, and then immediately starts its execution:

const generator = generatorFunction(callback); 
generator.next(); 

The generatorFunction() receives as input a special callback function that invokes generator.throw() if an error is received; otherwise, it resumes the execution of the generator, passing back the results received by the callback:

if(err) { 
  return generator.throw(err); 
} 
const results = [].slice.call(arguments, 1); 
generator.next(results.length > 1 ? results : results[0]); 

To demonstrate the power of this simple function, let's create a new module called clone.js, which, for no meaningful reason, creates a clone of itself. Paste the asyncFlow() function we just created, followed by the core of the program:

const fs = require('fs'); 
const path = require('path'); 
 
asyncFlow(function* (callback) { 
  const fileName = path.basename(__filename); 
  const myself = yield fs.readFile(fileName, 'utf8', callback); 
  yield fs.writeFile(`clone_of_${fileName}`, myself, callback); 
  console.log('Clone created'); 
}); 

Remarkably, with the help of the asyncFlow() function, we were able to write asynchronous code using a linear approach, as if we were using blocking functions! The magic behind this result should be clear by now: the callback passed to each asynchronous function will in turn resume the generator as soon as the asynchronous operation is complete. Nothing complicated, but the outcome is surely impressive.

There are two other variations of this technique, one involving the use of promises and the other using thunks.

Note

A thunk used in the generator-based control flow is just a function that partially applies all the arguments of the original function except its callback. The return value is another function that only accepts the callback as an argument. For example, the thunkified version of fs.readFile() would be as follows:

function readFileThunk(filename, options) { 
  return function(callback) { 
    fs.readFile(filename, options, callback); 
  }; 
} 
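
To make the mechanics clearer, a thunk is consumed in two steps: first we apply the arguments of the operation, then we supply the callback (the file name used here is just a placeholder):

const thunk = readFileThunk('file.txt', 'utf8');   //'file.txt' is a placeholder 
thunk((err, data) => console.log(data));           //the operation starts here 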

Both thunks and promises allow us to create generators that do not need a callback to be passed as an argument; for example, a version of asyncFlow() using thunks might be the following:

function asyncFlowWithThunks(generatorFunction) { 
  function callback(err) { 
    if(err) { 
      return generator.throw(err); 
    } 
    const results = [].slice.call(arguments, 1); 
    const thunk = generator.next(results.length > 1 ? results : results[0]).value; 
    thunk && thunk(callback); 
  } 
  const generator = generatorFunction(); 
  const thunk = generator.next().value; 
  thunk && thunk(callback); 
} 

The trick is to read the return value of generator.next(), which contains the thunk. The next step is to invoke the thunk itself by injecting our special callback. Simple! This allows us to write the following code:

asyncFlowWithThunks(function* () { 
  const fileName = path.basename(__filename); 
  const myself = yield readFileThunk(__filename, 'utf8'); 
  yield writeFileThunk(`clone_of_${fileName}`, myself); 
  console.log("Clone created"); 
}); 
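
For this snippet to run, readFileThunk() and a matching writeFileThunk() must be in scope, together with the fs and path modules. The following is a minimal sketch of the missing definitions, assuming that writeFileThunk() simply mirrors the readFileThunk() we showed in the note:

const fs = require('fs'); 
const path = require('path'); 
 
function readFileThunk(filename, options) { 
  return function(callback) { 
    fs.readFile(filename, options, callback); 
  }; 
} 
 
function writeFileThunk(filename, data) { 
  return function(callback) { 
    fs.writeFile(filename, data, callback); 
  }; 
} 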

In the same way, we could implement a version of asyncFlow() that accepts a promise as yieldable. We leave this as an exercise as its implementation requires only a minimal change to the asyncFlowWithThunks() function. We may also implement an asyncFlow() function that accepts both promises and thunks as yieldables, using the same principles.

Generator-based control flow using co

As you may guess, the Node.js ecosystem provides some solutions for handling asynchronous control flow using generators. For example, suspend (https://npmjs.org/package/suspend) is one of the oldest, and it supports promises, thunks, Node.js-style callbacks, as well as raw callbacks. Also, most of the promise libraries we analyzed earlier in the chapter provide helpers to use promises with generators.

All these solutions are based on the same principles we demonstrated with the asyncFlow() function; so, we may want to reuse one of these instead of writing one ourselves.

For the examples in this section, we chose to use co (https://npmjs.org/package/co). It supports several types of yieldables, some of which are:

  • Thunks
  • Promises
  • Arrays (parallel execution)
  • Objects (parallel execution)
  • Generators (delegation)
  • Generator functions (delegation)

co also has its own ecosystem of packages including the following:

  • Web frameworks, the most popular being koa (https://npmjs.org/package/koa)
  • Libraries implementing specific control flow patterns
  • Libraries wrapping popular APIs to support co

We will use co to re-implement our web spider application using generators.

To convert Node.js-style functions to thunks, we are going to use a little library called thunkify (https://npmjs.org/package/thunkify).

Sequential execution

Let's start our practical exploration of generators and co by modifying version 2 of the web spider application. The very first thing we have to do is to load our dependencies and generate a thunkified version of the functions we are going to use. These will go at the top of the spider.js module:

const thunkify = require('thunkify'); 
const co = require('co'); 
const path = require('path'); 
const utilities = require('./utilities'); 
 
const request = thunkify(require('request')); 
const fs = require('fs'); 
const mkdirp = thunkify(require('mkdirp')); 
const readFile = thunkify(fs.readFile); 
const writeFile = thunkify(fs.writeFile); 
const nextTick = thunkify(process.nextTick); 

Looking at the preceding code, we can surely notice some similarities with the code we used earlier in the chapter to promisify some APIs. In this regard, it is interesting to point out that if we decided to use the promisified version of our functions instead of their thunkified alternative, the code would remain exactly the same, thanks to the fact that co supports both thunks and promises as yieldable objects. In fact, if we want, we could even use both thunks and promises in the same application, even in the same generator. This is a tremendous advantage in terms of flexibility, as it allows us to use a generator-based control flow with whatever solution we already have at our disposal.
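
For instance, a hypothetical promisified setup, assuming a promisify() helper comparable to the one we used earlier in the chapter is in scope, might look like this:

//hypothetical alternative, assuming a promisify() helper is available 
const request = promisify(require('request')); 
const mkdirp = promisify(require('mkdirp')); 
const readFile = promisify(fs.readFile); 
const writeFile = promisify(fs.writeFile); 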

Okay, now let's start transforming the download() function into a generator:

function* download(url, filename) { 
  console.log(`Downloading ${url}`); 
  //request() passes multiple results to its callback, so the yield 
  //returns them as an array: [response, body] 
  const response = yield request(url); 
  const body = response[1]; 
  yield mkdirp(path.dirname(filename)); 
  yield writeFile(filename, body); 
  console.log(`Downloaded and saved ${url}`); 
  return body; 
} 

By using generators and co, our download() function suddenly becomes trivial. All we had to do was convert it into a generator function and use yield wherever we had an asynchronous function (as a thunk) to invoke.

Next, it's the turn of the spider() function:

function* spider(url, nesting) { 
  const filename = utilities.urlToFilename(url); 
  let body; 
  try { 
    body = yield readFile(filename, 'utf8'); 
  } catch(err) { 
    if(err.code !== 'ENOENT') { 
      throw err; 
    } 
    body = yield download(url, filename); 
  } 
  yield spiderLinks(url, body, nesting); 
} 

The interesting detail to notice in this last fragment of code is how we were able to use a try...catch block to handle exceptions. Also, we can now use throw to propagate errors! Another remarkable line is where we yield the download() function, which is neither a thunk nor a promisified function, but just another generator. This is possible thanks to co, which also supports other generators as yieldables.

Finally, we can also convert spiderLinks(), where we implemented an iteration to download the links of a web page in sequence. With generators, this becomes trivial as well:

function* spiderLinks(currentUrl, body, nesting) { 
  if(nesting === 0) { 
    return nextTick(); 
  } 
 
  const links = utilities.getPageLinks(currentUrl, body); 
  for(let i = 0; i < links.length; i++) { 
    yield spider(links[i], nesting - 1); 
  } 
} 

There is little to explain about the previous code. There is no pattern needed for the sequential iteration; generators and co are doing all the dirty work for us, so we were able to write the asynchronous iteration as if we were using blocking, direct-style APIs.

Now comes the most important part, the entry point of our program:

co(function* () { 
  try { 
    yield spider(process.argv[2], 1); 
    console.log('Download complete'); 
  } catch(err) { 
    console.log(err); 
  } 
}); 

This is the only place where we have to invoke co(...) to wrap a generator. In fact, once we do that, co will automatically wrap any generator we pass to a yield statement, and this will happen recursively, so the rest of the program is totally agnostic of the fact that we are using co, even though it is doing all the work under the hood.

Now it should be possible to run our generator-based web spider application.
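
As with the previous versions of the application, it can be launched with a command along these lines (the URL is just an example):

node spider http://www.example.com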

Parallel execution

The bad news about generators is that, while they are great for writing sequential algorithms, they can't be used to parallelize the execution of a set of tasks, at least not using only yield and generators. In fact, the pattern to use in these circumstances is simply to rely on a callback-based or promise-based function, which in turn can easily be yielded and used with generators.

Luckily, for the specific case of unlimited parallel execution, co already supports it natively: we can simply yield an array of promises, thunks, generators, or generator functions.

With this in mind, version 3 of our web spider application can be implemented simply by rewriting the spiderLinks() function as follows:

function* spiderLinks(currentUrl, body, nesting) { 
  if(nesting === 0) { 
    return nextTick(); 
  } 
 
  const links = utilities.getPageLinks(currentUrl, body); 
  const tasks = links.map(link => spider(link, nesting - 1)); 
  yield tasks; 
} 

What we did was simply collect all the download tasks, which are essentially generators, and then yield the resulting array. All these tasks will be executed by co in parallel, and the execution of our generator (spiderLinks) will then be resumed when all the tasks finish running.
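
Incidentally, co applies the same treatment to objects: yielding an object whose values are yieldables runs them in parallel and resumes the generator with an object having the same keys, which is handy when we need named results. A quick sketch (the URLs are placeholders):

const results = yield { 
  home: request('http://example.com'),          //placeholder URL 
  about: request('http://example.com/about')    //placeholder URL 
}; 
//results.home and results.about contain the two outcomes 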

If you think we cheated by exploiting the feature of co that allows us to yield on an array, it is possible to demonstrate how the same parallel flow can be achieved using a callback-based solution similar to what we have already used earlier in the chapter. Let's use this technique to rewrite spiderLinks() once again:

function spiderLinks(currentUrl, body, nesting) { 
  if(nesting === 0) { 
    return nextTick(); 
  } 
 
  //returns a thunk 
  return callback => { 
    let completed = 0, hasErrors = false; 
    const links = utilities.getPageLinks(currentUrl, body); 
    if(links.length === 0) { 
      return process.nextTick(callback); 
    } 
 
    function done(err, result) { 
      if(err && !hasErrors) { 
        hasErrors = true; 
        return callback(err); 
      } 
      if(++completed === links.length && !hasErrors) { 
        callback(); 
      } 
    } 
 
    for(let i = 0; i < links.length; i++) { 
      co(spider(links[i], nesting - 1)) 
        .then(result => done(null, result), err => done(err)); 
    } 
  } 
} 

To run the spider() function in parallel, we use co, which executes the generator and returns a promise. This way, we are able to wait for the promise to settle and invoke the done() function with an error or a result accordingly. Usually, all the libraries for generator-based control flow have similar features, so you can always transform a generator into a callback-based or promise-based function if needed.

To start multiple download tasks in parallel, we just reused the callback-based pattern for parallel execution defined earlier in the chapter. We should also notice that we transformed the spiderLinks() function into a thunk (it's not even a generator anymore). This enabled us to have a callback function to invoke when all the parallel tasks are completed.

Note

Pattern (generator-to-thunk)

It converts a generator to a thunk in order to be able to run it in parallel or to take advantage of other callback- or promise-based control flow algorithms.
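
A generic implementation of this pattern takes only a few lines; the following is a minimal sketch (the helper name is just an example) that uses co to convert any generator into a thunk:

function generatorToThunk(generator) { 
  return callback => { 
    co(generator).then( 
      result => callback(null, result), 
      err => callback(err) 
    ); 
  }; 
} 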

Limited parallel execution

Now that we know what to do with nonsequential execution flows, it should be easy to plan the implementation of version 4 of our web spider application, the one imposing a limit on the number of concurrent download tasks. We have several options we can use to do that; some of them are as follows:

  • Use the callback-based version of the previously implemented TaskQueue class. We would need to just thunkify its functions and any generator we want to use as a task.
  • Use the promises-based version of the TaskQueue class, and just make sure that each generator we want to use as a task is converted into a function returning a promise.
  • Use async, and thunkify any helper we plan to use, in addition to converting any generator to a callback-based function that can be used by the library.
  • Use a library from the co ecosystem specifically designed for this type of flow, such as co-limiter (https://npmjs.org/package/co-limiter).
  • Implement a custom algorithm based on the producer-consumer pattern, the same that co-limiter uses internally.

For educational purposes, we are going to choose the last option, so we can dive into a pattern that is often associated with coroutines (but also threads and processes).

Producer-consumer pattern

The goal is to leverage a queue to feed a fixed number of workers, as many as the concurrency level we want to set. To implement this algorithm, we are going to take the TaskQueue class we defined earlier in the chapter as a starting point:

class TaskQueue { 
  constructor(concurrency) { 
    this.concurrency = concurrency; 
    this.running = 0; 
    this.taskQueue = []; 
    this.consumerQueue = []; 
    this.spawnWorkers(concurrency); 
  } 
 
  pushTask(task) { 
    if (this.consumerQueue.length !== 0) { 
      this.consumerQueue.shift()(null, task); 
    } else { 
      this.taskQueue.push(task); 
    } 
  } 
 
  spawnWorkers(concurrency) { 
    const self = this; 
    for(let i = 0; i < concurrency; i++) { 
      co(function* () { 
        while(true) { 
          const task = yield self.nextTask(); 
          yield task; 
        } 
      }); 
    } 
  } 
 
  nextTask() { 
    return callback => { 
      if(this.taskQueue.length !== 0) { 
        return callback(null, this.taskQueue.shift()); 
      } 
 
      this.consumerQueue.push(callback); 
    } 
  } 
} 

Let's start analyzing this new implementation of TaskQueue. The first thing to notice is in the constructor: the invocation of this.spawnWorkers(), as this is the method in charge of starting the workers.

Our workers are very simple; they are just generators wrapped with co() and executed immediately, so that each one can run in parallel. Internally, each worker is running an infinite loop that blocks (yield), waiting for a new task to be available in the queue (yield self.nextTask()), and when this happens, it yields the task (which is any valid yieldable), waiting for its completion. You may be wondering how we can actually wait for the next task to be queued. The answer is in the nextTask() method. Let's see in greater detail what happens within this method:

nextTask() { 
  return callback => { 
    if(this.taskQueue.length !== 0) { 
      return callback(null, this.taskQueue.shift()); 
    } 
    this.consumerQueue.push(callback); 
  } 
} 

Let's see what happens in this method, which is the core of the pattern:

  1. The method returns a thunk, which is a valid yieldable for co.
  2. The callback of the returned thunk is invoked with the next task in the taskQueue list (if there is any available). This will immediately unblock a worker, providing the next task to yield on.
  3. If there are no tasks in the queue, the callback itself is pushed into the consumerQueue list. By doing this, we are basically putting a worker in idle mode. The callbacks in the consumerQueue list will be invoked as soon as we have a new task to process, which will resume the corresponding worker.

Now, to understand how the idle workers in the consumerQueue list are resumed, we need to analyze the pushTask() method. The pushTask() method invokes the first callback in the consumerQueue list, if available, which in turn will unblock a worker. If no callback is available, it means that all the workers are busy, so we simply add a new item to the taskQueue list.

In the TaskQueue class, the workers have the role of consumers, while whoever uses pushTask() can be considered a producer. This pattern shows us how a generator can be very similar to a thread (or a process). In fact, the producer-consumer interaction is probably the most common problem presented when studying inter-process communication techniques, but as we already mentioned, it is also a common use case for coroutines.
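
Before applying the queue to our web spider, a quick usage sketch might help to visualize the producer-consumer interaction (the timeout-based task is used only to simulate some asynchronous work):

const queue = new TaskQueue(2); 
for(let i = 0; i < 4; i++) { 
  queue.pushTask(function* () { 
    //any yieldable works here; a thunkified timeout simulates work 
    yield callback => setTimeout(callback, 100); 
    console.log(`Task ${i} completed`); 
  }); 
} 

With a concurrency of 2, the first two tasks start immediately, while the remaining two wait in the taskQueue until a worker becomes free.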

Limiting the download tasks concurrency

Now that we have implemented a limited parallel algorithm using generators and the producer-consumer pattern, we can apply it to limit the concurrency of the download tasks of our web spider application (version 4). First, let's load and initialize a TaskQueue object:

const TaskQueue = require('./taskQueue'); 
const downloadQueue = new TaskQueue(2); 

Next, let's modify the spiderLinks() function. Its body is almost identical to the one we just used to implement the unlimited parallel execution flow, so we will only show the changed parts here:

function spiderLinks(currentUrl, body, nesting) { 
  //... 
  return (callback) => { 
    //... 
    function done(err, result) { 
      //... 
    } 
    links.forEach(link => { 
      downloadQueue.pushTask(function* () { 
        yield spider(link, nesting - 1); 
        done(); 
      }); 
    }); 
  } 
} 

In each of the tasks, we invoke the done() function just after a download completes, so we can count how many links were downloaded and then notify the callback of the thunk when all are complete.

As an exercise, you can try to implement version 4 of the web spider application, using the other four methods we presented at the beginning of this section.
