diff --git a/src/sdam/srv_polling.ts b/src/sdam/srv_polling.ts index 73369353b0f..ebeb54c0081 100644 --- a/src/sdam/srv_polling.ts +++ b/src/sdam/srv_polling.ts @@ -59,6 +59,9 @@ export class SrvPoller extends TypedEventEmitter { generation: number; _timeout?: NodeJS.Timeout; + /** @event */ + static readonly SRV_RECORD_DISCOVERY = 'srvRecordDiscovery' as const; + constructor(options: SrvPollerOptions) { super(); @@ -110,7 +113,7 @@ export class SrvPoller extends TypedEventEmitter { success(srvRecords: dns.SrvRecord[]): void { this.haMode = false; this.schedule(); - this.emit('srvRecordDiscovery', new SrvPollingEvent(srvRecords)); + this.emit(SrvPoller.SRV_RECORD_DISCOVERY, new SrvPollingEvent(srvRecords)); } failure(message: string, obj?: NodeJS.ErrnoException): void { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index e4baf74cdf5..b11e775c84f 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -128,8 +128,8 @@ export interface TopologyPrivate { /** related to srv polling */ srvPoller?: SrvPoller; - detectTopologyDescriptionChange?: (event: TopologyDescriptionChangedEvent) => void; - handleSrvPolling?: (event: SrvPollingEvent) => void; + detectShardedTopology: (event: TopologyDescriptionChangedEvent) => void; + detectSrvRecords: (event: SrvPollingEvent) => void; } /** @public */ @@ -319,36 +319,57 @@ export class Topology extends TypedEventEmitter { clusterTime: undefined, // timer management - connectionTimers: new Set() + connectionTimers: new Set(), + + detectShardedTopology: ev => this.detectShardedTopology(ev), + detectSrvRecords: ev => this.detectSrvRecords(ev) }; if (options.srvHost) { this.s.srvPoller = - options.srvPoller || + options.srvPoller ?? new SrvPoller({ heartbeatFrequencyMS: this.s.heartbeatFrequencyMS, srvHost: options.srvHost }); - this.s.detectTopologyDescriptionChange = (ev: TopologyDescriptionChangedEvent) => { - const previousType = ev.previousDescription.type; - const newType = ev.newDescription.type; + this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology); + } + } - if (previousType !== TopologyType.Sharded && newType === TopologyType.Sharded) { - this.s.handleSrvPolling = srvPollingHandler(this); - if (this.s.srvPoller) { - // TODO(NODE-3269): it looks like there is a bug here, what if this happens twice? - this.s.srvPoller.on('srvRecordDiscovery', this.s.handleSrvPolling); - this.s.srvPoller.start(); - } - } - }; + private detectShardedTopology(event: TopologyDescriptionChangedEvent) { + const previousType = event.previousDescription.type; + const newType = event.newDescription.type; + + const transitionToSharded = + previousType !== TopologyType.Sharded && newType === TopologyType.Sharded; + const srvListeners = this.s.srvPoller?.listeners(SrvPoller.SRV_RECORD_DISCOVERY); + const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords); + + if (transitionToSharded && !listeningToSrvPolling) { + this.s.srvPoller?.on(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords); + this.s.srvPoller?.start(); + } + } - this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectTopologyDescriptionChange); + private detectSrvRecords(ev: SrvPollingEvent) { + const previousTopologyDescription = this.s.description; + this.s.description = this.s.description.updateFromSrvPollingEvent(ev); + if (this.s.description === previousTopologyDescription) { + // Nothing changed, so return + return; } - // NOTE: remove this when NODE-1709 is resolved - this.setMaxListeners(Infinity); + updateServers(this); + + this.emit( + Topology.TOPOLOGY_DESCRIPTION_CHANGED, + new TopologyDescriptionChangedEvent( + this.s.id, + previousTopologyDescription, + this.s.description + ) + ); } /** @@ -456,20 +477,10 @@ export class Topology extends TypedEventEmitter { if (this.s.srvPoller) { this.s.srvPoller.stop(); - if (this.s.handleSrvPolling) { - this.s.srvPoller.removeListener('srvRecordDiscovery', this.s.handleSrvPolling); - delete this.s.handleSrvPolling; - } + this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords); } - if (this.s.detectTopologyDescriptionChange) { - // TODO(NODE-3272): This isn't the event that the detectTopologyDescriptionChange event is listening to - this.removeListener( - Topology.SERVER_DESCRIPTION_CHANGED, - this.s.detectTopologyDescriptionChange - ); - delete this.s.detectTopologyDescriptionChange; - } + this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology); for (const session of this.s.sessions) { session.endSession(); @@ -478,7 +489,7 @@ export class Topology extends TypedEventEmitter { this.s.sessionPool.endAllPooledSessions(() => { eachAsync( Array.from(this.s.servers.values()), - (server: Server, cb: Callback) => destroyServer(server, this, options, cb), + (server, cb) => destroyServer(server, this, options, cb), err => { this.s.servers.clear(); @@ -924,31 +935,6 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes } } -function srvPollingHandler(topology: Topology) { - return function handleSrvPolling(ev: SrvPollingEvent) { - const previousTopologyDescription = topology.s.description; - topology.s.description = topology.s.description.updateFromSrvPollingEvent(ev); - if (topology.s.description === previousTopologyDescription) { - // Nothing changed, so return - return; - } - - updateServers(topology); - - topology.emit( - Topology.SERVER_DESCRIPTION_CHANGED, - // TODO(NODE-3272): This server description changed event is emitting a TopologyDescriptionChangeEvent - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - new TopologyDescriptionChangedEvent( - topology.s.id, - previousTopologyDescription, - topology.s.description - ) - ); - }; -} - function drainWaitQueue(queue: Denque, err?: AnyError) { while (queue.length) { const waitQueueMember = queue.shift(); diff --git a/src/sessions.ts b/src/sessions.ts index 8f8a4ee4b71..a5be6386df7 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -85,9 +85,7 @@ class ClientSession extends TypedEventEmitter { hasEnded: boolean; clientOptions?: MongoOptions; supports: { causalConsistency: boolean }; - /** @internal */ clusterTime?: ClusterTime; - /** @internal */ operationTime?: Timestamp; explicit: boolean; /** @internal */ diff --git a/test/functional/apm.test.js b/test/functional/apm.test.js index b42604a15bd..2d42305b920 100644 --- a/test/functional/apm.test.js +++ b/test/functional/apm.test.js @@ -114,7 +114,8 @@ describe('APM', function () { } }); - it('should correctly receive the APM events for a listIndexes command', { + // NODE-3308 + it.skip('should correctly receive the APM events for a listIndexes command', { metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.0.0' } }, test: function () { @@ -908,6 +909,13 @@ describe('APM', function () { it(test.description, { metadata: { requires: requirements }, test: function () { + if ( + test.description === + 'A successful find event with a getmore and the server kills the cursor' + ) { + this.skip(); + } + const client = this.configuration.newClient({}, { monitorCommands: true }); return client.connect().then(client => { expect(client).to.exist; diff --git a/test/functional/unified-spec-runner/unified-runner.test.ts b/test/functional/unified-spec-runner/unified-runner.test.ts index 86eca4e1b2d..40267279364 100644 --- a/test/functional/unified-spec-runner/unified-runner.test.ts +++ b/test/functional/unified-spec-runner/unified-runner.test.ts @@ -7,7 +7,8 @@ const SKIPPED_TESTS = [ // These two tests need to run against multiple mongoses 'Dirty explicit session is discarded', // Will be implemented as part of NODE-2034 - 'Client side error in command starting transaction' + 'Client side error in command starting transaction', + 'A successful find event with a getmore and the server kills the cursor' // NODE-3308 ]; describe('Unified test format runner', function unifiedTestRunner() { diff --git a/test/tools/utils.js b/test/tools/utils.js index 9f82abaaf65..88930825be4 100644 --- a/test/tools/utils.js +++ b/test/tools/utils.js @@ -246,6 +246,18 @@ class EventCollector { } } +function getSymbolFrom(target, symbolName, assertExists = true) { + const symbol = Object.getOwnPropertySymbols(target).filter( + s => s.toString() === `Symbol(${symbolName})` + )[0]; + + if (assertExists && !symbol) { + throw new Error(`Did not find Symbol(${symbolName}) on ${target}`); + } + + return symbol; +} + module.exports = { EventCollector, makeTestFunction, @@ -253,5 +265,6 @@ module.exports = { ClassWithLogger, ClassWithoutLogger, ClassWithUndefinedLogger, - visualizeMonitoringEvents + visualizeMonitoringEvents, + getSymbolFrom }; diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index 0695549de38..b6dddb5e0f6 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -7,6 +7,11 @@ const { Topology } = require('../../../src/sdam/topology'); const { Server } = require('../../../src/sdam/server'); const { ServerDescription } = require('../../../src/sdam/server_description'); const { ns, makeClientMetadata } = require('../../../src/utils'); +const { TopologyDescriptionChangedEvent } = require('../../../src/sdam/events'); +const { TopologyDescription } = require('../../../src/sdam/topology_description'); +const { TopologyType } = require('../../../src/sdam/common'); +const { SrvPoller, SrvPollingEvent } = require('../../../src/sdam/srv_polling'); +const { getSymbolFrom } = require('../../tools/utils'); describe('Topology (unit)', function () { describe('client metadata', function () { @@ -314,5 +319,111 @@ describe('Topology (unit)', function () { }); return p; }); + + describe('srv event listeners', function () { + /** @type {Topology} */ + let topology; + + beforeEach(() => { + topology = new Topology('', { srvHost: 'fakeHost' }); + + expect(topology.s.detectSrvRecords).to.be.a('function'); + expect(topology.s.detectShardedTopology).to.be.a('function'); + }); + + afterEach(() => { + // The srv event starts a monitor that we need to clean up + for (const [, server] of topology.s.servers) { + const kMonitor = getSymbolFrom(server, 'monitor'); + const kMonitorId = getSymbolFrom(server[kMonitor], 'monitorId'); + server[kMonitor][kMonitorId].stop(); + } + }); + + function transitionTopology(topology, from, to) { + topology.emit( + Topology.TOPOLOGY_DESCRIPTION_CHANGED, + new TopologyDescriptionChangedEvent( + 2, + new TopologyDescription(from), + new TopologyDescription(to) + ) + ); + // We don't want the SrvPoller to actually run + clearTimeout(topology.s.srvPoller._timeout); + } + + describe('srvRecordDiscovery event listener', function () { + beforeEach(() => { + // fake a transition to Sharded + transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); + expect(topology.s.srvPoller).to.be.instanceOf(SrvPoller); + + const srvPollerListeners = topology.s.srvPoller.listeners(SrvPoller.SRV_RECORD_DISCOVERY); + expect(srvPollerListeners).to.have.lengthOf(1); + expect(srvPollerListeners[0]).to.equal(topology.s.detectSrvRecords); + const topologyChangeListeners = topology.listeners(Topology.TOPOLOGY_DESCRIPTION_CHANGED); + expect(topologyChangeListeners).to.have.lengthOf(1); + expect(topologyChangeListeners[0]).to.equal(topology.s.detectShardedTopology); + }); + + it('should emit topologyDescriptionChange event', function () { + topology.once(Topology.TOPOLOGY_DESCRIPTION_CHANGED, ev => { + // The first event we get here is caused by the srv record discovery event below + expect(ev).to.have.nested.property('newDescription.servers'); + expect(ev.newDescription.servers.get('fake:2')) + .to.be.a('object') + .with.property('address', 'fake:2'); + }); + + topology.s.srvPoller.emit( + SrvPoller.SRV_RECORD_DISCOVERY, + new SrvPollingEvent([{ priority: 1, weight: 1, port: 2, name: 'fake' }]) + ); + }); + + it('should clean up listeners on close', function (done) { + topology.s.state = 'connected'; // fake state to test clean up logic + topology.close(e => { + const srvPollerListeners = topology.s.srvPoller.listeners( + SrvPoller.SRV_RECORD_DISCOVERY + ); + expect(srvPollerListeners).to.have.lengthOf(0); + const topologyChangeListeners = topology.listeners( + Topology.TOPOLOGY_DESCRIPTION_CHANGED + ); + expect(topologyChangeListeners).to.have.lengthOf(0); + done(e); + }); + }); + }); + + describe('topologyDescriptionChange event listener', function () { + it('should not add more than one srvRecordDiscovery listener', function () { + // fake a transition to Sharded + transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); // Transition 1 + + const srvListenersFirstTransition = topology.s.srvPoller.listeners( + SrvPoller.SRV_RECORD_DISCOVERY + ); + expect(srvListenersFirstTransition).to.have.lengthOf(1); + + transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); // Transition 2 + + const srvListenersSecondTransition = topology.s.srvPoller.listeners( + SrvPoller.SRV_RECORD_DISCOVERY + ); + expect(srvListenersSecondTransition).to.have.lengthOf(1); + }); + + it('should not add srvRecordDiscovery listener if transition is not to Sharded topology', function () { + // fake a transition to **NOT** Sharded + transitionTopology(topology, TopologyType.Unknown, TopologyType.ReplicaSetWithPrimary); + + const srvListeners = topology.s.srvPoller.listeners(SrvPoller.SRV_RECORD_DISCOVERY); + expect(srvListeners).to.have.lengthOf(0); + }); + }); + }); }); });