Building a custom Readable stream

A Readable stream does exactly what its name states: it reads from a streaming source and outputs data based on some criteria. Our example is a take on the simple example shown in the Node.js documentation.

We are going to take our example of counting the occurrences of lorem in the text file, but this time we are going to output the location in the file where each lorem was found:

  1. Import the Readable class and the createReadStream method from their respective modules:
import { Readable } from 'stream'
import { createReadStream } from 'fs'
  1. Create a class that extends the Readable class and set up some private variables to track the internal state:
class LoremFinder extends Readable {
    #lorem = Buffer.from('lorem');
    #found = 0;
    #totalCount = 0;
    #startByteLoc = -1;
    #file = null;
}
  1. Add a constructor that initializes our #file variable to a Readable stream:
// inside our LoremFinder class
constructor(opts) {
    super(opts);
    // optional chaining so that a missing opts object also reaches our own error
    if(!opts?.stream ) {
        throw new Error("This stream needs a stream to be provided!");
    }
    this.#file = opts.stream;
    this.#file.on('data', this.#data.bind(this)); // will add #data method next
    this.#file.on('end', () => this.push(null)); // signal the end of our stream
}

  1. Based on the constructor, we need a private #data field that holds a function. We will use it to read each chunk from our #file stream and to check for the locations of lorem:
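// inside of the LoremFinder class
#data = function(chunk) {
    for(let i = 0; i < chunk.byteLength; i++) {
        const byte = chunk[i];
        if( byte !== this.#lorem[this.#found] ) {
            // mismatch: drop the partial match, then retest this byte below,
            // since it may be the 'l' that starts a new match (as in "llorem")
            this.#found = 0;
        }
        if( byte === this.#lorem[this.#found] ) {
            if(!this.#found ) {
                this.#startByteLoc = this.#totalCount + i;
            }
            this.#found += 1;
        }
        if( this.#found === this.#lorem.byteLength ) {
            // full word matched: emit its start offset as a 32-bit big-endian integer
            const buf = Buffer.alloc(4);
            buf.writeUInt32BE(this.#startByteLoc);
            this.push(buf);
            this.#found = 0;
        }
    }
    this.#totalCount += chunk.byteLength;
}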

We run through each byte and check whether it is the next byte we are looking for in the word lorem. If it is, and it is the l at the start of the word, we record its position in #startByteLoc. If the byte does not match, we reset our lookup variable and keep looping. Once we have matched the entire word, we output #startByteLoc as a 4-byte buffer. When the loop finishes, we add the number of bytes we read to #totalCount, so that positions stay correct across chunks, and wait for our #data function to be called again. To end our stream and let consumers know that we have fully consumed the resource, we push a null value, which the constructor does when the file stream emits its end event.
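To see why the state has to survive between calls, the matching logic can be pulled out of the stream and run on its own. The following is only a sketch: createFinder and scan are hypothetical names that do not appear in the class, and the sample text is made up. It also retests a mismatched byte against the first byte of the word so that input such as llorem is still detected, which is sufficient for lorem because l occurs only at the start of the word:

```javascript
// Hypothetical helper (not part of LoremFinder): returns a stateful scan
// function that reports the absolute start offset of every match it completes.
function createFinder(word) {
    const pattern = Buffer.from(word);
    let found = 0;       // bytes of the pattern matched so far
    let totalCount = 0;  // bytes consumed in earlier chunks
    let start = -1;      // absolute offset where the current match began

    return function scan(chunk) {
        const offsets = [];
        for (let i = 0; i < chunk.byteLength; i++) {
            const byte = chunk[i];
            if (byte !== pattern[found]) {
                found = 0; // mismatch: drop the partial match, retest below
            }
            if (byte === pattern[found]) {
                if (found === 0) {
                    start = totalCount + i;
                }
                found += 1;
            }
            if (found === pattern.byteLength) {
                offsets.push(start);
                found = 0;
            }
        }
        totalCount += chunk.byteLength;
        return offsets;
    };
}

const scan = createFinder('lorem');
// The second "lorem" is split across the two chunks.
const first = scan(Buffer.from('xx lorem ipsum lo'));
const second = scan(Buffer.from('rem dolor llorem'));
console.log(first);  // [ 3 ]
console.log(second); // [ 15, 28 ]
```

The match that starts at offset 15 begins in the first chunk and is only reported while scanning the second one, which is exactly the case that the counters carried between calls are there to handle.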

  1. The final piece we add is the _read method.

This is called when a consumer asks for data, either by calling the read method of our stream or by attaching a data event listener. This is how we can make sure that a primitive stream such as a file stream is consumed:

// inside of the LoremFinder class
_read(size) {
    this.#file.resume();
}

  1. Now we can add some test code to make sure that this stream is working properly:
const locs = new Set();
const loremFinder = new LoremFinder({
    stream : createReadStream('./input.txt')
});
loremFinder.on('data', (chunk) => {
    const num = chunk.readUInt32BE();
    locs.add(num);
});
loremFinder.on('end', () => {
    console.log('here are all of the locations:');
    for(const val of locs) {
        console.log('location: ', val);
    }
    console.log('number of lorems found is', locs.size);
});

With all of these concepts, we can see how we are able to consume primitive streams and wrap them in a stream that adds behavior on top of them. Now that we have this stream, we can use the pipe interface and pipe it into a Writable stream. Let's write the indices out to a file. To do this, we can do something as simple as loremFinder.pipe(writable).
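As a rough sketch of what that looks like end to end, the snippet below pipes a stand-in source into a file. Several things here are assumptions made for illustration: Readable.from replaces loremFinder so that the snippet runs on its own, the two offsets are made up, the output goes to a hypothetical file in the temporary directory, and the top-level await requires the file to be an ES module:

```javascript
import { Readable } from 'stream';
import { createWriteStream, readFileSync } from 'fs';
import { once } from 'events';
import { tmpdir } from 'os';
import { join } from 'path';

// Encode an offset the same way our stream does: 4 bytes, big-endian.
const toUInt32BE = (n) => {
    const buf = Buffer.alloc(4);
    buf.writeUInt32BE(n);
    return buf;
};

// Stand-in for loremFinder (assumption): a Readable that emits two offsets.
const source = Readable.from([toUInt32BE(0), toUInt32BE(268)]);

// Hypothetical output location; any writable path would do.
const outPath = join(tmpdir(), 'lorem-offsets.bin');
const writable = createWriteStream(outPath);

// pipe() ends the writable once the source has ended.
source.pipe(writable);
await once(writable, 'finish');

const written = readFileSync(outPath);
console.log(written.length); // 8
```

Two offsets at 4 bytes each give a file of 8 bytes, none of which are printable digits.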

If we open the file, we will see what looks like a bunch of random data. The reason for this is that we encoded each index as a 32-bit integer in a 4-byte buffer, so the file holds binary data rather than text. If we wanted to see the indices as text, we could rewrite our stream implementation just a little bit. This modification could look like this: this.push(this.#startByteLoc.toString() + " ");.

With this modification, we can now look at the output.txt file and see all of the indices. It should start to become apparent how powerful it is to write streams and pipe one into the next, and how readable the code can become if we keep piping data through various stages.
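To make the difference between the two encodings concrete, here is a small sketch that builds both forms in memory and decodes the binary one back into numbers. The decodeOffsets helper and the three sample offsets are hypothetical and only serve as illustration:

```javascript
// Hypothetical helper: decode consecutive 32-bit big-endian integers.
function decodeOffsets(buf) {
    const offsets = [];
    for (let i = 0; i + 4 <= buf.byteLength; i += 4) {
        offsets.push(buf.readUInt32BE(i));
    }
    return offsets;
}

const sample = [3, 15, 28]; // made-up offsets

// Binary form: what the file holds when we push 4-byte buffers.
const binary = Buffer.concat(sample.map((n) => {
    const buf = Buffer.alloc(4);
    buf.writeUInt32BE(n);
    return buf;
}));
console.log(binary.length);         // 12
console.log(decodeOffsets(binary)); // [ 3, 15, 28 ]

// Text form: what the file holds when we push toString() + " ".
const text = sample.map((n) => n.toString() + ' ').join('');
console.log(JSON.stringify(text));  // "3 15 28 "
```

The binary form has a fixed width per index and is easy to parse by machine, while the text form can be read directly in an editor.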
