Skip to content

Commit c9d3816

Browse files
authored
feat(NODE-2993): implement maxConnecting (#3255)
1 parent b2798d9 commit c9d3816

7 files changed

+180
-74
lines changed

src/cmap/connection_pool.ts

+62-55
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ export type ConnectionPoolEvents = {
107107
*/
108108
export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
109109
closed: boolean;
110-
options: Readonly<ConnectionPoolOptions>;
110+
options: Readonly<ConnectionPoolOptions & { maxConnecting: number }>;
111111
/** @internal */
112112
[kLogger]: Logger;
113113
/** @internal */
@@ -199,6 +199,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
199199
connectionType: Connection,
200200
maxPoolSize: options.maxPoolSize ?? 100,
201201
minPoolSize: options.minPoolSize ?? 0,
202+
maxConnecting: 2,
202203
maxIdleTimeMS: options.maxIdleTimeMS ?? 0,
203204
waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,
204205
autoEncrypter: options.autoEncrypter,
@@ -494,16 +495,29 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
494495
}
495496

496497
function ensureMinPoolSize(pool: ConnectionPool) {
497-
if (pool.closed || pool.options.minPoolSize === 0) {
498+
const minPoolSize = pool.options.minPoolSize;
499+
if (pool.closed || minPoolSize === 0) {
498500
return;
499501
}
500502

501-
const minPoolSize = pool.options.minPoolSize;
502-
for (let i = pool.totalConnectionCount; i < minPoolSize; ++i) {
503-
createConnection(pool);
503+
if (
504+
pool.totalConnectionCount < minPoolSize &&
505+
pool.pendingConnectionCount < pool.options.maxConnecting
506+
) {
507+
// NOTE: ensureMinPoolSize should not try to get all the pending
508+
// connection permits because that potentially delays the availability of
509+
// the connection to a checkout request
510+
createConnection(pool, (err, connection) => {
511+
pool[kPending]--;
512+
if (!err && connection) {
513+
pool[kConnections].push(connection);
514+
process.nextTick(processWaitQueue, pool);
515+
}
516+
pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10);
517+
});
518+
} else {
519+
pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 100);
504520
}
505-
506-
pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10);
507521
}
508522

509523
function connectionIsStale(pool: ConnectionPool, connection: Connection) {
@@ -521,7 +535,7 @@ function connectionIsIdle(pool: ConnectionPool, connection: Connection) {
521535
return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS);
522536
}
523537

524-
function createConnection(pool: ConnectionPool, callback?: Callback<Connection>) {
538+
function createConnection(pool: ConnectionPool, callback: Callback<Connection>) {
525539
const connectOptions: ConnectionOptions = {
526540
...pool.options,
527541
id: pool[kConnectionCounter].next().value,
@@ -530,14 +544,16 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
530544
};
531545

532546
pool[kPending]++;
547+
// This is our version of a "virtual" no-I/O connection as the spec requires
548+
pool.emit(
549+
ConnectionPool.CONNECTION_CREATED,
550+
new ConnectionCreatedEvent(pool, { id: connectOptions.id })
551+
);
552+
533553
connect(connectOptions, (err, connection) => {
534554
if (err || !connection) {
535-
pool[kPending]--;
536555
pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
537-
if (typeof callback === 'function') {
538-
callback(err);
539-
}
540-
556+
callback(err);
541557
return;
542558
}
543559

@@ -553,8 +569,6 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
553569
connection.on(event, (e: any) => pool.emit(event, e));
554570
}
555571

556-
pool.emit(ConnectionPool.CONNECTION_CREATED, new ConnectionCreatedEvent(pool, connection));
557-
558572
if (pool.loadBalanced) {
559573
connection.on(Connection.PINNED, pinType => pool[kMetrics].markPinned(pinType));
560574
connection.on(Connection.UNPINNED, pinType => pool[kMetrics].markUnpinned(pinType));
@@ -575,16 +589,8 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
575589
connection.markAvailable();
576590
pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection));
577591

578-
// if a callback has been provided, hand off the connection immediately
579-
if (typeof callback === 'function') {
580-
callback(undefined, connection);
581-
return;
582-
}
583-
584-
// otherwise add it to the pool for later acquisition, and try to process the wait queue
585-
pool[kConnections].push(connection);
586-
pool[kPending]--;
587-
process.nextTick(processWaitQueue, pool);
592+
callback(undefined, connection);
593+
return;
588594
});
589595
}
590596

@@ -642,44 +648,45 @@ function processWaitQueue(pool: ConnectionPool) {
642648
}
643649
}
644650

