Commit b4b37ac0 authored by Caleb C. Sander

Add streams notes

parent 2d2a557c
Showing 397 additions and 1 deletion
```diff
@@ -21,7 +21,7 @@ I will point out sections of it that may be useful on each assignment.
 | ----- |---------------- | ------- | -------- |
 | 1-2 | [Callbacks](notes/callbacks/callbacks.md) | [Minesweeper](specs/minesweeper/minesweeper.md) | 2020-04-17 |
 | 3 | [Promises](notes/promises/promises.md) | [`make`](specs/make/make.md) | 2020-04-24 |
-| 4 | Streams | `grep` | 2020-05-01 |
+| 4 | [Streams](notes/streams/streams.md) | [`grep`](specs/grep/grep.md) | 2020-05-01 |
 | 5 | HTTP | Wiki Game | 2020-05-08 |
 | 6 | WebSockets | Chat server | 2020-05-15 |
 | 7 | | Chat client | 2020-05-22 |
```
# Streams
## Splitting up asynchronous tasks
Last time we looked at the `Promise` abstraction for asynchronous tasks.
A `Promise` is convenient for representing an asynchronous computation that will eventually terminate and compute a value.
We used reading files last time as an example of such an asynchronous computation:
```js
const fs = require('fs').promises
// Read file.txt as a UTF-8 string
fs.readFile('file.txt', 'utf8')
.then(contents => {
// Then print it out
console.log(contents)
})
```
However, this is a slight lie: computers actually read files in small chunks, which Node.js concatenates together into a single string.
A small file can be read in a single chunk, but large files can consist of thousands or millions of chunks.
So why would we want to process the chunks of a file individually instead of reading it all at once?
There are two main reasons:
- Often, we can process each chunk of the file independently.
By handling the chunks as they become available, we can process the file sooner than if we waited to read the entire file first.
- Memory is a limited resource, and loading an entire file into memory may use up a large portion of a computer's memory.
If the file is especially big, it may not even be possible to load it into memory.
By processing one chunk at a time, we greatly reduce our memory footprint.
## Readable streams
In Node.js, an asynchronous task that produces chunks of data is represented as a "readable stream".
To process a readable stream, we need to specify the callback function that will be used to process each chunk of data.
We can also register a callback to run when the stream finishes emitting data.
As we saw above, reading a file is a prime example of a stream.
Indeed, the `fs` module provides a function [`fs.createReadStream()`](https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options) to make a readable stream for a given file.
Here is an [example](read-stream.js) of how we would rewrite the code above using a stream:
```js
const fs = require('fs')
// Create a readable stream for file.txt
fs.createReadStream('file.txt', 'utf8')
.on('data', chunk => {
// Print out each chunk that is read
console.log(chunk)
})
.on('end', () => {
// Print a message when the entire file has been read
console.log('Finished reading file')
})
```
Often, we want to accumulate some data from the stream, and this may require remembering parts of the previous chunks.
For [example](count-word.js), we can count the number of occurrences of a word in Shakespeare's Hamlet:
```js
const fs = require('fs')
// The word to search for.
// We only consider " " to be a word separator.
const WORD = 'king'
// The number of times the word occurs
let count = 0
// A word may be split across chunks,
// so store the partial word at the end of the previous chunk
let partialWord = ''
fs.createReadStream('hamlet.txt', 'utf8')
.on('data', chunk => {
// Split each chunk into words
const words = chunk.split(' ')
// Add the partial word at the end of the previous chunk
words[0] = partialWord + words[0]
// Store the partial word at the end of this chunk
partialWord = words.pop()
// Count the number of words that match our search word
for (const word of words) {
if (word.toLowerCase() === WORD) count++
}
})
.on('end', () => {
// Process the word at the end of the last chunk
if (partialWord.toLowerCase() === WORD) count++
// "The word king occurs 36 times"
console.log(`The word ${WORD} occurs ${count} times`)
})
```
## Writable streams
The counterparts to readable streams are "writable" streams.
Instead of attaching callbacks to the stream, we call the `write()` method to add data to the stream and `end()` to indicate that we are done writing to the stream.
The data put in the stream will be written to the destination asynchronously.
The `fs` module provides a [`createWriteStream()`](https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options) function to make a writable stream that writes into a file.
Here is an [example](write-stream.js) that writes the lines `abc` and `def` to `file.txt`:
```js
const fs = require('fs')
const stream = fs.createWriteStream('file.txt')
stream.write('abc\n') // write the chunk of data 'abc\n'
stream.write('def\n') // followed by the chunk 'def\n'
stream.end() // close the stream
```
In the unusual case that you need to know when the data has been written to the destination, you can pass a callback to `write()` or `end()`.
For [example](write-stream-callback.js):
```js
const fs = require('fs')
const stream = fs.createWriteStream('large-file.txt')
stream.write('text\n', () => {
console.log('Wrote first chunk')
})
for (let i = 0; i < 10000000; i++) {
stream.write('text\n')
}
stream.end(() => {
console.log('Wrote last chunk')
})
```
## Composing streams using `pipe()` and `Transform` streams
Like `Promise`s, streams are useful because they can be composed.
The simplest way to combine streams is to connect a readable stream to a writable stream.
This operation, called "piping" one stream into another, causes any data read from the source to be written to the destination.
Readable streams in Node.js have a builtin method `pipe()` that does exactly this.
For [example](copy-streams.js), this makes it easy to copy one file to another without storing the whole file in memory:
```js
const fs = require('fs')
// Copy source.txt to destination.txt
fs.createReadStream('source.txt')
.pipe(fs.createWriteStream('destination.txt'))
```
So we can copy data directly from an input stream to an output stream, but what if we want to process the data somehow first?
For example, we might want to alter the data, filter the data, or compute something from the data.
For these sorts of operations, Node.js provides a third type of stream: a "transform" stream.
Transform streams are *both* writable and readable streams.
Each chunk written to the stream is transformed into zero or more chunks that can be read from the stream.
Node.js transform streams are subclasses of the class [`Transform`](https://nodejs.org/api/stream.html#stream_implementing_a_transform_stream), which can be imported from the `stream` module.
A specific `Transform` stream is implemented by defining its `_transform()` method.
We can use the `Transform` stream by `pipe()`ing data into and out of it.
For [example](double-characters.js), a `Transform` that duplicates each character in a stream:
```js
const fs = require('fs')
const {Transform} = require('stream')
class DoublingStream extends Transform {
constructor() {
// Process chunks as strings (instead of byte arrays)
super({decodeStrings: false})
}
// Specifies how to transform each chunk.
// `callback` must be called when the chunk has been processed.
_transform(chunk, encoding, callback) {
// For each character in the chunk, emit it twice to the output
for (const character of chunk) {
this.push(character, encoding)
this.push(character, encoding)
}
callback() // indicate we're done processing chunk
}
}
// Read file.txt into string chunks
fs.createReadStream('file.txt', 'utf8')
// Transform each chunk with a DoublingStream.
// Note that pipe() returns the destination stream.
.pipe(new DoublingStream())
// Pipe the output of the DoublingStream to doubled.txt
.pipe(fs.createWriteStream('doubled.txt'))
```
If additional data needs to be written out after reading all chunks, you can implement the `_flush()` method.
For [example](replace-words.js), we can replace all occurrences of a string with another string.
(The logic is complicated by the fact that the string to replace can be split between two chunks.)
```js
const fs = require('fs')
const {Transform} = require('stream')
class ReplaceStream extends Transform {
// `oldString` is the text to replace; `newString` is the replacement
constructor(oldString, newString) {
super({decodeStrings: false}) // take in string chunks
this.setEncoding('utf8') // output string chunks
this.oldString = oldString
this.newString = newString
// We store the end of the previous chunk
// in case `oldString` straddles a chunk boundary
this.remainingChunk = ''
}
_transform(chunk, encoding, callback) {
// Find the strings between occurrences of `oldString`
const betweenStrings = (this.remainingChunk + chunk).split(this.oldString)
const lastString = betweenStrings.pop()
// Write out each `betweenString`,
// then replace the occurrence of `oldString` with `newString`
for (const betweenString of betweenStrings) {
this.push(betweenString, encoding)
this.push(this.newString, encoding)
}
// Keep the last few characters in case `oldString` is split across chunks
this.push(lastString.slice(0, -this.oldString.length))
this.remainingChunk = lastString.slice(-this.oldString.length)
callback()
}
// Called after all chunks have been transformed
_flush(callback) {
// Write the end of the last chunk
this.push(this.remainingChunk)
// As in `_transform()`, `callback` must be called when done
callback()
}
}
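// Chain two ReplaceStreams together and write the result to a file
fs.createReadStream('source.txt', 'utf8')
  .pipe(new ReplaceStream('here', 'there')) // replace 'here' with 'there'
  .pipe(new ReplaceStream('yes', 'no')) // and 'yes' with 'no'
  .pipe(fs.createWriteStream('destination.txt'))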
```
## Everything is a stream
So far, we've looked at file read and write streams as examples of streams in Node.js.
Node.js actually has stream interfaces for most of its builtin modules.
Here are the most commonly used types of streams that Node.js provides:
### Readable streams
- File read streams, created with [`fs.createReadStream()`](https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options)
- The standard input from the terminal, [`process.stdin`](https://nodejs.org/api/process.html#process_process_stdin)
- HTTP responses (on the client side).
We will discuss HTTP in depth in the next set of notes, but it is the primary protocol for making requests on the Web.
The example below uses a readable stream obtained from [`https.get()`](https://nodejs.org/api/https.html#https_https_get_url_options_callback).
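For instance, here is a small sketch (not one of the linked example files) that counts the bytes typed into the terminal by listening to `process.stdin` with the same `'data'` and `'end'` events we used for file read streams:
```js
// Count how many bytes arrive on standard input.
// process.stdin emits 'data' chunks just like a file read stream.
let bytes = 0
process.stdin
  .on('data', chunk => {
    bytes += chunk.length
  })
  .on('end', () => {
    // 'end' fires once the input is closed (e.g. Ctrl-D in the terminal)
    console.log(`Read ${bytes} bytes`)
  })
```
Piping a file into the script (for example, `node count-bytes.js < hamlet.txt`, where `count-bytes.js` is a hypothetical file containing this sketch) prints the size of the file.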
### Writable streams
- File write streams, created with [`fs.createWriteStream()`](https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options)
- The standard output and error streams, [`process.stdout`](https://nodejs.org/api/process.html#process_process_stdout) and [`process.stderr`](https://nodejs.org/api/process.html#process_process_stderr), that are displayed in the terminal
- HTTP responses (on the server side)
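As a quick sketch (assuming a file named `file.txt`, as in the earlier examples), `process.stdout` can be used anywhere a file write stream could be, for instance as the destination of a `pipe()`:
```js
const fs = require('fs')
// Print file.txt to the terminal by piping it into standard output,
// exactly as we piped into a file write stream earlier
fs.createReadStream('file.txt')
  .pipe(process.stdout)
```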
### Transform streams
- Compression and decompression streams, e.g. [`zlib.createGzip()`](https://nodejs.org/api/zlib.html#zlib_zlib_creategzip_options) and [`zlib.createGunzip()`](https://nodejs.org/api/zlib.html#zlib_zlib_creategunzip_options).
These are used to convert to and from compressed data (like the contents of `.gz` files).
The example below uses a `Gunzip` stream to decompress a file.
- Hash streams, e.g. [`crypto.createHash('sha256')`](https://nodejs.org/api/crypto.html#crypto_crypto_createhash_algorithm_options).
These reduce a stream of any length to a short, fixed-size digest, which can be used to verify the contents of the stream.
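For instance, here is a sketch (again assuming a `file.txt`) that computes the SHA-256 digest of a file by piping it through a hash stream; the digest appears on the readable side once all the input has been written:
```js
const crypto = require('crypto')
const fs = require('fs')
// Pipe the file into the hash stream; when the writable side ends,
// the digest is emitted on the readable side
fs.createReadStream('file.txt')
  .pipe(crypto.createHash('sha256'))
  .setEncoding('hex') // read the digest as a hexadecimal string
  .on('data', digest => console.log(`SHA-256: ${digest}`))
```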
### An example
By putting together Node.js's builtin streams, we can easily build some complicated programs.
Here is a complete [example](http-gunzip-pipe.js) that loads a compressed webpage over HTTPS, decompresses it with a `Gunzip` transform stream, and pipes it to the standard output.
```js
const https = require('https')
const zlib = require('zlib')
const URL = 'https://raw.githubusercontent.com/nodejs/node/master/README.md'
// Request the compressed version of the content at URL.
// The decompressed version is about 10 times larger.
https.get(URL, {headers: {'accept-encoding': 'gzip'}}, res => {
res.pipe(zlib.createGunzip()) // decompress the response
.pipe(process.stdout) // and then print it to the terminal
})
```