Skip to content

fix(ChangeStream): hasNext/next should work after resume #2333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
941c93c
test: add new tests demonstrating failure
emadum Apr 14, 2020
639b422
test: only run on servers that support change streams
emadum Apr 14, 2020
9cb32c6
test: fix tests
emadum Apr 26, 2020
1713f57
refactor: add getCursor to prevent iterating on old cursor during res…
emadum Apr 26, 2020
f106a57
test: extract withChangeStream helper
emadum Apr 26, 2020
b61b3a8
test: skip callback resume error tests
emadum Apr 26, 2020
2f3c0c1
test: consolidate tests
emadum Apr 26, 2020
48eff06
test: remove unused helpers to fix lint
emadum Apr 26, 2020
dc82031
fix: NonRetryableChangeStreamError should be NonResumableChangeStream…
emadum Apr 26, 2020
55d17d2
test: fix onResume in eventEmitter case
emadum Apr 26, 2020
5d0efeb
refactor: move waitForTopology into getCursor
emadum Apr 29, 2020
382158b
cleanup
emadum Apr 29, 2020
771af47
add comments
emadum Apr 29, 2020
a5a34f0
jsdoc cleanup
emadum Apr 29, 2020
92d604c
Merge remote-tracking branch 'origin/3.5' into NODE-2548/changestream…
emadum Apr 30, 2020
3a87128
Merge remote-tracking branch 'origin/3.5' into NODE-2548/changestream…
emadum Apr 30, 2020
8c84bf5
test: test for both old versions of non-resumable label to be safe
emadum Apr 30, 2020
a5224c6
refactor: use maybePromise for ChangeStream.close
emadum Apr 30, 2020
a5ba416
test: fix skipped mid-close tests
emadum Apr 30, 2020
faf2a5a
test: skip change stream tests running on mock single topology
emadum Apr 30, 2020
5dd46db
test: fix broken tests
emadum May 1, 2020
37f6cb9
test: unskip test
emadum May 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ language: node_js
branches:
only:
- master
- 3.6
- next
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidentally left this change in the previous merge from 3.6, so I reverted it here. But really we should just delete .travis.yml since we don't use Travis and therefore don't maintain this file.


before_install:
# we have to intstall mongo-orchestration ourselves to get around permissions issues in subshells
Expand Down
197 changes: 108 additions & 89 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const Denque = require('denque');
const EventEmitter = require('events');
const isResumableError = require('./error').isResumableError;
const MongoError = require('./core').MongoError;
Expand All @@ -8,6 +9,7 @@ const relayEvents = require('./core/utils').relayEvents;
const maxWireVersion = require('./core/utils').maxWireVersion;
const maybePromise = require('./utils').maybePromise;
const AggregateOperation = require('./operations/aggregate');
const kResumeQueue = Symbol('resumeQueue');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand Down Expand Up @@ -91,8 +93,12 @@ class ChangeStream extends EventEmitter {
this.options.readPreference = parent.s.readPreference;
}

this[kResumeQueue] = new Denque();

// Create contained Change Stream cursor
this.cursor = createChangeStreamCursor(this, options);
this.cursor = createCursor(this, options);

this.closed = false;

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
Expand Down Expand Up @@ -128,7 +134,12 @@ class ChangeStream extends EventEmitter {
* @returns {Promise|void} returns Promise if no callback passed
*/
hasNext(callback) {
return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb));
return maybePromise(this.parent, callback, cb =>
getCursor(this, (err, cursor) => {
if (err) return cb(err);
cursor.hasNext(cb);
})
);
}

/**
Expand All @@ -139,14 +150,14 @@ class ChangeStream extends EventEmitter {
* @returns {Promise|void} returns Promise if no callback passed
*/
next(callback) {
return maybePromise(this.parent, callback, cb => {
if (this.isClosed()) {
return cb(new Error('Change Stream is not open.'));
}
this.cursor.next((error, change) => {
processNewChange({ changeStream: this, error, change, callback: cb });
});
});
return maybePromise(this.parent, callback, cb =>
getCursor(this, (err, cursor) => {
if (err) return cb(err);
cursor.next((error, change) =>
processNewChange({ changeStream: this, error, change, callback: cb })
);
})
);
}

