Implementing a Duplex stream

A duplex stream is exactly what the name suggests: a stream that works in both directions. It combines a Readable and a Writable stream into a single interface. With this type of stream, we can pipe from the socket straight into our custom stream instead of wrapping the socket as we have been (even though we will still implement it as a wrapped stream).

There is not much more to say about Duplex streams other than one fact that trips up newcomers to the stream type. There are two separate buffers: one for the Readable side and one for the Writable side. We need to treat them as independent. This means that any variables we use in the _read method should not be used in the _write and _writev method implementations; otherwise, we could run into subtle bugs.
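To make the separation concrete, here is a minimal sketch that is not part of the MessageTranslator we are about to build. The class name SplitState and its fields are made up for illustration. Each side gets its own high-water mark and its own private state, and neither method touches the other's variables:

```javascript
import { Duplex } from 'stream';

// Hypothetical example class: the readable side emits a short counter,
// the writable side collects whatever is written to it.
class SplitState extends Duplex {
    #nextNumber = 0; // used only by _read
    #received = [];  // used only by _write

    constructor() {
        // each side of a Duplex has its own buffer limit
        super({ readableHighWaterMark: 8, writableHighWaterMark: 32 });
    }

    _read() {
        if (this.#nextNumber < 3) {
            this.push(String(this.#nextNumber++));
        } else {
            this.push(null); // end of the readable side
        }
    }

    _write(chunk, encoding, cb) {
        this.#received.push(chunk.toString());
        cb();
    }

    get received() {
        return [...this.#received];
    }
}

const stream = new SplitState();
stream.write('hello');
console.log(stream.received);
console.log(stream.readableHighWaterMark, stream.writableHighWaterMark);
```

Writing to the stream only changes the state owned by _write; the counter owned by _read is left alone until something starts reading.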

The following code implements a Duplex stream along with a counting mechanism so that we can utilize the _writev method. As mentioned in the Understanding the Writable stream interface section, the _writev method allows us to work on multiple chunks of data at a time:

  1. Import the Duplex class from the stream module and add the shell for our MessageTranslator class. Export this class:
import { Duplex } from 'stream';

export default class MessageTranslator extends Duplex {
}
  2. Add all the internal state variables. Each of them will be explained in the following steps:
// inside the MessageTranslator class
#socket = null;
#internalWriteBuf = new Map();
#internalReadHoldBuf = [];
#internalPacketNum = 0;
#readSize = 0;
#writeCounter = 0;
  3. Add the constructor for our class. We will handle the data event for our #socket inside of this constructor instead of creating another method as we have in the past:
// inside the MessageTranslator class
constructor(opts) {
    if (!opts || !opts.socket) {
        throw new Error("MessageTranslator stream needs a socket!");
    }
    super(opts);
    this.#socket = opts.socket;
    // we are assuming a single message for each chunk
    this.#socket.on('data', (chunk) => {
        if (!this.#readSize) {
            // start of a message: 4-byte packet number, then 4-byte size
            this.#internalPacketNum = chunk.readInt32BE();
            this.#readSize = chunk.readInt32BE(4);
            this.#internalReadHoldBuf.push(chunk.subarray(8));
            this.#readSize -= chunk.byteLength - 8;
        } else {
            // continuation of the current message
            this.#internalReadHoldBuf.push(chunk);
            this.#readSize -= chunk.byteLength;
        }
        // reached end of message
        if (!this.#readSize) {
            this.push(Buffer.concat(this.#internalReadHoldBuf));
            this.#internalReadHoldBuf = [];
        }
    });
}

We will automatically assume we have a single message per chunk. This makes the processing much easier. When we do get data, we will read in the packet number, which should be the first four bytes of data. We then read in the size of the message, which is the next 4 bytes of data. Finally, we push the rest of the data into our internal buffer. Once we have finished reading the entire message, we will put all the internal chunks together and push them out. Finally, we will reset our internal buffer.
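The reading logic can also be tried out on its own, without a socket. The following is a rough sketch under the same assumptions as the constructor (the 8-byte header arrives whole in the first chunk, and a chunk never contains part of a second message). The createFrameReader function and its callback are hypothetical names used only for this illustration:

```javascript
// Hypothetical helper mirroring the header layout described above:
// bytes 0-3 hold the packet number, bytes 4-7 the payload size,
// and everything after that is payload.
function createFrameReader(onMessage) {
    let packetNum = 0;
    let remaining = 0; // payload bytes we are still waiting for
    let parts = [];

    return (chunk) => {
        if (remaining === 0) {
            // first chunk of a message: read and strip the header
            packetNum = chunk.readInt32BE(0);
            remaining = chunk.readInt32BE(4);
            chunk = chunk.subarray(8);
        }
        parts.push(chunk);
        remaining -= chunk.byteLength;
        if (remaining === 0) {
            // whole message received: hand it over and reset
            onMessage(packetNum, Buffer.concat(parts));
            parts = [];
        }
    };
}

// Feed a 5-byte message split across two chunks.
const header = Buffer.alloc(8);
header.writeInt32BE(7, 0); // packet number
header.writeInt32BE(5, 4); // payload size
const messages = [];
const feed = createFrameReader((num, payload) => {
    messages.push({ num, text: payload.toString() });
});
feed(Buffer.concat([header, Buffer.from('he')]));
feed(Buffer.from('llo'));
console.log(messages);
```

The callback fires only once the second chunk brings the remaining byte count down to zero, which is the same condition the constructor uses before calling push.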

  4. Add the _writev and _write methods to our class. Remember that the _writev method is utilized for multiple chunks, so we will have to loop through them and process each one. Note that each entry passed to _writev is an object of the form { chunk, encoding } rather than a bare Buffer:
// inside the MessageTranslator class
_writev(chunks, cb) {
    // each entry wraps the actual data in its chunk property
    for (const { chunk } of chunks) {
        this.#processChunkHelper(chunk); // shown next
    }
    this.#writeHelper(cb); // shown next
}
_write(chunk, encoding, cb) {
    this.#processChunkHelper(chunk); // shown next
    this.#writeHelper(cb); // shown next
}
  5. Add helper methods to process the chunks and to actually write them out. We will utilize the number -1 as a 4-byte message to state we are done with this message:
// inside the MessageTranslator class
#processChunkHelper(chunk) {
    // make sure the current message has an entry before we touch it
    if (!this.#internalWriteBuf.has(this.#writeCounter)) {
        this.#internalWriteBuf.set(this.#writeCounter, { buf: [], done: false });
    }
    if (chunk.byteLength >= 4 && chunk.readInt32BE() === -1) {
        // end-of-message marker: close this message, move to the next
        this.#internalWriteBuf.get(this.#writeCounter).done = true;
        this.#writeCounter += 1;
    } else {
        this.#internalWriteBuf.get(this.#writeCounter).buf.push(chunk);
    }
}
#writeHelper(cb) {
    const writeOut = [];
    for (const [key, val] of this.#internalWriteBuf) {
        if (val.done) {
            const valBuf = Buffer.concat(val.buf);
            if (valBuf.byteLength >= 4) {
                const cBuf = Buffer.allocUnsafe(4);
                const sizeBuf = Buffer.allocUnsafe(4);
                // packet number, then payload size, then the payload
                cBuf.writeInt32BE(valBuf.readInt32BE());
                sizeBuf.writeInt32BE(valBuf.byteLength - 4);
                writeOut.push(Buffer.concat([cBuf, sizeBuf,
                    valBuf.subarray(4)]));
            }
            // drop the finished entry so it is not processed again
            this.#internalWriteBuf.delete(key);
        }
    }
    if (writeOut.length) {
        this.#socket.write(Buffer.concat(writeOut));
    }
    cb();
}

Our #processChunkHelper method checks to see whether we hit the magical -1 4-byte message that says we have finished writing our message. If we have not, we keep adding to our internal buffer (the array). Once we have reached the end, we mark the current packet as done and move on to the next packet of data.

Our #writeHelper method will loop through all of those packets and check to see whether any of them are finished. If they are, it will get the packet number, the size of the buffer, the data itself, and concatenate it all together. Once it has done this, it will reset the internal buffer to make sure we are not leaking memory. We will write all of this data to the socket and then call our callback to state that we are done writing.
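The framing performed by these two helpers can be sketched in isolation as follows. This is only an illustration of the byte layout described above; buildFrame, isEndOfMessage, and END_OF_MESSAGE are hypothetical names and do not appear in the class itself:

```javascript
// Hypothetical helper: turn the collected chunks of one message into a
// frame. As in #writeHelper, the first 4 bytes of the collected data are
// assumed to be the packet number and the rest is the payload.
function buildFrame(chunks) {
    const body = Buffer.concat(chunks);
    const payload = body.subarray(4);
    const header = Buffer.alloc(8);
    header.writeInt32BE(body.readInt32BE(0), 0); // packet number
    header.writeInt32BE(payload.byteLength, 4);  // payload size
    return Buffer.concat([header, payload]);
}

// The 4-byte marker that signals the end of a message.
const END_OF_MESSAGE = Buffer.alloc(4);
END_OF_MESSAGE.writeInt32BE(-1, 0);

function isEndOfMessage(chunk) {
    return chunk.byteLength >= 4 && chunk.readInt32BE(0) === -1;
}

// Build a frame for packet 3 from a payload written in two pieces.
const packetNumber = Buffer.alloc(4);
packetNumber.writeInt32BE(3, 0);
const frame = buildFrame([packetNumber, Buffer.from('du'), Buffer.from('plex')]);
console.log(frame.readInt32BE(0), frame.readInt32BE(4), frame.subarray(8).toString());
console.log(isEndOfMessage(END_OF_MESSAGE), isEndOfMessage(packetNumber));
```

One consequence of this scheme is that a data chunk whose first four bytes happen to be 0xFFFFFFFF would be mistaken for the marker, which is one reason a length-prefixed or escaped format is usually the safer choice.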

  6. Finish up the Duplex stream by implementing our _read method as we have before. The _final method should just call the callback since there is no processing left:
// inside the MessageTranslator class
_read() {
    this.#socket.resume();
}
_final(cb) {
    // nothing to do since it all should be consumed at this point
    cb();
}

_writev should really be used when order does not matter and we are just processing the data and possibly turning it into something else. This could be a hashing algorithm or something similar to that. In almost all cases, the _write method should be used.
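To see when each method is actually invoked, the following small sketch uses a plain Writable rather than our MessageTranslator. The names sink and batches are made up for this example. Writes issued while the stream is corked are buffered and, once it is uncorked, handed to writev in a single call:

```javascript
import { Writable } from 'stream';

const batches = [];
const sink = new Writable({
    write(chunk, encoding, cb) {
        // single chunk: record it as a batch of one
        batches.push([chunk.toString()]);
        cb();
    },
    writev(chunks, cb) {
        // each entry is an object with chunk and encoding properties
        batches.push(chunks.map(({ chunk }) => chunk.toString()));
        cb();
    },
});

sink.cork();
sink.write('a');
sink.write('b');
sink.write('c');
sink.uncork(); // the three buffered writes are flushed together

console.log(batches);
```

If the writev option is left out, the same buffered chunks are simply passed to write one at a time.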

While this implementation has quite a few flaws (one being that we do not look for possible other packets if we reach the -1 number), it does showcase how we can build a Duplex stream and also another way of handling messages. It is not recommended to come up with our own schemes for moving data across a socket (as we will see in the next chapter), but if a new specification comes out, we could always implement it utilizing a Duplex stream.

If we test this implementation with our test harness, we should get a file called output.txt that has the duplex plus the number message written 100,000 times, plus a trailing end-of-line character. Again, a Duplex stream is just a separate Readable and Writable stream put together and should be used when implementing a data transmission protocol.

The final stream that we will take a look at is the Transform stream.
