From db902937f71d5099af263a47b2db283807574279 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 17 Oct 2023 15:26:00 -0600 Subject: [PATCH] refactor(NODE-5675): refactor server selection and connection checkout to use abort signals for timeout management (#3890) --- src/cmap/connection_pool.ts | 56 +++++++++++++++---------------- src/index.ts | 3 +- src/sdam/topology.ts | 42 ++++++++++------------- src/utils.ts | 28 ++++++++++++++++ test/unit/utils.test.ts | 66 ++++++++++++++++++++++++++++++++++++- 5 files changed, 138 insertions(+), 57 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 4dfa1e3896e..187d6b09a00 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -27,7 +27,7 @@ import { } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; -import { type Callback, eachAsync, List, makeCounter } from '../utils'; +import { type Callback, eachAsync, List, makeCounter, TimeoutController } from '../utils'; import { AUTH_PROVIDERS, connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -101,7 +101,7 @@ export interface ConnectionPoolOptions extends Omit; - timer?: NodeJS.Timeout; + timeoutController: TimeoutController; [kCancelled]?: boolean; } @@ -356,27 +356,29 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionCheckOutStartedEvent(this) ); - const waitQueueMember: WaitQueueMember = { callback }; const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; - if (waitQueueTimeoutMS) { - waitQueueMember.timer = setTimeout(() => { - waitQueueMember[kCancelled] = true; - waitQueueMember.timer = undefined; - this.emitAndLog( - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(this, 'timeout') - ); - waitQueueMember.callback( - new WaitQueueTimeoutError( - this.loadBalanced - ? this.waitQueueErrorMetrics() - : 'Timed out while checking out a connection from connection pool', - this.address - ) - ); - }, waitQueueTimeoutMS); - } + const waitQueueMember: WaitQueueMember = { + callback, + timeoutController: new TimeoutController(waitQueueTimeoutMS) + }; + waitQueueMember.timeoutController.signal.addEventListener('abort', () => { + waitQueueMember[kCancelled] = true; + waitQueueMember.timeoutController.clear(); + + this.emitAndLog( + ConnectionPool.CONNECTION_CHECK_OUT_FAILED, + new ConnectionCheckOutFailedEvent(this, 'timeout') + ); + waitQueueMember.callback( + new WaitQueueTimeoutError( + this.loadBalanced + ? this.waitQueueErrorMetrics() + : 'Timed out while checking out a connection from connection pool', + this.address + ) + ); + }); this[kWaitQueue].push(waitQueueMember); process.nextTick(() => this.processWaitQueue()); @@ -831,9 +833,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, reason, error) ); - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } + waitQueueMember.timeoutController.clear(); this[kWaitQueue].shift(); waitQueueMember.callback(error); continue; @@ -854,9 +854,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECKED_OUT, new ConnectionCheckedOutEvent(this, connection) ); - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } + waitQueueMember.timeoutController.clear(); this[kWaitQueue].shift(); waitQueueMember.callback(undefined, connection); @@ -893,9 +891,7 @@ export class ConnectionPool extends TypedEventEmitter { ); } - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } + waitQueueMember.timeoutController.clear(); waitQueueMember.callback(err, connection); } process.nextTick(() => this.processWaitQueue()); diff --git a/src/index.ts b/src/index.ts index 436cc08d33c..280a6e829ab 100644 --- a/src/index.ts +++ b/src/index.ts @@ -524,6 +524,7 @@ export type { HostAddress, List, MongoDBCollectionNamespace, - MongoDBNamespace + MongoDBNamespace, + TimeoutController } from './utils'; export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern'; diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index d45acf6249b..37f4f4500d1 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -1,4 +1,3 @@ -import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; import type { BSONSerializeOptions, Document } from '../bson'; @@ -43,7 +42,8 @@ import { List, makeStateMachine, ns, - shuffle + shuffle, + TimeoutController } from '../utils'; import { _advanceClusterTime, @@ -94,8 +94,8 @@ export interface ServerSelectionRequest { serverSelector: ServerSelector; transaction?: Transaction; callback: ServerSelectionCallback; - timer?: NodeJS.Timeout; [kCancelled]?: boolean; + timeoutController: TimeoutController; } /** @internal */ @@ -556,22 +556,20 @@ export class Topology extends TypedEventEmitter { const waitQueueMember: ServerSelectionRequest = { serverSelector, transaction, - callback + callback, + timeoutController: new TimeoutController(options.serverSelectionTimeoutMS) }; - const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS; - if (serverSelectionTimeoutMS) { - waitQueueMember.timer = setTimeout(() => { - waitQueueMember[kCancelled] = true; - waitQueueMember.timer = undefined; - const timeoutError = new MongoServerSelectionError( - `Server selection timed out after ${serverSelectionTimeoutMS} ms`, - this.description - ); + waitQueueMember.timeoutController.signal.addEventListener('abort', () => { + waitQueueMember[kCancelled] = true; + waitQueueMember.timeoutController.clear(); + const timeoutError = new MongoServerSelectionError( + `Server selection timed out after ${options.serverSelectionTimeoutMS} ms`, + this.description + ); - waitQueueMember.callback(timeoutError); - }, serverSelectionTimeoutMS); - } + waitQueueMember.callback(timeoutError); + }); this[kWaitQueue].push(waitQueueMember); processWaitQueue(this); @@ -842,9 +840,7 @@ function drainWaitQueue(queue: List, err?: MongoDriverEr continue; } - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } + waitQueueMember.timeoutController.clear(); if (!waitQueueMember[kCancelled]) { waitQueueMember.callback(err); @@ -878,9 +874,7 @@ function processWaitQueue(topology: Topology) { ? serverSelector(topology.description, serverDescriptions) : serverDescriptions; } catch (e) { - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } + waitQueueMember.timeoutController.clear(); waitQueueMember.callback(e); continue; @@ -917,9 +911,7 @@ function processWaitQueue(topology: Topology) { transaction.pinServer(selectedServer); } - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } + waitQueueMember.timeoutController.clear(); waitQueueMember.callback(undefined, selectedServer); } diff --git a/src/utils.ts b/src/utils.ts index be215ff8809..8ae7f26381f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,6 +1,7 @@ import * as crypto from 'crypto'; import type { SrvRecord } from 'dns'; import * as http from 'http'; +import { clearTimeout, setTimeout } from 'timers'; import * as url from 'url'; import { URL } from 'url'; @@ -1254,3 +1255,30 @@ export async function request( req.end(); }); } + +/** + * A custom AbortController that aborts after a specified timeout. + * + * If `timeout` is undefined or \<=0, the abort controller never aborts. + * + * This class provides two benefits over the built-in AbortSignal.timeout() method. + * - This class provides a mechanism for cancelling the timeout + * - This class supports infinite timeouts by interpreting a timeout of 0 as infinite. This is + * consistent with existing timeout options in the Node driver (serverSelectionTimeoutMS, for example). + * @internal + */ +export class TimeoutController extends AbortController { + constructor( + timeout = 0, + private timeoutId = timeout > 0 ? setTimeout(() => this.abort(), timeout) : null + ) { + super(); + } + + clear() { + if (this.timeoutId != null) { + clearTimeout(this.timeoutId); + } + this.timeoutId = null; + } +} diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index e3d82c50bde..9614cd9c923 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1,4 +1,5 @@ import { expect } from 'chai'; +import * as sinon from 'sinon'; import { BufferPool, @@ -16,8 +17,10 @@ import { MongoDBNamespace, MongoRuntimeError, ObjectId, - shuffle + shuffle, + TimeoutController } from '../mongodb'; +import { createTimerSandbox } from './timer_sandbox'; describe('driver utils', function () { describe('.hostMatchesWildcards', function () { @@ -1101,4 +1104,65 @@ describe('driver utils', function () { }); }); }); + + describe('class TimeoutController', () => { + let timerSandbox, clock, spy; + + beforeEach(function () { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers(); + spy = sinon.spy(); + }); + + afterEach(function () { + clock.restore(); + timerSandbox.restore(); + }); + + describe('constructor', () => { + it('when no timeout is provided, it creates an infinite timeout', () => { + const controller = new TimeoutController(); + // @ts-expect-error Accessing a private field on TimeoutController + expect(controller.timeoutId).to.be.null; + }); + + it('when timeout is 0, it creates an infinite timeout', () => { + const controller = new TimeoutController(0); + // @ts-expect-error Accessing a private field on TimeoutController + expect(controller.timeoutId).to.be.null; + }); + + it('when timeout <0, it creates an infinite timeout', () => { + const controller = new TimeoutController(-5); + // @ts-expect-error Accessing a private field on TimeoutController + expect(controller.timeoutId).to.be.null; + }); + + context('when timeout > 0', () => { + let timeoutController: TimeoutController; + + beforeEach(function () { + timeoutController = new TimeoutController(3000); + timeoutController.signal.addEventListener('abort', spy); + }); + + afterEach(function () { + timeoutController.clear(); + }); + + it('it creates a timeout', () => { + // @ts-expect-error Accessing a private field on TimeoutController + expect(timeoutController.timeoutId).not.to.be.null; + }); + + it('times out after `timeout` milliseconds', () => { + expect(spy, 'spy was called after creation').not.to.have.been.called; + clock.tick(2999); + expect(spy, 'spy was called before 3000ms has expired').not.to.have.been.called; + clock.tick(1); + expect(spy, 'spy was not called after 3000ms').to.have.been.called; + }); + }); + }); + }); });