From 0ce246fc608aee2c659d7dd01ea0377441523101 Mon Sep 17 00:00:00 2001
From: Warren James
Date: Fri, 29 Mar 2024 15:46:38 -0400
Subject: [PATCH] update monitor

---
Review notes (text between the `---` line and the diffstat is ignored by `git am`):
- RTTPinger.latestRTT now starts undefined so the `latestRTT ?? ...` fallbacks can take effect.
- The ping command success path passes `start` to measureAndReschedule; without it the
  helper defaulted `start` to the completion time and recorded an RTT of roughly zero.
- _checkServer creates the socket inside its try block, so a failed connect is reported as a
  failed heartbeat, and the duplicated awaitable/non-awaitable command branches are merged.
- Hunk line counts and the diffstat are recomputed for these edits; the `index` post-image
  id below is NOT recomputed.

 src/sdam/monitor.ts | 253 +++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 223 insertions(+), 30 deletions(-)

diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts
index 2c778decc57..b90d10b5163 100644
--- a/src/sdam/monitor.ts
+++ b/src/sdam/monitor.ts
@@ -105,7 +105,8 @@ export class Monitor extends TypedEventEmitter {
   rttPinger?: RTTPinger;
   /** @internal */
   override component = MongoLoggableComponent.TOPOLOGY;
-  private rttSamplesMS: MovingWindow;
+  /** @internal */
+  rttSamplesMS: MovingWindow;
 
   constructor(server: Server, options: MonitorOptions) {
     super();
@@ -244,6 +245,8 @@ function resetMonitorState(monitor: Monitor) {
 
   monitor.connection?.destroy();
   monitor.connection = null;
+
+  monitor.clearRttSamples();
 }
 
 function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
@@ -277,7 +280,6 @@ function checkServer(monitor: Monitor, callback: Callback) {
   function onHeartbeatFailed(err: Error) {
     monitor.connection?.destroy();
     monitor.connection = null;
-
     monitor.emitAndLogHeartbeat(
       Server.SERVER_HEARTBEAT_FAILED,
       monitor[kServer].topology.s.id,
@@ -306,7 +308,11 @@ function checkServer(monitor: Monitor, callback: Callback) {
     // NOTE: here we use the latestRTT as this measurment corresponds with the value
     // obtained for this successful heartbeat
     const duration =
-      isAwaitable && monitor.rttPinger ? monitor.rttPinger.latestRTT : calculateDurationInMs(start);
+      isAwaitable && monitor.rttPinger
+        ? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start)
+        : calculateDurationInMs(start);
+
+    monitor.addRttSample(duration);
 
     monitor.emitAndLogHeartbeat(
       Server.SERVER_HEARTBEAT_SUCCEEDED,
@@ -406,6 +412,8 @@ function checkServer(monitor: Monitor, callback: Callback) {
         connection.destroy();
         return;
       }
+      const duration = calculateDurationInMs(start);
+      monitor.addRttSample(duration);
 
       monitor.connection = connection;
       monitor.emitAndLogHeartbeat(
@@ -414,7 +422,7 @@ function checkServer(monitor: Monitor, callback: Callback) {
         connection.hello?.connectionId,
         new ServerHeartbeatSucceededEvent(
           monitor.address,
-          calculateDurationInMs(start),
+          duration,
           connection.hello,
           useStreamingProtocol(monitor, connection.hello?.topologyVersion)
         )
@@ -430,6 +438,177 @@ function checkServer(monitor: Monitor, callback: Callback) {
   );
 }
 
+/**
+ * Promise-based counterpart of `checkServer`: runs one heartbeat check for `monitor`.
+ *
+ * An open `monitor.connection` is reused to send a hello command; otherwise a new connection
+ * is established and its handshake serves as the check. Heartbeat started/succeeded/failed
+ * events are emitted along the way, and a failure also emits `resetServer`.
+ *
+ * @returns the hello response, or `null` if the check failed or the monitor is in a close state
+ */
+async function _checkServer(monitor: Monitor): Promise<Document | null> {
+  let start: number;
+  let awaited: boolean;
+  const topologyVersion = monitor[kServer].description.topologyVersion;
+  const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
+  monitor.emitAndLogHeartbeat(
+    Server.SERVER_HEARTBEAT_STARTED,
+    monitor[kServer].topology.s.id,
+    undefined,
+    new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
+  );
+
+  function onHeartbeatFailed(err: Error) {
+    monitor.connection?.destroy();
+    monitor.connection = null;
+    monitor.emitAndLogHeartbeat(
+      Server.SERVER_HEARTBEAT_FAILED,
+      monitor[kServer].topology.s.id,
+      undefined,
+      new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited)
+    );
+
+    const error = !(err instanceof MongoError)
+      ? new MongoError(MongoError.buildErrorMessage(err), { cause: err })
+      : err;
+    error.addErrorLabel(MongoErrorLabel.ResetPool);
+    if (error instanceof MongoNetworkTimeoutError) {
+      error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
+    }
+
+    monitor.emit('resetServer', error);
+  }
+
+  function onHeartbeatSucceeded(hello: Document) {
+    if (!('isWritablePrimary' in hello)) {
+      // Provide hello-style response document.
+      hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
+    }
+
+    // NOTE: here we use the latestRTT as this measurement corresponds with the value
+    // obtained for this successful heartbeat
+    const duration =
+      isAwaitable && monitor.rttPinger
+        ? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start)
+        : calculateDurationInMs(start);
+
+    monitor.addRttSample(duration);
+
+    monitor.emitAndLogHeartbeat(
+      Server.SERVER_HEARTBEAT_SUCCEEDED,
+      monitor[kServer].topology.s.id,
+      hello.connectionId,
+      new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
+    );
+
+    if (isAwaitable) {
+      // If we are using the streaming protocol then we immediately issue another 'started'
+      // event, otherwise the "check" is complete and return to the main monitor loop
+      monitor.emitAndLogHeartbeat(
+        Server.SERVER_HEARTBEAT_STARTED,
+        monitor[kServer].topology.s.id,
+        undefined,
+        new ServerHeartbeatStartedEvent(monitor.address, true)
+      );
+      // We have not actually sent an outgoing handshake, but when we get the next response we
+      // want the duration to reflect the time since we last heard from the server
+      start = now();
+    } else {
+      monitor.rttPinger?.close();
+      monitor.rttPinger = undefined;
+    }
+  }
+
+  const { connection } = monitor;
+  if (connection && !connection.closed) {
+    const { serverApi, helloOk } = connection;
+    const connectTimeoutMS = monitor.options.connectTimeoutMS;
+    const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
+
+    const cmd = {
+      [serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
+      ...(isAwaitable && topologyVersion
+        ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
+        : {})
+    };
+
+    const options = isAwaitable
+      ? {
+          socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
+          exhaustAllowed: true
+        }
+      : { socketTimeoutMS: connectTimeoutMS };
+
+    if (isAwaitable && monitor.rttPinger == null) {
+      monitor.rttPinger = new RTTPinger(
+        monitor,
+        monitor[kCancellationToken],
+        Object.assign(
+          { heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
+          monitor.connectOptions
+        )
+      );
+    }
+
+    // Record new start time before sending handshake
+    start = now();
+
+    // Streaming and polling checks send the same command; only the `awaited` flag reported on
+    // failure differs
+    awaited = isAwaitable;
+    try {
+      const hello = await connection.command(ns('admin.$cmd'), cmd, options);
+      onHeartbeatSucceeded(hello);
+      return hello;
+    } catch (error) {
+      onHeartbeatFailed(error);
+      return null;
+    }
+  } else {
+    // Provisional start time so that a failed connect still reports a defined duration
+    start = now();
+    awaited = false;
+    let newConnection: Connection | undefined;
+    try {
+      // Create the socket inside the try block: a connect failure must surface as a failed
+      // heartbeat rather than as a rejection of the returned promise
+      const socket = await makeSocket(monitor.connectOptions);
+      newConnection = makeConnection(monitor.connectOptions, socket);
+      // The start time is after socket creation but before the handshake
+      start = now();
+      await performInitialHandshake(newConnection, monitor.connectOptions);
+      const duration = calculateDurationInMs(start);
+      if (isInCloseState(monitor)) {
+        newConnection.destroy();
+        return null;
+      }
+
+      monitor.connection = newConnection;
+      monitor.addRttSample(duration);
+
+      monitor.emitAndLogHeartbeat(
+        Server.SERVER_HEARTBEAT_SUCCEEDED,
+        monitor[kServer].topology.s.id,
+        newConnection.hello?.connectionId,
+        new ServerHeartbeatSucceededEvent(
+          monitor.address,
+          duration,
+          newConnection.hello,
+          useStreamingProtocol(monitor, newConnection.hello?.topologyVersion)
+        )
+      );
+
+      return newConnection.hello;
+    } catch (error) {
+      newConnection?.destroy();
+      monitor.connection = null;
+      onHeartbeatFailed(error);
+      return null;
+    }
+  }
+}
+
 function monitorServer(monitor: Monitor) {
   return (callback: Callback) => {
     if (monitor.s.state === STATE_MONITORING) {
@@ -491,12 +670,16 @@ export class RTTPinger {
   /** @internal */
   monitor: Monitor;
   closed: boolean;
+  /** @internal */
+  latestRTT?: number;
 
   constructor(monitor: Monitor, cancellationToken: CancellationToken, options: RTTPingerOptions) {
     this.connection = undefined;
     this[kCancellationToken] = cancellationToken;
     this.closed = false;
     this.monitor = monitor;
+    // No measurement yet: stay undefined so `latestRTT ?? ...` readers use their fallback
+    this.latestRTT = undefined;
 
     const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
     this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
@@ -510,10 +693,6 @@ export class RTTPinger {
     return this.monitor.minRoundTripTime;
   }
 
-  get latestRTT(): number {
-    return this.monitor.latestRTT;
-  }
-
   close(): void {
     this.closed = true;
     clearTimeout(this[kMonitorId]);
@@ -523,30 +702,46 @@ export class RTTPinger {
   }
 }
 
-function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
-  const start = now();
-  options.cancellationToken = rttPinger[kCancellationToken];
-  const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
-
+/**
+ * Stores the time elapsed since `start` as the pinger's latest RTT and schedules the next
+ * measurement after `options.heartbeatFrequencyMS`.
+ *
+ * If the pinger has been closed, `conn` is destroyed and nothing is recorded or scheduled.
+ *
+ * @param start - when the measurement began; the current time is used when omitted
+ * @param conn - freshly established connection, adopted when the pinger has none
+ */
+function measureAndReschedule(
+  rttPinger: RTTPinger,
+  options: RTTPingerOptions,
+  start?: number,
+  conn?: Connection
+) {
+  if (start == null) {
+    start = now();
+  }
   if (rttPinger.closed) {
+    conn?.destroy();
     return;
   }
 
-  function measureAndReschedule(conn?: Connection) {
-    if (rttPinger.closed) {
-      conn?.destroy();
-      return;
-    }
+  if (rttPinger.connection == null) {
+    rttPinger.connection = conn;
+  }
 
-    if (rttPinger.connection == null) {
-      rttPinger.connection = conn;
-    }
+  rttPinger.latestRTT = calculateDurationInMs(start);
+  rttPinger[kMonitorId] = setTimeout(
+    () => measureRoundTripTime(rttPinger, options),
+    options.heartbeatFrequencyMS
+  );
+}
 
-    rttPinger.monitor.addRttSample(calculateDurationInMs(start));
-    rttPinger[kMonitorId] = setTimeout(
-      () => measureRoundTripTime(rttPinger, options),
-      heartbeatFrequencyMS
-    );
+function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
+  const start = now();
+  options.cancellationToken = rttPinger[kCancellationToken];
+
+  if (rttPinger.closed) {
+    return;
   }
 
   const connection = rttPinger.connection;
@@ -554,11 +749,10 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
     // eslint-disable-next-line github/no-then
     connect(options).then(
       connection => {
-        measureAndReschedule(connection);
+        measureAndReschedule(rttPinger, options, start, connection);
       },
       () => {
         rttPinger.connection = undefined;
-        rttPinger.monitor.clearRttSamples();
       }
     );
     return;
@@ -568,11 +762,10 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
     connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
   // eslint-disable-next-line github/no-then
   connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
-    () => measureAndReschedule(),
+    () => measureAndReschedule(rttPinger, options, start),
     () => {
       rttPinger.connection?.destroy();
       rttPinger.connection = undefined;
-      rttPinger.monitor.clearRttSamples();
       return;
     }
   );