From 24bc347ed365b46fbed0c969f700825c09dd4ae3 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 6 Mar 2024 17:46:42 -0500 Subject: [PATCH 1/5] fix(NODE-5993): connection's aborted promise leak --- src/cmap/connection.ts | 74 ++++++++-------- src/cmap/wire_protocol/on_data.ts | 16 +--- .../connection.test.ts | 84 +++++++++++++++++++ .../node-specific/resource_clean_up.test.ts | 33 ++++++++ 4 files changed, 152 insertions(+), 55 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index f0e373a825a..603266c1707 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,6 +1,5 @@ import { type Readable, Transform, type TransformCallback } from 'stream'; import { clearTimeout, setTimeout } from 'timers'; -import { promisify } from 'util'; import type { BSONSerializeOptions, Document, ObjectId } from '../bson'; import type { AutoEncrypter } from '../client-side-encryption/auto_encrypter'; @@ -182,18 +181,18 @@ export class Connection extends TypedEventEmitter { * Once connection is established, command logging can log events (if enabled) */ public established: boolean; + /** Indicates that the connection (including underlying TCP socket) has been closed. */ + public closed = false; private lastUseTime: number; private clusterTime: Document | null = null; + private error: Error | null = null; + private dataEvents: AsyncGenerator | null = null; private readonly socketTimeoutMS: number; private readonly monitorCommands: boolean; private readonly socket: Stream; - private readonly controller: AbortController; - private readonly signal: AbortSignal; private readonly messageStream: Readable; - private readonly socketWrite: (buffer: Uint8Array) => Promise; - private readonly aborted: Promise; /** @event */ static readonly COMMAND_STARTED = COMMAND_STARTED; @@ -213,6 +212,7 @@ export class Connection extends TypedEventEmitter { constructor(stream: Stream, options: ConnectionOptions) { super(); + this.socket = stream; this.id = options.id; this.address = streamIdentifier(stream, options); this.socketTimeoutMS = options.socketTimeoutMS ?? 0; @@ -225,39 +225,12 @@ export class Connection extends TypedEventEmitter { this.generation = options.generation; this.lastUseTime = now(); - this.socket = stream; - - // TODO: Remove signal from connection layer - this.controller = new AbortController(); - const { signal } = this.controller; - this.signal = signal; - const { promise: aborted, reject } = promiseWithResolvers(); - aborted.then(undefined, () => null); // Prevent unhandled rejection - this.signal.addEventListener( - 'abort', - function onAbort() { - reject(signal.reason); - }, - { once: true } - ); - this.aborted = aborted; - this.messageStream = this.socket .on('error', this.onError.bind(this)) .pipe(new SizedMessageTransform({ connection: this })) .on('error', this.onError.bind(this)); this.socket.on('close', this.onClose.bind(this)); this.socket.on('timeout', this.onTimeout.bind(this)); - - const socketWrite = promisify(this.socket.write.bind(this.socket)); - this.socketWrite = async buffer => { - return Promise.race([socketWrite(buffer), this.aborted]); - }; - } - - /** Indicates that the connection (including underlying TCP socket) has been closed. */ - public get closed(): boolean { - return this.signal.aborted; } public get hello() { @@ -357,7 +330,11 @@ export class Connection extends TypedEventEmitter { } this.socket.destroy(); - this.controller.abort(error); + if (error) { + this.error = error; + this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection + } + this.closed = true; this.emit(Connection.CLOSE); } @@ -598,7 +575,7 @@ export class Connection extends TypedEventEmitter { } private throwIfAborted() { - this.signal.throwIfAborted(); + if (this.error) throw this.error; } /** @@ -621,7 +598,18 @@ export class Connection extends TypedEventEmitter { const buffer = Buffer.concat(await finalCommand.toBin()); - return this.socketWrite(buffer); + if (this.socket.write(buffer)) return; + + const { promise: drained, resolve, reject } = promiseWithResolvers(); + const onDrain = () => resolve(); + const onError = (error: Error) => reject(error); + + this.socket.once('drain', onDrain).once('error', onError); + try { + return await drained; + } finally { + this.socket.off('drain', onDrain).off('error', onError); + } } /** @@ -634,13 +622,19 @@ export class Connection extends TypedEventEmitter { * Note that `for-await` loops call `return` automatically when the loop is exited. */ private async *readMany(): AsyncGenerator { - for await (const message of onData(this.messageStream, { signal: this.signal })) { - const response = await decompressResponse(message); - yield response; + try { + this.dataEvents = this.dataEvents = onData(this.messageStream); + for await (const message of this.dataEvents) { + const response = await decompressResponse(message); + yield response; - if (!response.moreToCome) { - return; + if (!response.moreToCome) { + return; + } } + } finally { + this.dataEvents = null; + this.throwIfAborted(); } } } diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index 04c82f709d3..63b59806fed 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -18,9 +18,7 @@ type PendingPromises = Omit< * Returns an AsyncIterator that iterates each 'data' event emitted from emitter. * It will reject upon an error event or if the provided signal is aborted. */ -export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) { - const signal = options.signal; - +export function onData(emitter: EventEmitter) { // Setup pending events and pending promise lists /** * When the caller has not yet called .next(), we store the @@ -89,19 +87,8 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) emitter.on('data', eventHandler); emitter.on('error', errorHandler); - if (signal.aborted) { - // If the signal is aborted, set up the first .next() call to be a rejection - queueMicrotask(abortListener); - } else { - signal.addEventListener('abort', abortListener, { once: true }); - } - return iterator; - function abortListener() { - errorHandler(signal.reason); - } - function eventHandler(value: Buffer) { const promise = unconsumedPromises.shift(); if (promise != null) promise.resolve({ value, done: false }); @@ -119,7 +106,6 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) // Adding event handlers emitter.off('data', eventHandler); emitter.off('error', errorHandler); - signal.removeEventListener('abort', abortListener); finished = true; const doneResult = { value: undefined, done: finished } as const; diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index de1e455c665..421a9e02bbf 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -1,7 +1,11 @@ import { expect } from 'chai'; +import { type EventEmitter, once } from 'events'; +import * as sinon from 'sinon'; +import { setTimeout } from 'timers'; import { addContainerMetadata, + Binary, connect, Connection, type ConnectionOptions, @@ -15,7 +19,9 @@ import { ServerHeartbeatStartedEvent, Topology } from '../../mongodb'; +import * as mock from '../../tools/mongodb-mock/index'; import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration'; +import { getSymbolFrom, sleep } from '../../tools/utils'; import { assert as test, setupDatabase } from '../shared'; const commonConnectOptions = { @@ -200,6 +206,84 @@ describe('Connection', function () { client.connect(); }); + context( + 'when a large message is written to the socket', + { requires: { topology: 'single', auth: 'disabled' } }, + () => { + let client, mockServer: import('../../tools/mongodb-mock/src/server').MockServer; + + beforeEach(async function () { + mockServer = await mock.createServer(); + + mockServer + .addMessageHandler('insert', req => { + setTimeout(() => { + req.reply({ ok: 1 }); + }, 800); + }) + .addMessageHandler('hello', req => { + req.reply(Object.assign({}, mock.HELLO)); + }) + .addMessageHandler(LEGACY_HELLO_COMMAND, req => { + req.reply(Object.assign({}, mock.HELLO)); + }); + + client = new MongoClient(`mongodb://${mockServer.uri()}`, { + minPoolSize: 1, + maxPoolSize: 1 + }); + }); + + afterEach(async function () { + await client.close(); + mockServer.destroy(); + sinon.restore(); + }); + + it('waits for an async drain event because the write was buffered', async () => { + const connectionReady = once(client, 'connectionReady'); + await client.connect(); + await connectionReady; + + // Get the only connection + const pool = [...client.topology.s.servers.values()][0].pool; + + const connections = pool[getSymbolFrom(pool, 'connections')]; + expect(connections).to.have.lengthOf(1); + + const connection = connections.first(); + const socket: EventEmitter = connection.socket; + + // Spy on the socket event listeners + const addedListeners: string[] = []; + const removedListeners: string[] = []; + socket + .on('removeListener', name => removedListeners.push(name)) + .on('newListener', name => addedListeners.push(name)); + + // Make server sockets block + for (const s of mockServer.sockets) s.pause(); + + const insert = client + .db('test') + .collection('test') + // Anything above 16Kb should work I think (10mb to be extra sure) + .insertOne({ a: new Binary(Buffer.alloc(10 * (2 ** 10) ** 2), 127) }); + + // Sleep a bit and unblock server sockets + await sleep(10); + for (const s of mockServer.sockets) s.resume(); + + // Let the operation finish + await insert; + + // Ensure that we used the drain event for this write + expect(addedListeners).to.deep.equal(['drain', 'error']); + expect(removedListeners).to.deep.equal(['drain', 'error']); + }); + } + ); + context('when connecting with a username and password', () => { let utilClient: MongoClient; let client: MongoClient; diff --git a/test/integration/node-specific/resource_clean_up.test.ts b/test/integration/node-specific/resource_clean_up.test.ts index 0d330914c0f..e370986a264 100644 --- a/test/integration/node-specific/resource_clean_up.test.ts +++ b/test/integration/node-specific/resource_clean_up.test.ts @@ -1,5 +1,8 @@ +import * as v8 from 'node:v8'; + import { expect } from 'chai'; +import { sleep } from '../../tools/utils'; import { runScript } from './resource_tracking_script_builder'; /** @@ -86,4 +89,34 @@ describe('Driver Resources', () => { }); }); }); + + context('when 100s of operations are executed and complete', () => { + beforeEach(function () { + if (this.currentTest && typeof v8.queryObjects !== 'function') { + this.currentTest.skipReason = 'Test requires v8.queryObjects API to count Promises'; + this.currentTest?.skip(); + } + }); + + let client; + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + await client.close(); + }); + + it('does not leave behind additional promises', async () => { + const test = client.db('test').collection('test'); + const promiseCountBefore = v8.queryObjects(Promise, { format: 'count' }); + for (let i = 0; i < 100; i++) { + await test.findOne(); + } + await sleep(10); + const promiseCountAfter = v8.queryObjects(Promise, { format: 'count' }); + + expect(promiseCountAfter).to.be.within(promiseCountBefore - 5, promiseCountBefore + 5); + }); + }); }); From 3ab050818b42a70485011cb0c5e639e51fbff452 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 6 Mar 2024 18:16:51 -0500 Subject: [PATCH 2/5] fix: cruft --- src/cmap/connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 603266c1707..17a8701daae 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -623,7 +623,7 @@ export class Connection extends TypedEventEmitter { */ private async *readMany(): AsyncGenerator { try { - this.dataEvents = this.dataEvents = onData(this.messageStream); + this.dataEvents = onData(this.messageStream); for await (const message of this.dataEvents) { const response = await decompressResponse(message); yield response; From 8fb2d3ec42a7879c549a39f56195310c1638cf74 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 7 Mar 2024 11:00:18 -0500 Subject: [PATCH 3/5] make once util --- src/cmap/connection.ts | 16 ++++------------ src/utils.ts | 25 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 17a8701daae..bf83d50de30 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -36,7 +36,7 @@ import { maxWireVersion, type MongoDBNamespace, now, - promiseWithResolvers, + once, uuidV4 } from '../utils'; import type { WriteConcern } from '../write_concern'; @@ -333,6 +333,8 @@ export class Connection extends TypedEventEmitter { if (error) { this.error = error; this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection + } else { + this.dataEvents?.return().then(undefined, () => null); // squash unhandled rejection } this.closed = true; this.emit(Connection.CLOSE); @@ -599,17 +601,7 @@ export class Connection extends TypedEventEmitter { const buffer = Buffer.concat(await finalCommand.toBin()); if (this.socket.write(buffer)) return; - - const { promise: drained, resolve, reject } = promiseWithResolvers(); - const onDrain = () => resolve(); - const onError = (error: Error) => reject(error); - - this.socket.once('drain', onDrain).once('error', onError); - try { - return await drained; - } finally { - this.socket.off('drain', onDrain).off('error', onError); - } + return once(this.socket, 'drain'); } /** diff --git a/src/utils.ts b/src/utils.ts index 8020d508f83..4a6f7a4e8c5 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,5 +1,6 @@ import * as crypto from 'crypto'; import type { SrvRecord } from 'dns'; +import { type EventEmitter } from 'events'; import * as http from 'http'; import { clearTimeout, setTimeout } from 'timers'; import * as url from 'url'; @@ -1295,3 +1296,27 @@ export function promiseWithResolvers() { } export const randomBytes = promisify(crypto.randomBytes); + +/** + * Replicates the events.once helper. + * + * Removes unused signal logic and It **only** supports 0 or 1 argument events. + * + * @param ee - An event emitter that may emit `ev` + * @param name - An event name to wait for + */ +export async function once(ee: EventEmitter, name: string): Promise { + const { promise, resolve, reject } = promiseWithResolvers(); + const onEvent = (data: T) => resolve(data); + const onError = (error: Error) => reject(error); + + ee.once(name, onEvent).once('error', onError); + try { + const res = await promise; + ee.off('error', onError); + return res; + } catch (error) { + ee.off(name, onEvent); + throw error; + } +} From e0a2b0ac0b8ef1c74fec0488477b7201023e493b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 7 Mar 2024 11:03:06 -0500 Subject: [PATCH 4/5] fix error TS --- src/cmap/connection.ts | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index bf83d50de30..e4036bb187d 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -281,7 +281,7 @@ export class Connection extends TypedEventEmitter { this.lastUseTime = now(); } - public onError(error?: Error) { + public onError(error: Error) { this.cleanup(error); } @@ -324,18 +324,14 @@ export class Connection extends TypedEventEmitter { * * This method does nothing if the connection is already closed. */ - private cleanup(error?: Error): void { + private cleanup(error: Error): void { if (this.closed) { return; } this.socket.destroy(); - if (error) { - this.error = error; - this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection - } else { - this.dataEvents?.return().then(undefined, () => null); // squash unhandled rejection - } + this.error = error; + this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection this.closed = true; this.emit(Connection.CLOSE); } From ee3a0d65a1f5a9bb469896079b9144d863765a77 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 7 Mar 2024 11:03:49 -0500 Subject: [PATCH 5/5] docs --- src/cmap/wire_protocol/on_data.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index 63b59806fed..b99c950d96f 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -16,7 +16,7 @@ type PendingPromises = Omit< * https://nodejs.org/api/events.html#eventsonemitter-eventname-options * * Returns an AsyncIterator that iterates each 'data' event emitted from emitter. - * It will reject upon an error event or if the provided signal is aborted. + * It will reject upon an error event. */ export function onData(emitter: EventEmitter) { // Setup pending events and pending promise lists