diff --git a/Sources/GRPC/ClientConnection.swift b/Sources/GRPC/ClientConnection.swift index d238b7def..26adea540 100644 --- a/Sources/GRPC/ClientConnection.swift +++ b/Sources/GRPC/ClientConnection.swift @@ -131,6 +131,7 @@ public final class ClientConnection: Sendable { self.connectionManager = ConnectionManager( configuration: configuration, connectivityDelegate: monitor, + idleBehavior: .closeWhenIdleTimeout, logger: configuration.backgroundActivityLogger ) } diff --git a/Sources/GRPC/ConnectionManager.swift b/Sources/GRPC/ConnectionManager.swift index e21be63b2..d30d12ee0 100644 --- a/Sources/GRPC/ConnectionManager.swift +++ b/Sources/GRPC/ConnectionManager.swift @@ -25,6 +25,14 @@ import NIOHTTP2 // event loop is being used. @usableFromInline internal final class ConnectionManager: @unchecked Sendable { + + /// Whether the connection managed by this manager should be allowed to go idle and be closed, or + /// if it should remain open indefinitely even when there are no active streams. + internal enum IdleBehavior { + case closeWhenIdleTimeout + case neverGoIdle + } + internal enum Reconnect { case none case after(TimeInterval) @@ -324,6 +332,9 @@ internal final class ConnectionManager: @unchecked Sendable { /// attempts should be made at all. private let connectionBackoff: ConnectionBackoff? + /// Whether this connection should be allowed to go idle (and thus be closed when the idle timer fires). + internal let idleBehavior: IdleBehavior + /// A logger. internal var logger: Logger @@ -356,12 +367,14 @@ internal final class ConnectionManager: @unchecked Sendable { configuration: ClientConnection.Configuration, channelProvider: ConnectionManagerChannelProvider? = nil, connectivityDelegate: ConnectionManagerConnectivityDelegate?, + idleBehavior: IdleBehavior, logger: Logger ) { self.init( eventLoop: configuration.eventLoopGroup.next(), channelProvider: channelProvider ?? DefaultChannelProvider(configuration: configuration), callStartBehavior: configuration.callStartBehavior.wrapped, + idleBehavior: idleBehavior, connectionBackoff: configuration.connectionBackoff, connectivityDelegate: connectivityDelegate, http2Delegate: nil, @@ -373,6 +386,7 @@ internal final class ConnectionManager: @unchecked Sendable { eventLoop: EventLoop, channelProvider: ConnectionManagerChannelProvider, callStartBehavior: CallStartBehavior.Behavior, + idleBehavior: IdleBehavior, connectionBackoff: ConnectionBackoff?, connectivityDelegate: ConnectionManagerConnectivityDelegate?, http2Delegate: ConnectionManagerHTTP2Delegate?, @@ -392,6 +406,7 @@ internal final class ConnectionManager: @unchecked Sendable { self.connectionBackoff = connectionBackoff self.connectivityDelegate = connectivityDelegate self.http2Delegate = http2Delegate + self.idleBehavior = idleBehavior self.connectionID = connectionID self.channelNumber = channelNumber @@ -799,7 +814,7 @@ internal final class ConnectionManager: @unchecked Sendable { // Yes, after some time. case let .after(delay): let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting") - // Fail the candidate mux promise. KEep the 'readyChannelMuxPromise' as we'll try again. + // Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again. connecting.candidateMuxPromise.fail(error) let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) { diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index 039d788f3..9ea75b405 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -85,6 +85,11 @@ internal final class ConnectionPool { @usableFromInline internal let maxWaiters: Int + /// The number of connections in the pool that should always be kept open (i.e. they won't go idle). + /// In other words, it's the number of connections for which we should ignore idle timers. + @usableFromInline + internal let minConnections: Int + /// Configuration for backoff between subsequence connection attempts. @usableFromInline internal let connectionBackoff: ConnectionBackoff @@ -157,6 +162,7 @@ internal final class ConnectionPool { init( eventLoop: EventLoop, maxWaiters: Int, + minConnections: Int, reservationLoadThreshold: Double, assumedMaxConcurrentStreams: Int, connectionBackoff: ConnectionBackoff, @@ -176,6 +182,7 @@ internal final class ConnectionPool { self._connections = [:] self.maxWaiters = maxWaiters + self.minConnections = minConnections self.waiters = CircularBuffer(initialCapacity: 16) self.eventLoop = eventLoop @@ -201,17 +208,25 @@ internal final class ConnectionPool { ] ) self._connections.reserveCapacity(connections) + var numberOfKeepOpenConnections = self.minConnections while self._connections.count < connections { - self.addConnectionToPool() + // If we have less than the minimum number of connections, don't let + // the new connection close when idle. + let idleBehavior = + numberOfKeepOpenConnections > 0 + ? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout + numberOfKeepOpenConnections -= 1 + self.addConnectionToPool(idleBehavior: idleBehavior) } } /// Make and add a new connection to the pool. - private func addConnectionToPool() { + private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) { let manager = ConnectionManager( eventLoop: self.eventLoop, channelProvider: self.channelProvider, callStartBehavior: .waitsForConnectivity, + idleBehavior: idleBehavior, connectionBackoff: self.connectionBackoff, connectivityDelegate: self, http2Delegate: self, @@ -220,6 +235,19 @@ internal final class ConnectionPool { let id = manager.id self._connections[id] = PerConnectionState(manager: manager) self.delegate?.connectionAdded(id: .init(id)) + + // If it's one of the connections that should be kept open, then connect + // straight away. + switch idleBehavior { + case .neverGoIdle: + self.eventLoop.execute { + if manager.sync.isIdle { + manager.sync.startConnecting() + } + } + case .closeWhenIdleTimeout: + () + } } // MARK: - Called from the pool manager @@ -689,8 +717,9 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate { // Grab the number of reserved streams (before invalidating the index by adding a connection). let reservedStreams = self._connections.values[index].reservedStreams - // Replace the connection with a new idle one. - self.addConnectionToPool() + // Replace the connection with a new idle one. Keep the idle behavior, so that + // if it's a connection that should be kept alive, we maintain it. + self.addConnectionToPool(idleBehavior: manager.idleBehavior) // Since we're removing this connection from the pool (and no new streams can be created on // the connection), the pool manager can ignore any streams reserved against this connection. @@ -881,6 +910,22 @@ extension ConnectionPool { return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) } } + /// The number of active (i.e. connecting or ready) connections in the pool. + internal var activeConnections: Int { + self.pool.eventLoop.assertInEventLoop() + return self.pool._connections.values.reduce(0) { + $0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0) + } + } + + /// The number of connections in the pool in transient failure state. + internal var transientFailureConnections: Int { + self.pool.eventLoop.assertInEventLoop() + return self.pool._connections.values.reduce(0) { + $0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0) + } + } + /// The number of streams currently available to reserve across all connections in the pool. internal var availableStreams: Int { self.pool.eventLoop.assertInEventLoop() diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index 8076342a0..0af26fecc 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -275,6 +275,10 @@ extension GRPCChannelPool.Configuration { /// Defaults to 100. public var maxWaitersPerEventLoop: Int = 100 + /// The minimum number of connections to keep open in this pool, per EventLoop. + /// This number of connections per EventLoop will never go idle and be closed. + public var minConnectionsPerEventLoop: Int = 0 + /// The maximum amount of time a caller is willing to wait for a stream for before timing out. /// /// Defaults to 30 seconds. diff --git a/Sources/GRPC/ConnectionPool/PoolManager.swift b/Sources/GRPC/ConnectionPool/PoolManager.swift index 49d184b7e..593d11e7d 100644 --- a/Sources/GRPC/ConnectionPool/PoolManager.swift +++ b/Sources/GRPC/ConnectionPool/PoolManager.swift @@ -33,6 +33,11 @@ internal final class PoolManager { @usableFromInline var maxWaiters: Int + /// The minimum number of connections to keep open per pool. + /// This number of connections will never go idle and be closed. + @usableFromInline + var minConnections: Int + /// A load threshold in the range `0.0 ... 1.0` beyond which another connection will be started /// (assuming there is a connection available to start). @usableFromInline @@ -62,6 +67,7 @@ internal final class PoolManager { internal init( maxConnections: Int, maxWaiters: Int, + minConnections: Int, loadThreshold: Double, assumedMaxConcurrentStreams: Int = 100, connectionBackoff: ConnectionBackoff, @@ -70,6 +76,7 @@ internal final class PoolManager { ) { self.maxConnections = maxConnections self.maxWaiters = maxWaiters + self.minConnections = minConnections self.loadThreshold = loadThreshold self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams self.connectionBackoff = connectionBackoff @@ -225,6 +232,7 @@ internal final class PoolManager { return ConnectionPool( eventLoop: eventLoop, maxWaiters: configuration.maxWaiters, + minConnections: configuration.minConnections, reservationLoadThreshold: configuration.loadThreshold, assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams, connectionBackoff: configuration.connectionBackoff, diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index 1b706451f..a8715070e 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -96,6 +96,7 @@ internal final class PooledChannel: GRPCChannel { perPoolConfiguration: .init( maxConnections: configuration.connectionPool.connectionsPerEventLoop, maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop, + minConnections: configuration.connectionPool.minConnectionsPerEventLoop, loadThreshold: configuration.connectionPool.reservationLoadThreshold, assumedMaxConcurrentStreams: 100, connectionBackoff: configuration.connectionBackoff, diff --git a/Sources/GRPC/GRPCIdleHandler.swift b/Sources/GRPC/GRPCIdleHandler.swift index 6371cb7a7..0f9492163 100644 --- a/Sources/GRPC/GRPCIdleHandler.swift +++ b/Sources/GRPC/GRPCIdleHandler.swift @@ -24,7 +24,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { typealias OutboundOut = HTTP2Frame /// The amount of time to wait before closing the channel when there are no active streams. - private let idleTimeout: TimeAmount + /// If nil, then we shouldn't schedule idle tasks. + private let idleTimeout: TimeAmount? /// The ping handler. private var pingHandler: PingHandler @@ -78,7 +79,12 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { logger: Logger ) { self.mode = .client(connectionManager, multiplexer) - self.idleTimeout = idleTimeout + switch connectionManager.idleBehavior { + case .neverGoIdle: + self.idleTimeout = nil + case .closeWhenIdleTimeout: + self.idleTimeout = idleTimeout + } self.stateMachine = .init(role: .client, logger: logger) self.pingHandler = PingHandler( pingCode: 5, @@ -135,7 +141,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { } // Handle idle timeout creation/cancellation. - if let idleTask = operations.idleTask { + if let idleTimeout = self.idleTimeout, let idleTask = operations.idleTask { switch idleTask { case let .cancel(task): self.stateMachine.logger.debug("idle timeout task cancelled") @@ -145,9 +151,9 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { if self.idleTimeout != .nanoseconds(.max), let context = self.context { self.stateMachine.logger.debug( "scheduling idle timeout task", - metadata: [MetadataKey.delayMs: "\(self.idleTimeout.milliseconds)"] + metadata: [MetadataKey.delayMs: "\(idleTimeout.milliseconds)"] ) - let task = context.eventLoop.scheduleTask(in: self.idleTimeout) { + let task = context.eventLoop.scheduleTask(in: idleTimeout) { self.stateMachine.logger.debug("idle timeout task fired") self.idleTimeoutFired() } diff --git a/Tests/GRPCTests/ConnectionManagerTests.swift b/Tests/GRPCTests/ConnectionManagerTests.swift index 5bd1ad432..ff27d53f8 100644 --- a/Tests/GRPCTests/ConnectionManagerTests.swift +++ b/Tests/GRPCTests/ConnectionManagerTests.swift @@ -59,6 +59,7 @@ class ConnectionManagerTests: GRPCTestCase { configuration: configuration, channelProvider: channelProvider.map { HookedChannelProvider($0) }, connectivityDelegate: self.monitor, + idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) } @@ -948,6 +949,7 @@ extension ConnectionManagerTests { return loop.makeFailedFuture(DoomedChannelError()) }, connectivityDelegate: nil, + idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) let candidate = manager.getHTTP2Multiplexer() @@ -1207,6 +1209,7 @@ extension ConnectionManagerTests { return eventLoop.makeSucceededFuture(channel) }, callStartBehavior: .waitsForConnectivity, + idleBehavior: .closeWhenIdleTimeout, connectionBackoff: ConnectionBackoff(), connectivityDelegate: nil, http2Delegate: http2, @@ -1383,6 +1386,7 @@ extension ConnectionManagerTests { configuration: configuration, channelProvider: Provider(), connectivityDelegate: self.monitor, + idleBehavior: .closeWhenIdleTimeout, logger: self.logger ) diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index 5c2738774..65bb416cd 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -53,6 +53,8 @@ final class ConnectionPoolTests: GRPCTestCase { private func makePool( waiters: Int = 1000, reservationLoadThreshold: Double = 0.9, + minConnections: Int = 0, + assumedMaxConcurrentStreams: Int = 100, now: @escaping () -> NIODeadline = { .now() }, connectionBackoff: ConnectionBackoff = ConnectionBackoff(), delegate: GRPCConnectionPoolDelegate? = nil, @@ -63,8 +65,9 @@ final class ConnectionPoolTests: GRPCTestCase { return ConnectionPool( eventLoop: self.eventLoop, maxWaiters: waiters, + minConnections: minConnections, reservationLoadThreshold: reservationLoadThreshold, - assumedMaxConcurrentStreams: 100, + assumedMaxConcurrentStreams: assumedMaxConcurrentStreams, connectionBackoff: connectionBackoff, channelProvider: channelProvider, streamLender: HookedStreamLender( @@ -1069,6 +1072,87 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(error.code, .deadlineExceeded) XCTAssertNotEqual(error.code, .shutdown) } + + func testMinimumConnectionsAreOpenRightAfterInitializing() { + let controller = ChannelController() + let pool = self.makePool(minConnections: 5, channelProvider: controller) + + pool.initialize(connections: 20) + self.eventLoop.run() + + XCTAssertEqual(pool.sync.connections, 20) + XCTAssertEqual(pool.sync.idleConnections, 15) + XCTAssertEqual(pool.sync.activeConnections, 5) + XCTAssertEqual(pool.sync.waiters, 0) + XCTAssertEqual(pool.sync.availableStreams, 0) + XCTAssertEqual(pool.sync.reservedStreams, 0) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + } + + func testMinimumConnectionsAreOpenAfterOneIsQuiesced() { + let controller = ChannelController() + let pool = self.makePool( + minConnections: 1, + assumedMaxConcurrentStreams: 1, + channelProvider: controller + ) + + // Initialize two connections, and make sure that only one of them is active, + // since we have set minConnections to 1. + pool.initialize(connections: 2) + self.eventLoop.run() + XCTAssertEqual(pool.sync.connections, 2) + XCTAssertEqual(pool.sync.idleConnections, 1) + XCTAssertEqual(pool.sync.activeConnections, 1) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Open two streams, which, because the maxConcurrentStreams is 1, will + // create two channels. + let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + + // Start creating the channels. + self.eventLoop.run() + + // Make both connections ready. + controller.connectChannel(atIndex: 0) + controller.sendSettingsToChannel(atIndex: 0) + controller.connectChannel(atIndex: 1) + controller.sendSettingsToChannel(atIndex: 1) + + // Run the loop to create the streams/connections. + self.eventLoop.run() + XCTAssertNoThrow(try w1.wait()) + controller.openStreamInChannel(atIndex: 0) + XCTAssertNoThrow(try w2.wait()) + controller.openStreamInChannel(atIndex: 1) + + XCTAssertEqual(pool.sync.connections, 2) + XCTAssertEqual(pool.sync.idleConnections, 0) + XCTAssertEqual(pool.sync.activeConnections, 2) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Quiesce the connection that should be kept alive. + // Another connection should be brought back up immediately after, to maintain + // the minimum number of active connections that won't go idle. + controller.sendGoAwayToChannel(atIndex: 0) + XCTAssertEqual(pool.sync.connections, 3) + XCTAssertEqual(pool.sync.idleConnections, 1) + XCTAssertEqual(pool.sync.activeConnections, 2) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + + // Now quiesce the other one. This will add a new idle connection, but it + // won't connect it right away. + controller.sendGoAwayToChannel(atIndex: 1) + XCTAssertEqual(pool.sync.connections, 4) + XCTAssertEqual(pool.sync.idleConnections, 2) + XCTAssertEqual(pool.sync.activeConnections, 2) + XCTAssertEqual(pool.sync.transientFailureConnections, 0) + } } extension ConnectionPool { diff --git a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift index 438bb461d..02f33fa33 100644 --- a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift +++ b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift @@ -33,6 +33,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { return ConnectionPool( eventLoop: eventLoop, maxWaiters: maxWaiters, + minConnections: 0, reservationLoadThreshold: loadThreshold, assumedMaxConcurrentStreams: maxConcurrentStreams, connectionBackoff: connectionBackoff,