From 7f7e3a06faf56013c77fcfc659438649d34c06d3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 25 Jun 2024 21:14:50 +0200 Subject: [PATCH] fix: interceptor back-pressure Refs: https://github.com/nodejs/undici/pull/3368 Refs: https://github.com/nodejs/undici/issues/3370 --- lib/dispatcher/agent.js | 12 ++------ lib/dispatcher/client.js | 37 ++++++++++++++++++------- lib/dispatcher/dispatcher-base.js | 10 +++---- lib/dispatcher/dispatcher.js | 12 ++++++-- lib/dispatcher/pool-base.js | 21 ++++++++++++-- lib/dispatcher/proxy-agent.js | 5 ++-- lib/dispatcher/retry-agent.js | 9 +++--- lib/interceptor/dump.js | 4 +-- lib/interceptor/redirect-interceptor.js | 6 ++-- lib/interceptor/redirect.js | 4 +-- lib/interceptor/retry.js | 5 ++-- 11 files changed, 81 insertions(+), 44 deletions(-) diff --git a/lib/dispatcher/agent.js b/lib/dispatcher/agent.js index 98f1486cac0..429c1301d67 100644 --- a/lib/dispatcher/agent.js +++ b/lib/dispatcher/agent.js @@ -79,7 +79,7 @@ class Agent extends DispatcherBase { return ret } - [kDispatch] (opts, handler) { + [kDispatch] (opts, handler, onDrain) { let key if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) { key = String(opts.origin) @@ -87,22 +87,16 @@ class Agent extends DispatcherBase { throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.') } - let dispatcher = this[kClients].get(key) + const dispatcher = this[kClients].get(key) if (!dispatcher) { - dispatcher = this[kFactory](opts.origin, this[kOptions]) - .on('drain', this[kOnDrain]) - .on('connect', this[kOnConnect]) - .on('disconnect', this[kOnDisconnect]) - .on('connectionError', this[kOnConnectionError]) - // This introduces a tiny memory leak, as dispatchers are never removed from the map. // TODO(mcollina): remove te timer when the client/pool do not have any more // active connections. this[kClients].set(key, dispatcher) } - return dispatcher.dispatch(opts, handler) + return dispatcher.dispatch(opts, handler, onDrain) } async [kClose] () { diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index cb61206b1ed..48f109227e5 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -62,6 +62,7 @@ const connectH2 = require('./client-h2.js') let deprecatedInterceptorWarned = false const kClosedResolve = Symbol('kClosedResolve') +const kDrainQueue = Symbol('kDrainQueue') function getPipelining (client) { return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1 @@ -243,6 +244,13 @@ class Client extends DispatcherBase { this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server this[kHTTPContext] = null + this[kDrainQueue] = [] + this.on('drain', () => { + for (const callback of this[kDrainQueue].splice(0)) { + callback(null) + } + }) + // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. // | complete | running | pending | @@ -299,26 +307,31 @@ class Client extends DispatcherBase { this.once('connect', cb) } - [kDispatch] (opts, handler) { + [kDispatch] (opts, handler, onDrain) { const origin = opts.origin || this[kUrl].origin const request = new Request(origin, opts, handler) - this[kQueue].push(request) - if (this[kResuming]) { - // Do nothing. - } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { - // Wait a tick in case stream/iterator is ended in the same tick. - this[kResuming] = 1 - queueMicrotask(() => resume(this)) + if (this[kBusy] && onDrain) { + this[kDrainQueue].push(onDrain) + return false } else { - this[kResume](true) + this[kQueue].push(request) + if (this[kResuming]) { + // Do nothing. + } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { + // Wait a tick in case stream/iterator is ended in the same tick. + this[kResuming] = 1 + queueMicrotask(() => resume(this)) + } else { + this[kResume](true) + } } if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { this[kNeedDrain] = 2 } - return this[kNeedDrain] < 2 + return onDrain ? true : this[kNeedDrain] < 2 } async [kClose] () { @@ -341,6 +354,10 @@ class Client extends DispatcherBase { util.errorRequest(this, request, err) } + for (const callback of this[kDrainQueue].splice(0)) { + callback(err) + } + const callback = () => { if (this[kClosedResolve]) { // TODO (fix): Should we error here with ClientDestroyedError? diff --git a/lib/dispatcher/dispatcher-base.js b/lib/dispatcher/dispatcher-base.js index bd860acdcf4..637f32c7a09 100644 --- a/lib/dispatcher/dispatcher-base.js +++ b/lib/dispatcher/dispatcher-base.js @@ -142,10 +142,10 @@ class DispatcherBase extends Dispatcher { }) } - [kInterceptedDispatch] (opts, handler) { + [kInterceptedDispatch] (opts, handler, onDrain) { if (!this[kInterceptors] || this[kInterceptors].length === 0) { this[kInterceptedDispatch] = this[kDispatch] - return this[kDispatch](opts, handler) + return this[kDispatch](opts, handler, onDrain) } let dispatch = this[kDispatch].bind(this) @@ -153,10 +153,10 @@ class DispatcherBase extends Dispatcher { dispatch = this[kInterceptors][i](dispatch) } this[kInterceptedDispatch] = dispatch - return dispatch(opts, handler) + return dispatch(opts, handler, onDrain) } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { if (!handler || typeof handler !== 'object') { throw new InvalidArgumentError('handler must be an object') } @@ -174,7 +174,7 @@ class DispatcherBase extends Dispatcher { throw new ClientClosedError() } - return this[kInterceptedDispatch](opts, handler) + return this[kInterceptedDispatch](opts, handler, onDrain) } catch (err) { if (typeof handler.onError !== 'function') { throw new InvalidArgumentError('invalid onError method') diff --git a/lib/dispatcher/dispatcher.js b/lib/dispatcher/dispatcher.js index b1e0098ec4b..d2229d35f19 100644 --- a/lib/dispatcher/dispatcher.js +++ b/lib/dispatcher/dispatcher.js @@ -49,8 +49,16 @@ class ComposedDispatcher extends Dispatcher { this.#dispatch = dispatch } - dispatch (...args) { - this.#dispatch(...args) + dispatch (opts, handler, onDrain) { + onDrain ??= (err) => { + if (err) { + handler.onError(err) + } else { + this.#dispatch(opts, handler, onDrain) + } + } + + return this.#dispatch(opts, handler, onDrain) } close (...args) { diff --git a/lib/dispatcher/pool-base.js b/lib/dispatcher/pool-base.js index ff3108a4da2..0ee61a6a1d5 100644 --- a/lib/dispatcher/pool-base.js +++ b/lib/dispatcher/pool-base.js @@ -17,6 +17,7 @@ const kGetDispatcher = Symbol('get dispatcher') const kAddClient = Symbol('add client') const kRemoveClient = Symbol('remove client') const kStats = Symbol('stats') +const kDrainQueue = Symbol('kDrainQueue') class PoolBase extends DispatcherBase { constructor () { @@ -69,6 +70,13 @@ class PoolBase extends DispatcherBase { } this[kStats] = new PoolStats(this) + + this[kDrainQueue] = [] + this.on('drain', () => { + for (const callback of this[kDrainQueue].splice(0)) { + callback(null) + } + }) } get [kBusy] () { @@ -122,6 +130,10 @@ class PoolBase extends DispatcherBase { } async [kDestroy] (err) { + for (const callback of this[kDrainQueue].splice(0)) { + callback(err) + } + while (true) { const item = this[kQueue].shift() if (!item) { @@ -133,10 +145,13 @@ class PoolBase extends DispatcherBase { return Promise.all(this[kClients].map(c => c.destroy(err))) } - [kDispatch] (opts, handler) { + [kDispatch] (opts, handler, onDrain) { const dispatcher = this[kGetDispatcher]() - if (!dispatcher) { + if (!dispatcher && onDrain) { + this[kDrainQueue].push(onDrain) + return false + } else if (!dispatcher) { this[kNeedDrain] = true this[kQueue].push({ opts, handler }) this[kQueued]++ @@ -145,7 +160,7 @@ class PoolBase extends DispatcherBase { this[kNeedDrain] = !this[kGetDispatcher]() } - return !this[kNeedDrain] + return onDrain ? true : !this[kNeedDrain] } [kAddClient] (client) { diff --git a/lib/dispatcher/proxy-agent.js b/lib/dispatcher/proxy-agent.js index 226b67846da..a698a70a6b2 100644 --- a/lib/dispatcher/proxy-agent.js +++ b/lib/dispatcher/proxy-agent.js @@ -107,7 +107,7 @@ class ProxyAgent extends DispatcherBase { }) } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { const headers = buildHeaders(opts.headers) throwIfProxyAuthIsSent(headers) @@ -121,7 +121,8 @@ class ProxyAgent extends DispatcherBase { ...opts, headers }, - handler + handler, + onDrain ) } diff --git a/lib/dispatcher/retry-agent.js b/lib/dispatcher/retry-agent.js index 0c2120d6f26..1a5015b54bb 100644 --- a/lib/dispatcher/retry-agent.js +++ b/lib/dispatcher/retry-agent.js @@ -4,15 +4,16 @@ const Dispatcher = require('./dispatcher') const RetryHandler = require('../handler/retry-handler') class RetryAgent extends Dispatcher { - #agent = null - #options = null + #agent + #options + constructor (agent, options = {}) { super(options) this.#agent = agent this.#options = options } - dispatch (opts, handler) { + dispatch (opts, handler, onDrain) { const retry = new RetryHandler({ ...opts, retryOptions: this.#options @@ -20,7 +21,7 @@ class RetryAgent extends Dispatcher { dispatch: this.#agent.dispatch.bind(this.#agent), handler }) - return this.#agent.dispatch(opts, retry) + return this.#agent.dispatch(opts, retry, onDrain) } close () { diff --git a/lib/interceptor/dump.js b/lib/interceptor/dump.js index fc9cacb198d..44d221e2b43 100644 --- a/lib/interceptor/dump.js +++ b/lib/interceptor/dump.js @@ -106,7 +106,7 @@ function createDumpInterceptor ( } ) { return dispatch => { - return function Intercept (opts, handler) { + return function Intercept (opts, handler, onDrain) { const { dumpMaxSize = defaultMaxSize } = opts @@ -115,7 +115,7 @@ function createDumpInterceptor ( handler ) - return dispatch(opts, dumpHandler) + return dispatch(opts, dumpHandler, onDrain) } } } diff --git a/lib/interceptor/redirect-interceptor.js b/lib/interceptor/redirect-interceptor.js index 896ee8db939..7b26af0cc8b 100644 --- a/lib/interceptor/redirect-interceptor.js +++ b/lib/interceptor/redirect-interceptor.js @@ -4,16 +4,16 @@ const RedirectHandler = require('../handler/redirect-handler') function createRedirectInterceptor ({ maxRedirections: defaultMaxRedirections }) { return (dispatch) => { - return function Intercept (opts, handler) { + return function Intercept (opts, handler, onDrain) { const { maxRedirections = defaultMaxRedirections } = opts if (!maxRedirections) { - return dispatch(opts, handler) + return dispatch(opts, handler, onDrain) } const redirectHandler = new RedirectHandler(dispatch, maxRedirections, opts, handler) opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting. - return dispatch(opts, redirectHandler) + return dispatch(opts, redirectHandler, onDrain) } } } diff --git a/lib/interceptor/redirect.js b/lib/interceptor/redirect.js index d2e789d8efb..ffe943d7650 100644 --- a/lib/interceptor/redirect.js +++ b/lib/interceptor/redirect.js @@ -4,7 +4,7 @@ const RedirectHandler = require('../handler/redirect-handler') module.exports = opts => { const globalMaxRedirections = opts?.maxRedirections return dispatch => { - return function redirectInterceptor (opts, handler) { + return function redirectInterceptor (opts, handler, onDrain) { const { maxRedirections = globalMaxRedirections, ...baseOpts } = opts if (!maxRedirections) { @@ -18,7 +18,7 @@ module.exports = opts => { handler ) - return dispatch(baseOpts, redirectHandler) + return dispatch(baseOpts, redirectHandler, onDrain) } } } diff --git a/lib/interceptor/retry.js b/lib/interceptor/retry.js index 1c16fd845a9..4ec61b53d37 100644 --- a/lib/interceptor/retry.js +++ b/lib/interceptor/retry.js @@ -3,7 +3,7 @@ const RetryHandler = require('../handler/retry-handler') module.exports = globalOpts => { return dispatch => { - return function retryInterceptor (opts, handler) { + return function retryInterceptor (opts, handler, onDrain) { return dispatch( opts, new RetryHandler( @@ -12,7 +12,8 @@ module.exports = globalOpts => { handler, dispatch } - ) + ), + onDrain ) } }