From 608cb83a35ea7a8a1aa24b3220ca41671bdbf611 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 2 Feb 2024 16:53:16 -0500 Subject: [PATCH 01/24] add Server.commandAsync --- src/cmap/connection_pool.ts | 25 +++++++++ src/sdam/server.ts | 101 +++++++++++++++++++++++++++++++----- 2 files changed, 113 insertions(+), 13 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index b5e0818061c..b3c94dcab33 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -645,6 +645,31 @@ export class ConnectionPool extends TypedEventEmitter { ); } + async reauthenticateAsync(connection: Connection): Promise { + const authContext = connection.authContext; + if (!authContext) { + throw new MongoRuntimeError('No auth context found on connection.'); + } + const credentials = authContext.credentials; + if (!credentials) { + throw new MongoMissingCredentialsError( + 'Connection is missing credentials when asked to reauthenticate' + ); + } + + const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello); + const provider = AUTH_PROVIDERS.get(resolvedCredentials.mechanism); + + if (!provider) { + throw new MongoMissingCredentialsError( + `Reauthenticate failed due to no auth provider for ${credentials.mechanism}` + ); + } + + await provider.reauth(authContext); + return connection; + } + /** Clear the min pool size timer */ private clearMinPoolSizeTimer(): void { const minPoolSizeTimer = this[kMinPoolSizeTimer]; diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 1eefbb01df4..2106826aa3d 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -1,5 +1,3 @@ -import { promisify } from 'util'; - import type { Document } from '../bson'; import { type AutoEncrypter } from '../client-side-encryption/auto_encrypter'; import { type CommandOptions, Connection, type DestroyOptions } from '../cmap/connection'; @@ -26,6 +24,7 @@ import { isNetworkErrorBeforeHandshake, isNodeShuttingDownError, isSDAMUnrecoverableError, + MONGODB_ERROR_CODES, MongoError, MongoErrorLabel, MongoInvalidArgumentError, @@ -48,6 +47,7 @@ import { makeStateMachine, maxWireVersion, type MongoDBNamespace, + promiseWithResolvers, supportsRetryableWrites } from '../utils'; import { @@ -115,7 +115,6 @@ export class Server extends TypedEventEmitter { pool: ConnectionPool; serverApi?: ServerApi; hello?: Document; - commandAsync: (ns: MongoDBNamespace, cmd: Document, options: CommandOptions) => Promise; monitor: Monitor | null; /** @event */ @@ -139,16 +138,6 @@ export class Server extends TypedEventEmitter { constructor(topology: Topology, description: ServerDescription, options: ServerOptions) { super(); - this.commandAsync = promisify( - ( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions, - // callback type defines Document result because result is never nullish when it succeeds, otherwise promise rejects - callback: (error: Error | undefined, result: Document) => void - ) => this.command(ns, cmd, options, callback as any) - ); - this.serverApi = options.serverApi; const poolOptions = { hostAddress: description.hostAddress, ...options }; @@ -380,6 +369,92 @@ export class Server extends TypedEventEmitter { ); } + async commandAsync( + ns: MongoDBNamespace, + cmd: Document, + options: CommandOptions + ): Promise { + if (ns.db == null || typeof ns === 'string') { + throw new MongoInvalidArgumentError('Namespace must not be a string'); + } + + if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { + throw new MongoServerClosedError(); + } + + // Clone the options + const finalOptions = Object.assign({}, options, { wireProtocolCommand: false }); + + // There are cases where we need to flag the read preference not to get sent in + // the command, such as pre-5.0 servers attempting to perform an aggregate write + // with a non-primary read preference. In this case the effective read preference + // (primary) is not the same as the provided and must be removed completely. + if (finalOptions.omitReadPreference) { + delete finalOptions.readPreference; + } + + const session = finalOptions.session; + let conn = session?.pinnedConnection; + + // NOTE: This is a hack! We can't retrieve the connections used for executing an operation + // (and prevent them from being checked back in) at the point of operation execution. + // This should be considered as part of the work for NODE-2882 + // NOTE: + // When incrementing operation count, it's important that we increment it before we + // attempt to check out a connection from the pool. This ensures that operations that + // are waiting for a connection are included in the operation count. Load balanced + // mode will only ever have a single server, so the operation count doesn't matter. + // Incrementing the operation count above the logic to handle load balanced mode would + // require special logic to decrement it again, or would double increment (the load + // balanced code makes a recursive call). Instead, we increment the count after this + // check. + + if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { + const { promise: checkedOutPromise, resolve, reject } = promiseWithResolvers(); + + this.pool.checkOut((err, conn) => { + if (err || conn == null) { + reject(err); + return; + } + resolve(conn); + }); + const checkedOut = await checkedOutPromise; + + session.pin(checkedOut); + return this.commandAsync(ns, cmd, finalOptions); + } + this.incrementOperationCount(); + + // FIXME: Fix this + if (!conn) { + const { promise: connPromise, resolve, reject } = promiseWithResolvers(); + this.pool.checkOut((err, conn) => { + // don't callback with `err` here, we might want to act upon it inside `fn` + if (err || conn == null) { + reject(err); + return; + } + + resolve(conn); + }); + + conn = await connPromise; + } + + try { + return await conn.command(ns, cmd, finalOptions); + } catch (e) { + if (e instanceof MongoError && e.code === MONGODB_ERROR_CODES.Reauthenticate) { + conn = await this.pool.reauthenticateAsync(conn); + } else { + throw e; + } + } + + return conn.command(ns, cmd, finalOptions); + } + /** * Handle SDAM error * @internal From 9a192444327b9cd0ce666ca5c135501677e1fbee Mon Sep 17 00:00:00 2001 From: Warren James Date: Thu, 8 Feb 2024 10:41:54 -0500 Subject: [PATCH 02/24] refactor(NODE-5912): refactor --- src/sdam/topology.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 4bf816f2380..6f92be2f864 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -473,17 +473,13 @@ export class Topology extends TypedEventEmitter { const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; if (!skipPingOnConnect && server && this.s.credentials) { - server.command(ns('admin.$cmd'), { ping: 1 }, {}, err => { - if (err) { - return exitWithError(err); - } - + server.commandAsync(ns('admin.$cmd'), { ping: 1 }, {}).then(() => { stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); callback?.(undefined, this); - }); + }, exitWithError); return; } From 290b7e9f31d3e0559448a224df9e6dacc17b315e Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 13 Feb 2024 12:57:19 -0500 Subject: [PATCH 03/24] WIP --- src/cmap/connection_pool.ts | 65 +++++- src/operations/command.ts | 2 +- src/operations/find.ts | 2 +- src/operations/get_more.ts | 2 +- src/operations/kill_cursors.ts | 2 +- src/operations/run_command.ts | 4 +- src/operations/search_indexes/create.ts | 2 +- src/operations/search_indexes/drop.ts | 2 +- src/operations/search_indexes/update.ts | 2 +- src/sdam/server.ts | 215 ++++++++---------- src/sdam/topology.ts | 2 +- src/sessions.ts | 152 +++++++++++++ test/integration/crud/find.test.js | 2 + ...rver_discovery_and_monitoring.spec.test.ts | 2 +- test/unit/error.test.ts | 64 +++--- test/unit/operations/find.test.ts | 4 +- test/unit/operations/get_more.test.ts | 10 +- test/unit/sdam/server.test.ts | 8 - test/unit/sdam/topology.test.js | 23 +- 19 files changed, 377 insertions(+), 188 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index b3c94dcab33..028fe0ae292 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -27,7 +27,14 @@ import { } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; -import { type Callback, eachAsync, List, makeCounter, TimeoutController } from '../utils'; +import { + type Callback, + eachAsync, + List, + makeCounter, + promiseWithResolvers, + TimeoutController +} from '../utils'; import { connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -645,6 +652,56 @@ export class ConnectionPool extends TypedEventEmitter { ); } + async withConnectionAsync( + conn: Connection | undefined, + fn: AsyncWithConnectionCallback + ): Promise { + let result; + if (conn) { + // use the provided connection, and do _not_ check it in after execution + try { + result = await fn(undefined, conn); + return result; + } catch (fnErr) { + if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) { + conn = this.reauthenticateAsync(conn); + } else { + throw fnErr; + } + } + } else { + const { promise, resolve, reject } = promiseWithResolvers(); + this.checkOut((err, conn) => { + fn(err as MongoError, conn).then( + () => { + if (conn) this.checkIn(conn); + }, + fnErr => { + if (conn) { + this.withReauthenticationAsync(fnErr, conn, fn).then(resolve, reject); + } else { + reject(fnErr); + } + } + ); + }); + return promise; + } + } + + async withReauthenticationAsync( + fnErr: AnyError, + conn: Connection, + fn: AsyncWithConnectionCallback + ): Promise { + if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) { + conn = await this.reauthenticateAsync(conn); + return fn(undefined, conn); + } else { + throw fnErr; + } + } + async reauthenticateAsync(connection: Connection): Promise { const authContext = connection.authContext; if (!authContext) { @@ -667,6 +724,7 @@ export class ConnectionPool extends TypedEventEmitter { } await provider.reauth(authContext); + return connection; } @@ -945,3 +1003,8 @@ export type WithConnectionCallback = ( connection: Connection | undefined, callback: Callback ) => void; + +type AsyncWithConnectionCallback = ( + error: MongoError | undefined, + connection: Connection | undefined +) => Promise; diff --git a/src/operations/command.ts b/src/operations/command.ts index 4cff77eb174..3767fa866f6 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -152,6 +152,6 @@ export abstract class CommandOperation extends AbstractOperation { cmd = decorateWithExplain(cmd, this.explain); } - return server.commandAsync(this.ns, cmd, options); + return server.command(this.ns, cmd, options); } } diff --git a/src/operations/find.ts b/src/operations/find.ts index 85ebf1edcd1..abdda7ede1b 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -111,7 +111,7 @@ export class FindOperation extends CommandOperation { findCommand = decorateWithExplain(findCommand, this.explain); } - return server.commandAsync(this.ns, findCommand, { + return server.command(this.ns, findCommand, { ...this.options, ...this.bsonOptions, documentsReturnedIn: 'firstBatch', diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index b7059a46c4b..5cf01904d6c 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -96,7 +96,7 @@ export class GetMoreOperation extends AbstractOperation { ...this.options }; - return server.commandAsync(this.ns, getMoreCmd, commandOptions); + return server.command(this.ns, getMoreCmd, commandOptions); } } diff --git a/src/operations/kill_cursors.ts b/src/operations/kill_cursors.ts index ffbf0a3ad44..73cf93cb469 100644 --- a/src/operations/kill_cursors.ts +++ b/src/operations/kill_cursors.ts @@ -46,7 +46,7 @@ export class KillCursorsOperation extends AbstractOperation { cursors: [this.cursorId] }; try { - await server.commandAsync(this.ns, killCursorsCommand, { session }); + await server.command(this.ns, killCursorsCommand, { session }); } catch { // The driver should never emit errors from killCursors, this is spec-ed behavior } diff --git a/src/operations/run_command.ts b/src/operations/run_command.ts index d89d2e229c2..c288d266be3 100644 --- a/src/operations/run_command.ts +++ b/src/operations/run_command.ts @@ -28,7 +28,7 @@ export class RunCommandOperation extends AbstractOperation { override async execute(server: Server, session: ClientSession | undefined): Promise { this.server = server; - return server.commandAsync(this.ns, this.command, { + return server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, session @@ -54,7 +54,7 @@ export class RunAdminCommandOperation extends AbstractOperation override async execute(server: Server, session: ClientSession | undefined): Promise { this.server = server; - return server.commandAsync(this.ns, this.command, { + return server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, session diff --git a/src/operations/search_indexes/create.ts b/src/operations/search_indexes/create.ts index 96b38a159be..054ba026297 100644 --- a/src/operations/search_indexes/create.ts +++ b/src/operations/search_indexes/create.ts @@ -36,7 +36,7 @@ export class CreateSearchIndexesOperation extends AbstractOperation { indexes: this.descriptions }; - const res = await server.commandAsync(namespace, command, { session }); + const res = await server.command(namespace, command, { session }); const indexesCreated: Array<{ name: string }> = res?.indexesCreated ?? []; return indexesCreated.map(({ name }) => name); diff --git a/src/operations/search_indexes/drop.ts b/src/operations/search_indexes/drop.ts index f17e3365e62..05a56f1c47c 100644 --- a/src/operations/search_indexes/drop.ts +++ b/src/operations/search_indexes/drop.ts @@ -28,7 +28,7 @@ export class DropSearchIndexOperation extends AbstractOperation { } try { - await server.commandAsync(namespace, command, { session }); + await server.command(namespace, command, { session }); } catch (error) { const isNamespaceNotFoundError = error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound; diff --git a/src/operations/search_indexes/update.ts b/src/operations/search_indexes/update.ts index fde2230a7c9..aad7f93536c 100644 --- a/src/operations/search_indexes/update.ts +++ b/src/operations/search_indexes/update.ts @@ -27,7 +27,7 @@ export class UpdateSearchIndexOperation extends AbstractOperation { definition: this.definition }; - await server.commandAsync(namespace, command, { session }); + await server.command(namespace, command, { session }); return; } } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 2106826aa3d..8c6e50ab6d5 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -282,98 +282,7 @@ export class Server extends TypedEventEmitter { * Execute a command * @internal */ - command( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions, - callback: Callback - ): void { - if (callback == null) { - throw new MongoInvalidArgumentError('Callback must be provided'); - } - - if (ns.db == null || typeof ns === 'string') { - throw new MongoInvalidArgumentError('Namespace must not be a string'); - } - - if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { - callback(new MongoServerClosedError()); - return; - } - - // Clone the options - const finalOptions = Object.assign({}, options, { wireProtocolCommand: false }); - - // There are cases where we need to flag the read preference not to get sent in - // the command, such as pre-5.0 servers attempting to perform an aggregate write - // with a non-primary read preference. In this case the effective read preference - // (primary) is not the same as the provided and must be removed completely. - if (finalOptions.omitReadPreference) { - delete finalOptions.readPreference; - } - - const session = finalOptions.session; - const conn = session?.pinnedConnection; - - // NOTE: This is a hack! We can't retrieve the connections used for executing an operation - // (and prevent them from being checked back in) at the point of operation execution. - // This should be considered as part of the work for NODE-2882 - // NOTE: - // When incrementing operation count, it's important that we increment it before we - // attempt to check out a connection from the pool. This ensures that operations that - // are waiting for a connection are included in the operation count. Load balanced - // mode will only ever have a single server, so the operation count doesn't matter. - // Incrementing the operation count above the logic to handle load balanced mode would - // require special logic to decrement it again, or would double increment (the load - // balanced code makes a recursive call). Instead, we increment the count after this - // check. - if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { - this.pool.checkOut((err, checkedOut) => { - if (err || checkedOut == null) { - if (callback) return callback(err); - return; - } - - session.pin(checkedOut); - this.command(ns, cmd, finalOptions, callback); - }); - return; - } - - this.incrementOperationCount(); - - this.pool.withConnection( - conn, - (err, conn, cb) => { - if (err || !conn) { - this.decrementOperationCount(); - if (!err) { - return cb(new MongoRuntimeError('Failed to create connection without error')); - } - if (!(err instanceof PoolClearedError)) { - this.handleError(err); - } - return cb(err); - } - - const handler = makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { - this.decrementOperationCount(); - cb(error, response); - }); - conn.command(ns, cmd, finalOptions).then( - r => handler(undefined, r), - e => handler(e) - ); - }, - callback - ); - } - - async commandAsync( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions - ): Promise { + async command(ns: MongoDBNamespace, cmd: Document, options: CommandOptions): Promise { if (ns.db == null || typeof ns === 'string') { throw new MongoInvalidArgumentError('Namespace must not be a string'); } @@ -422,37 +331,108 @@ export class Server extends TypedEventEmitter { const checkedOut = await checkedOutPromise; session.pin(checkedOut); - return this.commandAsync(ns, cmd, finalOptions); + return this.command(ns, cmd, finalOptions); } + this.incrementOperationCount(); - // FIXME: Fix this - if (!conn) { - const { promise: connPromise, resolve, reject } = promiseWithResolvers(); - this.pool.checkOut((err, conn) => { - // don't callback with `err` here, we might want to act upon it inside `fn` - if (err || conn == null) { - reject(err); - return; + /* + const executeCommand: WithConnectionCallBack = (err, conn, cb) => { + if (err || !conn) { + this.decrementOperationCount(); + if (!err) { + return cb(new MongoRuntimeError('Failed to create connection without error')); } + if (!(err instanceof PoolClearedError)) { + this.handleError(err); + } + return cb(err); + } - resolve(conn); + const handler = makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { + this.decrementOperationCount(); + cb(error, response); }); + conn.command(ns, cmd, finalOptions).then( + r => handler(undefined, r), + e => handler(e) + ); + }; + */ + + const executeCommandAsync = async (err?: AnyError, conn?: Connection) => { + if (err || !conn) { + this.decrementOperationCount(); + if (!err) throw new MongoRuntimeError('Failed to create connection without error'); + if (!(err instanceof PoolClearedError)) this.handleError(err); + + throw err; + } + const handler = makeAsyncOperationHandler(this, conn, cmd, finalOptions); + + try { + const result = await conn.command(ns, cmd, finalOptions); + return await handler(undefined, result); + } catch (e) { + return await handler(e); + } + }; + const shouldReauth = (e: AnyError): boolean => { + return e instanceof MongoError && e.code === MONGODB_ERROR_CODES.Reauthenticate; + }; - conn = await connPromise; + // FIXME: This is where withConnection logic should go + if (conn) { + // send operation, possibly reauthenticating and return without checking back in + try { + return await executeCommandAsync(undefined, conn); + } catch (e) { + if (shouldReauth(e)) { + conn = await this.pool.reauthenticateAsync(conn); + return await executeCommandAsync(undefined, conn); + } + + throw e; + } } + // check out connection + const { promise: checkOutPromise, resolve, reject } = promiseWithResolvers(); + this.pool.checkOut((err, conn) => { + if (err) reject(err); + // There is no way to not error and get an undefined connection + else resolve(conn as Connection); + }); + + conn = await checkOutPromise; + + let rv: Document; + // call executeCommandAsync with that checked out connection try { - return await conn.command(ns, cmd, finalOptions); + rv = await executeCommandAsync(undefined, conn); } catch (e) { - if (e instanceof MongoError && e.code === MONGODB_ERROR_CODES.Reauthenticate) { - conn = await this.pool.reauthenticateAsync(conn); + // if it errors + if (conn) { + // if connection is defined + // reauthenticate + if (shouldReauth(e)) { + conn = await this.pool.reauthenticateAsync(conn); + rv = await executeCommandAsync(undefined, conn); + } else { + throw e; + } } else { + // throw the error throw e; } } - return conn.command(ns, cmd, finalOptions); + // if connection exists + // check connection back into pool + if (conn) { + this.pool.checkIn(conn); + } + return rv; } /** @@ -584,26 +564,25 @@ function isRetryableWritesEnabled(topology: Topology) { return topology.s.options.retryWrites !== false; } -function makeOperationHandler( +function makeAsyncOperationHandler( server: Server, connection: Connection, cmd: Document, - options: CommandOptions | GetMoreOptions | undefined, - callback: Callback -): Callback { + options: CommandOptions | GetMoreOptions | undefined +): (error?: AnyError, result?: any) => Promise { const session = options?.session; - return function handleOperationResult(error, result) { + return async function handleOperationResult(error, result) { // We should not swallow an error if it is present. if (error == null && result != null) { - return callback(undefined, result); + return result; } if (options != null && 'noResponse' in options && options.noResponse === true) { - return callback(undefined, null); + return null; } if (!error) { - return callback(new MongoUnexpectedServerResponseError('Empty response with no error')); + throw new MongoUnexpectedServerResponseError('Empty response with no error'); } if (error.name === 'AbortError' && error.cause instanceof MongoError) { @@ -612,11 +591,11 @@ function makeOperationHandler( if (!(error instanceof MongoError)) { // Node.js or some other error we have not special handling for - return callback(error); + throw error; } if (connectionIsStale(server.pool, connection)) { - return callback(error); + throw error; } if (error instanceof MongoNetworkError) { @@ -659,6 +638,6 @@ function makeOperationHandler( server.handleError(error, connection); - return callback(error); + throw error; }; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 6f92be2f864..400db63870f 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -473,7 +473,7 @@ export class Topology extends TypedEventEmitter { const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; if (!skipPingOnConnect && server && this.s.credentials) { - server.commandAsync(ns('admin.$cmd'), { ping: 1 }, {}).then(() => { + server.command(ns('admin.$cmd'), { ping: 1 }, {}).then(() => { stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); diff --git a/src/sessions.ts b/src/sessions.ts index e04c802b3cc..6a5db3f27ef 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -640,6 +640,158 @@ const endTransactionAsync = promisify( callback: (error: Error) => void ) => void ); +async function endTransaction( + session: ClientSession, + commandName: 'abortTransaction' | 'commitTransaction' +): Promise { + // handle any initial problematic cases + const txnState = session.transaction.state; + + if (txnState === TxnState.NO_TRANSACTION) { + throw new MongoTransactionError('No transaction started'); + } + + if (commandName === 'commitTransaction') { + if ( + txnState === TxnState.STARTING_TRANSACTION || + txnState === TxnState.TRANSACTION_COMMITTED_EMPTY + ) { + // the transaction was never started, we can safely exit here + session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY); + return; + } + + if (txnState === TxnState.TRANSACTION_ABORTED) { + throw new MongoTransactionError( + 'Cannot call commitTransaction after calling abortTransaction' + ); + } + } else { + if (txnState === TxnState.STARTING_TRANSACTION) { + // the transaction was never started, we can safely exit here + session.transaction.transition(TxnState.TRANSACTION_ABORTED); + return; + } + + if (txnState === TxnState.TRANSACTION_ABORTED) { + throw new MongoTransactionError('Cannot call abortTransaction twice'); + } + + if ( + txnState === TxnState.TRANSACTION_COMMITTED || + txnState === TxnState.TRANSACTION_COMMITTED_EMPTY + ) { + throw new MongoTransactionError( + 'Cannot call abortTransaction after calling commitTransaction' + ); + } + } + + // construct and send the command + const command: Document = { [commandName]: 1 }; + + // apply a writeConcern if specified + let writeConcern; + if (session.transaction.options.writeConcern) { + writeConcern = Object.assign({}, session.transaction.options.writeConcern); + } else if (session.clientOptions && session.clientOptions.writeConcern) { + writeConcern = { w: session.clientOptions.writeConcern.w }; + } + + if (txnState === TxnState.TRANSACTION_COMMITTED) { + writeConcern = Object.assign({ wtimeoutMS: 10000 }, writeConcern, { w: 'majority' }); + } + + if (writeConcern) { + WriteConcern.apply(command, writeConcern); + } + + if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) { + Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS }); + } + + function commandHandler(error?: Error) { + if (commandName !== 'commitTransaction') { + session.transaction.transition(TxnState.TRANSACTION_ABORTED); + if (session.loadBalanced) { + maybeClearPinnedConnection(session, { force: false }); + } + + // The spec indicates that we should ignore all errors on `abortTransaction` + return; + } + + session.transaction.transition(TxnState.TRANSACTION_COMMITTED); + if (error instanceof MongoError) { + if ( + isRetryableWriteError(error) || + error instanceof MongoWriteConcernError || + isMaxTimeMSExpiredError(error) + ) { + if (isUnknownTransactionCommitResult(error)) { + error.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult); + + // per txns spec, must unpin session in this case + session.unpin({ error }); + } + } else if (error.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) { + session.unpin({ error }); + } + } + + throw error; + } + + if (session.transaction.recoveryToken) { + command.recoveryToken = session.transaction.recoveryToken; + } + + const handleFirstCommandAttempt = (error?: Error) => { + if (command.abortTransaction) { + // always unpin on abort regardless of command outcome + session.unpin(); + } + + if (error instanceof MongoError && isRetryableWriteError(error)) { + // SPEC-1185: apply majority write concern when retrying commitTransaction + if (command.commitTransaction) { + // per txns spec, must unpin session in this case + session.unpin({ force: true }); + + command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, { + w: 'majority' + }); + } + + executeOperation( + session.client, + new RunAdminCommandOperation(command, { + session, + readPreference: ReadPreference.primary, + bypassPinningCheck: true + }) + ).then(() => commandHandler(), commandHandler); + return; + } + + commandHandler(error); + }; + + // send the command + try { + await executeOperation( + session.client, + new RunAdminCommandOperation(command, { + session, + readPreference: ReadPreference.primary, + bypassPinningCheck: true + }) + ); + handleFirstCommandAttempt(); + } catch (e) { + handleFirstCommandAttempt(e); + } +} function endTransaction( session: ClientSession, diff --git a/test/integration/crud/find.test.js b/test/integration/crud/find.test.js index c930a86a4ea..6f5144941b7 100644 --- a/test/integration/crud/find.test.js +++ b/test/integration/crud/find.test.js @@ -36,7 +36,9 @@ describe('Find', function () { const collection = db.collection('test_find_simple'); const docs = [{ a: 2 }, { b: 3 }]; + console.log('before insert'); await collection.insert(docs, configuration.writeConcernMax()); + console.log('after insert'); const insertedDocs = await collection.find().toArray(); expect(insertedDocs).to.have.length(2); diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 0d627d65cfa..d798b0dcfe4 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -342,7 +342,7 @@ async function executeSDAMTest(testData: SDAMTest) { const server = client.topology.s.servers.get(appError.address); // Run a dummy command to encounter the error - const res = promisify(server.command.bind(server))(ns('admin.$cmd'), { ping: 1 }, {}); + const res = server.command.bind(server)(ns('admin.$cmd'), { ping: 1 }, {}); const thrownError = await res.catch(error => error); // Restore the stub before asserting anything in case of errors diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index ebaa0e9281c..678398d27ee 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -368,22 +368,24 @@ describe('MongoErrors', () => { topology.selectServer('primary', {}, (err, server) => { expect(err).to.not.exist; - server.command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}, err => { - let _err; - try { - expect(err).to.be.an.instanceOf(MongoWriteConcernError); - expect(err.result).to.exist; - expect(err.result).to.have.property('ok', 1); - expect(err.result).to.not.have.property('errmsg'); - expect(err.result).to.not.have.property('code'); - expect(err.result).to.not.have.property('codeName'); - expect(err.result).to.have.property('writeConcernError'); - } catch (e) { - _err = e; - } finally { - cleanup(_err); - } - }); + server + .command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}) + .then(expect.fail, err => { + let _err; + try { + expect(err).to.be.an.instanceOf(MongoWriteConcernError); + expect(err.result).to.exist; + expect(err.result).to.have.property('ok', 1); + expect(err.result).to.not.have.property('errmsg'); + expect(err.result).to.not.have.property('code'); + expect(err.result).to.not.have.property('codeName'); + expect(err.result).to.have.property('writeConcernError'); + } catch (e) { + _err = e; + } finally { + cleanup(_err); + } + }); }); }); }); @@ -409,20 +411,22 @@ describe('MongoErrors', () => { topology.selectServer('primary', {}, (err, server) => { expect(err).to.not.exist; - server.command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}, err => { - let _err; - try { - expect(err).to.be.an.instanceOf(MongoWriteConcernError); - expect(err.result).to.exist; - expect(err.result.writeConcernError).to.deep.equal( - RAW_USER_WRITE_CONCERN_ERROR_INFO.writeConcernError - ); - } catch (e) { - _err = e; - } finally { - cleanup(_err); - } - }); + server + .command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}) + .then(expect.fail, err => { + let _err; + try { + expect(err).to.be.an.instanceOf(MongoWriteConcernError); + expect(err.result).to.exist; + expect(err.result.writeConcernError).to.deep.equal( + RAW_USER_WRITE_CONCERN_ERROR_INFO.writeConcernError + ); + } catch (e) { + _err = e; + } finally { + cleanup(_err); + } + }); }); }); }); diff --git a/test/unit/operations/find.test.ts b/test/unit/operations/find.test.ts index 657013de1c1..ef190168166 100644 --- a/test/unit/operations/find.test.ts +++ b/test/unit/operations/find.test.ts @@ -41,7 +41,7 @@ describe('FindOperation', function () { it('should build basic find command with filter', async () => { const findOperation = new FindOperation(undefined, namespace, filter); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command'); await findOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { find: namespace.collection, @@ -54,7 +54,7 @@ describe('FindOperation', function () { oplogReplay: true }; const findOperation = new FindOperation(undefined, namespace, {}, options); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command'); await findOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index 6385ba71c6b..55f0fed8b44 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -54,7 +54,7 @@ describe('GetMoreOperation', function () { ); const opts = { ...options, documentsReturnedIn: 'nextBatch', returnFieldSelector: null }; const operation = new GetMoreOperation(namespace, cursorId, server, opts); - const stub = sinon.stub(server, 'command').callsFake((_, __, ___, cb) => cb()); + const stub = sinon.stub(server, 'command').callsFake((_, __, ___) => {}); const expectedGetMoreCommand = { getMore: cursorId, @@ -104,7 +104,7 @@ describe('GetMoreOperation', function () { it('should build basic getMore command with cursorId and collection', async () => { const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, {}); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command'); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { getMore: cursorId, @@ -117,7 +117,7 @@ describe('GetMoreOperation', function () { batchSize: 234 }; const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, options); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command'); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, @@ -130,7 +130,7 @@ describe('GetMoreOperation', function () { maxAwaitTimeMS: 234 }; const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, options); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command'); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, @@ -188,7 +188,7 @@ describe('GetMoreOperation', function () { maxWireVersion: serverVersion }; const operation = new GetMoreOperation(namespace, cursorId, server, optionsWithComment); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command'); await operation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, getMore); }); diff --git a/test/unit/sdam/server.test.ts b/test/unit/sdam/server.test.ts index 7e55b59ba94..aff711bf386 100644 --- a/test/unit/sdam/server.test.ts +++ b/test/unit/sdam/server.test.ts @@ -66,14 +66,6 @@ describe('Server', () => { ); }); - context('when a server is created', function () { - it('calls the command function through commandAsync', async function () { - const serverSpy = sinon.stub(server, 'command').yieldsRight(undefined, { ok: 1 }); - await server.commandAsync(ns('dummy'), { ping: 1 }, {}); - expect(serverSpy).to.have.been.calledOnce; - }); - }); - for (const loadBalanced of [true, false]) { const mode = loadBalanced ? 'loadBalanced' : 'non-loadBalanced'; const contextSuffix = loadBalanced ? ' with connection provided' : ''; diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index 261a363c6df..cbc654420db 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -99,13 +99,13 @@ describe('Topology (unit)', function () { topology.selectServer('primary', {}, (err, server) => { expect(err).to.not.exist; - server.command(ns('admin.$cmd'), { ping: 1 }, { socketTimeoutMS: 250 }, (err, result) => { - expect(result).to.not.exist; - expect(err).to.exist; - expect(err).to.match(/timed out/); - - topology.close({}, done); - }); + server + .command(ns('admin.$cmd'), { ping: 1 }, { socketTimeoutMS: 250 }) + .then(expect.fail, err => { + expect(err).to.exist; + expect(err).to.match(/timed out/); + topology.close({}, done); + }); }); }); }); @@ -212,8 +212,7 @@ describe('Topology (unit)', function () { let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); - server.command(ns('test.test'), { insert: { a: 42 } }, {}, (err, result) => { - expect(result).to.not.exist; + server.command(ns('test.test'), { insert: { a: 42 } }, {}).then(expect.fail, err => { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.true; @@ -248,8 +247,7 @@ describe('Topology (unit)', function () { let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); - server.command(ns('test.test'), { insert: { a: 42 } }, {}, (err, result) => { - expect(result).to.not.exist; + server.command(ns('test.test'), { insert: { a: 42 } }, {}).then(expect.fail, err => { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; @@ -281,8 +279,7 @@ describe('Topology (unit)', function () { let serverDescription; server.on('descriptionReceived', sd => (serverDescription = sd)); - server.command(ns('test.test'), { insert: { a: 42 } }, {}, (err, result) => { - expect(result).to.not.exist; + server.command(ns('test.test'), { insert: { a: 42 } }, {}).then(expect.fail, err => { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(server.description.type).to.equal('Unknown'); From 6df82a5175f55d6fae7b0185e8d1bd8d2cf826c9 Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 13 Feb 2024 13:14:50 -0500 Subject: [PATCH 04/24] remove async/await endTransaction --- src/sessions.ts | 152 ------------------------------------------------ 1 file changed, 152 deletions(-) diff --git a/src/sessions.ts b/src/sessions.ts index 6a5db3f27ef..e04c802b3cc 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -640,158 +640,6 @@ const endTransactionAsync = promisify( callback: (error: Error) => void ) => void ); -async function endTransaction( - session: ClientSession, - commandName: 'abortTransaction' | 'commitTransaction' -): Promise { - // handle any initial problematic cases - const txnState = session.transaction.state; - - if (txnState === TxnState.NO_TRANSACTION) { - throw new MongoTransactionError('No transaction started'); - } - - if (commandName === 'commitTransaction') { - if ( - txnState === TxnState.STARTING_TRANSACTION || - txnState === TxnState.TRANSACTION_COMMITTED_EMPTY - ) { - // the transaction was never started, we can safely exit here - session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY); - return; - } - - if (txnState === TxnState.TRANSACTION_ABORTED) { - throw new MongoTransactionError( - 'Cannot call commitTransaction after calling abortTransaction' - ); - } - } else { - if (txnState === TxnState.STARTING_TRANSACTION) { - // the transaction was never started, we can safely exit here - session.transaction.transition(TxnState.TRANSACTION_ABORTED); - return; - } - - if (txnState === TxnState.TRANSACTION_ABORTED) { - throw new MongoTransactionError('Cannot call abortTransaction twice'); - } - - if ( - txnState === TxnState.TRANSACTION_COMMITTED || - txnState === TxnState.TRANSACTION_COMMITTED_EMPTY - ) { - throw new MongoTransactionError( - 'Cannot call abortTransaction after calling commitTransaction' - ); - } - } - - // construct and send the command - const command: Document = { [commandName]: 1 }; - - // apply a writeConcern if specified - let writeConcern; - if (session.transaction.options.writeConcern) { - writeConcern = Object.assign({}, session.transaction.options.writeConcern); - } else if (session.clientOptions && session.clientOptions.writeConcern) { - writeConcern = { w: session.clientOptions.writeConcern.w }; - } - - if (txnState === TxnState.TRANSACTION_COMMITTED) { - writeConcern = Object.assign({ wtimeoutMS: 10000 }, writeConcern, { w: 'majority' }); - } - - if (writeConcern) { - WriteConcern.apply(command, writeConcern); - } - - if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) { - Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS }); - } - - function commandHandler(error?: Error) { - if (commandName !== 'commitTransaction') { - session.transaction.transition(TxnState.TRANSACTION_ABORTED); - if (session.loadBalanced) { - maybeClearPinnedConnection(session, { force: false }); - } - - // The spec indicates that we should ignore all errors on `abortTransaction` - return; - } - - session.transaction.transition(TxnState.TRANSACTION_COMMITTED); - if (error instanceof MongoError) { - if ( - isRetryableWriteError(error) || - error instanceof MongoWriteConcernError || - isMaxTimeMSExpiredError(error) - ) { - if (isUnknownTransactionCommitResult(error)) { - error.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult); - - // per txns spec, must unpin session in this case - session.unpin({ error }); - } - } else if (error.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) { - session.unpin({ error }); - } - } - - throw error; - } - - if (session.transaction.recoveryToken) { - command.recoveryToken = session.transaction.recoveryToken; - } - - const handleFirstCommandAttempt = (error?: Error) => { - if (command.abortTransaction) { - // always unpin on abort regardless of command outcome - session.unpin(); - } - - if (error instanceof MongoError && isRetryableWriteError(error)) { - // SPEC-1185: apply majority write concern when retrying commitTransaction - if (command.commitTransaction) { - // per txns spec, must unpin session in this case - session.unpin({ force: true }); - - command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, { - w: 'majority' - }); - } - - executeOperation( - session.client, - new RunAdminCommandOperation(command, { - session, - readPreference: ReadPreference.primary, - bypassPinningCheck: true - }) - ).then(() => commandHandler(), commandHandler); - return; - } - - commandHandler(error); - }; - - // send the command - try { - await executeOperation( - session.client, - new RunAdminCommandOperation(command, { - session, - readPreference: ReadPreference.primary, - bypassPinningCheck: true - }) - ); - handleFirstCommandAttempt(); - } catch (e) { - handleFirstCommandAttempt(e); - } -} function endTransaction( session: ClientSession, From dc2eab7a96c467b727b0cf45378297ab424be80d Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 13 Feb 2024 23:11:17 -0500 Subject: [PATCH 05/24] WIP --- src/cmap/connection_pool.ts | 190 ++----------------------- src/sdam/server.ts | 185 +++++++++++------------- test/unit/cmap/connection_pool.test.js | 10 +- 3 files changed, 100 insertions(+), 285 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 028fe0ae292..8208e1b9215 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -17,8 +17,7 @@ import { } from '../constants'; import { type AnyError, - MONGODB_ERROR_CODES, - MongoError, + type MongoError, MongoInvalidArgumentError, MongoMissingCredentialsError, MongoNetworkError, @@ -107,7 +106,8 @@ export interface ConnectionPoolOptions extends Omit; + resolve: (conn: Connection) => void; + reject: (err: AnyError) => void; timeoutController: TimeoutController; [kCancelled]?: boolean; } @@ -357,7 +357,7 @@ export class ConnectionPool extends TypedEventEmitter { * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or * explicitly destroyed by the new owner. */ - checkOut(callback: Callback): void { + async checkOut(): Promise { this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new ConnectionCheckOutStartedEvent(this) @@ -365,8 +365,10 @@ export class ConnectionPool extends TypedEventEmitter { const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; + const { promise, resolve, reject } = promiseWithResolvers(); const waitQueueMember: WaitQueueMember = { - callback, + resolve, + reject, timeoutController: new TimeoutController(waitQueueTimeoutMS) }; waitQueueMember.timeoutController.signal.addEventListener('abort', () => { @@ -377,7 +379,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, 'timeout') ); - waitQueueMember.callback( + waitQueueMember.reject( new WaitQueueTimeoutError( this.loadBalanced ? this.waitQueueErrorMetrics() @@ -389,6 +391,8 @@ export class ConnectionPool extends TypedEventEmitter { this[kWaitQueue].push(waitQueueMember); process.nextTick(() => this.processWaitQueue()); + + return promise; } /** @@ -540,168 +544,6 @@ export class ConnectionPool extends TypedEventEmitter { ); } - /** - * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda - * has completed by calling back. - * - * NOTE: please note the required signature of `fn` - * - * @remarks When in load balancer mode, connections can be pinned to cursors or transactions. - * In these cases we pass the connection in to this method to ensure it is used and a new - * connection is not checked out. - * - * @param conn - A pinned connection for use in load balancing mode. - * @param fn - A function which operates on a managed connection - * @param callback - The original callback - */ - withConnection( - conn: Connection | undefined, - fn: WithConnectionCallback, - callback: Callback - ): void { - if (conn) { - // use the provided connection, and do _not_ check it in after execution - fn(undefined, conn, (fnErr, result) => { - if (fnErr) { - return this.withReauthentication(fnErr, conn, fn, callback); - } - callback(undefined, result); - }); - return; - } - - this.checkOut((err, conn) => { - // don't callback with `err` here, we might want to act upon it inside `fn` - fn(err as MongoError, conn, (fnErr, result) => { - if (fnErr) { - if (conn) { - this.withReauthentication(fnErr, conn, fn, callback); - } else { - callback(fnErr); - } - } else { - callback(undefined, result); - } - - if (conn) { - this.checkIn(conn); - } - }); - }); - } - - private withReauthentication( - fnErr: AnyError, - conn: Connection, - fn: WithConnectionCallback, - callback: Callback - ) { - if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) { - this.reauthenticate(conn, fn, (error, res) => { - if (error) { - return callback(error); - } - callback(undefined, res); - }); - } else { - callback(fnErr); - } - } - - /** - * Reauthenticate on the same connection and then retry the operation. - */ - private reauthenticate( - connection: Connection, - fn: WithConnectionCallback, - callback: Callback - ): void { - const authContext = connection.authContext; - if (!authContext) { - return callback(new MongoRuntimeError('No auth context found on connection.')); - } - const credentials = authContext.credentials; - if (!credentials) { - return callback( - new MongoMissingCredentialsError( - 'Connection is missing credentials when asked to reauthenticate' - ) - ); - } - const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello); - const provider = this[kServer].topology.client.s.authProviders.getOrCreateProvider( - resolvedCredentials.mechanism - ); - if (!provider) { - return callback( - new MongoMissingCredentialsError( - `Reauthenticate failed due to no auth provider for ${credentials.mechanism}` - ) - ); - } - provider.reauth(authContext).then( - () => { - fn(undefined, connection, (fnErr, fnResult) => { - if (fnErr) { - return callback(fnErr); - } - callback(undefined, fnResult); - }); - }, - error => callback(error) - ); - } - - async withConnectionAsync( - conn: Connection | undefined, - fn: AsyncWithConnectionCallback - ): Promise { - let result; - if (conn) { - // use the provided connection, and do _not_ check it in after execution - try { - result = await fn(undefined, conn); - return result; - } catch (fnErr) { - if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) { - conn = this.reauthenticateAsync(conn); - } else { - throw fnErr; - } - } - } else { - const { promise, resolve, reject } = promiseWithResolvers(); - this.checkOut((err, conn) => { - fn(err as MongoError, conn).then( - () => { - if (conn) this.checkIn(conn); - }, - fnErr => { - if (conn) { - this.withReauthenticationAsync(fnErr, conn, fn).then(resolve, reject); - } else { - reject(fnErr); - } - } - ); - }); - return promise; - } - } - - async withReauthenticationAsync( - fnErr: AnyError, - conn: Connection, - fn: AsyncWithConnectionCallback - ): Promise { - if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) { - conn = await this.reauthenticateAsync(conn); - return fn(undefined, conn); - } else { - throw fnErr; - } - } - async reauthenticateAsync(connection: Connection): Promise { const authContext = connection.authContext; if (!authContext) { @@ -924,7 +766,7 @@ export class ConnectionPool extends TypedEventEmitter { ); waitQueueMember.timeoutController.clear(); this[kWaitQueue].shift(); - waitQueueMember.callback(error); + waitQueueMember.reject(error); continue; } @@ -946,7 +788,7 @@ export class ConnectionPool extends TypedEventEmitter { waitQueueMember.timeoutController.clear(); this[kWaitQueue].shift(); - waitQueueMember.callback(undefined, connection); + waitQueueMember.resolve(connection); } } @@ -972,16 +814,17 @@ export class ConnectionPool extends TypedEventEmitter { // TODO(NODE-5192): Remove this cast new ConnectionCheckOutFailedEvent(this, 'connectionError', err as MongoError) ); + waitQueueMember.reject(err); } else if (connection) { this[kCheckedOut].add(connection); this.emitAndLog( ConnectionPool.CONNECTION_CHECKED_OUT, new ConnectionCheckedOutEvent(this, connection) ); + waitQueueMember.resolve(connection); } waitQueueMember.timeoutController.clear(); - waitQueueMember.callback(err, connection); } process.nextTick(() => this.processWaitQueue()); }); @@ -1003,8 +846,3 @@ export type WithConnectionCallback = ( connection: Connection | undefined, callback: Callback ) => void; - -type AsyncWithConnectionCallback = ( - error: MongoError | undefined, - connection: Connection | undefined -) => Promise; diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 8c6e50ab6d5..98067336bda 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -47,7 +47,6 @@ import { makeStateMachine, maxWireVersion, type MongoDBNamespace, - promiseWithResolvers, supportsRetryableWrites } from '../utils'; import { @@ -319,18 +318,8 @@ export class Server extends TypedEventEmitter { // check. if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { - const { promise: checkedOutPromise, resolve, reject } = promiseWithResolvers(); - - this.pool.checkOut((err, conn) => { - if (err || conn == null) { - reject(err); - return; - } - resolve(conn); - }); - const checkedOut = await checkedOutPromise; - - session.pin(checkedOut); + conn = await this.pool.checkOut(); + session.pin(conn); return this.command(ns, cmd, finalOptions); } @@ -368,13 +357,11 @@ export class Server extends TypedEventEmitter { throw err; } - const handler = makeAsyncOperationHandler(this, conn, cmd, finalOptions); - try { const result = await conn.command(ns, cmd, finalOptions); - return await handler(undefined, result); + return await this.operationHandler(conn, cmd, finalOptions, undefined, result); } catch (e) { - return await handler(e); + return await this.operationHandler(conn, cmd, finalOptions, e, undefined); } }; const shouldReauth = (e: AnyError): boolean => { @@ -397,14 +384,7 @@ export class Server extends TypedEventEmitter { } // check out connection - const { promise: checkOutPromise, resolve, reject } = promiseWithResolvers(); - this.pool.checkOut((err, conn) => { - if (err) reject(err); - // There is no way to not error and get an undefined connection - else resolve(conn as Connection); - }); - - conn = await checkOutPromise; + conn = await this.pool.checkOut(); let rv: Document; // call executeCommandAsync with that checked out connection @@ -483,6 +463,83 @@ export class Server extends TypedEventEmitter { } } + private async operationHandler( + connection: Connection, + cmd: Document, + options: CommandOptions | GetMoreOptions | undefined, + error?: AnyError, + result?: any + ): Promise { + const session = options?.session; + // We should not swallow an error if it is present. + if (error == null && result != null) { + return result; + } + + if (options != null && 'noResponse' in options && options.noResponse === true) { + return null as T; + } + + if (!error) { + throw new MongoUnexpectedServerResponseError('Empty response with no error'); + } + + if (error.name === 'AbortError' && error.cause instanceof MongoError) { + error = error.cause; + } + + if (!(error instanceof MongoError)) { + // Node.js or some other error we have not special handling for + throw error; + } + + if (connectionIsStale(this.pool, connection)) { + throw error; + } + + if (error instanceof MongoNetworkError) { + if (session && !session.hasEnded && session.serverSession) { + session.serverSession.isDirty = true; + } + + // inActiveTransaction check handles commit and abort. + if ( + inActiveTransaction(session, cmd) && + !error.hasErrorLabel(MongoErrorLabel.TransientTransactionError) + ) { + error.addErrorLabel(MongoErrorLabel.TransientTransactionError); + } + + if ( + (isRetryableWritesEnabled(this.topology) || isTransactionCommand(cmd)) && + supportsRetryableWrites(this) && + !inActiveTransaction(session, cmd) + ) { + error.addErrorLabel(MongoErrorLabel.RetryableWriteError); + } + } else { + if ( + (isRetryableWritesEnabled(this.topology) || isTransactionCommand(cmd)) && + needsRetryableWriteLabel(error, maxWireVersion(this)) && + !inActiveTransaction(session, cmd) + ) { + error.addErrorLabel(MongoErrorLabel.RetryableWriteError); + } + } + + if ( + session && + session.isPinned && + error.hasErrorLabel(MongoErrorLabel.TransientTransactionError) + ) { + session.unpin({ force: true }); + } + + this.handleError(error, connection); + + throw error; + } + /** * Decrement the operation count, returning the new count. */ @@ -563,81 +620,3 @@ function inActiveTransaction(session: ClientSession | undefined, cmd: Document) function isRetryableWritesEnabled(topology: Topology) { return topology.s.options.retryWrites !== false; } - -function makeAsyncOperationHandler( - server: Server, - connection: Connection, - cmd: Document, - options: CommandOptions | GetMoreOptions | undefined -): (error?: AnyError, result?: any) => Promise { - const session = options?.session; - return async function handleOperationResult(error, result) { - // We should not swallow an error if it is present. - if (error == null && result != null) { - return result; - } - - if (options != null && 'noResponse' in options && options.noResponse === true) { - return null; - } - - if (!error) { - throw new MongoUnexpectedServerResponseError('Empty response with no error'); - } - - if (error.name === 'AbortError' && error.cause instanceof MongoError) { - error = error.cause; - } - - if (!(error instanceof MongoError)) { - // Node.js or some other error we have not special handling for - throw error; - } - - if (connectionIsStale(server.pool, connection)) { - throw error; - } - - if (error instanceof MongoNetworkError) { - if (session && !session.hasEnded && session.serverSession) { - session.serverSession.isDirty = true; - } - - // inActiveTransaction check handles commit and abort. - if ( - inActiveTransaction(session, cmd) && - !error.hasErrorLabel(MongoErrorLabel.TransientTransactionError) - ) { - error.addErrorLabel(MongoErrorLabel.TransientTransactionError); - } - - if ( - (isRetryableWritesEnabled(server.topology) || isTransactionCommand(cmd)) && - supportsRetryableWrites(server) && - !inActiveTransaction(session, cmd) - ) { - error.addErrorLabel(MongoErrorLabel.RetryableWriteError); - } - } else { - if ( - (isRetryableWritesEnabled(server.topology) || isTransactionCommand(cmd)) && - needsRetryableWriteLabel(error, maxWireVersion(server)) && - !inActiveTransaction(session, cmd) - ) { - error.addErrorLabel(MongoErrorLabel.RetryableWriteError); - } - } - - if ( - session && - session.isPinned && - error.hasErrorLabel(MongoErrorLabel.TransientTransactionError) - ) { - session.unpin({ force: true }); - } - - server.handleError(error, connection); - - throw error; - }; -} diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 43177b72962..e1cdad271d4 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -58,7 +58,7 @@ describe('Connection Pool', function () { const events = []; pool.on('connectionClosed', event => events.push(event)); - const conn = await promisify(pool.checkOut.bind(pool))(); + const conn = await pool.checkOut(); const error = await conn.command(ns('admin.$cmd'), { ping: 1 }, {}).catch(error => error); expect(error).to.be.instanceOf(Error); @@ -117,11 +117,9 @@ describe('Connection Pool', function () { pool.ready(); - pool.checkOut((err, conn) => { - expect(err).to.not.exist; + pool.checkOut().then(conn => { expect(conn).to.exist; - - pool.checkOut(err => { + pool.checkOut().then(expect.fail, err => { expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); // We can only process the wait queue with `checkIn` and `checkOut`, so we @@ -135,7 +133,7 @@ describe('Connection Pool', function () { setImmediate(() => expect(pool).property('waitQueueSize').to.equal(0)); done(); }); - }); + }, expect.fail); }); describe('minPoolSize population', function () { From 484680d9db42f6f524c49f092c4be2b3baaee07f Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 14 Feb 2024 14:19:01 -0500 Subject: [PATCH 06/24] minimal functional code --- src/sdam/server.ts | 107 ++++++++++++--------------------------------- 1 file changed, 27 insertions(+), 80 deletions(-) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 98067336bda..386abb832fc 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -33,7 +33,6 @@ import { MongoRuntimeError, MongoServerClosedError, type MongoServerError, - MongoUnexpectedServerResponseError, needsRetryableWriteLabel } from '../error'; import type { ServerApi } from '../mongo_client'; @@ -323,47 +322,17 @@ export class Server extends TypedEventEmitter { return this.command(ns, cmd, finalOptions); } - this.incrementOperationCount(); - - /* - const executeCommand: WithConnectionCallBack = (err, conn, cb) => { - if (err || !conn) { - this.decrementOperationCount(); - if (!err) { - return cb(new MongoRuntimeError('Failed to create connection without error')); - } - if (!(err instanceof PoolClearedError)) { - this.handleError(err); - } - return cb(err); - } - - const handler = makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { - this.decrementOperationCount(); - cb(error, response); - }); - conn.command(ns, cmd, finalOptions).then( - r => handler(undefined, r), - e => handler(e) - ); - }; - */ - - const executeCommandAsync = async (err?: AnyError, conn?: Connection) => { - if (err || !conn) { - this.decrementOperationCount(); - if (!err) throw new MongoRuntimeError('Failed to create connection without error'); - if (!(err instanceof PoolClearedError)) this.handleError(err); - - throw err; - } + const runCommand = async (conn: Connection) => { + this.incrementOperationCount(); try { - const result = await conn.command(ns, cmd, finalOptions); - return await this.operationHandler(conn, cmd, finalOptions, undefined, result); + return await conn.command(ns, cmd, finalOptions); } catch (e) { - return await this.operationHandler(conn, cmd, finalOptions, e, undefined); + throw this.decorateAndHandleError(conn, cmd, finalOptions, e); + } finally { + this.decrementOperationCount(); } }; + const shouldReauth = (e: AnyError): boolean => { return e instanceof MongoError && e.code === MONGODB_ERROR_CODES.Reauthenticate; }; @@ -372,47 +341,39 @@ export class Server extends TypedEventEmitter { if (conn) { // send operation, possibly reauthenticating and return without checking back in try { - return await executeCommandAsync(undefined, conn); + return await runCommand(conn); } catch (e) { if (shouldReauth(e)) { conn = await this.pool.reauthenticateAsync(conn); - return await executeCommandAsync(undefined, conn); + return await runCommand(conn); } - throw e; } } // check out connection - conn = await this.pool.checkOut(); + try { + conn = await this.pool.checkOut(); + } catch (e) { + if (!e) throw new MongoRuntimeError('Failed to create connection without error'); + if (!(e instanceof PoolClearedError)) this.handleError(e); - let rv: Document; + throw e; + } // call executeCommandAsync with that checked out connection try { - rv = await executeCommandAsync(undefined, conn); + return await runCommand(conn); } catch (e) { // if it errors - if (conn) { - // if connection is defined - // reauthenticate - if (shouldReauth(e)) { - conn = await this.pool.reauthenticateAsync(conn); - rv = await executeCommandAsync(undefined, conn); - } else { - throw e; - } - } else { - // throw the error - throw e; + // check if we should reauthenticate + if (shouldReauth(e)) { + conn = await this.pool.reauthenticateAsync(conn); + return await runCommand(conn); } - } - - // if connection exists - // check connection back into pool - if (conn) { + throw e; + } finally { this.pool.checkIn(conn); } - return rv; } /** @@ -463,27 +424,13 @@ export class Server extends TypedEventEmitter { } } - private async operationHandler( + private decorateAndHandleError( connection: Connection, cmd: Document, options: CommandOptions | GetMoreOptions | undefined, - error?: AnyError, - result?: any - ): Promise { + error: AnyError + ): AnyError { const session = options?.session; - // We should not swallow an error if it is present. - if (error == null && result != null) { - return result; - } - - if (options != null && 'noResponse' in options && options.noResponse === true) { - return null as T; - } - - if (!error) { - throw new MongoUnexpectedServerResponseError('Empty response with no error'); - } - if (error.name === 'AbortError' && error.cause instanceof MongoError) { error = error.cause; } @@ -537,7 +484,7 @@ export class Server extends TypedEventEmitter { this.handleError(error, connection); - throw error; + return error; } /** From 8d9fde3764360dd1da4ff71dbf3756d3ea454cb7 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 14 Feb 2024 14:19:29 -0500 Subject: [PATCH 07/24] Updated tests --- .../crud/abstract_operation.test.ts | 2 +- .../non-server-retryable_writes.test.ts | 8 +- .../server-selection/operation_count.test.ts | 6 +- test/tools/cmap_spec_runner.ts | 4 +- test/unit/cmap/connection_pool.test.js | 116 ++---------------- 5 files changed, 18 insertions(+), 118 deletions(-) diff --git a/test/integration/crud/abstract_operation.test.ts b/test/integration/crud/abstract_operation.test.ts index 5e70c531c99..7632ac6f598 100644 --- a/test/integration/crud/abstract_operation.test.ts +++ b/test/integration/crud/abstract_operation.test.ts @@ -353,7 +353,7 @@ describe('abstract operation', async function () { subclassType.name === 'ProfilingLevelOperation' ? { ok: 1, was: 1 } : { ok: 1 }; const cmdCallerStub = sinon .stub(Server.prototype, 'command') - .yieldsRight(undefined, yieldDoc); + .returns(Promise.resolve(yieldDoc)); if (sameServerOnlyOperationSubclasses.includes(subclassType.name.toString())) { await subclassInstance.execute(constructorServer, client.session); } else { diff --git a/test/integration/retryable-writes/non-server-retryable_writes.test.ts b/test/integration/retryable-writes/non-server-retryable_writes.test.ts index 7ef748f5600..c915f200c15 100644 --- a/test/integration/retryable-writes/non-server-retryable_writes.test.ts +++ b/test/integration/retryable-writes/non-server-retryable_writes.test.ts @@ -33,11 +33,13 @@ describe('Non Server Retryable Writes', function () { { requires: { topology: 'replicaset', mongodb: '>=4.2.9' } }, async () => { const serverCommandStub = sinon.stub(Server.prototype, 'command'); - serverCommandStub.onCall(0).yieldsRight(new PoolClearedError('error')); + serverCommandStub.onCall(0).returns(Promise.reject(new PoolClearedError('error'))); serverCommandStub .onCall(1) - .yieldsRight( - new MongoWriteConcernError({ errorLabels: ['NoWritesPerformed'], errorCode: 10107 }, {}) + .returns( + Promise.reject( + new MongoWriteConcernError({ errorLabels: ['NoWritesPerformed'], errorCode: 10107 }, {}) + ) ); const insertResult = await collection.insertOne({ _id: 1 }).catch(error => error); diff --git a/test/integration/server-selection/operation_count.test.ts b/test/integration/server-selection/operation_count.test.ts index 0741fa48cdf..5ab60a128b9 100644 --- a/test/integration/server-selection/operation_count.test.ts +++ b/test/integration/server-selection/operation_count.test.ts @@ -170,9 +170,9 @@ describe('Server Operation Count Tests', function () { const server = Array.from(client.topology.s.servers.values())[0]; expect(server.s.operationCount).to.equal(0); - sinon.stub(ConnectionPool.prototype, 'checkOut').callsFake(function (cb) { - cb(new Error('unable to checkout connection'), undefined); - }); + sinon + .stub(ConnectionPool.prototype, 'checkOut') + .returns(Promise.reject(new Error('unable to checkout connection'))); const commandSpy = sinon.spy(server, 'command'); const error = await collection.insertOne({ count: 1 }).catch(e => e); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index 4c11736857e..9d0817548f1 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -185,9 +185,7 @@ const compareInputToSpec = (input, expected, message) => { const getTestOpDefinitions = (threadContext: ThreadContext) => ({ checkOut: async function (op) { - const connection: Connection = await promisify(ConnectionPool.prototype.checkOut).call( - threadContext.pool - ); + const connection: Connection = await ConnectionPool.prototype.checkOut.call(threadContext.pool); if (op.label != null) { threadContext.connections.set(op.label, connection); } else { diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index e1cdad271d4..3b2d24f3d29 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -1,6 +1,6 @@ 'use strict'; -const { ConnectionPool } = require('../../mongodb'); +const { ConnectionPool, MongoError } = require('../../mongodb'); const { WaitQueueTimeoutError } = require('../../mongodb'); const mock = require('../../tools/mongodb-mock/index'); const sinon = require('sinon'); @@ -69,7 +69,7 @@ describe('Connection Pool', function () { expect(closeEvent).have.property('reason').equal('error'); }); - it('should propagate socket timeouts to connections', function (done) { + it('should propagate socket timeouts to connections', async function () { mockMongod.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -87,18 +87,12 @@ describe('Connection Pool', function () { pool.ready(); - pool.withConnection( - (err, conn, cb) => { - expect(err).to.not.exist; - conn.command(ns('admin.$cmd'), { ping: 1 }, undefined, (err, result) => { - expect(err).to.exist; - expect(result).to.not.exist; - expect(err).to.match(/timed out/); - cb(); - }); - }, - () => pool.close(done) - ); + const conn = await pool.checkOut(); + const maybeError = await conn.command(ns('admin.$cmd'), { ping: 1 }, undefined).catch(e => e); + expect(maybeError).to.be.instanceOf(MongoError); + expect(maybeError).to.match(/timed out/); + + pool.checkIn(conn); }); it('should clear timed out wait queue members if no connections are available', function (done) { @@ -222,98 +216,4 @@ describe('Connection Pool', function () { expect(createConnStub).to.have.been.calledTwice; }); }); - - describe('withConnection', function () { - it('should manage a connection for a successful operation', function (done) { - mockMongod.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } - }); - - const pool = new ConnectionPool(stubServer, { hostAddress: mockMongod.hostAddress() }); - pool.ready(); - - const callback = (err, result) => { - expect(err).to.not.exist; - expect(result).to.exist; - pool.close(done); - }; - - pool.withConnection((err, conn, cb) => { - expect(err).to.not.exist; - - conn.command( - ns('$admin.cmd'), - { [LEGACY_HELLO_COMMAND]: 1 }, - undefined, - (cmdErr, hello) => { - expect(cmdErr).to.not.exist; - cb(undefined, hello); - } - ); - }, callback); - }); - - it('should allow user interaction with an error', function (done) { - mockMongod.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.connection.destroy(); - } - }); - - const pool = new ConnectionPool(stubServer, { - waitQueueTimeoutMS: 200, - hostAddress: mockMongod.hostAddress() - }); - - pool.ready(); - - const callback = err => { - expect(err).to.exist; - expect(err).to.match(/closed/); - pool.close(done); - }; - - pool.withConnection( - undefined, - (err, conn, cb) => { - expect(err).to.exist; - expect(err).to.match(/closed/); - cb(err); - }, - callback - ); - }); - - it('should return an error to the original callback', function (done) { - mockMongod.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } - }); - - const pool = new ConnectionPool(stubServer, { hostAddress: mockMongod.hostAddress() }); - pool.ready(); - - const callback = (err, result) => { - expect(err).to.exist; - expect(result).to.not.exist; - expect(err).to.match(/my great error/); - pool.close(done); - }; - - pool.withConnection( - undefined, - (err, conn, cb) => { - expect(err).to.not.exist; - cb(new Error('my great error')); - }, - callback - ); - }); - }); }); From 2116955b2dd07ecd298ece31a4434878a04f2dfc Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 14 Feb 2024 14:52:38 -0500 Subject: [PATCH 08/24] Address lint failures --- src/sdam/server.ts | 35 +++----- .../standalone-topology-lifecycle.json | 79 +++++++++++++++++++ ...rver_discovery_and_monitoring.spec.test.ts | 1 - test/unit/cmap/connection_pool.test.js | 2 - test/unit/operations/get_more.test.ts | 2 +- test/unit/sdam/server.test.ts | 1 - 6 files changed, 93 insertions(+), 27 deletions(-) create mode 100644 test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 386abb832fc..d4945ff67d1 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -338,42 +338,33 @@ export class Server extends TypedEventEmitter { }; // FIXME: This is where withConnection logic should go - if (conn) { - // send operation, possibly reauthenticating and return without checking back in + if (!conn) { try { - return await runCommand(conn); + conn = await this.pool.checkOut(); } catch (e) { - if (shouldReauth(e)) { - conn = await this.pool.reauthenticateAsync(conn); - return await runCommand(conn); - } + if (!e) throw new MongoRuntimeError('Failed to create connection without error'); + if (!(e instanceof PoolClearedError)) this.handleError(e); + throw e; } } - // check out connection + // Reauth flow + let rv; try { - conn = await this.pool.checkOut(); + rv = await runCommand(conn); } catch (e) { - if (!e) throw new MongoRuntimeError('Failed to create connection without error'); - if (!(e instanceof PoolClearedError)) this.handleError(e); - - throw e; - } - // call executeCommandAsync with that checked out connection - try { - return await runCommand(conn); - } catch (e) { - // if it errors - // check if we should reauthenticate if (shouldReauth(e)) { conn = await this.pool.reauthenticateAsync(conn); - return await runCommand(conn); + rv = await runCommand(conn); + } else { + throw e; } - throw e; } finally { this.pool.checkIn(conn); } + + return rv; } /** diff --git a/test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json b/test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json new file mode 100644 index 00000000000..c61edb8b9ee --- /dev/null +++ b/test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json @@ -0,0 +1,79 @@ +{ + "description": "standalone-heartbeat", + "schemaVersion": "1.16", + "runOnRequirements": [ + { + "topologies": [ + "single" + ], + "minServerVersion": "4.4" + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient" + } + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "topologyDescriptionChangedEvent", + "serverDescriptionChangedEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "events": [ + { + "topologyDescriptionChangedEvent": {} + }, + { + "serverDescriptionChangedEvent": {} + }, + { + "topologyDescriptionChangedEvent": {} + }, + { + "topologyDescriptionChangedEvent": {} + } + ], + "ignoreExtraEvents": true + } + ] + } + ] +} diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index d798b0dcfe4..3b8f7a199cf 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -3,7 +3,6 @@ import { expect } from 'chai'; import * as fs from 'fs'; import * as path from 'path'; import * as sinon from 'sinon'; -import { promisify } from 'util'; import { ConnectionPool, diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 3b2d24f3d29..b6e408e3d56 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -6,9 +6,7 @@ const mock = require('../../tools/mongodb-mock/index'); const sinon = require('sinon'); const { expect } = require('chai'); const { setImmediate } = require('timers'); -const { promisify } = require('util'); const { ns, isHello } = require('../../mongodb'); -const { LEGACY_HELLO_COMMAND } = require('../../mongodb'); const { createTimerSandbox } = require('../timer_sandbox'); const { topologyWithPlaceholderClient } = require('../../tools/utils'); const { MongoClientAuthProviders } = require('../../mongodb'); diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index 55f0fed8b44..603c3ccd531 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -54,7 +54,7 @@ describe('GetMoreOperation', function () { ); const opts = { ...options, documentsReturnedIn: 'nextBatch', returnFieldSelector: null }; const operation = new GetMoreOperation(namespace, cursorId, server, opts); - const stub = sinon.stub(server, 'command').callsFake((_, __, ___) => {}); + const stub = sinon.stub(server, 'command').returns(Promise.resolve({})); const expectedGetMoreCommand = { getMore: cursorId, diff --git a/test/unit/sdam/server.test.ts b/test/unit/sdam/server.test.ts index aff711bf386..30358b3ee6e 100644 --- a/test/unit/sdam/server.test.ts +++ b/test/unit/sdam/server.test.ts @@ -9,7 +9,6 @@ import { MongoErrorLabel, MongoNetworkError, MongoNetworkTimeoutError, - ns, ObjectId, Server, ServerDescription, From 3a41fac637e1c7be64659d66a790097fd17858e1 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 14 Feb 2024 17:15:54 -0500 Subject: [PATCH 09/24] WIP: Fixing test failures --- src/sdam/server.ts | 58 +++++++++---------- .../retryable_writes.spec.prose.test.ts | 16 +++-- .../server-selection/operation_count.test.ts | 6 +- test/tools/spec-runner/context.js | 18 ++---- test/unit/operations/get_more.test.ts | 2 +- test/unit/operations/kill_cursors.test.ts | 2 +- 6 files changed, 50 insertions(+), 52 deletions(-) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index d4945ff67d1..438533f0b39 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -30,7 +30,6 @@ import { MongoInvalidArgumentError, MongoNetworkError, MongoNetworkTimeoutError, - MongoRuntimeError, MongoServerClosedError, type MongoServerError, needsRetryableWriteLabel @@ -315,56 +314,53 @@ export class Server extends TypedEventEmitter { // require special logic to decrement it again, or would double increment (the load // balanced code makes a recursive call). Instead, we increment the count after this // check. - if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { conn = await this.pool.checkOut(); session.pin(conn); return this.command(ns, cmd, finalOptions); } - const runCommand = async (conn: Connection) => { - this.incrementOperationCount(); - try { - return await conn.command(ns, cmd, finalOptions); - } catch (e) { - throw this.decorateAndHandleError(conn, cmd, finalOptions, e); - } finally { - this.decrementOperationCount(); - } - }; - - const shouldReauth = (e: AnyError): boolean => { - return e instanceof MongoError && e.code === MONGODB_ERROR_CODES.Reauthenticate; - }; - - // FIXME: This is where withConnection logic should go + this.incrementOperationCount(); if (!conn) { try { conn = await this.pool.checkOut(); - } catch (e) { - if (!e) throw new MongoRuntimeError('Failed to create connection without error'); - if (!(e instanceof PoolClearedError)) this.handleError(e); + } catch (checkoutError) { + if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError); - throw e; + throw checkoutError; } } // Reauth flow - let rv; try { - rv = await runCommand(conn); - } catch (e) { - if (shouldReauth(e)) { + return await this.executeCommand(conn, ns, cmd, finalOptions); + } catch (operationError) { + if ( + operationError instanceof MongoError && + operationError.code === MONGODB_ERROR_CODES.Reauthenticate + ) { conn = await this.pool.reauthenticateAsync(conn); - rv = await runCommand(conn); + return await this.executeCommand(conn, ns, cmd, finalOptions); } else { - throw e; + throw operationError; } } finally { this.pool.checkIn(conn); + this.decrementOperationCount(); } + } - return rv; + private async executeCommand( + conn: Connection, + ns: MongoDBNamespace, + cmd: Document, + options?: CommandOptions + ): Promise { + try { + return await conn.command(ns, cmd, options); + } catch (commandOptions) { + throw this.decorateAndHandleError(conn, cmd, options, commandOptions); + } } /** @@ -415,6 +411,10 @@ export class Server extends TypedEventEmitter { } } + /** + * Ensure that error is properly decorated and internal state is updated before throwing + * @internal + */ private decorateAndHandleError( connection: Connection, cmd: Document, diff --git a/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts b/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts index 201e6a95294..c40d8264e7f 100644 --- a/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts +++ b/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts @@ -278,15 +278,19 @@ describe('Retryable Writes Spec Prose', () => { const serverCommandStub = sinon.stub(Server.prototype, 'command'); serverCommandStub .onCall(0) - .yieldsRight( - new MongoWriteConcernError({ errorLabels: ['RetryableWriteError'], code: 91 }, {}) + .returns( + Promise.reject( + new MongoWriteConcernError({ errorLabels: ['RetryableWriteError'], code: 91 }, {}) + ) ); serverCommandStub .onCall(1) - .yieldsRight( - new MongoWriteConcernError( - { errorLabels: ['RetryableWriteError', 'NoWritesPerformed'], errorCode: 10107 }, - {} + .returns( + Promise.reject( + new MongoWriteConcernError( + { errorLabels: ['RetryableWriteError', 'NoWritesPerformed'], errorCode: 10107 }, + {} + ) ) ); diff --git a/test/integration/server-selection/operation_count.test.ts b/test/integration/server-selection/operation_count.test.ts index 5ab60a128b9..ac415baa6a4 100644 --- a/test/integration/server-selection/operation_count.test.ts +++ b/test/integration/server-selection/operation_count.test.ts @@ -103,9 +103,9 @@ describe('Server Operation Count Tests', function () { const server = Array.from(client.topology.s.servers.values())[0]; expect(server.s.operationCount).to.equal(0); - sinon.stub(ConnectionPool.prototype, 'checkOut').callsFake(function (cb) { - cb(new Error('unable to checkout connection'), undefined); - }); + sinon + .stub(ConnectionPool.prototype, 'checkOut') + .returns(Promise.reject(new Error('unable to checkout connection'))); const commandSpy = sinon.spy(server, 'command'); const error = await collection.findOne({ count: 1 }).catch(e => e); diff --git a/test/tools/spec-runner/context.js b/test/tools/spec-runner/context.js index 304b964f1a1..7a678194b55 100644 --- a/test/tools/spec-runner/context.js +++ b/test/tools/spec-runner/context.js @@ -143,23 +143,17 @@ class TestRunnerContext { return Promise.all(cleanupPromises).then(cleanup, cleanup); } - targetedFailPoint(options) { + async targetedFailPoint(options) { const session = options.session; const failPoint = options.failPoint; expect(session.isPinned).to.be.true; - return new Promise((resolve, reject) => { - const serverOrConnection = session.loadBalanced - ? session.pinnedConnection - : session.transaction.server; + const serverOrConnection = session.loadBalanced + ? session.pinnedConnection + : session.transaction.server; - serverOrConnection.command(ns('admin.$cmd'), failPoint, undefined, err => { - if (err) return reject(err); - - this.appliedFailPoints.push(failPoint); - resolve(); - }); - }); + await serverOrConnection.command(ns('admin.$cmd'), failPoint, {}); + this.appliedFailPoints.push(failPoint); } enableFailPoint(failPoint) { diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index 603c3ccd531..64246b73a22 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -202,7 +202,7 @@ describe('GetMoreOperation', function () { new ServerDescription('a:1'), {} as any ); - sinon.stub(server, 'command').yieldsRight(); + sinon.stub(server, 'command'); it('should throw if the cursorId is undefined', async () => { const getMoreOperation = new GetMoreOperation( diff --git a/test/unit/operations/kill_cursors.test.ts b/test/unit/operations/kill_cursors.test.ts index 4468cd33c65..9b71dd087bb 100644 --- a/test/unit/operations/kill_cursors.test.ts +++ b/test/unit/operations/kill_cursors.test.ts @@ -91,7 +91,7 @@ describe('class KillCursorsOperation', () => { server, options ) as any; - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command').returns(Promise.resolve({})); await killCursorsOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { killCursors: namespace.collection, From a6156bc495cf6274070479bf4dcf091c1303b114 Mon Sep 17 00:00:00 2001 From: Warren James Date: Thu, 15 Feb 2024 16:44:43 -0500 Subject: [PATCH 10/24] fix unit tests --- ...rver_discovery_and_monitoring.spec.test.ts | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 3b8f7a199cf..613623b7613 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -334,9 +334,9 @@ async function executeSDAMTest(testData: SDAMTest) { // phase with applicationErrors simulating error's from network, timeouts, server for (const appError of phase.applicationErrors) { // Stub will return appError to SDAM machinery - const withConnectionStub = sinon - .stub(ConnectionPool.prototype, 'withConnection') - .callsFake(withConnectionStubImpl(appError)); + const checkOutStub = sinon + .stub(ConnectionPool.prototype, 'checkOut') + .callsFake(checkoutStubImpl(appError)); const server = client.topology.s.servers.get(appError.address); @@ -345,7 +345,7 @@ async function executeSDAMTest(testData: SDAMTest) { const thrownError = await res.catch(error => error); // Restore the stub before asserting anything in case of errors - withConnectionStub.restore(); + checkOutStub.restore(); const isApplicationError = error => { // These errors all come from the withConnection stub @@ -404,15 +404,13 @@ async function executeSDAMTest(testData: SDAMTest) { } } -function withConnectionStubImpl(appError) { - return function (conn, fn, callback) { - // eslint-disable-next-line @typescript-eslint/no-this-alias - const connectionPool = this; // we are stubbing `withConnection` on the `ConnectionPool` class +function checkoutStubImpl(appError) { + return async function () { + const connectionPool = this; const fakeConnection = { generation: typeof appError.generation === 'number' ? appError.generation : connectionPool.generation, - - async command(_ns, _cmd, _options) { + async command(_, __, ___) { if (appError.type === 'network') { throw new MongoNetworkError('test generated'); } else if (appError.type === 'timeout') { @@ -424,16 +422,7 @@ function withConnectionStubImpl(appError) { } } }; - - fn(undefined, fakeConnection, (fnErr, result) => { - if (typeof callback === 'function') { - if (fnErr) { - callback(fnErr); - } else { - callback(undefined, result); - } - } - }); + return fakeConnection; }; } From c0fc067bda0072aaa129b82a36ecdaaf3dd051d8 Mon Sep 17 00:00:00 2001 From: Warren James Date: Thu, 15 Feb 2024 16:45:10 -0500 Subject: [PATCH 11/24] Fix operation count --- src/sdam/server.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 438533f0b39..968b1198eb6 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -327,6 +327,7 @@ export class Server extends TypedEventEmitter { } catch (checkoutError) { if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError); + this.decrementOperationCount(); throw checkoutError; } } From e72fc70a77f52a2eedac9b39ebd6b7f20d4f781b Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 16 Feb 2024 11:47:22 -0500 Subject: [PATCH 12/24] remove unneeded recursion --- src/sdam/server.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 968b1198eb6..6a82ea0f094 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -311,13 +311,11 @@ export class Server extends TypedEventEmitter { // are waiting for a connection are included in the operation count. Load balanced // mode will only ever have a single server, so the operation count doesn't matter. // Incrementing the operation count above the logic to handle load balanced mode would - // require special logic to decrement it again, or would double increment (the load - // balanced code makes a recursive call). Instead, we increment the count after this - // check. + // require special logic to decrement it again, or would double increment. Instead, we + // increment the count after this check. if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { conn = await this.pool.checkOut(); session.pin(conn); - return this.command(ns, cmd, finalOptions); } this.incrementOperationCount(); From 79b155cd743288402f280ae18a783428e01fc067 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 16 Feb 2024 11:56:29 -0500 Subject: [PATCH 13/24] Delete test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json --- .../standalone-topology-lifecycle.json | 79 ------------------- 1 file changed, 79 deletions(-) delete mode 100644 test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json diff --git a/test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json b/test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json deleted file mode 100644 index c61edb8b9ee..00000000000 --- a/test/spec/server-discovery-and-monitoring/unified/standalone-topology-lifecycle.json +++ /dev/null @@ -1,79 +0,0 @@ -{ - "description": "standalone-heartbeat", - "schemaVersion": "1.16", - "runOnRequirements": [ - { - "topologies": [ - "single" - ], - "minServerVersion": "4.4" - } - ], - "createEntities": [ - { - "client": { - "id": "setupClient" - } - } - ], - "tests": [ - { - "description": "Topology lifecycle", - "operations": [ - { - "name": "createEntities", - "object": "testRunner", - "arguments": { - "entities": [ - { - "client": { - "id": "client", - "observeEvents": [ - "topologyDescriptionChangedEvent", - "serverDescriptionChangedEvent" - ] - } - } - ] - } - }, - { - "name": "waitForEvent", - "object": "testRunner", - "arguments": { - "client": "client", - "event": { - "topologyDescriptionChangedEvent": {} - }, - "count": 2 - } - }, - { - "name": "close", - "object": "client" - } - ], - "expectEvents": [ - { - "client": "client", - "eventType": "sdam", - "events": [ - { - "topologyDescriptionChangedEvent": {} - }, - { - "serverDescriptionChangedEvent": {} - }, - { - "topologyDescriptionChangedEvent": {} - }, - { - "topologyDescriptionChangedEvent": {} - } - ], - "ignoreExtraEvents": true - } - ] - } - ] -} From d6781bbb1aa66c0891d572ff4bb214289ef98fd6 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 16 Feb 2024 12:04:03 -0500 Subject: [PATCH 14/24] test updates --- test/integration/crud/find.test.js | 2 -- test/unit/operations/find.test.ts | 4 ++-- test/unit/operations/get_more.test.ts | 12 ++++++------ test/unit/operations/kill_cursors.test.ts | 2 +- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/test/integration/crud/find.test.js b/test/integration/crud/find.test.js index 6f5144941b7..c930a86a4ea 100644 --- a/test/integration/crud/find.test.js +++ b/test/integration/crud/find.test.js @@ -36,9 +36,7 @@ describe('Find', function () { const collection = db.collection('test_find_simple'); const docs = [{ a: 2 }, { b: 3 }]; - console.log('before insert'); await collection.insert(docs, configuration.writeConcernMax()); - console.log('after insert'); const insertedDocs = await collection.find().toArray(); expect(insertedDocs).to.have.length(2); diff --git a/test/unit/operations/find.test.ts b/test/unit/operations/find.test.ts index ef190168166..bfb67d8e818 100644 --- a/test/unit/operations/find.test.ts +++ b/test/unit/operations/find.test.ts @@ -41,7 +41,7 @@ describe('FindOperation', function () { it('should build basic find command with filter', async () => { const findOperation = new FindOperation(undefined, namespace, filter); - const stub = sinon.stub(server, 'command'); + const stub = sinon.stub(server, 'command').resolves({}); await findOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { find: namespace.collection, @@ -54,7 +54,7 @@ describe('FindOperation', function () { oplogReplay: true }; const findOperation = new FindOperation(undefined, namespace, {}, options); - const stub = sinon.stub(server, 'command'); + const stub = sinon.stub(server, 'command').resolves({}); await findOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index 64246b73a22..80abcfdd176 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -54,7 +54,7 @@ describe('GetMoreOperation', function () { ); const opts = { ...options, documentsReturnedIn: 'nextBatch', returnFieldSelector: null }; const operation = new GetMoreOperation(namespace, cursorId, server, opts); - const stub = sinon.stub(server, 'command').returns(Promise.resolve({})); + const stub = sinon.stub(server, 'command').resolves({}); const expectedGetMoreCommand = { getMore: cursorId, @@ -104,7 +104,7 @@ describe('GetMoreOperation', function () { it('should build basic getMore command with cursorId and collection', async () => { const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, {}); - const stub = sinon.stub(server, 'command'); + const stub = sinon.stub(server, 'command').resolves({}); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { getMore: cursorId, @@ -117,7 +117,7 @@ describe('GetMoreOperation', function () { batchSize: 234 }; const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, options); - const stub = sinon.stub(server, 'command'); + const stub = sinon.stub(server, 'command').resolves({}); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, @@ -130,7 +130,7 @@ describe('GetMoreOperation', function () { maxAwaitTimeMS: 234 }; const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, options); - const stub = sinon.stub(server, 'command'); + const stub = sinon.stub(server, 'command').resolves({}); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, @@ -188,7 +188,7 @@ describe('GetMoreOperation', function () { maxWireVersion: serverVersion }; const operation = new GetMoreOperation(namespace, cursorId, server, optionsWithComment); - const stub = sinon.stub(server, 'command'); + const stub = sinon.stub(server, 'command').resolves({}); await operation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, getMore); }); @@ -202,7 +202,7 @@ describe('GetMoreOperation', function () { new ServerDescription('a:1'), {} as any ); - sinon.stub(server, 'command'); + sinon.stub(server, 'command').resolves({}); it('should throw if the cursorId is undefined', async () => { const getMoreOperation = new GetMoreOperation( diff --git a/test/unit/operations/kill_cursors.test.ts b/test/unit/operations/kill_cursors.test.ts index 9b71dd087bb..d88a7053b4e 100644 --- a/test/unit/operations/kill_cursors.test.ts +++ b/test/unit/operations/kill_cursors.test.ts @@ -91,7 +91,7 @@ describe('class KillCursorsOperation', () => { server, options ) as any; - const stub = sinon.stub(server, 'command').returns(Promise.resolve({})); + const stub = sinon.stub(server, 'command').resolves({}); await killCursorsOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { killCursors: namespace.collection, From 5242f084b4ff83b1755064bcf7eaa2262f0da910 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 16 Feb 2024 12:04:24 -0500 Subject: [PATCH 15/24] Add doc comment --- src/sdam/server.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 6a82ea0f094..ae2c30aa59c 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -349,6 +349,11 @@ export class Server extends TypedEventEmitter { } } + /** + * Execute a command, catching and appropriately rewrapping thrown errors and updating internal + * state as appropriate + * @internal + */ private async executeCommand( conn: Connection, ns: MongoDBNamespace, From e76790a036d4b5a29de8184b7d9d1afe0d7f0e99 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 16 Feb 2024 13:27:17 -0500 Subject: [PATCH 16/24] add doc comment --- src/cmap/connection_pool.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 8208e1b9215..4c8cc938d0b 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -544,6 +544,10 @@ export class ConnectionPool extends TypedEventEmitter { ); } + /** + * @internal + * Reauthenticate a connection + */ async reauthenticateAsync(connection: Connection): Promise { const authContext = connection.authContext; if (!authContext) { From 9a99c8a5e6245982df70ee642882099c8c6041a4 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 16 Feb 2024 13:35:25 -0500 Subject: [PATCH 17/24] fix lint --- .../assorted/server_discovery_and_monitoring.spec.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 613623b7613..82dd7a609d0 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -406,10 +406,10 @@ async function executeSDAMTest(testData: SDAMTest) { function checkoutStubImpl(appError) { return async function () { - const connectionPool = this; + const connectionPoolGeneration = this.generation; const fakeConnection = { generation: - typeof appError.generation === 'number' ? appError.generation : connectionPool.generation, + typeof appError.generation === 'number' ? appError.generation : connectionPoolGeneration, async command(_, __, ___) { if (appError.type === 'network') { throw new MongoNetworkError('test generated'); From cc3053a6fa474ceb2fa044e918dd9381d104dcc1 Mon Sep 17 00:00:00 2001 From: Warren James Date: Sat, 17 Feb 2024 18:51:42 -0500 Subject: [PATCH 18/24] Update pinning logic --- src/sdam/server.ts | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index ae2c30aa59c..3a9ec544c72 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -313,10 +313,9 @@ export class Server extends TypedEventEmitter { // Incrementing the operation count above the logic to handle load balanced mode would // require special logic to decrement it again, or would double increment. Instead, we // increment the count after this check. - if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { - conn = await this.pool.checkOut(); - session.pin(conn); - } + + const connShouldBePinned = + this.loadBalanced && session != null && isPinnableCommand(cmd, session); this.incrementOperationCount(); if (!conn) { @@ -324,12 +323,15 @@ export class Server extends TypedEventEmitter { conn = await this.pool.checkOut(); } catch (checkoutError) { if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError); - this.decrementOperationCount(); throw checkoutError; } } + if (connShouldBePinned && !session.isPinned) { + session.pin(conn); + } + // Reauth flow try { return await this.executeCommand(conn, ns, cmd, finalOptions); @@ -344,8 +346,10 @@ export class Server extends TypedEventEmitter { throw operationError; } } finally { - this.pool.checkIn(conn); - this.decrementOperationCount(); + if (!connShouldBePinned) { + this.pool.checkIn(conn); + this.decrementOperationCount(); + } } } @@ -362,8 +366,8 @@ export class Server extends TypedEventEmitter { ): Promise { try { return await conn.command(ns, cmd, options); - } catch (commandOptions) { - throw this.decorateAndHandleError(conn, cmd, options, commandOptions); + } catch (commandError) { + throw this.decorateAndHandleError(conn, cmd, options, commandError); } } @@ -526,6 +530,7 @@ function isPinnableCommand(cmd: Document, session?: ClientSession): boolean { if (session) { return ( session.inTransaction() || + (session.transaction.isCommitted && 'commitTransaction' in cmd) || 'aggregate' in cmd || 'find' in cmd || 'getMore' in cmd || From 7400c87db1ff437cb7bc2d06bafa3c3873de300c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 21 Feb 2024 16:53:43 -0500 Subject: [PATCH 19/24] fix: use authProviders getOrCreateProvider --- src/cmap/connection_pool.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 4c8cc938d0b..8b5bd42c9b9 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -561,7 +561,9 @@ export class ConnectionPool extends TypedEventEmitter { } const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello); - const provider = AUTH_PROVIDERS.get(resolvedCredentials.mechanism); + const provider = this[kServer].topology.client.s.authProviders.getOrCreateProvider( + resolvedCredentials.mechanism + ); if (!provider) { throw new MongoMissingCredentialsError( From 7ae6dc3407eb70c632d122ed2b6c0c5a4b7bf638 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 21 Feb 2024 17:33:47 -0500 Subject: [PATCH 20/24] fix: always decrement operation count --- src/sdam/server.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 3a9ec544c72..1a62c44a3e9 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -322,8 +322,8 @@ export class Server extends TypedEventEmitter { try { conn = await this.pool.checkOut(); } catch (checkoutError) { - if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError); this.decrementOperationCount(); + if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError); throw checkoutError; } } @@ -346,9 +346,9 @@ export class Server extends TypedEventEmitter { throw operationError; } } finally { + this.decrementOperationCount(); if (!connShouldBePinned) { this.pool.checkIn(conn); - this.decrementOperationCount(); } } } From 4aa184e08252f0d21cbb82040cf0330f5b83bd5c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 21 Feb 2024 18:02:57 -0500 Subject: [PATCH 21/24] chore: rename reauthenticateAsync to reauthenticate --- src/cmap/connection_pool.ts | 2 +- src/sdam/server.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 8b5bd42c9b9..46b6c1d747a 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -548,7 +548,7 @@ export class ConnectionPool extends TypedEventEmitter { * @internal * Reauthenticate a connection */ - async reauthenticateAsync(connection: Connection): Promise { + async reauthenticate(connection: Connection): Promise { const authContext = connection.authContext; if (!authContext) { throw new MongoRuntimeError('No auth context found on connection.'); diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 1a62c44a3e9..d2c889b2b83 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -340,7 +340,7 @@ export class Server extends TypedEventEmitter { operationError instanceof MongoError && operationError.code === MONGODB_ERROR_CODES.Reauthenticate ) { - conn = await this.pool.reauthenticateAsync(conn); + conn = await this.pool.reauthenticate(conn); return await this.executeCommand(conn, ns, cmd, finalOptions); } else { throw operationError; From a523abd791ce07848f1de32109323b689387449e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 22 Feb 2024 16:07:39 -0500 Subject: [PATCH 22/24] comments --- src/cmap/connection_pool.ts | 4 +- src/sdam/server.ts | 75 +++++++------------ .../crud/abstract_operation.test.ts | 4 +- .../non-server-retryable_writes.test.ts | 2 +- .../server-selection/operation_count.test.ts | 4 +- 5 files changed, 34 insertions(+), 55 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 46b6c1d747a..435b66936d5 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -548,7 +548,7 @@ export class ConnectionPool extends TypedEventEmitter { * @internal * Reauthenticate a connection */ - async reauthenticate(connection: Connection): Promise { + async reauthenticate(connection: Connection): Promise { const authContext = connection.authContext; if (!authContext) { throw new MongoRuntimeError('No auth context found on connection.'); @@ -573,7 +573,7 @@ export class ConnectionPool extends TypedEventEmitter { await provider.reauth(authContext); - return connection; + return; } /** Clear the min pool size timer */ diff --git a/src/sdam/server.ts b/src/sdam/server.ts index d2c889b2b83..440eae9f756 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -30,6 +30,7 @@ import { MongoInvalidArgumentError, MongoNetworkError, MongoNetworkTimeoutError, + MongoRuntimeError, MongoServerClosedError, type MongoServerError, needsRetryableWriteLabel @@ -302,25 +303,13 @@ export class Server extends TypedEventEmitter { const session = finalOptions.session; let conn = session?.pinnedConnection; - // NOTE: This is a hack! We can't retrieve the connections used for executing an operation - // (and prevent them from being checked back in) at the point of operation execution. - // This should be considered as part of the work for NODE-2882 - // NOTE: - // When incrementing operation count, it's important that we increment it before we - // attempt to check out a connection from the pool. This ensures that operations that - // are waiting for a connection are included in the operation count. Load balanced - // mode will only ever have a single server, so the operation count doesn't matter. - // Incrementing the operation count above the logic to handle load balanced mode would - // require special logic to decrement it again, or would double increment. Instead, we - // increment the count after this check. - - const connShouldBePinned = - this.loadBalanced && session != null && isPinnableCommand(cmd, session); - this.incrementOperationCount(); - if (!conn) { + if (conn == null) { try { conn = await this.pool.checkOut(); + if (this.loadBalanced && isPinnableCommand(cmd, session)) { + session?.pin(conn); + } } catch (checkoutError) { this.decrementOperationCount(); if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError); @@ -328,47 +317,35 @@ export class Server extends TypedEventEmitter { } } - if (connShouldBePinned && !session.isPinned) { - session.pin(conn); - } - - // Reauth flow try { - return await this.executeCommand(conn, ns, cmd, finalOptions); + try { + return await conn.command(ns, cmd, finalOptions); + } catch (commandError) { + this.decorateCommandError(conn, cmd, finalOptions, commandError); + } } catch (operationError) { if ( operationError instanceof MongoError && operationError.code === MONGODB_ERROR_CODES.Reauthenticate ) { - conn = await this.pool.reauthenticate(conn); - return await this.executeCommand(conn, ns, cmd, finalOptions); + try { + await this.pool.reauthenticate(conn); + return await conn.command(ns, cmd, finalOptions); + } catch (commandError) { + this.decorateCommandError(conn, cmd, finalOptions, commandError); + } } else { throw operationError; } } finally { this.decrementOperationCount(); - if (!connShouldBePinned) { + if (session?.pinnedConnection !== conn) { this.pool.checkIn(conn); } } - } - /** - * Execute a command, catching and appropriately rewrapping thrown errors and updating internal - * state as appropriate - * @internal - */ - private async executeCommand( - conn: Connection, - ns: MongoDBNamespace, - cmd: Document, - options?: CommandOptions - ): Promise { - try { - return await conn.command(ns, cmd, options); - } catch (commandError) { - throw this.decorateAndHandleError(conn, cmd, options, commandError); - } + // TS thinks this is reachable + throw 'a'; } /** @@ -423,14 +400,18 @@ export class Server extends TypedEventEmitter { * Ensure that error is properly decorated and internal state is updated before throwing * @internal */ - private decorateAndHandleError( + private decorateCommandError( connection: Connection, cmd: Document, options: CommandOptions | GetMoreOptions | undefined, - error: AnyError - ): AnyError { + error: unknown + ): never { + if (typeof error !== 'object' || error == null || !('name' in error)) { + throw new MongoRuntimeError('An unexpected error type: ' + typeof error); + } + const session = options?.session; - if (error.name === 'AbortError' && error.cause instanceof MongoError) { + if (error.name === 'AbortError' && 'cause' in error && error.cause instanceof MongoError) { error = error.cause; } @@ -483,7 +464,7 @@ export class Server extends TypedEventEmitter { this.handleError(error, connection); - return error; + throw error; } /** diff --git a/test/integration/crud/abstract_operation.test.ts b/test/integration/crud/abstract_operation.test.ts index 7632ac6f598..8134a7d437f 100644 --- a/test/integration/crud/abstract_operation.test.ts +++ b/test/integration/crud/abstract_operation.test.ts @@ -351,9 +351,7 @@ describe('abstract operation', async function () { const subclassInstance = subclassCreator(); const yieldDoc = subclassType.name === 'ProfilingLevelOperation' ? { ok: 1, was: 1 } : { ok: 1 }; - const cmdCallerStub = sinon - .stub(Server.prototype, 'command') - .returns(Promise.resolve(yieldDoc)); + const cmdCallerStub = sinon.stub(Server.prototype, 'command').resolves(yieldDoc); if (sameServerOnlyOperationSubclasses.includes(subclassType.name.toString())) { await subclassInstance.execute(constructorServer, client.session); } else { diff --git a/test/integration/retryable-writes/non-server-retryable_writes.test.ts b/test/integration/retryable-writes/non-server-retryable_writes.test.ts index c915f200c15..8a4190615c2 100644 --- a/test/integration/retryable-writes/non-server-retryable_writes.test.ts +++ b/test/integration/retryable-writes/non-server-retryable_writes.test.ts @@ -33,7 +33,7 @@ describe('Non Server Retryable Writes', function () { { requires: { topology: 'replicaset', mongodb: '>=4.2.9' } }, async () => { const serverCommandStub = sinon.stub(Server.prototype, 'command'); - serverCommandStub.onCall(0).returns(Promise.reject(new PoolClearedError('error'))); + serverCommandStub.onCall(0).rejects(new PoolClearedError('error')); serverCommandStub .onCall(1) .returns( diff --git a/test/integration/server-selection/operation_count.test.ts b/test/integration/server-selection/operation_count.test.ts index ac415baa6a4..9ab7cd8a136 100644 --- a/test/integration/server-selection/operation_count.test.ts +++ b/test/integration/server-selection/operation_count.test.ts @@ -105,7 +105,7 @@ describe('Server Operation Count Tests', function () { sinon .stub(ConnectionPool.prototype, 'checkOut') - .returns(Promise.reject(new Error('unable to checkout connection'))); + .rejects(new Error('unable to checkout connection')); const commandSpy = sinon.spy(server, 'command'); const error = await collection.findOne({ count: 1 }).catch(e => e); @@ -172,7 +172,7 @@ describe('Server Operation Count Tests', function () { sinon .stub(ConnectionPool.prototype, 'checkOut') - .returns(Promise.reject(new Error('unable to checkout connection'))); + .rejects(new Error('unable to checkout connection')); const commandSpy = sinon.spy(server, 'command'); const error = await collection.insertOne({ count: 1 }).catch(e => e); From c3a0479e333f2232ea88f1d96f90688e03ba0ac1 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 22 Feb 2024 16:40:50 -0500 Subject: [PATCH 23/24] chore: fix fn exit --- src/sdam/server.ts | 42 ++++++++++++++++++------------------------ 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 440eae9f756..a9c05c2c28e 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -318,34 +318,28 @@ export class Server extends TypedEventEmitter { } try { - try { - return await conn.command(ns, cmd, finalOptions); - } catch (commandError) { - this.decorateCommandError(conn, cmd, finalOptions, commandError); - } - } catch (operationError) { - if ( - operationError instanceof MongoError && - operationError.code === MONGODB_ERROR_CODES.Reauthenticate - ) { + let commandError: unknown; + for (let attempts = 0; attempts < 2; attempts++) { try { - await this.pool.reauthenticate(conn); - return await conn.command(ns, cmd, finalOptions); - } catch (commandError) { - this.decorateCommandError(conn, cmd, finalOptions, commandError); + return await conn.command(ns, cmd, options); + } catch (error: unknown) { + commandError = error; + if ( + attempts === 0 && + commandError instanceof MongoError && + commandError.code === MONGODB_ERROR_CODES.Reauthenticate + ) { + await this.pool.reauthenticate(conn); + } } - } else { - throw operationError; } + throw this.decorateCommandError(conn, cmd, options, commandError); } finally { this.decrementOperationCount(); if (session?.pinnedConnection !== conn) { this.pool.checkIn(conn); } } - - // TS thinks this is reachable - throw 'a'; } /** @@ -405,25 +399,25 @@ export class Server extends TypedEventEmitter { cmd: Document, options: CommandOptions | GetMoreOptions | undefined, error: unknown - ): never { + ): Error { if (typeof error !== 'object' || error == null || !('name' in error)) { throw new MongoRuntimeError('An unexpected error type: ' + typeof error); } - const session = options?.session; if (error.name === 'AbortError' && 'cause' in error && error.cause instanceof MongoError) { error = error.cause; } if (!(error instanceof MongoError)) { // Node.js or some other error we have not special handling for - throw error; + return error as Error; } if (connectionIsStale(this.pool, connection)) { - throw error; + return error; } + const session = options?.session; if (error instanceof MongoNetworkError) { if (session && !session.hasEnded && session.serverSession) { session.serverSession.isDirty = true; @@ -464,7 +458,7 @@ export class Server extends TypedEventEmitter { this.handleError(error, connection); - throw error; + return error; } /** From 434219ee43cc1be7e3fb775f3497fed1d324f592 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 22 Feb 2024 17:02:39 -0500 Subject: [PATCH 24/24] fix: inline executeCommand --- src/sdam/server.ts | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index a9c05c2c28e..13cadd9f7fc 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -318,22 +318,25 @@ export class Server extends TypedEventEmitter { } try { - let commandError: unknown; - for (let attempts = 0; attempts < 2; attempts++) { + try { + return await conn.command(ns, cmd, finalOptions); + } catch (commandError) { + throw this.decorateCommandError(conn, cmd, finalOptions, commandError); + } + } catch (operationError) { + if ( + operationError instanceof MongoError && + operationError.code === MONGODB_ERROR_CODES.Reauthenticate + ) { + await this.pool.reauthenticate(conn); try { - return await conn.command(ns, cmd, options); - } catch (error: unknown) { - commandError = error; - if ( - attempts === 0 && - commandError instanceof MongoError && - commandError.code === MONGODB_ERROR_CODES.Reauthenticate - ) { - await this.pool.reauthenticate(conn); - } + return await conn.command(ns, cmd, finalOptions); + } catch (commandError) { + throw this.decorateCommandError(conn, cmd, finalOptions, commandError); } + } else { + throw operationError; } - throw this.decorateCommandError(conn, cmd, options, commandError); } finally { this.decrementOperationCount(); if (session?.pinnedConnection !== conn) {