Skip to content

Commit e636d1e

Browse files
committed
feat(NODE-3132)!: Add TypedEventEmitter
Using a mapped type of event names to function type can now provide annotations and completion for event argument types. Breaking Changes: - Topology open event emits one argument which is the topology itself - srvPollingHandler on topology now emits TopologyDescriptionChange event (erroneously was serverDescriptionChanged)
1 parent 76b110e commit e636d1e

18 files changed

+1842
-1546
lines changed

package-lock.json

+1,417-1,358
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+6-6
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@
3131
},
3232
"devDependencies": {
3333
"@istanbuljs/nyc-config-typescript": "^1.0.1",
34-
"@microsoft/api-extractor": "^7.13.1",
35-
"@microsoft/tsdoc-config": "^0.14.0",
34+
"@microsoft/api-extractor": "^7.14.0",
35+
"@microsoft/tsdoc-config": "^0.15.2",
3636
"@types/aws4": "^1.5.1",
3737
"@types/bl": "^2.1.0",
3838
"@types/chai": "^4.2.14",
3939
"@types/chai-subset": "^1.3.3",
4040
"@types/kerberos": "^1.1.0",
4141
"@types/mocha": "^8.2.0",
42-
"@types/node": "^14.14.31",
42+
"@types/node": "^14.14.41",
4343
"@types/saslprep": "^1.0.0",
4444
"@types/semver": "^7.3.4",
4545
"@typescript-eslint/eslint-plugin": "^4.15.1",
@@ -62,7 +62,7 @@
6262
"mocha-sinon": "^2.1.0",
6363
"mongodb-mock-server": "^2.0.1",
6464
"nyc": "^15.1.0",
65-
"prettier": "^2.0.5",
65+
"prettier": "2.1.1",
6666
"rimraf": "^3.0.2",
6767
"semver": "^5.5.0",
6868
"sinon": "^4.3.0",
@@ -72,8 +72,8 @@
7272
"standard-version": "^8.0.2",
7373
"through2": "^3.0.1",
7474
"ts-node": "^9.1.1",
75-
"typedoc": "^0.20.25",
76-
"typescript": "^4.1.5",
75+
"typedoc": "^0.20.35",
76+
"typescript": "^4.2.4",
7777
"typescript-cached-transpile": "^0.0.6",
7878
"worker-farm": "^1.5.0",
7979
"wtfnode": "^0.8.2",

src/change_stream.ts

+39-11
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import Denque = require('denque');
2-
import { EventEmitter } from 'events';
32
import { MongoError, AnyError, isResumableError } from './error';
43
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
54
import {
6-
relayEvents,
75
maxWireVersion,
86
calculateDurationInMs,
97
now,
@@ -22,11 +20,13 @@ import { Collection } from './collection';
2220
import type { Readable } from 'stream';
2321
import {
2422
AbstractCursor,
23+
AbstractCursorEvents,
2524
AbstractCursorOptions,
2625
CursorStreamOptions
2726
} from './cursor/abstract_cursor';
2827
import type { ClientSession } from './sessions';
2928
import { executeOperation, ExecutionResult } from './operations/execute_operation';
29+
import { TypedEventEmitter } from './mongo_types';
3030

3131
const kResumeQueue = Symbol('resumeQueue');
3232
const kCursorStream = Symbol('cursorStream');
@@ -96,7 +96,8 @@ export interface ChangeStreamOptions extends AggregateOptions {
9696
batchSize?: number;
9797
}
9898

99-
interface ChangeStreamDocument {
99+
/** @public */
100+
export interface ChangeStreamDocument {
100101
/**
101102
* The id functions as an opaque token for use when resuming an interrupted
102103
* change stream.
@@ -157,7 +158,8 @@ interface ChangeStreamDocument {
157158
fullDocument?: Document;
158159
}
159160

160-
interface UpdateDescription {
161+
/** @public */
162+
export interface UpdateDescription {
161163
/**
162164
* A document containing key:value pairs of names of the fields that were
163165
* changed, and the new value for those fields.
@@ -170,11 +172,22 @@ interface UpdateDescription {
170172
removedFields: string[];
171173
}
172174

175+
/** @public */
176+
export type ChangeStreamEvents = {
177+
resumeTokenChanged(token: ResumeToken): void;
178+
init(response: Document): void;
179+
more(response?: Document | undefined): void;
180+
response(): void;
181+
end(): void;
182+
error(error: Error): void;
183+
change(change: ChangeStreamDocument): void;
184+
} & AbstractCursorEvents;
185+
173186
/**
174187
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
175188
* @public
176189
*/
177-
export class ChangeStream extends EventEmitter {
190+
export class ChangeStream extends TypedEventEmitter<ChangeStreamEvents> {
178191
pipeline: Document[];
179192
options: ChangeStreamOptions;
180193
parent: MongoClient | Db | Collection;
@@ -190,6 +203,12 @@ export class ChangeStream extends EventEmitter {
190203
/** @internal */
191204
[kClosed]: boolean;
192205

206+
/** @event */
207+
static readonly RESPONSE = 'response' as const;
208+
/** @event */
209+
static readonly MORE = 'more' as const;
210+
/** @event */
211+
static readonly INIT = 'init' as const;
193212
/** @event */
194213
static readonly CLOSE = 'close' as const;
195214
/**
@@ -357,7 +376,7 @@ export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
357376
}
358377

359378
/** @internal */
360-
export class ChangeStreamCursor extends AbstractCursor {
379+
export class ChangeStreamCursor extends AbstractCursor<ChangeStreamEvents> {
361380
_resumeToken: ResumeToken;
362381
startAtOperationTime?: OperationTime;
363382
hasReceived?: boolean;
@@ -476,8 +495,8 @@ export class ChangeStreamCursor extends AbstractCursor {
476495

477496
this._processBatch('firstBatch', response);
478497

479-
this.emit('init', response);
480-
this.emit('response');
498+
this.emit(ChangeStream.INIT, response);
499+
this.emit(ChangeStream.RESPONSE);
481500

482501
// TODO: NODE-2882
483502
callback(undefined, { server, session, response });
@@ -492,13 +511,19 @@ export class ChangeStreamCursor extends AbstractCursor {
492511

493512
this._processBatch('nextBatch', response);
494513

495-
this.emit('more', response);
496-
this.emit('response');
514+
this.emit(ChangeStream.MORE, response);
515+
this.emit(ChangeStream.RESPONSE);
497516
callback(err, response);
498517
});
499518
}
500519
}
501520

521+
const CHANGE_STREAM_EVENTS = [
522+
ChangeStream.RESUME_TOKEN_CHANGED,
523+
ChangeStream.END,
524+
ChangeStream.CLOSE
525+
];
526+
502527
/**
503528
* Create a new change stream cursor based on self's configuration
504529
* @internal
@@ -525,7 +550,10 @@ function createChangeStreamCursor(
525550
cursorOptions
526551
);
527552

528-
relayEvents(changeStreamCursor, changeStream, ['resumeTokenChanged', 'end', 'close']);
553+
for (const event of CHANGE_STREAM_EVENTS) {
554+
changeStreamCursor.on(event, e => changeStream.emit(event, e));
555+
}
556+
529557
if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
530558
streamEvents(changeStream, changeStreamCursor);
531559
}

src/cmap/connection.ts

+25-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { EventEmitter } from 'events';
21
import { MessageStream, OperationDescription } from './message_stream';
32
import { StreamDescription, StreamDescriptionOptions } from './stream_description';
43
import {
@@ -43,6 +42,7 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference';
4342
import { isTransactionCommand } from '../transactions';
4443
import type { W, WriteConcern, WriteConcernOptions } from '../write_concern';
4544
import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client';
45+
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
4646

4747
const kStream = Symbol('stream');
4848
const kQueue = Symbol('queue');
@@ -121,7 +121,7 @@ export interface ConnectionOptions
121121
keepAliveInitialDelay?: number;
122122
noDelay?: boolean;
123123
socketTimeoutMS?: number;
124-
cancellationToken?: EventEmitter;
124+
cancellationToken?: CancellationToken;
125125

126126
metadata: ClientMetadata;
127127
}
@@ -133,7 +133,17 @@ export interface DestroyOptions {
133133
}
134134

135135
/** @public */
136-
export class Connection extends EventEmitter {
136+
export type ConnectionEvents = {
137+
[Connection.COMMAND_STARTED](event: CommandStartedEvent): void;
138+
[Connection.COMMAND_SUCCEEDED](event: CommandSucceededEvent): void;
139+
[Connection.COMMAND_FAILED](event: CommandFailedEvent): void;
140+
[Connection.CLUSTER_TIME_RECEIVED](clusterTime: Document): void;
141+
[Connection.CLOSE](): void;
142+
[Connection.MESSAGE](message: any): void;
143+
};
144+
145+
/** @public */
146+
export class Connection extends TypedEventEmitter<ConnectionEvents> {
137147
id: number | '<monitor>';
138148
address: string;
139149
socketTimeoutMS: number;
@@ -167,6 +177,10 @@ export class Connection extends EventEmitter {
167177
static readonly COMMAND_FAILED = 'commandFailed' as const;
168178
/** @event */
169179
static readonly CLUSTER_TIME_RECEIVED = 'clusterTimeReceived' as const;
180+
/** @event */
181+
static readonly CLOSE = 'close' as const;
182+
/** @event */
183+
static readonly MESSAGE = 'message' as const;
170184

171185
constructor(stream: Stream, options: ConnectionOptions) {
172186
super();
@@ -266,7 +280,7 @@ export class Connection extends EventEmitter {
266280
}
267281

268282
this[kQueue].clear();
269-
this.emit('close');
283+
this.emit(Connection.CLOSE);
270284
}
271285

272286
destroy(): void;
@@ -586,6 +600,13 @@ export class Connection extends EventEmitter {
586600
}
587601
}
588602

603+
/** @public */
604+
export const APM_EVENTS = [
605+
Connection.COMMAND_STARTED,
606+
Connection.COMMAND_SUCCEEDED,
607+
Connection.COMMAND_FAILED
608+
];
609+
589610
/** @internal */
590611
export class CryptoConnection extends Connection {
591612
/** @internal */

src/cmap/connection_pool.ts

+29-18
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import Denque = require('denque');
2-
import { EventEmitter } from 'events';
32
import { Logger } from '../logger';
4-
import { Connection, ConnectionOptions } from './connection';
3+
import { APM_EVENTS, Connection, ConnectionEvents, ConnectionOptions } from './connection';
54
import { connect } from './connect';
6-
import { eachAsync, relayEvents, makeCounter, Callback } from '../utils';
5+
import { eachAsync, makeCounter, Callback } from '../utils';
76
import { MongoError } from '../error';
87
import { PoolClosedError, WaitQueueTimeoutError } from './errors';
98
import {
@@ -18,6 +17,7 @@ import {
1817
ConnectionCheckedInEvent,
1918
ConnectionPoolClearedEvent
2019
} from './connection_pool_events';
20+
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2121

2222
const kLogger = Symbol('logger');
2323
const kConnections = Symbol('connections');
@@ -53,11 +53,25 @@ export interface CloseOptions {
5353
force?: boolean;
5454
}
5555

56+
/** @public */
57+
export type ConnectionPoolEvents = {
58+
[ConnectionPool.CONNECTION_POOL_CREATED](event: ConnectionPoolCreatedEvent): void;
59+
[ConnectionPool.CONNECTION_POOL_CLOSED](event: ConnectionPoolClosedEvent): void;
60+
[ConnectionPool.CONNECTION_POOL_CLEARED](event: ConnectionPoolClearedEvent): void;
61+
[ConnectionPool.CONNECTION_CREATED](event: ConnectionCreatedEvent): void;
62+
[ConnectionPool.CONNECTION_READY](event: ConnectionReadyEvent): void;
63+
[ConnectionPool.CONNECTION_CLOSED](event: ConnectionClosedEvent): void;
64+
[ConnectionPool.CONNECTION_CHECK_OUT_STARTED](event: ConnectionCheckOutStartedEvent): void;
65+
[ConnectionPool.CONNECTION_CHECK_OUT_FAILED](event: ConnectionCheckOutFailedEvent): void;
66+
[ConnectionPool.CONNECTION_CHECKED_OUT](event: ConnectionCheckedOutEvent): void;
67+
[ConnectionPool.CONNECTION_CHECKED_IN](event: ConnectionCheckedInEvent): void;
68+
} & Omit<ConnectionEvents, 'close' | 'message'>;
69+
5670
/**
5771
* A pool of connections which dynamically resizes, and emit events related to pool activity
5872
* @public
5973
*/
60-
export class ConnectionPool extends EventEmitter {
74+
export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
6175
closed: boolean;
6276
options: Readonly<ConnectionPoolOptions>;
6377
/** @internal */
@@ -79,7 +93,7 @@ export class ConnectionPool extends EventEmitter {
7993
/** @internal */
8094
[kConnectionCounter]: Generator<number>;
8195
/** @internal */
82-
[kCancellationToken]: EventEmitter;
96+
[kCancellationToken]: CancellationToken;
8397
/** @internal */
8498
[kWaitQueue]: Denque<WaitQueueMember>;
8599

@@ -93,6 +107,11 @@ export class ConnectionPool extends EventEmitter {
93107
* @event
94108
*/
95109
static readonly CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const;
110+
/**
111+
* Emitted each time the connection pool is cleared and it's generation incremented
112+
* @event
113+
*/
114+
static readonly CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;
96115
/**
97116
* Emitted when a connection is created.
98117
* @event
@@ -128,11 +147,6 @@ export class ConnectionPool extends EventEmitter {
128147
* @event
129148
*/
130149
static readonly CONNECTION_CHECKED_IN = 'connectionCheckedIn' as const;
131-
/**
132-
* Emitted each time the connection pool is cleared and it's generation incremented
133-
* @event
134-
*/
135-
static readonly CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;
136150

137151
constructor(options: ConnectionPoolOptions) {
138152
super();
@@ -161,7 +175,7 @@ export class ConnectionPool extends EventEmitter {
161175
this[kMinPoolSizeTimer] = undefined;
162176
this[kGeneration] = 0;
163177
this[kConnectionCounter] = makeCounter(1);
164-
this[kCancellationToken] = new EventEmitter();
178+
this[kCancellationToken] = new CancellationToken();
165179
this[kCancellationToken].setMaxListeners(Infinity);
166180
this[kWaitQueue] = new Denque();
167181

@@ -409,12 +423,9 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
409423
}
410424

411425
// forward all events from the connection to the pool
412-
relayEvents(connection, pool, [
413-
Connection.COMMAND_STARTED,
414-
Connection.COMMAND_FAILED,
415-
Connection.COMMAND_SUCCEEDED,
416-
Connection.CLUSTER_TIME_RECEIVED
417-
]);
426+
for (const event of [...APM_EVENTS, Connection.CLUSTER_TIME_RECEIVED]) {
427+
connection.on(event, (e: any) => pool.emit(event, e));
428+
}
418429

419430
pool.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionCreatedEvent(pool, connection));
420431

@@ -522,7 +533,7 @@ function processWaitQueue(pool: ConnectionPool) {
522533
}
523534
}
524535

525-
export const CMAP_EVENT_NAMES = [
536+
export const CMAP_EVENTS = [
526537
ConnectionPool.CONNECTION_POOL_CREATED,
527538
ConnectionPool.CONNECTION_POOL_CLOSED,
528539
ConnectionPool.CONNECTION_CREATED,

0 commit comments

Comments
 (0)