From 38f7d57cc9f40e247445aa1aaee2e5907658381a Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 10 Dec 2020 19:38:09 -0500 Subject: [PATCH 1/7] fix: make tryNext and Batch public --- src/bulk/common.ts | 2 +- src/change_stream.ts | 7 +++++++ src/cursor/abstract_cursor.ts | 1 - 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 6ef8699892a..8e59d8a985b 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -137,7 +137,7 @@ export interface BulkResult { * Keeps the state of a unordered batch so we can rewrite the results * correctly after command execution * - * @internal + * @public */ export class Batch { originalZeroIndex: number; diff --git a/src/change_stream.ts b/src/change_stream.ts index 5d8582e3af5..3551b589d5d 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -335,6 +335,13 @@ export class ChangeStream extends EventEmitter { } return this.cursor.stream(options); } + + tryNext(): Promise { + if (!this.cursor) { + throw new MongoError('ChangeStream has no cursor, unable to tryNext'); + } + return this.cursor.tryNext(); + } } /** @internal */ diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 4d09686ca62..1ccef71c9f8 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -265,7 +265,6 @@ export abstract class AbstractCursor extends EventEmitter { /** * Try to get the next available document from the cursor or `null` if an empty batch is returned - * @internal */ tryNext(): Promise; tryNext(callback: Callback): void; From 2042a1ff9ea716c5c320c039f0150c83e1b81652 Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 14 Dec 2020 17:23:08 -0500 Subject: [PATCH 2/7] review feedback --- src/change_stream.ts | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 3551b589d5d..70da376a188 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -46,6 +46,7 @@ const CHANGE_DOMAIN_TYPES = { const NO_RESUME_TOKEN_ERROR = new MongoError( 'A change stream document has been received that lacks a resume token (_id).' ); +const NO_CURSOR_ERROR = new MongoError('ChangeStream has no cursor'); const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed'); /** @public */ @@ -277,7 +278,7 @@ export class ChangeStream extends EventEmitter { hasNext(callback?: Callback): Promise | void { return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { - if (err || !cursor) return cb(err); // failed to resume, raise an error + if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error cursor.hasNext(cb); }); }); @@ -287,8 +288,7 @@ export class ChangeStream extends EventEmitter { next(callback?: Callback): Promise | void { return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { - if (err) return cb(err); // failed to resume, raise an error - if (!cursor) return cb(new MongoError('Cursor is undefined')); + if (err || !cursor) return cb(err || NO_CURSOR_ERROR); cursor.next((error, change) => { if (error) { this[kResumeQueue].push(() => this.next(cb)); @@ -330,17 +330,22 @@ export class ChangeStream extends EventEmitter { */ stream(options?: CursorStreamOptions): Readable { this.streamOptions = options; - if (!this.cursor) { - throw new MongoError('ChangeStream has no cursor, unable to stream'); - } + if (!this.cursor) throw NO_CURSOR_ERROR; return this.cursor.stream(options); } - tryNext(): Promise { - if (!this.cursor) { - throw new MongoError('ChangeStream has no cursor, unable to tryNext'); - } - return this.cursor.tryNext(); + /** + * Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned + */ + tryNext(): Promise; + tryNext(callback: Callback): void; + tryNext(callback?: Callback): Promise | void { + return maybePromise(callback, cb => { + getCursor(this, (err, cursor) => { + if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error + return cursor.tryNext(cb); + }); + }); } } From 940e659f7ee2b7b84ffdfb5c198855374e348008 Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 14 Dec 2020 17:27:38 -0500 Subject: [PATCH 3/7] restore comment --- src/change_stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 70da376a188..b8703a4d1b2 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -288,7 +288,7 @@ export class ChangeStream extends EventEmitter { next(callback?: Callback): Promise | void { return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { - if (err || !cursor) return cb(err || NO_CURSOR_ERROR); + if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error cursor.next((error, change) => { if (error) { this[kResumeQueue].push(() => this.next(cb)); From bb077e010267ce0934590c54ed25c9193affc0ca Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 15 Dec 2020 13:21:30 -0500 Subject: [PATCH 4/7] consolidate NO_CURSOR_ERROR handling --- src/change_stream.ts | 19 ++++++++++++------- test/functional/change_stream.test.js | 5 ++++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index b8703a4d1b2..ae3ec6ff570 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -278,7 +278,7 @@ export class ChangeStream extends EventEmitter { hasNext(callback?: Callback): Promise | void { return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { - if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error + if (err || !cursor) return cb(err); // failed to resume, raise an error cursor.hasNext(cb); }); }); @@ -288,7 +288,7 @@ export class ChangeStream extends EventEmitter { next(callback?: Callback): Promise | void { return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { - if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error + if (err || !cursor) return cb(err); // failed to resume, raise an error cursor.next((error, change) => { if (error) { this[kResumeQueue].push(() => this.next(cb)); @@ -342,7 +342,7 @@ export class ChangeStream extends EventEmitter { tryNext(callback?: Callback): Promise | void { return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { - if (err || !cursor) return cb(err || NO_CURSOR_ERROR); // failed to resume, raise an error + if (err || !cursor) return cb(err); // failed to resume, raise an error return cursor.tryNext(cb); }); }); @@ -719,11 +719,16 @@ function getCursor(changeStream: ChangeStream, callback: Callback=3.6' } }, + metadata: { + requires: { topology: 'replicaset', mongodb: '>=3.6' }, + sessions: { skipLeakTests: true } + }, test: function (done) { let closed = false; const close = _err => { From 28108717c0fa4b96a36f9010ee52778d3091ea13 Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 15 Dec 2020 16:02:58 -0500 Subject: [PATCH 5/7] fix leaking test --- test/functional/change_stream.test.js | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 8eab1fb9d77..6f465f85046 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1924,10 +1924,7 @@ describe('Change Streams', function () { }); it('when invoked using eventEmitter API', { - metadata: { - requires: { topology: 'replicaset', mongodb: '>=3.6' }, - sessions: { skipLeakTests: true } - }, + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function (done) { let closed = false; const close = _err => { @@ -1942,8 +1939,7 @@ describe('Change Streams', function () { changeStream.on('change', () => { counter += 1; if (counter === 2) { - changeStream.close(); - setTimeout(() => close()); + changeStream.close(close); } else if (counter >= 3) { close(new Error('should not have received more than 2 events')); } From 0efb9f29892097eea9c167ac57de5978caf84946 Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 15 Dec 2020 18:01:17 -0500 Subject: [PATCH 6/7] pass --exit to data lake tests --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index a98d1f05214..750e9978964 100644 --- a/package.json +++ b/package.json @@ -100,7 +100,7 @@ "check:test": "mocha --recursive test/functional test/unit", "check:ts": "tsc -v && tsc --noEmit", "check:atlas": "mocha --config \"test/manual/mocharc.json\" test/manual/atlas_connectivity.test.js", - "check:adl": "mocha test/manual/data_lake.test.js", + "check:adl": "mocha --exit test/manual/data_lake.test.js", "check:ocsp": "mocha --config \"test/manual/mocharc.json\" test/manual/ocsp_support.test.js", "check:kerberos": "mocha --config \"test/manual/mocharc.json\" test/manual/kerberos.test.js", "check:tls": "mocha --config \"test/manual/mocharc.json\" test/manual/tls_support.test.js", From 456f84a8fbbb3aaceeca2fada3692d36519e5386 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 16 Dec 2020 10:45:08 -0500 Subject: [PATCH 7/7] remove --exit from adl test --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 750e9978964..a98d1f05214 100644 --- a/package.json +++ b/package.json @@ -100,7 +100,7 @@ "check:test": "mocha --recursive test/functional test/unit", "check:ts": "tsc -v && tsc --noEmit", "check:atlas": "mocha --config \"test/manual/mocharc.json\" test/manual/atlas_connectivity.test.js", - "check:adl": "mocha --exit test/manual/data_lake.test.js", + "check:adl": "mocha test/manual/data_lake.test.js", "check:ocsp": "mocha --config \"test/manual/mocharc.json\" test/manual/ocsp_support.test.js", "check:kerberos": "mocha --config \"test/manual/mocharc.json\" test/manual/kerberos.test.js", "check:tls": "mocha --config \"test/manual/mocharc.json\" test/manual/tls_support.test.js",