Streams
Splitting up asynchronous tasks
Last time we looked at the Promise abstraction for asynchronous tasks. A Promise is convenient for representing an asynchronous computation that will eventually terminate and produce a value. We used reading files as an example of such an asynchronous computation:
const fs = require('fs').promises

// Read file.txt as a UTF-8 string
fs.readFile('file.txt', 'utf8')
  .then(contents => {
    // Then print it out
    console.log(contents)
  })
However, this is a slight lie: computers actually read files in small chunks, which Node.js concatenates together into a single string. A small file can be read in a single chunk, but large files can consist of thousands or millions of chunks.
So why would we want to process the chunks of a file individually instead of reading it all at once? There are two main reasons:
- Often, we can process each chunk of the file independently. By handling the chunks as they become available, we can process the file sooner than if we waited to read the entire file first.
- Memory is a limited resource, and loading an entire file into memory may use up a large portion of a computer's memory. If the file is especially big, it may not even be possible to load it into memory. By processing one chunk at a time, we greatly reduce our memory footprint.
Readable streams
In Node.js, an asynchronous task that produces chunks of data is represented as a "readable stream". To process a readable stream, we need to specify the callback function that will be used to process each chunk of data. We can also register a callback to run when the stream finishes emitting data. The .on() method for "event emitters" works similarly to the .addEventListener() method for DOM elements in the browser.
As we saw above, reading a file is a prime example of a stream. Indeed, the fs module provides a function fs.createReadStream() to make a readable stream for a given file. Here is an example of how we would rewrite the code above using a stream:
const fs = require('fs')

// Create a readable stream for file.txt
fs.createReadStream('file.txt', 'utf8')
  .on('data', chunk => {
    // Print out each chunk that is read
    console.log(chunk)
  })
  .on('end', () => {
    // Print a message when the entire file has been read
    console.log('Finished reading file')
  })
Often, we want to accumulate some data from the stream, and this may require remembering parts of the previous chunks. For example, we can count the number of occurrences of a word in Shakespeare's Hamlet:
// The word to search for (lowercase)
const WORD = 'king'
// The number of times the word occurs
let count = 0
// A word may be split across chunks,
// so store the partial word at the end of the previous chunk
let partialWord = ''

fs.createReadStream('hamlet.txt', 'utf8')
  .on('data', chunk => {
    // Split each chunk into words
    const words = chunk.split(' ')
    // Add the partial word at the end of the previous chunk
    words[0] = partialWord + words[0]
    // Store the partial word at the end of this chunk
    partialWord = words.pop()
    // Count the number of words that match our search word
    for (const word of words) {
      if (word.toLowerCase() === WORD) count++
    }
  })
  .on('end', () => {
    // Process the word at the end of the last chunk
    if (partialWord.toLowerCase() === WORD) count++
    // "The word king occurs 36 times"
    console.log(`The word ${WORD} occurs ${count} times`)
  })
Writable streams
The counterparts to readable streams are "writable" streams. Instead of attaching callbacks to the stream, we call the write() method to add data to the stream and end() to indicate that we are done writing to the stream. The data put in the stream will be written to the destination asynchronously. The fs module provides a createWriteStream() function to make a writable stream that writes into a file. Here is an example that writes the lines abc and def to file.txt:
const stream = fs.createWriteStream('file.txt')
stream.write('abc\n') // write the chunk of data 'abc\n'
stream.write('def\n') // followed by the chunk 'def\n'
stream.end() // close the stream
In the unusual case that you need to know when the data has been written to the destination, you can pass a callback to write() or end(). For example:
const stream = fs.createWriteStream('large-file.txt')
stream.write('text\n', () => {
  console.log('Wrote first chunk')
})
for (let i = 0; i < 10000000; i++) {
  stream.write('text\n')
}
stream.end(() => {
  console.log('Wrote last chunk')
})
Composing streams using pipe() and Transform streams
Like Promises, streams are useful because they can be put together.
The simplest way to combine streams is to connect a read stream to a write stream. This operation, called "piping" one stream into another, causes any data read from the source to be written to the destination. Readable streams in Node.js have a builtin method pipe() that does exactly this. For example, this makes it easy to copy one file to another without storing the whole file in memory:
// Copy source.txt to destination.txt
fs.createReadStream('source.txt')
  .pipe(fs.createWriteStream('destination.txt'))
So we can copy data directly from an input stream to an output stream, but what if we want to process the data somehow first? For example, we might want to alter the data, filter the data, or compute something from the data. For these sorts of operations, Node.js provides a third type of stream: a "transform" stream. Transform streams are both writable and readable streams. Each chunk written to the stream is transformed into zero or more chunks that can be read from the stream.
Node.js transform streams are subclasses of the class Transform, which can be imported from the stream module. A specific Transform stream is implemented by defining its _transform() method. We can use the Transform stream by pipe()ing data into and out of it. For example, here is a Transform that duplicates each character in a stream:
const {Transform} = require('stream')

class DoublingStream extends Transform {
  constructor() {
    // Process chunks as strings (instead of byte arrays)
    super({decodeStrings: false})
  }

  // Specifies how to transform each chunk.
  // `callback` must be called when the chunk has been processed.
  _transform(chunk, encoding, callback) {
    // For each character in the chunk, emit it twice to the output
    for (const character of chunk) {
      this.push(character, encoding)
      this.push(character, encoding)
    }
    callback() // indicate we're done processing the chunk
  }
}
// Read file.txt into string chunks
fs.createReadStream('file.txt', 'utf8')
  // Transform each chunk with a DoublingStream.
  // Note that pipe() returns the destination stream.
  .pipe(new DoublingStream())
  // Pipe the output of the DoublingStream to doubled.txt
  .pipe(fs.createWriteStream('doubled.txt'))
If additional data needs to be written out after reading all chunks, you can implement the _flush() method. For example, we can replace all occurrences of a string with another string. (The logic is complicated by the fact that the string to replace can be split between two chunks.)
class ReplaceStream extends Transform {
  // `oldString` is the text to replace; `newString` is the replacement
  constructor(oldString, newString) {
    super({decodeStrings: false}) // take in string chunks
    this.setEncoding('utf8') // output string chunks
    this.oldString = oldString
    this.newString = newString
    // We store the end of the previous chunk
    // in case `oldString` straddles a chunk boundary
    this.remainingChunk = ''
  }

  _transform(chunk, encoding, callback) {
    // Find the strings between occurrences of `oldString`
    const betweenStrings = (this.remainingChunk + chunk).split(this.oldString)
    const lastString = betweenStrings.pop()
    // Write out each `betweenString`,
    // then replace the occurrence of `oldString` with `newString`
    for (const betweenString of betweenStrings) {
      this.push(betweenString, encoding)
      this.push(this.newString, encoding)
    }
    // Keep the last few characters in case `oldString` is split across chunks
    this.push(lastString.slice(0, -this.oldString.length))
    this.remainingChunk = lastString.slice(-this.oldString.length)
    callback()
  }

  // Called after all chunks have been transformed
  _flush(callback) {
    // Write the end of the last chunk
    this.push(this.remainingChunk)
    // As in `_transform()`, `callback` must be called when done
    callback()
  }
}
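As a rough usage sketch (the file names and the search strings here are only placeholders, not part of the original example), a ReplaceStream can be piped between a read stream and a write stream just like the DoublingStream above:
// Copy hamlet.txt to edited.txt, replacing every 'king' with 'queen'
// (file names and strings are illustrative placeholders)
fs.createReadStream('hamlet.txt', 'utf8')
  .pipe(new ReplaceStream('king', 'queen'))
  .pipe(fs.createWriteStream('edited.txt'))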
Everything is a stream
So far, we've looked at file read and write streams as examples of streams in Node.js. Node.js actually has stream interfaces for most of its builtin modules. Here are the most commonly used types of streams that Node.js provides:
Readable streams
- File read streams, created with fs.createReadStream()
- The standard input from the terminal, process.stdin
- HTTP responses (on the client side). We will discuss HTTP in depth in the next set of notes, but it is the primary protocol for making requests on the Web. The example below uses a readable stream obtained from https.get().
Writable streams
- File write streams, created with fs.createWriteStream()
- The standard output and error streams, process.stdout and process.stderr, which are displayed in the terminal (see the short sketch after this list)
- HTTP responses (on the server side)
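For instance, a minimal sketch (not one of the notes' examples) connecting these two kinds of streams: piping process.stdin into process.stdout echoes whatever is typed in the terminal back out.
// Echo standard input back to standard output:
// process.stdin is a readable stream and process.stdout is a writable stream,
// so we can connect them directly with pipe()
process.stdin.pipe(process.stdout)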
Transform streams
- Compression and decompression streams, e.g. zlib.createGzip() and zlib.createGunzip(). These are used to convert to and from compressed files (like .zip or .gz files). The example below uses a Gunzip stream to decompress a file.
- Hash streams, e.g. crypto.createHash('sha256'). These turn a long stream into a short pseudorandom byte array, which can be used to verify the contents of the stream. (A short sketch follows this list.)
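For example, here is a rough sketch of a hash stream in action (the file name is just a placeholder): piping a file through the stream produces its SHA-256 digest as a single output chunk.
const crypto = require('crypto')

// Compute the SHA-256 digest of file.txt by piping it through a hash stream.
// The hash stream emits one chunk when the input ends: the 32-byte digest.
fs.createReadStream('file.txt')
  .pipe(crypto.createHash('sha256'))
  .on('data', digest => console.log(digest.toString('hex')))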
An example
By putting together Node.js's builtin streams, we can easily build some complicated programs. Here is a complete example that loads a compressed webpage over HTTPS, decompresses it with a Gunzip transform stream, and pipes it to the standard output.
const https = require('https')
const zlib = require('zlib')

const URL = 'https://raw.githubusercontent.com/nodejs/node/master/README.md'

// Request the compressed version of the content at URL.
// The decompressed version is about 10 times larger.
https.get(URL, {headers: {'accept-encoding': 'gzip'}}, res => {
  res.pipe(zlib.createGunzip()) // decompress the response
    .pipe(process.stdout) // and then print it to the terminal
})