From ebbbf6082b1aca8cacfca603d9c8005c10d2c8da Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 27 Mar 2024 15:33:08 -0400 Subject: [PATCH 01/23] feat(NODE-5825): add minRTT and average rtt measures to Monitor --- src/cmap/connection.ts | 3 ++ src/sdam/monitor.ts | 57 ++++++++++++++++++++++++++++-------- src/sdam/server_selection.ts | 5 ++-- src/utils.ts | 46 +++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 16 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 1a4db3401f9..7790e486a81 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -28,6 +28,7 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { ServerType } from '../sdam/common'; +import { type Monitor } from '../sdam/monitor'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; import { BufferPool, @@ -125,6 +126,8 @@ export interface ConnectionOptions extendedMetadata: Promise; /** @internal */ mongoLogger?: MongoLogger | undefined; + /** @internal */ + parent?: Monitor; } /** @public */ diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index bec8cda1cf6..3bf8fd8a8d3 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -8,8 +8,15 @@ import { LEGACY_HELLO_COMMAND } from '../constants'; import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error'; import { MongoLoggableComponent } from '../mongo_logger'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; -import type { Callback, EventEmitterWithState } from '../utils'; -import { calculateDurationInMs, makeStateMachine, now, ns } from '../utils'; +import { + calculateDurationInMs, + type Callback, + type EventEmitterWithState, + makeStateMachine, + MovingWindow, + now, + ns +} from '../utils'; import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common'; import { ServerHeartbeatFailedEvent, @@ -25,8 +32,6 @@ const kServer = Symbol('server'); const kMonitorId = Symbol('monitorId'); /** @internal */ const kCancellationToken = Symbol('cancellationToken'); -/** @internal */ -const kRoundTripTime = Symbol('roundTripTime'); const STATE_IDLE = 'idle'; const STATE_MONITORING = 'monitoring'; @@ -100,6 +105,7 @@ export class Monitor extends TypedEventEmitter { rttPinger?: RTTPinger; /** @internal */ override component = MongoLoggableComponent.TOPOLOGY; + private rttSamplesMS: MovingWindow; constructor(server: Server, options: MonitorOptions) { super(); @@ -121,6 +127,7 @@ export class Monitor extends TypedEventEmitter { }); this.isRunningInFaasEnv = getFAASEnv() != null; this.mongoLogger = this[kServer].topology.client?.mongoLogger; + this.rttSamplesMS = new MovingWindow(10); const cancellationToken = this[kCancellationToken]; // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration @@ -135,7 +142,8 @@ export class Monitor extends TypedEventEmitter { useBigInt64: false, promoteLongs: true, promoteValues: true, - promoteBuffers: true + promoteBuffers: true, + parent: this }; // ensure no authentication is used for monitoring @@ -203,6 +211,22 @@ export class Monitor extends TypedEventEmitter { this.emit('close'); stateTransition(this, STATE_CLOSED); } + + get roundTripTime(): number { + return this.rttSamplesMS.average(); + } + + get minRoundTripTime(): number { + return this.rttSamplesMS.min(); + } + + addRttSample(rtt: number) { + this.rttSamplesMS.addSample(rtt); + } + + clearRttSamples() { + this.rttSamplesMS.clear(); + } } function resetMonitorState(monitor: Monitor) { @@ -275,6 +299,8 @@ function checkServer(monitor: Monitor, callback: Callback) { hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND]; } + // FIXME: Figure out how to set this. Should duration be the instantaneous rtt or the averaged + // rtt? const duration = isAwaitable && monitor.rttPinger ? monitor.rttPinger.roundTripTime @@ -329,6 +355,7 @@ function checkServer(monitor: Monitor, callback: Callback) { if (isAwaitable && monitor.rttPinger == null) { monitor.rttPinger = new RTTPinger( + monitor, monitor[kCancellationToken], Object.assign( { heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, @@ -458,23 +485,27 @@ export class RTTPinger { /** @internal */ [kCancellationToken]: CancellationToken; /** @internal */ - [kRoundTripTime]: number; - /** @internal */ [kMonitorId]: NodeJS.Timeout; + /** @internal */ + monitor: Monitor; closed: boolean; - constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) { + constructor(monitor: Monitor, cancellationToken: CancellationToken, options: RTTPingerOptions) { this.connection = undefined; this[kCancellationToken] = cancellationToken; - this[kRoundTripTime] = 0; this.closed = false; + this.monitor = monitor; const heartbeatFrequencyMS = options.heartbeatFrequencyMS; this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS); } get roundTripTime(): number { - return this[kRoundTripTime]; + return this.monitor.roundTripTime; + } + + get minRoundTripTime(): number { + return this.monitor.minRoundTripTime; } close(): void { @@ -505,7 +536,7 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { rttPinger.connection = conn; } - rttPinger[kRoundTripTime] = calculateDurationInMs(start); + rttPinger.monitor.addRttSample(calculateDurationInMs(start)); rttPinger[kMonitorId] = setTimeout( () => measureRoundTripTime(rttPinger, options), heartbeatFrequencyMS @@ -521,7 +552,7 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { }, () => { rttPinger.connection = undefined; - rttPinger[kRoundTripTime] = 0; + rttPinger.monitor.clearRttSamples(); } ); return; @@ -535,7 +566,7 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { () => { rttPinger.connection?.destroy(); rttPinger.connection = undefined; - rttPinger[kRoundTripTime] = 0; + rttPinger.monitor.clearRttSamples(); return; } ); diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 8c92f08b625..bb262efa337 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -223,9 +223,8 @@ function latencyWindowReducer( servers: ServerDescription[] ): ServerDescription[] { const low = servers.reduce( - (min: number, server: ServerDescription) => - min === -1 ? server.roundTripTime : Math.min(server.roundTripTime, min), - -1 + (min: number, server: ServerDescription) => Math.min(server.roundTripTime, min), + Infinity ); const high = low + topologyDescription.localThresholdMS; diff --git a/src/utils.ts b/src/utils.ts index b25b3ebb0fc..13b4b8d1399 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1344,3 +1344,49 @@ export async function fileIsAccessible(fileName: string, mode?: number) { return false; } } + +export class MovingWindow { + private writeIndex: number; + private length: number; + private samples: Float64Array; + + constructor(windowSize = 10) { + this.samples = new Float64Array(windowSize); + this.length = 0; + this.writeIndex = 0; + } + + addSample(sample: number) { + this.samples[this.writeIndex++] = sample; + if (this.length < this.samples.length) { + this.length++; + } + + this.writeIndex %= this.samples.length; + } + + min(): number { + if (this.length < 2) return 0; + let min = this.samples[0]; + for (let i = 1; i < this.length; i++) { + if (this.samples[i] < min) min = this.samples[i]; + } + + return min; + } + + average(): number { + if (this.length === 0) return 0; + let sum = 0; + for (let i = 0; i < this.length; i++) { + sum += this.samples[i]; + } + + return sum / this.length; + } + + clear() { + this.length = 0; + this.writeIndex = 0; + } +} From d8807e84f8b33a7d257c744f6fe39a2a09830016 Mon Sep 17 00:00:00 2001 From: Warren James Date: Thu, 28 Mar 2024 21:06:38 -0400 Subject: [PATCH 02/23] use instantaneous heartbeat duration and not average for ServerHeartbeatSucceededEvent --- src/sdam/monitor.ts | 16 +++++++++++----- src/utils.ts | 6 ++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 3bf8fd8a8d3..2c778decc57 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -220,6 +220,10 @@ export class Monitor extends TypedEventEmitter { return this.rttSamplesMS.min(); } + get latestRTT(): number { + return this.rttSamplesMS.last ?? 0; // FIXME: Check if this is acceptable + } + addRttSample(rtt: number) { this.rttSamplesMS.addSample(rtt); } @@ -299,12 +303,10 @@ function checkServer(monitor: Monitor, callback: Callback) { hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND]; } - // FIXME: Figure out how to set this. Should duration be the instantaneous rtt or the averaged - // rtt? + // NOTE: here we use the latestRTT as this measurment corresponds with the value + // obtained for this successful heartbeat const duration = - isAwaitable && monitor.rttPinger - ? monitor.rttPinger.roundTripTime - : calculateDurationInMs(start); + isAwaitable && monitor.rttPinger ? monitor.rttPinger.latestRTT : calculateDurationInMs(start); monitor.emitAndLogHeartbeat( Server.SERVER_HEARTBEAT_SUCCEEDED, @@ -508,6 +510,10 @@ export class RTTPinger { return this.monitor.minRoundTripTime; } + get latestRTT(): number { + return this.monitor.latestRTT; + } + close(): void { this.closed = true; clearTimeout(this[kMonitorId]); diff --git a/src/utils.ts b/src/utils.ts index 13b4b8d1399..89c201d1d03 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1346,6 +1346,7 @@ export async function fileIsAccessible(fileName: string, mode?: number) { } export class MovingWindow { + /** Index of the next slot to be overwritten */ private writeIndex: number; private length: number; private samples: Float64Array; @@ -1385,6 +1386,11 @@ export class MovingWindow { return sum / this.length; } + get last(): number | null { + if (this.length === 0) return null; + return this.samples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1]; + } + clear() { this.length = 0; this.writeIndex = 0; From 0bcf2e87f1e822c6f95ff1de92abad4614a32d65 Mon Sep 17 00:00:00 2001 From: Warren James Date: Thu, 28 Mar 2024 22:55:29 -0400 Subject: [PATCH 03/23] add MovingWindow tests --- src/utils.ts | 4 +- test/unit/utils.test.ts | 145 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 2 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 89c201d1d03..91eb1201765 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1348,8 +1348,8 @@ export async function fileIsAccessible(fileName: string, mode?: number) { export class MovingWindow { /** Index of the next slot to be overwritten */ private writeIndex: number; - private length: number; - private samples: Float64Array; + length: number; + samples: Float64Array; constructor(windowSize = 10) { this.samples = new Float64Array(windowSize); diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index d36ddbeb3be..52278c0e6a0 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -14,6 +14,7 @@ import { MongoDBCollectionNamespace, MongoDBNamespace, MongoRuntimeError, + MovingWindow, ObjectId, shuffle, TimeoutController @@ -1045,4 +1046,148 @@ describe('driver utils', function () { }); }); }); + + describe('class MovingWindow', () => { + describe('constructor', () => { + it('Constructs a Float64 array of length windowSize', () => { + const window = new MovingWindow(10); + expect(window.samples).to.have.length(10); + }); + }); + + describe('addSample', () => { + context('when length < windowSize', () => { + it('increments the length', () => { + const window = new MovingWindow(10); + expect(window.length).to.equal(0); + + window.addSample(1); + + expect(window.length).to.equal(1); + }); + }); + context('when length === windowSize', () => { + let window: MovingWindow; + const size = 10; + + beforeEach(() => { + window = new MovingWindow(size); + for (let i = 1; i <= size; i++) { + window.addSample(i); + } + }); + + it('does not increment the length', () => { + window.addSample(size + 1); + expect(window.length).to.equal(size); + }); + + it('overwrites the oldest element', () => { + window.addSample(size + 1); + for (const el of window.samples) { + if (el === 1) expect.fail('Did not overwrite oldest element'); + } + }); + + it('appends the new element to the end of the window', () => { + window.addSample(size + 1); + expect(window.last).to.equal(size + 1); + }); + }); + }); + + describe('min()', () => { + context('when length < 2', () => { + it('returns 0', () => { + const window = new MovingWindow(10); + // length 0 + expect(window.min()).to.equal(0); + + window.addSample(1); + // length 1 + expect(window.min()).to.equal(0); + }); + }); + + context('when 2 <= length < windowSize', () => { + let window: MovingWindow; + beforeEach(() => { + window = new MovingWindow(10); + for (let i = 1; i <= 3; i++) { + window.addSample(i); + } + }); + + it('correctly computes the minimum', () => { + expect(window.min()).to.equal(1); + }); + }); + + context('when length == windowSize', () => { + let window: MovingWindow; + const size = 10; + + beforeEach(() => { + window = new MovingWindow(size); + for (let i = 1; i <= size * 2; i++) { + window.addSample(i); + } + }); + + it('correctly computes the minimum', () => { + expect(window.min()).to.equal(size + 1); + }); + }); + }); + + describe('average()', () => { + it('correctly computes the mean', () => { + const window = new MovingWindow(10); + let sum = 0; + + for (let i = 1; i <= 10; i++) { + sum += i; + window.addSample(i); + } + + expect(window.average()).to.equal(sum / 10); + }); + }); + + describe('last', () => { + context('when length == 0', () => { + it('returns null', () => { + const window = new MovingWindow(10); + expect(window.last).to.be.null; + }); + }); + + context('when length > 0', () => { + it('returns the most recently inserted element', () => { + const window = new MovingWindow(10); + for (let i = 0; i < 11; i++) { + window.addSample(i); + } + expect(window.last).to.equal(10); + }); + }); + }); + + describe('clear', () => { + let window: MovingWindow; + + beforeEach(() => { + window = new MovingWindow(10); + for (let i = 0; i < 20; i++) { + window.addSample(i); + } + expect(window.length).to.equal(10); + }); + + it('sets length to 0', () => { + window.clear(); + expect(window.length).to.equal(0); + }); + }); + }); }); From d46fc1529d2545c6f3988c5d5cbeaf9969e77820 Mon Sep 17 00:00:00 2001 From: Warren James Date: Thu, 28 Mar 2024 22:56:24 -0400 Subject: [PATCH 04/23] test cleanup --- test/unit/sdam/monitor.test.ts | 78 +++++++++++++++++----------------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index 3c6e02a1342..2f3acd04f69 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -1,11 +1,13 @@ +import { once } from 'node:events'; import * as net from 'node:net'; import { expect } from 'chai'; import { coerce } from 'semver'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; +import { setTimeout as setTimeoutPromise } from 'timers/promises'; -import { MongoClient } from '../../mongodb'; +import { MongoClient, ServerHeartbeatSucceededEvent } from '../../mongodb'; import { isHello, LEGACY_HELLO_COMMAND, @@ -141,7 +143,7 @@ describe('monitoring', function () { }).skipReason = 'TODO(NODE-3600): Unskip flaky tests'; describe('Monitor', function () { - let monitor; + let monitor: Monitor | null; beforeEach(() => { monitor = null; @@ -153,7 +155,7 @@ describe('monitoring', function () { } }); - it('should connect and issue an initial server check', function (done) { + it('should connect and issue an initial server check', async function () { mockServer.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -164,15 +166,17 @@ describe('monitoring', function () { const server = new MockServer(mockServer.address()); monitor = new Monitor(server as any, {} as any); - monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure'))); - monitor.on('serverHeartbeatSucceeded', () => { - expect(monitor.connection).to.have.property('id', ''); - done(); - }); + const heartbeatFailed = once(monitor, 'serverHeartbeatFailed'); + const heartbeatSucceeded = once(monitor, 'serverHeartbeatSucceeded'); monitor.connect(); + + const res = await Promise.race([heartbeatFailed, heartbeatSucceeded]); + + expect(res[0]).to.be.instanceOf(ServerHeartbeatSucceededEvent); + expect(monitor.connection).to.have.property('id', ''); }); - it('should ignore attempts to connect when not already closed', function (done) { + it('should ignore attempts to connect when not already closed', async function () { mockServer.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -183,13 +187,17 @@ describe('monitoring', function () { const server = new MockServer(mockServer.address()); monitor = new Monitor(server as any, {} as any); - monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure'))); - monitor.on('serverHeartbeatSucceeded', () => done()); + const heartbeatFailed = once(monitor, 'serverHeartbeatFailed'); + const heartbeatSucceeded = once(monitor, 'serverHeartbeatSucceeded'); monitor.connect(); + + const res = await Promise.race([heartbeatFailed, heartbeatSucceeded]); + + expect(res[0]).to.be.instanceOf(ServerHeartbeatSucceededEvent); monitor.connect(); }); - it('should not initiate another check if one is in progress', function (done) { + it('should not initiate another check if one is in progress', async function () { mockServer.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -202,32 +210,27 @@ describe('monitoring', function () { const startedEvents: ServerHeartbeatStartedEvent[] = []; monitor.on('serverHeartbeatStarted', event => startedEvents.push(event)); - monitor.on('close', () => { - expect(startedEvents).to.have.length(2); - done(); - }); + const monitorClose = once(monitor, 'close'); monitor.connect(); - monitor.once('serverHeartbeatSucceeded', () => { - monitor.requestCheck(); - monitor.requestCheck(); - monitor.requestCheck(); - monitor.requestCheck(); - monitor.requestCheck(); + await once(monitor, 'serverHeartbeatSucceeded'); + monitor.requestCheck(); + monitor.requestCheck(); + monitor.requestCheck(); + monitor.requestCheck(); + monitor.requestCheck(); - const minHeartbeatFrequencyMS = 500; - setTimeout(() => { - // wait for minHeartbeatFrequencyMS, then request a check and verify another check occurred - monitor.once('serverHeartbeatSucceeded', () => { - monitor.close(); - }); + const minHeartbeatFrequencyMS = 500; + await setTimeoutPromise(minHeartbeatFrequencyMS); - monitor.requestCheck(); - }, minHeartbeatFrequencyMS); - }); + await once(monitor, 'serverHeartbeatSucceeded'); + monitor.close(); + + await monitorClose; + expect(startedEvents).to.have.length(2); }); - it('should not close the monitor on a failed heartbeat', function (done) { + it('should not close the monitor on a failed heartbeat', async function () { let helloCount = 0; mockServer.setMessageHandler(request => { const doc = request.document; @@ -263,16 +266,13 @@ describe('monitoring', function () { let successCount = 0; monitor.on('serverHeartbeatSucceeded', () => { if (successCount++ === 2) { - monitor.close(); + monitor?.close(); } }); - monitor.on('close', () => { - expect(events).to.have.length(2); - done(); - }); - monitor.connect(); + await once(monitor, 'close'); + expect(events).to.have.length(2); }); it('should upgrade to hello from legacy hello when initial handshake contains helloOk', function (done) { @@ -306,6 +306,8 @@ describe('monitoring', function () { }, minHeartbeatFrequencyMS); }); }); + + describe(''); }); describe('class MonitorInterval', function () { From 0ce246fc608aee2c659d7dd01ea0377441523101 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 29 Mar 2024 15:46:38 -0400 Subject: [PATCH 05/23] update monitor --- src/sdam/monitor.ts | 239 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 209 insertions(+), 30 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 2c778decc57..b90d10b5163 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -105,7 +105,8 @@ export class Monitor extends TypedEventEmitter { rttPinger?: RTTPinger; /** @internal */ override component = MongoLoggableComponent.TOPOLOGY; - private rttSamplesMS: MovingWindow; + /** @internal */ + rttSamplesMS: MovingWindow; constructor(server: Server, options: MonitorOptions) { super(); @@ -244,6 +245,8 @@ function resetMonitorState(monitor: Monitor) { monitor.connection?.destroy(); monitor.connection = null; + + monitor.clearRttSamples(); } function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean { @@ -277,7 +280,6 @@ function checkServer(monitor: Monitor, callback: Callback) { function onHeartbeatFailed(err: Error) { monitor.connection?.destroy(); monitor.connection = null; - monitor.emitAndLogHeartbeat( Server.SERVER_HEARTBEAT_FAILED, monitor[kServer].topology.s.id, @@ -306,7 +308,11 @@ function checkServer(monitor: Monitor, callback: Callback) { // NOTE: here we use the latestRTT as this measurment corresponds with the value // obtained for this successful heartbeat const duration = - isAwaitable && monitor.rttPinger ? monitor.rttPinger.latestRTT : calculateDurationInMs(start); + isAwaitable && monitor.rttPinger + ? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start) + : calculateDurationInMs(start); + + monitor.addRttSample(duration); monitor.emitAndLogHeartbeat( Server.SERVER_HEARTBEAT_SUCCEEDED, @@ -406,6 +412,8 @@ function checkServer(monitor: Monitor, callback: Callback) { connection.destroy(); return; } + const duration = calculateDurationInMs(start); + monitor.addRttSample(duration); monitor.connection = connection; monitor.emitAndLogHeartbeat( @@ -414,7 +422,7 @@ function checkServer(monitor: Monitor, callback: Callback) { connection.hello?.connectionId, new ServerHeartbeatSucceededEvent( monitor.address, - calculateDurationInMs(start), + duration, connection.hello, useStreamingProtocol(monitor, connection.hello?.topologyVersion) ) @@ -430,6 +438,173 @@ function checkServer(monitor: Monitor, callback: Callback) { ); } +async function _checkServer(monitor: Monitor): Promise { + let start: number; + let awaited: boolean; + const topologyVersion = monitor[kServer].description.topologyVersion; + const isAwaitable = useStreamingProtocol(monitor, topologyVersion); + monitor.emitAndLogHeartbeat( + Server.SERVER_HEARTBEAT_STARTED, + monitor[kServer].topology.s.id, + undefined, + new ServerHeartbeatStartedEvent(monitor.address, isAwaitable) + ); + + function onHeartbeatFailed(err: Error) { + monitor.connection?.destroy(); + monitor.connection = null; + monitor.emitAndLogHeartbeat( + Server.SERVER_HEARTBEAT_FAILED, + monitor[kServer].topology.s.id, + undefined, + new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited) + ); + + const error = !(err instanceof MongoError) + ? new MongoError(MongoError.buildErrorMessage(err), { cause: err }) + : err; + error.addErrorLabel(MongoErrorLabel.ResetPool); + if (error instanceof MongoNetworkTimeoutError) { + error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections); + } + + monitor.emit('resetServer', error); + } + + function onHeartbeatSucceeded(hello: Document) { + if (!('isWritablePrimary' in hello)) { + // Provide hello-style response document. + hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND]; + } + + // NOTE: here we use the latestRTT as this measurment corresponds with the value + // obtained for this successful heartbeat + const duration = + isAwaitable && monitor.rttPinger + ? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start) + : calculateDurationInMs(start); + + monitor.addRttSample(duration); + + monitor.emitAndLogHeartbeat( + Server.SERVER_HEARTBEAT_SUCCEEDED, + monitor[kServer].topology.s.id, + hello.connectionId, + new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable) + ); + + if (isAwaitable) { + // If we are using the streaming protocol then we immediately issue another 'started' + // event, otherwise the "check" is complete and return to the main monitor loop + monitor.emitAndLogHeartbeat( + Server.SERVER_HEARTBEAT_STARTED, + monitor[kServer].topology.s.id, + undefined, + new ServerHeartbeatStartedEvent(monitor.address, true) + ); + // We have not actually sent an outgoing handshake, but when we get the next response we + // want the duration to reflect the time since we last heard from the server + start = now(); + } else { + monitor.rttPinger?.close(); + monitor.rttPinger = undefined; + } + } + + const { connection } = monitor; + if (connection && !connection.closed) { + const { serverApi, helloOk } = connection; + const connectTimeoutMS = monitor.options.connectTimeoutMS; + const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS; + + const cmd = { + [serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1, + ...(isAwaitable && topologyVersion + ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) } + : {}) + }; + + const options = isAwaitable + ? { + socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0, + exhaustAllowed: true + } + : { socketTimeoutMS: connectTimeoutMS }; + + if (isAwaitable && monitor.rttPinger == null) { + monitor.rttPinger = new RTTPinger( + monitor, + monitor[kCancellationToken], + Object.assign( + { heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, + monitor.connectOptions + ) + ); + } + + // Record new start time before sending handshake + start = now(); + + if (isAwaitable) { + awaited = true; + try { + const hello = await connection.command(ns('admin.$cmd'), cmd, options); + onHeartbeatSucceeded(hello); + return hello; + } catch (error) { + onHeartbeatFailed(error); + return null; + } + } + + awaited = false; + try { + const hello = await connection.command(ns('admin.$cmd'), cmd, options); + onHeartbeatSucceeded(hello); + return hello; + } catch (error) { + onHeartbeatFailed(error); + return null; + } + } else { + const socket = await makeSocket(monitor.connectOptions); + const connection = makeConnection(monitor.connectOptions, socket); + // The start time is after socket creation but before the handshake + start = now(); + try { + await performInitialHandshake(connection, monitor.connectOptions); + const duration = calculateDurationInMs(start); + if (isInCloseState(monitor)) { + connection.destroy(); + return null; + } + + monitor.connection = connection; + monitor.addRttSample(duration); + + monitor.emitAndLogHeartbeat( + Server.SERVER_HEARTBEAT_SUCCEEDED, + monitor[kServer].topology.s.id, + connection.hello?.connectionId, + new ServerHeartbeatSucceededEvent( + monitor.address, + duration, + connection.hello, + useStreamingProtocol(monitor, connection.hello?.topologyVersion) + ) + ); + + return connection.hello; + } catch (error) { + connection.destroy(); + monitor.connection = null; + awaited = false; + onHeartbeatFailed(error); + return null; + } + } +} + function monitorServer(monitor: Monitor) { return (callback: Callback) => { if (monitor.s.state === STATE_MONITORING) { @@ -491,12 +666,15 @@ export class RTTPinger { /** @internal */ monitor: Monitor; closed: boolean; + /** @internal */ + latestRTT?: number; constructor(monitor: Monitor, cancellationToken: CancellationToken, options: RTTPingerOptions) { this.connection = undefined; this[kCancellationToken] = cancellationToken; this.closed = false; this.monitor = monitor; + this.latestRTT = 0; const heartbeatFrequencyMS = options.heartbeatFrequencyMS; this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS); @@ -510,10 +688,6 @@ export class RTTPinger { return this.monitor.minRoundTripTime; } - get latestRTT(): number { - return this.monitor.latestRTT; - } - close(): void { this.closed = true; clearTimeout(this[kMonitorId]); @@ -523,30 +697,37 @@ export class RTTPinger { } } -function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { - const start = now(); - options.cancellationToken = rttPinger[kCancellationToken]; - const heartbeatFrequencyMS = options.heartbeatFrequencyMS; - +function measureAndReschedule( + rttPinger: RTTPinger, + options: RTTPingerOptions, + start?: number, + conn?: Connection +) { + if (start == null) { + start = now(); + } if (rttPinger.closed) { + conn?.destroy(); return; } - function measureAndReschedule(conn?: Connection) { - if (rttPinger.closed) { - conn?.destroy(); - return; - } + if (rttPinger.connection == null) { + rttPinger.connection = conn; + } - if (rttPinger.connection == null) { - rttPinger.connection = conn; - } + rttPinger.latestRTT = calculateDurationInMs(start); + rttPinger[kMonitorId] = setTimeout( + () => measureRoundTripTime(rttPinger, options), + options.heartbeatFrequencyMS + ); +} - rttPinger.monitor.addRttSample(calculateDurationInMs(start)); - rttPinger[kMonitorId] = setTimeout( - () => measureRoundTripTime(rttPinger, options), - heartbeatFrequencyMS - ); +function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { + const start = now(); + options.cancellationToken = rttPinger[kCancellationToken]; + + if (rttPinger.closed) { + return; } const connection = rttPinger.connection; @@ -554,11 +735,10 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { // eslint-disable-next-line github/no-then connect(options).then( connection => { - measureAndReschedule(connection); + measureAndReschedule(rttPinger, options, start, connection); }, () => { rttPinger.connection = undefined; - rttPinger.monitor.clearRttSamples(); } ); return; @@ -568,11 +748,10 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND; // eslint-disable-next-line github/no-then connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then( - () => measureAndReschedule(), + () => measureAndReschedule(rttPinger, options), () => { rttPinger.connection?.destroy(); rttPinger.connection = undefined; - rttPinger.monitor.clearRttSamples(); return; } ); From 63d7b5227c0212246ca568aad9e81e9dba6222ca Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 29 Mar 2024 15:47:18 -0400 Subject: [PATCH 06/23] update tests --- test/unit/sdam/monitor.test.ts | 50 ++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index 2f3acd04f69..6c6a5d6f49b 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -7,7 +7,7 @@ import * as sinon from 'sinon'; import { setTimeout } from 'timers'; import { setTimeout as setTimeoutPromise } from 'timers/promises'; -import { MongoClient, ServerHeartbeatSucceededEvent } from '../../mongodb'; +import { Long, MongoClient, ObjectId, ServerHeartbeatSucceededEvent } from '../../mongodb'; import { isHello, LEGACY_HELLO_COMMAND, @@ -53,10 +53,6 @@ describe('monitoring', function () { const { major } = coerce(process.version); const failingTests = [ - 'should connect and issue an initial server check', - 'should ignore attempts to connect when not already closed', - 'should not initiate another check if one is in progress', - 'should not close the monitor on a failed heartbeat', 'should upgrade to hello from legacy hello when initial handshake contains helloOk' ]; test.skipReason = @@ -307,7 +303,49 @@ describe('monitoring', function () { }); }); - describe(''); + describe('roundTripTime', function () { + const table = [ + { + serverMonitoringMode: 'stream', + topologyVersion: { + processId: new ObjectId(), + counter: new Long(0, 0) + } + }, + { serverMonitoringMode: 'poll', topologyVersion: undefined } + ]; + for (const { serverMonitoringMode, topologyVersion } of table) { + context(`when serverMonitoringMode = ${serverMonitoringMode}`, () => { + context('when more than one heartbeatSucceededEvent has been captured', () => { + const heartbeatDurationMS = 250; + it('correctly returns the mean of the heartbeat durations', async () => { + mockServer.setMessageHandler(request => { + setTimeout( + () => request.reply(Object.assign({ helloOk: true }, mock.HELLO)), + heartbeatDurationMS + ); + }); + const server = new MockServer(mockServer.address()); + if (topologyVersion) server.description.topologyVersion = topologyVersion; + monitor = new Monitor(server as any, { serverMonitoringMode } as any); + monitor.connect(); + + for (let i = 0; i < 5; i++) { + await once(monitor, 'serverHeartbeatSucceeded'); + monitor.requestCheck(); + console.log(i); + } + monitor.close(); + + console.log(monitor.rttSamplesMS.samples); + expect(monitor.roundTripTime).to.be.greaterThanOrEqual(heartbeatDurationMS); + }); + }); + }); + } + }); + + //describe('minRoundTripTime'); }); describe('class MonitorInterval', function () { From 2fcc44fb1c51979003ec636d31e8d3c75d65d5e1 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 29 Mar 2024 16:01:38 -0400 Subject: [PATCH 07/23] skip tests on node 18 and 20 --- test/unit/sdam/monitor.test.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index 6c6a5d6f49b..2a6af3cc199 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -53,7 +53,12 @@ describe('monitoring', function () { const { major } = coerce(process.version); const failingTests = [ - 'should upgrade to hello from legacy hello when initial handshake contains helloOk' + 'should connect and issue an initial server check', + 'should ignore attempts to connect when not already closed', + 'should not initiate another check if one is in progress', + 'should not close the monitor on a failed heartbeat', + 'should upgrade to hello from legacy hello when initial handshake contains helloOk', + 'correctly returns the mean of the heartbeat durations' ]; test.skipReason = (major === 18 || major === 20) && failingTests.includes(test.title) From 40bb3fe0c5647467b95c914c40be7dfcfdfd39dc Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 29 Mar 2024 16:38:36 -0400 Subject: [PATCH 08/23] use monitor roundtrip time for server descriptions --- src/sdam/monitor.ts | 167 --------------------------------- src/sdam/server.ts | 12 +-- src/sdam/server_description.ts | 6 +- 3 files changed, 7 insertions(+), 178 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index b90d10b5163..322ad6b29a8 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -438,173 +438,6 @@ function checkServer(monitor: Monitor, callback: Callback) { ); } -async function _checkServer(monitor: Monitor): Promise { - let start: number; - let awaited: boolean; - const topologyVersion = monitor[kServer].description.topologyVersion; - const isAwaitable = useStreamingProtocol(monitor, topologyVersion); - monitor.emitAndLogHeartbeat( - Server.SERVER_HEARTBEAT_STARTED, - monitor[kServer].topology.s.id, - undefined, - new ServerHeartbeatStartedEvent(monitor.address, isAwaitable) - ); - - function onHeartbeatFailed(err: Error) { - monitor.connection?.destroy(); - monitor.connection = null; - monitor.emitAndLogHeartbeat( - Server.SERVER_HEARTBEAT_FAILED, - monitor[kServer].topology.s.id, - undefined, - new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited) - ); - - const error = !(err instanceof MongoError) - ? new MongoError(MongoError.buildErrorMessage(err), { cause: err }) - : err; - error.addErrorLabel(MongoErrorLabel.ResetPool); - if (error instanceof MongoNetworkTimeoutError) { - error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections); - } - - monitor.emit('resetServer', error); - } - - function onHeartbeatSucceeded(hello: Document) { - if (!('isWritablePrimary' in hello)) { - // Provide hello-style response document. - hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND]; - } - - // NOTE: here we use the latestRTT as this measurment corresponds with the value - // obtained for this successful heartbeat - const duration = - isAwaitable && monitor.rttPinger - ? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start) - : calculateDurationInMs(start); - - monitor.addRttSample(duration); - - monitor.emitAndLogHeartbeat( - Server.SERVER_HEARTBEAT_SUCCEEDED, - monitor[kServer].topology.s.id, - hello.connectionId, - new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable) - ); - - if (isAwaitable) { - // If we are using the streaming protocol then we immediately issue another 'started' - // event, otherwise the "check" is complete and return to the main monitor loop - monitor.emitAndLogHeartbeat( - Server.SERVER_HEARTBEAT_STARTED, - monitor[kServer].topology.s.id, - undefined, - new ServerHeartbeatStartedEvent(monitor.address, true) - ); - // We have not actually sent an outgoing handshake, but when we get the next response we - // want the duration to reflect the time since we last heard from the server - start = now(); - } else { - monitor.rttPinger?.close(); - monitor.rttPinger = undefined; - } - } - - const { connection } = monitor; - if (connection && !connection.closed) { - const { serverApi, helloOk } = connection; - const connectTimeoutMS = monitor.options.connectTimeoutMS; - const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS; - - const cmd = { - [serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1, - ...(isAwaitable && topologyVersion - ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) } - : {}) - }; - - const options = isAwaitable - ? { - socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0, - exhaustAllowed: true - } - : { socketTimeoutMS: connectTimeoutMS }; - - if (isAwaitable && monitor.rttPinger == null) { - monitor.rttPinger = new RTTPinger( - monitor, - monitor[kCancellationToken], - Object.assign( - { heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, - monitor.connectOptions - ) - ); - } - - // Record new start time before sending handshake - start = now(); - - if (isAwaitable) { - awaited = true; - try { - const hello = await connection.command(ns('admin.$cmd'), cmd, options); - onHeartbeatSucceeded(hello); - return hello; - } catch (error) { - onHeartbeatFailed(error); - return null; - } - } - - awaited = false; - try { - const hello = await connection.command(ns('admin.$cmd'), cmd, options); - onHeartbeatSucceeded(hello); - return hello; - } catch (error) { - onHeartbeatFailed(error); - return null; - } - } else { - const socket = await makeSocket(monitor.connectOptions); - const connection = makeConnection(monitor.connectOptions, socket); - // The start time is after socket creation but before the handshake - start = now(); - try { - await performInitialHandshake(connection, monitor.connectOptions); - const duration = calculateDurationInMs(start); - if (isInCloseState(monitor)) { - connection.destroy(); - return null; - } - - monitor.connection = connection; - monitor.addRttSample(duration); - - monitor.emitAndLogHeartbeat( - Server.SERVER_HEARTBEAT_SUCCEEDED, - monitor[kServer].topology.s.id, - connection.hello?.connectionId, - new ServerHeartbeatSucceededEvent( - monitor.address, - duration, - connection.hello, - useStreamingProtocol(monitor, connection.hello?.topologyVersion) - ) - ); - - return connection.hello; - } catch (error) { - connection.destroy(); - monitor.connection = null; - awaited = false; - onHeartbeatFailed(error); - return null; - } - } -} - function monitorServer(monitor: Monitor) { return (callback: Callback) => { if (monitor.s.state === STATE_MONITORING) { diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 89f154eaac9..6dbc31df7d2 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -175,7 +175,8 @@ export class Server extends TypedEventEmitter { this.emit( Server.DESCRIPTION_RECEIVED, new ServerDescription(this.description.hostAddress, event.reply, { - roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration) + roundTripTime: this.monitor?.roundTripTime, + minRoundTripTime: this.monitor?.minRoundTripTime }) ); @@ -467,15 +468,6 @@ export class Server extends TypedEventEmitter { } } -function calculateRoundTripTime(oldRtt: number, duration: number): number { - if (oldRtt === -1) { - return duration; - } - - const alpha = 0.2; - return alpha * duration + (1 - alpha) * oldRtt; -} - function markServerUnknown(server: Server, error?: MongoError) { // Load balancer servers can never be marked unknown. if (server.loadBalanced) { diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index ec9b1939dbf..d381bea2090 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -33,8 +33,10 @@ export interface ServerDescriptionOptions { /** An Error used for better reporting debugging */ error?: MongoError; - /** The round trip time to ping this server (in ms) */ + /** The average round trip time to ping this server (in ms) */ roundTripTime?: number; + /** The minimum round trip time to ping this server over the past 10 samples(in ms) */ + minRoundTripTime?: number; /** If the client is in load balancing mode. */ loadBalanced?: boolean; @@ -58,6 +60,7 @@ export class ServerDescription { minWireVersion: number; maxWireVersion: number; roundTripTime: number; + minRoundTripTime: number; lastUpdateTime: number; lastWriteDate: number; me: string | null; @@ -98,6 +101,7 @@ export class ServerDescription { this.minWireVersion = hello?.minWireVersion ?? 0; this.maxWireVersion = hello?.maxWireVersion ?? 0; this.roundTripTime = options?.roundTripTime ?? -1; + this.minRoundTripTime = options?.minRoundTripTime ?? 0; this.lastUpdateTime = now(); this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0; this.error = options.error ?? null; From db7fad558543b0f7ec73a6e5faf2170856861e52 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 29 Mar 2024 17:18:49 -0400 Subject: [PATCH 09/23] fix lint --- src/index.ts | 1 + src/utils.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/src/index.ts b/src/index.ts index 9cd58ec0ac0..578509ee8f0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -552,6 +552,7 @@ export type { List, MongoDBCollectionNamespace, MongoDBNamespace, + MovingWindow, TimeoutController } from './utils'; export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern'; diff --git a/src/utils.ts b/src/utils.ts index 91eb1201765..8aa7433f736 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1345,6 +1345,7 @@ export async function fileIsAccessible(fileName: string, mode?: number) { } } +/** @internal */ export class MovingWindow { /** Index of the next slot to be overwritten */ private writeIndex: number; From bf1917fa8ec1f7493b9fe601d055b7613150a46f Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 29 Mar 2024 17:37:51 -0400 Subject: [PATCH 10/23] ensure rttpinger always has a most recent value --- src/sdam/monitor.ts | 2 +- test/unit/sdam/monitor.test.ts | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 322ad6b29a8..9ad1c2df929 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -507,7 +507,7 @@ export class RTTPinger { this[kCancellationToken] = cancellationToken; this.closed = false; this.monitor = monitor; - this.latestRTT = 0; + this.latestRTT = monitor.latestRTT; const heartbeatFrequencyMS = options.heartbeatFrequencyMS; this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS); diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index 2a6af3cc199..4502081c436 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -338,12 +338,9 @@ describe('monitoring', function () { for (let i = 0; i < 5; i++) { await once(monitor, 'serverHeartbeatSucceeded'); monitor.requestCheck(); - console.log(i); } - monitor.close(); - - console.log(monitor.rttSamplesMS.samples); expect(monitor.roundTripTime).to.be.greaterThanOrEqual(heartbeatDurationMS); + monitor.close(); }); }); }); From e4a3fc7572cb33e4d8dd3446dce475f8aaa8c919 Mon Sep 17 00:00:00 2001 From: Warren James Date: Mon, 1 Apr 2024 15:29:45 -0400 Subject: [PATCH 11/23] change field name --- src/cmap/connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 7790e486a81..67d24a4e7a5 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -127,7 +127,7 @@ export interface ConnectionOptions /** @internal */ mongoLogger?: MongoLogger | undefined; /** @internal */ - parent?: Monitor; + monitor?: Monitor; } /** @public */ From 3d8bc4185128dae1509a5969c9c5df1c5716a152 Mon Sep 17 00:00:00 2001 From: Warren James Date: Mon, 1 Apr 2024 15:45:02 -0400 Subject: [PATCH 12/23] variable naming --- src/sdam/monitor.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 9ad1c2df929..e047d0fbf9e 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -221,7 +221,7 @@ export class Monitor extends TypedEventEmitter { return this.rttSamplesMS.min(); } - get latestRTT(): number { + get latestRtt(): number { return this.rttSamplesMS.last ?? 0; // FIXME: Check if this is acceptable } @@ -507,7 +507,7 @@ export class RTTPinger { this[kCancellationToken] = cancellationToken; this.closed = false; this.monitor = monitor; - this.latestRTT = monitor.latestRTT; + this.latestRTT = monitor.latestRtt; const heartbeatFrequencyMS = options.heartbeatFrequencyMS; this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS); From 02e4b48e13fbec4fbfd7dc41d541918f75233ebb Mon Sep 17 00:00:00 2001 From: Warren James Date: Mon, 1 Apr 2024 15:51:30 -0400 Subject: [PATCH 13/23] variable naming --- src/sdam/monitor.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index e047d0fbf9e..e31a70bc6d5 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -305,11 +305,11 @@ function checkServer(monitor: Monitor, callback: Callback) { hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND]; } - // NOTE: here we use the latestRTT as this measurment corresponds with the value + // NOTE: here we use the latestRtt as this measurment corresponds with the value // obtained for this successful heartbeat const duration = isAwaitable && monitor.rttPinger - ? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start) + ? monitor.rttPinger.latestRtt ?? calculateDurationInMs(start) : calculateDurationInMs(start); monitor.addRttSample(duration); @@ -500,14 +500,14 @@ export class RTTPinger { monitor: Monitor; closed: boolean; /** @internal */ - latestRTT?: number; + latestRtt?: number; constructor(monitor: Monitor, cancellationToken: CancellationToken, options: RTTPingerOptions) { this.connection = undefined; this[kCancellationToken] = cancellationToken; this.closed = false; this.monitor = monitor; - this.latestRTT = monitor.latestRtt; + this.latestRtt = monitor.latestRtt; const heartbeatFrequencyMS = options.heartbeatFrequencyMS; this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS); @@ -548,7 +548,7 @@ function measureAndReschedule( rttPinger.connection = conn; } - rttPinger.latestRTT = calculateDurationInMs(start); + rttPinger.latestRtt = calculateDurationInMs(start); rttPinger[kMonitorId] = setTimeout( () => measureRoundTripTime(rttPinger, options), options.heartbeatFrequencyMS From a6e55b5fb4990274808683bdfdbd5cd2ce91d9f8 Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 14:00:31 -0400 Subject: [PATCH 14/23] refactor measureAndReschedule as methods --- src/sdam/monitor.ts | 93 +++++++++++++++++++++------------------------ 1 file changed, 44 insertions(+), 49 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index e31a70bc6d5..d8c5dc6944d 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -510,7 +510,7 @@ export class RTTPinger { this.latestRtt = monitor.latestRtt; const heartbeatFrequencyMS = options.heartbeatFrequencyMS; - this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS); + this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(options), heartbeatFrequencyMS); } get roundTripTime(): number { @@ -528,66 +528,61 @@ export class RTTPinger { this.connection?.destroy(); this.connection = undefined; } -} -function measureAndReschedule( - rttPinger: RTTPinger, - options: RTTPingerOptions, - start?: number, - conn?: Connection -) { - if (start == null) { - start = now(); - } - if (rttPinger.closed) { - conn?.destroy(); - return; - } + private measureAndReschedule(options: RTTPingerOptions, start?: number, conn?: Connection) { + if (start == null) { + start = now(); + } + if (this.closed) { + conn?.destroy(); + return; + } + + if (this.connection == null) { + this.connection = conn; + } - if (rttPinger.connection == null) { - rttPinger.connection = conn; + this.latestRtt = calculateDurationInMs(start); + this[kMonitorId] = setTimeout( + () => this.measureRoundTripTime(options), + options.heartbeatFrequencyMS + ); } - rttPinger.latestRtt = calculateDurationInMs(start); - rttPinger[kMonitorId] = setTimeout( - () => measureRoundTripTime(rttPinger, options), - options.heartbeatFrequencyMS - ); -} + private measureRoundTripTime(options: RTTPingerOptions) { + const start = now(); + options.cancellationToken = this[kCancellationToken]; -function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { - const start = now(); - options.cancellationToken = rttPinger[kCancellationToken]; + if (this.closed) { + return; + } - if (rttPinger.closed) { - return; - } + const connection = this.connection; + if (connection == null) { + // eslint-disable-next-line github/no-then + connect(options).then( + connection => { + this.measureAndReschedule(options, start, connection); + }, + () => { + this.connection = undefined; + } + ); + return; + } - const connection = rttPinger.connection; - if (connection == null) { + const commandName = + connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND; // eslint-disable-next-line github/no-then - connect(options).then( - connection => { - measureAndReschedule(rttPinger, options, start, connection); - }, + connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then( + () => this.measureAndReschedule(options), () => { - rttPinger.connection = undefined; + this.connection?.destroy(); + this.connection = undefined; + return; } ); - return; } - - const commandName = - connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND; - // eslint-disable-next-line github/no-then - connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then( - () => measureAndReschedule(rttPinger, options), - () => { - rttPinger.connection?.destroy(); - rttPinger.connection = undefined; - return; - } - ); } /** From 29164db0591276527a9ba29c074039992efdf2ba Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 14:36:37 -0400 Subject: [PATCH 15/23] Update monitor rtt tests --- test/unit/sdam/monitor.test.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index 4502081c436..33f90c471e6 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -322,13 +322,14 @@ describe('monitoring', function () { for (const { serverMonitoringMode, topologyVersion } of table) { context(`when serverMonitoringMode = ${serverMonitoringMode}`, () => { context('when more than one heartbeatSucceededEvent has been captured', () => { - const heartbeatDurationMS = 250; + let heartbeatDurationMS = 100; it('correctly returns the mean of the heartbeat durations', async () => { mockServer.setMessageHandler(request => { setTimeout( () => request.reply(Object.assign({ helloOk: true }, mock.HELLO)), heartbeatDurationMS ); + heartbeatDurationMS += 100; }); const server = new MockServer(mockServer.address()); if (topologyVersion) server.description.topologyVersion = topologyVersion; @@ -339,15 +340,20 @@ describe('monitoring', function () { await once(monitor, 'serverHeartbeatSucceeded'); monitor.requestCheck(); } - expect(monitor.roundTripTime).to.be.greaterThanOrEqual(heartbeatDurationMS); + + const avgRtt = monitor.roundTripTime; + const expectedRtt = 300; // (100 + 200 + 300 + 400 + 500)/5 = 300 + // avgRtt will strictly be greater than expectedRtt since setTimeout sets a minimum + // delay from the time of scheduling to the time of callback execution + expect(avgRtt - expectedRtt).greaterThanOrEqual(0); + expect(avgRtt - expectedRtt).lessThan(50); + monitor.close(); }); }); }); } }); - - //describe('minRoundTripTime'); }); describe('class MonitorInterval', function () { From 2b113db2683ff8bf6ea5afe8970b14d51d4a1698 Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 14:40:38 -0400 Subject: [PATCH 16/23] rename utility class --- src/index.ts | 2 +- src/sdam/monitor.ts | 52 ++++++++++++--------------- src/utils.ts | 78 ++++++++++++++++++++++++++++++----------- test/unit/utils.test.ts | 32 ++++++++--------- 4 files changed, 96 insertions(+), 68 deletions(-) diff --git a/src/index.ts b/src/index.ts index 578509ee8f0..c69184d7fb7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -552,7 +552,7 @@ export type { List, MongoDBCollectionNamespace, MongoDBNamespace, - MovingWindow, + RTTSampler as MovingWindow, TimeoutController } from './utils'; export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern'; diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index d8c5dc6944d..0d992c3ca9f 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -13,9 +13,9 @@ import { type Callback, type EventEmitterWithState, makeStateMachine, - MovingWindow, now, - ns + ns, + RTTSampler } from '../utils'; import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common'; import { @@ -106,7 +106,7 @@ export class Monitor extends TypedEventEmitter { /** @internal */ override component = MongoLoggableComponent.TOPOLOGY; /** @internal */ - rttSamplesMS: MovingWindow; + private rttSampler: RTTSampler; constructor(server: Server, options: MonitorOptions) { super(); @@ -128,7 +128,7 @@ export class Monitor extends TypedEventEmitter { }); this.isRunningInFaasEnv = getFAASEnv() != null; this.mongoLogger = this[kServer].topology.client?.mongoLogger; - this.rttSamplesMS = new MovingWindow(10); + this.rttSampler = new RTTSampler(10); const cancellationToken = this[kCancellationToken]; // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration @@ -214,23 +214,23 @@ export class Monitor extends TypedEventEmitter { } get roundTripTime(): number { - return this.rttSamplesMS.average(); + return this.rttSampler.average(); } get minRoundTripTime(): number { - return this.rttSamplesMS.min(); + return this.rttSampler.min(); } get latestRtt(): number { - return this.rttSamplesMS.last ?? 0; // FIXME: Check if this is acceptable + return this.rttSampler.last ?? 0; // FIXME: Check if this is acceptable } addRttSample(rtt: number) { - this.rttSamplesMS.addSample(rtt); + this.rttSampler.addSample(rtt); } clearRttSamples() { - this.rttSamplesMS.clear(); + this.rttSampler.clear(); } } @@ -305,7 +305,7 @@ function checkServer(monitor: Monitor, callback: Callback) { hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND]; } - // NOTE: here we use the latestRtt as this measurment corresponds with the value + // NOTE: here we use the latestRtt as this measurement corresponds with the value // obtained for this successful heartbeat const duration = isAwaitable && monitor.rttPinger @@ -362,14 +362,7 @@ function checkServer(monitor: Monitor, callback: Callback) { : { socketTimeoutMS: connectTimeoutMS }; if (isAwaitable && monitor.rttPinger == null) { - monitor.rttPinger = new RTTPinger( - monitor, - monitor[kCancellationToken], - Object.assign( - { heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, - monitor.connectOptions - ) - ); + monitor.rttPinger = new RTTPinger(monitor); } // Record new start time before sending handshake @@ -502,15 +495,15 @@ export class RTTPinger { /** @internal */ latestRtt?: number; - constructor(monitor: Monitor, cancellationToken: CancellationToken, options: RTTPingerOptions) { + constructor(monitor: Monitor) { this.connection = undefined; - this[kCancellationToken] = cancellationToken; + this[kCancellationToken] = monitor[kCancellationToken]; this.closed = false; this.monitor = monitor; this.latestRtt = monitor.latestRtt; - const heartbeatFrequencyMS = options.heartbeatFrequencyMS; - this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(options), heartbeatFrequencyMS); + const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS; + this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(), heartbeatFrequencyMS); } get roundTripTime(): number { @@ -529,7 +522,7 @@ export class RTTPinger { this.connection = undefined; } - private measureAndReschedule(options: RTTPingerOptions, start?: number, conn?: Connection) { + private measureAndReschedule(start?: number, conn?: Connection) { if (start == null) { start = now(); } @@ -544,14 +537,13 @@ export class RTTPinger { this.latestRtt = calculateDurationInMs(start); this[kMonitorId] = setTimeout( - () => this.measureRoundTripTime(options), - options.heartbeatFrequencyMS + () => this.measureRoundTripTime(), + this.monitor.options.heartbeatFrequencyMS ); } - private measureRoundTripTime(options: RTTPingerOptions) { + private measureRoundTripTime() { const start = now(); - options.cancellationToken = this[kCancellationToken]; if (this.closed) { return; @@ -560,9 +552,9 @@ export class RTTPinger { const connection = this.connection; if (connection == null) { // eslint-disable-next-line github/no-then - connect(options).then( + connect(this.monitor.connectOptions).then( connection => { - this.measureAndReschedule(options, start, connection); + this.measureAndReschedule(start, connection); }, () => { this.connection = undefined; @@ -575,7 +567,7 @@ export class RTTPinger { connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND; // eslint-disable-next-line github/no-then connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then( - () => this.measureAndReschedule(options), + () => this.measureAndReschedule(), () => { this.connection?.destroy(); this.connection = undefined; diff --git a/src/utils.ts b/src/utils.ts index 8aa7433f736..688b1dbf111 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1345,55 +1345,91 @@ export async function fileIsAccessible(fileName: string, mode?: number) { } } -/** @internal */ -export class MovingWindow { +/** @internal + * This class implements the RTT sampling logic specified for [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations) + * + * This is implemented as a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) keeping + * the most recent `windowSize` samples + * */ +export class RTTSampler { /** Index of the next slot to be overwritten */ private writeIndex: number; - length: number; - samples: Float64Array; + private _length: number; + private _rttSamples: Float64Array; constructor(windowSize = 10) { - this.samples = new Float64Array(windowSize); - this.length = 0; + this._rttSamples = new Float64Array(windowSize); + this._length = 0; this.writeIndex = 0; } + /** + * Adds an rtt sample to the end of the circular buffer + * When `windowSize` samples have been collected, `addSample` overwrites the least recently added + * sample + */ addSample(sample: number) { - this.samples[this.writeIndex++] = sample; - if (this.length < this.samples.length) { - this.length++; + this._rttSamples[this.writeIndex++] = sample; + if (this._length < this._rttSamples.length) { + this._length++; } - this.writeIndex %= this.samples.length; + this.writeIndex %= this._rttSamples.length; } + /** + * When \< 2 samples have been collected, returns 0 + * Otherwise computes the minimum value samples contained in the buffer + */ min(): number { - if (this.length < 2) return 0; - let min = this.samples[0]; - for (let i = 1; i < this.length; i++) { - if (this.samples[i] < min) min = this.samples[i]; + if (this._length < 2) return 0; + let min = this._rttSamples[0]; + for (let i = 1; i < this._length; i++) { + if (this._rttSamples[i] < min) min = this._rttSamples[i]; } return min; } + /** + * Returns mean of samples contained in the buffer + */ average(): number { - if (this.length === 0) return 0; + if (this._length === 0) return 0; let sum = 0; - for (let i = 0; i < this.length; i++) { - sum += this.samples[i]; + for (let i = 0; i < this._length; i++) { + sum += this._rttSamples[i]; } - return sum / this.length; + return sum / this._length; } + /** + * Returns most recently inserted element in the buffer + * Returns null if the buffer is empty + * */ get last(): number | null { - if (this.length === 0) return null; - return this.samples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1]; + if (this._length === 0) return null; + return this._rttSamples[this.writeIndex === 0 ? this._length - 1 : this.writeIndex - 1]; + } + + /** @remarks Added for testing */ + get length(): number { + return this._length; } + /** @remarks Added for testing */ + get samples(): Float64Array { + return this._rttSamples; + } + + /** + * Clear the buffer + * NOTE: this does not overwrite the data held in the internal array, just the pointers into + * this array + */ clear() { - this.length = 0; + this._length = 0; this.writeIndex = 0; } } diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 52278c0e6a0..c7801c33649 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -14,8 +14,8 @@ import { MongoDBCollectionNamespace, MongoDBNamespace, MongoRuntimeError, - MovingWindow, ObjectId, + RTTSampler, shuffle, TimeoutController } from '../mongodb'; @@ -1047,10 +1047,10 @@ describe('driver utils', function () { }); }); - describe('class MovingWindow', () => { + describe('class RTTSampler', () => { describe('constructor', () => { it('Constructs a Float64 array of length windowSize', () => { - const window = new MovingWindow(10); + const window = new RTTSampler(10); expect(window.samples).to.have.length(10); }); }); @@ -1058,7 +1058,7 @@ describe('driver utils', function () { describe('addSample', () => { context('when length < windowSize', () => { it('increments the length', () => { - const window = new MovingWindow(10); + const window = new RTTSampler(10); expect(window.length).to.equal(0); window.addSample(1); @@ -1067,11 +1067,11 @@ describe('driver utils', function () { }); }); context('when length === windowSize', () => { - let window: MovingWindow; + let window: RTTSampler; const size = 10; beforeEach(() => { - window = new MovingWindow(size); + window = new RTTSampler(size); for (let i = 1; i <= size; i++) { window.addSample(i); } @@ -1099,7 +1099,7 @@ describe('driver utils', function () { describe('min()', () => { context('when length < 2', () => { it('returns 0', () => { - const window = new MovingWindow(10); + const window = new RTTSampler(10); // length 0 expect(window.min()).to.equal(0); @@ -1110,9 +1110,9 @@ describe('driver utils', function () { }); context('when 2 <= length < windowSize', () => { - let window: MovingWindow; + let window: RTTSampler; beforeEach(() => { - window = new MovingWindow(10); + window = new RTTSampler(10); for (let i = 1; i <= 3; i++) { window.addSample(i); } @@ -1124,11 +1124,11 @@ describe('driver utils', function () { }); context('when length == windowSize', () => { - let window: MovingWindow; + let window: RTTSampler; const size = 10; beforeEach(() => { - window = new MovingWindow(size); + window = new RTTSampler(size); for (let i = 1; i <= size * 2; i++) { window.addSample(i); } @@ -1142,7 +1142,7 @@ describe('driver utils', function () { describe('average()', () => { it('correctly computes the mean', () => { - const window = new MovingWindow(10); + const window = new RTTSampler(10); let sum = 0; for (let i = 1; i <= 10; i++) { @@ -1157,14 +1157,14 @@ describe('driver utils', function () { describe('last', () => { context('when length == 0', () => { it('returns null', () => { - const window = new MovingWindow(10); + const window = new RTTSampler(10); expect(window.last).to.be.null; }); }); context('when length > 0', () => { it('returns the most recently inserted element', () => { - const window = new MovingWindow(10); + const window = new RTTSampler(10); for (let i = 0; i < 11; i++) { window.addSample(i); } @@ -1174,10 +1174,10 @@ describe('driver utils', function () { }); describe('clear', () => { - let window: MovingWindow; + let window: RTTSampler; beforeEach(() => { - window = new MovingWindow(10); + window = new RTTSampler(10); for (let i = 0; i < 20; i++) { window.addSample(i); } From 8107480c12ffa461835a643735d0ace8d2373d9a Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 14:40:45 -0400 Subject: [PATCH 17/23] add docs --- src/sdam/server_description.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index d381bea2090..5068931b6a2 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -60,6 +60,7 @@ export class ServerDescription { minWireVersion: number; maxWireVersion: number; roundTripTime: number; + /** The minimum measurement of the last 10 measurements of roundTripTime that have been collected */ minRoundTripTime: number; lastUpdateTime: number; lastWriteDate: number; From e69d3f668b87b9514d11452a76e2b56335d1e3bf Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 14:44:17 -0400 Subject: [PATCH 18/23] misc and lint fixes --- src/cmap/connection.ts | 3 --- src/index.ts | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 67d24a4e7a5..1a4db3401f9 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -28,7 +28,6 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { ServerType } from '../sdam/common'; -import { type Monitor } from '../sdam/monitor'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; import { BufferPool, @@ -126,8 +125,6 @@ export interface ConnectionOptions extendedMetadata: Promise; /** @internal */ mongoLogger?: MongoLogger | undefined; - /** @internal */ - monitor?: Monitor; } /** @public */ diff --git a/src/index.ts b/src/index.ts index c69184d7fb7..d6b5f9798d1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -552,7 +552,7 @@ export type { List, MongoDBCollectionNamespace, MongoDBNamespace, - RTTSampler as MovingWindow, + RTTSampler, TimeoutController } from './utils'; export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern'; From 19875d7170b450da0ef0fd60fd73ebb59f6342d7 Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 14:46:14 -0400 Subject: [PATCH 19/23] variable renaming --- test/unit/utils.test.ts | 82 ++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index c7801c33649..7a3eea6b5c1 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1050,48 +1050,48 @@ describe('driver utils', function () { describe('class RTTSampler', () => { describe('constructor', () => { it('Constructs a Float64 array of length windowSize', () => { - const window = new RTTSampler(10); - expect(window.samples).to.have.length(10); + const sampler = new RTTSampler(10); + expect(sampler.samples).to.have.length(10); }); }); describe('addSample', () => { context('when length < windowSize', () => { it('increments the length', () => { - const window = new RTTSampler(10); - expect(window.length).to.equal(0); + const sampler = new RTTSampler(10); + expect(sampler.length).to.equal(0); - window.addSample(1); + sampler.addSample(1); - expect(window.length).to.equal(1); + expect(sampler.length).to.equal(1); }); }); context('when length === windowSize', () => { - let window: RTTSampler; + let sampler: RTTSampler; const size = 10; beforeEach(() => { - window = new RTTSampler(size); + sampler = new RTTSampler(size); for (let i = 1; i <= size; i++) { - window.addSample(i); + sampler.addSample(i); } }); it('does not increment the length', () => { - window.addSample(size + 1); - expect(window.length).to.equal(size); + sampler.addSample(size + 1); + expect(sampler.length).to.equal(size); }); it('overwrites the oldest element', () => { - window.addSample(size + 1); - for (const el of window.samples) { + sampler.addSample(size + 1); + for (const el of sampler.samples) { if (el === 1) expect.fail('Did not overwrite oldest element'); } }); it('appends the new element to the end of the window', () => { - window.addSample(size + 1); - expect(window.last).to.equal(size + 1); + sampler.addSample(size + 1); + expect(sampler.last).to.equal(size + 1); }); }); }); @@ -1099,94 +1099,94 @@ describe('driver utils', function () { describe('min()', () => { context('when length < 2', () => { it('returns 0', () => { - const window = new RTTSampler(10); + const sampler = new RTTSampler(10); // length 0 - expect(window.min()).to.equal(0); + expect(sampler.min()).to.equal(0); - window.addSample(1); + sampler.addSample(1); // length 1 - expect(window.min()).to.equal(0); + expect(sampler.min()).to.equal(0); }); }); context('when 2 <= length < windowSize', () => { - let window: RTTSampler; + let sampler: RTTSampler; beforeEach(() => { - window = new RTTSampler(10); + sampler = new RTTSampler(10); for (let i = 1; i <= 3; i++) { - window.addSample(i); + sampler.addSample(i); } }); it('correctly computes the minimum', () => { - expect(window.min()).to.equal(1); + expect(sampler.min()).to.equal(1); }); }); context('when length == windowSize', () => { - let window: RTTSampler; + let sampler: RTTSampler; const size = 10; beforeEach(() => { - window = new RTTSampler(size); + sampler = new RTTSampler(size); for (let i = 1; i <= size * 2; i++) { - window.addSample(i); + sampler.addSample(i); } }); it('correctly computes the minimum', () => { - expect(window.min()).to.equal(size + 1); + expect(sampler.min()).to.equal(size + 1); }); }); }); describe('average()', () => { it('correctly computes the mean', () => { - const window = new RTTSampler(10); + const sampler = new RTTSampler(10); let sum = 0; for (let i = 1; i <= 10; i++) { sum += i; - window.addSample(i); + sampler.addSample(i); } - expect(window.average()).to.equal(sum / 10); + expect(sampler.average()).to.equal(sum / 10); }); }); describe('last', () => { context('when length == 0', () => { it('returns null', () => { - const window = new RTTSampler(10); - expect(window.last).to.be.null; + const sampler = new RTTSampler(10); + expect(sampler.last).to.be.null; }); }); context('when length > 0', () => { it('returns the most recently inserted element', () => { - const window = new RTTSampler(10); + const sampler = new RTTSampler(10); for (let i = 0; i < 11; i++) { - window.addSample(i); + sampler.addSample(i); } - expect(window.last).to.equal(10); + expect(sampler.last).to.equal(10); }); }); }); describe('clear', () => { - let window: RTTSampler; + let sampler: RTTSampler; beforeEach(() => { - window = new RTTSampler(10); + sampler = new RTTSampler(10); for (let i = 0; i < 20; i++) { - window.addSample(i); + sampler.addSample(i); } - expect(window.length).to.equal(10); + expect(sampler.length).to.equal(10); }); it('sets length to 0', () => { - window.clear(); - expect(window.length).to.equal(0); + sampler.clear(); + expect(sampler.length).to.equal(0); }); }); }); From 2a4f1a22ccbaa19b327c87d9b6cc86c3f536b985 Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 14:46:53 -0400 Subject: [PATCH 20/23] remove parent field from connect options --- src/sdam/monitor.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 0d992c3ca9f..b4ad0098efc 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -143,8 +143,7 @@ export class Monitor extends TypedEventEmitter { useBigInt64: false, promoteLongs: true, promoteValues: true, - promoteBuffers: true, - parent: this + promoteBuffers: true }; // ensure no authentication is used for monitoring From 9d1f67b7d0a659f694a8a319846ca0dd64f17e3e Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 16:27:57 -0400 Subject: [PATCH 21/23] remove getters --- src/utils.ts | 10 ---------- test/unit/utils.test.ts | 16 +++++++++------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 688b1dbf111..acbd1ef25cd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1413,16 +1413,6 @@ export class RTTSampler { return this._rttSamples[this.writeIndex === 0 ? this._length - 1 : this.writeIndex - 1]; } - /** @remarks Added for testing */ - get length(): number { - return this._length; - } - - /** @remarks Added for testing */ - get samples(): Float64Array { - return this._rttSamples; - } - /** * Clear the buffer * NOTE: this does not overwrite the data held in the internal array, just the pointers into diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 7a3eea6b5c1..9f7959ec780 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1051,7 +1051,8 @@ describe('driver utils', function () { describe('constructor', () => { it('Constructs a Float64 array of length windowSize', () => { const sampler = new RTTSampler(10); - expect(sampler.samples).to.have.length(10); + // @ts-expect-error Accessing internal state + expect(sampler._rttSamples).to.have.length(10); }); }); @@ -1059,11 +1060,11 @@ describe('driver utils', function () { context('when length < windowSize', () => { it('increments the length', () => { const sampler = new RTTSampler(10); - expect(sampler.length).to.equal(0); + expect(sampler).to.have.property('_length', 0); sampler.addSample(1); - expect(sampler.length).to.equal(1); + expect(sampler).to.have.property('_length', 1); }); }); context('when length === windowSize', () => { @@ -1079,12 +1080,13 @@ describe('driver utils', function () { it('does not increment the length', () => { sampler.addSample(size + 1); - expect(sampler.length).to.equal(size); + expect(sampler).to.have.property('_length', size); }); it('overwrites the oldest element', () => { sampler.addSample(size + 1); - for (const el of sampler.samples) { + // @ts-expect-error Accessing internal state + for (const el of sampler._rttSamples) { if (el === 1) expect.fail('Did not overwrite oldest element'); } }); @@ -1181,12 +1183,12 @@ describe('driver utils', function () { for (let i = 0; i < 20; i++) { sampler.addSample(i); } - expect(sampler.length).to.equal(10); + expect(sampler).to.have.property('_length', 10); }); it('sets length to 0', () => { sampler.clear(); - expect(sampler.length).to.equal(0); + expect(sampler).to.have.property('_length', 0); }); }); }); From 1b2393b03a20707f2762bb8600ea0c786b50bb3c Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 2 Apr 2024 16:38:07 -0400 Subject: [PATCH 22/23] review changes --- src/utils.ts | 38 +++++++++++++++++----------------- test/unit/sdam/monitor.test.ts | 7 +++---- test/unit/utils.test.ts | 14 ++++++------- 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index acbd1ef25cd..39a54085222 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1354,12 +1354,12 @@ export async function fileIsAccessible(fileName: string, mode?: number) { export class RTTSampler { /** Index of the next slot to be overwritten */ private writeIndex: number; - private _length: number; - private _rttSamples: Float64Array; + private length: number; + private rttSamples: Float64Array; constructor(windowSize = 10) { - this._rttSamples = new Float64Array(windowSize); - this._length = 0; + this.rttSamples = new Float64Array(windowSize); + this.length = 0; this.writeIndex = 0; } @@ -1369,12 +1369,12 @@ export class RTTSampler { * sample */ addSample(sample: number) { - this._rttSamples[this.writeIndex++] = sample; - if (this._length < this._rttSamples.length) { - this._length++; + this.rttSamples[this.writeIndex++] = sample; + if (this.length < this.rttSamples.length) { + this.length++; } - this.writeIndex %= this._rttSamples.length; + this.writeIndex %= this.rttSamples.length; } /** @@ -1382,10 +1382,10 @@ export class RTTSampler { * Otherwise computes the minimum value samples contained in the buffer */ min(): number { - if (this._length < 2) return 0; - let min = this._rttSamples[0]; - for (let i = 1; i < this._length; i++) { - if (this._rttSamples[i] < min) min = this._rttSamples[i]; + if (this.length < 2) return 0; + let min = this.rttSamples[0]; + for (let i = 1; i < this.length; i++) { + if (this.rttSamples[i] < min) min = this.rttSamples[i]; } return min; @@ -1395,13 +1395,13 @@ export class RTTSampler { * Returns mean of samples contained in the buffer */ average(): number { - if (this._length === 0) return 0; + if (this.length === 0) return 0; let sum = 0; - for (let i = 0; i < this._length; i++) { - sum += this._rttSamples[i]; + for (let i = 0; i < this.length; i++) { + sum += this.rttSamples[i]; } - return sum / this._length; + return sum / this.length; } /** @@ -1409,8 +1409,8 @@ export class RTTSampler { * Returns null if the buffer is empty * */ get last(): number | null { - if (this._length === 0) return null; - return this._rttSamples[this.writeIndex === 0 ? this._length - 1 : this.writeIndex - 1]; + if (this.length === 0) return null; + return this.rttSamples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1]; } /** @@ -1419,7 +1419,7 @@ export class RTTSampler { * this array */ clear() { - this._length = 0; + this.length = 0; this.writeIndex = 0; } } diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index 33f90c471e6..228632e0e00 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -342,11 +342,10 @@ describe('monitoring', function () { } const avgRtt = monitor.roundTripTime; - const expectedRtt = 300; // (100 + 200 + 300 + 400 + 500)/5 = 300 - // avgRtt will strictly be greater than expectedRtt since setTimeout sets a minimum + // expected avgRtt = (100 + 200 + 300 + 400 + 500)/5 = 300ms + // avgRtt will strictly be greater than 300ms since setTimeout sets a minimum // delay from the time of scheduling to the time of callback execution - expect(avgRtt - expectedRtt).greaterThanOrEqual(0); - expect(avgRtt - expectedRtt).lessThan(50); + expect(avgRtt).to.be.within(300, 350); monitor.close(); }); diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 9f7959ec780..d6a3c27b2c0 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1052,7 +1052,7 @@ describe('driver utils', function () { it('Constructs a Float64 array of length windowSize', () => { const sampler = new RTTSampler(10); // @ts-expect-error Accessing internal state - expect(sampler._rttSamples).to.have.length(10); + expect(sampler.rttSamples).to.have.length(10); }); }); @@ -1060,11 +1060,11 @@ describe('driver utils', function () { context('when length < windowSize', () => { it('increments the length', () => { const sampler = new RTTSampler(10); - expect(sampler).to.have.property('_length', 0); + expect(sampler).to.have.property('length', 0); sampler.addSample(1); - expect(sampler).to.have.property('_length', 1); + expect(sampler).to.have.property('length', 1); }); }); context('when length === windowSize', () => { @@ -1080,13 +1080,13 @@ describe('driver utils', function () { it('does not increment the length', () => { sampler.addSample(size + 1); - expect(sampler).to.have.property('_length', size); + expect(sampler).to.have.property('length', size); }); it('overwrites the oldest element', () => { sampler.addSample(size + 1); // @ts-expect-error Accessing internal state - for (const el of sampler._rttSamples) { + for (const el of sampler.rttSamples) { if (el === 1) expect.fail('Did not overwrite oldest element'); } }); @@ -1183,12 +1183,12 @@ describe('driver utils', function () { for (let i = 0; i < 20; i++) { sampler.addSample(i); } - expect(sampler).to.have.property('_length', 10); + expect(sampler).to.have.property('length', 10); }); it('sets length to 0', () => { sampler.clear(); - expect(sampler).to.have.property('_length', 0); + expect(sampler).to.have.property('length', 0); }); }); }); From a06eec1b1076534ead88be62643a6fbfaf04670e Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 3 Apr 2024 15:19:33 -0400 Subject: [PATCH 23/23] move RTTSampler class to monitor file --- src/index.ts | 2 +- src/sdam/monitor.ts | 82 +++++++++++++++++- src/utils.ts | 79 ----------------- test/unit/sdam/monitor.test.ts | 154 ++++++++++++++++++++++++++++++++- test/unit/utils.test.ts | 147 ------------------------------- 5 files changed, 234 insertions(+), 230 deletions(-) diff --git a/src/index.ts b/src/index.ts index d6b5f9798d1..60ebd96067a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -510,6 +510,7 @@ export type { MonitorPrivate, RTTPinger, RTTPingerOptions, + RTTSampler, ServerMonitoringMode } from './sdam/monitor'; export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server'; @@ -552,7 +553,6 @@ export type { List, MongoDBCollectionNamespace, MongoDBNamespace, - RTTSampler, TimeoutController } from './utils'; export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern'; diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index b4ad0098efc..769c41d16d5 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -14,8 +14,7 @@ import { type EventEmitterWithState, makeStateMachine, now, - ns, - RTTSampler + ns } from '../utils'; import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common'; import { @@ -701,3 +700,82 @@ export class MonitorInterval { }); }; } + +/** @internal + * This class implements the RTT sampling logic specified for [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations) + * + * This is implemented as a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) keeping + * the most recent `windowSize` samples + * */ +export class RTTSampler { + /** Index of the next slot to be overwritten */ + private writeIndex: number; + private length: number; + private rttSamples: Float64Array; + + constructor(windowSize = 10) { + this.rttSamples = new Float64Array(windowSize); + this.length = 0; + this.writeIndex = 0; + } + + /** + * Adds an rtt sample to the end of the circular buffer + * When `windowSize` samples have been collected, `addSample` overwrites the least recently added + * sample + */ + addSample(sample: number) { + this.rttSamples[this.writeIndex++] = sample; + if (this.length < this.rttSamples.length) { + this.length++; + } + + this.writeIndex %= this.rttSamples.length; + } + + /** + * When \< 2 samples have been collected, returns 0 + * Otherwise computes the minimum value samples contained in the buffer + */ + min(): number { + if (this.length < 2) return 0; + let min = this.rttSamples[0]; + for (let i = 1; i < this.length; i++) { + if (this.rttSamples[i] < min) min = this.rttSamples[i]; + } + + return min; + } + + /** + * Returns mean of samples contained in the buffer + */ + average(): number { + if (this.length === 0) return 0; + let sum = 0; + for (let i = 0; i < this.length; i++) { + sum += this.rttSamples[i]; + } + + return sum / this.length; + } + + /** + * Returns most recently inserted element in the buffer + * Returns null if the buffer is empty + * */ + get last(): number | null { + if (this.length === 0) return null; + return this.rttSamples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1]; + } + + /** + * Clear the buffer + * NOTE: this does not overwrite the data held in the internal array, just the pointers into + * this array + */ + clear() { + this.length = 0; + this.writeIndex = 0; + } +} diff --git a/src/utils.ts b/src/utils.ts index 39a54085222..b25b3ebb0fc 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1344,82 +1344,3 @@ export async function fileIsAccessible(fileName: string, mode?: number) { return false; } } - -/** @internal - * This class implements the RTT sampling logic specified for [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations) - * - * This is implemented as a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) keeping - * the most recent `windowSize` samples - * */ -export class RTTSampler { - /** Index of the next slot to be overwritten */ - private writeIndex: number; - private length: number; - private rttSamples: Float64Array; - - constructor(windowSize = 10) { - this.rttSamples = new Float64Array(windowSize); - this.length = 0; - this.writeIndex = 0; - } - - /** - * Adds an rtt sample to the end of the circular buffer - * When `windowSize` samples have been collected, `addSample` overwrites the least recently added - * sample - */ - addSample(sample: number) { - this.rttSamples[this.writeIndex++] = sample; - if (this.length < this.rttSamples.length) { - this.length++; - } - - this.writeIndex %= this.rttSamples.length; - } - - /** - * When \< 2 samples have been collected, returns 0 - * Otherwise computes the minimum value samples contained in the buffer - */ - min(): number { - if (this.length < 2) return 0; - let min = this.rttSamples[0]; - for (let i = 1; i < this.length; i++) { - if (this.rttSamples[i] < min) min = this.rttSamples[i]; - } - - return min; - } - - /** - * Returns mean of samples contained in the buffer - */ - average(): number { - if (this.length === 0) return 0; - let sum = 0; - for (let i = 0; i < this.length; i++) { - sum += this.rttSamples[i]; - } - - return sum / this.length; - } - - /** - * Returns most recently inserted element in the buffer - * Returns null if the buffer is empty - * */ - get last(): number | null { - if (this.length === 0) return null; - return this.rttSamples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1]; - } - - /** - * Clear the buffer - * NOTE: this does not overwrite the data held in the internal array, just the pointers into - * this array - */ - clear() { - this.length = 0; - this.writeIndex = 0; - } -} diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index 228632e0e00..408a2b26659 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -7,7 +7,13 @@ import * as sinon from 'sinon'; import { setTimeout } from 'timers'; import { setTimeout as setTimeoutPromise } from 'timers/promises'; -import { Long, MongoClient, ObjectId, ServerHeartbeatSucceededEvent } from '../../mongodb'; +import { + Long, + MongoClient, + ObjectId, + RTTSampler, + ServerHeartbeatSucceededEvent +} from '../../mongodb'; import { isHello, LEGACY_HELLO_COMMAND, @@ -665,4 +671,150 @@ describe('monitoring', function () { expect(serverHeartbeatFailed).to.have.property('duration').that.is.lessThan(20); // way less than 80ms }); }); + + describe('class RTTSampler', () => { + describe('constructor', () => { + it('Constructs a Float64 array of length windowSize', () => { + const sampler = new RTTSampler(10); + // @ts-expect-error Accessing internal state + expect(sampler.rttSamples).to.have.length(10); + }); + }); + + describe('addSample', () => { + context('when length < windowSize', () => { + it('increments the length', () => { + const sampler = new RTTSampler(10); + expect(sampler).to.have.property('length', 0); + + sampler.addSample(1); + + expect(sampler).to.have.property('length', 1); + }); + }); + context('when length === windowSize', () => { + let sampler: RTTSampler; + const size = 10; + + beforeEach(() => { + sampler = new RTTSampler(size); + for (let i = 1; i <= size; i++) { + sampler.addSample(i); + } + }); + + it('does not increment the length', () => { + sampler.addSample(size + 1); + expect(sampler).to.have.property('length', size); + }); + + it('overwrites the oldest element', () => { + sampler.addSample(size + 1); + // @ts-expect-error Accessing internal state + for (const el of sampler.rttSamples) { + if (el === 1) expect.fail('Did not overwrite oldest element'); + } + }); + + it('appends the new element to the end of the window', () => { + sampler.addSample(size + 1); + expect(sampler.last).to.equal(size + 1); + }); + }); + }); + + describe('min()', () => { + context('when length < 2', () => { + it('returns 0', () => { + const sampler = new RTTSampler(10); + // length 0 + expect(sampler.min()).to.equal(0); + + sampler.addSample(1); + // length 1 + expect(sampler.min()).to.equal(0); + }); + }); + + context('when 2 <= length < windowSize', () => { + let sampler: RTTSampler; + beforeEach(() => { + sampler = new RTTSampler(10); + for (let i = 1; i <= 3; i++) { + sampler.addSample(i); + } + }); + + it('correctly computes the minimum', () => { + expect(sampler.min()).to.equal(1); + }); + }); + + context('when length == windowSize', () => { + let sampler: RTTSampler; + const size = 10; + + beforeEach(() => { + sampler = new RTTSampler(size); + for (let i = 1; i <= size * 2; i++) { + sampler.addSample(i); + } + }); + + it('correctly computes the minimum', () => { + expect(sampler.min()).to.equal(size + 1); + }); + }); + }); + + describe('average()', () => { + it('correctly computes the mean', () => { + const sampler = new RTTSampler(10); + let sum = 0; + + for (let i = 1; i <= 10; i++) { + sum += i; + sampler.addSample(i); + } + + expect(sampler.average()).to.equal(sum / 10); + }); + }); + + describe('last', () => { + context('when length == 0', () => { + it('returns null', () => { + const sampler = new RTTSampler(10); + expect(sampler.last).to.be.null; + }); + }); + + context('when length > 0', () => { + it('returns the most recently inserted element', () => { + const sampler = new RTTSampler(10); + for (let i = 0; i < 11; i++) { + sampler.addSample(i); + } + expect(sampler.last).to.equal(10); + }); + }); + }); + + describe('clear', () => { + let sampler: RTTSampler; + + beforeEach(() => { + sampler = new RTTSampler(10); + for (let i = 0; i < 20; i++) { + sampler.addSample(i); + } + expect(sampler).to.have.property('length', 10); + }); + + it('sets length to 0', () => { + sampler.clear(); + expect(sampler).to.have.property('length', 0); + }); + }); + }); }); diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index d6a3c27b2c0..d36ddbeb3be 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -15,7 +15,6 @@ import { MongoDBNamespace, MongoRuntimeError, ObjectId, - RTTSampler, shuffle, TimeoutController } from '../mongodb'; @@ -1046,150 +1045,4 @@ describe('driver utils', function () { }); }); }); - - describe('class RTTSampler', () => { - describe('constructor', () => { - it('Constructs a Float64 array of length windowSize', () => { - const sampler = new RTTSampler(10); - // @ts-expect-error Accessing internal state - expect(sampler.rttSamples).to.have.length(10); - }); - }); - - describe('addSample', () => { - context('when length < windowSize', () => { - it('increments the length', () => { - const sampler = new RTTSampler(10); - expect(sampler).to.have.property('length', 0); - - sampler.addSample(1); - - expect(sampler).to.have.property('length', 1); - }); - }); - context('when length === windowSize', () => { - let sampler: RTTSampler; - const size = 10; - - beforeEach(() => { - sampler = new RTTSampler(size); - for (let i = 1; i <= size; i++) { - sampler.addSample(i); - } - }); - - it('does not increment the length', () => { - sampler.addSample(size + 1); - expect(sampler).to.have.property('length', size); - }); - - it('overwrites the oldest element', () => { - sampler.addSample(size + 1); - // @ts-expect-error Accessing internal state - for (const el of sampler.rttSamples) { - if (el === 1) expect.fail('Did not overwrite oldest element'); - } - }); - - it('appends the new element to the end of the window', () => { - sampler.addSample(size + 1); - expect(sampler.last).to.equal(size + 1); - }); - }); - }); - - describe('min()', () => { - context('when length < 2', () => { - it('returns 0', () => { - const sampler = new RTTSampler(10); - // length 0 - expect(sampler.min()).to.equal(0); - - sampler.addSample(1); - // length 1 - expect(sampler.min()).to.equal(0); - }); - }); - - context('when 2 <= length < windowSize', () => { - let sampler: RTTSampler; - beforeEach(() => { - sampler = new RTTSampler(10); - for (let i = 1; i <= 3; i++) { - sampler.addSample(i); - } - }); - - it('correctly computes the minimum', () => { - expect(sampler.min()).to.equal(1); - }); - }); - - context('when length == windowSize', () => { - let sampler: RTTSampler; - const size = 10; - - beforeEach(() => { - sampler = new RTTSampler(size); - for (let i = 1; i <= size * 2; i++) { - sampler.addSample(i); - } - }); - - it('correctly computes the minimum', () => { - expect(sampler.min()).to.equal(size + 1); - }); - }); - }); - - describe('average()', () => { - it('correctly computes the mean', () => { - const sampler = new RTTSampler(10); - let sum = 0; - - for (let i = 1; i <= 10; i++) { - sum += i; - sampler.addSample(i); - } - - expect(sampler.average()).to.equal(sum / 10); - }); - }); - - describe('last', () => { - context('when length == 0', () => { - it('returns null', () => { - const sampler = new RTTSampler(10); - expect(sampler.last).to.be.null; - }); - }); - - context('when length > 0', () => { - it('returns the most recently inserted element', () => { - const sampler = new RTTSampler(10); - for (let i = 0; i < 11; i++) { - sampler.addSample(i); - } - expect(sampler.last).to.equal(10); - }); - }); - }); - - describe('clear', () => { - let sampler: RTTSampler; - - beforeEach(() => { - sampler = new RTTSampler(10); - for (let i = 0; i < 20; i++) { - sampler.addSample(i); - } - expect(sampler).to.have.property('length', 10); - }); - - it('sets length to 0', () => { - sampler.clear(); - expect(sampler).to.have.property('length', 0); - }); - }); - }); });