
Streams

Splitting up asynchronous tasks

Last time we looked at the Promise abstraction for asynchronous tasks. A Promise is convenient for representing an asynchronous computation that will eventually terminate and compute a value. We used reading files last time as an example of such an asynchronous computation:

const fs = require('fs').promises

// Read file.txt as a UTF-8 string
fs.readFile('file.txt', 'utf8')
  .then(contents => {
    // Then print it out
    console.log(contents)
  })

However, this is a slight lie: computers actually read files in small chunks, which Node.js concatenates together into a single string. A small file can be read in a single chunk, but large files can consist of thousands or millions of chunks.

So why would we want to process the chunks of a file individually instead of reading it all at once? There are two main reasons:

  • Often, we can process each chunk of the file independently. By handling the chunks as they become available, we can process the file sooner than if we waited to read the entire file first.
  • Memory is a limited resource, and loading an entire file into memory may use up a large portion of a computer's memory. If the file is especially big, it may not even be possible to load it into memory. By processing one chunk at a time, we greatly reduce our memory footprint.

Readable streams

In Node.js, an asynchronous task that produces chunks of data is represented as a "readable stream". To process a readable stream, we specify a callback function that will be called on each chunk of data; we can also register a callback to run when the stream finishes emitting data. These callbacks are registered with the stream's .on() method, which works for Node.js "event emitters" much like .addEventListener() works for DOM elements in the browser.

As we saw above, reading a file is a prime example of a stream. Indeed, the fs module provides a function fs.createReadStream() to make a readable stream for a given file. Here is an example of how we would rewrite the code above using a stream:

const fs = require('fs')

// Create a readable stream for file.txt
fs.createReadStream('file.txt', 'utf8')
  .on('data', chunk => {
    // Print out each chunk that is read
    console.log(chunk)
  })
  .on('end', () => {
    // Print a message when the entire file has been read
    console.log('Finished reading file')
  })
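
This also shows what fs.readFile() was doing for us behind the scenes: concatenating every chunk recovers the same string it would have returned. A minimal sketch, assuming the same file.txt:

// Rebuild the whole file by concatenating chunks as they arrive
let contents = ''
fs.createReadStream('file.txt', 'utf8')
  .on('data', chunk => {
    contents += chunk
  })
  .on('end', () => {
    // Now `contents` holds the entire file, like fs.readFile() would return
    console.log(contents)
  })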

Often, we want to accumulate some data from the stream, and this may require remembering parts of the previous chunks. For example, we can count the number of occurrences of a word in Shakespeare's Hamlet:

// The word to search for (in lowercase)
const WORD = 'king'
// The number of times the word occurs
let count = 0
// A word may be split across chunks,
// so store the partial word at the end of the previous chunk
let partialWord = ''
fs.createReadStream('hamlet.txt', 'utf8')
  .on('data', chunk => {
    // Split each chunk into words
    const words = chunk.split(' ')
    // Add the partial word at the end of the previous chunk
    words[0] = partialWord + words[0]
    // Store the partial word at the end of this chunk
    partialWord = words.pop()
    // Count the number of words that match our search word
    for (const word of words) {
      if (word.toLowerCase() === WORD) count++
    }
  })
  .on('end', () => {
    // Process the word at the end of the last chunk
    if (partialWord.toLowerCase() === WORD) count++
    // "The word king occurs 36 times"
    console.log(`The word ${WORD} occurs ${count} times`)
  })

Writable streams

The counterparts to readable streams are "writable" streams. Instead of attaching callbacks to the stream, we call the write() method to add data to the stream and end() to indicate that we are done writing to the stream. The data put in the stream will be written to the destination asynchronously.

The fs module provides a createWriteStream() function to make a writable stream that writes into a file. Here is an example that writes the lines abc and def to file.txt:

const stream = fs.createWriteStream('file.txt')
stream.write('abc\n') // write the chunk of data 'abc\n'
stream.write('def\n') // followed by the chunk 'def\n'
stream.end()          // close the stream

In the unusual case that you need to know when the data has been written to the destination, you can pass a callback to write() or end(). For example:

const stream = fs.createWriteStream('large-file.txt')
stream.write('text\n', () => {
  console.log('Wrote first chunk')
})
for (let i = 0; i < 10000000; i++) {
  stream.write('text\n')
}
stream.end(() => {
  console.log('Wrote last chunk')
})

Composing streams using pipe() and Transform streams

Like Promises, streams are useful because they can be put together. The simplest way to combine streams is to connect a read stream to a write stream. This operation, called "piping" one stream into another, causes any data read from the source to be written to the destination. Readable streams in Node.js have a builtin method pipe() that does exactly this. For example, this makes it easy to copy one file to another without storing the whole file in memory:

// Copy source.txt to destination.txt
fs.createReadStream('source.txt')
  .pipe(fs.createWriteStream('destination.txt'))

So we can copy data directly from an input stream to an output stream, but what if we want to process the data somehow first? For example, we might want to alter the data, filter the data, or compute something from the data. For these sorts of operations, Node.js provides a third type of stream: a "transform" stream. Transform streams are both writable and readable streams. Each chunk written to the stream is transformed into zero or more chunks that can be read from the stream.

Node.js transform streams are subclasses of the class Transform, which can be imported from the stream module. A specific Transform stream is implemented by defining its _transform() method. We can use the Transform stream by pipe()ing data into and out of it. For example, a Transform that duplicates each character in a stream:

const {Transform} = require('stream')

class DoublingStream extends Transform {
  constructor() {
    // Process chunks as strings (instead of byte arrays)
    super({decodeStrings: false})
  }

  // Specifies how to transform each chunk.
  // `callback` must be called when the chunk has been processed.
  _transform(chunk, encoding, callback) {
    // For each character in the chunk, emit it twice to the output
    for (const character of chunk) {
      this.push(character, encoding)
      this.push(character, encoding)
    }
    callback() // indicate we're done processing chunk
  }
}

// Read file.txt into string chunks
fs.createReadStream('file.txt', 'utf8')
  // Transform each chunk with a DoublingStream.
  // Note that pipe() returns the destination stream.
  .pipe(new DoublingStream())
  // Pipe the output of the DoublingStream to doubled.txt
  .pipe(fs.createWriteStream('doubled.txt'))

If additional data needs to be written out after reading all chunks, you can implement the _flush() method. For example, we can replace all occurrences of a string with another string. (The logic is complicated by the fact that the string to replace can be split between two chunks.)

class ReplaceStream extends Transform {
  // `oldString` is the text to replace; `newString` is the replacement
  constructor(oldString, newString) {
    super({decodeStrings: false}) // take in string chunks
    this.setEncoding('utf8') // output string chunks
    this.oldString = oldString
    this.newString = newString
    // We store the end of the previous chunk
    // in case `oldString` straddles a chunk boundary
    this.remainingChunk = ''
  }

  _transform(chunk, encoding, callback) {
    // Find the strings between occurrences of `oldString`
    const betweenStrings = (this.remainingChunk + chunk).split(this.oldString)
    const lastString = betweenStrings.pop()
    // Write out each `betweenString`,
    // then replace the occurrence of `oldString` with `newString`
    for (const betweenString of betweenStrings) {
      this.push(betweenString, encoding)
      this.push(this.newString, encoding)
    }
    // Keep the last few characters in case `oldString` is split across chunks
    this.push(lastString.slice(0, -this.oldString.length))
    this.remainingChunk = lastString.slice(-this.oldString.length)
    callback()
  }
  // Called after all chunks have been transformed
  _flush(callback) {
    // Write the end of the last chunk
    this.push(this.remainingChunk)
    // As in `_transform()`, `callback` must be called when done
    callback()
  }
}
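
As a usage sketch, assuming a hamlet.txt as in the earlier word-count example (the replacement words here are just placeholders), a ReplaceStream is piped into and out of exactly like the DoublingStream:

// Copy hamlet.txt to hamlet-replaced.txt,
// swapping every occurrence of 'King' for 'Queen' along the way
fs.createReadStream('hamlet.txt', 'utf8')
  .pipe(new ReplaceStream('King', 'Queen'))
  .pipe(fs.createWriteStream('hamlet-replaced.txt'))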

Everything is a stream

So far, we've looked at file read and write streams as examples of streams in Node.js. Node.js actually has stream interfaces for most of its builtin modules. Here are the most commonly used types of streams that Node.js provides:

Readable streams

  • File read streams, created with fs.createReadStream()
  • The standard input from the terminal, process.stdin (a small sketch of reading from it follows this list)
  • HTTP responses (on the client side). We will discuss HTTP in depth in the next set of notes, but it is the primary protocol for making requests on the Web. The example below uses a readable stream obtained from https.get().
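
For instance, process.stdin can be consumed with the same .on() handlers we used for file read streams. A minimal sketch that counts the bytes typed into the terminal (run it from a terminal and press Ctrl-D to end the input):

let bytes = 0
process.stdin
  .on('data', chunk => {
    // Without an encoding, each chunk is a Buffer, so .length is a byte count
    bytes += chunk.length
  })
  .on('end', () => {
    console.log(`Read ${bytes} bytes from standard input`)
  })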

Writable streams

  • File write streams, created with fs.createWriteStream()
  • The standard output to the terminal, process.stdout, which the final example below pipes into

Transform streams

  • Compression and decompression streams, e.g. zlib.createGzip() and zlib.createGunzip(). These convert to and from gzip-compressed data (like .gz files). The example below uses a Gunzip stream to decompress an HTTP response.
  • Hash streams, e.g. crypto.createHash('sha256'). These turn a long stream into a short pseudorandom byte array, which can be used to verify the contents of the stream.
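
For instance, here is a minimal sketch of the hash case, assuming a file named file.txt: piping it through a SHA-256 hash stream produces a single output chunk, the digest, once the input ends.

const crypto = require('crypto')

// Pipe file.txt through a SHA-256 hash stream.
// setEncoding('hex') makes the digest a hex string instead of raw bytes.
fs.createReadStream('file.txt')
  .pipe(crypto.createHash('sha256'))
  .setEncoding('hex')
  .pipe(process.stdout)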

An example

By putting together Node.js's builtin streams, we can easily build some complicated programs. Here is a complete example that loads a compressed webpage over HTTPS, decompresses it with a Gunzip transform stream, and pipes it to the standard output.

const https = require('https')
const zlib = require('zlib')

const URL = 'https://raw.githubusercontent.com/nodejs/node/master/README.md'

// Request the compressed version of the content at URL.
// The decompressed version is about 10 times larger.
https.get(URL, {headers: {'accept-encoding': 'gzip'}}, res => {
  res.pipe(zlib.createGunzip()) // decompress the response
    .pipe(process.stdout) // and then print it to the terminal
})