From b4b37ac00b4e1208c542c05385184846526dde9a Mon Sep 17 00:00:00 2001
From: Caleb Sander <caleb.sander@gmail.com>
Date: Wed, 25 Mar 2020 00:35:17 -0400
Subject: [PATCH] Add streams notes

---
 README.md                              |   2 +-
 notes/streams/copy-streams.js          |   5 +
 notes/streams/count-word.js            |  29 +++
 notes/streams/double-characters.js     |  28 +++
 notes/streams/http-gunzip-pipe.js      |  11 ++
 notes/streams/read-stream.js           |  12 ++
 notes/streams/replace-words.js         |  43 +++++
 notes/streams/streams.md               | 250 +++++++++++++++++++++++++
 notes/streams/write-stream-callback.js |  12 ++
 notes/streams/write-stream.js          |   6 +
 10 files changed, 397 insertions(+), 1 deletion(-)
 create mode 100644 notes/streams/copy-streams.js
 create mode 100644 notes/streams/count-word.js
 create mode 100644 notes/streams/double-characters.js
 create mode 100644 notes/streams/http-gunzip-pipe.js
 create mode 100644 notes/streams/read-stream.js
 create mode 100644 notes/streams/replace-words.js
 create mode 100644 notes/streams/streams.md
 create mode 100644 notes/streams/write-stream-callback.js
 create mode 100644 notes/streams/write-stream.js

diff --git a/README.md b/README.md
index c3e9ec4..025fa97 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ I will point out sections of it that may be useful on each assignment.
 | ----- |---------------- | ------- | -------- |
 | 1-2 | [Callbacks](notes/callbacks/callbacks.md) | [Minesweeper](specs/minesweeper/minesweeper.md) | 2020-04-17 |
 | 3 | [Promises](notes/promises/promises.md) | [`make`](specs/make/make.md) | 2020-04-24 |
-| 4 | Streams | `grep` | 2020-05-01 |
+| 4 | [Streams](notes/streams/streams.md) | [`grep`](specs/grep/grep.md) | 2020-05-01 |
 | 5 | HTTP | Wiki Game | 2020-05-08 |
 | 6 | WebSockets | Chat server | 2020-05-15 |
 | 7 | | Chat client | 2020-05-22 |
diff --git a/notes/streams/copy-streams.js b/notes/streams/copy-streams.js
new file mode 100644
index 0000000..0fe291d
--- /dev/null
+++ b/notes/streams/copy-streams.js
@@ -0,0 +1,5 @@
+const fs = require('fs')
+
+// Copy source.txt to destination.txt
+fs.createReadStream('source.txt')
+  .pipe(fs.createWriteStream('destination.txt'))
diff --git a/notes/streams/count-word.js b/notes/streams/count-word.js
new file mode 100644
index 0000000..efe0dbd
--- /dev/null
+++ b/notes/streams/count-word.js
@@ -0,0 +1,29 @@
+const fs = require('fs')
+
+// The word to search for.
+// We only consider " " to be a word separator.
+const WORD = 'king'
+// The number of times the word occurs
+let count = 0
+// A word may be split across chunks,
+// so store the partial word at the end of the previous chunk
+let partialWord = ''
+fs.createReadStream('hamlet.txt', 'utf8')
+  .on('data', chunk => {
+    // Split each chunk into words
+    const words = chunk.split(' ')
+    // Add the partial word at the end of the previous chunk
+    words[0] = partialWord + words[0]
+    // Store the partial word at the end of this chunk
+    partialWord = words.pop()
+    // Count the number of words that match our search word
+    for (const word of words) {
+      if (word.toLowerCase() === WORD) count++
+    }
+  })
+  .on('end', () => {
+    // Process the word at the end of the last chunk
+    if (partialWord.toLowerCase() === WORD) count++
+    // "The word king occurs 36 times"
+    console.log(`The word ${WORD} occurs ${count} times`)
+  })
diff --git a/notes/streams/double-characters.js b/notes/streams/double-characters.js
new file mode 100644
index 0000000..e43cb45
--- /dev/null
+++ b/notes/streams/double-characters.js
@@ -0,0 +1,28 @@
+const fs = require('fs')
+const {Transform} = require('stream')
+
+class DoublingStream extends Transform {
+  constructor() {
+    // Process chunks as strings (instead of byte arrays)
+    super({decodeStrings: false})
+  }
+
+  // Specifies how to transform each chunk.
+  // `callback` must be called when the chunk has been processed.
+  _transform(chunk, encoding, callback) {
+    // For each character in the chunk, emit it twice to the output
+    for (const character of chunk) {
+      this.push(character, encoding)
+      this.push(character, encoding)
+    }
+    callback() // indicate we're done processing chunk
+  }
+}
+
+// Read file.txt into string chunks
+fs.createReadStream('file.txt', 'utf8')
+  // Transform each chunk with a DoublingStream.
+  // Note that pipe() returns the destination stream.
+  .pipe(new DoublingStream())
+  // Pipe the output of the DoublingStream to doubled.txt
+  .pipe(fs.createWriteStream('doubled.txt'))
diff --git a/notes/streams/http-gunzip-pipe.js b/notes/streams/http-gunzip-pipe.js
new file mode 100644
index 0000000..fba0266
--- /dev/null
+++ b/notes/streams/http-gunzip-pipe.js
@@ -0,0 +1,11 @@
+const https = require('https')
+const zlib = require('zlib')
+
+const URL = 'https://raw.githubusercontent.com/nodejs/node/master/README.md'
+
+// Request the compressed version of the content at URL.
+// The decompressed version is about 10 times larger.
+https.get(URL, {headers: {'accept-encoding': 'gzip'}}, res => {
+  res.pipe(zlib.createGunzip()) // decompress the response
+    .pipe(process.stdout) // and then print it to the terminal
+})
diff --git a/notes/streams/read-stream.js b/notes/streams/read-stream.js
new file mode 100644
index 0000000..37b3df7
--- /dev/null
+++ b/notes/streams/read-stream.js
@@ -0,0 +1,12 @@
+const fs = require('fs')
+
+// Create a readable stream for file.txt
+fs.createReadStream('file.txt', 'utf8')
+  .on('data', chunk => {
+    // Print out each chunk that is read
+    console.log(chunk)
+  })
+  .on('end', () => {
+    // Print a message when the entire file has been read
+    console.log('Finished reading file')
+  })
diff --git a/notes/streams/replace-words.js b/notes/streams/replace-words.js
new file mode 100644
index 0000000..65a9dc9
--- /dev/null
+++ b/notes/streams/replace-words.js
@@ -0,0 +1,43 @@
+const fs = require('fs')
+const {Transform} = require('stream')
+
+class ReplaceStream extends Transform {
+  // `oldString` is the text to replace; `newString` is the replacement
+  constructor(oldString, newString) {
+    super({decodeStrings: false}) // take in string chunks
+    this.setEncoding('utf8') // output string chunks
+    this.oldString = oldString
+    this.newString = newString
+    // We store the end of the previous chunk
+    // in case `oldString` straddles a chunk boundary
+    this.remainingChunk = ''
+  }
+
+  _transform(chunk, encoding, callback) {
+    // Find the strings between occurrences of `oldString`
+    const betweenStrings = (this.remainingChunk + chunk).split(this.oldString)
+    const lastString = betweenStrings.pop()
+    // Write out each `betweenString`,
+    // then replace the occurrence of `oldString` with `newString`
+    for (const betweenString of betweenStrings) {
+      this.push(betweenString, encoding)
+      this.push(this.newString, encoding)
+    }
+    // Keep the last few characters in case `oldString` is split across chunks
+    this.push(lastString.slice(0, -this.oldString.length))
+    this.remainingChunk = lastString.slice(-this.oldString.length)
+    callback()
+  }
+  // Called after all chunks have been transformed
+  _flush(callback) {
+    // Write the end of the last chunk
+    this.push(this.remainingChunk)
+    // As in `_transform()`, `callback` must be called when done
+    callback()
+  }
+}
+
+fs.createReadStream('source.txt', 'utf8')
+  .pipe(new ReplaceStream('here', 'there')) // replace 'here' with 'there'
+  .pipe(new ReplaceStream('yes', 'no')) // and 'yes' with 'no'
+  .pipe(fs.createWriteStream('destination.txt'))
diff --git a/notes/streams/streams.md b/notes/streams/streams.md
new file mode 100644
index 0000000..56a1140
--- /dev/null
+++ b/notes/streams/streams.md
@@ -0,0 +1,250 @@
+# Streams
+
+## Splitting up asynchronous tasks
+
+Last time we looked at the `Promise` abstraction for asynchronous tasks.
+A `Promise` is convenient for representing an asynchronous computation that will eventually terminate and compute a value.
+We used reading files as an example of such an asynchronous computation:
+```js
+const fs = require('fs').promises
+
+// Read file.txt as a UTF-8 string
+fs.readFile('file.txt', 'utf8')
+  .then(contents => {
+    // Then print it out
+    console.log(contents)
+  })
+```
+However, this is a slight lie: computers actually read files in small chunks, which Node.js concatenates together into a single string.
+A small file can be read in a single chunk, but large files can consist of thousands or millions of chunks.
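+
+To see this chunking in action, we can count the chunks ourselves. This is a minimal sketch (the file name `big-file.txt` is a stand-in for any large file; `fs` read streams use 64 KiB chunks by default, so a 1 MiB file arrives in about 16 chunks):
+```js
+const fs = require('fs')
+
+// Count how many chunks big-file.txt is read in, and their total size
+let chunks = 0
+let bytes = 0
+fs.createReadStream('big-file.txt')
+  .on('data', chunk => {
+    chunks++
+    bytes += chunk.length
+  })
+  .on('end', () => {
+    console.log(`Read ${bytes} bytes in ${chunks} chunks`)
+  })
+```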
+
+So why would we want to process the chunks of a file individually instead of reading it all at once?
+There are two main reasons:
+- Often, we can process each chunk of the file independently.
+  By handling the chunks as they become available, we can process the file sooner than if we waited to read the entire file first.
+- Memory is a limited resource, and loading an entire file into memory may use up a large portion of a computer's memory.
+  If the file is especially big, it may not even be possible to load it into memory.
+  By processing one chunk at a time, we greatly reduce our memory footprint.
+
+## Readable streams
+
+In Node.js, an asynchronous task that produces chunks of data is represented as a "readable stream".
+To process a readable stream, we need to specify the callback function that will be used to process each chunk of data.
+We can also register a callback to run when the stream finishes emitting data.
+
+As we saw above, reading a file is a prime example of a stream.
+Indeed, the `fs` module provides a function [`fs.createReadStream()`](https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options) to make a readable stream for a given file.
+Here is an [example](read-stream.js) of how we would rewrite the code above using a stream:
+```js
+const fs = require('fs')
+
+// Create a readable stream for file.txt
+fs.createReadStream('file.txt', 'utf8')
+  .on('data', chunk => {
+    // Print out each chunk that is read
+    console.log(chunk)
+  })
+  .on('end', () => {
+    // Print a message when the entire file has been read
+    console.log('Finished reading file')
+  })
+```
+
+Often, we want to accumulate some data from the stream, and this may require remembering parts of the previous chunks.
+For [example](count-word.js), we can count the number of occurrences of a word in Shakespeare's Hamlet:
+```js
+const WORD = 'king' // the word to search for
+// The number of times the word occurs
+let count = 0
+// A word may be split across chunks,
+// so store the partial word at the end of the previous chunk
+let partialWord = ''
+fs.createReadStream('hamlet.txt', 'utf8')
+  .on('data', chunk => {
+    // Split each chunk into words
+    const words = chunk.split(' ')
+    // Add the partial word at the end of the previous chunk
+    words[0] = partialWord + words[0]
+    // Store the partial word at the end of this chunk
+    partialWord = words.pop()
+    // Count the number of words that match our search word
+    for (const word of words) {
+      if (word.toLowerCase() === WORD) count++
+    }
+  })
+  .on('end', () => {
+    // Process the word at the end of the last chunk
+    if (partialWord.toLowerCase() === WORD) count++
+    // "The word king occurs 36 times"
+    console.log(`The word ${WORD} occurs ${count} times`)
+  })
+```
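+
+Readable streams are also async iterables, so the same accumulation can be written with a `for await` loop instead of `'data'` and `'end'` callbacks. Here is a sketch of the word count above in that style (assuming a Node.js version, 10 or later, where streams support async iteration):
+```js
+const fs = require('fs')
+
+async function countWord(word) {
+  let count = 0
+  let partialWord = ''
+  // Each iteration of the loop receives one chunk of the file
+  for await (const chunk of fs.createReadStream('hamlet.txt', 'utf8')) {
+    const words = chunk.split(' ')
+    words[0] = partialWord + words[0]
+    partialWord = words.pop()
+    for (const w of words) {
+      if (w.toLowerCase() === word) count++
+    }
+  }
+  // Process the word at the end of the last chunk
+  if (partialWord.toLowerCase() === word) count++
+  console.log(`The word ${word} occurs ${count} times`)
+}
+
+countWord('king')
+```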
+
+## Writable streams
+
+The counterparts to readable streams are "writable" streams.
+Instead of attaching callbacks to the stream, we call the `write()` method to add data to the stream and `end()` to indicate that we are done writing to the stream.
+The data put in the stream will be written to the destination asynchronously.
+
+The `fs` module provides a [`createWriteStream()`](https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options) function to make a writable stream that writes into a file.
+Here is an [example](write-stream.js) that writes the lines `abc` and `def` to `file.txt`:
+```js
+const stream = fs.createWriteStream('file.txt')
+stream.write('abc\n') // write the chunk of data 'abc\n'
+stream.write('def\n') // followed by the chunk 'def\n'
+stream.end() // close the stream
+```
+
+In the unusual case that you need to know when the data has been written to the destination, you can pass a callback to `write()` or `end()`.
+For [example](write-stream-callback.js):
+```js
+const stream = fs.createWriteStream('large-file.txt')
+stream.write('text\n', () => {
+  console.log('Wrote first chunk')
+})
+for (let i = 0; i < 10000000; i++) {
+  stream.write('text\n')
+}
+stream.end(() => {
+  console.log('Wrote last chunk')
+})
+```
+
+## Composing streams using `pipe()` and `Transform` streams
+
+Like `Promise`s, streams are useful because they can be put together.
+The simplest way to combine streams is to connect a read stream to a write stream.
+This operation, called "piping" one stream into another, causes any data read from the source to be written to the destination.
+Readable streams in Node.js have a builtin method `pipe()` that does exactly this.
+For [example](copy-streams.js), this makes it easy to copy one file to another without storing the whole file in memory:
+```js
+// Copy source.txt to destination.txt
+fs.createReadStream('source.txt')
+  .pipe(fs.createWriteStream('destination.txt'))
+```
+
+So we can copy data directly from an input stream to an output stream, but what if we want to process the data somehow first?
+For example, we might want to alter the data, filter the data, or compute something from the data.
+For these sorts of operations, Node.js provides a third type of stream: a "transform" stream.
+Transform streams are *both* writable and readable streams.
+Each chunk written to the stream is transformed into zero or more chunks that can be read from the stream.
+
+Node.js transform streams are subclasses of the class [`Transform`](https://nodejs.org/api/stream.html#stream_implementing_a_transform_stream), which can be imported from the `stream` module.
+A specific `Transform` stream is implemented by defining its `_transform()` method.
+We can use the `Transform` stream by `pipe()`ing data into and out of it.
+For [example](double-characters.js), a `Transform` that duplicates each character in a stream:
+```js
+const {Transform} = require('stream')
+
+class DoublingStream extends Transform {
+  constructor() {
+    // Process chunks as strings (instead of byte arrays)
+    super({decodeStrings: false})
+  }
+
+  // Specifies how to transform each chunk.
+  // `callback` must be called when the chunk has been processed.
+  _transform(chunk, encoding, callback) {
+    // For each character in the chunk, emit it twice to the output
+    for (const character of chunk) {
+      this.push(character, encoding)
+      this.push(character, encoding)
+    }
+    callback() // indicate we're done processing chunk
+  }
+}
+
+// Read file.txt into string chunks
+fs.createReadStream('file.txt', 'utf8')
+  // Transform each chunk with a DoublingStream.
+  // Note that pipe() returns the destination stream.
+  .pipe(new DoublingStream())
+  // Pipe the output of the DoublingStream to doubled.txt
+  .pipe(fs.createWriteStream('doubled.txt'))
+```
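+
+A `Transform` does not have to emit output for every chunk it receives. As an illustration (the class name and the vowel-dropping behavior here are invented, not one of these notes' example files), here is a sketch of a stream that filters data out instead of duplicating it:
+```js
+const {Transform} = require('stream')
+
+class DropVowelsStream extends Transform {
+  constructor() {
+    super({decodeStrings: false}) // process chunks as strings
+  }
+
+  _transform(chunk, encoding, callback) {
+    const filtered = chunk.replace(/[aeiou]/gi, '')
+    // A chunk might consist entirely of vowels, in which case we emit nothing
+    if (filtered.length > 0) this.push(filtered, encoding)
+    callback()
+  }
+}
+
+// Usage: pipe string chunks through it, as with DoublingStream above, e.g.
+// fs.createReadStream('file.txt', 'utf8').pipe(new DropVowelsStream()).pipe(process.stdout)
+```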
+
+If additional data needs to be written out after reading all chunks, you can implement the `_flush()` method.
+For [example](replace-words.js), we can replace all occurrences of a string with another string.
+(The logic is complicated by the fact that the string to replace can be split between two chunks.)
+```js
+class ReplaceStream extends Transform {
+  // `oldString` is the text to replace; `newString` is the replacement
+  constructor(oldString, newString) {
+    super({decodeStrings: false}) // take in string chunks
+    this.setEncoding('utf8') // output string chunks
+    this.oldString = oldString
+    this.newString = newString
+    // We store the end of the previous chunk
+    // in case `oldString` straddles a chunk boundary
+    this.remainingChunk = ''
+  }
+
+  _transform(chunk, encoding, callback) {
+    // Find the strings between occurrences of `oldString`
+    const betweenStrings = (this.remainingChunk + chunk).split(this.oldString)
+    const lastString = betweenStrings.pop()
+    // Write out each `betweenString`,
+    // then replace the occurrence of `oldString` with `newString`
+    for (const betweenString of betweenStrings) {
+      this.push(betweenString, encoding)
+      this.push(this.newString, encoding)
+    }
+    // Keep the last few characters in case `oldString` is split across chunks
+    this.push(lastString.slice(0, -this.oldString.length))
+    this.remainingChunk = lastString.slice(-this.oldString.length)
+    callback()
+  }
+  // Called after all chunks have been transformed
+  _flush(callback) {
+    // Write the end of the last chunk
+    this.push(this.remainingChunk)
+    // As in `_transform()`, `callback` must be called when done
+    callback()
+  }
+}
+```
+
+## Everything is a stream
+
+So far, we've looked at file read and write streams as examples of streams in Node.js.
+Node.js actually has stream interfaces for most of its builtin modules.
+Here are the most commonly used types of streams that Node.js provides:
+
+### Readable streams
+
+- File read streams, created with [`fs.createReadStream()`](https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options)
+- The standard input from the terminal, [`process.stdin`](https://nodejs.org/api/process.html#process_process_stdin)
+- HTTP responses (on the client side).
+  We will discuss HTTP in depth in the next set of notes, but it is the primary protocol for making requests on the Web.
+  The example below uses a readable stream obtained from [`https.get()`](https://nodejs.org/api/https.html#https_https_get_url_options_callback).
+
+### Writable streams
+
+- File write streams, created with [`fs.createWriteStream()`](https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options)
+- The standard output and error streams, [`process.stdout`](https://nodejs.org/api/process.html#process_process_stdout) and [`process.stderr`](https://nodejs.org/api/process.html#process_process_stderr), that are displayed in the terminal
+- HTTP responses (on the server side)
+
+### Transform streams
+
+- Compression and decompression streams, e.g. [`zlib.createGzip()`](https://nodejs.org/api/zlib.html#zlib_zlib_creategzip_options) and [`zlib.createGunzip()`](https://nodejs.org/api/zlib.html#zlib_zlib_creategunzip_options).
+  These are used to convert to and from compressed files (like `.zip` or `.gz` files).
+  The example below uses a `Gunzip` stream to decompress a file.
+- Hash streams, e.g. [`crypto.createHash('sha256')`](https://nodejs.org/api/crypto.html#crypto_crypto_createhash_algorithm_options).
+  These turn a long stream into a short pseudorandom byte array, which can be used to verify the contents of the stream.
+  A short sketch using a hash stream follows this list.
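+
+For instance, here is a sketch of printing a file's SHA-256 digest with a hash stream (assuming some file `file.txt` to hash):
+```js
+const crypto = require('crypto')
+const fs = require('fs')
+
+// Pipe the file into the hash; the digest is emitted once the input ends
+fs.createReadStream('file.txt')
+  .pipe(crypto.createHash('sha256'))
+  .setEncoding('hex') // read the digest out as a hex string
+  .on('data', digest => console.log(digest))
+```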
+
+### An example
+
+By putting together Node.js's builtin streams, we can easily build some complicated programs.
+Here is a complete [example](http-gunzip-pipe.js) that loads a compressed webpage over HTTPS, decompresses it with a `Gunzip` transform stream, and pipes it to the standard output.
+```js
+const https = require('https')
+const zlib = require('zlib')
+
+const URL = 'https://raw.githubusercontent.com/nodejs/node/master/README.md'
+
+// Request the compressed version of the content at URL.
+// The decompressed version is about 10 times larger.
+https.get(URL, {headers: {'accept-encoding': 'gzip'}}, res => {
+  res.pipe(zlib.createGunzip()) // decompress the response
+    .pipe(process.stdout) // and then print it to the terminal
+})
+```
diff --git a/notes/streams/write-stream-callback.js b/notes/streams/write-stream-callback.js
new file mode 100644
index 0000000..5ac7bc5
--- /dev/null
+++ b/notes/streams/write-stream-callback.js
@@ -0,0 +1,12 @@
+const fs = require('fs')
+
+const stream = fs.createWriteStream('large-file.txt')
+stream.write('text\n', () => {
+  console.log('Wrote first chunk')
+})
+for (let i = 0; i < 10000000; i++) {
+  stream.write('text\n')
+}
+stream.end(() => {
+  console.log('Wrote last chunk')
+})
diff --git a/notes/streams/write-stream.js b/notes/streams/write-stream.js
new file mode 100644
index 0000000..acb9d69
--- /dev/null
+++ b/notes/streams/write-stream.js
@@ -0,0 +1,6 @@
+const fs = require('fs')
+
+const stream = fs.createWriteStream('file.txt')
+stream.write('abc\n') // write the chunk of data 'abc\n'
+stream.write('def\n') // followed by the chunk 'def\n'
+stream.end() // close the stream
--
GitLab