From 7ade6e309d824a4725c8652df13cec579788bc96 Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Thu, 9 Dec 2021 09:15:12 +0100 Subject: [PATCH 1/6] stream: add isErrored helper Refs: https://github.com/nodejs/undici/pull/1134 --- doc/api/stream.md | 13 ++++++++++++ lib/internal/streams/utils.js | 20 ++++++++++++++++--- lib/internal/webstreams/readablestream.js | 5 +++++ lib/stream.js | 4 +++- test/parallel/test-stream-readable-didRead.js | 7 +++++-- test/parallel/test-whatwg-readablestream.js | 19 +++++++++++++++++- 6 files changed, 61 insertions(+), 7 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 8ca8fb1db5c0f1..5934ceae100e36 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2225,6 +2225,19 @@ added: v16.8.0 Returns whether the stream has been read from or cancelled. +### `stream.Readable.isErrored(stream)` + +<!-- YAML +added: v16.8.0 +--> + +> Stability: 1 - Experimental + +* `stream` {stream.Readable|ReadableStream} +* Returns: `boolean` + +Returns whether the stream has been errored. + ### `stream.Readable.toWeb(streamReadable)` <!-- YAML diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 7838494b3153b1..95851a7ab8dcd8 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -7,6 +7,7 @@ const { } = primordials; const kDestroyed = Symbol('kDestroyed'); +const kIsErrored = Symbol('kIsErrored'); const kIsDisturbed = Symbol('kIsDisturbed'); function isReadableNodeStream(obj, strict = false) { @@ -239,16 +240,29 @@ function willEmitClose(stream) { function isDisturbed(stream) { return !!(stream && ( - stream.readableDidRead || - stream.readableAborted || - stream[kIsDisturbed] + stream[kIsDisturbed] ?? + (stream.readableDidRead || stream.readableAborted) + )); +} + +function isErrored(stream) { + return !!(stream && ( + stream[kIsErrored] ?? + stream.readableErrored ?? + stream.writableErrored ?? + stream._readableState?.errorEmitted ?? + stream._writableState?.errorEmitted ?? + stream._readableState?.errored ?? + stream._writableState?.errored )); } module.exports = { kDestroyed, isDisturbed, + isErrored, kIsDisturbed, + kIsErrored, isClosed, isDestroyed, isDuplexNodeStream, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index c9b988d6fee0f7..b4a31d146db2e0 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -82,6 +82,7 @@ const { const { kIsDisturbed, + kIsErrored, } = require('internal/streams/utils'); const { @@ -241,6 +242,10 @@ class ReadableStream { return this[kState].disturbed; } + get [kIsErrored]() { + return this[kState].state === 'errored'; + } + /** * @readonly * @type {boolean} diff --git a/lib/stream.js b/lib/stream.js index cc56b76e31a4a6..c7f61cf8873786 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -36,9 +36,11 @@ const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); const promises = require('stream/promises'); +const utils = require('internal/streams/utils'); const Stream = module.exports = require('internal/streams/legacy').Stream; -Stream.isDisturbed = require('internal/streams/utils').isDisturbed; +Stream.isDisturbed = utils.isDisturbed; +Stream.isErrored = utils.isErrored; Stream.Readable = require('internal/streams/readable'); Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); diff --git a/test/parallel/test-stream-readable-didRead.js b/test/parallel/test-stream-readable-didRead.js index c67289e65442c5..878340ba190786 100644 --- a/test/parallel/test-stream-readable-didRead.js +++ b/test/parallel/test-stream-readable-didRead.js @@ -1,15 +1,18 @@ 'use strict'; const common = require('../common'); const assert = require('assert'); -const { isDisturbed, Readable } = require('stream'); +const { isDisturbed, isErrored, Readable } = require('stream'); function noop() {} function check(readable, data, fn) { assert.strictEqual(readable.readableDidRead, false); assert.strictEqual(isDisturbed(readable), false); + assert.strictEqual(isErrored(readable), false); if (data === -1) { - readable.on('error', common.mustCall()); + readable.on('error', common.mustCall(() => { + assert.strictEqual(isErrored(readable), true); + })); readable.on('data', common.mustNotCall()); readable.on('end', common.mustNotCall()); } else { diff --git a/test/parallel/test-whatwg-readablestream.js b/test/parallel/test-whatwg-readablestream.js index ce1d892262be16..5bd3e3872aadff 100644 --- a/test/parallel/test-whatwg-readablestream.js +++ b/test/parallel/test-whatwg-readablestream.js @@ -2,7 +2,7 @@ 'use strict'; const common = require('../common'); -const { isDisturbed } = require('stream'); +const { isDisturbed, isErrored } = require('stream'); const assert = require('assert'); const { isPromise, @@ -1572,3 +1572,20 @@ class Source { isDisturbed(stream, true); })().then(common.mustCall()); } + + +{ + const stream = new ReadableStream({ + start(controller) { + controller.error(new Error()); + }, + pull: common.mustNotCall(), + }); + + const reader = stream.getReader(); + (async () => { + isErrored(stream, false); + await reader.read(); + isErrored(stream, true); + })().then(common.mustCall()); +} From 47b26054fafa17e09a50d0bf147b7a69846c664b Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Thu, 9 Dec 2021 15:34:09 +0100 Subject: [PATCH 2/6] Apply suggestions from code review Co-authored-by: Luigi Pinca <luigipinca@gmail.com> Co-authored-by: mscdex <mscdex@users.noreply.github.com> --- doc/api/stream.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 5934ceae100e36..0f8482059790ad 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2225,10 +2225,10 @@ added: v16.8.0 Returns whether the stream has been read from or cancelled. -### `stream.Readable.isErrored(stream)` +### `stream.isErrored(stream)` <!-- YAML -added: v16.8.0 +added: REPLACEME --> > Stability: 1 - Experimental @@ -2236,7 +2236,7 @@ added: v16.8.0 * `stream` {stream.Readable|ReadableStream} * Returns: `boolean` -Returns whether the stream has been errored. +Returns whether the stream has encountered an error. ### `stream.Readable.toWeb(streamReadable)` From 571739e263d5ce2a1d09e386d7327057f5fb2192 Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Thu, 9 Dec 2021 15:34:19 +0100 Subject: [PATCH 3/6] Update doc/api/stream.md Co-authored-by: mscdex <mscdex@users.noreply.github.com> --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 0f8482059790ad..66a419cf02c328 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2234,7 +2234,7 @@ added: REPLACEME > Stability: 1 - Experimental * `stream` {stream.Readable|ReadableStream} -* Returns: `boolean` +* Returns: {boolean} Returns whether the stream has encountered an error. From e6560603c801477a7171ed9da2b4df7cf40a8ad1 Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Thu, 9 Dec 2021 15:34:42 +0100 Subject: [PATCH 4/6] Update doc/api/stream.md --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 66a419cf02c328..247eb54126c17f 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2233,7 +2233,7 @@ added: REPLACEME > Stability: 1 - Experimental -* `stream` {stream.Readable|ReadableStream} +* `stream` {stream.Readable|stream.Writable|stream.Duplex|WritableStream||ReadableStream} * Returns: {boolean} Returns whether the stream has encountered an error. From 44d58f9824a617d6509674cc3c89bc9b2a4dd1cb Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Fri, 10 Dec 2021 08:03:17 +0100 Subject: [PATCH 5/6] fixup --- doc/api/stream.md | 2 +- tools/doc/type-parser.mjs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 247eb54126c17f..2b1035f1874aa4 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2233,7 +2233,7 @@ added: REPLACEME > Stability: 1 - Experimental -* `stream` {stream.Readable|stream.Writable|stream.Duplex|WritableStream||ReadableStream} +* `stream` {Readable|Writable|Duplex|WritableStream|ReadableStream} * Returns: {boolean} Returns whether the stream has encountered an error. diff --git a/tools/doc/type-parser.mjs b/tools/doc/type-parser.mjs index 3cdee188c01e7e..566a1c92030584 100644 --- a/tools/doc/type-parser.mjs +++ b/tools/doc/type-parser.mjs @@ -211,6 +211,10 @@ const customTypesMap = { 'stream.Readable': 'stream.html#class-streamreadable', 'stream.Transform': 'stream.html#class-streamtransform', 'stream.Writable': 'stream.html#class-streamwritable', + 'Duplex': 'stream.html#class-streamduplex', + 'Readable': 'stream.html#class-streamreadable', + 'Transform': 'stream.html#class-streamtransform', + 'Writable': 'stream.html#class-streamwritable', 'Immediate': 'timers.html#class-immediate', 'Timeout': 'timers.html#class-timeout', From bb4e9f01e9eb2214af5c24404e503c7c51d27379 Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Thu, 16 Dec 2021 11:39:08 +0100 Subject: [PATCH 6/6] fixup --- test/parallel/test-whatwg-readablestream.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/parallel/test-whatwg-readablestream.js b/test/parallel/test-whatwg-readablestream.js index 5bd3e3872aadff..b5cb94d043b59b 100644 --- a/test/parallel/test-whatwg-readablestream.js +++ b/test/parallel/test-whatwg-readablestream.js @@ -1576,16 +1576,15 @@ class Source { { const stream = new ReadableStream({ - start(controller) { + pull: common.mustCall((controller) => { controller.error(new Error()); - }, - pull: common.mustNotCall(), + }), }); const reader = stream.getReader(); (async () => { isErrored(stream, false); - await reader.read(); + await reader.read().catch(common.mustCall()); isErrored(stream, true); })().then(common.mustCall()); }