Skip to content

Commit

Permalink
update monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James committed Mar 29, 2024
1 parent d46fc15 commit 0ce246f
Showing 1 changed file with 209 additions and 30 deletions.
239 changes: 209 additions & 30 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
rttPinger?: RTTPinger;
/** @internal */
override component = MongoLoggableComponent.TOPOLOGY;
private rttSamplesMS: MovingWindow;
/** @internal */
rttSamplesMS: MovingWindow;

constructor(server: Server, options: MonitorOptions) {
super();
Expand Down Expand Up @@ -244,6 +245,8 @@ function resetMonitorState(monitor: Monitor) {

monitor.connection?.destroy();
monitor.connection = null;

monitor.clearRttSamples();
}

function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
Expand Down Expand Up @@ -277,7 +280,6 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
function onHeartbeatFailed(err: Error) {
monitor.connection?.destroy();
monitor.connection = null;

monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_FAILED,
monitor[kServer].topology.s.id,
Expand Down Expand Up @@ -306,7 +308,11 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
// NOTE: here we use the latestRTT as this measurment corresponds with the value
// obtained for this successful heartbeat
const duration =
isAwaitable && monitor.rttPinger ? monitor.rttPinger.latestRTT : calculateDurationInMs(start);
isAwaitable && monitor.rttPinger
? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start)
: calculateDurationInMs(start);

monitor.addRttSample(duration);

monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_SUCCEEDED,
Expand Down Expand Up @@ -406,6 +412,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
connection.destroy();
return;
}
const duration = calculateDurationInMs(start);
monitor.addRttSample(duration);

monitor.connection = connection;
monitor.emitAndLogHeartbeat(
Expand All @@ -414,7 +422,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
connection.hello?.connectionId,
new ServerHeartbeatSucceededEvent(
monitor.address,
calculateDurationInMs(start),
duration,
connection.hello,
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
)
Expand All @@ -430,6 +438,173 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);
}

async function _checkServer(monitor: Monitor): Promise<Document | null> {
let start: number;
let awaited: boolean;
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_STARTED,
monitor[kServer].topology.s.id,
undefined,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

function onHeartbeatFailed(err: Error) {
monitor.connection?.destroy();
monitor.connection = null;
monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_FAILED,
monitor[kServer].topology.s.id,
undefined,
new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited)
);

const error = !(err instanceof MongoError)
? new MongoError(MongoError.buildErrorMessage(err), { cause: err })
: err;
error.addErrorLabel(MongoErrorLabel.ResetPool);
if (error instanceof MongoNetworkTimeoutError) {
error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
}

monitor.emit('resetServer', error);
}

function onHeartbeatSucceeded(hello: Document) {
if (!('isWritablePrimary' in hello)) {
// Provide hello-style response document.
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

// NOTE: here we use the latestRTT as this measurment corresponds with the value
// obtained for this successful heartbeat
const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start)
: calculateDurationInMs(start);

monitor.addRttSample(duration);

monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_SUCCEEDED,
monitor[kServer].topology.s.id,
hello.connectionId,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);

if (isAwaitable) {
// If we are using the streaming protocol then we immediately issue another 'started'
// event, otherwise the "check" is complete and return to the main monitor loop
monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_STARTED,
monitor[kServer].topology.s.id,
undefined,
new ServerHeartbeatStartedEvent(monitor.address, true)
);
// We have not actually sent an outgoing handshake, but when we get the next response we
// want the duration to reflect the time since we last heard from the server
start = now();
} else {
monitor.rttPinger?.close();
monitor.rttPinger = undefined;
}
}

const { connection } = monitor;
if (connection && !connection.closed) {
const { serverApi, helloOk } = connection;
const connectTimeoutMS = monitor.options.connectTimeoutMS;
const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;

const cmd = {
[serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
...(isAwaitable && topologyVersion
? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
: {})
};

const options = isAwaitable
? {
socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
exhaustAllowed: true
}
: { socketTimeoutMS: connectTimeoutMS };

if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor,
monitor[kCancellationToken],
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
monitor.connectOptions
)
);
}

