diff --git a/lib/change_stream.js b/lib/change_stream.js index 03ee9d8d3c2..e875bc52029 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -46,7 +46,7 @@ class ChangeStream extends EventEmitter { if (changeDomain instanceof Collection) { this.type = CHANGE_DOMAIN_TYPES.COLLECTION; - this.serverConfig = changeDomain.s.db.serverConfig; + this.topology = changeDomain.s.db.serverConfig; this.namespace = { collection: changeDomain.collectionName, @@ -58,12 +58,12 @@ class ChangeStream extends EventEmitter { this.type = CHANGE_DOMAIN_TYPES.DATABASE; this.namespace = { collection: '', database: changeDomain.databaseName }; this.cursorNamespace = this.namespace.database; - this.serverConfig = changeDomain.serverConfig; + this.topology = changeDomain.serverConfig; } else if (changeDomain instanceof MongoClient) { this.type = CHANGE_DOMAIN_TYPES.CLUSTER; this.namespace = { collection: '', database: 'admin' }; this.cursorNamespace = this.namespace.database; - this.serverConfig = changeDomain.topology; + this.topology = changeDomain.topology; } else { throw new TypeError( 'changeDomain provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient' @@ -76,9 +76,9 @@ class ChangeStream extends EventEmitter { } // We need to get the operationTime as early as possible - const isMaster = this.serverConfig.lastIsMaster(); + const isMaster = this.topology.lastIsMaster(); if (!isMaster) { - throw new MongoError('ServerConfig does not have an ismaster yet.'); + throw new MongoError('Topology does not have an ismaster yet.'); } this.operationTime = isMaster.operationTime; @@ -89,7 +89,9 @@ class ChangeStream extends EventEmitter { // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { if (eventName === 'change' && this.cursor && this.cursor.listenerCount('change') === 0) { - this.cursor.on('data', change => processNewChange(this, null, change)); + this.cursor.on('data', change => + processNewChange({ changeStream: this, change, eventEmitter: true }) + ); } }); @@ -125,14 +127,11 @@ class ChangeStream extends EventEmitter { if (callback) return callback(new Error('Change Stream is not open.'), null); return self.promiseLibrary.reject(new Error('Change Stream is not open.')); } + return this.cursor .next() - .then(function(change) { - return processNewChange(self, null, change, callback); - }) - .catch(function(err) { - return processNewChange(self, err, null, callback); - }); + .then(change => processNewChange({ changeStream: self, change, callback })) + .catch(error => processNewChange({ changeStream: self, error, callback })); } /** @@ -230,14 +229,16 @@ var createChangeStreamCursor = function(self) { var changeStreamCursor = buildChangeStreamAggregationCommand(self); /** - * Fired for each new matching change in the specified namespace. Attaching a `change` event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available. + * Fired for each new matching change in the specified namespace. Attaching a `change` + * event listener to a Change Stream will switch the stream into flowing mode. Data will + * then be passed as soon as it is available. * * @event ChangeStream#change * @type {object} */ if (self.listenerCount('change') > 0) { changeStreamCursor.on('data', function(change) { - processNewChange(self, null, change); + processNewChange({ changeStream: self, change, eventEmitter: true }); }); } @@ -268,7 +269,7 @@ var createChangeStreamCursor = function(self) { * @type {Error} */ changeStreamCursor.on('error', function(error) { - self.emit('error', error); + processNewChange({ changeStream: self, error, eventEmitter: true }); }); if (self.pipeDestinations) { @@ -286,14 +287,14 @@ function getResumeToken(self) { } function getStartAtOperationTime(self) { - const isMaster = self.serverConfig.lastIsMaster() || {}; + const isMaster = self.topology.lastIsMaster() || {}; return ( isMaster.maxWireVersion && isMaster.maxWireVersion >= 7 && self.options.startAtOperationTime ); } var buildChangeStreamAggregationCommand = function(self) { - const serverConfig = self.serverConfig; + const topology = self.topology; const namespace = self.namespace; const pipeline = self.pipeline; const options = self.options; @@ -339,62 +340,110 @@ var buildChangeStreamAggregationCommand = function(self) { }; // Create and return the cursor - return serverConfig.cursor(cursorNamespace, command, cursorOptions); + return topology.cursor(cursorNamespace, command, cursorOptions); }; +// This method performs a basic server selection loop, satisfying the requirements of +// ChangeStream resumability until the new SDAM layer can be used. +const SELECTION_TIMEOUT = 30000; +function waitForTopologyConnected(topology, options, callback) { + setTimeout(() => { + if (options && options.start == null) options.start = process.hrtime(); + const start = options.start || process.hrtime(); + const timeout = options.timeout || SELECTION_TIMEOUT; + const readPreference = options.readPreference; + + if (topology.isConnected({ readPreference })) return callback(null, null); + const hrElapsed = process.hrtime(start); + const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6; + if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection')); + waitForTopologyConnected(topology, options, callback); + }, 3000); // this is an arbitrary wait time to allow SDAM to transition +} + // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. -var processNewChange = function(self, err, change, callback) { - // Handle errors - if (err) { - // Handle resumable MongoNetworkErrors - if (isResumableError(err) && !self.attemptingResume) { - self.attemptingResume = true; - - if (!(getResumeToken(self) || getStartAtOperationTime(self))) { - const startAtOperationTime = self.cursor.cursorState.operationTime; - self.options = Object.assign({ startAtOperationTime }, self.options); +function processNewChange(args) { + const changeStream = args.changeStream; + const error = args.error; + const change = args.change; + const callback = args.callback; + const eventEmitter = args.eventEmitter || false; + const topology = changeStream.topology; + const options = changeStream.cursor.options; + + if (error) { + if (isResumableError(error) && !changeStream.attemptingResume) { + changeStream.attemptingResume = true; + + if (!(getResumeToken(changeStream) || getStartAtOperationTime(changeStream))) { + const startAtOperationTime = changeStream.cursor.cursorState.operationTime; + changeStream.options = Object.assign({ startAtOperationTime }, changeStream.options); } - if (callback) { - return self.cursor.close(function(closeErr) { - if (closeErr) { - return callback(err, null); - } + // stop listening to all events from old cursor + ['data', 'close', 'end', 'error'].forEach(event => + changeStream.cursor.removeAllListeners(event) + ); + + // close internal cursor, ignore errors + changeStream.cursor.close(); + + // attempt recreating the cursor + if (eventEmitter) { + waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { + if (err) return changeStream.emit('error', err); + changeStream.cursor = createChangeStreamCursor(changeStream); + }); + + return; + } - self.cursor = createChangeStreamCursor(self); + if (callback) { + waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { + if (err) return callback(err, null); - return self.next(callback); + changeStream.cursor = createChangeStreamCursor(changeStream); + changeStream.next(callback); }); + + return; } - return self.cursor - .close() - .then(() => (self.cursor = createChangeStreamCursor(self))) - .then(() => self.next()); + return new Promise((resolve, reject) => { + waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { + if (err) return reject(err); + resolve(); + }); + }) + .then(() => (changeStream.cursor = createChangeStreamCursor(changeStream))) + .then(() => changeStream.next()); } - if (typeof callback === 'function') return callback(err, null); - if (self.listenerCount('error')) return self.emit('error', err); - return self.promiseLibrary.reject(err); + if (eventEmitter) return changeStream.emit('error', error); + if (typeof callback === 'function') return callback(error, null); + return changeStream.promiseLibrary.reject(error); } - self.attemptingResume = false; + + changeStream.attemptingResume = false; // Cache the resume token if it is present. If it is not present return an error. if (!change || !change._id) { var noResumeTokenError = new Error( 'A change stream document has been received that lacks a resume token (_id).' ); + + if (eventEmitter) return changeStream.emit('error', noResumeTokenError); if (typeof callback === 'function') return callback(noResumeTokenError, null); - if (self.listenerCount('error')) return self.emit('error', noResumeTokenError); - return self.promiseLibrary.reject(noResumeTokenError); + return changeStream.promiseLibrary.reject(noResumeTokenError); } - self.resumeToken = change._id; + + changeStream.resumeToken = change._id; // Return the change - if (typeof callback === 'function') return callback(err, change); - if (self.listenerCount('change')) return self.emit('change', change); - return self.promiseLibrary.resolve(change); -}; + if (eventEmitter) return changeStream.emit('change', change); + if (typeof callback === 'function') return callback(error, change); + return changeStream.promiseLibrary.resolve(change); +} /** * The callback format for results