Getting started with streams

In the previous section, we learned why streams are so powerful, but also that they are everywhere in Node.js, starting from its core modules. For example, we have seen that the fs module has createReadStream() for reading from a file and createWriteStream() for writing to a file, the HTTP request and response objects are essentially streams, and the zlib module allows us to compress and decompress data using a streaming interface.

Now that we know why streams are so important, let's take a step back and start to explore them in more detail.

Anatomy of streams

Every stream in Node.js is an implementation of one of the four base abstract classes available in the stream core module:

  • stream.Readable
  • stream.Writable
  • stream.Duplex
  • stream.Transform

Each stream class is also an instance of EventEmitter. Streams, in fact, can produce several types of events, such as end, when a Readable stream has finished reading, or error, when something goes wrong.

Note

Please note that, for brevity, in the examples presented in this chapter, we will often omit proper error management. However, in production applications it is always advised to register an error event listener for all your streams.

One of the reasons why streams are so flexible is the fact that they can not only handle binary data, but almost any JavaScript value; in fact, they can support two operating modes:

  • Binary mode: This mode is where data is streamed in the form of chunks, such as buffers or strings
  • Object mode: This mode is where the streaming data is treated as a sequence of discrete objects (allowing us to use almost any JavaScript value)

These two operating modes allow us to use streams not only for I/O, but also as a tool to elegantly compose processing units in a functional fashion, as we will see later in the chapter.

Note

In this chapter, we will mainly use the Node.js stream interface, also known as Version 3, which was introduced in Node.js 0.11. For further details about the differences with the old interfaces, please refer to this excellent blog post by StrongLoop at https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/.

Readable streams

A Readable stream represents a source of data; in Node.js, it's implemented using the Readable abstract class that is available in the stream module.

Reading from a stream

There are two ways to receive the data from a Readable stream: non-flowing and flowing. Let's analyze these modes in more detail.

The non-flowing mode

The default pattern for reading from a Readable stream consists of attaching a listener for the readable event that signals the availability of new data to read. Then, in a loop, we read all the data until the internal buffer is emptied. This can be done using the read() method, which synchronously reads from the internal buffer and returns a Buffer or String object representing the chunk of data. The read() method has the following signature:

readable.read([size])

Using this approach, the data is explicitly pulled from the stream on demand.

To show how this works, let's create a new module named readStdin.js, which implements a simple program that reads from the standard input (a Readable stream) and echoes everything back to the standard output:

process.stdin 
  .on('readable', () => { 
    let chunk; 
    console.log('New data available'); 
    while((chunk = process.stdin.read()) !== null) { 
      console.log(
        `Chunk read: (${chunk.length}) "${chunk.toString()}"` 
      ); 
    } 
  }) 
  .on('end', () => process.stdout.write('End of stream')); 

The read() method is a synchronous operation that pulls a data chunk from the internal buffers of the Readable stream. The returned chunk is, by default, a Buffer object if the stream is working in binary mode.

Note

In a Readable stream working in binary mode, we can read strings instead of buffers by calling setEncoding(encoding) on the stream, and provide a valid encoding format (for example, utf8).
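As a quick sketch, this is how the readStdin example could be adapted to receive strings rather than buffers (a minimal variation for illustration, not the original listing):

process.stdin.setEncoding('utf8');            // read() will now return strings
process.stdin
  .on('readable', () => {
    let chunk;
    while((chunk = process.stdin.read()) !== null) {
      console.log(`String chunk read: "${chunk}"`);
    }
  })
  .on('end', () => process.stdout.write('End of stream'));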

The data is read exclusively from within the readable listener, which is invoked as soon as new data is available. The read() method returns null when there is no more data available in the internal buffers; in such a case, we have to wait for another readable event to be fired, telling us that we can read again or wait for the end event that signals the end of the stream. When a stream is working in binary mode, we can also specify that we are interested in reading a specific amount of data by passing a size value to the read() method. This is particularly useful when implementing network protocols or when parsing specific data formats.
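To give an idea of how read(size) can be used, the following is a minimal sketch of a hypothetical framing protocol in which every message is preceded by a 4-byte big-endian length header; nothing here comes from the chapter's own examples:

process.stdin.on('readable', () => {
  let header;
  while((header = process.stdin.read(4)) !== null) {     // pull exactly 4 bytes
    const length = header.readUInt32BE(0);
    const payload = process.stdin.read(length);           // pull exactly `length` bytes
    if(payload === null) {
      process.stdin.unshift(header);                      // not enough data yet, put the header back
      break;
    }
    console.log(`Message of ${length} bytes: ${payload.toString()}`);
  }
});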

Now, we are ready to run the readStdin module and experiment with it. Let's type some characters in the console and then press Enter to see the data echoed back into the standard output. To terminate the stream and hence generate a graceful end event, we need to insert an EOF (end-of-file) character (using Ctrl + Z on Windows or Ctrl + D on Linux).

We can also try to connect our program with other processes; this is possible using the pipe operator (|), which redirects the standard output of a program to the standard input of another. For example, we can run a command such as the following:

cat <path to a file> | node readStdin

This is an amazing demonstration of how the streaming paradigm is a universal interface, which enables our programs to communicate, regardless of the language they are written in.

Flowing mode

Another way to read from a stream is by attaching a listener to the data event; this switches the stream into flowing mode, where the data is not pulled using read(), but instead is pushed to the data listener as soon as it arrives. For example, the readStdin application that we created earlier will look like this using flowing mode:

process.stdin 
  .on('data', chunk => {  
    console.log('New data available');  
    console.log( 
      `Chunk read: (${chunk.length}) "${chunk.toString()}"`  
    );  
  })  
  .on('end', () => process.stdout.write('End of stream')); 

Flowing mode was inherited from the old version of the stream interface (also known as Streams1), and it offers less flexibility for controlling the flow of data. With the introduction of the Streams2 interface, flowing mode is not the default working mode; to enable it, it's necessary to attach a listener to the data event or explicitly invoke the resume() method. To temporarily stop the stream from emitting data events, we can then invoke the pause() method, causing any incoming data to be cached in the internal buffer.

Note

Calling pause() does not cause the stream to switch back to non-flowing mode.
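As a quick illustration, here is a minimal sketch (not part of the original readStdin example) that pauses the flow while simulating some slow asynchronous processing and resumes it afterwards:

process.stdin
  .on('data', chunk => {
    process.stdin.pause();                    // stop emitting data events for a while
    setTimeout(() => {                        // simulate a slow asynchronous task
      console.log(`Processed: (${chunk.length}) "${chunk.toString().trim()}"`);
      process.stdin.resume();                 // let the data flow again
    }, 1000);
  })
  .on('end', () => process.stdout.write('End of stream'));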

Implementing Readable streams

Now that we know how to read from a stream, the next step is to learn how to implement a new Readable stream. To do this, it's necessary to create a new class by inheriting the prototype of stream.Readable. The concrete stream must provide an implementation of the _read() method, which has the following signature:

readable._read(size) 

The internals of the Readable class will call the _read() method, which in turn will start to fill the internal buffer using push():

readable.push(chunk) 

Note

Please note that read() is a method called by the stream consumers, while _read() is a method to be implemented by a stream subclass and should never be called directly. The underscore usually indicates that the method is not public and should not be called directly.

To demonstrate how to implement new Readable streams, we can try to implement a stream that generates random strings. Let's create a new module called randomStream.js that will contain the code of our string generator:

const stream = require('stream'); 
const Chance = require('chance'); 
 
const chance = new Chance(); 
 
class RandomStream extends stream.Readable { 
  constructor(options) { 
    super(options); 
  } 
 
  _read(size) { 
    const chunk = chance.string();                        //[1] 
    console.log(`Pushing chunk of size: ${chunk.length}`); 
    this.push(chunk, 'utf8');                             //[2] 
    if(chance.bool({likelihood: 5})) {                    //[3] 
      this.push(null); 
    } 
  } 
} 
 
module.exports = RandomStream; 

At the top of the file, we will load our dependencies. There is nothing special here, except that we are loading an npm module called chance (https://npmjs.org/package/chance), which is a library for generating all sorts of random values, from numbers to strings to entire sentences.

The next step is to create a new class called RandomStream, which specifies stream.Readable as its parent. In the preceding code, we call the constructor of the parent class to initialize its internal state, and forward the options argument received as input. The possible parameters passed through the options object include the following:

  • The encoding argument that is used to convert Buffers to Strings (defaults to null)
  • A flag to enable object mode (objectMode defaults to false)
  • The upper limit of the data stored in the internal buffer, after which no more reading from the source should be done (highWaterMark defaults to 16KB)

Okay, now let's explain the _read() method:

  • The method generates a random string using chance.
  • It pushes the string into the internal reading buffer. Note that since we are pushing a String, we also specify the encoding, utf8 (this is not necessary if the chunk is simply a binary Buffer).
  • It terminates the stream randomly, with a likelihood of 5 percent, by pushing null into the internal buffer to indicate an EOF situation or, in other words, the end of the stream.

We can also see that the size argument given in input to the _read() function is ignored, as it is an advisory parameter. We can simply push all the available data, but if there are multiple pushes inside the same invocation, then we should check whether push() returns false, as this would mean that the internal buffer has reached the highWaterMark limit and we should stop adding more data to it.
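For illustration only, this is a sketch of how a variant of our _read() method could honor that signal; it slightly changes the behavior of RandomStream and is not the original listing:

_read(size) {
  while(true) {
    if(chance.bool({likelihood: 5})) {
      return this.push(null);                 // end the stream
    }
    const chunk = chance.string();
    if(!this.push(chunk, 'utf8')) {           // internal buffer reached the highWaterMark
      return;                                 // _read() will be invoked again when there is room
    }
  }
}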

That's it for RandomStream; we are now ready to use it. Let's create a new module named generateRandom.js in which we instantiate a new RandomStream object and pull some data from it:

const RandomStream = require('./randomStream'); 
const randomStream = new RandomStream(); 
 
randomStream.on('readable', () => { 
  let chunk; 
  while((chunk = randomStream.read()) !== null) { 
    console.log(`Chunk received: ${chunk.toString()}`); 
  } 
}); 

Now, everything is ready for us to try our new custom stream. Simply execute the generateRandom module as usual and watch a random set of strings flowing on the screen.

Writable streams

A Writable stream represents a data destination; in Node.js, it's implemented using the Writable abstract class, which is available in the stream module.

Writing to a stream

Pushing some data down a Writable stream is a straightforward business; all we need to do is to use the write() method, which has the following signature:

writable.write(chunk, [encoding], [callback]) 

The encoding argument is optional and can be specified if chunk is String (it defaults to utf8, and is ignored if chunk is Buffer); the callback function instead is called when the chunk is flushed into the underlying resource and is optional as well.

To signal that no more data will be written to the stream, we have to use the end() method:

writable.end([chunk], [encoding], [callback]) 

We can provide a final chunk of data through the end() method; in this case, the callback function is equivalent to registering a listener to the finish event, which is fired when all the data written in the stream has been flushed into the underlying resource.

Now, let's show how this works by creating a small HTTP server that outputs a random sequence of strings:

const Chance = require('chance'); 
const chance = new Chance(); 
 
require('http').createServer((req, res) => { 
  res.writeHead(200, {'Content-Type': 'text/plain'});        //[1] 
  while(chance.bool({likelihood: 95})) {                     //[2] 
    res.write(chance.string() + '\n');                       //[3] 
  } 
  res.end('\nThe end...\n');                                 //[4] 
  res.on('finish', () => console.log('All data was sent'));  //[5] 
}).listen(8080, () => console.log('Listening on http://localhost:8080')); 

The HTTP server that we created writes into the res object, which is an instance of http.ServerResponse and also a Writable stream. What happens is explained as follows:

  1. We first write the head of the HTTP response. Note that writeHead() is not a part of the Writable interface; in fact, it's an auxiliary method exposed by the http.ServerResponse class.
  2. We start a loop that terminates with a likelihood of 5% (we instruct chance.bool() to return true 95% of the time).
  3. Inside the loop, we write a random string into the stream.
  4. Once we are out of the loop, we call end() on the stream, indicating that no more data will be written. Also, we provide a final string to be written into the stream before ending it.
  5. Finally, we register a listener for the finish event, which will be fired when all the data has been flushed into the underlying socket.

We can call this small module entropyServer.js, and then execute it. To test the server, we can open a browser at the address http://localhost:8080, or use curl from the terminal as follows:

curl localhost:8080 

At this point, the server should start sending random strings to the HTTP client that you chose (please bear in mind that some browsers might buffer the data, and the streaming behavior might not be apparent).

Back-pressure

Similar to a liquid flowing in a real piping system, Node.js streams can also suffer from bottlenecks, where data is written faster than the stream can consume it. The mechanism to cope with this problem consists of buffering the incoming data; however, if the stream doesn't give any feedback to the writer, we might incur a situation where more and more data is accumulated into the internal buffer, leading to undesired levels of memory usage.

To prevent this from happening, Writable streams expose a highWaterMark property, which is the limit of the internal buffer size beyond which the write() method starts returning false, indicating that the application should stop writing. When the buffer is emptied, the drain event is emitted, communicating that it's safe to start writing again. This mechanism is called back-pressure.

Note

The mechanism described in this section is similarly applicable to Readable streams. In fact, back-pressure exists in Readable streams too, and it's triggered when the push() method, which is invoked inside _read(), returns false. However, it's a problem specific to stream implementers, so we will deal with it less frequently.

We can quickly demonstrate how to take into account the back-pressure of a Writable stream by modifying the entropyServer that we created before:

const Chance = require('chance'); 
const chance = new Chance(); 
 
require('http').createServer((req, res) => { 
  res.writeHead(200, {'Content-Type': 'text/plain'}); 
 
  function generateMore() {                           //[1] 
    while(chance.bool({likelihood: 95})) { 
      let shouldContinue = res.write( 
        chance.string({length: (16 * 1024) - 1})        //[2] 
      ); 
      if(!shouldContinue) {                           //[3] 
        console.log('Backpressure'); 
        return res.once('drain', generateMore); 
      } 
    } 
    res.end('\nThe end...\n', () => console.log('All data was sent')); 
  } 
  generateMore(); 
}).listen(8080, () => console.log('Listening on http://localhost:8080')); 

The most important steps of the previous code can be summarized as follows:

  1. We wrapped the main logic in a function called generateMore().
  2. To increase the chances of receiving some back-pressure, we increased the size of each data chunk to 16 KB minus 1 byte, which is very close to the default highWaterMark limit.
  3. After writing a chunk of data, we check the return value of res.write(); if we receive false, it means that the internal buffer is full and we should stop sending more data. In this case, we exit from the function, and register another cycle of writes for when the drain event is emitted.

If we now try to run the server again, and then generate a client request with curl, there is a high probability that there will be some back-pressure, as the server produces data at a very high rate, faster than the underlying socket can handle.

Implementing Writable streams

We can implement a new Writable stream by inheriting the prototype of stream.Writable and providing an implementation for the _write() method. Let's try to do it immediately while discussing the details along the way.

Let's build a Writable stream that receives objects in the following format:

{ 
  path: <path to a file> 
  content: <string or buffer> 
} 

For each one of these objects, our stream has to save the content part into a file created at the given path. We can immediately see that the inputs of our stream are objects, and not strings or buffers; this means that our stream has to work in object mode.

Let's call the module toFileStream.js:

const stream = require('stream'); 
const fs = require('fs'); 
const path = require('path'); 
const mkdirp = require('mkdirp'); 
 
class ToFileStream extends stream.Writable { 
  constructor() { 
    super({objectMode: true}); 
  } 
 
  _write (chunk, encoding, callback) { 
    mkdirp(path.dirname(chunk.path), err => { 
      if (err) { 
        return callback(err); 
      } 
      fs.writeFile(chunk.path, chunk.content, callback); 
    }); 
  } 
} 
module.exports = ToFileStream; 

As the first step, let's load all the dependencies that we are going to use. Note that we are requiring the mkdirp module which, as you should know from the previous chapters, needs to be installed from npm.

We created a new class for our new stream, which extends from stream.Writable.

We had to invoke the parent constructor to initialize its internal state; we also provide an options object that specifies that the stream works in object mode (objectMode: true). Other options accepted by stream.Writable are as follows:

  • highWaterMark (the default is 16 KB): This controls the back-pressure limit.
  • decodeStrings (defaults to true): This enables the automatic conversion of strings into binary Buffers before they are passed to the _write() method. This option is ignored in object mode.

Finally, we provided an implementation for the _write() method. As you can see, the method accepts a data chunk and an encoding (which makes sense only if we are in binary mode and the stream option decodeStrings is set to false). The method also accepts a callback function, which needs to be invoked when the operation completes; it's not necessary to pass the result of the operation but, if needed, we can still pass an error, which will cause the stream to emit an error event.

Now, to try the stream that we just built, we can create a new module called, for example, writeToFile.js, and perform some write operations against the stream:

const ToFileStream = require('./toFileStream.js'); 
const tfs = new ToFileStream(); 
 
tfs.write({path: "file1.txt", content: "Hello"}); 
tfs.write({path: "file2.txt", content: "Node.js"}); 
tfs.write({path: "file3.txt", content: "Streams"}); 
tfs.end(() => console.log("All files created")); 

With this, we created and used our first custom Writable stream. Run the new module as usual to check its output; you will see that after the execution, three new files will be created.

Duplex streams

A Duplex stream is a stream that is both Readable and Writable. It is useful when we want to describe an entity that is both a data source and a data destination, such as network sockets, for example. Duplex streams inherit the methods of both stream.Readable and stream.Writable, so this is nothing new to us. This means that we can read() or write() data, or listen for both the readable and drain events.

To create a custom Duplex stream, we have to provide an implementation for both _read() and _write(); the options object passed to the Duplex() constructor is internally forwarded to both the Readable and Writable constructors. The options are the same as those we already discussed in the previous sections, with the addition of a new one called allowHalfOpen (defaults to true) that if set to false will cause both the parts (Readable and Writable) of the stream to end if only one of them does.

Note

To have a Duplex stream working in object mode on one side and binary mode on the other, we need to manually set the following properties from within the stream constructor:

this._writableState.objectMode 
this._readableState.objectMode 
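To make this more concrete, here is a minimal, hypothetical Duplex sketch (not taken from the chapter's listings) in which the Readable side emits a short greeting while the Writable side simply logs whatever it receives; the two sides are completely independent:

const stream = require('stream');

class GreeterChannel extends stream.Duplex {
  constructor(options) {
    super(options);
    this.greetings = ['hello', 'world', null];      // null terminates the Readable side
  }

  _read(size) {
    this.push(this.greetings.shift());
  }

  _write(chunk, encoding, callback) {
    console.log(`Received: ${chunk.toString()}`);
    callback();
  }
}

const channel = new GreeterChannel();
channel.on('data', chunk => console.log(`Read: ${chunk.toString()}`));
channel.write('some input');
channel.end();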

Transform streams

Transform streams are a special kind of Duplex stream, specifically designed to handle data transformations.

In a simple Duplex stream, there is no immediate relationship between the data read from the stream and the data written into it (at least, the stream is agnostic to such a relationship). Think about a TCP socket, which just sends and receives data to and from the remote peer; the socket is not aware of any relationship between the input and output. The following diagram illustrates the data flow in a Duplex stream:

[Figure: the flow of data in a Duplex stream]

On the other side, Transform streams apply some kind of transformation to each chunk of data that they receive from their Writable side and then make the transformed data available on their Readable side. The following diagram shows how the data flows in a Transform stream:

[Figure: the flow of data in a Transform stream]

From the outside, the interface of a Transform stream is exactly like that of a Duplex stream. However, when we want to build a new Duplex stream, we have to provide the _read() and _write() methods while, for implementing a new Transform stream, we have to fill in another pair of methods: _transform() and _flush().

Let's show how to create a new Transform stream with an example.

Implementing Transform streams

Let's implement a Transform stream that replaces all the occurrences of a given string. To do this, we have to create a new module named replaceStream.js. Let's jump directly to the implementation:

const stream = require('stream'); 
 
class ReplaceStream extends stream.Transform { 
  constructor(searchString, replaceString) { 
    super(); 
    this.searchString = searchString; 
    this.replaceString = replaceString; 
    this.tailPiece = ''; 
  } 
 
  _transform(chunk, encoding, callback) { 
    const pieces = (this.tailPiece + chunk)                   //[1] 
      .split(this.searchString); 
    const lastPiece = pieces[pieces.length - 1]; 
    const tailPieceLen = this.searchString.length - 1; 
 
    this.tailPiece = lastPiece.slice(-tailPieceLen);          //[2] 
    pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen); 
 
    this.push(pieces.join(this.replaceString));               //[3] 
    callback(); 
  } 
 
  _flush(callback) { 
    this.push(this.tailPiece); 
    callback(); 
  } 
} 
 
module.exports = ReplaceStream; 

As always, we will start building the module from its dependencies. This time we are not using third-party modules.

Then we created a new class extending from the stream.Transform base class. The constructor of the class accepts two arguments: searchString and replaceString. As you can imagine, they allow us to define the text to match and the string to use as a replacement. We also initialize an internal tailPiece variable that will be used by the _transform() method.

Now, let's analyze the _transform() method, which is the core of our new class. The _transform() method has practically the same signature as that of the _write() method of the Writable stream but, instead of writing data into an underlying resource, it pushes it into the internal buffer using this.push(), exactly as we would do in the _read() method of a Readable stream. This confirms how the two sides of a Transform stream are actually connected.

The _transform() method of ReplaceStream implements the core of our algorithm. To search and replace a string in a buffer is an easy task; however, it's a totally different story when the data is streaming, and possible matches might be distributed across multiple chunks. The procedure followed by the code can be explained as follows:

  1. Our algorithm splits the chunk (prepending any tailPiece left over from the previous chunk) using searchString as a separator.
  2. Then, it takes the last item of the array generated by the operation and extracts the last searchString.length - 1 characters. The result is saved into the tailPiece variable and it will be prepended to the next chunk of data.
  3. Finally, all the pieces resulting from split() are joined together using replaceString as a separator and pushed into the internal buffer.

When the stream ends, we might still have some content left in the tailPiece variable that was not pushed into the internal buffer. That's exactly what the _flush() method is for; it is invoked just before the stream is ended, giving us one final chance to push any remaining data before completely ending the stream.

The _flush() method only takes in a callback that we have to make sure to invoke when all the operations are complete, causing the stream to be terminated. With this, we have completed our ReplaceStream class.

Now, it's time to try the new stream. We can create another module called replaceStreamTest.js that writes some data and then reads the transformed result:

const ReplaceStream = require('./replaceStream'); 
 
const rs = new ReplaceStream('World', 'Node.js'); 
rs.on('data', chunk => console.log(chunk.toString())); 
 
rs.write('Hello W'); 
rs.write('orld!'); 
rs.end(); 

To make life a little bit harder for our stream, we spread the search term (which is World) across two different chunks; then, using flowing mode, we read from the same stream, logging each transformed chunk. Running the preceding program should produce the following output:

Hel
lo Node.js
!

Note

There is a fifth type of stream that is worth mentioning: stream.PassThrough. Unlike the other stream classes that we presented, PassThrough is not abstract and can be instantiated straightaway without the need to implement any method. It is, in fact, a Transform stream that outputs every data chunk without applying any transformation.
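For instance, one possible use (a minimal sketch, not from the chapter's listings) is to place a PassThrough stream between a producer and a consumer to observe how much data flows through it, without altering the data in any way:

const stream = require('stream');

const monitor = new stream.PassThrough();
let bytes = 0;
monitor.on('data', chunk => bytes += chunk.length);
monitor.on('finish', () => console.log(`${bytes} bytes passed through`));

monitor.write('Hello ');
monitor.write('PassThrough!');
monitor.end();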

Connecting streams using pipes

The concept of Unix pipes was invented by Douglas McIlroy; this enabled the output of a program to be connected to the input of the next. Take a look at the following command:

echo Hello World! | sed s/World/Node.js/g

In the preceding command, echo will write Hello World! to its standard output, which is then redirected to the standard input of the sed command (thanks to the pipe | operator); then sed replaces any occurrence of World with Node.js and prints the result to its standard output (which, this time, is the console).

In a similar way, Node.js streams can be connected together using the pipe() method of the Readable stream, which has the following interface:

readable.pipe(writable, [options]) 

Very intuitively, the pipe() method takes the data that is emitted from the readable stream and pumps it into the provided writable stream. Also, the writable stream is ended automatically when the readable stream emits an end event (unless we specify {end: false} as options). The pipe() method returns the writable stream passed as an argument, allowing us to create chained invocations if such a stream is also Readable (such as a Duplex or Transform stream).

Piping two streams together will create suction, which allows the data to flow automatically into the writable stream, so there is no need to call read() or write(); but, most importantly, there is no need to control the back-pressure anymore, because it's automatically taken care of.
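As a side note, here is a minimal sketch (with hypothetical file names) showing when the {end: false} option mentioned above can come in handy, for example, to concatenate two sources into the same destination:

const fs = require('fs');

const dest = fs.createWriteStream('combined.txt');
const first = fs.createReadStream('part1.txt');
const second = fs.createReadStream('part2.txt');

first.pipe(dest, {end: false});                  // keep dest open when first ends
first.on('end', () => second.pipe(dest));        // dest is ended automatically this time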

To make a quick example (there will be tons of them coming), we can create a new module called replace.js that takes a text stream from the standard input, applies the replace transformation, and then pushes the data back to the standard output:

const ReplaceStream = require('./replaceStream'); 
process.stdin 
  .pipe(new ReplaceStream(process.argv[2], process.argv[3])) 
  .pipe(process.stdout); 

The preceding program pipes the data that comes from the standard input into a ReplaceStream and then back to the standard output. Now, to try this small application, we can leverage a Unix pipe to redirect some data into its standard input, as shown in the following example:

echo Hello World! | node replace World Node.js

This should produce the following output:

Hello Node.js

This simple example demonstrates that streams (and in particular text streams) are a universal interface, and pipes are the way to compose and interconnect almost magically all these interfaces.

Note

The error events are not propagated automatically through the pipeline. Take, for example, this code fragment:

stream1
  .pipe(stream2)
  .on('error', function() {});

In the preceding pipeline, we will catch only the errors coming from stream2, which is the stream that we attached the listener to. This means that, if we want to catch any error generated from stream1, we have to attach another error listener directly to it. We will later see a pattern that mitigates this inconvenience (combining streams). Also, we should notice that if the destination stream emits an error it gets automatically unpiped from the source stream, causing the pipeline to break.
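A minimal sketch of that approach, reusing the same hypothetical stream1 and stream2 from the fragment above, might look like this:

function handleError(err) {
  console.error(err);
  // ...here we could also destroy the other streams of the pipeline
}

stream1
  .on('error', handleError)
  .pipe(stream2)
  .on('error', handleError);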

Through and from for working with streams

The way we created custom streams so far does not exactly follow the Node way; in fact, inheriting from a base stream class violates the small surface area principle and requires some boilerplate code. This does not mean that the streams were badly designed; in fact, we should not forget that since they are a part of the Node.js core they must be as flexible as possible in order to enable userland modules to extend them for a broad range of purposes.

However, most of the time we don't need all the power and extensibility that prototypal inheritance can give us; usually, all we want is a quick and expressive way to define new streams. The Node.js community, of course, created a solution for this as well. A perfect example is through2 (https://npmjs.org/package/through2), a small library that simplifies the creation of Transform streams. With through2, we can create a new Transform stream by invoking a simple function:

const transform = through2([options], [_transform], [_flush]) 
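For example, a simple uppercasing Transform stream could be defined as follows (a minimal sketch based on the through2 documentation, not a listing from this chapter):

const through2 = require('through2');

const uppercasify = through2((chunk, encoding, callback) => {
  callback(null, chunk.toString().toUpperCase());    // push the transformed chunk
});

process.stdin
  .pipe(uppercasify)
  .pipe(process.stdout);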

In a similar way, from2 (https://npmjs.org/package/from2) allows us to easily and succinctly create Readable streams with code such as the following:

const readable = from2([options], _read) 
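For instance, a Readable stream that emits a fixed sequence of strings and then ends could be sketched as follows (again, an illustrative example rather than a listing from this chapter):

const from2 = require('from2');

const greetings = ['Hello ', 'streaming ', 'world!\n'];
const readable = from2((size, next) => {
  next(null, greetings.length ? greetings.shift() : null);   // null ends the stream
});

readable.pipe(process.stdout);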

The advantages of using these little libraries will be immediately clear as soon as we start showing their usage in the rest of the chapter.

Note

The packages through (https://npmjs.org/package/through) and from (https://npmjs.org/package/from) are the original libraries built on top of Streams1.
