Working with Node.js streams is much more pleasant if we use asynchronous iteration. This blog post explores how to do that.
Asynchronous iteration is a protocol for retrieving the contents of a data container asynchronously (meaning the current “task” may be paused before retrieving an item).
Asynchronous generators help with async iteration. For example, this is an asynchronous generator function:
/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}
The for-await-of loop iterates over the input asyncIterable. This loop is also available in normal asynchronous functions.
yield feeds values into the asynchronous iterable that is returned by this generator.

In the remainder of the blog post, pay close attention to whether a function is an async function or an async generator function:
/** @returns a Promise */
async function asyncFunction() { /*···*/ }
/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }
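To make the difference tangible, here is a small sketch of how each kind of function could be consumed (both calls assume the two stub functions above):

import {asyncFunction, asyncGeneratorFunction} from './stubs.mjs'; // hypothetical module

const promise = asyncFunction(); // a Promise
await promise;

for await (const item of asyncGeneratorFunction()) { // an async iterable
  console.log(item);
}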
A stream is a pattern whose core idea is to “divide and conquer” a large amount of data: We can handle it if we split it into smaller pieces and handle one portion at a time.
Node.js supports several kinds of streams – for example:
Readable streams are streams from which we can read data. In other words, they are sources of data. An example is a readable file stream, which lets us read the contents of a file.
Writable streams are streams to which we can write data. In other words, they are sinks for data. An example is a writable file stream, which lets us write data to a file.
A transform stream is both readable and writable. As a writable stream, it receives pieces of data, transforms (changes or discards) them and then outputs them as a readable stream.
To process streamed data in multiple steps, we can pipeline (connect) streams:
(1) The input comes from a readable stream.
(2) The data is transformed via one or more transform streams.
(3) The result is written to a writable stream.
Part (2) is optional.
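For example, here is a sketch of such a pipeline, via the built-in gzip transform stream of module zlib (the output file name is made up):

import * as fs from 'fs';
import * as zlib from 'zlib';

fs.createReadStream('tmp/test.txt')       // (1) source: readable stream
  .pipe(zlib.createGzip())                // (2) transform stream
  .pipe(fs.createWriteStream('tmp/test.txt.gz')); // (3) sink: writable stream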
When creating text streams, it is best to always specify an encoding.
The Node.js docs have a list of supported encodings and their default spellings – for example:
'utf8'
'utf16le'
'base64'
A few alternative spellings are also allowed. You can use Buffer.isEncoding() to check whether a given spelling is supported:
> buffer.Buffer.isEncoding('utf8')
true
> buffer.Buffer.isEncoding('utf-8')
true
> buffer.Buffer.isEncoding('UTF-8')
true
> buffer.Buffer.isEncoding('UTF:8')
false
The default value for encoding is null. For writable file streams, that is equivalent to 'utf8'; for readable streams, it means that chunks are delivered as Buffers (not strings).
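The following sketch illustrates the difference (it assumes the file tmp/test.txt that we also use later):

import * as fs from 'fs';

const stringChunks = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'}); // chunks are strings
const bufferChunks = fs.createReadStream(
  'tmp/test.txt'); // encoding defaults to null: chunks are Buffers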
readableToString()
We will occasionally use the following helper function. You don’t need to understand how it works, only (roughly) what it does.
import * as stream from 'stream';

/**
 * Reads all the text in a readable stream and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}
This function is implemented via the event-based API. We’ll later see a simpler way of doing this – via async iteration.
We’ll only use text streams in this post.
In the examples, we’ll occasionally encounter await
being used at the top level. In that case, we imagine that we are inside a module or inside the body of an async function.
Whenever there are newlines, we support both:
'\n' (LF)
'\r\n' (CR LF)

The newline characters of the current platform can be accessed via the constant EOL in module os.
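For example, this sketch prints the platform’s newline in a readable form:

import {EOL} from 'os';

console.log(JSON.stringify(EOL));
// "\n" on Unix-like platforms, "\r\n" on Windows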
We can use fs.createReadStream()
to create readable streams:
import {strict as assert} from 'assert';
import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');
Readable.from(): Creating readable streams from iterables
The static method Readable.from(iterable, options?) creates a readable stream which holds the data contained in iterable. iterable can be a synchronous iterable or an asynchronous iterable. The parameter options is optional and can, among other things, be used to specify a text encoding.
import {strict as assert} from 'assert';
import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}

const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');
Readable.from() accepts any iterable and can therefore also be used to convert strings to streams:
import {strict as assert} from 'assert';
import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');
At the moment, Readable.from() treats a string like any other iterable and therefore iterates over its code points. That isn’t ideal, performance-wise, but should be OK for most use cases. I expect Readable.from() to be often used with strings, so maybe there will be optimizations in the future.
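As a sketch of what that means (assuming the per-code-point iteration just described):

import {Readable} from 'stream';

const chunks = [];
for await (const chunk of Readable.from('ab', {encoding: 'utf8'})) {
  chunks.push(chunk);
}
// With per-code-point iteration, `chunks` is ['a', 'b']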
for-await-of
Every readable stream is asynchronously iterable, which means that we can use a for-await-of
loop to read its contents:
import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'
The following function is a simpler reimplementation of readableToString(), which we saw earlier in this blog post.
import {strict as assert} from 'assert';
import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');
Note that, in this case, we had to use an async function because we wanted to return a Promise.
Async iteration provides an elegant alternative to transform streams for processing streamed data in multiple steps: the input is a readable stream, each intermediate step is performed by an async generator, and the result is either consumed by an async function or converted back to a readable stream via Readable.from() (which can later be piped into a writable stream).

To summarize, these are the pieces of such processing pipelines:

readable
→ first async generator [→ … → last async generator]
→ readable or async function

In the following example, the final step is performed by the async function logLines(), which logs the items in an iterable to the console.
import {Readable} from 'stream';

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // The line includes the EOL
      const line = previous.slice(0, eolIndex + 1);
      yield line;
      previous = previous.slice(eolIndex + 1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
logLines(numberLines(chunksToLines(chunks)));

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'
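If we wanted to keep working with streams instead of logging, we could convert the output of the last async generator back to a readable stream, as described above (a sketch that reuses the functions just defined):

import {Readable} from 'stream';

const input = Readable.from(
  'Text with\nmultiple\nlines.\n', {encoding: 'utf8'});
const numbered = Readable.from(
  numberLines(chunksToLines(input)), {encoding: 'utf8'});
// `numbered` is a readable stream and can be piped into a writable stream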
We can use fs.createWriteStream() to create writable streams:

import * as fs from 'fs';

const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});
In this section, we look at three approaches to writing to a writable stream:
Writing directly to the writable stream via its method .write().
Using method .pipe() of a readable stream to pipe it into the writable stream.
Using function pipeline() from module stream to pipe a readable stream into the writable stream.

To demonstrate these approaches, we implement the same function writeIterableToFile() in three different ways.
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');
The default version of stream.finished() is callback-based but can be turned into a Promise-based version via util.promisify() (line A).
We used the following two patterns:
Writing to a writable stream while handling backpressure (line B):
if (!writable.write(chunk)) {
  await once(writable, 'drain');
}
Closing a writable stream and waiting until writing is done (line C):
writable.end();
await finished(writable);
pipeline(readable, writable)
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';

const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···
We used the following pattern (line A):
await pipeline(readable, writable);
There is also Readable.prototype.pipe(), but that method has a caveat (if the readable emits an error, then the writable is not closed automatically). stream.pipeline() does not have that caveat.
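For completeness, here is a sketch of the second approach from the list above, via .pipe(). How exactly to wait and handle errors is my assumption; here we use the promisified finished() so that at least errors of the writable are reported:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';

const finished = util.promisify(stream.finished);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  readable.pipe(writable); // due to the caveat, errors in `readable` do not close `writable`
  await finished(writable);
}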
Module os
:
const EOL: string
(since 0.7.8)
Contains the end-of-line character sequence used by the current platform.
Module buffer
:
Buffer.isEncoding(encoding: string): boolean
(since 0.9.1)
Returns true
if encoding
correctly names one of the supported Node.js encodings for text. Supported encodings include:
'utf8'
'utf16le'
'ascii'
'latin1'
'base64'
'hex' (each byte as two hexadecimal characters)

Module stream:
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any>
(since 10.0.0)
Readable streams are asynchronously iterable. For example, you can use for-await-of
loops in async functions or async generators to iterate over them.
finished(stream: ReadableStream | WritableStream | ReadWriteStream): Promise<void>
(since 10.0.0)
The returned Promise is settled when reading/writing is done or there was an error.
This promisified version is created as follows:
const finished = util.promisify(stream.finished);
pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void>
(since 10.0.0)
Pipes between streams. The returned Promise is settled when the pipeline is complete or when there was an error.
This promisified version is created as follows:
const pipeline = util.promisify(stream.pipeline);
Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable
(since 12.3.0)
Converts an iterable into a readable stream.
interface ReadableOptions {
  highWaterMark?: number;
  encoding?: string;
  objectMode?: boolean;
  read?(this: Readable, size: number): void;
  destroy?(this: Readable, error: Error | null,
    callback: (error: Error | null) => void): void;
  autoDestroy?: boolean;
}
These options are the same as the options for the Readable constructor and are documented there.
Module fs
:
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream
(since 2.3.0)
Creates a readable stream. More options are available.
createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream
(since 2.3.0)
With option .flags, you can specify whether you want to write or append, and what happens if a file does or does not exist. More options are available.
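For example, a sketch that appends to a file instead of overwriting it (flag 'a' stands for “append”):

import * as fs from 'fs';

const appending = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8', flags: 'a'}); // creates the file if it does not exist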
The static type information in this section is based on Definitely Typed.