diff --git a/Sources/GRPC/ConnectionManager.swift b/Sources/GRPC/ConnectionManager.swift index d30d12ee0..93cbc7527 100644 --- a/Sources/GRPC/ConnectionManager.swift +++ b/Sources/GRPC/ConnectionManager.swift @@ -338,12 +338,12 @@ internal final class ConnectionManager: @unchecked Sendable { /// A logger. internal var logger: Logger - private let connectionID: String + internal let id: ConnectionManagerID private var channelNumber: UInt64 private var channelNumberLock = NIOLock() private var _connectionIDAndNumber: String { - return "\(self.connectionID)/\(self.channelNumber)" + return "\(self.id)/\(self.channelNumber)" } private var connectionIDAndNumber: String { @@ -394,7 +394,7 @@ internal final class ConnectionManager: @unchecked Sendable { ) { // Setup the logger. var logger = logger - let connectionID = UUID().uuidString + let connectionID = ConnectionManagerID() let channelNumber: UInt64 = 0 logger[metadataKey: MetadataKey.connectionID] = "\(connectionID)/\(channelNumber)" @@ -408,7 +408,7 @@ internal final class ConnectionManager: @unchecked Sendable { self.http2Delegate = http2Delegate self.idleBehavior = idleBehavior - self.connectionID = connectionID + self.id = connectionID self.channelNumber = channelNumber self.logger = logger } diff --git a/Sources/GRPC/ConnectionPool/ConnectionManagerID.swift b/Sources/GRPC/ConnectionPool/ConnectionManagerID.swift index 33a7220ed..a0ce0519f 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionManagerID.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionManagerID.swift @@ -14,25 +14,20 @@ * limitations under the License. */ +import struct Foundation.UUID + @usableFromInline internal struct ConnectionManagerID: Hashable, CustomStringConvertible, Sendable { @usableFromInline - internal let _id: ObjectIdentifier + internal let id: String @usableFromInline - internal init(_ manager: ConnectionManager) { - self._id = ObjectIdentifier(manager) + internal init() { + self.id = UUID().uuidString } @usableFromInline internal var description: String { - return String(describing: self._id) - } -} - -extension ConnectionManager { - @usableFromInline - internal var id: ConnectionManagerID { - return ConnectionManagerID(self) + return String(describing: self.id) } } diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index 9ea75b405..4b678f15e 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -13,6 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +import Atomics import Logging import NIOConcurrencyHelpers import NIOCore @@ -108,12 +110,16 @@ internal final class ConnectionPool { /// A logger which always sets "GRPC" as its source. @usableFromInline - private(set) var logger: GRPCLogger + internal let logger: GRPCLogger /// Returns `NIODeadline` representing 'now'. This is useful for testing. @usableFromInline internal let now: () -> NIODeadline + /// The ID of this sub-pool. + @usableFromInline + internal let id: GRPCSubPoolID + /// Logging metadata keys. @usableFromInline internal enum Metadata { @@ -190,8 +196,14 @@ internal final class ConnectionPool { self.channelProvider = channelProvider self.streamLender = streamLender self.delegate = delegate - self.logger = logger self.now = now + + let id = GRPCSubPoolID.next() + var logger = logger + logger[metadataKey: Metadata.id] = "\(id)" + + self.id = id + self.logger = logger } /// Initialize the connection pool. @@ -199,7 +211,6 @@ internal final class ConnectionPool { /// - Parameter connections: The number of connections to add to the pool. internal func initialize(connections: Int) { assert(self._connections.isEmpty) - self.logger.logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))" self.logger.debug( "initializing new sub-pool", metadata: [ @@ -628,6 +639,46 @@ internal final class ConnectionPool { promise.succeed(()) } } + + internal func stats() -> EventLoopFuture { + let promise = self.eventLoop.makePromise(of: GRPCSubPoolStats.self) + + if self.eventLoop.inEventLoop { + self._stats(promise: promise) + } else { + self.eventLoop.execute { + self._stats(promise: promise) + } + } + + return promise.futureResult + } + + private func _stats(promise: EventLoopPromise) { + self.eventLoop.assertInEventLoop() + + var stats = GRPCSubPoolStats(id: self.id) + + for connection in self._connections.values { + let sync = connection.manager.sync + if sync.isIdle { + stats.connectionStates.idle += 1 + } else if sync.isConnecting { + stats.connectionStates.connecting += 1 + } else if sync.isReady { + stats.connectionStates.ready += 1 + } else if sync.isTransientFailure { + stats.connectionStates.transientFailure += 1 + } + + stats.streamsInUse += connection.reservedStreams + stats.streamsFreeToUse += connection.availableStreams + } + + stats.rpcsWaiting += self.waiters.count + + promise.succeed(stats) + } } extension ConnectionPool: ConnectionManagerConnectivityDelegate { diff --git a/Sources/GRPC/ConnectionPool/ConnectionPoolIDs.swift b/Sources/GRPC/ConnectionPool/ConnectionPoolIDs.swift new file mode 100644 index 000000000..350189172 --- /dev/null +++ b/Sources/GRPC/ConnectionPool/ConnectionPoolIDs.swift @@ -0,0 +1,59 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Atomics + +enum RawID { + private static let source = ManagedAtomic(0) + + static func next() -> Int { + self.source.loadThenWrappingIncrement(ordering: .relaxed) + } +} + +/// The ID of a connection pool. +public struct GRPCConnectionPoolID: Hashable, Sendable, CustomStringConvertible { + private var rawValue: Int + + private init(rawValue: Int) { + self.rawValue = rawValue + } + + public static func next() -> Self { + return Self(rawValue: RawID.next()) + } + + public var description: String { + "ConnectionPool(\(self.rawValue))" + } +} + +/// The ID of a sub-pool in a connection pool. +public struct GRPCSubPoolID: Hashable, Sendable, CustomStringConvertible { + private var rawValue: Int + + private init(rawValue: Int) { + self.rawValue = rawValue + } + + public static func next() -> Self { + return Self(rawValue: RawID.next()) + } + + public var description: String { + "SubPool(\(self.rawValue))" + } +} diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index 0af26fecc..8a6cede36 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -179,6 +179,11 @@ extension GRPCChannelPool { /// pool. public var delegate: GRPCConnectionPoolDelegate? + /// The period at which connection pool stats are published to the ``delegate``. + /// + /// Ignored if either this value or ``delegate`` are `nil`. + public var statsPeriod: TimeAmount? + /// A logger used for background activity, such as connection state changes. public var backgroundActivityLogger = Logger( label: "io.grpc", @@ -354,7 +359,7 @@ public protocol GRPCConnectionPoolDelegate: Sendable { /// time and is reported via ``connectionUtilizationChanged(id:streamsUsed:streamCapacity:)``. The func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) - /// The utlization of the connection changed; a stream may have been used, returned or the + /// The utilization of the connection changed; a stream may have been used, returned or the /// maximum number of concurrent streams available on the connection changed. func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) @@ -365,4 +370,66 @@ public protocol GRPCConnectionPoolDelegate: Sendable { /// The connection was closed. The connection may be established again in the future (notified /// via ``startedConnecting(id:)``). func connectionClosed(id: GRPCConnectionID, error: Error?) + + /// Stats about the current state of the connection pool. + /// + /// Each ``GRPCConnectionPoolStats`` includes the stats for a sub-pool. Each sub-pool is tied + /// to an `EventLoop`. + /// + /// Unlike the other delegate methods, this is called periodically based on the value + /// of ``GRPCChannelPool/Configuration/statsPeriod``. + func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID) +} + +extension GRPCConnectionPoolDelegate { + public func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID) { + // Default conformance to avoid breaking changes. + } +} + +public struct GRPCSubPoolStats: Sendable, Hashable { + public struct ConnectionStates: Sendable, Hashable { + /// The number of idle connections. + public var idle: Int + /// The number of connections trying to establish a connection. + public var connecting: Int + /// The number of connections which are ready to use. + public var ready: Int + /// The number of connections which are backing off waiting to attempt to connect. + public var transientFailure: Int + + public init() { + self.idle = 0 + self.connecting = 0 + self.ready = 0 + self.transientFailure = 0 + } + } + + /// The ID of the subpool. + public var id: GRPCSubPoolID + + /// Counts of connection states. + public var connectionStates: ConnectionStates + + /// The number of streams currently being used. + public var streamsInUse: Int + + /// The number of streams which are currently free to use. + /// + /// The sum of this value and `streamsInUse` gives the capacity of the pool. + public var streamsFreeToUse: Int + + /// The number of RPCs currently waiting for a stream. + /// + /// RPCs waiting for a stream are also known as 'waiters'. + public var rpcsWaiting: Int + + public init(id: GRPCSubPoolID) { + self.id = id + self.connectionStates = ConnectionStates() + self.streamsInUse = 0 + self.streamsFreeToUse = 0 + self.rpcsWaiting = 0 + } } diff --git a/Sources/GRPC/ConnectionPool/PoolManager.swift b/Sources/GRPC/ConnectionPool/PoolManager.swift index 593d11e7d..27729c01b 100644 --- a/Sources/GRPC/ConnectionPool/PoolManager.swift +++ b/Sources/GRPC/ConnectionPool/PoolManager.swift @@ -63,6 +63,9 @@ internal final class PoolManager { @usableFromInline var delegate: GRPCConnectionPoolDelegate? + @usableFromInline + var statsPeriod: TimeAmount? + @usableFromInline internal init( maxConnections: Int, @@ -72,7 +75,8 @@ internal final class PoolManager { assumedMaxConcurrentStreams: Int = 100, connectionBackoff: ConnectionBackoff, channelProvider: DefaultChannelProvider, - delegate: GRPCConnectionPoolDelegate? + delegate: GRPCConnectionPoolDelegate?, + statsPeriod: TimeAmount? ) { self.maxConnections = maxConnections self.maxWaiters = maxWaiters @@ -82,6 +86,7 @@ internal final class PoolManager { self.connectionBackoff = connectionBackoff self.channelProvider = channelProvider self.delegate = delegate + self.statsPeriod = statsPeriod } } @@ -113,6 +118,9 @@ internal final class PoolManager { @usableFromInline internal let group: EventLoopGroup + @usableFromInline + internal let id: GRPCConnectionPoolID + /// Make a new pool manager and initialize it. /// /// The pool manager manages one connection pool per event loop in the provided `EventLoopGroup`. @@ -140,6 +148,7 @@ internal final class PoolManager { self._state = PoolManagerStateMachine(.inactive) self._pools = [] self.group = group + self.id = .next() // The pool relies on the identity of each `EventLoop` in the `EventLoopGroup` being unique. In // practice this is unlikely to happen unless a custom `EventLoopGroup` is constructed, because @@ -158,7 +167,7 @@ internal final class PoolManager { self.lock.withLock { assert( self._state.isShutdownOrShuttingDown, - "The pool manager (\(ObjectIdentifier(self))) must be shutdown before going out of scope." + "The pool manager (\(self.id)) must be shutdown before going out of scope." ) } } @@ -175,7 +184,7 @@ internal final class PoolManager { logger: GRPCLogger ) { var logger = logger - logger[metadataKey: Metadata.id] = "\(ObjectIdentifier(self))" + logger[metadataKey: Metadata.id] = "\(self.id)" let pools = self.makePools(perPoolConfiguration: configuration, logger: logger) @@ -200,13 +209,27 @@ internal final class PoolManager { ) } + let statsTask: RepeatedTask? + if let period = configuration.statsPeriod, let delegate = configuration.delegate { + let loop = self.group.next() + statsTask = loop.scheduleRepeatedTask(initialDelay: period, delay: period) { _ in + self.emitStats(delegate: delegate) + } + } else { + statsTask = nil + } + self.lock.withLock { assert(self._pools.isEmpty) self._pools = pools // We'll blow up if we've already been initialized, that's fine, we don't allow callers to // call `initialize` directly. - self._state.activatePools(keyedBy: poolKeys, assumingPerPoolCapacity: assumedCapacity) + self._state.activatePools( + keyedBy: poolKeys, + assumingPerPoolCapacity: assumedCapacity, + statsTask: statsTask + ) } for pool in pools { @@ -331,7 +354,8 @@ internal final class PoolManager { } switch (action, pools) { - case let (.shutdownPools, .some(pools)): + case let (.shutdownPools(statsTask), .some(pools)): + statsTask?.cancel(promise: nil) promise.futureResult.whenComplete { _ in self.shutdownComplete() } EventLoopFuture.andAllSucceed(pools.map { $0.shutdown(mode: mode) }, promise: promise) @@ -353,6 +377,18 @@ internal final class PoolManager { self._state.shutdownComplete() } } + + // MARK: - Stats + + private func emitStats(delegate: GRPCConnectionPoolDelegate) { + let pools = self.lock.withLock { self._pools } + if pools.isEmpty { return } + + let statsFutures = pools.map { $0.stats() } + EventLoopFuture.whenAllSucceed(statsFutures, on: self.group.any()).whenSuccess { stats in + delegate.connectionPoolStats(stats, id: self.id) + } + } } // MARK: - Connection Pool to Pool Manager diff --git a/Sources/GRPC/ConnectionPool/PoolManagerStateMachine.swift b/Sources/GRPC/ConnectionPool/PoolManagerStateMachine.swift index 5f0484a33..c3674a8f0 100644 --- a/Sources/GRPC/ConnectionPool/PoolManagerStateMachine.swift +++ b/Sources/GRPC/ConnectionPool/PoolManagerStateMachine.swift @@ -40,10 +40,14 @@ internal struct PoolManagerStateMachine { @usableFromInline internal var pools: [EventLoopID: PerPoolState] + @usableFromInline + internal var statsTask: RepeatedTask? + @usableFromInline internal init( poolKeys: [PoolManager.ConnectionPoolKey], - assumedMaxAvailableStreamsPerPool: Int + assumedMaxAvailableStreamsPerPool: Int, + statsTask: RepeatedTask? ) { self.pools = Dictionary( uniqueKeysWithValues: poolKeys.map { key in @@ -54,6 +58,7 @@ internal struct PoolManagerStateMachine { return (key.eventLoopID, value) } ) + self.statsTask = statsTask } } @@ -92,12 +97,18 @@ internal struct PoolManagerStateMachine { @usableFromInline internal mutating func activatePools( keyedBy keys: [PoolManager.ConnectionPoolKey], - assumingPerPoolCapacity capacity: Int + assumingPerPoolCapacity capacity: Int, + statsTask: RepeatedTask? ) { self.modifyingState { state in switch state { case .inactive: - state = .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: capacity)) + let active = ActiveState( + poolKeys: keys, + assumedMaxAvailableStreamsPerPool: capacity, + statsTask: statsTask + ) + state = .active(active) case .active, .shuttingDown, .shutdown, ._modifying: preconditionFailure() @@ -180,7 +191,7 @@ internal struct PoolManagerStateMachine { } enum ShutdownAction { - case shutdownPools + case shutdownPools(RepeatedTask?) case alreadyShutdown case alreadyShuttingDown(EventLoopFuture) } @@ -192,9 +203,9 @@ internal struct PoolManagerStateMachine { state = .shutdown return .alreadyShutdown - case .active: + case .active(let active): state = .shuttingDown(promise.futureResult) - return .shutdownPools + return .shutdownPools(active.statsTask) case let .shuttingDown(future): return .alreadyShuttingDown(future) diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index a8715070e..4d2b06c49 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -101,7 +101,8 @@ internal final class PooledChannel: GRPCChannel { assumedMaxConcurrentStreams: 100, connectionBackoff: configuration.connectionBackoff, channelProvider: provider, - delegate: configuration.delegate + delegate: configuration.delegate, + statsPeriod: configuration.statsPeriod ), logger: configuration.backgroundActivityLogger.wrapped ) diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift index 0de3de52d..cb0a677cd 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift @@ -104,8 +104,9 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate { case connectionUtilizationChanged(GRPCConnectionID, Int, Int) case connectionQuiescing(GRPCConnectionID) case connectionRemoved(GRPCConnectionID) + case stats([GRPCSubPoolStats], GRPCConnectionPoolID) - var id: GRPCConnectionID { + var id: GRPCConnectionID? { switch self { case let .connectionAdded(id), let .startedConnecting(id), @@ -116,6 +117,8 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate { let .connectionQuiescing(id), let .connectionRemoved(id): return id + case .stats: + return nil } } } @@ -139,6 +142,13 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate { } } + func removeAll() -> CircularBuffer { + return self.lock.withLock { + defer { self.events.removeAll() } + return self.events + } + } + func connectionAdded(id: GRPCConnectionID) { self.lock.withLock { self.events.append(.connectionAdded(id)) @@ -186,6 +196,12 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate { self.events.append(.connectionRemoved(id)) } } + + func connectionPoolStats(_ stats: [GRPCSubPoolStats], id: GRPCConnectionPoolID) { + self.lock.withLock { + self.events.append(.stats(stats, id)) + } + } } extension EventRecordingConnectionPoolDelegate: @unchecked Sendable {} diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index 65bb416cd..53d44b677 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -1046,8 +1046,7 @@ final class ConnectionPoolTests: GRPCTestCase { // Two connections must be removed. for _ in 0 ..< 2 { if let event = recorder.popFirst() { - let id = event.id - XCTAssertEqual(event, .connectionRemoved(id)) + XCTAssertEqual(event, event.id.map { .connectionRemoved($0) }) } else { XCTFail("Expected .connectionRemoved") } diff --git a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift index 52b3f9377..90b53057d 100644 --- a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift @@ -589,5 +589,34 @@ final class GRPCChannelPoolTests: GRPCTestCase { } XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(rpcs, on: self.group.any()).wait()) } + + func testDelegateGetsCalledWithStats() throws { + let recorder = EventRecordingConnectionPoolDelegate() + + self.configureEventLoopGroup(threads: 4) + self.startServer(withTLS: false) + self.startChannel(withTLS: false) { + $0.statsPeriod = .milliseconds(1) + $0.delegate = recorder + } + + let scheduled = self.group.next().scheduleTask(in: .milliseconds(100)) { + _ = self.channel?.close() + } + + try scheduled.futureResult.wait() + + let events = recorder.removeAll() + let statsEvents = events.compactMap { event in + switch event { + case .stats(let stats, _): + return stats + default: + return nil + } + } + + XCTAssertGreaterThan(statsEvents.count, 0) + } } #endif // canImport(NIOSSL) diff --git a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift index 02f33fa33..096bf16b4 100644 --- a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift +++ b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift @@ -79,7 +79,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1) let keys = self.makeConnectionPoolKeys(for: pools) var state = PoolManagerStateMachine( - .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100)) + .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100, statsTask: nil)) ) for (index, loop) in group.loops.enumerated() { @@ -99,7 +99,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1) let keys = self.makeConnectionPoolKeys(for: pools) var state = PoolManagerStateMachine( - .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100)) + .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100, statsTask: nil)) ) let anotherLoop = EmbeddedEventLoop() @@ -118,7 +118,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1) let keys = self.makeConnectionPoolKeys(for: pools) var state = PoolManagerStateMachine(.inactive) - state.activatePools(keyedBy: keys, assumingPerPoolCapacity: 100) + state.activatePools(keyedBy: keys, assumingPerPoolCapacity: 100, statsTask: nil) // Reserve some streams. for (index, loop) in group.loops.enumerated() { @@ -177,7 +177,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1) let keys = self.makeConnectionPoolKeys(for: pools) var state = PoolManagerStateMachine( - .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100)) + .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100, statsTask: nil)) ) let reservePreferredLoop = state.reserveStream(preferringPoolWithEventLoopID: nil) @@ -230,7 +230,7 @@ class PoolManagerStateMachineTests: GRPCTestCase { let pools = self.makeInitializedPools(group: group, connectionsPerPool: 1) let keys = self.makeConnectionPoolKeys(for: pools) var state = PoolManagerStateMachine( - .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100)) + .active(.init(poolKeys: keys, assumedMaxAvailableStreamsPerPool: 100, statsTask: nil)) ) let promise = group.loops[0].makePromise(of: Void.self)