Implementing the Writable stream

The following Writable stream implementation shows our framing method and how we can use it to put the !!!START!!! and !!!END!!! frames around our data. While simplistic, it demonstrates the power of framing and of building more complex streams around the primitive ones:

  1. Import the Writable class from the stream module and create the shell for WriteMessagePassStream. Set this as the default export for this file:
import { Writable } from 'stream';

export default class WriteMessagePassStream extends Writable {
}
  2. Add the private state variables and the constructor. Make sure not to allow objectMode through since we want to work on the raw data:
// inside the WriteMessagePassStream
#socket = null;
#writing = false;
constructor(options) {
    // we frame raw bytes, so never let objectMode through
    if( options.objectMode ) {
        options.objectMode = false;
    }
    if(!options.socket ) {
        throw new Error("A socket is required to construct this stream!");
    }
    super(options);
    this.#socket = options.socket;
}
  3. Add the _write method to our class. We will walk through it right after the code:
_write(chunk, encoding, callback) {
    // open a frame if we are not already inside one
    if(!this.#writing ) {
        this.#writing = true;
        this.#socket.write("!!!START!!!");
    }
    let i = -1;
    let prevI = 0;
    // every 0x00 byte in the chunk closes the current frame
    while((i = chunk.indexOf(0x00, prevI)) !== -1) {
        this.#socket.write(chunk.slice(prevI, i));
        this.#socket.write("!!!END!!!");
        prevI = i + 1;
        if( prevI === chunk.byteLength ) {
            // the chunk ended on the stop signal, so no frame is open
            this.#writing = false;
            return callback();
        }
        this.#socket.write("!!!START!!!");
    }
    // write whatever follows the last stop signal (or the whole chunk)
    this.#socket.write(chunk.slice(prevI));
    return callback();
}

With this code, we can see some similar points to how we handled the readable side. Some notable exceptions include the following items:

  • We implement the _write method. We are ignoring, again, the encoding parameter of this function, but we should check this in case we get an encoding that we are not expecting. The chunk is the data that is being written, and the callback is what we call when we are finished processing the write for this chunk.
  • Since we are wrapping a socket and we do not want to kill it once we are done sending the data, we need to send some type of stop signal to our stream. In our case, we are using the simple 0x00 byte. In a more robust implementation, we would utilize something else, but this should work for now.
  • Every chunk ends up on the underlying socket. A chunk that contains no stop signal is written through as is, while a chunk that contains one has !!!END!!! written in place of each 0x00 byte.
  • We call the callback once we are finished with our processing. If the writing flag is not set, we are starting a new frame, so we set the flag and write out !!!START!!! before the chunk. If it is already set, we are still inside a frame and only need to write the chunk data. Again, if we never call the callback, our stream will stall indefinitely. The callback is what tells the internal mechanism to pull more data from the internal buffer for us to consume.
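To inspect the framing behavior without a socket, we can sketch the same idea as a pure function. This is a minimal sketch and not part of the class above: frameChunk, START_MARKER, and END_MARKER are hypothetical names, and the function assumes that every 0x00 byte closes the current frame. It returns the bytes that would go out for one chunk, along with whether a frame is still open afterward.

```javascript
// Hypothetical helper for illustration only; the names are not from the class above.
const START_MARKER = Buffer.from('!!!START!!!');
const END_MARKER = Buffer.from('!!!END!!!');

// chunk:   Buffer holding the data for one write
// inFrame: true if an earlier chunk already opened a frame
function frameChunk(chunk, inFrame) {
  const out = [];
  if (!inFrame) {
    out.push(START_MARKER);
    inFrame = true;
  }
  let prev = 0;
  let i;
  // Assumption: each 0x00 byte is a stop signal that closes the current frame.
  while ((i = chunk.indexOf(0x00, prev)) !== -1) {
    out.push(chunk.subarray(prev, i), END_MARKER);
    prev = i + 1;
    inFrame = false;
    if (prev < chunk.byteLength) {
      // more data follows the stop signal, so it belongs to a new frame
      out.push(START_MARKER);
      inFrame = true;
    }
  }
  // anything after the last stop signal (or the whole chunk) is payload
  if (prev < chunk.byteLength) {
    out.push(chunk.subarray(prev));
  }
  return { bytes: Buffer.concat(out), inFrame };
}

// Usage: a chunk with no stop signal opens a frame and leaves it open.
const first = frameChunk(Buffer.from('ab'), false);
console.log(first.bytes.toString(), first.inFrame); // !!!START!!!ab true
```

Keeping the frame state as an argument and a return value, rather than as a private field, makes each case easy to check in isolation.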

With this code, we can now look at the test harness and how we use it to create a server that sends framed data to any client that connects, such as a Readable stream that understands our framing:

import { createServer } from 'net';
import WrappedWritableStream from '../writable/main.js';

const server = createServer((con) => {
    console.log('client connected. sending test data');
    const wrapped = new WrappedWritableStream({ socket : con });
    for(let i = 0; i < 100000; i++) {
        wrapped.write(`data${i} `);
    }
    wrapped.write(Buffer.from([0x00]));
    wrapped.end();
    console.log('finished sending test data');
});
server.listen(3333);

We create a server and listen on port 3333. Whenever we receive a connection, we wrap it with our Writable stream. We then send down a bunch of test data and, once that is finished, we write out the 0x00 signal to tell our stream this frame is done, and we then call the end method to say we are finished with our wrapped stream. If we add another test run after our first, we can see how our framing system works. Let's go ahead and do just that. Add the following code after wrapped.write(Buffer.from([0x00])):

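for(let i = 0; i < 100000; i++) {
    wrapped.write(`more_data${i} `);
}
wrapped.write(Buffer.from([0x00]));

If we ever hit the highWaterMark of our stream, the write method returns false to signal that we should stop writing until the stream emits the drain event. Our harness ignores this signal, so the extra data simply accumulates in the stream's internal buffer until it can be written out.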
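A Writable reports backpressure through the return value of write and through the drain event, so a writer that wants to respect highWaterMark could look something like the following. This is a minimal sketch: writeWithBackpressure is a hypothetical helper that is not part of our harness, and it only assumes that the stream it receives has the standard write and once methods.

```javascript
// Hypothetical helper for illustration only; not part of the test harness.
// stream: any Writable (only write() and once('drain', ...) are used)
// chunks: array of strings or Buffers to write in order
// done:   called once every chunk has been handed to the stream
function writeWithBackpressure(stream, chunks, done) {
  let index = 0;
  function next() {
    while (index < chunks.length) {
      const ok = stream.write(chunks[index++]);
      if (!ok) {
        // the internal buffer is full; resume when it has drained
        stream.once('drain', next);
        return;
      }
    }
    done();
  }
  next();
}
```

In the harness, we could collect the test strings in an array and pass it to this helper along with wrapped, calling wrapped.end() inside the done callback. That keeps memory use bounded at the cost of a slightly more involved loop.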

If we now run the test harness with our Readable stream from before, we will see that we are processing all of this data and writing out to our file without any of the framing passing through. With these two stream implementations, we are now able to pipe data across a socket with a custom framing option. We would now be able to use this system to implement our data-passing system from the previous chapter. However, we will instead implement a Duplex stream that will improve on this system and allow us to work with multiple writable chunks, which is what we will see in the next section. 
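Before moving on, one quick way to check what the harness puts on the wire is to pull the payloads back out of a captured buffer. The following is a minimal sketch and not the Readable stream from before: extractFrames, FRAME_START, and FRAME_END are hypothetical names, and the function assumes the payload itself never contains the marker strings.

```javascript
// Hypothetical helper for illustration only; not the Readable implementation.
const FRAME_START = '!!!START!!!';
const FRAME_END = '!!!END!!!';

// buffer: Buffer holding bytes received from the socket so far
// returns the complete frame payloads plus any unconsumed trailing bytes
function extractFrames(buffer) {
  const frames = [];
  let pos = 0;
  while (true) {
    const start = buffer.indexOf(FRAME_START, pos);
    if (start === -1) break;
    const bodyStart = start + FRAME_START.length;
    const end = buffer.indexOf(FRAME_END, bodyStart);
    if (end === -1) break; // incomplete frame; keep it for the next read
    frames.push(buffer.subarray(bodyStart, end));
    pos = end + FRAME_END.length;
  }
  return { frames, rest: buffer.subarray(pos) };
}
```

With the two test runs from the harness, we would expect two frames: one holding the data entries and one holding the more_data entries. The rest value carries any partial frame over to the next read.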
