Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a minimum connections configuration to the ConnectionPool #1822

Merged
merged 6 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public final class ClientConnection: Sendable {
self.connectionManager = ConnectionManager(
configuration: configuration,
connectivityDelegate: monitor,
idleBehavior: .closeWhenIdleTimeout,
logger: configuration.backgroundActivityLogger
)
}
Expand Down
17 changes: 16 additions & 1 deletion Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import NIOHTTP2
// event loop is being used.
@usableFromInline
internal final class ConnectionManager: @unchecked Sendable {

/// Whether the connection managed by this manager should be allowed to go idle and be closed, or
/// if it should remain open indefinitely even when there are no active streams.
internal enum IdleBehavior {
case closeWhenIdleTimeout
case neverGoIdle
}

internal enum Reconnect {
case none
case after(TimeInterval)
Expand Down Expand Up @@ -324,6 +332,9 @@ internal final class ConnectionManager: @unchecked Sendable {
/// attempts should be made at all.
private let connectionBackoff: ConnectionBackoff?

/// Whether this connection should be allowed to go idle (and thus be closed when the idle timer fires).
internal let idleBehavior: IdleBehavior
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like this property is ever used?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used from the GRPCIdleHandler

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we store it here if it's only used in the idle handler? Can't we just pass it the idle handler on init?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is the min connections configuration lives/is passed in via the ConnectionPool: we need to have the context of the pool to understand how many connections are currently being kept open and decide whether we need to keep the connection from going idle or not.
The idle handler is created by the ConnectionManager in startConnecting(backoffIterator:muxPromise:connectTimeoutOverride:) for each connection, thus the manager has to be aware of the idle behaviour to be able to pass it down to the handler.


/// A logger.
internal var logger: Logger

Expand Down Expand Up @@ -356,12 +367,14 @@ internal final class ConnectionManager: @unchecked Sendable {
configuration: ClientConnection.Configuration,
channelProvider: ConnectionManagerChannelProvider? = nil,
connectivityDelegate: ConnectionManagerConnectivityDelegate?,
idleBehavior: IdleBehavior,
logger: Logger
) {
self.init(
eventLoop: configuration.eventLoopGroup.next(),
channelProvider: channelProvider ?? DefaultChannelProvider(configuration: configuration),
callStartBehavior: configuration.callStartBehavior.wrapped,
idleBehavior: idleBehavior,
connectionBackoff: configuration.connectionBackoff,
connectivityDelegate: connectivityDelegate,
http2Delegate: nil,
Expand All @@ -373,6 +386,7 @@ internal final class ConnectionManager: @unchecked Sendable {
eventLoop: EventLoop,
channelProvider: ConnectionManagerChannelProvider,
callStartBehavior: CallStartBehavior.Behavior,
idleBehavior: IdleBehavior,
connectionBackoff: ConnectionBackoff?,
connectivityDelegate: ConnectionManagerConnectivityDelegate?,
http2Delegate: ConnectionManagerHTTP2Delegate?,
Expand All @@ -392,6 +406,7 @@ internal final class ConnectionManager: @unchecked Sendable {
self.connectionBackoff = connectionBackoff
self.connectivityDelegate = connectivityDelegate
self.http2Delegate = http2Delegate
self.idleBehavior = idleBehavior

self.connectionID = connectionID
self.channelNumber = channelNumber
Expand Down Expand Up @@ -799,7 +814,7 @@ internal final class ConnectionManager: @unchecked Sendable {
// Yes, after some time.
case let .after(delay):
let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
// Fail the candidate mux promise. KEep the 'readyChannelMuxPromise' as we'll try again.
// Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
connecting.candidateMuxPromise.fail(error)

let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
Expand Down
53 changes: 49 additions & 4 deletions Sources/GRPC/ConnectionPool/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ internal final class ConnectionPool {
@usableFromInline
internal let maxWaiters: Int

/// The number of connections in the pool that should always be kept open (i.e. they won't go idle).
/// In other words, it's the number of connections for which we should ignore idle timers.
@usableFromInline
internal let minConnections: Int

/// Configuration for backoff between subsequence connection attempts.
@usableFromInline
internal let connectionBackoff: ConnectionBackoff
Expand Down Expand Up @@ -157,6 +162,7 @@ internal final class ConnectionPool {
init(
eventLoop: EventLoop,
maxWaiters: Int,
minConnections: Int,
reservationLoadThreshold: Double,
assumedMaxConcurrentStreams: Int,
connectionBackoff: ConnectionBackoff,
Expand All @@ -176,6 +182,7 @@ internal final class ConnectionPool {

self._connections = [:]
self.maxWaiters = maxWaiters
self.minConnections = minConnections
self.waiters = CircularBuffer(initialCapacity: 16)

self.eventLoop = eventLoop
Expand All @@ -201,17 +208,25 @@ internal final class ConnectionPool {
]
)
self._connections.reserveCapacity(connections)
var numberOfKeepOpenConnections = self.minConnections
while self._connections.count < connections {
self.addConnectionToPool()
// If we have less than the minimum number of connections, don't let
// the new connection close when idle.
let idleBehavior =
numberOfKeepOpenConnections > 0
? ConnectionManager.IdleBehavior.neverGoIdle : .closeWhenIdleTimeout
numberOfKeepOpenConnections -= 1
self.addConnectionToPool(idleBehavior: idleBehavior)
}
}

/// Make and add a new connection to the pool.
private func addConnectionToPool() {
private func addConnectionToPool(idleBehavior: ConnectionManager.IdleBehavior) {
let manager = ConnectionManager(
eventLoop: self.eventLoop,
channelProvider: self.channelProvider,
callStartBehavior: .waitsForConnectivity,
idleBehavior: idleBehavior,
connectionBackoff: self.connectionBackoff,
connectivityDelegate: self,
http2Delegate: self,
Expand All @@ -220,6 +235,19 @@ internal final class ConnectionPool {
let id = manager.id
self._connections[id] = PerConnectionState(manager: manager)
self.delegate?.connectionAdded(id: .init(id))

// If it's one of the connections that should be kept open, then connect
// straight away.
switch idleBehavior {
case .neverGoIdle:
self.eventLoop.execute {
if manager.sync.isIdle {
manager.sync.startConnecting()
}
}
case .closeWhenIdleTimeout:
()
}
}

// MARK: - Called from the pool manager
Expand Down Expand Up @@ -689,8 +717,9 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate {
// Grab the number of reserved streams (before invalidating the index by adding a connection).
let reservedStreams = self._connections.values[index].reservedStreams

// Replace the connection with a new idle one.
self.addConnectionToPool()
// Replace the connection with a new idle one. Keep the idle behavior, so that
// if it's a connection that should be kept alive, we maintain it.
self.addConnectionToPool(idleBehavior: manager.idleBehavior)

// Since we're removing this connection from the pool (and no new streams can be created on
// the connection), the pool manager can ignore any streams reserved against this connection.
Expand Down Expand Up @@ -881,6 +910,22 @@ extension ConnectionPool {
return self.pool._connections.values.reduce(0) { $0 &+ ($1.manager.sync.isIdle ? 1 : 0) }
}

/// The number of active (i.e. connecting or ready) connections in the pool.
internal var activeConnections: Int {
self.pool.eventLoop.assertInEventLoop()
return self.pool._connections.values.reduce(0) {
$0 &+ (($1.manager.sync.isReady || $1.manager.sync.isConnecting) ? 1 : 0)
}
}

/// The number of connections in the pool in transient failure state.
internal var transientFailureConnections: Int {
self.pool.eventLoop.assertInEventLoop()
return self.pool._connections.values.reduce(0) {
$0 &+ ($1.manager.sync.isTransientFailure ? 1 : 0)
}
}

Comment on lines +913 to +928
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these are unused now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I'm using them in tests.

/// The number of streams currently available to reserve across all connections in the pool.
internal var availableStreams: Int {
self.pool.eventLoop.assertInEventLoop()
Expand Down
4 changes: 4 additions & 0 deletions Sources/GRPC/ConnectionPool/GRPCChannelPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ extension GRPCChannelPool.Configuration {
/// Defaults to 100.
public var maxWaitersPerEventLoop: Int = 100

/// The minimum number of connections to keep open in this pool, per EventLoop.
/// This number of connections per EventLoop will never go idle and be closed.
public var minConnections: Int = 0
gjcairo marked this conversation as resolved.
Show resolved Hide resolved

/// The maximum amount of time a caller is willing to wait for a stream for before timing out.
///
/// Defaults to 30 seconds.
Expand Down
8 changes: 8 additions & 0 deletions Sources/GRPC/ConnectionPool/PoolManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ internal final class PoolManager {
@usableFromInline
var maxWaiters: Int

/// The minimum number of connections to keep open per pool.
/// This number of connections will never go idle and be closed.
@usableFromInline
var minConnections: Int

/// A load threshold in the range `0.0 ... 1.0` beyond which another connection will be started
/// (assuming there is a connection available to start).
@usableFromInline
Expand Down Expand Up @@ -62,6 +67,7 @@ internal final class PoolManager {
internal init(
maxConnections: Int,
maxWaiters: Int,
minConnections: Int,
loadThreshold: Double,
assumedMaxConcurrentStreams: Int = 100,
connectionBackoff: ConnectionBackoff,
Expand All @@ -70,6 +76,7 @@ internal final class PoolManager {
) {
self.maxConnections = maxConnections
self.maxWaiters = maxWaiters
self.minConnections = minConnections
self.loadThreshold = loadThreshold
self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams
self.connectionBackoff = connectionBackoff
Expand Down Expand Up @@ -225,6 +232,7 @@ internal final class PoolManager {
return ConnectionPool(
eventLoop: eventLoop,
maxWaiters: configuration.maxWaiters,
minConnections: configuration.minConnections,
reservationLoadThreshold: configuration.loadThreshold,
assumedMaxConcurrentStreams: configuration.assumedMaxConcurrentStreams,
connectionBackoff: configuration.connectionBackoff,
Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/ConnectionPool/PooledChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ internal final class PooledChannel: GRPCChannel {
perPoolConfiguration: .init(
maxConnections: configuration.connectionPool.connectionsPerEventLoop,
maxWaiters: configuration.connectionPool.maxWaitersPerEventLoop,
minConnections: configuration.connectionPool.minConnections,
loadThreshold: configuration.connectionPool.reservationLoadThreshold,
assumedMaxConcurrentStreams: 100,
connectionBackoff: configuration.connectionBackoff,
Expand Down
16 changes: 11 additions & 5 deletions Sources/GRPC/GRPCIdleHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
typealias OutboundOut = HTTP2Frame

/// The amount of time to wait before closing the channel when there are no active streams.
private let idleTimeout: TimeAmount
/// If nil, then we shouldn't schedule idle tasks.
private let idleTimeout: TimeAmount?

/// The ping handler.
private var pingHandler: PingHandler
Expand Down Expand Up @@ -78,7 +79,12 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
logger: Logger
) {
self.mode = .client(connectionManager, multiplexer)
self.idleTimeout = idleTimeout
switch connectionManager.idleBehavior {
case .neverGoIdle:
self.idleTimeout = nil
case .closeWhenIdleTimeout:
self.idleTimeout = idleTimeout
}
self.stateMachine = .init(role: .client, logger: logger)
self.pingHandler = PingHandler(
pingCode: 5,
Expand Down Expand Up @@ -135,7 +141,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
}

// Handle idle timeout creation/cancellation.
if let idleTask = operations.idleTask {
if let idleTimeout = self.idleTimeout, let idleTask = operations.idleTask {
switch idleTask {
case let .cancel(task):
self.stateMachine.logger.debug("idle timeout task cancelled")
Expand All @@ -145,9 +151,9 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
if self.idleTimeout != .nanoseconds(.max), let context = self.context {
self.stateMachine.logger.debug(
"scheduling idle timeout task",
metadata: [MetadataKey.delayMs: "\(self.idleTimeout.milliseconds)"]
metadata: [MetadataKey.delayMs: "\(idleTimeout.milliseconds)"]
)
let task = context.eventLoop.scheduleTask(in: self.idleTimeout) {
let task = context.eventLoop.scheduleTask(in: idleTimeout) {
self.stateMachine.logger.debug("idle timeout task fired")
self.idleTimeoutFired()
}
Expand Down
4 changes: 4 additions & 0 deletions Tests/GRPCTests/ConnectionManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ConnectionManagerTests: GRPCTestCase {
configuration: configuration,
channelProvider: channelProvider.map { HookedChannelProvider($0) },
connectivityDelegate: self.monitor,
idleBehavior: .closeWhenIdleTimeout,
logger: self.logger
)
}
Expand Down Expand Up @@ -948,6 +949,7 @@ extension ConnectionManagerTests {
return loop.makeFailedFuture(DoomedChannelError())
},
connectivityDelegate: nil,
idleBehavior: .closeWhenIdleTimeout,
logger: self.logger
)
let candidate = manager.getHTTP2Multiplexer()
Expand Down Expand Up @@ -1207,6 +1209,7 @@ extension ConnectionManagerTests {
return eventLoop.makeSucceededFuture(channel)
},
callStartBehavior: .waitsForConnectivity,
idleBehavior: .closeWhenIdleTimeout,
connectionBackoff: ConnectionBackoff(),
connectivityDelegate: nil,
http2Delegate: http2,
Expand Down Expand Up @@ -1383,6 +1386,7 @@ extension ConnectionManagerTests {
configuration: configuration,
channelProvider: Provider(),
connectivityDelegate: self.monitor,
idleBehavior: .closeWhenIdleTimeout,
logger: self.logger
)

Expand Down
Loading
Loading