645-
const maxPoolSize = pool.options.maxPoolSize;
646-
if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) {
651+
const { maxPoolSize, maxConnecting } = pool.options;
652+
while (
653+
pool.waitQueueSize > 0 &&
654+
pool.pendingConnectionCount < maxConnecting &&
655+
(maxPoolSize === 0 || pool.totalConnectionCount < maxPoolSize)
656+
) {
657+
const waitQueueMember = pool[kWaitQueue].shift();
658+
if (!waitQueueMember || waitQueueMember[kCancelled]) {
659+
continue;
660+
}
647661
createConnection(pool, (err, connection) => {
648-
const waitQueueMember = pool[kWaitQueue].shift();
649-
if (!waitQueueMember || waitQueueMember[kCancelled]) {
662+
pool[kPending]--;
663+
if (waitQueueMember[kCancelled]) {
650664
if (!err && connection) {
651665
pool[kConnections].push(connection);
652-
pool[kPending]--;
666+
}
667+
} else {
668+
if (err) {
669+
pool.emit(
670+
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
671+
new ConnectionCheckOutFailedEvent(pool, err)
672+
);
673+
} else if (connection) {
674+
pool[kCheckedOut]++;
675+
pool.emit(
676+
ConnectionPool.CONNECTION_CHECKED_OUT,
677+
new ConnectionCheckedOutEvent(pool, connection)
678+
);
653679
}
654680

655-
pool[kProcessingWaitQueue] = false;
656-
return;
657-
}
658-
659-
if (err) {
660-
pool.emit(
661-
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
662-
new ConnectionCheckOutFailedEvent(pool, err)
663-
);
664-
} else if (connection) {
665-
pool[kCheckedOut]++;
666-
pool[kPending]--;
667-
pool.emit(
668-
ConnectionPool.CONNECTION_CHECKED_OUT,
669-
new ConnectionCheckedOutEvent(pool, connection)
670-
);
671-
}
672-
673-
if (waitQueueMember.timer) {
674-
clearTimeout(waitQueueMember.timer);
681+
if (waitQueueMember.timer) {
682+
clearTimeout(waitQueueMember.timer);
683+
}
684+
waitQueueMember.callback(err, connection);
675685
}
676-
waitQueueMember.callback(err, connection);
677-
pool[kProcessingWaitQueue] = false;
678-
process.nextTick(() => processWaitQueue(pool));
686+
process.nextTick(processWaitQueue, pool);
679687
});
680-
} else {
681-
pool[kProcessingWaitQueue] = false;
682688
}
689+
pool[kProcessingWaitQueue] = false;
683690
}
684691

685692
/**

src/cmap/connection_pool_events.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ export class ConnectionCreatedEvent extends ConnectionPoolMonitoringEvent {
5959
connectionId: number | '<monitor>';
6060

6161
/** @internal */
62-
constructor(pool: ConnectionPool, connection: Connection) {
62+
constructor(pool: ConnectionPool, connection: { id: number | '<monitor>' }) {
6363
super(pool);
6464
this.connectionId = connection.id;
6565
}

test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,16 @@
3333
"address": 42,
3434
"currentCheckedOutCount": 0,
3535
"availableConnectionCount": 0,
36-
"pendingConnectionCount": 3,
37-
"totalConnectionCount": 3
36+
"pendingConnectionCount": 1,
37+
"totalConnectionCount": 1
3838
},
3939
{
4040
"type": "ConnectionCreated",
4141
"connectionId": 42,
4242
"address": 42,
4343
"availableConnectionCount": 1,
44-
"pendingConnectionCount": 2,
45-
"totalConnectionCount": 3
44+
"pendingConnectionCount": 1,
45+
"totalConnectionCount": 2
4646
},
4747
{
4848
"type": "ConnectionCreated",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
{
2+
"version": 1,
3+
"style": "unit",
4+
"description": "must replace removed connections up to minPoolSize",
5+
"poolOptions": {
6+
"minPoolSize": 2
7+
},
8+
"operations": [
9+
{
10+
"name": "waitForEvent",
11+
"event": "ConnectionReady",
12+
"count": 2
13+
},
14+
{
15+
"name": "wait",
16+
"ms": 1000
17+
},
18+
{
19+
"name": "checkOut",
20+
"label": "conn"
21+
},
22+
{
23+
"name": "clear"
24+
},
25+
{
26+
"name": "checkIn",
27+
"connection": "conn"
28+
},
29+
{
30+
"name": "waitForEvent",
31+
"event": "ConnectionReady",
32+
"count": 3
33+
}
34+
],
35+
"events": [
36+
{
37+
"type": "ConnectionReady",
38+
"address": 42
39+
},
40+
{
41+
"type": "ConnectionReady",
42+
"address": 42
43+
},
44+
{
45+
"type": "ConnectionCheckedOut",
46+
"address": 42
47+
},
48+
{
49+
"type": "ConnectionPoolCleared",
50+
"address": 42
51+
},
52+
{
53+
"type": "ConnectionCheckedIn",
54+
"address": 42
55+
},
56+
{
57+
"type": "ConnectionClosed",
58+
"reason": "stale",
59+
"address": 42,
60+
"availableConnectionCount": 1,
61+
"pendingConnectionCount": 0,
62+
"totalConnectionCount": 1
63+
},
64+
{
65+
"type": "ConnectionReady",
66+
"address": 42,
67+
"availableConnectionCount": 1,
68+
"pendingConnectionCount": 1,
69+
"totalConnectionCount": 2
70+
}
71+
],
72+
"ignore": [
73+
"ConnectionPoolCreated",
74+
"ConnectionCreated",
75+
"ConnectionCheckOutStarted"
76+
]
77+
}

test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts

+10-5
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@ const LB_SKIP_TESTS: SkipDescription[] = [
1515
describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
1616
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');
1717

18-
runCmapTestSuite(
19-
// TODO(NODE-2993): unskip integration tests for maxConnecting
20-
tests.filter(({ style }) => style === 'unit'),
21-
{ testsToSkip: LB_SKIP_TESTS }
22-
);
18+
runCmapTestSuite(tests, {
19+
testsToSkip: LB_SKIP_TESTS.concat([
20+
{
21+
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
22+
skipIfCondition: 'always',
23+
skipReason:
24+
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
25+
}
26+
])
27+
});
2328
});

test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.test.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,14 @@ describe('Connection Monitoring and Pooling (Node Driver)', function () {
66
'../integration/connection-monitoring-and-pooling/cmap-node-specs'
77
);
88

9-
runCmapTestSuite(tests, { injectPoolStats: true });
9+
runCmapTestSuite(tests, {
10+
injectPoolStats: true,
11+
testsToSkip: [
12+
{
13+
description: 'must replace removed connections up to minPoolSize',
14+
skipIfCondition: 'loadBalanced',
15+
skipReason: 'cannot run against load balancer due to reliance on pool.clear() command'
16+
}
17+
]
18+
});
1019
});

