Asynchronous control flow with streams

Going through the examples that we have presented so far, it should be clear that streams are useful not only for handling I/O, but also as an elegant programming pattern that can be used to process any kind of data. But the advantages do not end there; streams can also be leveraged to turn asynchronous control flow into flow control, as we will see in this section.

Sequential execution

By default, streams handle data in sequence; for example, the _transform() function of a Transform stream will never be invoked with the next chunk of data until the previous invocation has completed by executing callback(). This is an important property of streams, crucial for processing each chunk in the right order, but it can also be exploited to turn streams into an elegant alternative to the traditional control flow patterns.

Some code is always better than too much explanation, so let's work on an example to demonstrate how we can use streams to execute asynchronous tasks in a sequence. Let's create a function that concatenates a set of files received as input, making sure to honor the order in which they are provided. Let's create a new module called concatFiles.js and define its contents starting from its dependencies:

const fromArray = require('from2-array'); 
const through = require('through2'); 
const fs = require('fs'); 

We will be using through2 to simplify the creation of Transform streams and from2-array in order to create a Readable stream from an array of objects.
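
To get a feel for these two helpers, here is a minimal, hypothetical snippet (not part of the module we are about to build) that turns an array into an object stream and transforms each item:

const fromArray = require('from2-array'); 
const through = require('through2'); 
 
fromArray.obj(['a', 'b', 'c'])               //Readable stream from an array 
  .pipe(through.obj((chunk, enc, done) => {  //Transform built from a function 
    console.log(chunk.toUpperCase());        //prints A, B, C, in order 
    done();                                  //ready for the next chunk 
  })); 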

Next, we can define the concatFiles() function:

function concatFiles(destination, files, callback) { 
  const destStream = fs.createWriteStream(destination); 
  fromArray.obj(files)                         //[1] 
    .pipe(through.obj((file, enc, done) => {   //[2] 
      const src = fs.createReadStream(file); 
      src.pipe(destStream, {end: false}); 
      src.on('end', done);                     //[3] 
    })) 
    .on('finish', () => {                      //[4] 
      destStream.end();  
      callback(); 
    }); 
} 
module.exports = concatFiles; 

The preceding function implements a sequential iteration over the files array by transforming it into a stream. The procedure followed by the function is explained as follows:

  1. First, we use from2-array to create a Readable stream from the files array.
  2. Next, we create a through (Transform) stream to handle each file in the sequence. For each file, we create a Readable stream and pipe it into destStream, which represents the output file. We make sure not to close destStream after the source file finishes reading, by specifying {end: false} in the pipe() options.
  3. When all the contents of the source file have been piped into destStream, we invoke the done() function, which through2 exposes as the transform callback to signal the completion of the current processing; in our case, this is what triggers the processing of the next file.
  4. When all the files have been processed, the finish event is fired; we can finally end destStream and invoke the callback() function of concatFiles(), which signals the completion of the whole operation.

We can now try to use the little module we just created. Let's do that in a new file, called concat.js:

const concatFiles = require('./concatFiles'); 
concatFiles(process.argv[2], process.argv.slice(3), () => { 
  console.log('Files concatenated successfully'); 
}); 

We can now run the preceding program by passing the destination file as the first command-line argument followed by a list of files to concatenate, for example:

node concat allTogether.txt file1.txt file2.txt 

This should create a new file called allTogether.txt containing, in order, the contents of file1.txt and file2.txt.

With the concatFiles() function, we were able to obtain an asynchronous sequential iteration using only streams. As we saw in Chapter 3, Asynchronous Control Flow Patterns with Callbacks, if implemented with plain JavaScript, this would have required the use of an iterator, or an external library such as async. We have now provided another option for achieving the same result, which, as we can see, is also very compact and elegant.
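
For comparison, here is a rough sketch of how the same sequential iteration might look if implemented with a plain callback-based iterator (concatFilesWithIterator is just a hypothetical name used for illustration):

const fs = require('fs'); 
 
function concatFilesWithIterator(destination, files, callback) { 
  const destStream = fs.createWriteStream(destination); 
  function iterate(index) {                       //explicit iterator 
    if(index === files.length) { 
      destStream.end(); 
      return callback(); 
    } 
    const src = fs.createReadStream(files[index]); 
    src.pipe(destStream, {end: false}); 
    src.on('end', () => iterate(index + 1));      //next file only when the current one ends 
  } 
  iterate(0); 
} 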

Note

Pattern

Use a stream, or combination of streams, to easily iterate over a set of asynchronous tasks in sequence.

Unordered parallel execution

We just saw that streams process each data chunk in a sequence, but sometimes this can be a bottleneck as we would not make the most of the Node.js concurrency. If we have to execute a slow asynchronous operation for every data chunk, it can be advantageous to parallelize the execution and speed up the overall process. Of course, this pattern can only be applied if there is no relationship between each chunk of data, which might happen frequently for object streams, but very rarely for binary streams.

Note

Caution

Parallel streams cannot be used when the order in which the data is processed is important.

To parallelize the execution of a Transform stream, we can apply the same patterns that we learned in Chapter 3, Asynchronous Control Flow Patterns with Callbacks, but with some adaptations to get them working with streams. Let's see how this works.

Implementing an unordered parallel stream

Let's demonstrate this immediately with an example; let's create a module called parallelStream.js and define a generic Transform stream that executes a given transform function in parallel:

const stream = require('stream'); 
 
class ParallelStream extends stream.Transform { 
  constructor(userTransform) { 
    super({objectMode: true}); 
    this.userTransform = userTransform; 
    this.running = 0; 
    this.terminateCallback = null; 
  } 
 
  _transform(chunk, enc, done) { 
    this.running++; 
    this.userTransform(chunk, enc, this.push.bind(this), 
    this._onComplete.bind(this)); 
    done(); 
  } 
 
  _flush(done) { 
    if(this.running > 0) { 
      this.terminateCallback = done; 
    } else { 
      done(); 
    } 
  } 
 
  _onComplete(err) { 
    this.running--; 
    if(err) { 
      return this.emit('error', err); 
    } 
    if(this.running === 0) { 
      this.terminateCallback && this.terminateCallback(); 
    } 
  } 
 
} 
 
module.exports = ParallelStream; 

Let's analyze this new class. As you can see, the constructor accepts a userTransform() function, which is then saved as an instance variable; we also invoke the parent constructor and for convenience we enable the object mode by default.

Next, it is the turn of the _transform() method. In this method, we increment the count of running tasks and then execute the userTransform() function; finally, we notify that the current transformation step is complete by invoking done(). The trick for triggering the processing of another item in parallel is exactly this; we are not waiting for the userTransform() function to complete before invoking done(); instead, we do it immediately. On the other hand, we provide a special callback to userTransform(), which is the this._onComplete() method; this allows us to get notified when userTransform() completes.

The _flush() method is invoked just before the stream terminates, so if there are still tasks running we can put on hold the release of the finish event by not invoking the done() callback immediately; instead, we assign it to the this.terminateCallback variable. To understand how the stream is then properly terminated, we have to look into the _onComplete() method. This last method is invoked every time an asynchronous task completes. It checks whether there are any more tasks running and, if there are none, it invokes the this.terminateCallback() function, which will cause the stream to end, releasing the finish event that was put on hold in the _flush() method.

The ParallelStream class we just built allows us to easily create a Transform stream that executes its tasks in parallel, but there is a caveat: it does not preserve the order of the items as they are received. In fact, asynchronous operations can complete and push data at any time, regardless of when they are started. We immediately understand that this property does not play well with binary streams where the order of data usually matters, but it can surely be useful with some types of object streams.
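
To see this property in action, we could feed the stream a hypothetical userTransform() that completes after a random delay; the chunks will very likely be emitted in an order different from the one in which they were written:

const ParallelStream = require('./parallelStream'); 
 
const randomDelayStream = new ParallelStream((chunk, enc, push, done) => { 
  setTimeout(() => {                     //simulate an asynchronous task of random duration 
    push(chunk); 
    done(); 
  }, Math.random() * 100); 
}); 
 
randomDelayStream.on('data', chunk => console.log(chunk)); 
['1', '2', '3', '4', '5'].forEach(n => randomDelayStream.write(n)); 
randomDelayStream.end(); 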

Implementing a URL status monitoring application

Now, let's apply our ParallelStream to a concrete example. Imagine we want to build a simple service to monitor the status of a big list of URLs, all contained in a single file and newline separated.

Streams can offer a very efficient and elegant solution to this problem, especially if we use our ParallelStream class to parallelize the checking of the URLs.

Let's build this simple application immediately in a new module called checkUrls.js:

const fs = require('fs'); 
const split = require('split'); 
const request = require('request'); 
const ParallelStream = require('./parallelStream'); 
 
fs.createReadStream(process.argv[2])                       //[1] 
  .pipe(split())                                           //[2] 
  .pipe(new ParallelStream((url, enc, push, done) => {     //[3] 
    if(!url) return done(); 
    request.head(url, (err, response) => { 
      push(url + ' is ' + (err ? 'down' : 'up') + '\n'); 
      done(); 
    }); 
  })) 
  .pipe(fs.createWriteStream('results.txt'))   //[4] 
  .on('finish', () => console.log('All urls were checked')); 

As we can see, with streams our code looks very elegant and straightforward; let's see how it works:

  1. First, we create a Readable stream from the file given as input.
  2. We pipe the contents of the input file through split (https://npmjs.org/package/split), a Transform stream that ensures each line is emitted as a separate chunk.
  3. Then, it's time to use our ParallelStream to check each URL. We do this by sending a HEAD request and waiting for the response. When the callback is invoked, we push the result of the operation down the stream.
  4. Finally, all the results are piped into a file, results.txt.

Now, we can run the checkUrls module with a command such as this:

node checkUrls urlList.txt

Here the file urlList.txt contains a list of URLs, for example:

    http://www.mariocasciaro.me 
    http://loige.co 
    http://thiswillbedownforsure.com 

When the command finishes running, we will see that a file, results.txt, was created. This contains the results of the operation, for example:

    http://thiswillbedownforsure.com is down 
    http://loige.co is up 
    http://www.mariocasciaro.me is up 

There is a good probability that the order in which the results are written is different from the order in which the URLs were specified in the input file. This is clear evidence that our stream executes its tasks in parallel, and it does not enforce any order between the various data chunks in the stream.

Note

For the sake of curiosity, we might want to try replacing ParallelStream with a normal through2 stream and compare the behavior and performance of the two (you might want to do this as an exercise). We will see that using through2 is way slower, because each URL would be checked in sequence, but also that the order of the results in the file results.txt would be preserved.
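
For reference, a sketch of that sequential variant might look like the following (assuming the same input format and the same request-based check; note the regular function, so that this refers to the through2 stream):

const fs = require('fs'); 
const split = require('split'); 
const request = require('request'); 
const through = require('through2'); 
 
fs.createReadStream(process.argv[2]) 
  .pipe(split()) 
  .pipe(through.obj(function (url, enc, done) { 
    if(!url) return done(); 
    request.head(url, err => { 
      this.push(url + ' is ' + (err ? 'down' : 'up') + '\n'); 
      done();                            //the next URL is processed only now 
    }); 
  })) 
  .pipe(fs.createWriteStream('results.txt')) 
  .on('finish', () => console.log('All urls were checked')); 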

Unordered limited parallel execution

If we try to run the checkUrls application against a file that contains thousands or millions of URLs, we will surely run into trouble. Our application will create an uncontrolled number of connections all at once, sending a considerable amount of data in parallel and potentially undermining the stability of the application and the availability of the entire system. As we already know, the solution to keep the load and resource usage under control is to limit the concurrency of the parallel tasks.

Let's see how this works with streams by creating a limitedParallelStream.js module, which is an adaptation of parallelStream.js that we created in the previous section.

Let's see what it looks like, starting from its constructor (we will highlight the changed parts):

class LimitedParallelStream extends stream.Transform { 
  constructor(concurrency, userTransform) { 
    super({objectMode: true}); 
    this.concurrency = concurrency; 
    this.userTransform = userTransform; 
    this.running = 0; 
    this.terminateCallback = null; 
    this.continueCallback = null; 
  } 
//... 

We need a concurrency limit to be taken as input, and this time we are going to save two callbacks: one for any pending _transform() invocation (continueCallback) and one for the callback of the _flush() method (terminateCallback).

Next is the _transform() method:

  _transform(chunk, enc, done) { 
    this.running++; 
    this.userTransform(chunk, enc, this.push.bind(this), 
      this._onComplete.bind(this)); 
    if(this.running < this.concurrency) { 
      done(); 
    } else { 
      this.continueCallback = done; 
    } 
  } 

This time, in the _transform() method, we have to check whether we have any free execution slots before we invoke done() and trigger the processing of the next item. If we have already reached the maximum number of concurrently running tasks, we can simply save the done() callback into the continueCallback variable, so that it can be invoked as soon as a task finishes.

The _flush() method remains exactly the same as in the ParallelStream class, so let's move directly to implementing the _onComplete() method:

_onComplete(err) { 
  this.running--; 
  if(err) { 
    return this.emit('error', err); 
  } 
  const tmpCallback = this.continueCallback; 
  this.continueCallback = null; 
  tmpCallback && tmpCallback(); 
  if(this.running === 0) { 
    this.terminateCallback && this.terminateCallback(); 
  } 
} 

Every time a task completes, we invoke any saved continueCallback(), which will cause the stream to unblock, triggering the processing of the next item.

That's it for the limitedParallelStream module; we can now use it in the checkUrls module in place of parallelStream and have the concurrency of our tasks limited to the value that we set.
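
For example, the relevant part of checkUrls.js could be changed roughly as follows (the concurrency value of 4 is an arbitrary choice, used here just for illustration):

const fs = require('fs'); 
const split = require('split'); 
const request = require('request'); 
const LimitedParallelStream = require('./limitedParallelStream'); 
 
fs.createReadStream(process.argv[2]) 
  .pipe(split()) 
  .pipe(new LimitedParallelStream(4, (url, enc, push, done) => { 
    if(!url) return done(); 
    request.head(url, err => { 
      push(url + ' is ' + (err ? 'down' : 'up') + '\n'); 
      done(); 
    }); 
  })) 
  .pipe(fs.createWriteStream('results.txt')) 
  .on('finish', () => console.log('All urls were checked')); 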

Ordered parallel execution

The parallel streams that we created previously might shuffle the order of the emitted data, but there are situations where this is not acceptable; sometimes, in fact, it is necessary to have each chunk emitted in the same order in which it was received. However, not all hope is lost: we can still run the transform function in parallel; all we have to do is sort the data emitted by each task so that it follows the same order in which the data was received.

This technique involves the use of a buffer to reorder the chunks while they are emitted by each running task. For brevity, we are not going to provide an implementation of such a stream, as it's quite verbose for the scope of this book; what we are going to do instead is reuse one of the available packages on NPM built for this specific purpose, for example, through2-parallel (https://npmjs.org/package/through2-parallel).

We can quickly check the behavior of an ordered parallel execution by modifying our existing checkUrls module. Let's say that we want our results to be written in the same order as the URLs in the input file, while executing our checks in parallel. We can do this using through2-parallel:

//... 
const throughParallel = require('through2-parallel'); 
 
fs.createReadStream(process.argv[2]) 
  .pipe(split()) 
  .pipe(throughParallel.obj({concurrency: 2},(url, enc, done) => { 
      //... 
    }) 
  ) 
  .pipe(fs.createWriteStream('results.txt')) 
  .on('finish', () => console.log('All urls were checked')); 

As we can see, the interface of through2-parallel is very similar to that of through2; the only difference is that we can also specify a concurrency limit for the transform function that we provide. If we try to run this new version of checkUrls, we will now see that the results.txt file lists the results in the same order as the URLs appear in the input file.
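
For completeness, the transform body elided above can be assumed to be the same URL check as before; a possible version (assuming that through2-parallel, like through2, invokes the transform function with this bound to the stream, which is why we use a regular function instead of an arrow function to be able to call this.push()) is:

//... 
  .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) { 
    if(!url) return done(); 
    request.head(url, err => { 
      this.push(url + ' is ' + (err ? 'down' : 'up') + '\n'); 
      done(); 
    }); 
  })) 
//... 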

Note

It is important to note that, even though the order of the output is the same as that of the input, the asynchronous tasks still run in parallel and can complete in any order.

With this, we conclude our analysis of the asynchronous control flow with streams; next, we are going to focus on some piping patterns.
