From c2d80b2400b59ccaf83239da6d690adb23524765 Mon Sep 17 00:00:00 2001 From: Katherine Walker Date: Tue, 30 Jul 2019 11:13:29 -0400 Subject: [PATCH] fix(change_stream): emit close event after cursor is closed during error Fixes NODE-2075 --- lib/change_stream.js | 25 ++++++++++++--- test/functional/change_stream_tests.js | 43 ++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index aeaa20654f0..423b4ab4412 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -179,10 +179,27 @@ class ChangeStream extends EventEmitter { } // Tidy up the existing cursor - var cursor = this.cursor; - ['data', 'close', 'end', 'error'].forEach(event => this.cursor.removeAllListeners(event)); - delete this.cursor; - return cursor.close(callback); + const cursor = this.cursor; + + if (callback) { + return cursor.close(err => { + ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); + delete this.cursor; + + return callback(err); + }); + } + + const PromiseCtor = this.promiseLibrary || Promise; + return new PromiseCtor((resolve, reject) => { + cursor.close(err => { + ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); + delete this.cursor; + + if (err) return reject(err); + resolve(); + }); + }); } /** diff --git a/test/functional/change_stream_tests.js b/test/functional/change_stream_tests.js index 1d30370cbd4..f035206a2a8 100644 --- a/test/functional/change_stream_tests.js +++ b/test/functional/change_stream_tests.js @@ -9,6 +9,7 @@ var co = require('co'); var mock = require('mongodb-mock-server'); const chai = require('chai'); const expect = chai.expect; +const sinon = require('sinon'); chai.use(require('chai-subset')); @@ -1876,6 +1877,48 @@ describe('Change Streams', function() { .then(() => teardown(), teardown); }); + it('should emit close event after error event', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } }, + test: function(done) { + const configuration = this.configuration; + const client = configuration.newClient(); + const closeSpy = sinon.spy(); + + client.connect(function(err, client) { + expect(err).to.not.exist; + + const db = client.db('integration_tests'); + const coll = db.collection('event_test'); + + // This will cause an error because the _id will be projected out, which causes the following error: + // "A change stream document has been received that lacks a resume token (_id)." + const changeStream = coll.watch([{ $project: { _id: false } }]); + + changeStream.on('change', changeDoc => { + expect(changeDoc).to.be.null; + }); + + changeStream.on('error', err => { + expect(err).to.exist; + changeStream.close(() => { + expect(closeSpy.calledOnce).to.be.true; + client.close(done); + }); + }); + + changeStream.on('close', closeSpy); + + // Trigger the first database event + setTimeout(() => { + coll.insertOne({ a: 1 }, (err, result) => { + expect(err).to.not.exist; + expect(result.insertedCount).to.equal(1); + }); + }); + }); + } + }); + describe('should properly handle a changeStream event being processed mid-close', function() { let client, coll;