Implementing the Readable stream

From what we have learned here, let's implement the framing system that we talked about earlier. From our previous example, it should be obvious how we might handle this. We will hold the underlying resource, in this case, a socket. From there, we will find the !!!BEGIN!!! buffer and let it pass. We will then start to store the data that is held. Once we reach the !!!END!!! buffer, we will push out the data chunk.

We are holding on to quite a bit of data in this case, but it showcases how we might handle framing. The duplex stream will showcase how we might handle a simple protocol. The example is seen as follows:

  1. Import the Readable stream and create a class called ReadMessagePassStream:
import { Readable } from 'stream';

class ReadMessagePassStream extends Readable {
}
  1. Add some private variables to hold our internal state for the stream:
// inside of the ReadMessagePassStream class
#socket = null;
#bufBegin = Buffer.from("!!!START!!!");
#bufEnd = Buffer.from("!!!END!!!");
#internalBuffer = [];
#size = 0;

  1. Create a #data method like the one before. We will now be looking for the beginning and end frame buffers that we set up before, #bufBegin and #bufEnd:
#data = function(chunk) {
let i = -1
if((i = chunk.indexOf(this.#bufBegin)) !== -1) {
const tempBuf = chunk.slice(i + this.#bufBegin.byteLength);
this.#size += tempBuf.byteLength;
this.#internalBuffer.push(tempBuf);
}
else if((i = chunk.indexOf(this.#bufEnd)) !== -1) {
const tempBuf = chunk.slice(0, i);
this.#size += tempBuf.byteLength;
this.#internalBuffer.push(tempBuf);
const final = Buffer.concat(this.#internalBuffer);
this.#internalBuffer = [];
if(!this.push(final)) {
this.#socket.pause();
}
} else {
this.#size += chunk.byteLength;
this.#internalBuffer.push(chunk);
}
}
  1. Create the constructor for the class to initialize our private variables:
constructor(options) {
if( options.objectMode ) {
options.objectMode = false //we don't want it on
}
super(options);
if(!options.socket ) {
throw "Need a socket to attach to!"
}
this.#socket = options.socket;
this.#socket.on('data', this.#data.bind(this));
this.#socket.on('end', () => this.push(null));
}

One new piece of information is the objectMode property, which can be passed into our stream. This allows our streams to read objects instead of raw buffers. In our case, we do not want this to happen; we want to work with the raw data.

  1. Add the _read method to make sure that our stream will start up:
// inside the ReadMessagePassStream
_read(size) {
this.#socket.resume();
}

With this code, we now have a way of handling a socket without having to listen to the data event in our main code; it is now wrapped in this Readable stream. On top of this, we now have the capability of piping this stream into another. The following test harness code shows this:

import { createWriteStream } from 'fs';

const socket = createConnection(3333);
const write = createWriteStream('./output.txt');
const messageStream = new ReadMessagePassStream({ socket });
messageStream.pipe(write);

We have a server being hosted on localhost at port 3333. We create a write stream and pipe any data from our ReadMessagePassStream to that file. If we hook this up to the server in the test harness, we will notice that an output file is created and holds only the data that we sent over it, not the framing code.

The framing technique that we are utilizing is not always going to work. Just as it has been shown in the lorem example that our data can get chunked at any point, we could have our !!!START!!! and !!!END!!! end up on one of the chunk boundaries. If this happened, our streams would fail. There is additional code that we would need to handle these cases, but these examples should provide all the necessary ideas to implement the streaming code.

Next, we will take a look at the Writable stream interface.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.142.12.240