/**
Expand All @@ -158,7 +169,7 @@ class ChangeStream extends EventEmitter {
if (this.cursor) {
return this.cursor.isClosed();
}
return true;
return this.closed;
}

/**
Expand All @@ -168,31 +179,21 @@ class ChangeStream extends EventEmitter {
* @return {Promise} returns Promise if no callback passed
*/
close(callback) {
if (!this.cursor) {
if (callback) return callback();
return this.promiseLibrary.resolve();
}

// Tidy up the existing cursor
const cursor = this.cursor;
return maybePromise(this.parent, callback, cb => {
this.closed = true;

if (callback) {
return cursor.close(err => {
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
delete this.cursor;
if (!this.cursor) {
return cb();
}

return callback(err);
});
}
// Tidy up the existing cursor
const cursor = this.cursor;

const PromiseCtor = this.promiseLibrary || Promise;
return new PromiseCtor((resolve, reject) => {
cursor.close(err => {
return cursor.close(err => {
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
delete this.cursor;

if (err) return reject(err);
resolve();
return cb(err);
});
});
}
Expand Down Expand Up @@ -367,7 +368,7 @@ class ChangeStreamCursor extends Cursor {
*/

// Create a new change stream cursor based on self's configuration
function createChangeStreamCursor(self, options) {
function createCursor(self, options) {
const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
Expand Down Expand Up @@ -479,85 +480,48 @@ function processNewChange(args) {

// If the cursor is null, then it should not process a change.
if (cursor == null) {
// We do not error in the eventEmitter case.
if (eventEmitter) {
return;
}

const error = new MongoError('ChangeStream is closed');
return typeof callback === 'function'
? callback(error, null)
: changeStream.promiseLibrary.reject(error);
// do not error in the eventEmitter case or if the change stream has closed due to error
if (eventEmitter || changeStream.closed) return;
return callback(new MongoError('ChangeStream is closed'));
}

const topology = changeStream.topology;
const options = changeStream.cursor.options;
const wireVersion = maxWireVersion(cursor.server);

if (error) {
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;
if (isResumableError(error, maxWireVersion(cursor.server))) {
changeStream.cursor = null;

// stop listening to all events from old cursor
['data', 'close', 'end', 'error'].forEach(event =>
changeStream.cursor.removeAllListeners(event)
);
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));

// close internal cursor, ignore errors
changeStream.cursor.close();
cursor.close();

// attempt recreating the cursor
if (eventEmitter) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) {
recreateCursor(changeStream, cursor, err => {
if (err) {
changeStream.closed = true;
if (eventEmitter) {
changeStream.emit('error', err);
changeStream.emit('close');
return;
}
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
});

return;
}

if (callback) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return callback(err, null);

changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
changeStream.next(callback);
});

return;
}

return new Promise((resolve, reject) => {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return reject(err);
resolve();
});
})
.then(
() => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions))
)
.then(() => changeStream.next());
return processResumeQueue(changeStream, err);
}
processResumeQueue(changeStream);
if (!eventEmitter) changeStream.next(callback);
});
return;
}

if (eventEmitter) return changeStream.emit('error', error);
if (typeof callback === 'function') return callback(error, null);
return changeStream.promiseLibrary.reject(error);
return callback(error, null);
}

changeStream.attemptingResume = false;

if (change && !change._id) {
const noResumeTokenError = new Error(
'A change stream document has been received that lacks a resume token (_id).'
);

if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
if (typeof callback === 'function') return callback(noResumeTokenError, null);
return changeStream.promiseLibrary.reject(noResumeTokenError);
return callback(noResumeTokenError, null);
}

// cache the resume token
Expand All @@ -569,8 +533,63 @@ function processNewChange(args) {

// Return the change
if (eventEmitter) return changeStream.emit('change', change);
if (typeof callback === 'function') return callback(error, change);
return changeStream.promiseLibrary.resolve(change);
return callback(error, change);
}

/**
* Safely provides a cursor across resume attempts
*
* @param {ChangeStream} changeStream the parent ChangeStream
* @param {function} callback gets the cursor or error
* @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor
*/
function getCursor(changeStream, callback) {
if (changeStream.isClosed()) {
callback(new MongoError('ChangeStream is closed.'));
return;
}

// if a cursor exists and it is open, return it
if (changeStream.cursor) {
callback(undefined, changeStream.cursor);
return;
}

// no cursor, queue callback until topology reconnects
changeStream[kResumeQueue].push(callback);
}

function recreateCursor(changeStream, cursor, cb) {
// attempt to reconnect the topology
changeStream.waitingForTopology = true;
waitForTopologyConnected(
changeStream.topology,
{ readPreference: cursor.options.readPreference },
err => {
changeStream.waitingForTopology = false;
if (err) return cb(err);
changeStream.cursor = createCursor(changeStream, cursor.resumeOptions);
cb();
}
);
}

/**
* Drain the resume queue when a new has become available
*
* @param {ChangeStream} changeStream the parent ChangeStream
* * @param {?ChangeStreamCursor} changeStream.cursor the new cursor
* @param {?Error} err error getting a new cursor
*/
function processResumeQueue(changeStream, err) {
while (changeStream[kResumeQueue].length) {
if (changeStream.isClosed()) {
request(new MongoError('Change Stream is not open.'));
return;
}
const request = changeStream[kResumeQueue].pop();
request(err, changeStream.cursor);
}
}

/**
Expand Down
Loading