Skip to content

Commit

Permalink
Revert "Adopt h2handler multiplexer (#1587)" (#1641)
Browse files Browse the repository at this point in the history
This reverts commit 75b390e.
  • Loading branch information
glbrntt authored Aug 22, 2023
1 parent 666e30d commit 2828ee7
Show file tree
Hide file tree
Showing 18 changed files with 483 additions and 866 deletions.
41 changes: 20 additions & 21 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,45 +57,44 @@ jobs:
include:
- image: swiftlang/swift:nightly-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000
- image: swift:5.8-jammy
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000
- image: swift:5.7-jammy
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000
- image: swift:5.6-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 247000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 139000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 324000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 162000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 130000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 137000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 137000

MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 164000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 171000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 171000
name: Performance Tests on ${{ matrix.image }}
runs-on: ubuntu-latest
container:
Expand Down
8 changes: 0 additions & 8 deletions NOTICES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,3 @@ framework: 'test_01_allocation_counts.sh', 'run-nio-alloc-counter-tests.sh' and
* https://github.com/apple/swift-nio/blob/main/LICENSE.txt
* HOMEPAGE:
* https://github.com/apple/swift-nio

This product contains a simplified derivation of SwiftNIO HTTP/2's
'HTTP2FrameEncoder' for testing purposes.

* LICENSE (Apache License 2.0):
* https://github.com/apple/swift-nio-http2/blob/main/LICENSE.txt
* HOMEPAGE:
* https://github.com/apple/swift-nio-http2
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ let packageDependencies: [Package.Dependency] = [
),
.package(
url: "https://github.com/apple/swift-nio-http2.git",
from: "1.26.0"
from: "1.24.1"
),
.package(
url: "https://github.com/apple/swift-nio-transport-services.git",
Expand Down
91 changes: 43 additions & 48 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,28 @@ import SwiftProtobuf
/// │ DelegatingErrorHandler │
/// └──────────▲───────────────┘
/// HTTP2Frame│
/// │
/// │
/// │
/// │
/// │
/// HTTP2Frame│ ⠇ ⠇ ⠇ ⠇ ⠇
/// ┌─┴──────────────────▼─┐ ┌┴─▼┐ ┌┴─▼┐
/// │ GRPCIdleHandler │ │ | │ | HTTP/2 streams
/// └─▲──────────────────┬─┘ └▲─┬┘ └▲─┬┘
/// HTTP2Frame│ │ │ │ │ │ HTTP2Frame
/// ┌─┴──────────────────▼────────┴─▼───┴─▼┐
/// │ NIOHTTP2Handler │
/// └─▲──────────────────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// ┌─┴──────────────────────────────────▼─┐
/// │ NIOSSLHandler │
/// └─▲──────────────────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// │ ▼
/// │ ⠇ ⠇ ⠇ ⠇
/// │ ┌┴─▼┐ ┌┴─▼┐
/// │ │ | │ | HTTP/2 streams
/// │ └▲─┬┘ └▲─┬┘
/// │ │ │ │ │ HTTP2Frame
/// ┌─┴────────────────┴─▼───┴─▼┐
/// │ HTTP2StreamMultiplexer |
/// └─▲───────────────────────┬─┘
/// HTTP2Frame│ │HTTP2Frame
/// ┌─┴───────────────────────▼─┐
/// │ GRPCIdleHandler │
/// └─▲───────────────────────┬─┘
/// HTTP2Frame│ │HTTP2Frame
/// ┌─┴───────────────────────▼─┐
/// │ NIOHTTP2Handler │
/// └─▲───────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// ┌─┴───────────────────────▼─┐
/// │ NIOSSLHandler │
/// └─▲───────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// │ ▼
///
/// The 'GRPCIdleHandler' intercepts HTTP/2 frames and various events and is responsible for
/// informing and controlling the state of the connection (idling and keepalive). The HTTP/2 streams
Expand All @@ -85,7 +88,7 @@ public final class ClientConnection: Sendable {
private let connectionManager: ConnectionManager

/// HTTP multiplexer from the underlying channel handling gRPC calls.
internal func getMultiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
internal func getMultiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
return self.connectionManager.getHTTP2Multiplexer()
}

Expand Down Expand Up @@ -247,7 +250,7 @@ extension ClientConnection: GRPCChannel {
}

private static func makeStreamChannel(
using result: Result<NIOHTTP2Handler.StreamMultiplexer, Error>,
using result: Result<HTTP2StreamMultiplexer, Error>,
promise: EventLoopPromise<Channel>
) {
switch result {
Expand Down Expand Up @@ -618,31 +621,29 @@ extension ChannelPipeline.SynchronousOperations {
HTTP2Setting(parameter: .initialWindowSize, value: httpTargetWindowSize),
]

let grpcIdleHandler = GRPCIdleHandler(
// We could use 'configureHTTP2Pipeline' here, but we need to add a few handlers between the
// two HTTP/2 handlers so we'll do it manually instead.
try self.addHandler(NIOHTTP2Handler(mode: .client, initialSettings: initialSettings))

let h2Multiplexer = HTTP2StreamMultiplexer(
mode: .client,
channel: channel,
targetWindowSize: httpTargetWindowSize,
inboundStreamInitializer: nil
)

// The multiplexer is passed through the idle handler so it is only reported on
// successful channel activation - with happy eyeballs multiple pipelines can
// be constructed so it's not safe to report just yet.
try self.addHandler(GRPCIdleHandler(
connectionManager: connectionManager,
multiplexer: h2Multiplexer,
idleTimeout: connectionIdleTimeout,
keepalive: connectionKeepalive,
logger: logger
)

var connectionConfiguration = NIOHTTP2Handler.ConnectionConfiguration()
connectionConfiguration.initialSettings = initialSettings
var streamConfiguration = NIOHTTP2Handler.StreamConfiguration()
streamConfiguration.targetWindowSize = httpTargetWindowSize
let h2Handler = NIOHTTP2Handler(
mode: .client,
eventLoop: channel.eventLoop,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: grpcIdleHandler
) { channel in
channel.close()
}
try self.addHandler(h2Handler)

grpcIdleHandler.setMultiplexer(try h2Handler.syncMultiplexer())
try self.addHandler(grpcIdleHandler)
))

try self.addHandler(h2Multiplexer)
try self.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
}
}
Expand All @@ -652,13 +653,7 @@ extension Channel {
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> EventLoopFuture<Void> {
return self.configureHTTP2Pipeline(
mode: .client,
connectionConfiguration: .init(),
streamConfiguration: .init()
) { channel in
channel.eventLoop.makeSucceededVoidFuture()
}.flatMap { _ in
return self.configureHTTP2Pipeline(mode: .client, inboundStreamInitializer: nil).flatMap { _ in
self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
}
}
Expand Down
41 changes: 18 additions & 23 deletions Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,19 @@ internal final class ConnectionManager: @unchecked Sendable {
var reconnect: Reconnect

var candidate: EventLoopFuture<Channel>
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var candidateMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
}

internal struct ConnectedState {
var backoffIterator: ConnectionBackoffIterator?
var reconnect: Reconnect
var candidate: Channel
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var multiplexer: HTTP2StreamMultiplexer
var error: Error?

init(
from state: ConnectingState,
candidate: Channel,
multiplexer: NIOHTTP2Handler.StreamMultiplexer
) {
init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
self.backoffIterator = state.backoffIterator
self.reconnect = state.reconnect
self.candidate = candidate
Expand All @@ -62,7 +58,7 @@ internal final class ConnectionManager: @unchecked Sendable {

internal struct ReadyState {
var channel: Channel
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
var multiplexer: HTTP2StreamMultiplexer
var error: Error?

init(from state: ConnectedState) {
Expand All @@ -73,7 +69,7 @@ internal final class ConnectionManager: @unchecked Sendable {

internal struct TransientFailureState {
var backoffIterator: ConnectionBackoffIterator?
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var scheduled: Scheduled<Void>
var reason: Error

Expand Down Expand Up @@ -256,8 +252,8 @@ internal final class ConnectionManager: @unchecked Sendable {
}
}

/// Returns the `NIOHTTP2Handler.StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
private var multiplexer: NIOHTTP2Handler.StreamMultiplexer? {
/// Returns the `HTTP2StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
private var multiplexer: HTTP2StreamMultiplexer? {
self.eventLoop.assertInEventLoop()
switch self.state {
case let .ready(state):
Expand Down Expand Up @@ -365,8 +361,8 @@ internal final class ConnectionManager: @unchecked Sendable {
/// Get the multiplexer from the underlying channel handling gRPC calls.
/// if the `ConnectionManager` was configured to be `fastFailure` this will have
/// one chance to connect - if not reconnections are managed here.
internal func getHTTP2Multiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
func getHTTP2Multiplexer0() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
func getHTTP2Multiplexer0() -> EventLoopFuture<HTTP2StreamMultiplexer> {
switch self.callStartBehavior {
case .waitsForConnectivity:
return self.getHTTP2MultiplexerPatient()
Expand All @@ -386,8 +382,8 @@ internal final class ConnectionManager: @unchecked Sendable {

/// Returns a future for the multiplexer which succeeded when the channel is connected.
/// Reconnects are handled if necessary.
private func getHTTP2MultiplexerPatient() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer>
private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

switch self.state {
case .idle:
Expand Down Expand Up @@ -425,12 +421,11 @@ internal final class ConnectionManager: @unchecked Sendable {
/// attempt, or if the state is 'idle' returns the future for the next connection attempt.
///
/// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
private func getHTTP2MultiplexerOptimistic()
-> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
// `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
self.eventLoop.preconditionInEventLoop()

let muxFuture: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = { () in
let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = { () in
switch self.state {
case .idle:
self.startConnecting()
Expand Down Expand Up @@ -661,7 +656,7 @@ internal final class ConnectionManager: @unchecked Sendable {
}

/// The connecting channel became `active`. Must be called on the `EventLoop`.
internal func channelActive(channel: Channel, multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
self.eventLoop.preconditionInEventLoop()
self.logger.debug("activating connection", metadata: [
"connectivity_state": "\(self.state.label)",
Expand Down Expand Up @@ -978,7 +973,7 @@ extension ConnectionManager {

private func startConnecting(
backoffIterator: ConnectionBackoffIterator?,
muxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
) {
let timeoutAndBackoff = backoffIterator?.next()

Expand Down Expand Up @@ -1065,7 +1060,7 @@ extension ConnectionManager {

/// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is any
/// other state.
internal var multiplexer: NIOHTTP2Handler.StreamMultiplexer? {
internal var multiplexer: HTTP2StreamMultiplexer? {
return self.manager.multiplexer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ extension ConnectionPool {
}

@usableFromInline
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
var multiplexer: HTTP2StreamMultiplexer
/// Maximum number of available streams.
@usableFromInline
var maxAvailable: Int
Expand All @@ -78,7 +78,7 @@ extension ConnectionPool {

/// Increment the reserved streams and return the multiplexer.
@usableFromInline
mutating func reserve() -> NIOHTTP2Handler.StreamMultiplexer {
mutating func reserve() -> HTTP2StreamMultiplexer {
assert(!self.isQuiescing)
self.reserved += 1
return self.multiplexer
Expand Down Expand Up @@ -132,7 +132,7 @@ extension ConnectionPool {
///
/// The result may be safely unwrapped if `self.availableStreams > 0` when reserving a stream.
@usableFromInline
internal mutating func reserveStream() -> NIOHTTP2Handler.StreamMultiplexer? {
internal mutating func reserveStream() -> HTTP2StreamMultiplexer? {
return self._availability?.reserve()
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/GRPC/ConnectionPool/ConnectionPool+Waiter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extension ConnectionPool {

/// The channel initializer.
@usableFromInline
internal let _channelInitializer: @Sendable (Channel) -> EventLoopFuture<Void>
internal let _channelInitializer: (Channel) -> EventLoopFuture<Void>

/// The deadline at which the timeout is scheduled.
@usableFromInline
Expand All @@ -51,7 +51,7 @@ extension ConnectionPool {
internal init(
deadline: NIODeadline,
promise: EventLoopPromise<Channel>,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
channelInitializer: @escaping (Channel) -> EventLoopFuture<Void>
) {
self._deadline = deadline
self._promise = promise
Expand Down Expand Up @@ -83,7 +83,7 @@ extension ConnectionPool {

/// Succeed the waiter with the given multiplexer.
@usableFromInline
internal func succeed(with multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
internal func succeed(with multiplexer: HTTP2StreamMultiplexer) {
self._scheduledTimeout?.cancel()
self._scheduledTimeout = nil
multiplexer.createStreamChannel(promise: self._promise, self._channelInitializer)
Expand Down
Loading

0 comments on commit 2828ee7

Please sign in to comment.