From c44413d9bd8aae6b81d06fbe5bf864eb47b1a986 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 01:58:28 +0300 Subject: [PATCH] stream: use addAbortListener PR-URL: https://github.com/nodejs/node/pull/48550 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina --- lib/internal/streams/add-abort-signal.js | 10 ++++++++-- lib/internal/streams/end-of-stream.js | 17 ++++++++++++----- lib/internal/streams/operators.js | 20 +++++--------------- lib/internal/streams/pipeline.js | 10 ++++++++-- lib/internal/webstreams/readablestream.js | 8 ++++++-- 5 files changed, 39 insertions(+), 26 deletions(-) diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index d6c8ca4c9c7842..819be3ff63e915 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -1,5 +1,9 @@ 'use strict'; +const { + SymbolDispose, +} = primordials; + const { AbortError, codes, @@ -13,6 +17,7 @@ const { const eos = require('internal/streams/end-of-stream'); const { ERR_INVALID_ARG_TYPE } = codes; +let addAbortListener; // This method is inlined here for readable-stream // It also does not allow for signal to not exist on the stream @@ -46,8 +51,9 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) { if (signal.aborted) { onAbort(); } else { - signal.addEventListener('abort', onAbort); - eos(stream, () => signal.removeEventListener('abort', onAbort)); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(signal, onAbort); + eos(stream, disposable[SymbolDispose]); } return stream; }; diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index df8fdeb50110d7..663222e3149bad 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -22,7 +22,11 @@ const { validateBoolean, } = require('internal/validators'); -const { Promise, PromisePrototypeThen } = primordials; +const { + Promise, + PromisePrototypeThen, + SymbolDispose, +} = primordials; const { isClosed, @@ -40,6 +44,7 @@ const { willEmitClose: _willEmitClose, kIsClosedPromise, } = require('internal/streams/utils'); +let addAbortListener; function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; @@ -249,12 +254,13 @@ function eos(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { - options.signal.removeEventListener('abort', abort); + disposable[SymbolDispose](); originalCallback.apply(stream, args); }); - options.signal.addEventListener('abort', abort); } } @@ -272,12 +278,13 @@ function eosWeb(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { - options.signal.removeEventListener('abort', abort); + disposable[SymbolDispose](); originalCallback.apply(stream, args); }); - options.signal.addEventListener('abort', abort); } } const resolverFn = (...args) => { diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 96e2a95476813d..47208136e0916d 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -1,6 +1,6 @@ 'use strict'; -const { AbortController } = require('internal/abort_controller'); +const { AbortController, AbortSignal } = require('internal/abort_controller'); const { codes: { @@ -16,7 +16,7 @@ const { validateInteger, validateObject, } = require('internal/validators'); -const { kWeakHandler } = require('internal/event_target'); +const { kWeakHandler, kResistStopPropagation } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); const staticCompose = require('internal/streams/compose'); const { @@ -27,6 +27,7 @@ const { deprecate } = require('internal/util'); const { ArrayPrototypePush, + Boolean, MathFloor, Number, NumberIsNaN, @@ -84,19 +85,11 @@ function map(fn, options) { validateInteger(concurrency, 'concurrency', 1); return async function* map() { - const ac = new AbortController(); + const signal = AbortSignal.any([options?.signal].filter(Boolean)); const stream = this; const queue = []; - const signal = ac.signal; const signalOpt = { signal }; - const abort = () => ac.abort(); - if (options?.signal?.aborted) { - abort(); - } - - options?.signal?.addEventListener('abort', abort); - let next; let resume; let done = false; @@ -153,7 +146,6 @@ function map(fn, options) { next(); next = null; } - options?.signal?.removeEventListener('abort', abort); } } @@ -188,8 +180,6 @@ function map(fn, options) { }); } } finally { - ac.abort(); - done = true; if (resume) { resume(); @@ -301,7 +291,7 @@ async function reduce(reducer, initialValue, options) { const ac = new AbortController(); const signal = ac.signal; if (options?.signal) { - const opts = { once: true, [kWeakHandler]: this }; + const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true }; options.signal.addEventListener('abort', () => ac.abort(), opts); } let gotAnyItemFromStream = false; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index fb2cd90a2678ea..aac7f65f0404d8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -7,6 +7,7 @@ const { ArrayIsArray, Promise, SymbolAsyncIterator, + SymbolDispose, } = primordials; const eos = require('internal/streams/end-of-stream'); @@ -44,6 +45,7 @@ const { AbortController } = require('internal/abort_controller'); let PassThrough; let Readable; +let addAbortListener; function destroyer(stream, reading, writing) { let finished = false; @@ -206,7 +208,11 @@ function pipelineImpl(streams, callback, opts) { finishImpl(new AbortError()); } - outerSignal?.addEventListener('abort', abort); + addAbortListener ??= require('events').addAbortListener; + let disposable; + if (outerSignal) { + disposable = addAbortListener(outerSignal, abort); + } let error; let value; @@ -231,7 +237,7 @@ function pipelineImpl(streams, callback, opts) { destroys.shift()(error); } - outerSignal?.removeEventListener('abort', abort); + disposable?.[SymbolDispose](); ac.abort(); if (final) { diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8eadb631650c2c..5e312d2a8f45ea 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -22,6 +22,7 @@ const { SafePromiseAll, Symbol, SymbolAsyncIterator, + SymbolDispose, SymbolToStringTag, Uint8Array, } = primordials; @@ -140,6 +141,7 @@ const kRelease = Symbol('kRelease'); let releasedError; let releasingError; +let addAbortListener; const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm; @@ -1259,6 +1261,7 @@ function readableStreamPipeTo( let reader; let writer; + let disposable; // Both of these can throw synchronously. We want to capture // the error and return a rejected promise instead. try { @@ -1291,7 +1294,7 @@ function readableStreamPipeTo( writableStreamDefaultWriterRelease(writer); readableStreamReaderGenericRelease(reader); if (signal !== undefined) - signal.removeEventListener('abort', abortAlgorithm); + disposable?.[SymbolDispose](); if (rejected) promise.reject(error); else @@ -1418,7 +1421,8 @@ function readableStreamPipeTo( abortAlgorithm(); return promise.promise; } - signal.addEventListener('abort', abortAlgorithm, { once: true }); + addAbortListener ??= require('events').addAbortListener; + disposable = addAbortListener(signal, abortAlgorithm); } setPromiseHandled(run());