Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-5912): make server.command an async function #3986

Merged
merged 25 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 36 additions & 104 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import {
} from '../constants';
import {
type AnyError,
MONGODB_ERROR_CODES,
MongoError,
type MongoError,
MongoInvalidArgumentError,
MongoMissingCredentialsError,
MongoNetworkError,
Expand All @@ -27,7 +26,14 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { type Callback, eachAsync, List, makeCounter, TimeoutController } from '../utils';
import {
type Callback,
eachAsync,
List,
makeCounter,
promiseWithResolvers,
TimeoutController
} from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -100,7 +106,8 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g

/** @internal */
export interface WaitQueueMember {
callback: Callback<Connection>;
resolve: (conn: Connection) => void;
reject: (err: AnyError) => void;
timeoutController: TimeoutController;
[kCancelled]?: boolean;
}
Expand Down Expand Up @@ -350,16 +357,18 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
checkOut(callback: Callback<Connection>): void {
async checkOut(): Promise<Connection> {
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();
const waitQueueMember: WaitQueueMember = {
callback,
resolve,
reject,
timeoutController: new TimeoutController(waitQueueTimeoutMS)
};
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
Expand All @@ -370,7 +379,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
waitQueueMember.callback(
waitQueueMember.reject(
new WaitQueueTimeoutError(
this.loadBalanced
? this.waitQueueErrorMetrics()
Expand All @@ -382,6 +391,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

return promise;
}

/**
Expand Down Expand Up @@ -534,115 +545,35 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

/**
* Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda
* has completed by calling back.
*
* NOTE: please note the required signature of `fn`
*
* @remarks When in load balancer mode, connections can be pinned to cursors or transactions.
* In these cases we pass the connection in to this method to ensure it is used and a new
* connection is not checked out.
*
* @param conn - A pinned connection for use in load balancing mode.
* @param fn - A function which operates on a managed connection
* @param callback - The original callback
*/
withConnection(
conn: Connection | undefined,
fn: WithConnectionCallback,
callback: Callback<Connection>
): void {
if (conn) {
// use the provided connection, and do _not_ check it in after execution
fn(undefined, conn, (fnErr, result) => {
if (fnErr) {
return this.withReauthentication(fnErr, conn, fn, callback);
}
callback(undefined, result);
});
return;
}

this.checkOut((err, conn) => {
// don't callback with `err` here, we might want to act upon it inside `fn`
fn(err as MongoError, conn, (fnErr, result) => {
if (fnErr) {
if (conn) {
this.withReauthentication(fnErr, conn, fn, callback);
} else {
callback(fnErr);
}
} else {
callback(undefined, result);
}

if (conn) {
this.checkIn(conn);
}
});
});
}

private withReauthentication(
fnErr: AnyError,
conn: Connection,
fn: WithConnectionCallback,
callback: Callback<Connection>
) {
if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) {
this.reauthenticate(conn, fn, (error, res) => {
if (error) {
return callback(error);
}
callback(undefined, res);
});
} else {
callback(fnErr);
}
}

/**
* Reauthenticate on the same connection and then retry the operation.
* @internal
* Reauthenticate a connection
*/
private reauthenticate(
connection: Connection,
fn: WithConnectionCallback,
callback: Callback
): void {
async reauthenticateAsync(connection: Connection): Promise<Connection> {
const authContext = connection.authContext;
if (!authContext) {
return callback(new MongoRuntimeError('No auth context found on connection.'));
throw new MongoRuntimeError('No auth context found on connection.');
}
const credentials = authContext.credentials;
if (!credentials) {
return callback(
new MongoMissingCredentialsError(
'Connection is missing credentials when asked to reauthenticate'
)
throw new MongoMissingCredentialsError(
'Connection is missing credentials when asked to reauthenticate'
);
}

const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello);
const provider = this[kServer].topology.client.s.authProviders.getOrCreateProvider(
resolvedCredentials.mechanism
);

if (!provider) {
return callback(
new MongoMissingCredentialsError(
`Reauthenticate failed due to no auth provider for ${credentials.mechanism}`
)
throw new MongoMissingCredentialsError(
`Reauthenticate failed due to no auth provider for ${credentials.mechanism}`
);
}
provider.reauth(authContext).then(
() => {
fn(undefined, connection, (fnErr, fnResult) => {
if (fnErr) {
return callback(fnErr);
}
callback(undefined, fnResult);
});
},
error => callback(error)
);

await provider.reauth(authContext);

return connection;
}

/** Clear the min pool size timer */
Expand Down Expand Up @@ -841,7 +772,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
waitQueueMember.timeoutController.clear();
this[kWaitQueue].shift();
waitQueueMember.callback(error);
waitQueueMember.reject(error);
continue;
}

Expand All @@ -863,7 +794,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
waitQueueMember.timeoutController.clear();

this[kWaitQueue].shift();
waitQueueMember.callback(undefined, connection);
waitQueueMember.resolve(connection);
}
}

Expand All @@ -889,16 +820,17 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// TODO(NODE-5192): Remove this cast
new ConnectionCheckOutFailedEvent(this, 'connectionError', err as MongoError)
);
waitQueueMember.reject(err);
} else if (connection) {
this[kCheckedOut].add(connection);
this.emitAndLog(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
waitQueueMember.resolve(connection);
}

waitQueueMember.timeoutController.clear();
waitQueueMember.callback(err, connection);
}
process.nextTick(() => this.processWaitQueue());
});
Expand Down
2 changes: 1 addition & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,6 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
cmd = decorateWithExplain(cmd, this.explain);
}

