From 6583d1849ca64c786bc987da539317c5b10fc787 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 22 Feb 2024 16:34:17 +0000 Subject: [PATCH 1/6] Add a minimum connections configuration to the ConnectionPool --- Sources/GRPC/ClientConnection.swift | 1 + Sources/GRPC/ConnectionManager.swift | 17 ++++++++++- .../GRPC/ConnectionPool/ConnectionPool.swift | 29 +++++++++++++++++-- .../GRPC/ConnectionPool/GRPCChannelPool.swift | 4 +++ Sources/GRPC/ConnectionPool/PoolManager.swift | 8 +++++ .../GRPC/ConnectionPool/PooledChannel.swift | 1 + Sources/GRPC/GRPCIdleHandler.swift | 16 ++++++---- Tests/GRPCTests/ConnectionManagerTests.swift | 12 +++++--- .../ConnectionPool/ConnectionPoolTests.swift | 3 +- .../PoolManagerStateMachineTests.swift | 3 +- 10 files changed, 79 insertions(+), 15 deletions(-) diff --git a/Sources/GRPC/ClientConnection.swift b/Sources/GRPC/ClientConnection.swift index d238b7def..26adea540 100644 --- a/Sources/GRPC/ClientConnection.swift +++ b/Sources/GRPC/ClientConnection.swift @@ -131,6 +131,7 @@ public final class ClientConnection: Sendable { self.connectionManager = ConnectionManager( configuration: configuration, connectivityDelegate: monitor, + idleBehavior: .closeWhenIdleTimeout, logger: configuration.backgroundActivityLogger ) } diff --git a/Sources/GRPC/ConnectionManager.swift b/Sources/GRPC/ConnectionManager.swift index e21be63b2..b8d2e6eca 100644 --- a/Sources/GRPC/ConnectionManager.swift +++ b/Sources/GRPC/ConnectionManager.swift @@ -25,6 +25,14 @@ import NIOHTTP2 // event loop is being used. @usableFromInline internal final class ConnectionManager: @unchecked Sendable { + + /// Whether the connection managed by this manager should be allowed to go idle and be closed, or + /// if it should remain open indefinitely even when there are no active streams. + internal enum IdleBehavior { + case closeWhenIdleTimeout + case neverGoIdle + } + internal enum Reconnect { case none case after(TimeInterval) @@ -323,6 +331,9 @@ internal final class ConnectionManager: @unchecked Sendable { /// The configuration to use when backing off between connection attempts, if reconnection /// attempts should be made at all. private let connectionBackoff: ConnectionBackoff? + + /// Whether this connection should be allowed to go idle (and thus be closed when the idle timer fires). + internal let idleBehavior: IdleBehavior /// A logger. internal var logger: Logger @@ -356,12 +367,14 @@ internal final class ConnectionManager: @unchecked Sendable { configuration: ClientConnection.Configuration, channelProvider: ConnectionManagerChannelProvider? = nil, connectivityDelegate: ConnectionManagerConnectivityDelegate?, + idleBehavior: IdleBehavior, logger: Logger ) { self.init( eventLoop: configuration.eventLoopGroup.next(), channelProvider: channelProvider ?? 
DefaultChannelProvider(configuration: configuration), callStartBehavior: configuration.callStartBehavior.wrapped, + idleBehavior: idleBehavior, connectionBackoff: configuration.connectionBackoff, connectivityDelegate: connectivityDelegate, http2Delegate: nil, @@ -373,6 +386,7 @@ internal final class ConnectionManager: @unchecked Sendable { eventLoop: EventLoop, channelProvider: ConnectionManagerChannelProvider, callStartBehavior: CallStartBehavior.Behavior, + idleBehavior: IdleBehavior, connectionBackoff: ConnectionBackoff?, connectivityDelegate: ConnectionManagerConnectivityDelegate?, http2Delegate: ConnectionManagerHTTP2Delegate?, @@ -392,6 +406,7 @@ internal final class ConnectionManager: @unchecked Sendable { self.connectionBackoff = connectionBackoff self.connectivityDelegate = connectivityDelegate self.http2Delegate = http2Delegate + self.idleBehavior = idleBehavior self.connectionID = connectionID self.channelNumber = channelNumber @@ -799,7 +814,7 @@ internal final class ConnectionManager: @unchecked Sendable { // Yes, after some time. case let .after(delay): let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting") - // Fail the candidate mux promise. KEep the 'readyChannelMuxPromise' as we'll try again. + // Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again. connecting.candidateMuxPromise.fail(error) let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) { diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index 039d788f3..9e0cf3363 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -84,6 +84,11 @@ internal final class ConnectionPool { /// there are this many waiters in the queue then the next waiter will be failed immediately. @usableFromInline internal let maxWaiters: Int + + /// The number of connections in the pool that should always be kept open (i.e. they won't go idle). /// In other words, it's the number of connections for which we should ignore idle timers. @usableFromInline + internal let minConnections: Int /// Configuration for backoff between subsequent connection attempts. @usableFromInline @@ -157,6 +162,7 @@ internal final class ConnectionPool { init( eventLoop: EventLoop, maxWaiters: Int, + minConnections: Int, reservationLoadThreshold: Double, assumedMaxConcurrentStreams: Int, connectionBackoff: ConnectionBackoff, @@ -176,6 +182,7 @@ internal final class ConnectionPool { self._connections = [:] self.maxWaiters = maxWaiters + self.minConnections = minConnections self.waiters = CircularBuffer(initialCapacity: 16) self.eventLoop = eventLoop @@ -201,17 +208,23 @@ internal final class ConnectionPool { ] ) self._connections.reserveCapacity(connections) + var numberOfKeepOpenConnections = self.minConnections while self._connections.count < connections { - self.addConnectionToPool() + // If we have less than the minimum number of connections, don't let + // the new connection close when idle. + let idleBehavior = numberOfKeepOpenConnections > 0 ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout + numberOfKeepOpenConnections -= 1 + self.addConnectionToPool(idleBehavior: idleBehavior) } } /// Make and add a new connection to the pool.
- private func addConnectionToPool() { + private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) { let manager = ConnectionManager( eventLoop: self.eventLoop, channelProvider: self.channelProvider, callStartBehavior: .waitsForConnectivity, + idleBehavior: idleBehavior, connectionBackoff: self.connectionBackoff, connectivityDelegate: self, http2Delegate: self, @@ -689,8 +702,12 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate { // Grab the number of reserved streams (before invalidating the index by adding a connection). let reservedStreams = self._connections.values[index].reservedStreams + // If we have less than the minimum number of connections, don't let + // the new connection close when idle. + let idleBehavior = self.sync.activeConnections < self.minConnections ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout + // Replace the connection with a new idle one. - self.addConnectionToPool() + self.addConnectionToPool(idleBehavior: idleBehavior) // Since we're removing this connection from the pool (and no new streams can be created on // the connection), the pool manager can ignore any streams reserved against this connection. @@ -880,6 +897,12 @@ extension ConnectionPool { self.pool.eventLoop.assertInEventLoop() return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) } } + + /// The number of active (i.e. connecting or connected) connections in the pool. + internal var activeConnections: Int { + self.pool.eventLoop.assertInEventLoop() + return self.connections - self.idleConnections + } /// The number of streams currently available to reserve across all connections in the pool. internal var availableStreams: Int { diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index 8076342a0..5bf207566 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -274,6 +274,10 @@ extension GRPCChannelPool.Configuration { /// /// Defaults to 100. public var maxWaitersPerEventLoop: Int = 100 + + /// The minimum number of connections to keep open in this pool. + /// This number of connections will never go idle and be closed. + public var minConnections: Int = 1 /// The maximum amount of time a caller is willing to wait for a stream for before timing out. /// diff --git a/Sources/GRPC/ConnectionPool/PoolManager.swift b/Sources/GRPC/ConnectionPool/PoolManager.swift index 49d184b7e..fe44403d6 100644 --- a/Sources/GRPC/ConnectionPool/PoolManager.swift +++ b/Sources/GRPC/ConnectionPool/PoolManager.swift @@ -32,6 +32,11 @@ internal final class PoolManager { /// The maximum number of waiters per pool. @usableFromInline var maxWaiters: Int + + /// The minimum number of connections to keep open per pool. + /// This number of connections will never go idle and be closed. + @usableFromInline + var minConnections: Int /// A load threshold in the range `0.0 ... 1.0` beyond which another connection will be started /// (assuming there is a connection available to start). 
@@ -62,6 +67,7 @@ internal final class PoolManager { internal init( maxConnections: Int, maxWaiters: Int, + minConnections: Int, loadThreshold: Double, assumedMaxConcurrentStreams: Int = 100, connectionBackoff: ConnectionBackoff, @@ -70,6 +76,7 @@ internal final class PoolManager { ) { self.maxConnections = maxConnections self.maxWaiters = maxWaiters + self.minConnections = minConnections self.loadThreshold = loadThreshold self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams self.connectionBackoff = connectionBackoff @@ -225,6 +232,7 @@ internal final class PoolManager { return ConnectionPool( eventLoop: eventLoop, maxWaiters: configuration.maxWaiters, + minConnections: configuration.minConnections, reservationLoadThreshold: configuration.loadThreshold, assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams, connectionBackoff: configuration.connectionBackoff, diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index 1b706451f..e52ce119b 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -96,6 +96,7 @@ internal final class PooledChannel: GRPCChannel { perPoolConfiguration: .init( maxConnections: configuration.connectionPool.connectionsPerEventLoop, maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop, + minConnections: configuration.connectionPool.minConnections, loadThreshold: configuration.connectionPool.reservationLoadThreshold, assumedMaxConcurrentStreams: 100, connectionBackoff: configuration.connectionBackoff, diff --git a/Sources/GRPC/GRPCIdleHandler.swift b/Sources/GRPC/GRPCIdleHandler.swift index 6371cb7a7..0f9492163 100644 --- a/Sources/GRPC/GRPCIdleHandler.swift +++ b/Sources/GRPC/GRPCIdleHandler.swift @@ -24,7 +24,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { typealias OutboundOut = HTTP2Frame /// The amount of time to wait before closing the channel when there are no active streams. - private let idleTimeout: TimeAmount + /// If nil, then we shouldn't schedule idle tasks. + private let idleTimeout: TimeAmount? /// The ping handler. private var pingHandler: PingHandler @@ -78,7 +79,12 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { logger: Logger ) { self.mode = .client(connectionManager, multiplexer) - self.idleTimeout = idleTimeout + switch connectionManager.idleBehavior { + case .neverGoIdle: + self.idleTimeout = nil + case .closeWhenIdleTimeout: + self.idleTimeout = idleTimeout + } self.stateMachine = .init(role: .client, logger: logger) self.pingHandler = PingHandler( pingCode: 5, @@ -135,7 +141,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { } // Handle idle timeout creation/cancellation. 
- if let idleTask = operations.idleTask { + if let idleTimeout = self.idleTimeout, let idleTask = operations.idleTask { switch idleTask { case let .cancel(task): self.stateMachine.logger.debug("idle timeout task cancelled") @@ -145,9 +151,9 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { if self.idleTimeout != .nanoseconds(.max), let context = self.context { self.stateMachine.logger.debug( "scheduling idle timeout task", - metadata: [MetadataKey.delayMs: "\(self.idleTimeout.milliseconds)"] + metadata: [MetadataKey.delayMs: "\(idleTimeout.milliseconds)"] ) - let task = context.eventLoop.scheduleTask(in: self.idleTimeout) { + let task = context.eventLoop.scheduleTask(in: idleTimeout) { self.stateMachine.logger.debug("idle timeout task fired") self.idleTimeoutFired() } diff --git a/Tests/GRPCTests/ConnectionManagerTests.swift b/Tests/GRPCTests/ConnectionManagerTests.swift index 5bd1ad432..5840167c4 100644 --- a/Tests/GRPCTests/ConnectionManagerTests.swift +++ b/Tests/GRPCTests/ConnectionManagerTests.swift @@ -58,7 +58,8 @@ class ConnectionManagerTests: GRPCTestCase { return ConnectionManager( configuration: configuration, channelProvider: channelProvider.map { HookedChannelProvider($0) }, - connectivityDelegate: self.monitor, + connectivityDelegate: self.monitor, + idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) } @@ -947,7 +948,8 @@ extension ConnectionManagerTests { channelProvider: HookedChannelProvider { _, loop in return loop.makeFailedFuture(DoomedChannelError()) }, - connectivityDelegate: nil, + connectivityDelegate: nil, + idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) let candidate = manager.getHTTP2Multiplexer() @@ -1206,7 +1208,8 @@ extension ConnectionManagerTests { return eventLoop.makeSucceededFuture(channel) }, - callStartBehavior: .waitsForConnectivity, + callStartBehavior: .waitsForConnectivity, + idleBehavior: .closeWhenIdleTimeout, connectionBackoff: ConnectionBackoff(), connectivityDelegate: nil, http2Delegate: http2, @@ -1382,7 +1385,8 @@ extension ConnectionManagerTests { let manager = ConnectionManager( configuration: configuration, channelProvider: Provider(), - connectivityDelegate: self.monitor, + connectivityDelegate: self.monitor, + idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index 5c2738774..e611b61e7 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -62,7 +62,8 @@ final class ConnectionPoolTests: GRPCTestCase { ) -> ConnectionPool { return ConnectionPool( eventLoop: self.eventLoop, - maxWaiters: waiters, + maxWaiters: waiters, + minConnections: 1, reservationLoadThreshold: reservationLoadThreshold, assumedMaxConcurrentStreams: 100, connectionBackoff: connectionBackoff, diff --git a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift index 438bb461d..1a982fab2 100644 --- a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift +++ b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift @@ -32,7 +32,8 @@ class PoolManagerStateMachineTests: GRPCTestCase { ) -> ConnectionPool { return ConnectionPool( eventLoop: eventLoop, - maxWaiters: maxWaiters, + maxWaiters: maxWaiters, + minConnections: 1, reservationLoadThreshold: loadThreshold, assumedMaxConcurrentStreams: maxConcurrentStreams, 
connectionBackoff: connectionBackoff, From 64c6caa4de603f9bc93ca0b985ed18d751dfac7e Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 5 Mar 2024 21:46:42 +0000 Subject: [PATCH 2/6] Formatting --- Sources/GRPC/ConnectionManager.swift | 6 +++--- Sources/GRPC/ConnectionPool/ConnectionPool.swift | 14 +++++++++----- Sources/GRPC/ConnectionPool/GRPCChannelPool.swift | 4 ++-- Sources/GRPC/ConnectionPool/PoolManager.swift | 2 +- Tests/GRPCTests/ConnectionManagerTests.swift | 8 ++++---- .../ConnectionPool/ConnectionPoolTests.swift | 2 +- .../PoolManagerStateMachineTests.swift | 2 +- 7 files changed, 21 insertions(+), 17 deletions(-) diff --git a/Sources/GRPC/ConnectionManager.swift b/Sources/GRPC/ConnectionManager.swift index b8d2e6eca..d30d12ee0 100644 --- a/Sources/GRPC/ConnectionManager.swift +++ b/Sources/GRPC/ConnectionManager.swift @@ -25,14 +25,14 @@ import NIOHTTP2 // event loop is being used. @usableFromInline internal final class ConnectionManager: @unchecked Sendable { - + /// Whether the connection managed by this manager should be allowed to go idle and be closed, or /// if it should remain open indefinitely even when there are no active streams. internal enum IdleBehavior { case closeWhenIdleTimeout case neverGoIdle } - + internal enum Reconnect { case none case after(TimeInterval) @@ -331,7 +331,7 @@ internal final class ConnectionManager: @unchecked Sendable { /// The configuration to use when backing off between connection attempts, if reconnection /// attempts should be made at all. private let connectionBackoff: ConnectionBackoff? - + /// Whether this connection should be allowed to go idle (and thus be closed when the idle timer fires). internal let idleBehavior: IdleBehavior diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index 9e0cf3363..db1c684af 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -84,7 +84,7 @@ internal final class ConnectionPool { /// there are this many waiters in the queue then the next waiter will be failed immediately. @usableFromInline internal let maxWaiters: Int - + /// The number of connections in the pool that should always be kept open (i.e. they won't go idle). /// In other words, it's the number of connections for which we should ignore idle timers. @usableFromInline @@ -212,7 +212,9 @@ internal final class ConnectionPool { while self._connections.count < connections { // If we have less than the minimum number of connections, don't let // the new connection close when idle. - let idleBehavior = numberOfKeepOpenConnections > 0 ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout + let idleBehavior = + numberOfKeepOpenConnections > 0 + ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout numberOfKeepOpenConnections -= 1 self.addConnectionToPool(idleBehavior: idleBehavior) } @@ -704,8 +706,10 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate { // If we have less than the minimum number of connections, don't let // the new connection close when idle. - let idleBehavior = self.sync.activeConnections < self.minConnections ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout - + let idleBehavior = + self.sync.activeConnections < self.minConnections + ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout + // Replace the connection with a new idle one. 
self.addConnectionToPool(idleBehavior: idleBehavior) @@ -897,7 +901,7 @@ extension ConnectionPool { self.pool.eventLoop.assertInEventLoop() return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) } } - + /// The number of active (i.e. connecting or connected) connections in the pool. internal var activeConnections: Int { self.pool.eventLoop.assertInEventLoop() diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index 5bf207566..0c8f729e9 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -274,8 +274,8 @@ extension GRPCChannelPool.Configuration { /// /// Defaults to 100. public var maxWaitersPerEventLoop: Int = 100 - - /// The minimum number of connections to keep open in this pool. + + /// The minimum number of connections to keep open in this pool. /// This number of connections will never go idle and be closed. public var minConnections: Int = 1 diff --git a/Sources/GRPC/ConnectionPool/PoolManager.swift b/Sources/GRPC/ConnectionPool/PoolManager.swift index fe44403d6..593d11e7d 100644 --- a/Sources/GRPC/ConnectionPool/PoolManager.swift +++ b/Sources/GRPC/ConnectionPool/PoolManager.swift @@ -32,7 +32,7 @@ internal final class PoolManager { /// The maximum number of waiters per pool. @usableFromInline var maxWaiters: Int - + /// The minimum number of connections to keep open per pool. /// This number of connections will never go idle and be closed. @usableFromInline diff --git a/Tests/GRPCTests/ConnectionManagerTests.swift b/Tests/GRPCTests/ConnectionManagerTests.swift index 5840167c4..ff27d53f8 100644 --- a/Tests/GRPCTests/ConnectionManagerTests.swift +++ b/Tests/GRPCTests/ConnectionManagerTests.swift @@ -58,7 +58,7 @@ class ConnectionManagerTests: GRPCTestCase { return ConnectionManager( configuration: configuration, channelProvider: channelProvider.map { HookedChannelProvider($0) }, - connectivityDelegate: self.monitor, + connectivityDelegate: self.monitor, idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) @@ -948,7 +948,7 @@ extension ConnectionManagerTests { channelProvider: HookedChannelProvider { _, loop in return loop.makeFailedFuture(DoomedChannelError()) }, - connectivityDelegate: nil, + connectivityDelegate: nil, idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) @@ -1208,7 +1208,7 @@ extension ConnectionManagerTests { return eventLoop.makeSucceededFuture(channel) }, - callStartBehavior: .waitsForConnectivity, + callStartBehavior: .waitsForConnectivity, idleBehavior: .closeWhenIdleTimeout, connectionBackoff: ConnectionBackoff(), connectivityDelegate: nil, @@ -1385,7 +1385,7 @@ extension ConnectionManagerTests { let manager = ConnectionManager( configuration: configuration, channelProvider: Provider(), - connectivityDelegate: self.monitor, + connectivityDelegate: self.monitor, idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index e611b61e7..fbdfe4ae1 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -62,7 +62,7 @@ final class ConnectionPoolTests: GRPCTestCase { ) -> ConnectionPool { return ConnectionPool( eventLoop: self.eventLoop, - maxWaiters: waiters, + maxWaiters: waiters, minConnections: 1, reservationLoadThreshold: reservationLoadThreshold, assumedMaxConcurrentStreams: 100, 
diff --git a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift index 1a982fab2..fefdb5e28 100644 --- a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift +++ b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift @@ -32,7 +32,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { ) -> ConnectionPool { return ConnectionPool( eventLoop: eventLoop, - maxWaiters: maxWaiters, + maxWaiters: maxWaiters, minConnections: 1, reservationLoadThreshold: loadThreshold, assumedMaxConcurrentStreams: maxConcurrentStreams, From c99f5082550e2c29ac185e9d79d30f4062207d71 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 8 Mar 2024 12:48:26 +0000 Subject: [PATCH 3/6] PR changes --- .../GRPC/ConnectionPool/ConnectionPool.swift | 28 +++- .../GRPC/ConnectionPool/GRPCChannelPool.swift | 4 +- .../ConnectionPool/ConnectionPoolTests.swift | 151 +++++++++++++++++- .../PoolManagerStateMachineTests.swift | 2 +- 4 files changed, 178 insertions(+), 7 deletions(-) diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index db1c684af..f13bd9000 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -235,6 +235,16 @@ internal final class ConnectionPool { let id = manager.id self._connections[id] = PerConnectionState(manager: manager) self.delegate?.connectionAdded(id: .init(id)) + + // If it's one of the connections that should be kept open, then connect + // straight away. + if case .neverGoIdle = idleBehavior { + self.eventLoop.execute { + if manager.sync.isIdle { + manager.sync.startConnecting() + } + } + } } // MARK: - Called from the pool manager @@ -737,6 +747,14 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate { if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 { self.streamLender.returnStreams(droppedReservations, to: self) } + + // Now that a connection has become unavailable, we must make sure the minimum + // number of connections is still met. + if let connectionManager = self._connections[id]?.manager, + case .neverGoIdle = connectionManager.idleBehavior, + connectionManager.sync.isTransientFailure { + connectionManager.sync.startConnecting() + } } } @@ -902,10 +920,16 @@ extension ConnectionPool { return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) } } - /// The number of active (i.e. connecting or connected) connections in the pool. + /// The number of active (i.e. connecting or ready) connections in the pool. internal var activeConnections: Int { self.pool.eventLoop.assertInEventLoop() - return self.connections - self.idleConnections + return self.pool._connections.values.reduce(0) { $0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0) } + } + + /// The number of connections in the pool in transient failure state. + internal var transientFailureConnections: Int { + self.pool.eventLoop.assertInEventLoop() + return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0) } } /// The number of streams currently available to reserve across all connections in the pool. 
diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index 0c8f729e9..1890cb92a 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -275,8 +275,8 @@ extension GRPCChannelPool.Configuration { /// Defaults to 100. public var maxWaitersPerEventLoop: Int = 100 - /// The minimum number of connections to keep open in this pool. - /// This number of connections will never go idle and be closed. + /// The minimum number of connections to keep open in this pool, per EventLoop. + /// This number of connections per EventLoop will never go idle and be closed. public var minConnections: Int = 1 /// The maximum amount of time a caller is willing to wait for a stream for before timing out. diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index fbdfe4ae1..be322caf8 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -53,6 +53,8 @@ final class ConnectionPoolTests: GRPCTestCase { private func makePool( waiters: Int = 1000, reservationLoadThreshold: Double = 0.9, + minConnections: Int = 0, + assumedMaxConcurrentStreams: Int = 100, now: @escaping () -> NIODeadline = { .now() }, connectionBackoff: ConnectionBackoff = ConnectionBackoff(), delegate: GRPCConnectionPoolDelegate? = nil, @@ -63,9 +65,9 @@ final class ConnectionPoolTests: GRPCTestCase { return ConnectionPool( eventLoop: self.eventLoop, maxWaiters: waiters, - minConnections: 1, + minConnections: minConnections, reservationLoadThreshold: reservationLoadThreshold, - assumedMaxConcurrentStreams: 100, + assumedMaxConcurrentStreams: assumedMaxConcurrentStreams, connectionBackoff: connectionBackoff, channelProvider: channelProvider, streamLender: HookedStreamLender( @@ -1070,6 +1072,151 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(error.code, .deadlineExceeded) XCTAssertNotEqual(error.code, .shutdown) } + + func testMinimumConnectionsAreOpenRightAfterInitializing() { + let controller = ChannelController() + let pool = self.makePool(minConnections: 5, channelProvider: controller) + + pool.initialize(connections: 20) + self.eventLoop.run() + + XCTAssertEqual(pool.sync.connections, 20) + XCTAssertEqual(pool.sync.idleConnections, 15) + XCTAssertEqual(pool.sync.activeConnections, 5) + XCTAssertEqual(pool.sync.waiters, 0) + XCTAssertEqual(pool.sync.availableStreams, 0) + XCTAssertEqual(pool.sync.reservedStreams, 0) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + } + + func testMinimumConnectionsAreOpenAfterOneIsQuiesced() { + let controller = ChannelController() + let pool = self.makePool( + minConnections: 1, + assumedMaxConcurrentStreams: 1, + channelProvider: controller + ) + + // Initialize two connections, and make sure that only one of them is active, + // since we have set minConnections to 1. + pool.initialize(connections: 2) + self.eventLoop.run() + XCTAssertEqual(pool.sync.connections, 2) + XCTAssertEqual(pool.sync.idleConnections, 1) + XCTAssertEqual(pool.sync.activeConnections, 1) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Open two streams, which, because the maxConcurrentStreams is 1, will + // create two channels. 
+ let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + + // Start creating the channels. + self.eventLoop.run() + + // Make both connections ready. + controller.connectChannel(atIndex: 0) + controller.sendSettingsToChannel(atIndex: 0) + controller.connectChannel(atIndex: 1) + controller.sendSettingsToChannel(atIndex: 1) + + // Run the loop to create the streams/connections. + self.eventLoop.run() + XCTAssertNoThrow(try w1.wait()) + controller.openStreamInChannel(atIndex: 0) + XCTAssertNoThrow(try w2.wait()) + controller.openStreamInChannel(atIndex: 1) + + XCTAssertEqual(pool.sync.connections, 2) + XCTAssertEqual(pool.sync.idleConnections, 0) + XCTAssertEqual(pool.sync.activeConnections, 2) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Quiesce the connection that should be kept alive. + // Another connection should be brought back up immediately after, to maintain + // the minimum number of active connections that won't go idle. + controller.sendGoAwayToChannel(atIndex: 0) + XCTAssertEqual(pool.sync.connections, 3) + XCTAssertEqual(pool.sync.idleConnections, 1) + XCTAssertEqual(pool.sync.activeConnections, 2) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Now quiesce the other one. This will add a new idle connection, but it + // won't connect it right away. + controller.sendGoAwayToChannel(atIndex: 1) + XCTAssertEqual(pool.sync.connections, 4) + XCTAssertEqual(pool.sync.idleConnections, 2) + XCTAssertEqual(pool.sync.activeConnections, 2) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + } + + func testMinimumConnectionsAreOpenAfterOneIsClosed() { + let controller = ChannelController() + let pool = self.makePool( + minConnections: 1, + assumedMaxConcurrentStreams: 1, + channelProvider: controller + ) + + // Initialize two connections, and make sure that only one of them is active, + // since we have set minConnections to 1. + pool.initialize(connections: 2) + self.eventLoop.run() + XCTAssertEqual(pool.sync.connections, 2) + XCTAssertEqual(pool.sync.idleConnections, 1) + XCTAssertEqual(pool.sync.activeConnections, 1) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Open two streams, which, because the maxConcurrentStreams is 1, will + // create two channels. + let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + + // Start creating the channels. + self.eventLoop.run() + + // Make both connections ready. + controller.connectChannel(atIndex: 0) + controller.sendSettingsToChannel(atIndex: 0) + controller.connectChannel(atIndex: 1) + controller.sendSettingsToChannel(atIndex: 1) + + // Run the loop to create the streams/connections. + self.eventLoop.run() + XCTAssertNoThrow(try w1.wait()) + controller.openStreamInChannel(atIndex: 0) + XCTAssertNoThrow(try w2.wait()) + controller.openStreamInChannel(atIndex: 1) + + XCTAssertEqual(pool.sync.connections, 2) + XCTAssertEqual(pool.sync.idleConnections, 0) + XCTAssertEqual(pool.sync.activeConnections, 2) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Close the connection that will be kept alive. + // It should be brought back up immediately after closing. 
+ controller.fireChannelInactiveForChannel(atIndex: 0) + XCTAssertEqual(pool.sync.connections, 2) + XCTAssertEqual(pool.sync.idleConnections, 0) + XCTAssertEqual(pool.sync.activeConnections, 2) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Now close the other one, which won't be brought back up immediately. + // It should remain in transient failure state. + controller.fireChannelInactiveForChannel(atIndex: 1) + XCTAssertEqual(pool.sync.connections, 2) + XCTAssertEqual(pool.sync.idleConnections, 0) + XCTAssertEqual(pool.sync.activeConnections, 1) + XCTAssertEqual(pool.sync.transientFailureConnections, 1) + } } extension ConnectionPool { diff --git a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift index fefdb5e28..02f33fa33 100644 --- a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift +++ b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift @@ -33,7 +33,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { return ConnectionPool( eventLoop: eventLoop, maxWaiters: maxWaiters, - minConnections: 1, + minConnections: 0, reservationLoadThreshold: loadThreshold, assumedMaxConcurrentStreams: maxConcurrentStreams, connectionBackoff: connectionBackoff, From 621dadc38b1b24aaa35ee36d55a3b7890be8060c Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 8 Mar 2024 12:48:42 +0000 Subject: [PATCH 4/6] Formatting --- .../GRPC/ConnectionPool/ConnectionPool.swift | 19 +++++++++----- .../ConnectionPool/ConnectionPoolTests.swift | 26 +++++++++---------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index f13bd9000..88b369a15 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -235,7 +235,7 @@ internal final class ConnectionPool { let id = manager.id self._connections[id] = PerConnectionState(manager: manager) self.delegate?.connectionAdded(id: .init(id)) - + // If it's one of the connections that should be kept open, then connect // straight away. if case .neverGoIdle = idleBehavior { @@ -747,12 +747,13 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate { if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 { self.streamLender.returnStreams(droppedReservations, to: self) } - + // Now that a connection has become unavailable, we must make sure the minimum // number of connections is still met. if let connectionManager = self._connections[id]?.manager, - case .neverGoIdle = connectionManager.idleBehavior, - connectionManager.sync.isTransientFailure { + case .neverGoIdle = connectionManager.idleBehavior, + connectionManager.sync.isTransientFailure + { connectionManager.sync.startConnecting() } } @@ -923,13 +924,17 @@ extension ConnectionPool { /// The number of active (i.e. connecting or ready) connections in the pool. internal var activeConnections: Int { self.pool.eventLoop.assertInEventLoop() - return self.pool._connections.values.reduce(0) { $0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0) } + return self.pool._connections.values.reduce(0) { + $0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0) + } } - + /// The number of connections in the pool in transient failure state. 
internal var transientFailureConnections: Int { self.pool.eventLoop.assertInEventLoop() - return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0) } + return self.pool._connections.values.reduce(0) { + $0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0) + } } /// The number of streams currently available to reserve across all connections in the pool. diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index be322caf8..4feb963aa 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -1072,14 +1072,14 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(error.code, .deadlineExceeded) XCTAssertNotEqual(error.code, .shutdown) } - + func testMinimumConnectionsAreOpenRightAfterInitializing() { let controller = ChannelController() let pool = self.makePool(minConnections: 5, channelProvider: controller) - + pool.initialize(connections: 20) self.eventLoop.run() - + XCTAssertEqual(pool.sync.connections, 20) XCTAssertEqual(pool.sync.idleConnections, 15) XCTAssertEqual(pool.sync.activeConnections, 5) @@ -1088,7 +1088,7 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(pool.sync.reservedStreams, 0) XCTAssertEqual(pool.sync.transientFailureConnections, 0) } - + func testMinimumConnectionsAreOpenAfterOneIsQuiesced() { let controller = ChannelController() let pool = self.makePool( @@ -1105,7 +1105,7 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(pool.sync.idleConnections, 1) XCTAssertEqual(pool.sync.activeConnections, 1) XCTAssertEqual(pool.sync.transientFailureConnections, 0) - + // Open two streams, which, because the maxConcurrentStreams is 1, will // create two channels. let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { @@ -1130,12 +1130,12 @@ final class ConnectionPoolTests: GRPCTestCase { controller.openStreamInChannel(atIndex: 0) XCTAssertNoThrow(try w2.wait()) controller.openStreamInChannel(atIndex: 1) - + XCTAssertEqual(pool.sync.connections, 2) XCTAssertEqual(pool.sync.idleConnections, 0) XCTAssertEqual(pool.sync.activeConnections, 2) XCTAssertEqual(pool.sync.transientFailureConnections, 0) - + // Quiesce the connection that should be kept alive. // Another connection should be brought back up immediately after, to maintain // the minimum number of active connections that won't go idle. @@ -1144,7 +1144,7 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(pool.sync.idleConnections, 1) XCTAssertEqual(pool.sync.activeConnections, 2) XCTAssertEqual(pool.sync.transientFailureConnections, 0) - + // Now quiesce the other one. This will add a new idle connection, but it // won't connect it right away. controller.sendGoAwayToChannel(atIndex: 1) @@ -1153,7 +1153,7 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(pool.sync.activeConnections, 2) XCTAssertEqual(pool.sync.transientFailureConnections, 0) } - + func testMinimumConnectionsAreOpenAfterOneIsClosed() { let controller = ChannelController() let pool = self.makePool( @@ -1170,7 +1170,7 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(pool.sync.idleConnections, 1) XCTAssertEqual(pool.sync.activeConnections, 1) XCTAssertEqual(pool.sync.transientFailureConnections, 0) - + // Open two streams, which, because the maxConcurrentStreams is 1, will // create two channels. 
let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { @@ -1195,12 +1195,12 @@ final class ConnectionPoolTests: GRPCTestCase { controller.openStreamInChannel(atIndex: 0) XCTAssertNoThrow(try w2.wait()) controller.openStreamInChannel(atIndex: 1) - + XCTAssertEqual(pool.sync.connections, 2) XCTAssertEqual(pool.sync.idleConnections, 0) XCTAssertEqual(pool.sync.activeConnections, 2) XCTAssertEqual(pool.sync.transientFailureConnections, 0) - + // Close the connection that will be kept alive. // It should be brought back up immediately after closing. controller.fireChannelInactiveForChannel(atIndex: 0) @@ -1208,7 +1208,7 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(pool.sync.idleConnections, 0) XCTAssertEqual(pool.sync.activeConnections, 2) XCTAssertEqual(pool.sync.transientFailureConnections, 0) - + // Now close the other one, which won't be brought back up immediately. // It should remain in transient failure state. controller.fireChannelInactiveForChannel(atIndex: 1) From 3606b441c370aa0cfbab3b2804a031a6571cf776 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 8 Mar 2024 15:30:51 +0000 Subject: [PATCH 5/6] PR changes --- .../GRPC/ConnectionPool/ConnectionPool.swift | 25 ++------ .../GRPC/ConnectionPool/GRPCChannelPool.swift | 2 +- .../ConnectionPool/ConnectionPoolTests.swift | 64 ------------------- 3 files changed, 8 insertions(+), 83 deletions(-) diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index 88b369a15..9ea75b405 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -238,12 +238,15 @@ internal final class ConnectionPool { // If it's one of the connections that should be kept open, then connect // straight away. - if case .neverGoIdle = idleBehavior { + switch idleBehavior { + case .neverGoIdle: self.eventLoop.execute { if manager.sync.isIdle { manager.sync.startConnecting() } } + case .closeWhenIdleTimeout: + () } } @@ -714,14 +717,9 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate { // Grab the number of reserved streams (before invalidating the index by adding a connection). let reservedStreams = self._connections.values[index].reservedStreams - // If we have less than the minimum number of connections, don't let - // the new connection close when idle. - let idleBehavior = - self.sync.activeConnections < self.minConnections - ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout - - // Replace the connection with a new idle one. - self.addConnectionToPool(idleBehavior: idleBehavior) + // Replace the connection with a new idle one. Keep the idle behavior, so that + // if it's a connection that should be kept alive, we maintain it. + self.addConnectionToPool(idleBehavior: manager.idleBehavior) // Since we're removing this connection from the pool (and no new streams can be created on // the connection), the pool manager can ignore any streams reserved against this connection. @@ -747,15 +745,6 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate { if let droppedReservations = self._connections[id]?.unavailable(), droppedReservations > 0 { self.streamLender.returnStreams(droppedReservations, to: self) } - - // Now that a connection has become unavailable, we must make sure the minimum - // number of connections is still met. 
- if let connectionManager = self._connections[id]?.manager, - case .neverGoIdle = connectionManager.idleBehavior, - connectionManager.sync.isTransientFailure - { - connectionManager.sync.startConnecting() - } } } diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index 1890cb92a..b3881b66e 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -277,7 +277,7 @@ extension GRPCChannelPool.Configuration { /// The minimum number of connections to keep open in this pool, per EventLoop. /// This number of connections per EventLoop will never go idle and be closed. - public var minConnections: Int = 1 + public var minConnections: Int = 0 /// The maximum amount of time a caller is willing to wait for a stream for before timing out. /// diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index 4feb963aa..65bb416cd 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -1153,70 +1153,6 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(pool.sync.activeConnections, 2) XCTAssertEqual(pool.sync.transientFailureConnections, 0) } - - func testMinimumConnectionsAreOpenAfterOneIsClosed() { - let controller = ChannelController() - let pool = self.makePool( - minConnections: 1, - assumedMaxConcurrentStreams: 1, - channelProvider: controller - ) - - // Initialize two connections, and make sure that only one of them is active, - // since we have set minConnections to 1. - pool.initialize(connections: 2) - self.eventLoop.run() - XCTAssertEqual(pool.sync.connections, 2) - XCTAssertEqual(pool.sync.idleConnections, 1) - XCTAssertEqual(pool.sync.activeConnections, 1) - XCTAssertEqual(pool.sync.transientFailureConnections, 0) - - // Open two streams, which, because the maxConcurrentStreams is 1, will - // create two channels. - let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { - $0.eventLoop.makeSucceededVoidFuture() - } - let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { - $0.eventLoop.makeSucceededVoidFuture() - } - - // Start creating the channels. - self.eventLoop.run() - - // Make both connections ready. - controller.connectChannel(atIndex: 0) - controller.sendSettingsToChannel(atIndex: 0) - controller.connectChannel(atIndex: 1) - controller.sendSettingsToChannel(atIndex: 1) - - // Run the loop to create the streams/connections. - self.eventLoop.run() - XCTAssertNoThrow(try w1.wait()) - controller.openStreamInChannel(atIndex: 0) - XCTAssertNoThrow(try w2.wait()) - controller.openStreamInChannel(atIndex: 1) - - XCTAssertEqual(pool.sync.connections, 2) - XCTAssertEqual(pool.sync.idleConnections, 0) - XCTAssertEqual(pool.sync.activeConnections, 2) - XCTAssertEqual(pool.sync.transientFailureConnections, 0) - - // Close the connection that will be kept alive. - // It should be brought back up immediately after closing. - controller.fireChannelInactiveForChannel(atIndex: 0) - XCTAssertEqual(pool.sync.connections, 2) - XCTAssertEqual(pool.sync.idleConnections, 0) - XCTAssertEqual(pool.sync.activeConnections, 2) - XCTAssertEqual(pool.sync.transientFailureConnections, 0) - - // Now close the other one, which won't be brought back up immediately. - // It should remain in transient failure state. 
- controller.fireChannelInactiveForChannel(atIndex: 1) - XCTAssertEqual(pool.sync.connections, 2) - XCTAssertEqual(pool.sync.idleConnections, 0) - XCTAssertEqual(pool.sync.activeConnections, 1) - XCTAssertEqual(pool.sync.transientFailureConnections, 1) - } } extension ConnectionPool { From b066219c0321f4703ea9dc292f374b925c365d9a Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 8 Mar 2024 17:48:41 +0000 Subject: [PATCH 6/6] Rename min connections config --- Sources/GRPC/ConnectionPool/GRPCChannelPool.swift | 2 +- Sources/GRPC/ConnectionPool/PooledChannel.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index b3881b66e..0af26fecc 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -277,7 +277,7 @@ extension GRPCChannelPool.Configuration { /// The minimum number of connections to keep open in this pool, per EventLoop. /// This number of connections per EventLoop will never go idle and be closed. - public var minConnections: Int = 0 + public var minConnectionsPerEventLoop: Int = 0 /// The maximum amount of time a caller is willing to wait for a stream for before timing out. /// diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index e52ce119b..a8715070e 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -96,7 +96,7 @@ internal final class PooledChannel: GRPCChannel { perPoolConfiguration: .init( maxConnections: configuration.connectionPool.connectionsPerEventLoop, maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop, - minConnections: configuration.connectionPool.minConnections, + minConnections: configuration.connectionPool.minConnectionsPerEventLoop, loadThreshold: configuration.connectionPool.reservationLoadThreshold, assumedMaxConcurrentStreams: 100, connectionBackoff: configuration.connectionBackoff,
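
For readers wiring this up once the series lands, here is a minimal usage sketch. It is not part of the patches above: the endpoint (localhost:8080), the plaintext transport, the thread count, and the example client name are illustrative assumptions; only `minConnectionsPerEventLoop` (default 0, so existing pools are unaffected unless they opt in) and its never-go-idle semantics come from this patch series, reached through the grpc-swift 1.x `GRPCChannelPool.with(target:transportSecurity:eventLoopGroup:_:)` entry point.

import GRPC
import NIOPosix

// Hypothetical endpoint and pool sizes, for illustration only.
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer { try? group.syncShutdownGracefully() }

let channel = try GRPCChannelPool.with(
  target: .host("localhost", port: 8080),
  transportSecurity: .plaintext,
  eventLoopGroup: group
) { configuration in
  // Allow up to ten connections per event loop, and keep two of them warm:
  // per this series they are created with IdleBehavior.neverGoIdle, start
  // connecting as soon as the pool is initialized, and are skipped by the
  // idle timeout in GRPCIdleHandler.
  configuration.connectionPool.connectionsPerEventLoop = 10
  configuration.connectionPool.minConnectionsPerEventLoop = 2
}

// `channel` then backs generated clients as usual, e.g. a hypothetical
// Echo_EchoNIOClient(channel: channel).

The knob is deliberately per event loop, matching `connectionsPerEventLoop` and `maxWaitersPerEventLoop`, so the total number of never-idle connections is the configured value multiplied by the number of loops in the group (up to four in the sketch above).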