Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-5825): add minRoundTripTime to ServerDescription and change roundTripTime to a moving average #4059

Merged
merged 23 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong
import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { type Monitor } from '../sdam/monitor';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import {
BufferPool,
Expand Down Expand Up @@ -125,6 +126,8 @@ export interface ConnectionOptions
extendedMetadata: Promise<Document>;
/** @internal */
mongoLogger?: MongoLogger | undefined;
/** @internal */
parent?: Monitor;
}

/** @public */
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ export type {
List,
MongoDBCollectionNamespace,
MongoDBNamespace,
MovingWindow,
TimeoutController
} from './utils';
export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern';
119 changes: 84 additions & 35 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@ import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
import { MongoLoggableComponent } from '../mongo_logger';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Callback, EventEmitterWithState } from '../utils';
import { calculateDurationInMs, makeStateMachine, now, ns } from '../utils';
import {
calculateDurationInMs,
type Callback,
type EventEmitterWithState,
makeStateMachine,
MovingWindow,
now,
ns
} from '../utils';
import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common';
import {
ServerHeartbeatFailedEvent,
Expand All @@ -25,8 +32,6 @@ const kServer = Symbol('server');
const kMonitorId = Symbol('monitorId');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');

const STATE_IDLE = 'idle';
const STATE_MONITORING = 'monitoring';
Expand Down Expand Up @@ -100,6 +105,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
rttPinger?: RTTPinger;
/** @internal */
override component = MongoLoggableComponent.TOPOLOGY;
/** @internal */
rttSamplesMS: MovingWindow;

constructor(server: Server, options: MonitorOptions) {
super();
Expand All @@ -121,6 +128,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
});
this.isRunningInFaasEnv = getFAASEnv() != null;
this.mongoLogger = this[kServer].topology.client?.mongoLogger;
this.rttSamplesMS = new MovingWindow(10);

const cancellationToken = this[kCancellationToken];
// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
Expand All @@ -135,7 +143,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
useBigInt64: false,
promoteLongs: true,
promoteValues: true,
promoteBuffers: true
promoteBuffers: true,
parent: this
};

// ensure no authentication is used for monitoring
Expand Down Expand Up @@ -203,6 +212,26 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
this.emit('close');
stateTransition(this, STATE_CLOSED);
}

get roundTripTime(): number {
return this.rttSamplesMS.average();
}

get minRoundTripTime(): number {
return this.rttSamplesMS.min();
}

get latestRTT(): number {
return this.rttSamplesMS.last ?? 0; // FIXME: Check if this is acceptable
}

addRttSample(rtt: number) {
this.rttSamplesMS.addSample(rtt);
}

clearRttSamples() {
this.rttSamplesMS.clear();
}
}

