diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index d9ffe7bd6..5ae1e973c 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -70,6 +70,10 @@ internal final class ConnectionPool { @usableFromInline internal let maxWaiters: Int + /// Configuration for backoff between subsequence connection attempts. + @usableFromInline + internal let connectionBackoff: ConnectionBackoff + /// Provides a channel factory to the `ConnectionManager`. @usableFromInline internal let channelProvider: ConnectionManagerChannelProvider @@ -125,6 +129,7 @@ internal final class ConnectionPool { maxWaiters: Int, reservationLoadThreshold: Double, assumedMaxConcurrentStreams: Int, + connectionBackoff: ConnectionBackoff, channelProvider: ConnectionManagerChannelProvider, streamLender: StreamLender, logger: GRPCLogger, @@ -142,6 +147,7 @@ internal final class ConnectionPool { self.waiters = CircularBuffer(initialCapacity: 16) self.eventLoop = eventLoop + self.connectionBackoff = connectionBackoff self.channelProvider = channelProvider self.streamLender = streamLender self.logger = logger @@ -165,7 +171,7 @@ internal final class ConnectionPool { eventLoop: self.eventLoop, channelProvider: self.channelProvider, callStartBehavior: .waitsForConnectivity, - connectionBackoff: ConnectionBackoff(), + connectionBackoff: self.connectionBackoff, connectivityDelegate: self, http2Delegate: self, logger: self.logger.unwrapped diff --git a/Sources/GRPC/ConnectionPool/PoolManager.swift b/Sources/GRPC/ConnectionPool/PoolManager.swift index a1a879fcb..6e1ccd456 100644 --- a/Sources/GRPC/ConnectionPool/PoolManager.swift +++ b/Sources/GRPC/ConnectionPool/PoolManager.swift @@ -45,6 +45,9 @@ internal final class PoolManager { return self.maxConnections * self.assumedMaxConcurrentStreams } + @usableFromInline + var connectionBackoff: ConnectionBackoff + /// A `Channel` provider. @usableFromInline var channelProvider: DefaultChannelProvider @@ -55,12 +58,14 @@ internal final class PoolManager { maxWaiters: Int, loadThreshold: Double, assumedMaxConcurrentStreams: Int = 100, + connectionBackoff: ConnectionBackoff, channelProvider: DefaultChannelProvider ) { self.maxConnections = maxConnections self.maxWaiters = maxWaiters self.loadThreshold = loadThreshold self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams + self.connectionBackoff = connectionBackoff self.channelProvider = channelProvider } } @@ -211,6 +216,7 @@ internal final class PoolManager { maxWaiters: configuration.maxWaiters, reservationLoadThreshold: configuration.loadThreshold, assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams, + connectionBackoff: configuration.connectionBackoff, channelProvider: configuration.channelProvider, streamLender: self, logger: logger diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index 701be53f0..fff804a66 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -72,6 +72,7 @@ internal final class PooledChannel: GRPCChannel { maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop, loadThreshold: configuration.connectionPool.reservationLoadThreshold, assumedMaxConcurrentStreams: 100, + connectionBackoff: configuration.connectionBackoff, channelProvider: provider ), logger: configuration.backgroundActivityLogger.wrapped diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index ce1b88926..61b9e7f34 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -52,6 +52,7 @@ final class ConnectionPoolTests: GRPCTestCase { waiters: Int = 1000, reservationLoadThreshold: Double = 0.9, now: @escaping () -> NIODeadline = { .now() }, + connectionBackoff: ConnectionBackoff = ConnectionBackoff(), onReservationReturned: @escaping (Int) -> Void = { _ in }, onMaximumReservationsChange: @escaping (Int) -> Void = { _ in }, channelProvider: ConnectionManagerChannelProvider @@ -61,6 +62,7 @@ final class ConnectionPoolTests: GRPCTestCase { maxWaiters: waiters, reservationLoadThreshold: reservationLoadThreshold, assumedMaxConcurrentStreams: 100, + connectionBackoff: connectionBackoff, channelProvider: channelProvider, streamLender: HookedStreamLender( onReturnStreams: onReservationReturned, @@ -85,6 +87,7 @@ final class ConnectionPoolTests: GRPCTestCase { waiters: Int = 1000, reservationLoadThreshold: Double = 0.9, now: @escaping () -> NIODeadline = { .now() }, + connectionBackoff: ConnectionBackoff = ConnectionBackoff(), onReservationReturned: @escaping (Int) -> Void = { _ in }, onMaximumReservationsChange: @escaping (Int) -> Void = { _ in } ) -> (ConnectionPool, ChannelController) { @@ -93,6 +96,7 @@ final class ConnectionPoolTests: GRPCTestCase { waiters: waiters, reservationLoadThreshold: reservationLoadThreshold, now: now, + connectionBackoff: connectionBackoff, onReservationReturned: onReservationReturned, onMaximumReservationsChange: onMaximumReservationsChange, channelProvider: controller @@ -631,6 +635,71 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertEqual(pool.sync.reservedStreams, 0) XCTAssertEqual(pool.sync.availableStreams, 100) } + + func testBackoffIsUsedForReconnections() { + // Fix backoff to always be 1 second. + let backoff = ConnectionBackoff( + initialBackoff: 1.0, + maximumBackoff: 1.0, + multiplier: 1.0, + jitter: 0.0 + ) + + let (pool, controller) = self.setUpPoolAndController(connectionBackoff: backoff) + pool.initialize(connections: 1) + XCTAssertEqual(pool.sync.connections, 1) + + let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + // Start creating the channel. + self.eventLoop.run() + + // Make the connection 'ready'. + controller.connectChannel(atIndex: 0) + controller.sendSettingsToChannel(atIndex: 0) + self.eventLoop.run() + XCTAssertNoThrow(try w1.wait()) + controller.openStreamInChannel(atIndex: 0) + + // Close the connection. It should hit the transient failure state. + controller.fireChannelInactiveForChannel(atIndex: 0) + // Now nothing is available in the pool. + XCTAssertEqual(pool.sync.waiters, 0) + XCTAssertEqual(pool.sync.availableStreams, 0) + XCTAssertEqual(pool.sync.reservedStreams, 0) + XCTAssertEqual(pool.sync.idleConnections, 0) + + // Enqueue two waiters. One to time out before the reconnect happens. + let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + + let w3 = pool.makeStream( + deadline: .uptimeNanoseconds(UInt64(TimeAmount.milliseconds(500).nanoseconds)), + logger: self.logger.wrapped + ) { + $0.eventLoop.makeSucceededVoidFuture() + } + + XCTAssertEqual(pool.sync.waiters, 2) + + // Time out w3. + self.eventLoop.advanceTime(by: .milliseconds(500)) + XCTAssertThrowsError(try w3.wait()) + XCTAssertEqual(pool.sync.waiters, 1) + + // Wait a little more for the backoff to pass. The controller should now have a second channel. + self.eventLoop.advanceTime(by: .milliseconds(500)) + XCTAssertEqual(controller.count, 2) + + // Start up the next channel. + controller.connectChannel(atIndex: 1) + controller.sendSettingsToChannel(atIndex: 1) + self.eventLoop.run() + XCTAssertNoThrow(try w2.wait()) + controller.openStreamInChannel(atIndex: 1) + } } // MARK: - Helpers diff --git a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift index 0b7a11baf..77f8b2515 100644 --- a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift +++ b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift @@ -25,6 +25,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { maxWaiters: Int = 100, maxConcurrentStreams: Int = 100, loadThreshold: Double = 0.9, + connectionBackoff: ConnectionBackoff = ConnectionBackoff(), makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture ) -> ConnectionPool { return ConnectionPool( @@ -32,6 +33,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { maxWaiters: maxWaiters, reservationLoadThreshold: loadThreshold, assumedMaxConcurrentStreams: maxConcurrentStreams, + connectionBackoff: connectionBackoff, channelProvider: HookedChannelProvider(makeChannel), streamLender: HookedStreamLender( onReturnStreams: { _ in },