Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: compose with async functions #39435

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1861,7 +1861,7 @@ failure, this can cause event listener leaks and swallowed errors.
added: REPLACEME
-->

* `streams` {Stream[]}
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
Expand All @@ -1875,6 +1875,9 @@ when passing streams to `stream.pipeline`, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.

If passed a `Function` it must be a factory method taking a `source`
`Iterable`.

```mjs
import { compose, Transform } from 'stream';

Expand All @@ -1884,11 +1887,11 @@ const removeSpaces = new Transform({
}
});

const toUpper = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).toUpperCase());
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
Expand All @@ -1898,6 +1901,48 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
console.log(res); // prints 'HELLOWORLD'
```

`stream.compose` can be used to convert async iterables, generators and
functions into streams.

* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
`null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
Must take a source `AsyncIterable` as first parameter. Cannot yield
`null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
either `null` or `undefined`.

```mjs
import { compose } from 'stream';
import { finished } from 'stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
ronag marked this conversation as resolved.
Show resolved Hide resolved

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
```

### `stream.Readable.from(iterable, [options])`
<!-- YAML
added:
Expand Down
24 changes: 2 additions & 22 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,10 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const _compose = require('internal/streams/compose');
const compose = require('internal/streams/compose');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
const { isNodeStream } = require('internal/streams/utils');
const {
codes: {
ERR_INVALID_ARG_VALUE,
},
} = require('internal/errors');

const promises = require('stream/promises');

Expand All @@ -54,21 +48,7 @@ const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Stream.destroy = destroyer;

Stream.compose = function compose(...streams) {
// TODO (ronag): Remove this once async function API
// has been discussed.
for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
streams[n],
'must be stream'
);
}
}
return _compose(...streams);
};
Stream.compose = compose;

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
Expand Down