diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index b5e0818061c..435b66936d5 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -17,8 +17,7 @@ import { } from '../constants'; import { type AnyError, - MONGODB_ERROR_CODES, - MongoError, + type MongoError, MongoInvalidArgumentError, MongoMissingCredentialsError, MongoNetworkError, @@ -27,7 +26,14 @@ import { } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; -import { type Callback, eachAsync, List, makeCounter, TimeoutController } from '../utils'; +import { + type Callback, + eachAsync, + List, + makeCounter, + promiseWithResolvers, + TimeoutController +} from '../utils'; import { connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -100,7 +106,8 @@ export interface ConnectionPoolOptions extends Omit; + resolve: (conn: Connection) => void; + reject: (err: AnyError) => void; timeoutController: TimeoutController; [kCancelled]?: boolean; } @@ -350,7 +357,7 @@ export class ConnectionPool extends TypedEventEmitter { * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or * explicitly destroyed by the new owner. */ - checkOut(callback: Callback): void { + async checkOut(): Promise { this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new ConnectionCheckOutStartedEvent(this) @@ -358,8 +365,10 @@ export class ConnectionPool extends TypedEventEmitter { const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; + const { promise, resolve, reject } = promiseWithResolvers(); const waitQueueMember: WaitQueueMember = { - callback, + resolve, + reject, timeoutController: new TimeoutController(waitQueueTimeoutMS) }; waitQueueMember.timeoutController.signal.addEventListener('abort', () => { @@ -370,7 +379,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, 'timeout') ); - waitQueueMember.callback( + waitQueueMember.reject( new WaitQueueTimeoutError( this.loadBalanced ? this.waitQueueErrorMetrics() @@ -382,6 +391,8 @@ export class ConnectionPool extends TypedEventEmitter { this[kWaitQueue].push(waitQueueMember); process.nextTick(() => this.processWaitQueue()); + + return promise; } /** @@ -534,115 +545,35 @@ export class ConnectionPool extends TypedEventEmitter { } /** - * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda - * has completed by calling back. - * - * NOTE: please note the required signature of `fn` - * - * @remarks When in load balancer mode, connections can be pinned to cursors or transactions. - * In these cases we pass the connection in to this method to ensure it is used and a new - * connection is not checked out. - * - * @param conn - A pinned connection for use in load balancing mode. - * @param fn - A function which operates on a managed connection - * @param callback - The original callback - */ - withConnection( - conn: Connection | undefined, - fn: WithConnectionCallback, - callback: Callback - ): void { - if (conn) { - // use the provided connection, and do _not_ check it in after execution - fn(undefined, conn, (fnErr, result) => { - if (fnErr) { - return this.withReauthentication(fnErr, conn, fn, callback); - } - callback(undefined, result); - }); - return; - } - - this.checkOut((err, conn) => { - // don't callback with `err` here, we might want to act upon it inside `fn` - fn(err as MongoError, conn, (fnErr, result) => { - if (fnErr) { - if (conn) { - this.withReauthentication(fnErr, conn, fn, callback); - } else { - callback(fnErr); - } - } else { - callback(undefined, result); - } - - if (conn) { - this.checkIn(conn); - } - }); - }); - } - - private withReauthentication( - fnErr: AnyError, - conn: Connection, - fn: WithConnectionCallback, - callback: Callback - ) { - if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) { - this.reauthenticate(conn, fn, (error, res) => { - if (error) { - return callback(error); - } - callback(undefined, res); - }); - } else { - callback(fnErr); - } - } - - /** - * Reauthenticate on the same connection and then retry the operation. + * @internal + * Reauthenticate a connection */ - private reauthenticate( - connection: Connection, - fn: WithConnectionCallback, - callback: Callback - ): void { + async reauthenticate(connection: Connection): Promise { const authContext = connection.authContext; if (!authContext) { - return callback(new MongoRuntimeError('No auth context found on connection.')); + throw new MongoRuntimeError('No auth context found on connection.'); } const credentials = authContext.credentials; if (!credentials) { - return callback( - new MongoMissingCredentialsError( - 'Connection is missing credentials when asked to reauthenticate' - ) + throw new MongoMissingCredentialsError( + 'Connection is missing credentials when asked to reauthenticate' ); } + const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello); const provider = this[kServer].topology.client.s.authProviders.getOrCreateProvider( resolvedCredentials.mechanism ); + if (!provider) { - return callback( - new MongoMissingCredentialsError( - `Reauthenticate failed due to no auth provider for ${credentials.mechanism}` - ) + throw new MongoMissingCredentialsError( + `Reauthenticate failed due to no auth provider for ${credentials.mechanism}` ); } - provider.reauth(authContext).then( - () => { - fn(undefined, connection, (fnErr, fnResult) => { - if (fnErr) { - return callback(fnErr); - } - callback(undefined, fnResult); - }); - }, - error => callback(error) - ); + + await provider.reauth(authContext); + + return; } /** Clear the min pool size timer */ @@ -841,7 +772,7 @@ export class ConnectionPool extends TypedEventEmitter { ); waitQueueMember.timeoutController.clear(); this[kWaitQueue].shift(); - waitQueueMember.callback(error); + waitQueueMember.reject(error); continue; } @@ -863,7 +794,7 @@ export class ConnectionPool extends TypedEventEmitter { waitQueueMember.timeoutController.clear(); this[kWaitQueue].shift(); - waitQueueMember.callback(undefined, connection); + waitQueueMember.resolve(connection); } } @@ -889,16 +820,17 @@ export class ConnectionPool extends TypedEventEmitter { // TODO(NODE-5192): Remove this cast new ConnectionCheckOutFailedEvent(this, 'connectionError', err as MongoError) ); + waitQueueMember.reject(err); } else if (connection) { this[kCheckedOut].add(connection); this.emitAndLog( ConnectionPool.CONNECTION_CHECKED_OUT, new ConnectionCheckedOutEvent(this, connection) ); + waitQueueMember.resolve(connection); } waitQueueMember.timeoutController.clear(); - waitQueueMember.callback(err, connection); } process.nextTick(() => this.processWaitQueue()); }); diff --git a/src/operations/command.ts b/src/operations/command.ts index 4cff77eb174..3767fa866f6 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -152,6 +152,6 @@ export abstract class CommandOperation extends AbstractOperation { cmd = decorateWithExplain(cmd, this.explain); } - return server.commandAsync(this.ns, cmd, options); + return server.command(this.ns, cmd, options); } } diff --git a/src/operations/find.ts b/src/operations/find.ts index 85ebf1edcd1..abdda7ede1b 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -111,7 +111,7 @@ export class FindOperation extends CommandOperation { findCommand = decorateWithExplain(findCommand, this.explain); } - return server.commandAsync(this.ns, findCommand, { + return server.command(this.ns, findCommand, { ...this.options, ...this.bsonOptions, documentsReturnedIn: 'firstBatch', diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index b7059a46c4b..5cf01904d6c 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -96,7 +96,7 @@ export class GetMoreOperation extends AbstractOperation { ...this.options }; - return server.commandAsync(this.ns, getMoreCmd, commandOptions); + return server.command(this.ns, getMoreCmd, commandOptions); } } diff --git a/src/operations/kill_cursors.ts b/src/operations/kill_cursors.ts index ffbf0a3ad44..73cf93cb469 100644 --- a/src/operations/kill_cursors.ts +++ b/src/operations/kill_cursors.ts @@ -46,7 +46,7 @@ export class KillCursorsOperation extends AbstractOperation { cursors: [this.cursorId] }; try { - await server.commandAsync(this.ns, killCursorsCommand, { session }); + await server.command(this.ns, killCursorsCommand, { session }); } catch { // The driver should never emit errors from killCursors, this is spec-ed behavior } diff --git a/src/operations/run_command.ts b/src/operations/run_command.ts index d89d2e229c2..c288d266be3 100644 --- a/src/operations/run_command.ts +++ b/src/operations/run_command.ts @@ -28,7 +28,7 @@ export class RunCommandOperation extends AbstractOperation { override async execute(server: Server, session: ClientSession | undefined): Promise { this.server = server; - return server.commandAsync(this.ns, this.command, { + return server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, session @@ -54,7 +54,7 @@ export class RunAdminCommandOperation extends AbstractOperation override async execute(server: Server, session: ClientSession | undefined): Promise { this.server = server; - return server.commandAsync(this.ns, this.command, { + return server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, session diff --git a/src/operations/search_indexes/create.ts b/src/operations/search_indexes/create.ts index 96b38a159be..054ba026297 100644 --- a/src/operations/search_indexes/create.ts +++ b/src/operations/search_indexes/create.ts @@ -36,7 +36,7 @@ export class CreateSearchIndexesOperation extends AbstractOperation { indexes: this.descriptions }; - const res = await server.commandAsync(namespace, command, { session }); + const res = await server.command(namespace, command, { session }); const indexesCreated: Array<{ name: string }> = res?.indexesCreated ?? []; return indexesCreated.map(({ name }) => name); diff --git a/src/operations/search_indexes/drop.ts b/src/operations/search_indexes/drop.ts index f17e3365e62..05a56f1c47c 100644 --- a/src/operations/search_indexes/drop.ts +++ b/src/operations/search_indexes/drop.ts @@ -28,7 +28,7 @@ export class DropSearchIndexOperation extends AbstractOperation { } try { - await server.commandAsync(namespace, command, { session }); + await server.command(namespace, command, { session }); } catch (error) { const isNamespaceNotFoundError = error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound; diff --git a/src/operations/search_indexes/update.ts b/src/operations/search_indexes/update.ts index fde2230a7c9..aad7f93536c 100644 --- a/src/operations/search_indexes/update.ts +++ b/src/operations/search_indexes/update.ts @@ -27,7 +27,7 @@ export class UpdateSearchIndexOperation extends AbstractOperation { definition: this.definition }; - await server.commandAsync(namespace, command, { session }); + await server.command(namespace, command, { session }); return; } } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 1eefbb01df4..13cadd9f7fc 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -1,5 +1,3 @@ -import { promisify } from 'util'; - import type { Document } from '../bson'; import { type AutoEncrypter } from '../client-side-encryption/auto_encrypter'; import { type CommandOptions, Connection, type DestroyOptions } from '../cmap/connection'; @@ -26,6 +24,7 @@ import { isNetworkErrorBeforeHandshake, isNodeShuttingDownError, isSDAMUnrecoverableError, + MONGODB_ERROR_CODES, MongoError, MongoErrorLabel, MongoInvalidArgumentError, @@ -34,7 +33,6 @@ import { MongoRuntimeError, MongoServerClosedError, type MongoServerError, - MongoUnexpectedServerResponseError, needsRetryableWriteLabel } from '../error'; import type { ServerApi } from '../mongo_client'; @@ -115,7 +113,6 @@ export class Server extends TypedEventEmitter { pool: ConnectionPool; serverApi?: ServerApi; hello?: Document; - commandAsync: (ns: MongoDBNamespace, cmd: Document, options: CommandOptions) => Promise; monitor: Monitor | null; /** @event */ @@ -139,16 +136,6 @@ export class Server extends TypedEventEmitter { constructor(topology: Topology, description: ServerDescription, options: ServerOptions) { super(); - this.commandAsync = promisify( - ( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions, - // callback type defines Document result because result is never nullish when it succeeds, otherwise promise rejects - callback: (error: Error | undefined, result: Document) => void - ) => this.command(ns, cmd, options, callback as any) - ); - this.serverApi = options.serverApi; const poolOptions = { hostAddress: description.hostAddress, ...options }; @@ -293,23 +280,13 @@ export class Server extends TypedEventEmitter { * Execute a command * @internal */ - command( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions, - callback: Callback - ): void { - if (callback == null) { - throw new MongoInvalidArgumentError('Callback must be provided'); - } - + async command(ns: MongoDBNamespace, cmd: Document, options: CommandOptions): Promise { if (ns.db == null || typeof ns === 'string') { throw new MongoInvalidArgumentError('Namespace must not be a string'); } if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { - callback(new MongoServerClosedError()); - return; + throw new MongoServerClosedError(); } // Clone the options @@ -324,60 +301,48 @@ export class Server extends TypedEventEmitter { } const session = finalOptions.session; - const conn = session?.pinnedConnection; - - // NOTE: This is a hack! We can't retrieve the connections used for executing an operation - // (and prevent them from being checked back in) at the point of operation execution. - // This should be considered as part of the work for NODE-2882 - // NOTE: - // When incrementing operation count, it's important that we increment it before we - // attempt to check out a connection from the pool. This ensures that operations that - // are waiting for a connection are included in the operation count. Load balanced - // mode will only ever have a single server, so the operation count doesn't matter. - // Incrementing the operation count above the logic to handle load balanced mode would - // require special logic to decrement it again, or would double increment (the load - // balanced code makes a recursive call). Instead, we increment the count after this - // check. - if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { - this.pool.checkOut((err, checkedOut) => { - if (err || checkedOut == null) { - if (callback) return callback(err); - return; - } - - session.pin(checkedOut); - this.command(ns, cmd, finalOptions, callback); - }); - return; - } + let conn = session?.pinnedConnection; this.incrementOperationCount(); - - this.pool.withConnection( - conn, - (err, conn, cb) => { - if (err || !conn) { - this.decrementOperationCount(); - if (!err) { - return cb(new MongoRuntimeError('Failed to create connection without error')); - } - if (!(err instanceof PoolClearedError)) { - this.handleError(err); - } - return cb(err); + if (conn == null) { + try { + conn = await this.pool.checkOut(); + if (this.loadBalanced && isPinnableCommand(cmd, session)) { + session?.pin(conn); } + } catch (checkoutError) { + this.decrementOperationCount(); + if (!(checkoutError instanceof PoolClearedError)) this.handleError(checkoutError); + throw checkoutError; + } + } - const handler = makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { - this.decrementOperationCount(); - cb(error, response); - }); - conn.command(ns, cmd, finalOptions).then( - r => handler(undefined, r), - e => handler(e) - ); - }, - callback - ); + try { + try { + return await conn.command(ns, cmd, finalOptions); + } catch (commandError) { + throw this.decorateCommandError(conn, cmd, finalOptions, commandError); + } + } catch (operationError) { + if ( + operationError instanceof MongoError && + operationError.code === MONGODB_ERROR_CODES.Reauthenticate + ) { + await this.pool.reauthenticate(conn); + try { + return await conn.command(ns, cmd, finalOptions); + } catch (commandError) { + throw this.decorateCommandError(conn, cmd, finalOptions, commandError); + } + } else { + throw operationError; + } + } finally { + this.decrementOperationCount(); + if (session?.pinnedConnection !== conn) { + this.pool.checkIn(conn); + } + } } /** @@ -428,6 +393,77 @@ export class Server extends TypedEventEmitter { } } + /** + * Ensure that error is properly decorated and internal state is updated before throwing + * @internal + */ + private decorateCommandError( + connection: Connection, + cmd: Document, + options: CommandOptions | GetMoreOptions | undefined, + error: unknown + ): Error { + if (typeof error !== 'object' || error == null || !('name' in error)) { + throw new MongoRuntimeError('An unexpected error type: ' + typeof error); + } + + if (error.name === 'AbortError' && 'cause' in error && error.cause instanceof MongoError) { + error = error.cause; + } + + if (!(error instanceof MongoError)) { + // Node.js or some other error we have not special handling for + return error as Error; + } + + if (connectionIsStale(this.pool, connection)) { + return error; + } + + const session = options?.session; + if (error instanceof MongoNetworkError) { + if (session && !session.hasEnded && session.serverSession) { + session.serverSession.isDirty = true; + } + + // inActiveTransaction check handles commit and abort. + if ( + inActiveTransaction(session, cmd) && + !error.hasErrorLabel(MongoErrorLabel.TransientTransactionError) + ) { + error.addErrorLabel(MongoErrorLabel.TransientTransactionError); + } + + if ( + (isRetryableWritesEnabled(this.topology) || isTransactionCommand(cmd)) && + supportsRetryableWrites(this) && + !inActiveTransaction(session, cmd) + ) { + error.addErrorLabel(MongoErrorLabel.RetryableWriteError); + } + } else { + if ( + (isRetryableWritesEnabled(this.topology) || isTransactionCommand(cmd)) && + needsRetryableWriteLabel(error, maxWireVersion(this)) && + !inActiveTransaction(session, cmd) + ) { + error.addErrorLabel(MongoErrorLabel.RetryableWriteError); + } + } + + if ( + session && + session.isPinned && + error.hasErrorLabel(MongoErrorLabel.TransientTransactionError) + ) { + session.unpin({ force: true }); + } + + this.handleError(error, connection); + + return error; + } + /** * Decrement the operation count, returning the new count. */ @@ -472,6 +508,7 @@ function isPinnableCommand(cmd: Document, session?: ClientSession): boolean { if (session) { return ( session.inTransaction() || + (session.transaction.isCommitted && 'commitTransaction' in cmd) || 'aggregate' in cmd || 'find' in cmd || 'getMore' in cmd || @@ -508,82 +545,3 @@ function inActiveTransaction(session: ClientSession | undefined, cmd: Document) function isRetryableWritesEnabled(topology: Topology) { return topology.s.options.retryWrites !== false; } - -function makeOperationHandler( - server: Server, - connection: Connection, - cmd: Document, - options: CommandOptions | GetMoreOptions | undefined, - callback: Callback -): Callback { - const session = options?.session; - return function handleOperationResult(error, result) { - // We should not swallow an error if it is present. - if (error == null && result != null) { - return callback(undefined, result); - } - - if (options != null && 'noResponse' in options && options.noResponse === true) { - return callback(undefined, null); - } - - if (!error) { - return callback(new MongoUnexpectedServerResponseError('Empty response with no error')); - } - - if (error.name === 'AbortError' && error.cause instanceof MongoError) { - error = error.cause; - } - - if (!(error instanceof MongoError)) { - // Node.js or some other error we have not special handling for - return callback(error); - } - - if (connectionIsStale(server.pool, connection)) { - return callback(error); - } - - if (error instanceof MongoNetworkError) { - if (session && !session.hasEnded && session.serverSession) { - session.serverSession.isDirty = true; - } - - // inActiveTransaction check handles commit and abort. - if ( - inActiveTransaction(session, cmd) && - !error.hasErrorLabel(MongoErrorLabel.TransientTransactionError) - ) { - error.addErrorLabel(MongoErrorLabel.TransientTransactionError); - } - - if ( - (isRetryableWritesEnabled(server.topology) || isTransactionCommand(cmd)) && - supportsRetryableWrites(server) && - !inActiveTransaction(session, cmd) - ) { - error.addErrorLabel(MongoErrorLabel.RetryableWriteError); - } - } else { - if ( - (isRetryableWritesEnabled(server.topology) || isTransactionCommand(cmd)) && - needsRetryableWriteLabel(error, maxWireVersion(server)) && - !inActiveTransaction(session, cmd) - ) { - error.addErrorLabel(MongoErrorLabel.RetryableWriteError); - } - } - - if ( - session && - session.isPinned && - error.hasErrorLabel(MongoErrorLabel.TransientTransactionError) - ) { - session.unpin({ force: true }); - } - - server.handleError(error, connection); - - return callback(error); - }; -} diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 4bf816f2380..400db63870f 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -473,17 +473,13 @@ export class Topology extends TypedEventEmitter { const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; if (!skipPingOnConnect && server && this.s.credentials) { - server.command(ns('admin.$cmd'), { ping: 1 }, {}, err => { - if (err) { - return exitWithError(err); - } - + server.command(ns('admin.$cmd'), { ping: 1 }, {}).then(() => { stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); callback?.(undefined, this); - }); + }, exitWithError); return; } diff --git a/test/integration/crud/abstract_operation.test.ts b/test/integration/crud/abstract_operation.test.ts index 5e70c531c99..8134a7d437f 100644 --- a/test/integration/crud/abstract_operation.test.ts +++ b/test/integration/crud/abstract_operation.test.ts @@ -351,9 +351,7 @@ describe('abstract operation', async function () { const subclassInstance = subclassCreator(); const yieldDoc = subclassType.name === 'ProfilingLevelOperation' ? { ok: 1, was: 1 } : { ok: 1 }; - const cmdCallerStub = sinon - .stub(Server.prototype, 'command') - .yieldsRight(undefined, yieldDoc); + const cmdCallerStub = sinon.stub(Server.prototype, 'command').resolves(yieldDoc); if (sameServerOnlyOperationSubclasses.includes(subclassType.name.toString())) { await subclassInstance.execute(constructorServer, client.session); } else { diff --git a/test/integration/retryable-writes/non-server-retryable_writes.test.ts b/test/integration/retryable-writes/non-server-retryable_writes.test.ts index 7ef748f5600..8a4190615c2 100644 --- a/test/integration/retryable-writes/non-server-retryable_writes.test.ts +++ b/test/integration/retryable-writes/non-server-retryable_writes.test.ts @@ -33,11 +33,13 @@ describe('Non Server Retryable Writes', function () { { requires: { topology: 'replicaset', mongodb: '>=4.2.9' } }, async () => { const serverCommandStub = sinon.stub(Server.prototype, 'command'); - serverCommandStub.onCall(0).yieldsRight(new PoolClearedError('error')); + serverCommandStub.onCall(0).rejects(new PoolClearedError('error')); serverCommandStub .onCall(1) - .yieldsRight( - new MongoWriteConcernError({ errorLabels: ['NoWritesPerformed'], errorCode: 10107 }, {}) + .returns( + Promise.reject( + new MongoWriteConcernError({ errorLabels: ['NoWritesPerformed'], errorCode: 10107 }, {}) + ) ); const insertResult = await collection.insertOne({ _id: 1 }).catch(error => error); diff --git a/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts b/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts index 201e6a95294..c40d8264e7f 100644 --- a/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts +++ b/test/integration/retryable-writes/retryable_writes.spec.prose.test.ts @@ -278,15 +278,19 @@ describe('Retryable Writes Spec Prose', () => { const serverCommandStub = sinon.stub(Server.prototype, 'command'); serverCommandStub .onCall(0) - .yieldsRight( - new MongoWriteConcernError({ errorLabels: ['RetryableWriteError'], code: 91 }, {}) + .returns( + Promise.reject( + new MongoWriteConcernError({ errorLabels: ['RetryableWriteError'], code: 91 }, {}) + ) ); serverCommandStub .onCall(1) - .yieldsRight( - new MongoWriteConcernError( - { errorLabels: ['RetryableWriteError', 'NoWritesPerformed'], errorCode: 10107 }, - {} + .returns( + Promise.reject( + new MongoWriteConcernError( + { errorLabels: ['RetryableWriteError', 'NoWritesPerformed'], errorCode: 10107 }, + {} + ) ) ); diff --git a/test/integration/server-selection/operation_count.test.ts b/test/integration/server-selection/operation_count.test.ts index 0741fa48cdf..9ab7cd8a136 100644 --- a/test/integration/server-selection/operation_count.test.ts +++ b/test/integration/server-selection/operation_count.test.ts @@ -103,9 +103,9 @@ describe('Server Operation Count Tests', function () { const server = Array.from(client.topology.s.servers.values())[0]; expect(server.s.operationCount).to.equal(0); - sinon.stub(ConnectionPool.prototype, 'checkOut').callsFake(function (cb) { - cb(new Error('unable to checkout connection'), undefined); - }); + sinon + .stub(ConnectionPool.prototype, 'checkOut') + .rejects(new Error('unable to checkout connection')); const commandSpy = sinon.spy(server, 'command'); const error = await collection.findOne({ count: 1 }).catch(e => e); @@ -170,9 +170,9 @@ describe('Server Operation Count Tests', function () { const server = Array.from(client.topology.s.servers.values())[0]; expect(server.s.operationCount).to.equal(0); - sinon.stub(ConnectionPool.prototype, 'checkOut').callsFake(function (cb) { - cb(new Error('unable to checkout connection'), undefined); - }); + sinon + .stub(ConnectionPool.prototype, 'checkOut') + .rejects(new Error('unable to checkout connection')); const commandSpy = sinon.spy(server, 'command'); const error = await collection.insertOne({ count: 1 }).catch(e => e); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index 4c11736857e..9d0817548f1 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -185,9 +185,7 @@ const compareInputToSpec = (input, expected, message) => { const getTestOpDefinitions = (threadContext: ThreadContext) => ({ checkOut: async function (op) { - const connection: Connection = await promisify(ConnectionPool.prototype.checkOut).call( - threadContext.pool - ); + const connection: Connection = await ConnectionPool.prototype.checkOut.call(threadContext.pool); if (op.label != null) { threadContext.connections.set(op.label, connection); } else { diff --git a/test/tools/spec-runner/context.js b/test/tools/spec-runner/context.js index 304b964f1a1..7a678194b55 100644 --- a/test/tools/spec-runner/context.js +++ b/test/tools/spec-runner/context.js @@ -143,23 +143,17 @@ class TestRunnerContext { return Promise.all(cleanupPromises).then(cleanup, cleanup); } - targetedFailPoint(options) { + async targetedFailPoint(options) { const session = options.session; const failPoint = options.failPoint; expect(session.isPinned).to.be.true; - return new Promise((resolve, reject) => { - const serverOrConnection = session.loadBalanced - ? session.pinnedConnection - : session.transaction.server; + const serverOrConnection = session.loadBalanced + ? session.pinnedConnection + : session.transaction.server; - serverOrConnection.command(ns('admin.$cmd'), failPoint, undefined, err => { - if (err) return reject(err); - - this.appliedFailPoints.push(failPoint); - resolve(); - }); - }); + await serverOrConnection.command(ns('admin.$cmd'), failPoint, {}); + this.appliedFailPoints.push(failPoint); } enableFailPoint(failPoint) { diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 0d627d65cfa..82dd7a609d0 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -3,7 +3,6 @@ import { expect } from 'chai'; import * as fs from 'fs'; import * as path from 'path'; import * as sinon from 'sinon'; -import { promisify } from 'util'; import { ConnectionPool, @@ -335,18 +334,18 @@ async function executeSDAMTest(testData: SDAMTest) { // phase with applicationErrors simulating error's from network, timeouts, server for (const appError of phase.applicationErrors) { // Stub will return appError to SDAM machinery - const withConnectionStub = sinon - .stub(ConnectionPool.prototype, 'withConnection') - .callsFake(withConnectionStubImpl(appError)); + const checkOutStub = sinon + .stub(ConnectionPool.prototype, 'checkOut') + .callsFake(checkoutStubImpl(appError)); const server = client.topology.s.servers.get(appError.address); // Run a dummy command to encounter the error - const res = promisify(server.command.bind(server))(ns('admin.$cmd'), { ping: 1 }, {}); + const res = server.command.bind(server)(ns('admin.$cmd'), { ping: 1 }, {}); const thrownError = await res.catch(error => error); // Restore the stub before asserting anything in case of errors - withConnectionStub.restore(); + checkOutStub.restore(); const isApplicationError = error => { // These errors all come from the withConnection stub @@ -405,15 +404,13 @@ async function executeSDAMTest(testData: SDAMTest) { } } -function withConnectionStubImpl(appError) { - return function (conn, fn, callback) { - // eslint-disable-next-line @typescript-eslint/no-this-alias - const connectionPool = this; // we are stubbing `withConnection` on the `ConnectionPool` class +function checkoutStubImpl(appError) { + return async function () { + const connectionPoolGeneration = this.generation; const fakeConnection = { generation: - typeof appError.generation === 'number' ? appError.generation : connectionPool.generation, - - async command(_ns, _cmd, _options) { + typeof appError.generation === 'number' ? appError.generation : connectionPoolGeneration, + async command(_, __, ___) { if (appError.type === 'network') { throw new MongoNetworkError('test generated'); } else if (appError.type === 'timeout') { @@ -425,16 +422,7 @@ function withConnectionStubImpl(appError) { } } }; - - fn(undefined, fakeConnection, (fnErr, result) => { - if (typeof callback === 'function') { - if (fnErr) { - callback(fnErr); - } else { - callback(undefined, result); - } - } - }); + return fakeConnection; }; } diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 43177b72962..b6e408e3d56 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -1,14 +1,12 @@ 'use strict'; -const { ConnectionPool } = require('../../mongodb'); +const { ConnectionPool, MongoError } = require('../../mongodb'); const { WaitQueueTimeoutError } = require('../../mongodb'); const mock = require('../../tools/mongodb-mock/index'); const sinon = require('sinon'); const { expect } = require('chai'); const { setImmediate } = require('timers'); -const { promisify } = require('util'); const { ns, isHello } = require('../../mongodb'); -const { LEGACY_HELLO_COMMAND } = require('../../mongodb'); const { createTimerSandbox } = require('../timer_sandbox'); const { topologyWithPlaceholderClient } = require('../../tools/utils'); const { MongoClientAuthProviders } = require('../../mongodb'); @@ -58,7 +56,7 @@ describe('Connection Pool', function () { const events = []; pool.on('connectionClosed', event => events.push(event)); - const conn = await promisify(pool.checkOut.bind(pool))(); + const conn = await pool.checkOut(); const error = await conn.command(ns('admin.$cmd'), { ping: 1 }, {}).catch(error => error); expect(error).to.be.instanceOf(Error); @@ -69,7 +67,7 @@ describe('Connection Pool', function () { expect(closeEvent).have.property('reason').equal('error'); }); - it('should propagate socket timeouts to connections', function (done) { + it('should propagate socket timeouts to connections', async function () { mockMongod.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -87,18 +85,12 @@ describe('Connection Pool', function () { pool.ready(); - pool.withConnection( - (err, conn, cb) => { - expect(err).to.not.exist; - conn.command(ns('admin.$cmd'), { ping: 1 }, undefined, (err, result) => { - expect(err).to.exist; - expect(result).to.not.exist; - expect(err).to.match(/timed out/); - cb(); - }); - }, - () => pool.close(done) - ); + const conn = await pool.checkOut(); + const maybeError = await conn.command(ns('admin.$cmd'), { ping: 1 }, undefined).catch(e => e); + expect(maybeError).to.be.instanceOf(MongoError); + expect(maybeError).to.match(/timed out/); + + pool.checkIn(conn); }); it('should clear timed out wait queue members if no connections are available', function (done) { @@ -117,11 +109,9 @@ describe('Connection Pool', function () { pool.ready(); - pool.checkOut((err, conn) => { - expect(err).to.not.exist; + pool.checkOut().then(conn => { expect(conn).to.exist; - - pool.checkOut(err => { + pool.checkOut().then(expect.fail, err => { expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); // We can only process the wait queue with `checkIn` and `checkOut`, so we @@ -135,7 +125,7 @@ describe('Connection Pool', function () { setImmediate(() => expect(pool).property('waitQueueSize').to.equal(0)); done(); }); - }); + }, expect.fail); }); describe('minPoolSize population', function () { @@ -224,98 +214,4 @@ describe('Connection Pool', function () { expect(createConnStub).to.have.been.calledTwice; }); }); - - describe('withConnection', function () { - it('should manage a connection for a successful operation', function (done) { - mockMongod.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } - }); - - const pool = new ConnectionPool(stubServer, { hostAddress: mockMongod.hostAddress() }); - pool.ready(); - - const callback = (err, result) => { - expect(err).to.not.exist; - expect(result).to.exist; - pool.close(done); - }; - - pool.withConnection((err, conn, cb) => { - expect(err).to.not.exist; - - conn.command( - ns('$admin.cmd'), - { [LEGACY_HELLO_COMMAND]: 1 }, - undefined, - (cmdErr, hello) => { - expect(cmdErr).to.not.exist; - cb(undefined, hello); - } - ); - }, callback); - }); - - it('should allow user interaction with an error', function (done) { - mockMongod.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.connection.destroy(); - } - }); - - const pool = new ConnectionPool(stubServer, { - waitQueueTimeoutMS: 200, - hostAddress: mockMongod.hostAddress() - }); - - pool.ready(); - - const callback = err => { - expect(err).to.exist; - expect(err).to.match(/closed/); - pool.close(done); - }; - - pool.withConnection( - undefined, - (err, conn, cb) => { - expect(err).to.exist; - expect(err).to.match(/closed/); - cb(err); - }, - callback - ); - }); - - it('should return an error to the original callback', function (done) { - mockMongod.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } - }); - - const pool = new ConnectionPool(stubServer, { hostAddress: mockMongod.hostAddress() }); - pool.ready(); - - const callback = (err, result) => { - expect(err).to.exist; - expect(result).to.not.exist; - expect(err).to.match(/my great error/); - pool.close(done); - }; - - pool.withConnection( - undefined, - (err, conn, cb) => { - expect(err).to.not.exist; - cb(new Error('my great error')); - }, - callback - ); - }); - }); }); diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index ebaa0e9281c..678398d27ee 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -368,22 +368,24 @@ describe('MongoErrors', () => { topology.selectServer('primary', {}, (err, server) => { expect(err).to.not.exist; - server.command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}, err => { - let _err; - try { - expect(err).to.be.an.instanceOf(MongoWriteConcernError); - expect(err.result).to.exist; - expect(err.result).to.have.property('ok', 1); - expect(err.result).to.not.have.property('errmsg'); - expect(err.result).to.not.have.property('code'); - expect(err.result).to.not.have.property('codeName'); - expect(err.result).to.have.property('writeConcernError'); - } catch (e) { - _err = e; - } finally { - cleanup(_err); - } - }); + server + .command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}) + .then(expect.fail, err => { + let _err; + try { + expect(err).to.be.an.instanceOf(MongoWriteConcernError); + expect(err.result).to.exist; + expect(err.result).to.have.property('ok', 1); + expect(err.result).to.not.have.property('errmsg'); + expect(err.result).to.not.have.property('code'); + expect(err.result).to.not.have.property('codeName'); + expect(err.result).to.have.property('writeConcernError'); + } catch (e) { + _err = e; + } finally { + cleanup(_err); + } + }); }); }); }); @@ -409,20 +411,22 @@ describe('MongoErrors', () => { topology.selectServer('primary', {}, (err, server) => { expect(err).to.not.exist; - server.command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}, err => { - let _err; - try { - expect(err).to.be.an.instanceOf(MongoWriteConcernError); - expect(err.result).to.exist; - expect(err.result.writeConcernError).to.deep.equal( - RAW_USER_WRITE_CONCERN_ERROR_INFO.writeConcernError - ); - } catch (e) { - _err = e; - } finally { - cleanup(_err); - } - }); + server + .command(ns('db1'), Object.assign({}, RAW_USER_WRITE_CONCERN_CMD), {}) + .then(expect.fail, err => { + let _err; + try { + expect(err).to.be.an.instanceOf(MongoWriteConcernError); + expect(err.result).to.exist; + expect(err.result.writeConcernError).to.deep.equal( + RAW_USER_WRITE_CONCERN_ERROR_INFO.writeConcernError + ); + } catch (e) { + _err = e; + } finally { + cleanup(_err); + } + }); }); }); }); diff --git a/test/unit/operations/find.test.ts b/test/unit/operations/find.test.ts index 657013de1c1..bfb67d8e818 100644 --- a/test/unit/operations/find.test.ts +++ b/test/unit/operations/find.test.ts @@ -41,7 +41,7 @@ describe('FindOperation', function () { it('should build basic find command with filter', async () => { const findOperation = new FindOperation(undefined, namespace, filter); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command').resolves({}); await findOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { find: namespace.collection, @@ -54,7 +54,7 @@ describe('FindOperation', function () { oplogReplay: true }; const findOperation = new FindOperation(undefined, namespace, {}, options); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command').resolves({}); await findOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index 6385ba71c6b..80abcfdd176 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -54,7 +54,7 @@ describe('GetMoreOperation', function () { ); const opts = { ...options, documentsReturnedIn: 'nextBatch', returnFieldSelector: null }; const operation = new GetMoreOperation(namespace, cursorId, server, opts); - const stub = sinon.stub(server, 'command').callsFake((_, __, ___, cb) => cb()); + const stub = sinon.stub(server, 'command').resolves({}); const expectedGetMoreCommand = { getMore: cursorId, @@ -104,7 +104,7 @@ describe('GetMoreOperation', function () { it('should build basic getMore command with cursorId and collection', async () => { const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, {}); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command').resolves({}); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { getMore: cursorId, @@ -117,7 +117,7 @@ describe('GetMoreOperation', function () { batchSize: 234 }; const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, options); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command').resolves({}); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, @@ -130,7 +130,7 @@ describe('GetMoreOperation', function () { maxAwaitTimeMS: 234 }; const getMoreOperation = new GetMoreOperation(namespace, cursorId, server, options); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command').resolves({}); await getMoreOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith( namespace, @@ -188,7 +188,7 @@ describe('GetMoreOperation', function () { maxWireVersion: serverVersion }; const operation = new GetMoreOperation(namespace, cursorId, server, optionsWithComment); - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command').resolves({}); await operation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, getMore); }); @@ -202,7 +202,7 @@ describe('GetMoreOperation', function () { new ServerDescription('a:1'), {} as any ); - sinon.stub(server, 'command').yieldsRight(); + sinon.stub(server, 'command').resolves({}); it('should throw if the cursorId is undefined', async () => { const getMoreOperation = new GetMoreOperation( diff --git a/test/unit/operations/kill_cursors.test.ts b/test/unit/operations/kill_cursors.test.ts index 4468cd33c65..d88a7053b4e 100644 --- a/test/unit/operations/kill_cursors.test.ts +++ b/test/unit/operations/kill_cursors.test.ts @@ -91,7 +91,7 @@ describe('class KillCursorsOperation', () => { server, options ) as any; - const stub = sinon.stub(server, 'command').yieldsRight(); + const stub = sinon.stub(server, 'command').resolves({}); await killCursorsOperation.execute(server, undefined); expect(stub).to.have.been.calledOnceWith(namespace, { killCursors: namespace.collection, diff --git a/test/unit/sdam/server.test.ts b/test/unit/sdam/server.test.ts index 7e55b59ba94..30358b3ee6e 100644 --- a/test/unit/sdam/server.test.ts +++ b/test/unit/sdam/server.test.ts @@ -9,7 +9,6 @@ import { MongoErrorLabel, MongoNetworkError, MongoNetworkTimeoutError, - ns, ObjectId, Server, ServerDescription, @@ -66,14 +65,6 @@ describe('Server', () => { ); }); - context('when a server is created', function () { - it('calls the command function through commandAsync', async function () { - const serverSpy = sinon.stub(server, 'command').yieldsRight(undefined, { ok: 1 }); - await server.commandAsync(ns('dummy'), { ping: 1 }, {}); - expect(serverSpy).to.have.been.calledOnce; - }); - }); - for (const loadBalanced of [true, false]) { const mode = loadBalanced ? 'loadBalanced' : 'non-loadBalanced'; const contextSuffix = loadBalanced ? ' with connection provided' : ''; diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index 261a363c6df..cbc654420db 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -99,13 +99,13 @@ describe('Topology (unit)', function () { topology.selectServer('primary', {}, (err, server) => { expect(err).to.not.exist; - server.command(ns('admin.$cmd'), { ping: 1 }, { socketTimeoutMS: 250 }, (err, result) => { - expect(result).to.not.exist; - expect(err).to.exist; - expect(err).to.match(/timed out/); - - topology.close({}, done); - }); + server + .command(ns('admin.$cmd'), { ping: 1 }, { socketTimeoutMS: 250 }) + .then(expect.fail, err => { + expect(err).to.exist; + expect(err).to.match(/timed out/); + topology.close({}, done); + }); }); }); }); @@ -212,8 +212,7 @@ describe('Topology (unit)', function () { let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); - server.command(ns('test.test'), { insert: { a: 42 } }, {}, (err, result) => { - expect(result).to.not.exist; + server.command(ns('test.test'), { insert: { a: 42 } }, {}).then(expect.fail, err => { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.true; @@ -248,8 +247,7 @@ describe('Topology (unit)', function () { let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); - server.command(ns('test.test'), { insert: { a: 42 } }, {}, (err, result) => { - expect(result).to.not.exist; + server.command(ns('test.test'), { insert: { a: 42 } }, {}).then(expect.fail, err => { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; @@ -281,8 +279,7 @@ describe('Topology (unit)', function () { let serverDescription; server.on('descriptionReceived', sd => (serverDescription = sd)); - server.command(ns('test.test'), { insert: { a: 42 } }, {}, (err, result) => { - expect(result).to.not.exist; + server.command(ns('test.test'), { insert: { a: 42 } }, {}).then(expect.fail, err => { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(server.description.type).to.equal('Unknown');