Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-5825): add minRoundTripTime to ServerDescription and change roundTripTime to a moving average #4059

Merged
merged 23 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ export type {
MonitorPrivate,
RTTPinger,
RTTPingerOptions,
RTTSampler,
ServerMonitoringMode
} from './sdam/monitor';
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
Expand Down
237 changes: 175 additions & 62 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
import { MongoLoggableComponent } from '../mongo_logger';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Callback, EventEmitterWithState } from '../utils';
import { calculateDurationInMs, makeStateMachine, now, ns } from '../utils';
import {
calculateDurationInMs,
type Callback,
type EventEmitterWithState,
makeStateMachine,
now,
ns
} from '../utils';
import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common';
import {
ServerHeartbeatFailedEvent,
Expand All @@ -25,8 +31,6 @@ const kServer = Symbol('server');
const kMonitorId = Symbol('monitorId');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');

const STATE_IDLE = 'idle';
const STATE_MONITORING = 'monitoring';
Expand Down Expand Up @@ -100,6 +104,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
rttPinger?: RTTPinger;
/** @internal */
override component = MongoLoggableComponent.TOPOLOGY;
/** @internal */
private rttSampler: RTTSampler;

constructor(server: Server, options: MonitorOptions) {
super();
Expand All @@ -121,6 +127,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
});
this.isRunningInFaasEnv = getFAASEnv() != null;
this.mongoLogger = this[kServer].topology.client?.mongoLogger;
this.rttSampler = new RTTSampler(10);

const cancellationToken = this[kCancellationToken];
// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
Expand Down Expand Up @@ -203,6 +210,26 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
this.emit('close');
stateTransition(this, STATE_CLOSED);
}

/**
 * Mean of the round trip time samples currently held by this monitor's RTT sampler.
 * The sampler's `average()` yields 0 when no samples have been recorded.
 */
get roundTripTime(): number {
return this.rttSampler.average();
}

/**
 * Smallest round trip time sample currently held by this monitor's RTT sampler.
 * The sampler's `min()` yields 0 until at least two samples have been recorded.
 */
get minRoundTripTime(): number {
return this.rttSampler.min();
}

/**
 * Most recently recorded round trip time sample.
 * Falls back to 0 when the sampler reports no last sample (i.e. it is empty).
 */
get latestRtt(): number {
return this.rttSampler.last ?? 0; // FIXME: Check if this is acceptable
}

/**
 * Records a new round trip time measurement in this monitor's RTT sampler.
 * @param rtt - the measured round trip time to record
 */
addRttSample(rtt: number) {
this.rttSampler.addSample(rtt);
}

/**
 * Discards every round trip time sample recorded so far by resetting the sampler.
 */
clearRttSamples() {
this.rttSampler.clear();
}
}

function resetMonitorState(monitor: Monitor) {
Expand All @@ -216,6 +243,8 @@ function resetMonitorState(monitor: Monitor) {

monitor.connection?.destroy();
monitor.connection = null;

monitor.clearRttSamples();
}

function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
Expand Down Expand Up @@ -249,7 +278,6 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
function onHeartbeatFailed(err: Error) {
monitor.connection?.destroy();
monitor.connection = null;

monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_FAILED,
monitor[kServer].topology.s.id,
Expand All @@ -275,11 +303,15 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

// NOTE: here we use the latestRtt as this measurement corresponds with the value
// obtained for this successful heartbeat
const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
? monitor.rttPinger.latestRtt ?? calculateDurationInMs(start)
: calculateDurationInMs(start);

monitor.addRttSample(duration);

monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_SUCCEEDED,
monitor[kServer].topology.s.id,
Expand Down Expand Up @@ -328,13 +360,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
: { socketTimeoutMS: connectTimeoutMS };

if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor[kCancellationToken],
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
monitor.connectOptions
)
);
monitor.rttPinger = new RTTPinger(monitor);
}

// Record new start time before sending handshake
Expand Down Expand Up @@ -377,6 +403,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
connection.destroy();
return;
}
const duration = calculateDurationInMs(start);
monitor.addRttSample(duration);