// Record new start time before sending handshake
start = now();

if (isAwaitable) {
awaited = true;
try {
const hello = await connection.command(ns('admin.$cmd'), cmd, options);
onHeartbeatSucceeded(hello);
return hello;
} catch (error) {
onHeartbeatFailed(error);
return null;
}
}

awaited = false;
try {
const hello = await connection.command(ns('admin.$cmd'), cmd, options);
onHeartbeatSucceeded(hello);
return hello;
} catch (error) {
onHeartbeatFailed(error);
return null;
}
} else {
const socket = await makeSocket(monitor.connectOptions);
const connection = makeConnection(monitor.connectOptions, socket);
// The start time is after socket creation but before the handshake
start = now();
try {
await performInitialHandshake(connection, monitor.connectOptions);
const duration = calculateDurationInMs(start);
if (isInCloseState(monitor)) {
connection.destroy();
return null;
}

monitor.connection = connection;
monitor.addRttSample(duration);

monitor.emitAndLogHeartbeat(
Server.SERVER_HEARTBEAT_SUCCEEDED,
monitor[kServer].topology.s.id,
connection.hello?.connectionId,
new ServerHeartbeatSucceededEvent(
monitor.address,
duration,
connection.hello,
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
)
);

return connection.hello;
} catch (error) {
connection.destroy();
monitor.connection = null;
awaited = false;
onHeartbeatFailed(error);
return null;
}
}
}

function monitorServer(monitor: Monitor) {
return (callback: Callback) => {
if (monitor.s.state === STATE_MONITORING) {
Expand Down Expand Up @@ -491,12 +666,15 @@ export class RTTPinger {
/** @internal */
monitor: Monitor;
closed: boolean;
/** @internal */
latestRTT?: number;

constructor(monitor: Monitor, cancellationToken: CancellationToken, options: RTTPingerOptions) {
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this.closed = false;
this.monitor = monitor;
this.latestRTT = 0;

const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
Expand All @@ -510,10 +688,6 @@ export class RTTPinger {
return this.monitor.minRoundTripTime;
}

get latestRTT(): number {
return this.monitor.latestRTT;
}

close(): void {
this.closed = true;
clearTimeout(this[kMonitorId]);
Expand All @@ -523,42 +697,48 @@ export class RTTPinger {
}
}

function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
const start = now();
options.cancellationToken = rttPinger[kCancellationToken];
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;

function measureAndReschedule(
rttPinger: RTTPinger,
options: RTTPingerOptions,
start?: number,
conn?: Connection
) {
if (start == null) {
start = now();
}
if (rttPinger.closed) {
conn?.destroy();
return;
}

function measureAndReschedule(conn?: Connection) {
if (rttPinger.closed) {
conn?.destroy();
return;
}
if (rttPinger.connection == null) {
rttPinger.connection = conn;
}

if (rttPinger.connection == null) {
rttPinger.connection = conn;
}
rttPinger.latestRTT = calculateDurationInMs(start);
rttPinger[kMonitorId] = setTimeout(
() => measureRoundTripTime(rttPinger, options),
options.heartbeatFrequencyMS
);
}

rttPinger.monitor.addRttSample(calculateDurationInMs(start));
rttPinger[kMonitorId] = setTimeout(
() => measureRoundTripTime(rttPinger, options),
heartbeatFrequencyMS
);
function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
const start = now();
options.cancellationToken = rttPinger[kCancellationToken];

if (rttPinger.closed) {
return;
}

const connection = rttPinger.connection;
if (connection == null) {
// eslint-disable-next-line github/no-then
connect(options).then(
connection => {
measureAndReschedule(connection);
measureAndReschedule(rttPinger, options, start, connection);
},
() => {
rttPinger.connection = undefined;
rttPinger.monitor.clearRttSamples();
}
);
return;
Expand All @@ -568,11 +748,10 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => measureAndReschedule(rttPinger, options),
() => {
rttPinger.connection?.destroy();
rttPinger.connection = undefined;
rttPinger.monitor.clearRttSamples();
return;
}
);
Expand Down

0 comments on commit 0ce246f

Please sign in to comment.