Getting started with streams

Streaming is the act of working on a potentially infinite dataset. This does not mean the data actually is infinite, but it means that we may be dealing with an unlimited data source. If we think in the traditional context of processing data, we usually run through three main steps:

  1. Open/get access to a data source.
  2. Process the data source once it is fully loaded in.
  3. Spit out computed data to another location.

We can think of this as the basics of input and output (I/O). Most of our concepts of I/O involve batch processing or working on all or almost all of the data. This means that we know the limits of that data ahead of time. We can make sure that we have enough memory, storage space, computing power, and so on, to deal with the process. Once we are done with the process, we kill the program or queue up the next batch of data.

A simple example of this is seen as follows, where we count the number of lines that the file has:

import { readFileSync } from 'fs';

// Read the whole file into memory, split it on line endings, and count the pieces
const count = readFileSync('./input.txt', {encoding : 'utf8'})
    .split(/\n|\r\n/g).length;
console.log('number of lines in our file is: ', count);

We bring in the readFileSync method from the fs module and then read in the input.txt file. From here, we split on \n or \r\n, which gives us an array of all the lines of the file. From there, we get the length and put it on our standard output channel. This seems quite simple and works well for small- to medium-length files, but what happens when the file becomes abnormally large? Let's go ahead and see. Head over to https://loremipsum.io and give it an input of 100 paragraphs. Copy this and paste it a few times into the input.txt file. Now, when we run this program, we can see a spike in memory usage in our task manager.
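If we would rather measure from inside the program than watch the task manager, a minimal sketch along the following lines works; the heapUsed readings are only a rough approximation of what the batch read costs us:

import { readFileSync } from 'fs';

// Take a heap reading before and after the batch read to see roughly what it costs
const before = process.memoryUsage().heapUsed;
const file = readFileSync('./input.txt', {encoding : 'utf8'});
const after = process.memoryUsage().heapUsed;

console.log('approximate heap used by the read: ',
    Math.round((after - before) / 1024), 'KB');
console.log('number of lines in our file is: ', file.split(/\n|\r\n/g).length);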

We are loading a roughly 3 MB file into memory, counting the number of newlines, and then printing this out. This should still be quite fast, but we are now starting to utilize a good chunk of memory. Let's do something a bit more complex with this file. We will count the number of times the word lorem appears in the text. We can do that with the following code:

import { readFileSync } from 'fs';

const file = readFileSync('./input.txt', {encoding : 'utf8'});
// Match the word lorem when it is surrounded by whitespace, case-insensitively
const re = /\slorem\s/gi;
const matches = file.match(re);

console.log('the number of matches is: ', matches.length);

Again, this should process quite quickly, although there may be some noticeable lag. While the use of a regular expression here could give us some false positives, it does showcase that we are batch processing this file. In many cases, when we are working in a high-speed environment, we are working with files that can be close to or above 1 GB. When we get into these types of files, we do not want to load them all into memory. This is where streaming comes into play.

Many systems that are considered big data are working with terabytes of data. While there are some in-memory applications that will store large amounts of data in memory, a good chunk of this type of data processing uses a mix of both streaming with files and using in-memory data sources to work with the data.

Let's take our first example. We are reading from a file and trying to count the number of lines in the file. Instead of thinking about the number of lines as a whole, we can look for the character that denotes a newline. The characters we are looking for in our regular expression are the newline character (\n) or the carriage return plus newline (\r\n) sequence. With this in mind, we should be able to build a streaming application that can read the file and count the number of lines without loading the file completely into memory.

This example presents the API for utilizing a stream. We will go over what each Stream API gives us and how we can utilize it for our purposes. For now, take the code examples and run them to see how these types of applications work.

This can be seen in the following code snippet:

import { createReadStream } from 'fs';

// The newline character (\n) represented as a byte in hexadecimal
const newLine = 0x0A;
const readStream = createReadStream('./input.txt');
let counter = 1;
readStream.on('data', (chunk) => {
    // Walk every byte of the chunk and count the newlines we see
    for(const byte of chunk) {
        if( newLine === byte ) counter += 1;
    }
}).on('end', () => {
    console.log('number of lines in our file is: ', counter);
});

We bring in the createReadStream method from the fs module and use it to create a Readable stream. We also create a constant for the newline character, represented in hexadecimal. We then listen for the data event so we can process the data as it comes in, checking each byte to see whether it matches the newline character. If it does, we have found a newline; otherwise, we keep searching. We do not need to explicitly look for the carriage return, since we know it will be followed by a newline character.

While this will be slower than loading the entire file into memory, it does save us quite a bit of memory when we are processing the data. Another great thing about this method is that these are all events. With our full processing example, we are taking up the entire event loop until we are done processing. With the stream, we have events for when the data comes in. This means that we can have multiple streams running at the same time on the same thread without having to worry too much about blocking (as long as we are not spending too much time on the processing of the data chunk).
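As a quick illustration of that last point, the following sketch counts newlines in two files at once on the same thread; the second file, input2.txt, is just an assumed stand-in for any other file you might have lying around:

import { createReadStream } from 'fs';

const newLine = 0x0A;

// Count the newlines in a file without ever holding the whole file in memory
const countLines = (path) => {
    let counter = 1;
    createReadStream(path)
        .on('data', (chunk) => {
            for(const byte of chunk) {
                if( newLine === byte ) counter += 1;
            }
        })
        .on('end', () => {
            console.log('number of lines in ' + path + ' is: ', counter);
        });
};

// Both streams run at the same time; their data events interleave on the event loop
countLines('./input.txt');
countLines('./input2.txt');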

With the line-counting example in mind, we can see how we could write the lorem counter in streaming form. Just to drive the point home, let's go ahead and do just that. It should look something like the following:

import { createReadStream } from 'fs';

const stream = createReadStream('./input.txt');
// The raw bytes of the keyword we are searching for
const buf = Buffer.from('lorem');
let found = 0;
let count = 0;
stream.on('data', (chunk) => {
    for(const byte of chunk) {
        // Advance through the keyword while the bytes keep matching
        if( byte === buf[found] ) {
            found += 1;
        } else {
            found = 0;
        }
        // We matched every byte of the keyword, so record it and start over
        if( found === buf.byteLength ) {
            count += 1;
            found = 0;
        }
    }
}).on('end', () => {
    console.log('the number of matches is: ', count);
});

First, we create a read stream as we did before. Next, we create a Buffer form of the keyword that we are looking for (working on the raw bytes can be faster than converting the stream into text, even though the API allows us to do that). Next, we maintain a found count and an actual count. The found count tells us how far through the keyword we have matched; the other count keeps track of how many instances of lorem we have found. We then process each byte as a chunk comes in on the data event. If the next byte is not the character we are looking for, we reset the found count to 0 (we did not find this particular string of text). After this check, we see whether found has reached the full byte length of the keyword. If it has, we increase the count and move found back to 0. We keep the found counter outside the data event because we receive the data in chunks. Since it is chunked, the first part of lorem could come at the end of one chunk and the rest of it could come at the beginning of the next chunk. Once the stream ends, we output the count.
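To see why keeping found outside the data handler matters, here is a minimal sketch that feeds two hand-built chunks through Readable.from so that the keyword is split across the chunk boundary; the chunk contents are made up purely for illustration:

import { Readable } from 'stream';

const buf = Buffer.from('lorem');
let found = 0;
let count = 0;

// The keyword is deliberately split across the two chunks: '...lo' + 'rem...'
const chunks = [Buffer.from('ipsum lo'), Buffer.from('rem dolor')];

Readable.from(chunks).on('data', (chunk) => {
    for(const byte of chunk) {
        if( byte === buf[found] ) {
            found += 1;
        } else {
            found = 0;
        }
        if( found === buf.byteLength ) {
            count += 1;
            found = 0;
        }
    }
}).on('end', () => {
    // Prints 1 because found survived the chunk boundary
    console.log('matches across the chunk boundary: ', count);
});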

Now, if we run both versions, we will find that the first actually catches more instances of lorem. That is because we added the case-insensitive flag to the regular expression. If we turn this off by removing the trailing i, and we remove the requirement for the character sequence to stand on its own (the \s on either side of it), we will see that we get the same result. This example showcases how writing streams can be a bit more complicated than the batch processing version, but it usually leads to lower memory use and sometimes faster code.
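A quick sketch of that check, with the flag and the \s boundaries removed so the regular expression counts the same raw character sequence that the stream version does, might look like this:

import { readFileSync } from 'fs';

const file = readFileSync('./input.txt', {encoding : 'utf8'});
// Case-sensitive, no whitespace boundaries: count every raw occurrence of lorem
const matches = file.match(/lorem/g);

console.log('the number of matches is: ', matches ? matches.length : 0);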

While utilizing built-in streams such as the streams inside of the zlib and fs modules will get us quite far, we will see how we can be the producers of our own custom streams. We will take each one and write an extended stream type that will handle the data framing that we were doing in the previous chapter.

For those that have forgotten or skipped to this chapter, we were framing all of our messages over a socket with the !!!BEGIN!!! and !!!END!!! tags to let us know when the full data had been streamed to us.
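As a rough preview of the shape this will take, writing an extended stream type means subclassing one of the base classes from the stream module and implementing its underscore-prefixed hooks. The class name FrameDecoder and its pass-through body below are only a placeholder sketch, not the framing implementation we will build in this chapter:

import { Transform } from 'stream';

// Placeholder Transform subclass; the real !!!BEGIN!!!/!!!END!!! handling comes later
class FrameDecoder extends Transform {
    constructor(options) {
        super(options);
        // Any state that needs to survive between chunks would be initialized here
    }
    _transform(chunk, encoding, callback) {
        // For now, just pass the chunk through untouched
        callback(null, chunk);
    }
}

// Usage: pipe any Readable through the decoder
// createReadStream('./input.txt').pipe(new FrameDecoder()).pipe(process.stdout);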
