diff --git a/lib/child_process.js b/lib/child_process.js index 59c37b97672d39..c32756437833b6 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -42,6 +42,7 @@ const { StringPrototypeIncludes, StringPrototypeSlice, StringPrototypeToUpperCase, + SymbolDispose, } = primordials; const { @@ -95,6 +96,7 @@ const { const MAX_BUFFER = 1024 * 1024; const isZOS = process.platform === 'os390'; +let addAbortListener; /** * Spawns a new Node.js process + fork. @@ -781,9 +783,9 @@ function spawn(file, args, options) { if (signal.aborted) { process.nextTick(onAbortListener); } else { - signal.addEventListener('abort', onAbortListener, { once: true }); - child.once('exit', - () => signal.removeEventListener('abort', onAbortListener)); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(signal, onAbortListener); + child.once('exit', disposable[SymbolDispose]); } function onAbortListener() { diff --git a/lib/dgram.js b/lib/dgram.js index b28d727c8a83ce..57975de9183f00 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -30,6 +30,7 @@ const { ObjectDefineProperty, ObjectSetPrototypeOf, ReflectApply, + SymbolDispose, } = primordials; const errors = require('internal/errors'); @@ -143,8 +144,8 @@ function Socket(type, listener) { if (signal.aborted) { onAborted(); } else { - signal.addEventListener('abort', onAborted); - this.once('close', () => signal.removeEventListener('abort', onAborted)); + const disposable = EventEmitter.addAbortListener(signal, onAborted); + this.once('close', disposable[SymbolDispose]); } } if (udpSocketChannel.hasSubscribers) { diff --git a/lib/internal/abort_controller.js b/lib/internal/abort_controller.js index e31738b98288ca..1d9d09aaff62c9 100644 --- a/lib/internal/abort_controller.js +++ b/lib/internal/abort_controller.js @@ -23,6 +23,7 @@ const { kTrustEvent, kNewListener, kRemoveListener, + kResistStopPropagation, kWeakHandler, } = require('internal/event_target'); const { @@ -435,7 +436,8 @@ async function aborted(signal, resource) { if (signal.aborted) return PromiseResolve(); const abortPromise = createDeferredPromise(); - signal.addEventListener('abort', abortPromise.resolve, { [kWeakHandler]: resource, once: true }); + const opts = { __proto__: null, [kWeakHandler]: resource, once: true, [kResistStopPropagation]: true }; + signal.addEventListener('abort', abortPromise.resolve, opts); return abortPromise.promise; } diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index fe104f52a38ed8..f5527a0dd11949 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -27,6 +27,7 @@ const { StringPrototypeSlice, Symbol, SymbolAsyncDispose, + SymbolDispose, TypedArrayPrototypeGetLength, Uint32Array, Uint8Array, @@ -1811,10 +1812,8 @@ class ClientHttp2Session extends Http2Session { if (signal.aborted) { aborter(); } else { - signal.addEventListener('abort', aborter); - stream.once('close', () => { - signal.removeEventListener('abort', aborter); - }); + const disposable = EventEmitter.addAbortListener(signal, aborter); + stream.once('close', disposable[SymbolDispose]); } } diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index df08875cc79ae6..f7f06674ef7c41 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -28,6 +28,7 @@ const { StringPrototypeStartsWith, StringPrototypeTrim, Symbol, + SymbolDispose, SymbolAsyncIterator, SafeStringIterator, } = primordials; @@ -325,8 +326,8 @@ function InterfaceConstructor(input, output, completer, terminal) { if (signal.aborted) { process.nextTick(onAborted); } else { - signal.addEventListener('abort', onAborted, { once: true }); - self.once('close', () => signal.removeEventListener('abort', onAborted)); + const disposable = EventEmitter.addAbortListener(signal, onAborted); + self.once('close', disposable[SymbolDispose]); } } diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index d6c8ca4c9c7842..819be3ff63e915 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -1,5 +1,9 @@ 'use strict'; +const { + SymbolDispose, +} = primordials; + const { AbortError, codes, @@ -13,6 +17,7 @@ const { const eos = require('internal/streams/end-of-stream'); const { ERR_INVALID_ARG_TYPE } = codes; +let addAbortListener; // This method is inlined here for readable-stream // It also does not allow for signal to not exist on the stream @@ -46,8 +51,9 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) { if (signal.aborted) { onAbort(); } else { - signal.addEventListener('abort', onAbort); - eos(stream, () => signal.removeEventListener('abort', onAbort)); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(signal, onAbort); + eos(stream, disposable[SymbolDispose]); } return stream; }; diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index df8fdeb50110d7..663222e3149bad 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -22,7 +22,11 @@ const { validateBoolean, } = require('internal/validators'); -const { Promise, PromisePrototypeThen } = primordials; +const { + Promise, + PromisePrototypeThen, + SymbolDispose, +} = primordials; const { isClosed, @@ -40,6 +44,7 @@ const { willEmitClose: _willEmitClose, kIsClosedPromise, } = require('internal/streams/utils'); +let addAbortListener; function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; @@ -249,12 +254,13 @@ function eos(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { - options.signal.removeEventListener('abort', abort); + disposable[SymbolDispose](); originalCallback.apply(stream, args); }); - options.signal.addEventListener('abort', abort); } } @@ -272,12 +278,13 @@ function eosWeb(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { - options.signal.removeEventListener('abort', abort); + disposable[SymbolDispose](); originalCallback.apply(stream, args); }); - options.signal.addEventListener('abort', abort); } } const resolverFn = (...args) => { diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 13cbce1005ece1..8f4797da5dd519 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -1,6 +1,6 @@ 'use strict'; -const { AbortController } = require('internal/abort_controller'); +const { AbortController, AbortSignal } = require('internal/abort_controller'); const { codes: { @@ -16,7 +16,7 @@ const { validateInteger, validateObject, } = require('internal/validators'); -const { kWeakHandler } = require('internal/event_target'); +const { kWeakHandler, kResistStopPropagation } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); const staticCompose = require('internal/streams/compose'); const { @@ -26,6 +26,7 @@ const { isWritable, isNodeStream } = require('internal/streams/utils'); const { ArrayPrototypePush, + Boolean, MathFloor, Number, NumberIsNaN, @@ -83,19 +84,11 @@ function map(fn, options) { validateInteger(concurrency, 'concurrency', 1); return async function* map() { - const ac = new AbortController(); + const signal = AbortSignal.any([options?.signal].filter(Boolean)); const stream = this; const queue = []; - const signal = ac.signal; const signalOpt = { signal }; - const abort = () => ac.abort(); - if (options?.signal?.aborted) { - abort(); - } - - options?.signal?.addEventListener('abort', abort); - let next; let resume; let done = false; @@ -152,7 +145,6 @@ function map(fn, options) { next(); next = null; } - options?.signal?.removeEventListener('abort', abort); } } @@ -187,8 +179,6 @@ function map(fn, options) { }); } } finally { - ac.abort(); - done = true; if (resume) { resume(); @@ -281,7 +271,7 @@ async function reduce(reducer, initialValue, options) { const ac = new AbortController(); const signal = ac.signal; if (options?.signal) { - const opts = { once: true, [kWeakHandler]: this }; + const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true }; options.signal.addEventListener('abort', () => ac.abort(), opts); } let gotAnyItemFromStream = false; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index fb2cd90a2678ea..aac7f65f0404d8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -7,6 +7,7 @@ const { ArrayIsArray, Promise, SymbolAsyncIterator, + SymbolDispose, } = primordials; const eos = require('internal/streams/end-of-stream'); @@ -44,6 +45,7 @@ const { AbortController } = require('internal/abort_controller'); let PassThrough; let Readable; +let addAbortListener; function destroyer(stream, reading, writing) { let finished = false; @@ -206,7 +208,11 @@ function pipelineImpl(streams, callback, opts) { finishImpl(new AbortError()); } - outerSignal?.addEventListener('abort', abort); + addAbortListener ??= require('events').addAbortListener; + let disposable; + if (outerSignal) { + disposable = addAbortListener(outerSignal, abort); + } let error; let value; @@ -231,7 +237,7 @@ function pipelineImpl(streams, callback, opts) { destroys.shift()(error); } - outerSignal?.removeEventListener('abort', abort); + disposable?.[SymbolDispose](); ac.abort(); if (final) { diff --git a/lib/internal/watch_mode/files_watcher.js b/lib/internal/watch_mode/files_watcher.js index 1fa4fc14cd4d4d..848c17f4115616 100644 --- a/lib/internal/watch_mode/files_watcher.js +++ b/lib/internal/watch_mode/files_watcher.js @@ -18,7 +18,6 @@ const { fileURLToPath } = require('url'); const { resolve, dirname } = require('path'); const { setTimeout } = require('timers'); - const supportsRecursiveWatching = process.platform === 'win32' || process.platform === 'darwin'; @@ -41,7 +40,9 @@ class FilesWatcher extends EventEmitter { this.#mode = mode; this.#signal = signal; - signal?.addEventListener('abort', () => this.clear(), { __proto__: null, once: true }); + if (signal) { + EventEmitter.addAbortListener(signal, () => this.clear()); + } } #isPathWatched(path) { diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 1f96a709959301..9af63227e0496f 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -22,6 +22,7 @@ const { SafePromiseAll, Symbol, SymbolAsyncIterator, + SymbolDispose, SymbolToStringTag, Uint8Array, } = primordials; @@ -140,6 +141,7 @@ const kRelease = Symbol('kRelease'); let releasedError; let releasingError; +let addAbortListener; const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm; @@ -1259,6 +1261,7 @@ function readableStreamPipeTo( let reader; let writer; + let disposable; // Both of these can throw synchronously. We want to capture // the error and return a rejected promise instead. try { @@ -1291,7 +1294,7 @@ function readableStreamPipeTo( writableStreamDefaultWriterRelease(writer); readableStreamReaderGenericRelease(reader); if (signal !== undefined) - signal.removeEventListener('abort', abortAlgorithm); + disposable?.[SymbolDispose](); if (rejected) promise.reject(error); else @@ -1418,7 +1421,8 @@ function readableStreamPipeTo( abortAlgorithm(); return promise.promise; } - signal.addEventListener('abort', abortAlgorithm, { once: true }); + addAbortListener ??= require('events').addAbortListener; + disposable = addAbortListener(signal, abortAlgorithm); } setPromiseHandled(run()); diff --git a/lib/net.js b/lib/net.js index c1fd13ca19d9b5..194da673db5a22 100644 --- a/lib/net.js +++ b/lib/net.js @@ -35,6 +35,7 @@ const { ObjectDefineProperty, ObjectSetPrototypeOf, Symbol, + SymbolDispose, } = primordials; const EventEmitter = require('events'); @@ -1605,9 +1606,10 @@ function afterConnect(status, handle, req, readable, writable) { function addClientAbortSignalOption(self, options) { validateAbortSignal(options.signal, 'options.signal'); const { signal } = options; + let disposable; function onAbort() { - signal.removeEventListener('abort', onAbort); + disposable?.[SymbolDispose](); self._aborted = true; } @@ -1615,7 +1617,7 @@ function addClientAbortSignalOption(self, options) { process.nextTick(onAbort); } else { process.nextTick(() => { - signal.addEventListener('abort', onAbort); + disposable = EventEmitter.addAbortListener(signal, onAbort); }); } } @@ -1695,8 +1697,8 @@ function addServerAbortSignalOption(self, options) { if (signal.aborted) { process.nextTick(onAborted); } else { - signal.addEventListener('abort', onAborted); - self.once('close', () => signal.removeEventListener('abort', onAborted)); + const disposable = EventEmitter.addAbortListener(signal, onAborted); + self.once('close', disposable[SymbolDispose]); } } diff --git a/lib/readline.js b/lib/readline.js index b9c6f17c52b4b0..5276d9401b4c12 100644 --- a/lib/readline.js +++ b/lib/readline.js @@ -30,6 +30,7 @@ const { Promise, PromiseReject, StringPrototypeSlice, + SymbolDispose, } = primordials; const { @@ -95,6 +96,7 @@ const { kWordRight, kWriteToOutput, } = require('internal/readline/interface'); +let addAbortListener; function Interface(input, output, completer, terminal) { if (!(this instanceof Interface)) { @@ -143,15 +145,13 @@ Interface.prototype.question = function question(query, options, cb) { const onAbort = () => { this[kQuestionCancel](); }; - options.signal.addEventListener('abort', onAbort, { once: true }); - const cleanup = () => { - options.signal.removeEventListener('abort', onAbort); - }; + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, onAbort); const originalCb = cb; cb = typeof cb === 'function' ? (answer) => { - cleanup(); + disposable[SymbolDispose](); return originalCb(answer); - } : cleanup; + } : disposable[SymbolDispose]; } if (typeof cb === 'function') { @@ -175,9 +175,10 @@ Interface.prototype.question[promisify.custom] = function question(query, option const onAbort = () => { reject(new AbortError(undefined, { cause: options.signal.reason })); }; - options.signal.addEventListener('abort', onAbort, { once: true }); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, onAbort); cb = (answer) => { - options.signal.removeEventListener('abort', onAbort); + disposable[SymbolDispose](); resolve(answer); }; } diff --git a/lib/readline/promises.js b/lib/readline/promises.js index 9bfa2aaecd6b44..4c2ce90479ef8f 100644 --- a/lib/readline/promises.js +++ b/lib/readline/promises.js @@ -2,6 +2,7 @@ const { Promise, + SymbolDispose, } = primordials; const { @@ -22,6 +23,7 @@ const { validateAbortSignal } = require('internal/validators'); const { kEmptyObject, } = require('internal/util'); +let addAbortListener; class Interface extends _Interface { // eslint-disable-next-line no-useless-constructor @@ -43,9 +45,11 @@ class Interface extends _Interface { this[kQuestionCancel](); reject(new AbortError(undefined, { cause: options.signal.reason })); }; - options.signal.addEventListener('abort', onAbort, { once: true }); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, onAbort); + cb = (answer) => { - options.signal.removeEventListener('abort', onAbort); + disposable[SymbolDispose](); resolve(answer); }; }