From 2828ee77a74e7d22007c0632f8eb2c5a4b96503e Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 22 Aug 2023 08:53:54 +0100 Subject: [PATCH] Revert "Adopt h2handler multiplexer (#1587)" (#1641) This reverts commit 75b390e901c7d70af9b4c5ca2677e035f00cd1ab. --- .github/workflows/ci.yaml | 41 +- NOTICES.txt | 8 - Package.swift | 2 +- Sources/GRPC/ClientConnection.swift | 91 ++- Sources/GRPC/ConnectionManager.swift | 41 +- .../ConnectionPool+PerConnectionState.swift | 6 +- .../ConnectionPool+Waiter.swift | 6 +- .../GRPC/ConnectionPool/ConnectionPool.swift | 14 +- Sources/GRPC/ConnectionPool/PoolManager.swift | 2 +- Sources/GRPC/GRPCClientChannelHandler.swift | 4 +- Sources/GRPC/GRPCIdleHandler.swift | 102 +-- .../GRPC/GRPCServerPipelineConfigurator.swift | 67 +- Sources/GRPC/Server.swift | 6 +- Tests/GRPCTests/ClientTimeoutTests.swift | 7 +- Tests/GRPCTests/ConnectionManagerTests.swift | 588 ++++++++---------- .../ConnectionPoolDelegates.swift | 52 -- .../ConnectionPool/ConnectionPoolTests.swift | 229 +------ .../ConnectionPool/GRPCChannelPoolTests.swift | 83 +-- 18 files changed, 483 insertions(+), 866 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5c8e754de..f91e7d497 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -57,45 +57,44 @@ jobs: include: - image: swiftlang/swift:nightly-focal env: - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000 - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000 + MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000 + MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000 MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000 MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000 MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000 - image: swift:5.8-jammy env: - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000 - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000 + MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000 + MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000 MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000 MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000 MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000 - image: swift:5.7-jammy env: - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000 - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000 + MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000 + MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000 MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000 MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000 MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000 - image: swift:5.6-focal env: - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 247000 - MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 139000 + MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 324000 + MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 162000 MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000 MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000 MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 130000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 137000 - MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 137000 - + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 164000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 171000 + MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 171000 name: Performance Tests on ${{ matrix.image }} runs-on: ubuntu-latest container: diff --git a/NOTICES.txt b/NOTICES.txt index a0898dd80..f1ff7bbab 100644 --- a/NOTICES.txt +++ b/NOTICES.txt @@ -25,11 +25,3 @@ framework: 'test_01_allocation_counts.sh', 'run-nio-alloc-counter-tests.sh' and * https://github.com/apple/swift-nio/blob/main/LICENSE.txt * HOMEPAGE: * https://github.com/apple/swift-nio - -This product contains a simplified derivation of SwiftNIO HTTP/2's -'HTTP2FrameEncoder' for testing purposes. - - * LICENSE (Apache License 2.0): - * https://github.com/apple/swift-nio-http2/blob/main/LICENSE.txt - * HOMEPAGE: - * https://github.com/apple/swift-nio-http2 diff --git a/Package.swift b/Package.swift index 393599af8..2c89b5742 100644 --- a/Package.swift +++ b/Package.swift @@ -36,7 +36,7 @@ let packageDependencies: [Package.Dependency] = [ ), .package( url: "https://github.com/apple/swift-nio-http2.git", - from: "1.26.0" + from: "1.24.1" ), .package( url: "https://github.com/apple/swift-nio-transport-services.git", diff --git a/Sources/GRPC/ClientConnection.swift b/Sources/GRPC/ClientConnection.swift index ef997ad30..9b399d36f 100644 --- a/Sources/GRPC/ClientConnection.swift +++ b/Sources/GRPC/ClientConnection.swift @@ -58,25 +58,28 @@ import SwiftProtobuf /// │ DelegatingErrorHandler │ /// └──────────▲───────────────┘ /// HTTP2Frame│ -/// │ -/// │ -/// │ -/// │ -/// │ -/// HTTP2Frame│ ⠇ ⠇ ⠇ ⠇ ⠇ -/// ┌─┴──────────────────▼─┐ ┌┴─▼┐ ┌┴─▼┐ -/// │ GRPCIdleHandler │ │ | │ | HTTP/2 streams -/// └─▲──────────────────┬─┘ └▲─┬┘ └▲─┬┘ -/// HTTP2Frame│ │ │ │ │ │ HTTP2Frame -/// ┌─┴──────────────────▼────────┴─▼───┴─▼┐ -/// │ NIOHTTP2Handler │ -/// └─▲──────────────────────────────────┬─┘ -/// ByteBuffer│ │ByteBuffer -/// ┌─┴──────────────────────────────────▼─┐ -/// │ NIOSSLHandler │ -/// └─▲──────────────────────────────────┬─┘ -/// ByteBuffer│ │ByteBuffer -/// │ ▼ +/// │ ⠇ ⠇ ⠇ ⠇ +/// │ ┌┴─▼┐ ┌┴─▼┐ +/// │ │ | │ | HTTP/2 streams +/// │ └▲─┬┘ └▲─┬┘ +/// │ │ │ │ │ HTTP2Frame +/// ┌─┴────────────────┴─▼───┴─▼┐ +/// │ HTTP2StreamMultiplexer | +/// └─▲───────────────────────┬─┘ +/// HTTP2Frame│ │HTTP2Frame +/// ┌─┴───────────────────────▼─┐ +/// │ GRPCIdleHandler │ +/// └─▲───────────────────────┬─┘ +/// HTTP2Frame│ │HTTP2Frame +/// ┌─┴───────────────────────▼─┐ +/// │ NIOHTTP2Handler │ +/// └─▲───────────────────────┬─┘ +/// ByteBuffer│ │ByteBuffer +/// ┌─┴───────────────────────▼─┐ +/// │ NIOSSLHandler │ +/// └─▲───────────────────────┬─┘ +/// ByteBuffer│ │ByteBuffer +/// │ ▼ /// /// The 'GRPCIdleHandler' intercepts HTTP/2 frames and various events and is responsible for /// informing and controlling the state of the connection (idling and keepalive). The HTTP/2 streams @@ -85,7 +88,7 @@ public final class ClientConnection: Sendable { private let connectionManager: ConnectionManager /// HTTP multiplexer from the underlying channel handling gRPC calls. - internal func getMultiplexer() -> EventLoopFuture { + internal func getMultiplexer() -> EventLoopFuture { return self.connectionManager.getHTTP2Multiplexer() } @@ -247,7 +250,7 @@ extension ClientConnection: GRPCChannel { } private static func makeStreamChannel( - using result: Result, + using result: Result, promise: EventLoopPromise ) { switch result { @@ -618,31 +621,29 @@ extension ChannelPipeline.SynchronousOperations { HTTP2Setting(parameter: .initialWindowSize, value: httpTargetWindowSize), ] - let grpcIdleHandler = GRPCIdleHandler( + // We could use 'configureHTTP2Pipeline' here, but we need to add a few handlers between the + // two HTTP/2 handlers so we'll do it manually instead. + try self.addHandler(NIOHTTP2Handler(mode: .client, initialSettings: initialSettings)) + + let h2Multiplexer = HTTP2StreamMultiplexer( + mode: .client, + channel: channel, + targetWindowSize: httpTargetWindowSize, + inboundStreamInitializer: nil + ) + + // The multiplexer is passed through the idle handler so it is only reported on + // successful channel activation - with happy eyeballs multiple pipelines can + // be constructed so it's not safe to report just yet. + try self.addHandler(GRPCIdleHandler( connectionManager: connectionManager, + multiplexer: h2Multiplexer, idleTimeout: connectionIdleTimeout, keepalive: connectionKeepalive, logger: logger - ) - - var connectionConfiguration = NIOHTTP2Handler.ConnectionConfiguration() - connectionConfiguration.initialSettings = initialSettings - var streamConfiguration = NIOHTTP2Handler.StreamConfiguration() - streamConfiguration.targetWindowSize = httpTargetWindowSize - let h2Handler = NIOHTTP2Handler( - mode: .client, - eventLoop: channel.eventLoop, - connectionConfiguration: connectionConfiguration, - streamConfiguration: streamConfiguration, - streamDelegate: grpcIdleHandler - ) { channel in - channel.close() - } - try self.addHandler(h2Handler) - - grpcIdleHandler.setMultiplexer(try h2Handler.syncMultiplexer()) - try self.addHandler(grpcIdleHandler) + )) + try self.addHandler(h2Multiplexer) try self.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate)) } } @@ -652,13 +653,7 @@ extension Channel { errorDelegate: ClientErrorDelegate?, logger: Logger ) -> EventLoopFuture { - return self.configureHTTP2Pipeline( - mode: .client, - connectionConfiguration: .init(), - streamConfiguration: .init() - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - }.flatMap { _ in + return self.configureHTTP2Pipeline(mode: .client, inboundStreamInitializer: nil).flatMap { _ in self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate)) } } diff --git a/Sources/GRPC/ConnectionManager.swift b/Sources/GRPC/ConnectionManager.swift index efb801e48..35ee41101 100644 --- a/Sources/GRPC/ConnectionManager.swift +++ b/Sources/GRPC/ConnectionManager.swift @@ -35,23 +35,19 @@ internal final class ConnectionManager: @unchecked Sendable { var reconnect: Reconnect var candidate: EventLoopFuture - var readyChannelMuxPromise: EventLoopPromise - var candidateMuxPromise: EventLoopPromise + var readyChannelMuxPromise: EventLoopPromise + var candidateMuxPromise: EventLoopPromise } internal struct ConnectedState { var backoffIterator: ConnectionBackoffIterator? var reconnect: Reconnect var candidate: Channel - var readyChannelMuxPromise: EventLoopPromise - var multiplexer: NIOHTTP2Handler.StreamMultiplexer + var readyChannelMuxPromise: EventLoopPromise + var multiplexer: HTTP2StreamMultiplexer var error: Error? - init( - from state: ConnectingState, - candidate: Channel, - multiplexer: NIOHTTP2Handler.StreamMultiplexer - ) { + init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) { self.backoffIterator = state.backoffIterator self.reconnect = state.reconnect self.candidate = candidate @@ -62,7 +58,7 @@ internal final class ConnectionManager: @unchecked Sendable { internal struct ReadyState { var channel: Channel - var multiplexer: NIOHTTP2Handler.StreamMultiplexer + var multiplexer: HTTP2StreamMultiplexer var error: Error? init(from state: ConnectedState) { @@ -73,7 +69,7 @@ internal final class ConnectionManager: @unchecked Sendable { internal struct TransientFailureState { var backoffIterator: ConnectionBackoffIterator? - var readyChannelMuxPromise: EventLoopPromise + var readyChannelMuxPromise: EventLoopPromise var scheduled: Scheduled var reason: Error @@ -256,8 +252,8 @@ internal final class ConnectionManager: @unchecked Sendable { } } - /// Returns the `NIOHTTP2Handler.StreamMultiplexer` from the 'ready' state or `nil` if it is not available. - private var multiplexer: NIOHTTP2Handler.StreamMultiplexer? { + /// Returns the `HTTP2StreamMultiplexer` from the 'ready' state or `nil` if it is not available. + private var multiplexer: HTTP2StreamMultiplexer? { self.eventLoop.assertInEventLoop() switch self.state { case let .ready(state): @@ -365,8 +361,8 @@ internal final class ConnectionManager: @unchecked Sendable { /// Get the multiplexer from the underlying channel handling gRPC calls. /// if the `ConnectionManager` was configured to be `fastFailure` this will have /// one chance to connect - if not reconnections are managed here. - internal func getHTTP2Multiplexer() -> EventLoopFuture { - func getHTTP2Multiplexer0() -> EventLoopFuture { + internal func getHTTP2Multiplexer() -> EventLoopFuture { + func getHTTP2Multiplexer0() -> EventLoopFuture { switch self.callStartBehavior { case .waitsForConnectivity: return self.getHTTP2MultiplexerPatient() @@ -386,8 +382,8 @@ internal final class ConnectionManager: @unchecked Sendable { /// Returns a future for the multiplexer which succeeded when the channel is connected. /// Reconnects are handled if necessary. - private func getHTTP2MultiplexerPatient() -> EventLoopFuture { - let multiplexer: EventLoopFuture + private func getHTTP2MultiplexerPatient() -> EventLoopFuture { + let multiplexer: EventLoopFuture switch self.state { case .idle: @@ -425,12 +421,11 @@ internal final class ConnectionManager: @unchecked Sendable { /// attempt, or if the state is 'idle' returns the future for the next connection attempt. /// /// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned. - private func getHTTP2MultiplexerOptimistic() - -> EventLoopFuture { + private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture { // `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure. self.eventLoop.preconditionInEventLoop() - let muxFuture: EventLoopFuture = { () in + let muxFuture: EventLoopFuture = { () in switch self.state { case .idle: self.startConnecting() @@ -661,7 +656,7 @@ internal final class ConnectionManager: @unchecked Sendable { } /// The connecting channel became `active`. Must be called on the `EventLoop`. - internal func channelActive(channel: Channel, multiplexer: NIOHTTP2Handler.StreamMultiplexer) { + internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) { self.eventLoop.preconditionInEventLoop() self.logger.debug("activating connection", metadata: [ "connectivity_state": "\(self.state.label)", @@ -978,7 +973,7 @@ extension ConnectionManager { private func startConnecting( backoffIterator: ConnectionBackoffIterator?, - muxPromise: EventLoopPromise + muxPromise: EventLoopPromise ) { let timeoutAndBackoff = backoffIterator?.next() @@ -1065,7 +1060,7 @@ extension ConnectionManager { /// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is any /// other state. - internal var multiplexer: NIOHTTP2Handler.StreamMultiplexer? { + internal var multiplexer: HTTP2StreamMultiplexer? { return self.manager.multiplexer } diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift b/Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift index 0d4211924..3ebd6fbd4 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift @@ -58,7 +58,7 @@ extension ConnectionPool { } @usableFromInline - var multiplexer: NIOHTTP2Handler.StreamMultiplexer + var multiplexer: HTTP2StreamMultiplexer /// Maximum number of available streams. @usableFromInline var maxAvailable: Int @@ -78,7 +78,7 @@ extension ConnectionPool { /// Increment the reserved streams and return the multiplexer. @usableFromInline - mutating func reserve() -> NIOHTTP2Handler.StreamMultiplexer { + mutating func reserve() -> HTTP2StreamMultiplexer { assert(!self.isQuiescing) self.reserved += 1 return self.multiplexer @@ -132,7 +132,7 @@ extension ConnectionPool { /// /// The result may be safely unwrapped if `self.availableStreams > 0` when reserving a stream. @usableFromInline - internal mutating func reserveStream() -> NIOHTTP2Handler.StreamMultiplexer? { + internal mutating func reserveStream() -> HTTP2StreamMultiplexer? { return self._availability?.reserve() } diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool+Waiter.swift b/Sources/GRPC/ConnectionPool/ConnectionPool+Waiter.swift index ac9b6d810..b4b386f8b 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool+Waiter.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool+Waiter.swift @@ -30,7 +30,7 @@ extension ConnectionPool { /// The channel initializer. @usableFromInline - internal let _channelInitializer: @Sendable (Channel) -> EventLoopFuture + internal let _channelInitializer: (Channel) -> EventLoopFuture /// The deadline at which the timeout is scheduled. @usableFromInline @@ -51,7 +51,7 @@ extension ConnectionPool { internal init( deadline: NIODeadline, promise: EventLoopPromise, - channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture + channelInitializer: @escaping (Channel) -> EventLoopFuture ) { self._deadline = deadline self._promise = promise @@ -83,7 +83,7 @@ extension ConnectionPool { /// Succeed the waiter with the given multiplexer. @usableFromInline - internal func succeed(with multiplexer: NIOHTTP2Handler.StreamMultiplexer) { + internal func succeed(with multiplexer: HTTP2StreamMultiplexer) { self._scheduledTimeout?.cancel() self._scheduledTimeout = nil multiplexer.createStreamChannel(promise: self._promise, self._channelInitializer) diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index 40c633764..132b3130d 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -215,7 +215,7 @@ internal final class ConnectionPool { deadline: NIODeadline, promise: EventLoopPromise, logger: GRPCLogger, - initializer: @escaping @Sendable (Channel) -> EventLoopFuture + initializer: @escaping (Channel) -> EventLoopFuture ) { if self.eventLoop.inEventLoop { self._makeStream( @@ -241,7 +241,7 @@ internal final class ConnectionPool { internal func makeStream( deadline: NIODeadline, logger: GRPCLogger, - initializer: @escaping @Sendable (Channel) -> EventLoopFuture + initializer: @escaping (Channel) -> EventLoopFuture ) -> EventLoopFuture { let promise = self.eventLoop.makePromise(of: Channel.self) self.makeStream(deadline: deadline, promise: promise, logger: logger, initializer: initializer) @@ -277,7 +277,7 @@ internal final class ConnectionPool { deadline: NIODeadline, promise: EventLoopPromise, logger: GRPCLogger, - initializer: @escaping @Sendable (Channel) -> EventLoopFuture + initializer: @escaping (Channel) -> EventLoopFuture ) { self.eventLoop.assertInEventLoop() @@ -310,7 +310,7 @@ internal final class ConnectionPool { @inlinable internal func _tryMakeStream( promise: EventLoopPromise, - initializer: @escaping @Sendable (Channel) -> EventLoopFuture + initializer: @escaping (Channel) -> EventLoopFuture ) -> Bool { // We shouldn't jump the queue. guard self.waiters.isEmpty else { @@ -344,7 +344,7 @@ internal final class ConnectionPool { deadline: NIODeadline, promise: EventLoopPromise, logger: GRPCLogger, - initializer: @escaping @Sendable (Channel) -> EventLoopFuture + initializer: @escaping (Channel) -> EventLoopFuture ) { // Don't overwhelm the pool with too many waiters. guard self.waiters.count < self.maxWaiters else { @@ -479,10 +479,10 @@ internal final class ConnectionPool { /// Reserves a stream from the connection with the most available streams, if one exists. /// - /// - Returns: The `NIOHTTP2Handler.StreamMultiplexer` from the connection the stream was reserved from, + /// - Returns: The `HTTP2StreamMultiplexer` from the connection the stream was reserved from, /// or `nil` if no stream could be reserved. @usableFromInline - internal func _reserveStreamFromMostAvailableConnection() -> NIOHTTP2Handler.StreamMultiplexer? { + internal func _reserveStreamFromMostAvailableConnection() -> HTTP2StreamMultiplexer? { let index = self._mostAvailableConnectionIndex() if index != self._connections.endIndex { diff --git a/Sources/GRPC/ConnectionPool/PoolManager.swift b/Sources/GRPC/ConnectionPool/PoolManager.swift index a6e53d59f..c9f745422 100644 --- a/Sources/GRPC/ConnectionPool/PoolManager.swift +++ b/Sources/GRPC/ConnectionPool/PoolManager.swift @@ -278,7 +278,7 @@ internal final class PoolManager { preferredEventLoop: EventLoop?, deadline: NIODeadline, logger: GRPCLogger, - streamInitializer initializer: @escaping @Sendable (Channel) -> EventLoopFuture + streamInitializer initializer: @escaping (Channel) -> EventLoopFuture ) -> PooledStreamChannel { let preferredEventLoopID = preferredEventLoop.map { EventLoopID($0) } let reservedPool = self.lock.withLock { diff --git a/Sources/GRPC/GRPCClientChannelHandler.swift b/Sources/GRPC/GRPCClientChannelHandler.swift index 15b2c3d17..1e2695b90 100644 --- a/Sources/GRPC/GRPCClientChannelHandler.swift +++ b/Sources/GRPC/GRPCClientChannelHandler.swift @@ -270,10 +270,10 @@ public enum GRPCCallType: Hashable, Sendable { /// This handler relies heavily on the `GRPCClientStateMachine` to manage the state of the request /// and response streams, which share a single HTTP/2 stream for transport. /// -/// Typical usage of this handler is with a `NIOHTTP2Handler.StreamMultiplexer` from SwiftNIO HTTP2: +/// Typical usage of this handler is with a `HTTP2StreamMultiplexer` from SwiftNIO HTTP2: /// /// ``` -/// let multiplexer: NIOHTTP2Handler.StreamMultiplexer = // ... +/// let multiplexer: HTTP2StreamMultiplexer = // ... /// multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in /// let clientChannelHandler = GRPCClientChannelHandler( /// streamID: streamID, diff --git a/Sources/GRPC/GRPCIdleHandler.swift b/Sources/GRPC/GRPCIdleHandler.swift index 535304e53..4e5c1eb90 100644 --- a/Sources/GRPC/GRPCIdleHandler.swift +++ b/Sources/GRPC/GRPCIdleHandler.swift @@ -35,73 +35,24 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { private var scheduledPing: RepeatedTask? /// The mode we're operating in. - /// - /// This is a `var` to allow the client configuration state to be updated. - private var mode: Mode + private let mode: Mode private var context: ChannelHandlerContext? - /// Keeps track of the client configuration state. - /// We need two levels of configuration to break the dependency cycle with the stream multiplexer. - internal enum ClientConfigurationState { - case partial(ConnectionManager) - case complete(ConnectionManager, NIOHTTP2Handler.StreamMultiplexer) - case deinitialized - - mutating func setMultiplexer(_ multiplexer: NIOHTTP2Handler.StreamMultiplexer) { - switch self { - case let .partial(connectionManager): - self = .complete(connectionManager, multiplexer) - case .complete: - preconditionFailure("Setting the multiplexer twice is not supported.") - case .deinitialized: - preconditionFailure( - "Setting the multiplexer after removing from a channel is not supported." - ) - } - } - } - /// The mode of operation: the client tracks additional connection state in the connection /// manager. internal enum Mode { - case client(ClientConfigurationState) + case client(ConnectionManager, HTTP2StreamMultiplexer) case server - mutating func setMultiplexer(_ multiplexer: NIOHTTP2Handler.StreamMultiplexer) { - switch self { - case var .client(clientConfigurationState): - clientConfigurationState.setMultiplexer(multiplexer) - self = .client(clientConfigurationState) - case .server: - preconditionFailure("Setting the multiplexer in server mode is not supported.") - } - } - var connectionManager: ConnectionManager? { switch self { - case let .client(configurationState): - switch configurationState { - case let .complete(connectionManager, _): - return connectionManager - case let .partial(connectionManager): - return connectionManager - case .deinitialized: - return nil - } + case let .client(manager, _): + return manager case .server: return nil } } - - mutating func deinitialize() { - switch self { - case .client: - self = .client(.deinitialized) - case .server: - break // nothing to drop - } - } } /// The current state. @@ -109,11 +60,12 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { init( connectionManager: ConnectionManager, + multiplexer: HTTP2StreamMultiplexer, idleTimeout: TimeAmount, keepalive configuration: ClientConnectionKeepalive, logger: Logger ) { - self.mode = .client(.partial(connectionManager)) + self.mode = .client(connectionManager, multiplexer) self.idleTimeout = idleTimeout self.stateMachine = .init(role: .client, logger: logger) self.pingHandler = PingHandler( @@ -146,10 +98,6 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { ) } - internal func setMultiplexer(_ multiplexer: NIOHTTP2Handler.StreamMultiplexer) { - self.mode.setMultiplexer(multiplexer) - } - private func sendGoAway(lastStreamID streamID: HTTP2StreamID) { guard let context = self.context else { return @@ -291,11 +239,20 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { func handlerRemoved(context: ChannelHandlerContext) { self.context = nil - self.mode.deinitialize() } func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - if event is ChannelShouldQuiesceEvent { + if let created = event as? NIOHTTP2StreamCreatedEvent { + self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID)) + self.handlePingAction(self.pingHandler.streamCreated()) + self.mode.connectionManager?.streamOpened() + context.fireUserInboundEventTriggered(event) + } else if let closed = event as? StreamClosedEvent { + self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID)) + self.handlePingAction(self.pingHandler.streamClosed()) + self.mode.connectionManager?.streamClosed() + context.fireUserInboundEventTriggered(event) + } else if event is ChannelShouldQuiesceEvent { self.perform(operations: self.stateMachine.initiateGracefulShutdown()) // Swallow this event. } else if case let .handshakeCompleted(negotiatedProtocol) = event as? TLSUserEvent { @@ -324,15 +281,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { // No state machine action here. switch self.mode { - case let .client(configurationState): - switch configurationState { - case let .complete(connectionManager, multiplexer): - connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer) - case .partial: - preconditionFailure("not yet initialised") - case .deinitialized: - preconditionFailure("removed from channel") - } + case let .client(connectionManager, multiplexer): + connectionManager.channelActive(channel: context.channel, multiplexer: multiplexer) case .server: () } @@ -371,20 +321,6 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { } } -extension GRPCIdleHandler: NIOHTTP2StreamDelegate { - func streamCreated(_ id: NIOHTTP2.HTTP2StreamID, channel: NIOCore.Channel) { - self.perform(operations: self.stateMachine.streamCreated(withID: id)) - self.handlePingAction(self.pingHandler.streamCreated()) - self.mode.connectionManager?.streamOpened() - } - - func streamClosed(_ id: NIOHTTP2.HTTP2StreamID, channel: NIOCore.Channel) { - self.perform(operations: self.stateMachine.streamClosed(withID: id)) - self.handlePingAction(self.pingHandler.streamClosed()) - self.mode.connectionManager?.streamClosed() - } -} - extension HTTP2SettingsParameter { internal var loggingMetadataKey: String { switch self { diff --git a/Sources/GRPC/GRPCServerPipelineConfigurator.swift b/Sources/GRPC/GRPCServerPipelineConfigurator.swift index a095c36d5..eba4f9c5a 100644 --- a/Sources/GRPC/GRPCServerPipelineConfigurator.swift +++ b/Sources/GRPC/GRPCServerPipelineConfigurator.swift @@ -78,39 +78,38 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan } /// Makes an HTTP/2 handler. - private func makeHTTP2Handler( - for channel: Channel, - streamDelegate: NIOHTTP2StreamDelegate? - ) -> NIOHTTP2Handler { - var connectionConfiguration = NIOHTTP2Handler.ConnectionConfiguration() - connectionConfiguration.initialSettings = [ - HTTP2Setting( - parameter: .maxConcurrentStreams, - value: self.configuration.httpMaxConcurrentStreams - ), - HTTP2Setting( - parameter: .maxHeaderListSize, - value: HPACKDecoder.defaultMaxHeaderListSize - ), - HTTP2Setting( - parameter: .maxFrameSize, - value: self.configuration.httpMaxFrameSize - ), - HTTP2Setting( - parameter: .initialWindowSize, - value: self.configuration.httpTargetWindowSize - ), - ] - - var streamConfiguration = NIOHTTP2Handler.StreamConfiguration() - streamConfiguration.targetWindowSize = self.configuration.httpTargetWindowSize + private func makeHTTP2Handler() -> NIOHTTP2Handler { + return .init( + mode: .server, + initialSettings: [ + HTTP2Setting( + parameter: .maxConcurrentStreams, + value: self.configuration.httpMaxConcurrentStreams + ), + HTTP2Setting( + parameter: .maxHeaderListSize, + value: HPACKDecoder.defaultMaxHeaderListSize + ), + HTTP2Setting( + parameter: .maxFrameSize, + value: self.configuration.httpMaxFrameSize + ), + HTTP2Setting( + parameter: .initialWindowSize, + value: self.configuration.httpTargetWindowSize + ), + ] + ) + } + + /// Makes an HTTP/2 multiplexer suitable handling gRPC requests. + private func makeHTTP2Multiplexer(for channel: Channel) -> HTTP2StreamMultiplexer { + var logger = self.configuration.logger return .init( mode: .server, - eventLoop: channel.eventLoop, - connectionConfiguration: connectionConfiguration, - streamConfiguration: streamConfiguration, - streamDelegate: streamDelegate + channel: channel, + targetWindowSize: self.configuration.httpTargetWindowSize ) { stream in // Sync options were added to the HTTP/2 stream channel in 1.17.0 (we require at least this) // so this shouldn't be `nil`, but it's not a problem if it is. @@ -119,7 +118,6 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan return String(Int(streamID)) } ?? "" - var logger = self.configuration.logger logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)" do { @@ -167,14 +165,13 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan // to then insert our keepalive and idle handlers between. We can just add everything together. let result: Result - let idleHandler = self.makeIdleHandler() do { // This is only ever called as a result of reading a user inbound event or reading inbound so // we'll be on the right event loop and sync operations are fine. let sync = context.pipeline.syncOperations - try sync.addHandler(self.makeHTTP2Handler(for: context.channel, streamDelegate: idleHandler)) - // Here we intentionally don't associate the multiplexer with the idleHandler in the server case - try sync.addHandler(idleHandler) + try sync.addHandler(self.makeHTTP2Handler()) + try sync.addHandler(self.makeIdleHandler()) + try sync.addHandler(self.makeHTTP2Multiplexer(for: context.channel)) result = .success(()) } catch { result = .failure(error) diff --git a/Sources/GRPC/Server.swift b/Sources/GRPC/Server.swift index 5ba505954..e6adf5299 100644 --- a/Sources/GRPC/Server.swift +++ b/Sources/GRPC/Server.swift @@ -54,6 +54,10 @@ import Network /// `GRPCServerPipelineConfigurator`. In the case of HTTP/2: /// /// ┌─────────────────────────────────┐ +/// │ HTTP2StreamMultiplexer │ +/// └─▲─────────────────────────────┬─┘ +/// HTTP2Frame│ │HTTP2Frame +/// ┌─┴─────────────────────────────▼─┐ /// │ HTTP2Handler │ /// └─▲─────────────────────────────┬─┘ /// ByteBuffer│ │ByteBuffer @@ -63,7 +67,7 @@ import Network /// ByteBuffer│ │ByteBuffer /// │ ▼ /// -/// The `NIOHTTP2Handler.StreamMultiplexer` provides one `Channel` for each HTTP/2 stream (and thus each +/// The `HTTP2StreamMultiplexer` provides one `Channel` for each HTTP/2 stream (and thus each /// RPC). /// /// 3. The frames for each stream channel are routed by the `HTTP2ToRawGRPCServerCodec` handler to diff --git a/Tests/GRPCTests/ClientTimeoutTests.swift b/Tests/GRPCTests/ClientTimeoutTests.swift index ff1cef9f5..0bf157101 100644 --- a/Tests/GRPCTests/ClientTimeoutTests.swift +++ b/Tests/GRPCTests/ClientTimeoutTests.swift @@ -169,7 +169,7 @@ extension EmbeddedGRPCChannel: @unchecked Sendable {} private final class EmbeddedGRPCChannel: GRPCChannel { let embeddedChannel: EmbeddedChannel - let multiplexer: EventLoopFuture + let multiplexer: EventLoopFuture let logger: Logger let scheme: String @@ -195,11 +195,8 @@ private final class EmbeddedGRPCChannel: GRPCChannel { errorDelegate: errorDelegate, logger: logger ).flatMap { - embeddedChannel.pipeline.handler(type: NIOHTTP2Handler.self) - }.flatMap { h2Handler in - h2Handler.multiplexer + embeddedChannel.pipeline.handler(type: HTTP2StreamMultiplexer.self) } - self.scheme = "http" self.authority = "localhost" self.errorDelegate = errorDelegate diff --git a/Tests/GRPCTests/ConnectionManagerTests.swift b/Tests/GRPCTests/ConnectionManagerTests.swift index b41daf4c7..92f2333f3 100644 --- a/Tests/GRPCTests/ConnectionManagerTests.swift +++ b/Tests/GRPCTests/ConnectionManagerTests.swift @@ -116,7 +116,7 @@ extension ConnectionManagerTests { return channelPromise.futureResult } - let multiplexer: EventLoopFuture = self + let multiplexer: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let channel = manager.getHTTP2Multiplexer() self.loop.run() @@ -146,22 +146,20 @@ extension ConnectionManagerTests { // Setup the real channel and activate it. let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try channel.pipeline.addHandler(idleHandler).wait() + channel: channel, + inboundStreamInitializer: nil + ) + try channel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() channelPromise.succeed(channel) XCTAssertNoThrow( try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")) @@ -171,7 +169,7 @@ extension ConnectionManagerTests { // Write a settings frame on the root stream; this'll make the channel 'ready'. try self.waitForStateChange(from: .connecting, to: .ready) { let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try channel.writeInbound(frame.encode())) + XCTAssertNoThrow(try channel.writeInbound(frame)) } // Close the channel. @@ -190,7 +188,7 @@ extension ConnectionManagerTests { } // Start the connection. - let readyChannelMux: EventLoopFuture = self + let readyChannelMux: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -199,23 +197,20 @@ extension ConnectionManagerTests { // Setup the channel. let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try channel.pipeline.addHandler(idleHandler).wait() + channel: channel, + inboundStreamInitializer: nil + ) + try channel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() channelPromise.succeed(channel) XCTAssertNoThrow( try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")) @@ -225,7 +220,7 @@ extension ConnectionManagerTests { // Write a settings frame on the root stream; this'll make the channel 'ready'. try self.waitForStateChange(from: .connecting, to: .ready) { let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try channel.writeInbound(frame.encode())) + XCTAssertNoThrow(try channel.writeInbound(frame)) // Wait for the multiplexer, it _must_ be ready now. XCTAssertNoThrow(try readyChannelMux.wait()) } @@ -244,24 +239,6 @@ extension ConnectionManagerTests { } } - /// Forwards only the first `channelInactive` call - /// - /// This is useful in tests where we intentionally mis-use the channels - /// and call `fireChannelInactive` manually during the test but don't want - /// teardown to cause precondition failures due to this unexpected behavior. - class SwallowSecondInactiveHandler: ChannelInboundHandler { - typealias InboundIn = HTTP2Frame - typealias OutboundOut = HTTP2Frame - - private var seenAnInactive = false - func channelInactive(context: ChannelHandlerContext) { - if !self.seenAnInactive { - self.seenAnInactive = true - context.fireChannelInactive() - } - } - } - func testChannelInactiveBeforeActiveWithNoReconnect() throws { let channel = EmbeddedChannel(loop: self.loop) let channelPromise = self.loop.makePromise(of: Channel.self) @@ -276,33 +253,28 @@ extension ConnectionManagerTests { _ = manager.getHTTP2Multiplexer() self.loop.run() } - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - let h2handler = NIOHTTP2Handler( - mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.syncOperations.addHandler(SwallowSecondInactiveHandler()) - try channel.pipeline.syncOperations.addHandler(h2handler) - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try channel.pipeline.syncOperations.addHandler(idleHandler) - try channel.pipeline.syncOperations.addHandler(NIOCloseOnErrorHandler()) + try channel.pipeline.syncOperations.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: HTTP2StreamMultiplexer( + mode: .client, + channel: channel, + inboundStreamInitializer: nil + ), + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ) channelPromise.succeed(channel) + // Oops: wrong way around. We should tolerate this. + self.waitForStateChange(from: .connecting, to: .shutdown) { + channel.pipeline.fireChannelInactive() + } - // Oops: wrong way around. We should tolerate this - just don't crash. - channel.pipeline.fireChannelInactive() + // Should be ignored. channel.pipeline.fireChannelActive() - - channel.embeddedEventLoop.run() - try manager.shutdown(mode: .forceful).wait() } func testChannelInactiveBeforeActiveWillReconnect() throws { @@ -328,59 +300,55 @@ extension ConnectionManagerTests { // Setup the channel. let channel1 = channels.removeLast() let channel1Promise = channelPromises.removeLast() - let idleHandler1 = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger + + try channel1.pipeline.syncOperations.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: HTTP2StreamMultiplexer( + mode: .client, + channel: channel1, + inboundStreamInitializer: nil + ), + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) ) - let h2handler1 = NIOHTTP2Handler( - mode: .client, - eventLoop: channel1.eventLoop, - streamDelegate: idleHandler1 - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel1.pipeline.syncOperations.addHandler(SwallowSecondInactiveHandler()) - try channel1.pipeline.syncOperations.addHandler(h2handler1) - idleHandler1.setMultiplexer(try h2handler1.syncMultiplexer()) - try channel1.pipeline.syncOperations.addHandler(idleHandler1) - try channel1.pipeline.syncOperations.addHandler(NIOCloseOnErrorHandler()) channel1Promise.succeed(channel1) // Oops: wrong way around. We should tolerate this. - channel1.pipeline.fireChannelInactive() + self.waitForStateChange(from: .connecting, to: .transientFailure) { + channel1.pipeline.fireChannelInactive() + } + channel1.pipeline.fireChannelActive() // Start the next attempt. - self.loop.advanceTime(by: .seconds(1)) + self.waitForStateChange(from: .transientFailure, to: .connecting) { + self.loop.advanceTime(by: .seconds(1)) + } let channel2 = channels.removeLast() let channel2Promise = channelPromises.removeLast() - let idleHandler2 = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger + try channel2.pipeline.syncOperations.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: HTTP2StreamMultiplexer( + mode: .client, + channel: channel1, + inboundStreamInitializer: nil + ), + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) ) - let h2handler2 = NIOHTTP2Handler( - mode: .client, - eventLoop: channel2.eventLoop, - streamDelegate: idleHandler2 - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel2.pipeline.syncOperations.addHandler(SwallowSecondInactiveHandler()) - try channel2.pipeline.syncOperations.addHandler(h2handler2) - idleHandler2.setMultiplexer(try h2handler2.syncMultiplexer()) - try channel2.pipeline.syncOperations.addHandler(idleHandler2) - try channel2.pipeline.syncOperations.addHandler(NIOCloseOnErrorHandler()) channel2Promise.succeed(channel2) try self.waitForStateChange(from: .connecting, to: .ready) { channel2.pipeline.fireChannelActive() let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try channel2.writeInbound(frame.encode())) + XCTAssertNoThrow(try channel2.writeInbound(frame)) } } @@ -391,7 +359,7 @@ extension ConnectionManagerTests { } // Start the connection. - let readyChannelMux: EventLoopFuture = self + let readyChannelMux: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -400,23 +368,20 @@ extension ConnectionManagerTests { // Setup the channel. let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try channel.pipeline.addHandler(idleHandler).wait() + channel: channel, + inboundStreamInitializer: nil + ) + try channel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() channelPromise.succeed(channel) XCTAssertNoThrow( @@ -427,13 +392,18 @@ extension ConnectionManagerTests { // Write a settings frame on the root stream; this'll make the channel 'ready'. try self.waitForStateChange(from: .connecting, to: .ready) { let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try channel.writeInbound(frame.encode())) + XCTAssertNoThrow(try channel.writeInbound(frame)) // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now. XCTAssertNoThrow(try readyChannelMux.wait()) } // "create" a stream; the details don't matter here. - idleHandler.streamCreated(1, channel: channel) + let streamCreated = NIOHTTP2StreamCreatedEvent( + streamID: 1, + localInitialWindowSize: nil, + remoteInitialWindowSize: nil + ) + channel.pipeline.fireUserInboundEventTriggered(streamCreated) // Wait for the idle timeout: this should _not_ cause the channel to idle. self.loop.advanceTime(by: .minutes(5)) @@ -441,7 +411,8 @@ extension ConnectionManagerTests { // Now we're going to close the stream and wait for an idle timeout and then shutdown. self.waitForStateChange(from: .ready, to: .idle) { // Close the stream. - idleHandler.streamClosed(1, channel: channel) + let streamClosed = StreamClosedEvent(streamID: 1, reason: nil) + channel.pipeline.fireUserInboundEventTriggered(streamClosed) // ... wait for the idle timeout, self.loop.advanceTime(by: .minutes(5)) } @@ -460,7 +431,7 @@ extension ConnectionManagerTests { return channelPromise.futureResult } - let readyChannelMux: EventLoopFuture = self + let readyChannelMux: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -469,23 +440,20 @@ extension ConnectionManagerTests { // Setup the channel. let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try channel.pipeline.addHandler(idleHandler).wait() + channel: channel, + inboundStreamInitializer: nil + ) + try channel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() channelPromise.succeed(channel) XCTAssertNoThrow( try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")) @@ -523,10 +491,10 @@ extension ConnectionManagerTests { return next } - let readyChannelMux = self.waitForStateChanges([ + let readyChannelMux: EventLoopFuture = self.waitForStateChanges([ Change(from: .idle, to: .connecting), Change(from: .connecting, to: .transientFailure), - ]) { () -> EventLoopFuture in + ]) { // Get a HTTP/2 stream multiplexer. let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -544,23 +512,20 @@ extension ConnectionManagerTests { // Setup the actual channel and complete the promise. let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try channel.pipeline.addHandler(idleHandler).wait() + channel: channel, + inboundStreamInitializer: nil + ) + try channel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() channelPromise.succeed(channel) XCTAssertNoThrow( try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")) @@ -570,7 +535,7 @@ extension ConnectionManagerTests { // Write a SETTINGS frame on the root stream. try self.waitForStateChange(from: .connecting, to: .ready) { let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try channel.writeInbound(frame.encode())) + XCTAssertNoThrow(try channel.writeInbound(frame)) } // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now. @@ -591,7 +556,7 @@ extension ConnectionManagerTests { return channelPromise.futureResult } - let readyChannelMux: EventLoopFuture = self + let readyChannelMux: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -627,10 +592,10 @@ extension ConnectionManagerTests { self.loop.makeFailedFuture(DoomedChannelError()) } - let readyChannelMux = self.waitForStateChanges([ + let readyChannelMux: EventLoopFuture = self.waitForStateChanges([ Change(from: .idle, to: .connecting), Change(from: .connecting, to: .transientFailure), - ]) { () -> EventLoopFuture in + ]) { // Get a HTTP/2 stream multiplexer. let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -654,7 +619,7 @@ extension ConnectionManagerTests { return channelPromise.futureResult } - let readyChannelMux: EventLoopFuture = self + let readyChannelMux: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -663,23 +628,20 @@ extension ConnectionManagerTests { // Prepare the channel let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try channel.pipeline.addHandler(idleHandler).wait() + channel: channel, + inboundStreamInitializer: nil + ) + try channel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() channelPromise.succeed(channel) XCTAssertNoThrow( try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")) @@ -732,7 +694,7 @@ extension ConnectionManagerTests { return next } - let readyChannelMux: EventLoopFuture = self + let readyChannelMux: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -741,23 +703,20 @@ extension ConnectionManagerTests { // Prepare the channel let firstChannel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: firstChannel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try firstChannel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try firstChannel.pipeline.addHandler(idleHandler).wait() + channel: firstChannel, + inboundStreamInitializer: nil + ) + try firstChannel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() channelPromise.succeed(firstChannel) XCTAssertNoThrow( @@ -811,7 +770,7 @@ extension ConnectionManagerTests { return next } - let readyChannelMux: EventLoopFuture = self + let readyChannelMux: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -820,22 +779,20 @@ extension ConnectionManagerTests { // Prepare the first channel let firstChannel = EmbeddedChannel(loop: self.loop) - let firstIdleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - let firstH2handler = NIOHTTP2Handler( + let firstH2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: firstChannel.eventLoop, - streamDelegate: firstIdleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try firstChannel.pipeline.addHandler(firstH2handler).wait() - firstIdleHandler.setMultiplexer(try firstH2handler.syncMultiplexer()) - try firstChannel.pipeline.addHandler(firstIdleHandler).wait() + channel: firstChannel, + inboundStreamInitializer: nil + ) + try firstChannel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: firstH2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() firstChannelPromise.succeed(firstChannel) XCTAssertNoThrow( try firstChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")) @@ -845,14 +802,19 @@ extension ConnectionManagerTests { // Write a SETTINGS frame on the root stream. try self.waitForStateChange(from: .connecting, to: .ready) { let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try firstChannel.writeInbound(frame.encode())) + XCTAssertNoThrow(try firstChannel.writeInbound(frame)) } // Channel should now be ready. XCTAssertNoThrow(try readyChannelMux.wait()) // Kill the first channel. But first ensure there's an active RPC, otherwise we'll idle. - firstIdleHandler.streamCreated(1, channel: firstChannel) + let streamCreated = NIOHTTP2StreamCreatedEvent( + streamID: 1, + localInitialWindowSize: nil, + remoteInitialWindowSize: nil + ) + firstChannel.pipeline.fireUserInboundEventTriggered(streamCreated) try self.waitForStateChange(from: .ready, to: .transientFailure) { XCTAssertNoThrow(try firstChannel.close().wait()) @@ -865,22 +827,20 @@ extension ConnectionManagerTests { // Prepare the second channel let secondChannel = EmbeddedChannel(loop: self.loop) - let secondIdleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - let secondH2handler = NIOHTTP2Handler( + let secondH2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: secondChannel.eventLoop, - streamDelegate: secondIdleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try secondChannel.pipeline.addHandler(secondH2handler).wait() - secondIdleHandler.setMultiplexer(try secondH2handler.syncMultiplexer()) - try secondChannel.pipeline.addHandler(secondIdleHandler).wait() + channel: secondChannel, + inboundStreamInitializer: nil + ) + try secondChannel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: secondH2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() secondChannelPromise.succeed(secondChannel) XCTAssertNoThrow( try secondChannel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")) @@ -890,7 +850,7 @@ extension ConnectionManagerTests { // Write a SETTINGS frame on the root stream. try self.waitForStateChange(from: .connecting, to: .ready) { let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try secondChannel.writeInbound(frame.encode())) + XCTAssertNoThrow(try secondChannel.writeInbound(frame)) } // Now shutdown @@ -907,7 +867,7 @@ extension ConnectionManagerTests { return channelPromise.futureResult } - let readyChannelMux: EventLoopFuture = self + let readyChannelMux: EventLoopFuture = self .waitForStateChange(from: .idle, to: .connecting) { let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() @@ -916,23 +876,20 @@ extension ConnectionManagerTests { // Setup the channel. let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - try channel.pipeline.addHandler(idleHandler).wait() + channel: channel, + inboundStreamInitializer: nil + ) + try channel.pipeline.addHandler( + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ) + ).wait() channelPromise.succeed(channel) XCTAssertNoThrow( try channel.connect(to: SocketAddress(unixDomainSocketPath: "/ignored")) @@ -942,7 +899,7 @@ extension ConnectionManagerTests { try self.waitForStateChange(from: .connecting, to: .ready) { // Write a SETTINGS frame on the root stream. let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try channel.writeInbound(frame.encode())) + XCTAssertNoThrow(try channel.writeInbound(frame)) } // Wait for the HTTP/2 stream multiplexer, it _must_ be ready now. @@ -955,7 +912,7 @@ extension ConnectionManagerTests { streamID: .rootStream, payload: .goAway(lastStreamID: 1, errorCode: .noError, opaqueData: nil) ) - XCTAssertNoThrow(try channel.writeInbound(goAway.encode())) + XCTAssertNoThrow(try channel.writeInbound(goAway)) self.loop.run() } @@ -1061,34 +1018,31 @@ extension ConnectionManagerTests { } // Start the connection. - let readyChannelMux: EventLoopFuture = self - .waitForStateChange( - from: .idle, - to: .connecting - ) { - let readyChannelMux = manager.getHTTP2Multiplexer() - self.loop.run() - return readyChannelMux - } + let readyChannelMux: EventLoopFuture = self.waitForStateChange( + from: .idle, + to: .connecting + ) { + let readyChannelMux = manager.getHTTP2Multiplexer() + self.loop.run() + return readyChannelMux + } // Setup the real channel and activate it. let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - XCTAssertNoThrow(try channel.pipeline.addHandler(idleHandler).wait()) + channel: channel, + inboundStreamInitializer: nil + ) + XCTAssertNoThrow(try channel.pipeline.addHandlers([ + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ), + ]).wait()) channelPromise.succeed(channel) self.loop.run() @@ -1098,7 +1052,7 @@ extension ConnectionManagerTests { // Write a SETTINGS frame on the root stream. try self.waitForStateChange(from: .connecting, to: .ready) { let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try channel.writeInbound(frame.encode())) + XCTAssertNoThrow(try channel.writeInbound(frame)) } // The channel should now be ready. @@ -1122,7 +1076,7 @@ extension ConnectionManagerTests { let readyChannelMux = self.waitForStateChange( from: .idle, to: .connecting - ) { () -> EventLoopFuture in + ) { () -> EventLoopFuture in let readyChannelMux = manager.getHTTP2Multiplexer() self.loop.run() return readyChannelMux @@ -1130,22 +1084,20 @@ extension ConnectionManagerTests { // Setup the actual channel and activate it. let channel = EmbeddedChannel(loop: self.loop) - let idleHandler = GRPCIdleHandler( - connectionManager: manager, - idleTimeout: .minutes(5), - keepalive: .init(), - logger: self.logger - ) - let h2handler = NIOHTTP2Handler( + let h2mux = HTTP2StreamMultiplexer( mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try channel.pipeline.addHandler(h2handler).wait() - idleHandler.setMultiplexer(try h2handler.syncMultiplexer()) - XCTAssertNoThrow(try channel.pipeline.addHandler(idleHandler).wait()) + channel: channel, + inboundStreamInitializer: nil + ) + XCTAssertNoThrow(try channel.pipeline.addHandlers([ + GRPCIdleHandler( + connectionManager: manager, + multiplexer: h2mux, + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.logger + ), + ]).wait()) channelPromise.succeed(channel) self.loop.run() @@ -1155,7 +1107,7 @@ extension ConnectionManagerTests { // "ready" the connection. try self.waitForStateChange(from: .connecting, to: .ready) { let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([]))) - XCTAssertNoThrow(try channel.writeInbound(frame.encode())) + XCTAssertNoThrow(try channel.writeInbound(frame)) } // The HTTP/2 stream multiplexer should now be ready. @@ -1188,6 +1140,12 @@ extension ConnectionManagerTests { XCTAssertNoThrow(try channel.finish()) } + let multiplexer = HTTP2StreamMultiplexer( + mode: .client, + channel: channel, + inboundStreamInitializer: nil + ) + class HTTP2Delegate: ConnectionManagerHTTP2Delegate { var streamsOpened = 0 var streamsClosed = 0 @@ -1216,19 +1174,11 @@ extension ConnectionManagerTests { channelProvider: HookedChannelProvider { manager, eventLoop -> EventLoopFuture in let idleHandler = GRPCIdleHandler( connectionManager: manager, + multiplexer: multiplexer, idleTimeout: .minutes(5), keepalive: ClientConnectionKeepalive(), logger: self.logger ) - let h2Handler = NIOHTTP2Handler( - mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - try! channel.pipeline.syncOperations.addHandler(h2Handler) - idleHandler.setMultiplexer(try! h2Handler.syncMultiplexer()) // We're going to cheat a bit by not putting the multiplexer in the channel. This allows // us to just fire stream created/closed events into the channel. @@ -1260,26 +1210,30 @@ extension ConnectionManagerTests { let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)] return HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings))) } - XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42).encode())) + XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 42))) // We're ready now so the future multiplexer will resolve and we'll have seen an update to // max concurrent streams. XCTAssertNoThrow(try futureMultiplexer.wait()) XCTAssertEqual(http2.maxConcurrentStreams, 42) - XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13).encode())) + XCTAssertNoThrow(try channel.writeInbound(makeSettingsFrame(maxConcurrentStreams: 13))) XCTAssertEqual(http2.maxConcurrentStreams, 13) - let streamDelegate = try channel.pipeline.handler(type: GRPCIdleHandler.self).wait() - // Open some streams. for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) { - streamDelegate.streamCreated(streamID, channel: channel) + let streamCreated = NIOHTTP2StreamCreatedEvent( + streamID: streamID, + localInitialWindowSize: nil, + remoteInitialWindowSize: nil + ) + channel.pipeline.fireUserInboundEventTriggered(streamCreated) } // ... and then close them. for streamID in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(9), by: 2) { - streamDelegate.streamClosed(streamID, channel: channel) + let streamClosed = StreamClosedEvent(streamID: streamID, reason: nil) + channel.pipeline.fireUserInboundEventTriggered(streamClosed) } XCTAssertEqual(http2.streamsOpened, 4) @@ -1292,7 +1246,7 @@ extension ConnectionManagerTests { return channelPromise.futureResult } - let multiplexer: EventLoopFuture = self.waitForStateChange( + let multiplexer: EventLoopFuture = self.waitForStateChange( from: .idle, to: .connecting ) { diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift index 924b38d3d..4a063bfa3 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift @@ -189,55 +189,3 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate { } extension EventRecordingConnectionPoolDelegate: @unchecked Sendable {} - -final class AsyncEventStreamConnectionPoolDelegate: GRPCConnectionPoolDelegate { - private let continuation: AsyncStream.Continuation - - static func makeDelegateAndAsyncStream() - -> ( - AsyncEventStreamConnectionPoolDelegate, - AsyncStream - ) { - var continuation: AsyncStream.Continuation! - let asyncStream = AsyncStream(EventRecordingConnectionPoolDelegate.Event.self) { - continuation = $0 - } - return (Self(continuation: continuation), asyncStream) - } - - init(continuation: AsyncStream.Continuation) { - self.continuation = continuation - } - - func connectionAdded(id: GRPCConnectionID) { - self.continuation.yield(.connectionAdded(id)) - } - - func startedConnecting(id: GRPCConnectionID) { - self.continuation.yield(.startedConnecting(id)) - } - - func connectFailed(id: GRPCConnectionID, error: Error) { - self.continuation.yield(.connectFailed(id)) - } - - func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) { - self.continuation.yield(.connectSucceeded(id, streamCapacity)) - } - - func connectionClosed(id: GRPCConnectionID, error: Error?) { - self.continuation.yield(.connectionClosed(id)) - } - - func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) { - self.continuation.yield(.connectionUtilizationChanged(id, streamsUsed, streamCapacity)) - } - - func connectionQuiescing(id: GRPCConnectionID) { - self.continuation.yield(.connectionQuiescing(id)) - } - - func connectionRemoved(id: GRPCConnectionID) { - self.continuation.yield(.connectionRemoved(id)) - } -} diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index 4c381c5f9..1859bf675 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -1054,24 +1054,17 @@ extension ConnectionPool { // MARK: - Helpers -struct ChannelAndState { - let channel: EmbeddedChannel - let streamDelegate: NIOHTTP2StreamDelegate - var isActive: Bool -} - internal final class ChannelController { - private var channels: [ChannelAndState] = [] + private var channels: [EmbeddedChannel] = [] internal var count: Int { return self.channels.count } internal func finish() { - while let state = self.channels.popLast() { - if state.isActive { - _ = try? state.channel.finish() - } + while let channel = self.channels.popLast() { + // We're okay with this throwing: some channels are left in a bad state (i.e. with errors). + _ = try? channel.finish() } } @@ -1091,10 +1084,9 @@ internal final class ChannelController { line: UInt = #line ) { guard self.isValidIndex(index, file: file, line: line) else { return } - self.channels[index].isActive = true XCTAssertNoThrow( - try self.channels[index].channel.connect(to: .init(unixDomainSocketPath: "/")), + try self.channels[index].connect(to: .init(unixDomainSocketPath: "/")), file: file, line: line ) @@ -1106,8 +1098,7 @@ internal final class ChannelController { line: UInt = #line ) { guard self.isValidIndex(index, file: file, line: line) else { return } - self.channels[index].channel.pipeline.fireChannelInactive() - self.channels[index].isActive = false + self.channels[index].pipeline.fireChannelInactive() } internal func throwError( @@ -1117,7 +1108,7 @@ internal final class ChannelController { line: UInt = #line ) { guard self.isValidIndex(index, file: file, line: line) else { return } - self.channels[index].channel.pipeline.fireErrorCaught(error) + self.channels[index].pipeline.fireErrorCaught(error) } internal func sendSettingsToChannel( @@ -1131,11 +1122,7 @@ internal final class ChannelController { let settings = [HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams)] let settingsFrame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings(settings))) - XCTAssertNoThrow( - try self.channels[index].channel.writeInbound(settingsFrame.encode()), - file: file, - line: line - ) + XCTAssertNoThrow(try self.channels[index].writeInbound(settingsFrame), file: file, line: line) } internal func sendGoAwayToChannel( @@ -1150,11 +1137,7 @@ internal final class ChannelController { payload: .goAway(lastStreamID: .maxID, errorCode: .noError, opaqueData: nil) ) - XCTAssertNoThrow( - try self.channels[index].channel.writeInbound(goAwayFrame.encode()), - file: file, - line: line - ) + XCTAssertNoThrow(try self.channels[index].writeInbound(goAwayFrame), file: file, line: line) } internal func openStreamInChannel( @@ -1165,8 +1148,13 @@ internal final class ChannelController { guard self.isValidIndex(index, file: file, line: line) else { return } // The details don't matter here. - let channel = self.channels[index] - channel.streamDelegate.streamCreated(.rootStream, channel: channel.channel) + let event = NIOHTTP2StreamCreatedEvent( + streamID: .rootStream, + localInitialWindowSize: nil, + remoteInitialWindowSize: nil + ) + + self.channels[index].pipeline.fireUserInboundEventTriggered(event) } internal func closeStreamInChannel( @@ -1177,8 +1165,8 @@ internal final class ChannelController { guard self.isValidIndex(index, file: file, line: line) else { return } // The details don't matter here. - let channel = self.channels[index] - channel.streamDelegate.streamClosed(.rootStream, channel: channel.channel) + let event = StreamClosedEvent(streamID: .rootStream, reason: nil) + self.channels[index].pipeline.fireUserInboundEventTriggered(event) } } @@ -1190,27 +1178,24 @@ extension ChannelController: ConnectionManagerChannelProvider { logger: Logger ) -> EventLoopFuture { let channel = EmbeddedChannel(loop: eventLoop as! EmbeddedEventLoop) + self.channels.append(channel) + + let multiplexer = HTTP2StreamMultiplexer( + mode: .client, + channel: channel, + inboundStreamInitializer: nil + ) let idleHandler = GRPCIdleHandler( connectionManager: connectionManager, + multiplexer: multiplexer, idleTimeout: .minutes(5), keepalive: ClientConnectionKeepalive(), logger: logger ) - let h2handler = NIOHTTP2Handler( - mode: .client, - eventLoop: channel.eventLoop, - streamDelegate: idleHandler - ) { channel in - channel.eventLoop.makeSucceededVoidFuture() - } - XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(h2handler)) - - idleHandler.setMultiplexer(try! h2handler.syncMultiplexer()) - self.channels.append(.init(channel: channel, streamDelegate: idleHandler, isActive: false)) - XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(idleHandler)) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(multiplexer)) return eventLoop.makeSucceededFuture(channel) } @@ -1257,163 +1242,3 @@ extension Optional where Wrapped == ConnectionPoolError { } } } - -// Simplified version of the frame encoder found in SwiftNIO HTTP/2 -struct HTTP2FrameEncoder { - mutating func encode(frame: HTTP2Frame, to buf: inout ByteBuffer) throws -> IOData? { - // note our starting point - let start = buf.writerIndex - - // +-----------------------------------------------+ - // | Length (24) | - // +---------------+---------------+---------------+ - // | Type (8) | Flags (8) | - // +-+-------------+---------------+-------------------------------+ - // |R| Stream Identifier (31) | - // +=+=============================================================+ - // | Frame Payload (0...) ... - // +---------------------------------------------------------------+ - - // skip 24-bit length for now, we'll fill that in later - buf.moveWriterIndex(forwardBy: 3) - - // 8-bit type - buf.writeInteger(frame.code()) - - // skip the 8 bit flags for now, we'll fill it in later as well. - let flagsIndex = buf.writerIndex - var flags = FrameFlags() - buf.moveWriterIndex(forwardBy: 1) - - // 32-bit stream identifier -- ensuring the top bit is empty - buf.writeInteger(Int32(frame.streamID)) - - // frame payload follows, which depends on the frame type itself - let extraFrameData: IOData? - let payloadSize: Int - - switch frame.payload { - case let .settings(.settings(settings)): - for setting in settings { - buf.writeInteger(setting.parameter.networkRepresentation()) - buf.writeInteger(UInt32(setting.value)) - } - - payloadSize = settings.count * 6 - extraFrameData = nil - - case .settings(.ack): - payloadSize = 0 - extraFrameData = nil - flags.insert(.ack) - - case let .goAway(lastStreamID, errorCode, opaqueData): - let streamVal = UInt32(Int(lastStreamID)) & ~0x8000_0000 - buf.writeInteger(streamVal) - buf.writeInteger(UInt32(errorCode.networkCode)) - - if let data = opaqueData { - payloadSize = data.readableBytes + 8 - extraFrameData = .byteBuffer(data) - } else { - payloadSize = 8 - extraFrameData = nil - } - - case .data, .headers, .priority, - .rstStream, .pushPromise, .ping, - .windowUpdate, .alternativeService, .origin: - preconditionFailure("Frame type not supported: \(frame.payload)") - } - - // Write the frame data. This is the payload size and the flags byte. - buf.writePayloadSize(payloadSize, at: start) - buf.setInteger(flags.rawValue, at: flagsIndex) - - // all bytes to write are in the provided buffer now - return extraFrameData - } - - struct FrameFlags: OptionSet { - internal private(set) var rawValue: UInt8 - - internal init(rawValue: UInt8) { - self.rawValue = rawValue - } - - /// ACK flag. Valid on SETTINGS and PING frames. - internal static let ack = FrameFlags(rawValue: 0x01) - } -} - -extension HTTP2SettingsParameter { - internal func networkRepresentation() -> UInt16 { - switch self { - case HTTP2SettingsParameter.headerTableSize: - return UInt16(1) - case HTTP2SettingsParameter.enablePush: - return UInt16(2) - case HTTP2SettingsParameter.maxConcurrentStreams: - return UInt16(3) - case HTTP2SettingsParameter.initialWindowSize: - return UInt16(4) - case HTTP2SettingsParameter.maxFrameSize: - return UInt16(5) - case HTTP2SettingsParameter.maxHeaderListSize: - return UInt16(6) - case HTTP2SettingsParameter.enableConnectProtocol: - return UInt16(8) - default: - preconditionFailure("Unknown settings parameter.") - } - } -} - -extension ByteBuffer { - fileprivate mutating func writePayloadSize(_ size: Int, at location: Int) { - // Yes, this performs better than running a UInt8 through the generic write(integer:) three times. - var bytes: (UInt8, UInt8, UInt8) - bytes.0 = UInt8((size & 0xFF0000) >> 16) - bytes.1 = UInt8((size & 0x00FF00) >> 8) - bytes.2 = UInt8(size & 0x0000FF) - withUnsafeBytes(of: bytes) { ptr in - _ = self.setBytes(ptr, at: location) - } - } -} - -extension HTTP2Frame { - internal func encode() throws -> ByteBuffer { - let allocator = ByteBufferAllocator() - var buffer = allocator.buffer(capacity: 1024) - - var frameEncoder = HTTP2FrameEncoder() - let extraData = try frameEncoder.encode(frame: self, to: &buffer) - if let extraData = extraData { - switch extraData { - case let .byteBuffer(extraBuffer): - buffer.writeImmutableBuffer(extraBuffer) - default: - preconditionFailure() - } - } - return buffer - } - - /// The one-byte identifier used to indicate the type of a frame on the wire. - internal func code() -> UInt8 { - switch self.payload { - case .data: return 0x0 - case .headers: return 0x1 - case .priority: return 0x2 - case .rstStream: return 0x3 - case .settings: return 0x4 - case .pushPromise: return 0x5 - case .ping: return 0x6 - case .goAway: return 0x7 - case .windowUpdate: return 0x8 - case .alternativeService: return 0xA - case .origin: return 0xC - } - } -} diff --git a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift index 7edc6e564..a118d813e 100644 --- a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift @@ -441,75 +441,54 @@ final class GRPCChannelPoolTests: GRPCTestCase { } } - func testConnectionPoolDelegateSingleConnection() async throws { - let (delegate, stream) = AsyncEventStreamConnectionPoolDelegate.makeDelegateAndAsyncStream() + func testConnectionPoolDelegateSingleConnection() throws { + let recorder = EventRecordingConnectionPoolDelegate() self.setUpClientAndServer(withTLS: false, threads: 1) { - $0.delegate = delegate + $0.delegate = recorder } let warmup = self.echo.get(.with { $0.text = "" }) XCTAssertNoThrow(try warmup.status.wait()) - var iterator = stream.makeAsyncIterator() - - var event = await iterator.next() - let id = try XCTUnwrap(event?.id) - XCTAssertEqual(event, .connectionAdded(id)) - event = await iterator.next() - XCTAssertEqual(event, .startedConnecting(id)) - event = await iterator.next() - XCTAssertEqual(event, .connectSucceeded(id, 100)) - event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id, 1, 100)) - event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id, 0, 100)) - - let rpcs: [ClientStreamingCall] = try (1 ... 10).map { _ in + let id = try XCTUnwrap(recorder.first?.id) + XCTAssertEqual(recorder.popFirst(), .connectionAdded(id)) + XCTAssertEqual(recorder.popFirst(), .startedConnecting(id)) + XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 1, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 0, 100)) + + let rpcs: [ClientStreamingCall] = try (1 ... 10).map { i in let rpc = self.echo.collect() XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait()) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, i, 100)) return rpc } - for (i, _) in rpcs.enumerated() { - let event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id, i + 1, 100)) - } - for (i, rpc) in rpcs.enumerated() { XCTAssertNoThrow(try rpc.sendEnd().wait()) XCTAssertNoThrow(try rpc.status.wait()) - let event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id, 10 - (i + 1), 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 10 - (i + 1), 100)) } XCTAssertNoThrow(try self.channel?.close().wait()) - event = await iterator.next() - XCTAssertEqual(event, .connectionClosed(id)) - event = await iterator.next() - XCTAssertEqual(event, .connectionRemoved(id)) + XCTAssertEqual(recorder.popFirst(), .connectionClosed(id)) + XCTAssertEqual(recorder.popFirst(), .connectionRemoved(id)) + XCTAssert(recorder.isEmpty) } - func testConnectionPoolDelegateQuiescing() async throws { - let (delegate, stream) = AsyncEventStreamConnectionPoolDelegate.makeDelegateAndAsyncStream() + func testConnectionPoolDelegateQuiescing() throws { + let recorder = EventRecordingConnectionPoolDelegate() self.setUpClientAndServer(withTLS: false, threads: 1) { - $0.delegate = delegate + $0.delegate = recorder } XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait()) - - var iterator = stream.makeAsyncIterator() - - var event = await iterator.next() - let id1 = try XCTUnwrap(event?.id) - XCTAssertEqual(event, .connectionAdded(id1)) - event = await iterator.next() - XCTAssertEqual(event, .startedConnecting(id1)) - event = await iterator.next() - XCTAssertEqual(event, .connectSucceeded(id1, 100)) - event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id1, 1, 100)) - event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id1, 0, 100)) + let id1 = try XCTUnwrap(recorder.first?.id) + XCTAssertEqual(recorder.popFirst(), .connectionAdded(id1)) + XCTAssertEqual(recorder.popFirst(), .startedConnecting(id1)) + XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id1, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 0, 100)) // Start an RPC. let rpc = self.echo.collect() @@ -517,12 +496,9 @@ final class GRPCChannelPoolTests: GRPCTestCase { // Complete another one to make sure the previous one is known by the server. XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait()) - event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id1, 1, 100)) - event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id1, 2, 100)) - event = await iterator.next() - XCTAssertEqual(event, .connectionUtilizationChanged(id1, 1, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 2, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100)) // Start shutting the server down. let didShutdown = self.server!.initiateGracefulShutdown() @@ -531,8 +507,7 @@ final class GRPCChannelPoolTests: GRPCTestCase { // Pause a moment so we know the client received the GOAWAY. let sleep = self.group.any().scheduleTask(in: .milliseconds(50)) {} XCTAssertNoThrow(try sleep.futureResult.wait()) - event = await iterator.next() - XCTAssertEqual(event, .connectionQuiescing(id1)) + XCTAssertEqual(recorder.popFirst(), .connectionQuiescing(id1)) // Finish the RPC. XCTAssertNoThrow(try rpc.sendEnd().wait())