return server.commandAsync(this.ns, cmd, options);
return server.command(this.ns, cmd, options);
}
}
2 changes: 1 addition & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export class FindOperation extends CommandOperation<Document> {
findCommand = decorateWithExplain(findCommand, this.explain);
}

return server.commandAsync(this.ns, findCommand, {
return server.command(this.ns, findCommand, {
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
Expand Down
2 changes: 1 addition & 1 deletion src/operations/get_more.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class GetMoreOperation extends AbstractOperation {
...this.options
};

return server.commandAsync(this.ns, getMoreCmd, commandOptions);
return server.command(this.ns, getMoreCmd, commandOptions);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operations/kill_cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class KillCursorsOperation extends AbstractOperation {
cursors: [this.cursorId]
};
try {
await server.commandAsync(this.ns, killCursorsCommand, { session });
await server.command(this.ns, killCursorsCommand, { session });
} catch {
// The driver should never emit errors from killCursors, this is spec-ed behavior
}
Expand Down
4 changes: 2 additions & 2 deletions src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {

override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
this.server = server;
return server.commandAsync(this.ns, this.command, {
return server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
Expand All @@ -54,7 +54,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>

override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
this.server = server;
return server.commandAsync(this.ns, this.command, {
return server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
Expand Down
2 changes: 1 addition & 1 deletion src/operations/search_indexes/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class CreateSearchIndexesOperation extends AbstractOperation<string[]> {
indexes: this.descriptions
};

const res = await server.commandAsync(namespace, command, { session });
const res = await server.command(namespace, command, { session });

const indexesCreated: Array<{ name: string }> = res?.indexesCreated ?? [];
return indexesCreated.map(({ name }) => name);
Expand Down
2 changes: 1 addition & 1 deletion src/operations/search_indexes/drop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class DropSearchIndexOperation extends AbstractOperation<void> {
}

try {
await server.commandAsync(namespace, command, { session });
await server.command(namespace, command, { session });
} catch (error) {
const isNamespaceNotFoundError =
error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound;
Expand Down
2 changes: 1 addition & 1 deletion src/operations/search_indexes/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class UpdateSearchIndexOperation extends AbstractOperation<void> {
definition: this.definition
};

await server.commandAsync(namespace, command, { session });
await server.command(namespace, command, { session });
return;
}
}
Loading
Loading