A first look at streams

As we saw with the DOM, streams give us the ability to control the flow of data and to process it in a nonblocking way. We can see this by creating a simple stream. Let's go ahead and utilize one of the built-in streams that comes with Node.js: the readable file stream returned by fs.createReadStream. Let's write the following script:

import fs from 'fs';
import { PassThrough } from 'stream';

const str = fs.createReadStream('./example.txt');
const pt = new PassThrough();
str.pipe(pt);
pt.on('data', (chunk) => {
    console.log(chunk);
});

Here, we have imported the fs library and the PassThrough stream from the stream library. Then, we created a read stream for the example.txt file, as well as a PassThrough stream.

A PassThrough stream allows us to process the data without having to explicitly create a stream ourselves. We read in the data and piped it to our PassThrough stream.

From here, we can attach a listener to the data event, which hands us one chunk of data at a time. Note that we attached our data event listener after calling the pipe method; this ensures that no data events fire before our listener is in place.

Let's create the following example.txt file:

This is some data
it should be processed by our system
it should be coming in chunks that our system can handle
most likely this will all come in one chunk

Now, if we run the node --experimental-modules read_file_stream.js command, we will see that it prints out a Buffer. All of the data arrives in binary chunks wrapped in Buffer objects unless we explicitly set an encoding or put the stream into object mode. If we change the console.log call to print the following, we should get the output in plain text:

console.log(chunk.toString('utf8'));
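As an alternative to calling toString on every chunk, we can hand the encoding to the stream itself. fs.createReadStream accepts an encoding option; with it set, the data events deliver decoded strings instead of Buffer objects. A minimal sketch of the first example with that option:

// With an encoding set, chunks arrive as utf8 strings, so no
// toString call is needed in the data handler.
const str = fs.createReadStream('./example.txt', { encoding: 'utf8' });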

Let's create a program that counts the number of times the word the is used in the text. We can do this with our PassThrough stream, like so:

import fs from 'fs';
import { PassThrough } from 'stream';

let numberOfThe = 0;
const chars = Buffer.from('the');
let currPos = 0;
const str = fs.createReadStream('./example.txt');
const pt = new PassThrough();
str.pipe(pt);
pt.on('data', (chunk) => {
    for(let i = 0; i < chunk.byteLength; i++) {
        const char = chunk[i];
        if( char === chars[currPos] ) {
            if( currPos === chars.byteLength - 1 ) {
                // we are at the end, so increment the count and reset
                numberOfThe += 1;
                currPos = 0;
            } else {
                currPos += 1;
            }
        } else {
            currPos = 0;
        }
    }
});
pt.on('end', () => {
    console.log('the number of THE in the text is: ', numberOfThe);
});

We need to keep a count of the number of times we see the word the. We also create a byte buffer containing the string the, and we keep track of our current position within it. Whenever we get data, we run through it and test each byte. If the byte matches the character at our current position, we need another check: if the current position is the last character of the, we update the count of the and reset the current position; otherwise, we advance the current position to the next index. If the byte doesn't match, we reset the current position; if we didn't, we would match any scattered combination of the characters t, h, and e.
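To see why that reset matters, here is a minimal standalone sketch of the same byte-matching logic (the countThe function is a name of our own invention, not part of the chapter's code):

const chars = Buffer.from('the');

function countThe(buf) {
    let count = 0;
    let currPos = 0;
    for(let i = 0; i < buf.byteLength; i++) {
        if( buf[i] === chars[currPos] ) {
            if( currPos === chars.byteLength - 1 ) {
                count += 1;
                currPos = 0;
            } else {
                currPos += 1;
            }
        } else {
            // without this reset, any scattered t...h...e sequence would count
            currPos = 0;
        }
    }
    return count;
}

console.log(countThe(Buffer.from('the them t h e'))); // prints 2, not 3

The match inside them is counted because we only compare bytes, while the scattered t h e at the end is rejected by the reset.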

This is an interesting example of how to utilize a PassThrough stream, but let's go ahead and write our own Transform stream. We are going to apply the same operation that we did before, but as a custom stream. As stated in the documentation, we must implement the _transform function and can optionally implement the _flush function; we are going to implement both. We are also going to utilize the new class syntax instead of the old prototype-based system. One thing to keep in mind when building our own custom streams is to call super(options) before we do anything else in our constructor. This allows the user to pass the various stream options without us needing to do anything.

With all of this in mind, we should get something that looks like the following:

import { Transform } from 'stream';

class GetThe extends Transform {
    #currPos = 0;
    #numberOfThe = 0;

    static chars = Buffer.from('the');

    constructor(options) {
        super(options);
    }
    _transform(chunk, encoding, callback) {
        for(let i = 0; i < chunk.byteLength; i++) {
            const char = chunk[i];
            if( char === GetThe.chars[this.#currPos] ) {
                if( this.#currPos === GetThe.chars.byteLength - 1 ) {
                    this.#numberOfThe += 1;
                    this.#currPos = 0;
                } else {
                    this.#currPos += 1;
                }
            } else {
                this.#currPos = 0;
            }
        }
        callback();
    }
    _flush(callback) {
        callback(null, this.#numberOfThe.toString());
    }
}

export default GetThe;

First, we import the Transform stream from the stream base library. We extend it and create a couple of private fields: the current position within the the buffer and the running count of the in our stream. We also create a static field for the buffer that we compare against. Then, we have our constructor. This is where we pass the options to the Transform stream's constructor.

Next, we implement the _transform method in the same way that we implemented the data event on the PassThrough stream. The only new piece should be the call to the callback at the end. This lets our stream know that we are ready to process more data. If we need to error out, we can pass that as the first argument. We can also pass a second parameter, as shown in the _flush function. This allows us to pass the processed data to whoever may be listening. In our case, we only want to pass the number of the that we found in the text. We can also only pass a Buffer, String, or Uint8Array, so we decide to pass the string version of our number (we could have used a Buffer and this would have probably been the better choice). Finally, we export this from our module.
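If we went with the Buffer route mentioned above, only the _flush method would change. A sketch of that variant (the rest of the class stays the same):

_flush(callback) {
    // Hand the stream a Buffer directly instead of a string.
    callback(null, Buffer.from(this.#numberOfThe.toString(), 'utf8'));
}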

Inside of our read_file_stream file, we will import this module with the following statement:

import GetThe from './custom_transform.js';

Then, we can utilize it with the following code:

const gt = new GetThe();
gt.on('data', (data) => {
    console.log('the number of THE produced by the custom stream is: ',
        data.toString('utf8'));
});
const str2 = fs.createReadStream('./example.txt');
str2.pipe(gt);

By doing this, we have wrapped all of that logic into a separate module and a reusable stream instead of just doing this in the data event of PassThrough. We also have the ability to chain our stream implementation to another stream (in this case, there would probably be no point unless we were going to pass it to a socket).
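For instance, because a Transform stream is both readable and writable, it can sit in the middle of a pipe chain. A minimal sketch that pipes our custom stream's output straight to standard output rather than a socket:

import fs from 'fs';
import GetThe from './custom_transform.js';

// GetThe sits in the middle of the chain; the count it pushes out
// in _flush flows on to process.stdout.
fs.createReadStream('./example.txt')
    .pipe(new GetThe())
    .pipe(process.stdout);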

This was a short introduction to the stream interface and provided an overview of what we will be discussing at length in later chapters. Next, we will take a look at some modules that come with Node.js and how they can help us to write server applications.