monitor.connection = connection;
monitor.emitAndLogHeartbeat(
Expand All @@ -385,7 +413,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
connection.hello?.connectionId,
new ServerHeartbeatSucceededEvent(
monitor.address,
calculateDurationInMs(start),
duration,
connection.hello,
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
)
Expand Down Expand Up @@ -458,23 +486,30 @@ export class RTTPinger {
/** @internal */
[kCancellationToken]: CancellationToken;
/** @internal */
[kRoundTripTime]: number;
/** @internal */
[kMonitorId]: NodeJS.Timeout;
/** @internal */
monitor: Monitor;
closed: boolean;
/** @internal */
latestRtt?: number;

constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
constructor(monitor: Monitor) {
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this[kCancellationToken] = monitor[kCancellationToken];
this.closed = false;
this.monitor = monitor;
this.latestRtt = monitor.latestRtt;

const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(), heartbeatFrequencyMS);
}

get roundTripTime(): number {
return this[kRoundTripTime];
return this.monitor.roundTripTime;
}

get minRoundTripTime(): number {
return this.monitor.minRoundTripTime;
}

close(): void {
Expand All @@ -484,61 +519,60 @@ export class RTTPinger {
this.connection?.destroy();
this.connection = undefined;
}
}

function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
const start = now();
options.cancellationToken = rttPinger[kCancellationToken];
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;

if (rttPinger.closed) {
return;
}

function measureAndReschedule(conn?: Connection) {
if (rttPinger.closed) {
private measureAndReschedule(start?: number, conn?: Connection) {
if (start == null) {
start = now();
}
if (this.closed) {
conn?.destroy();
return;
}

if (rttPinger.connection == null) {
rttPinger.connection = conn;
if (this.connection == null) {
this.connection = conn;
}

rttPinger[kRoundTripTime] = calculateDurationInMs(start);
rttPinger[kMonitorId] = setTimeout(
() => measureRoundTripTime(rttPinger, options),
heartbeatFrequencyMS
this.latestRtt = calculateDurationInMs(start);
this[kMonitorId] = setTimeout(
() => this.measureRoundTripTime(),
this.monitor.options.heartbeatFrequencyMS
);
}

const connection = rttPinger.connection;
if (connection == null) {
private measureRoundTripTime() {
const start = now();

if (this.closed) {
return;
}

const connection = this.connection;
if (connection == null) {
// eslint-disable-next-line github/no-then
connect(this.monitor.connectOptions).then(
connection => {
this.measureAndReschedule(start, connection);
},
() => {
this.connection = undefined;
}
);
return;
}

const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connect(options).then(
connection => {
measureAndReschedule(connection);
},
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => this.measureAndReschedule(),
() => {
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
this.connection?.destroy();
this.connection = undefined;
return;
}
);
return;
}

const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => {
rttPinger.connection?.destroy();
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
);
}

/**
Expand Down Expand Up @@ -666,3 +700,82 @@ export class MonitorInterval {
});
};
}

/** @internal
 * Sliding window of round trip time (RTT) samples, following the RTT sampling logic specified for
 * [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations)
 *
 * Backed by a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) that retains only
 * the `windowSize` most recently added samples.
 * */
export class RTTSampler {
  /** Position in `rttSamples` that the next call to `addSample` will write to */
  private writeIndex: number;
  /** Number of valid samples currently stored (never exceeds the window size) */
  private length: number;
  /** Fixed-size backing storage for the samples */
  private rttSamples: Float64Array;

  constructor(windowSize = 10) {
    this.rttSamples = new Float64Array(windowSize);
    this.length = 0;
    this.writeIndex = 0;
  }

  /**
   * Stores an rtt sample in the next slot of the circular buffer.
   * Once `windowSize` samples are held, each new sample replaces the oldest one.
   */
  addSample(sample: number) {
    const capacity = this.rttSamples.length;

    this.rttSamples[this.writeIndex] = sample;
    // Advance the write position, wrapping around at the end of the buffer
    this.writeIndex = (this.writeIndex + 1) % capacity;

    if (this.length < capacity) {
      this.length += 1;
    }
  }

  /**
   * Returns 0 while fewer than 2 samples have been collected.
   * Otherwise returns the smallest sample held in the buffer.
   */
  min(): number {
    if (this.length < 2) return 0;

    // Only the first `length` slots hold valid samples
    return this.rttSamples
      .subarray(0, this.length)
      .reduce((smallest, candidate) => (candidate < smallest ? candidate : smallest));
  }

  /**
   * Returns the arithmetic mean of the samples held in the buffer, or 0 when it is empty
   */
  average(): number {
    const count = this.length;
    if (count === 0) return 0;

    let total = 0;
    for (const sample of this.rttSamples.subarray(0, count)) {
      total += sample;
    }

    return total / count;
  }

  /**
   * Returns the most recently added sample
   * Returns null if the buffer is empty
   * */
  get last(): number | null {
    if (this.length === 0) return null;

    // A write index of 0 on a non-empty buffer means the last write landed in the final slot
    const nextSlot = this.writeIndex === 0 ? this.length : this.writeIndex;
    return this.rttSamples[nextSlot - 1];
  }

  /**
   * Empties the buffer
   * NOTE: the underlying array contents are left untouched; only the bookkeeping
   * (sample count and write position) is reset
   */
  clear() {
    this.writeIndex = 0;
    this.length = 0;
  }
}
12 changes: 2 additions & 10 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(this.description.hostAddress, event.reply, {
roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
roundTripTime: this.monitor?.roundTripTime,
minRoundTripTime: this.monitor?.minRoundTripTime
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
})
);

Expand Down Expand Up @@ -467,15 +468,6 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}
}

/**
 * Blends a new heartbeat duration into the previous round trip time estimate using a
 * weighted moving average with a fixed weight of 0.2 on the new measurement.
 *
 * @param oldRtt - previous round trip time estimate; the sentinel `-1` means "no estimate yet"
 * @param duration - newly measured round trip duration
 * @returns `duration` unchanged when there is no previous estimate, otherwise the blended value
 */
function calculateRoundTripTime(oldRtt: number, duration: number): number {
  const newSampleWeight = 0.2;
  const hasNoPreviousEstimate = oldRtt === -1;

  return hasNoPreviousEstimate
    ? duration
    : newSampleWeight * duration + (1 - newSampleWeight) * oldRtt;
}

nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
function markServerUnknown(server: Server, error?: MongoError) {
// Load balancer servers can never be marked unknown.
if (server.loadBalanced) {
Expand Down
Loading
Loading