Wire up connection backoff of channel pool #1289

Merged · 1 commit · Oct 5, 2021
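For context, the change below threads a `ConnectionBackoff` through the pool so that reconnect attempts use the configured schedule rather than a default-constructed one. As a rough illustration of how the parameters involved (`initialBackoff`, `multiplier`, `maximumBackoff`, `jitter`) shape the delays between attempts, here is a small sketch; it approximates the general technique and is not grpc-swift's exact algorithm:

```swift
// Illustrative sketch only: an exponential backoff schedule in which the
// initial delay is grown by a multiplier, capped at a maximum, and perturbed
// by random jitter. grpc-swift's ConnectionBackoff may compute its intervals
// differently.
func backoffSchedule(
  initialBackoff: Double,
  multiplier: Double,
  maximumBackoff: Double,
  jitter: Double,
  attempts: Int
) -> [Double] {
  var unjittered = initialBackoff
  var delays: [Double] = []
  for _ in 0 ..< attempts {
    // Cap the un-jittered value, then apply +/- jitter as a fraction of it.
    let capped = min(unjittered, maximumBackoff)
    let delay = capped + capped * Double.random(in: -jitter ... jitter)
    delays.append(delay)
    unjittered *= multiplier
  }
  return delays
}

// For example, initial 1s, multiplier 1.6, max 120s, 20% jitter gives
// roughly 1.0, 1.6, 2.56, 4.1, 6.55 seconds before jitter is applied.
print(backoffSchedule(initialBackoff: 1.0, multiplier: 1.6, maximumBackoff: 120.0, jitter: 0.2, attempts: 5))
```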
8 changes: 7 additions & 1 deletion Sources/GRPC/ConnectionPool/ConnectionPool.swift
@@ -70,6 +70,10 @@ internal final class ConnectionPool {
@usableFromInline
internal let maxWaiters: Int

/// Configuration for backoff between subsequent connection attempts.
@usableFromInline
internal let connectionBackoff: ConnectionBackoff

/// Provides a channel factory to the `ConnectionManager`.
@usableFromInline
internal let channelProvider: ConnectionManagerChannelProvider
@@ -125,6 +129,7 @@ internal final class ConnectionPool {
maxWaiters: Int,
reservationLoadThreshold: Double,
assumedMaxConcurrentStreams: Int,
connectionBackoff: ConnectionBackoff,
channelProvider: ConnectionManagerChannelProvider,
streamLender: StreamLender,
logger: GRPCLogger,
@@ -142,6 +147,7 @@
self.waiters = CircularBuffer(initialCapacity: 16)

self.eventLoop = eventLoop
self.connectionBackoff = connectionBackoff
self.channelProvider = channelProvider
self.streamLender = streamLender
self.logger = logger
@@ -165,7 +171,7 @@
eventLoop: self.eventLoop,
channelProvider: self.channelProvider,
callStartBehavior: .waitsForConnectivity,
connectionBackoff: ConnectionBackoff(),
connectionBackoff: self.connectionBackoff,
connectivityDelegate: self,
http2Delegate: self,
logger: self.logger.unwrapped
6 changes: 6 additions & 0 deletions Sources/GRPC/ConnectionPool/PoolManager.swift
@@ -45,6 +45,9 @@ internal final class PoolManager {
return self.maxConnections * self.assumedMaxConcurrentStreams
}

@usableFromInline
var connectionBackoff: ConnectionBackoff

/// A `Channel` provider.
@usableFromInline
var channelProvider: DefaultChannelProvider
@@ -55,12 +58,14 @@
maxWaiters: Int,
loadThreshold: Double,
assumedMaxConcurrentStreams: Int = 100,
connectionBackoff: ConnectionBackoff,
channelProvider: DefaultChannelProvider
) {
self.maxConnections = maxConnections
self.maxWaiters = maxWaiters
self.loadThreshold = loadThreshold
self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
self.connectionBackoff = connectionBackoff
self.channelProvider = channelProvider
}
}
@@ -211,6 +216,7 @@ internal final class PoolManager {
maxWaiters: configuration.maxWaiters,
reservationLoadThreshold: configuration.loadThreshold,
assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
connectionBackoff: configuration.connectionBackoff,
channelProvider: configuration.channelProvider,
streamLender: self,
logger: logger
1 change: 1 addition & 0 deletions Sources/GRPC/ConnectionPool/PooledChannel.swift
@@ -72,6 +72,7 @@ internal final class PooledChannel: GRPCChannel {
maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
loadThreshold: configuration.connectionPool.reservationLoadThreshold,
assumedMaxConcurrentStreams: 100,
connectionBackoff: configuration.connectionBackoff,
channelProvider: provider
),
logger: configuration.backgroundActivityLogger.wrapped
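With `PooledChannel` now forwarding `configuration.connectionBackoff` into the pool manager, a backoff set on the channel configuration reaches every pooled connection. A hypothetical usage sketch follows, assuming the pooled channel is created via `GRPCChannelPool.with(...)` and that its `Configuration` exposes a `connectionBackoff` property; the host, port, and helper name here are placeholders, and names may differ in the grpc-swift version you target:

```swift
import GRPC
import NIO

// Hypothetical usage sketch: the GRPCChannelPool.with entry point and the
// Configuration.connectionBackoff property are assumptions, not taken from
// this diff. The ConnectionBackoff initializer labels match those used in
// the tests below.
func makePooledChannel(group: EventLoopGroup) throws -> GRPCChannel {
  return try GRPCChannelPool.with(
    target: .host("localhost", port: 8080),  // placeholder target
    transportSecurity: .plaintext,
    eventLoopGroup: group
  ) { configuration in
    // Retry failed connection attempts after 0.5s, doubling up to 10s,
    // with 20% jitter on each delay.
    configuration.connectionBackoff = ConnectionBackoff(
      initialBackoff: 0.5,
      maximumBackoff: 10.0,
      multiplier: 2.0,
      jitter: 0.2
    )
  }
}
```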
69 changes: 69 additions & 0 deletions Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift
@@ -52,6 +52,7 @@ final class ConnectionPoolTests: GRPCTestCase {
waiters: Int = 1000,
reservationLoadThreshold: Double = 0.9,
now: @escaping () -> NIODeadline = { .now() },
connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
onReservationReturned: @escaping (Int) -> Void = { _ in },
onMaximumReservationsChange: @escaping (Int) -> Void = { _ in },
channelProvider: ConnectionManagerChannelProvider
@@ -61,6 +62,7 @@
maxWaiters: waiters,
reservationLoadThreshold: reservationLoadThreshold,
assumedMaxConcurrentStreams: 100,
connectionBackoff: connectionBackoff,
channelProvider: channelProvider,
streamLender: HookedStreamLender(
onReturnStreams: onReservationReturned,
@@ -85,6 +87,7 @@
waiters: Int = 1000,
reservationLoadThreshold: Double = 0.9,
now: @escaping () -> NIODeadline = { .now() },
connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
onReservationReturned: @escaping (Int) -> Void = { _ in },
onMaximumReservationsChange: @escaping (Int) -> Void = { _ in }
) -> (ConnectionPool, ChannelController) {
@@ -93,6 +96,7 @@
waiters: waiters,
reservationLoadThreshold: reservationLoadThreshold,
now: now,
connectionBackoff: connectionBackoff,
onReservationReturned: onReservationReturned,
onMaximumReservationsChange: onMaximumReservationsChange,
channelProvider: controller
@@ -631,6 +635,71 @@
XCTAssertEqual(pool.sync.reservedStreams, 0)
XCTAssertEqual(pool.sync.availableStreams, 100)
}

func testBackoffIsUsedForReconnections() {
// Fix backoff to always be 1 second.
let backoff = ConnectionBackoff(
initialBackoff: 1.0,
maximumBackoff: 1.0,
multiplier: 1.0,
jitter: 0.0
)

let (pool, controller) = self.setUpPoolAndController(connectionBackoff: backoff)
pool.initialize(connections: 1)
XCTAssertEqual(pool.sync.connections, 1)

let w1 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
$0.eventLoop.makeSucceededVoidFuture()
}
// Start creating the channel.
self.eventLoop.run()

// Make the connection 'ready'.
controller.connectChannel(atIndex: 0)
controller.sendSettingsToChannel(atIndex: 0)
self.eventLoop.run()
XCTAssertNoThrow(try w1.wait())
controller.openStreamInChannel(atIndex: 0)

// Close the connection. It should hit the transient failure state.
controller.fireChannelInactiveForChannel(atIndex: 0)
// Now nothing is available in the pool.
XCTAssertEqual(pool.sync.waiters, 0)
XCTAssertEqual(pool.sync.availableStreams, 0)
XCTAssertEqual(pool.sync.reservedStreams, 0)
XCTAssertEqual(pool.sync.idleConnections, 0)

// Enqueue two waiters. One to time out before the reconnect happens.
let w2 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
$0.eventLoop.makeSucceededVoidFuture()
}

let w3 = pool.makeStream(
deadline: .uptimeNanoseconds(UInt64(TimeAmount.milliseconds(500).nanoseconds)),
logger: self.logger.wrapped
) {
$0.eventLoop.makeSucceededVoidFuture()
}

XCTAssertEqual(pool.sync.waiters, 2)

// Time out w3.
self.eventLoop.advanceTime(by: .milliseconds(500))
XCTAssertThrowsError(try w3.wait())
XCTAssertEqual(pool.sync.waiters, 1)

// Wait a little more for the backoff to pass. The controller should now have a second channel.
self.eventLoop.advanceTime(by: .milliseconds(500))
XCTAssertEqual(controller.count, 2)

// Start up the next channel.
controller.connectChannel(atIndex: 1)
controller.sendSettingsToChannel(atIndex: 1)
self.eventLoop.run()
XCTAssertNoThrow(try w2.wait())
controller.openStreamInChannel(atIndex: 1)
}
}

// MARK: - Helpers
@@ -25,13 +25,15 @@ class PoolManagerStateMachineTests: GRPCTestCase {
maxWaiters: Int = 100,
maxConcurrentStreams: Int = 100,
loadThreshold: Double = 0.9,
connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
) -> ConnectionPool {
return ConnectionPool(
eventLoop: eventLoop,
maxWaiters: maxWaiters,
reservationLoadThreshold: loadThreshold,
assumedMaxConcurrentStreams: maxConcurrentStreams,
connectionBackoff: connectionBackoff,
channelProvider: HookedChannelProvider(makeChannel),
streamLender: HookedStreamLender(
onReturnStreams: { _ in },