Skip to content

Commit 150f968

Browse files
nbbeekenljhaywar
authored andcommitted
fix(NODE-3272)!: emit correct event type when SRV Polling (#2825)
1 parent e684130 commit 150f968

File tree

7 files changed

+183
-63
lines changed

7 files changed

+183
-63
lines changed

src/sdam/srv_polling.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {
5959
generation: number;
6060
_timeout?: NodeJS.Timeout;
6161

62+
/** @event */
63+
static readonly SRV_RECORD_DISCOVERY = 'srvRecordDiscovery' as const;
64+
6265
constructor(options: SrvPollerOptions) {
6366
super();
6467

@@ -110,7 +113,7 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {
110113
success(srvRecords: dns.SrvRecord[]): void {
111114
this.haMode = false;
112115
this.schedule();
113-
this.emit('srvRecordDiscovery', new SrvPollingEvent(srvRecords));
116+
this.emit(SrvPoller.SRV_RECORD_DISCOVERY, new SrvPollingEvent(srvRecords));
114117
}
115118

116119
failure(message: string, obj?: NodeJS.ErrnoException): void {

src/sdam/topology.ts

+43-57
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ export interface TopologyPrivate {
128128

129129
/** related to srv polling */
130130
srvPoller?: SrvPoller;
131-
detectTopologyDescriptionChange?: (event: TopologyDescriptionChangedEvent) => void;
132-
handleSrvPolling?: (event: SrvPollingEvent) => void;
131+
detectShardedTopology: (event: TopologyDescriptionChangedEvent) => void;
132+
detectSrvRecords: (event: SrvPollingEvent) => void;
133133
}
134134

135135
/** @public */
@@ -319,36 +319,57 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
319319
clusterTime: undefined,
320320

321321
// timer management
322-
connectionTimers: new Set<NodeJS.Timeout>()
322+
connectionTimers: new Set<NodeJS.Timeout>(),
323+
324+
detectShardedTopology: ev => this.detectShardedTopology(ev),
325+
detectSrvRecords: ev => this.detectSrvRecords(ev)
323326
};
324327

325328
if (options.srvHost) {
326329
this.s.srvPoller =
327-
options.srvPoller ||
330+
options.srvPoller ??
328331
new SrvPoller({
329332
heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
330333
srvHost: options.srvHost
331334
});
332335

333-
this.s.detectTopologyDescriptionChange = (ev: TopologyDescriptionChangedEvent) => {
334-
const previousType = ev.previousDescription.type;
335-
const newType = ev.newDescription.type;
336+
this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
337+
}
338+
}
336339

337-
if (previousType !== TopologyType.Sharded && newType === TopologyType.Sharded) {
338-
this.s.handleSrvPolling = srvPollingHandler(this);
339-
if (this.s.srvPoller) {
340-
// TODO(NODE-3269): it looks like there is a bug here, what if this happens twice?
341-
this.s.srvPoller.on('srvRecordDiscovery', this.s.handleSrvPolling);
342-
this.s.srvPoller.start();
343-
}
344-
}
345-
};
340+
private detectShardedTopology(event: TopologyDescriptionChangedEvent) {
341+
const previousType = event.previousDescription.type;
342+
const newType = event.newDescription.type;
343+
344+
const transitionToSharded =
345+
previousType !== TopologyType.Sharded && newType === TopologyType.Sharded;
346+
const srvListeners = this.s.srvPoller?.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
347+
const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords);
348+
349+
if (transitionToSharded && !listeningToSrvPolling) {
350+
this.s.srvPoller?.on(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
351+
this.s.srvPoller?.start();
352+
}
353+
}
346354

347-
this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectTopologyDescriptionChange);
355+
private detectSrvRecords(ev: SrvPollingEvent) {
356+
const previousTopologyDescription = this.s.description;
357+
this.s.description = this.s.description.updateFromSrvPollingEvent(ev);
358+
if (this.s.description === previousTopologyDescription) {
359+
// Nothing changed, so return
360+
return;
348361
}
349362

350-
// NOTE: remove this when NODE-1709 is resolved
351-
this.setMaxListeners(Infinity);
363+
updateServers(this);
364+
365+
this.emit(
366+
Topology.TOPOLOGY_DESCRIPTION_CHANGED,
367+
new TopologyDescriptionChangedEvent(
368+
this.s.id,
369+
previousTopologyDescription,
370+
this.s.description
371+
)
372+
);
352373
}
353374

354375
/**
@@ -456,20 +477,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
456477

457478
if (this.s.srvPoller) {
458479
this.s.srvPoller.stop();
459-
if (this.s.handleSrvPolling) {
460-
this.s.srvPoller.removeListener('srvRecordDiscovery', this.s.handleSrvPolling);
461-
delete this.s.handleSrvPolling;
462-
}
480+
this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
463481
}
464482

465-
if (this.s.detectTopologyDescriptionChange) {
466-
// TODO(NODE-3272): This isn't the event that the detectTopologyDescriptionChange event is listening to
467-
this.removeListener(
468-
Topology.SERVER_DESCRIPTION_CHANGED,
469-
this.s.detectTopologyDescriptionChange
470-
);
471-
delete this.s.detectTopologyDescriptionChange;
472-
}
483+
this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
473484

474485
for (const session of this.s.sessions) {
475486
session.endSession();
@@ -478,7 +489,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
478489
this.s.sessionPool.endAllPooledSessions(() => {
479490
eachAsync(
480491
Array.from(this.s.servers.values()),
481-
(server: Server, cb: Callback) => destroyServer(server, this, options, cb),
492+
(server, cb) => destroyServer(server, this, options, cb),
482493
err => {
483494
this.s.servers.clear();
484495

@@ -924,31 +935,6 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
924935
}
925936
}
926937

927-
function srvPollingHandler(topology: Topology) {
928-
return function handleSrvPolling(ev: SrvPollingEvent) {
929-
const previousTopologyDescription = topology.s.description;
930-
topology.s.description = topology.s.description.updateFromSrvPollingEvent(ev);
931-
if (topology.s.description === previousTopologyDescription) {
932-
// Nothing changed, so return
933-
return;
934-
}
935-
936-
updateServers(topology);
937-
938-
topology.emit(
939-
Topology.SERVER_DESCRIPTION_CHANGED,
940-
// TODO(NODE-3272): This server description changed event is emitting a TopologyDescriptionChangeEvent
941-
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
942-
// @ts-expect-error
943-
new TopologyDescriptionChangedEvent(
944-
topology.s.id,
945-
previousTopologyDescription,
946-
topology.s.description
947-
)
948-
);
949-
};
950-
}
951-
952938
function drainWaitQueue(queue: Denque<ServerSelectionRequest>, err?: AnyError) {
953939
while (queue.length) {
954940
const waitQueueMember = queue.shift();

src/sessions.ts

-2
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,7 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
8585
hasEnded: boolean;
8686
clientOptions?: MongoOptions;
8787
supports: { causalConsistency: boolean };
88-
/** @internal */
8988
clusterTime?: ClusterTime;
90-
/** @internal */
9189
operationTime?: Timestamp;
9290
explicit: boolean;
9391
/** @internal */

test/functional/apm.test.js

+9-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ describe('APM', function () {
114114
}
115115
});
116116

117-
it('should correctly receive the APM events for a listIndexes command', {
117+
// NODE-3308
118+
it.skip('should correctly receive the APM events for a listIndexes command', {
118119
metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.0.0' } },
119120

120121
test: function () {
@@ -908,6 +909,13 @@ describe('APM', function () {
908909
it(test.description, {
909910
metadata: { requires: requirements },
910911
test: function () {
912+
if (
913+
test.description ===
914+
'A successful find event with a getmore and the server kills the cursor'
915+
) {
916+
this.skip();
917+
}
918+
911919
const client = this.configuration.newClient({}, { monitorCommands: true });
912920
return client.connect().then(client => {
913921
expect(client).to.exist;

test/functional/unified-spec-runner/unified-runner.test.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ const SKIPPED_TESTS = [
77
// These two tests need to run against multiple mongoses
88
'Dirty explicit session is discarded',
99
// Will be implemented as part of NODE-2034
10-
'Client side error in command starting transaction'
10+
'Client side error in command starting transaction',
11+
'A successful find event with a getmore and the server kills the cursor' // NODE-3308
1112
];
1213

1314
describe('Unified test format runner', function unifiedTestRunner() {

test/tools/utils.js

+14-1
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,25 @@ class EventCollector {
246246
}
247247
}
248248

249+
function getSymbolFrom(target, symbolName, assertExists = true) {
250+
const symbol = Object.getOwnPropertySymbols(target).filter(
251+
s => s.toString() === `Symbol(${symbolName})`
252+
)[0];
253+
254+
if (assertExists && !symbol) {
255+
throw new Error(`Did not find Symbol(${symbolName}) on ${target}`);
256+
}
257+
258+
return symbol;
259+
}
260+
249261
module.exports = {
250262
EventCollector,
251263
makeTestFunction,
252264
ensureCalledWith,
253265
ClassWithLogger,
254266
ClassWithoutLogger,
255267
ClassWithUndefinedLogger,
256-
visualizeMonitoringEvents
268+
visualizeMonitoringEvents,
269+
getSymbolFrom
257270
};

test/unit/sdam/topology.test.js

+111
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ const { Topology } = require('../../../src/sdam/topology');
77
const { Server } = require('../../../src/sdam/server');
88
const { ServerDescription } = require('../../../src/sdam/server_description');
99
const { ns, makeClientMetadata } = require('../../../src/utils');
10+
const { TopologyDescriptionChangedEvent } = require('../../../src/sdam/events');
11+
const { TopologyDescription } = require('../../../src/sdam/topology_description');
12+
const { TopologyType } = require('../../../src/sdam/common');
13+
const { SrvPoller, SrvPollingEvent } = require('../../../src/sdam/srv_polling');
14+
const { getSymbolFrom } = require('../../tools/utils');
1015

1116
describe('Topology (unit)', function () {
1217
describe('client metadata', function () {
@@ -314,5 +319,111 @@ describe('Topology (unit)', function () {
314319
});
315320
return p;
316321
});
322+
323+
describe('srv event listeners', function () {
324+
/** @type {Topology} */
325+
let topology;
326+
327+
beforeEach(() => {
328+
topology = new Topology('', { srvHost: 'fakeHost' });
329+
330+
expect(topology.s.detectSrvRecords).to.be.a('function');
331+
expect(topology.s.detectShardedTopology).to.be.a('function');
332+
});
333+
334+
afterEach(() => {
335+
// The srv event starts a monitor that we need to clean up
336+
for (const [, server] of topology.s.servers) {
337+
const kMonitor = getSymbolFrom(server, 'monitor');
338+
const kMonitorId = getSymbolFrom(server[kMonitor], 'monitorId');
339+
server[kMonitor][kMonitorId].stop();
340+
}
341+
});
342+
343+
function transitionTopology(topology, from, to) {
344+
topology.emit(
345+
Topology.TOPOLOGY_DESCRIPTION_CHANGED,
346+
new TopologyDescriptionChangedEvent(
347+
2,
348+
new TopologyDescription(from),
349+
new TopologyDescription(to)
350+
)
351+
);
352+
// We don't want the SrvPoller to actually run
353+
clearTimeout(topology.s.srvPoller._timeout);
354+
}
355+
356+
describe('srvRecordDiscovery event listener', function () {
357+
beforeEach(() => {
358+
// fake a transition to Sharded
359+
transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded);
360+
expect(topology.s.srvPoller).to.be.instanceOf(SrvPoller);
361+
362+
const srvPollerListeners = topology.s.srvPoller.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
363+
expect(srvPollerListeners).to.have.lengthOf(1);
364+
expect(srvPollerListeners[0]).to.equal(topology.s.detectSrvRecords);
365+
const topologyChangeListeners = topology.listeners(Topology.TOPOLOGY_DESCRIPTION_CHANGED);
366+
expect(topologyChangeListeners).to.have.lengthOf(1);
367+
expect(topologyChangeListeners[0]).to.equal(topology.s.detectShardedTopology);
368+
});
369+
370+
it('should emit topologyDescriptionChange event', function () {
371+
topology.once(Topology.TOPOLOGY_DESCRIPTION_CHANGED, ev => {
372+
// The first event we get here is caused by the srv record discovery event below
373+
expect(ev).to.have.nested.property('newDescription.servers');
374+
expect(ev.newDescription.servers.get('fake:2'))
375+
.to.be.a('object')
376+
.with.property('address', 'fake:2');
377+
});
378+
379+
topology.s.srvPoller.emit(
380+
SrvPoller.SRV_RECORD_DISCOVERY,
381+
new SrvPollingEvent([{ priority: 1, weight: 1, port: 2, name: 'fake' }])
382+
);
383+
});
384+
385+
it('should clean up listeners on close', function (done) {
386+
topology.s.state = 'connected'; // fake state to test clean up logic
387+
topology.close(e => {
388+
const srvPollerListeners = topology.s.srvPoller.listeners(
389+
SrvPoller.SRV_RECORD_DISCOVERY
390+
);
391+
expect(srvPollerListeners).to.have.lengthOf(0);
392+
const topologyChangeListeners = topology.listeners(
393+
Topology.TOPOLOGY_DESCRIPTION_CHANGED
394+
);
395+
expect(topologyChangeListeners).to.have.lengthOf(0);
396+
done(e);
397+
});
398+
});
399+
});
400+
401+
describe('topologyDescriptionChange event listener', function () {
402+
it('should not add more than one srvRecordDiscovery listener', function () {
403+
// fake a transition to Sharded
404+
transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); // Transition 1
405+
406+
const srvListenersFirstTransition = topology.s.srvPoller.listeners(
407+
SrvPoller.SRV_RECORD_DISCOVERY
408+
);
409+
expect(srvListenersFirstTransition).to.have.lengthOf(1);
410+
411+
transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); // Transition 2
412+
413+
const srvListenersSecondTransition = topology.s.srvPoller.listeners(
414+
SrvPoller.SRV_RECORD_DISCOVERY
415+
);
416+
expect(srvListenersSecondTransition).to.have.lengthOf(1);
417+
});
418+
419+
it('should not add srvRecordDiscovery listener if transition is not to Sharded topology', function () {
420+
// fake a transition to **NOT** Sharded
421+
transitionTopology(topology, TopologyType.Unknown, TopologyType.ReplicaSetWithPrimary);
422+
423+
const srvListeners = topology.s.srvPoller.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
424+
expect(srvListeners).to.have.lengthOf(0);
425+
});
426+
});
427+
});
317428
});
318429
});

0 commit comments

Comments
 (0)