test/tools/cmap_spec_runner.ts

+16-8
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,14 @@ class Thread {
116116
await sleep();
117117
}
118118

119-
queue(op: CmapOperation) {
119+
queue(op: CmapOperation, thread?: Thread) {
120120
if (this.#killed || this.#error) {
121121
return;
122122
}
123123

124-
this.#promise = this.#promise.then(() => this._runOperation(op)).catch(e => (this.#error = e));
124+
const functionToQueue = () => (!thread ? this._runOperation(op) : thread.queue(op));
125+
126+
this.#promise = this.#promise.then(functionToQueue).catch(e => (this.#error = e));
125127
}
126128

127129
async finish() {
@@ -352,9 +354,12 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
352354
const op = operations[idx];
353355

354356
const threadKey = op.name === 'checkOut' ? op.thread || MAIN_THREAD_KEY : MAIN_THREAD_KEY;
355-
const thread = threadContext.getThread(threadKey);
356-
357-
thread.queue(op);
357+
if (threadKey === MAIN_THREAD_KEY) {
358+
mainThread.queue(op);
359+
} else {
360+
const thread = threadContext.getThread(threadKey);
361+
mainThread.queue(op, thread);
362+
}
358363
}
359364

360365
await mainThread.finish().catch(e => {
@@ -387,6 +392,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
387392
);
388393

389394
expect(actualEvents).to.have.lengthOf(expectedEvents.length);
395+
390396
for (const expected of expectedEvents) {
391397
const actual = actualEvents.shift();
392398
const { type: eventType, ...eventPropsToCheck } = expected;
@@ -397,7 +403,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
397403

398404
export type SkipDescription = {
399405
description: string;
400-
skipIfCondition: 'loadBalanced';
406+
skipIfCondition: 'loadBalanced' | 'always';
401407
skipReason: string;
402408
};
403409

@@ -416,10 +422,12 @@ export function runCmapTestSuite(
416422
({ description }) => description === test.description
417423
);
418424
if (skipDescription) {
425+
const alwaysSkip = skipDescription.skipIfCondition === 'always';
419426
const matchesLoadBalanceSkip =
420427
skipDescription.skipIfCondition === 'loadBalanced' && this.configuration.isLoadBalanced;
421-
if (matchesLoadBalanceSkip) {
422-
(this.currentTest as Mocha.Runnable).skipReason = skipDescription.skipReason;
428+
429+
if (alwaysSkip || matchesLoadBalanceSkip) {
430+
this.currentTest.skipReason = skipDescription.skipReason;
423431
this.skip();
424432
}
425433
}

0 commit comments

Comments
 (0)