function resetMonitorState(monitor: Monitor) {
Expand All @@ -216,6 +245,8 @@ function resetMonitorState(monitor: Monitor) {

monitor.connection?.destroy();
monitor.connection = null;

monitor.clearRttSamples();
}

function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
Expand Down Expand Up @@ -249,7 +280,6 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
function onHeartbeatFailed(err: Error) {
monitor.connection?.destroy();
monitor.connection = null;

monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_FAILED,
monitor[kServer].topology.s.id,
Expand All @@ -275,11 +305,15 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

// NOTE: here we use the latestRTT as this measurment corresponds with the value
// obtained for this successful heartbeat
const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start)
: calculateDurationInMs(start);

monitor.addRttSample(duration);

monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_SUCCEEDED,
monitor[kServer].topology.s.id,
Expand Down Expand Up @@ -329,6 +363,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {

if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor,
monitor[kCancellationToken],
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
Expand Down Expand Up @@ -377,6 +412,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
connection.destroy();
return;
}
const duration = calculateDurationInMs(start);
monitor.addRttSample(duration);

monitor.connection = connection;
monitor.emitAndLogHeartbeat(
Expand All @@ -385,7 +422,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
connection.hello?.connectionId,
new ServerHeartbeatSucceededEvent(
monitor.address,
calculateDurationInMs(start),
duration,
connection.hello,
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
)
Expand Down Expand Up @@ -458,23 +495,30 @@ export class RTTPinger {
/** @internal */
[kCancellationToken]: CancellationToken;
/** @internal */
[kRoundTripTime]: number;
/** @internal */
[kMonitorId]: NodeJS.Timeout;
/** @internal */
monitor: Monitor;
closed: boolean;
/** @internal */
latestRTT?: number;

constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
constructor(monitor: Monitor, cancellationToken: CancellationToken, options: RTTPingerOptions) {
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this.closed = false;
this.monitor = monitor;
this.latestRTT = monitor.latestRTT;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
}

get roundTripTime(): number {
return this[kRoundTripTime];
return this.monitor.roundTripTime;
}

get minRoundTripTime(): number {
return this.monitor.minRoundTripTime;
}

close(): void {
Expand All @@ -486,42 +530,48 @@ export class RTTPinger {
}
}

function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
const start = now();
options.cancellationToken = rttPinger[kCancellationToken];
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;

function measureAndReschedule(
rttPinger: RTTPinger,
options: RTTPingerOptions,
start?: number,
conn?: Connection
) {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
if (start == null) {
start = now();
}
if (rttPinger.closed) {
conn?.destroy();
return;
}

function measureAndReschedule(conn?: Connection) {
if (rttPinger.closed) {
conn?.destroy();
return;
}
if (rttPinger.connection == null) {
rttPinger.connection = conn;
}

if (rttPinger.connection == null) {
rttPinger.connection = conn;
}
rttPinger.latestRTT = calculateDurationInMs(start);
rttPinger[kMonitorId] = setTimeout(
() => measureRoundTripTime(rttPinger, options),
options.heartbeatFrequencyMS
);
}

rttPinger[kRoundTripTime] = calculateDurationInMs(start);
rttPinger[kMonitorId] = setTimeout(
() => measureRoundTripTime(rttPinger, options),
heartbeatFrequencyMS
);
function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
const start = now();
options.cancellationToken = rttPinger[kCancellationToken];

if (rttPinger.closed) {
return;
}

const connection = rttPinger.connection;
if (connection == null) {
// eslint-disable-next-line github/no-then
connect(options).then(
connection => {
measureAndReschedule(connection);
measureAndReschedule(rttPinger, options, start, connection);
},
() => {
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
}
);
return;
Expand All @@ -531,11 +581,10 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => measureAndReschedule(rttPinger, options),
() => {
rttPinger.connection?.destroy();
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
);
Expand Down
12 changes: 2 additions & 10 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(this.description.hostAddress, event.reply, {
roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
roundTripTime: this.monitor?.roundTripTime,
minRoundTripTime: this.monitor?.minRoundTripTime
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
})
);

Expand Down Expand Up @@ -467,15 +468,6 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}
}

function calculateRoundTripTime(oldRtt: number, duration: number): number {
if (oldRtt === -1) {
return duration;
}

const alpha = 0.2;
return alpha * duration + (1 - alpha) * oldRtt;
}

nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
function markServerUnknown(server: Server, error?: MongoError) {
// Load balancer servers can never be marked unknown.
if (server.loadBalanced) {
Expand Down
6 changes: 5 additions & 1 deletion src/sdam/server_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ export interface ServerDescriptionOptions {
/** An Error used for better reporting debugging */
error?: MongoError;

/** The round trip time to ping this server (in ms) */
/** The average round trip time to ping this server (in ms) */
roundTripTime?: number;
/** The minimum round trip time to ping this server over the past 10 samples(in ms) */
minRoundTripTime?: number;

/** If the client is in load balancing mode. */
loadBalanced?: boolean;
Expand All @@ -58,6 +60,7 @@ export class ServerDescription {
minWireVersion: number;
maxWireVersion: number;
roundTripTime: number;
minRoundTripTime: number;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
lastUpdateTime: number;
lastWriteDate: number;
me: string | null;
Expand Down Expand Up @@ -98,6 +101,7 @@ export class ServerDescription {
this.minWireVersion = hello?.minWireVersion ?? 0;
this.maxWireVersion = hello?.maxWireVersion ?? 0;
this.roundTripTime = options?.roundTripTime ?? -1;
this.minRoundTripTime = options?.minRoundTripTime ?? 0;
this.lastUpdateTime = now();
this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0;
this.error = options.error ?? null;
Expand Down
5 changes: 2 additions & 3 deletions src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,8 @@ function latencyWindowReducer(
servers: ServerDescription[]
): ServerDescription[] {
const low = servers.reduce(
(min: number, server: ServerDescription) =>
min === -1 ? server.roundTripTime : Math.min(server.roundTripTime, min),
-1
(min: number, server: ServerDescription) => Math.min(server.roundTripTime, min),
Infinity
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
);

const high = low + topologyDescription.localThresholdMS;
Expand Down
Loading
Loading