Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Adopt h2handler multiplexer (#1587)" #1641

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,45 +57,44 @@ jobs:
include:
- image: swiftlang/swift:nightly-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000
- image: swift:5.8-jammy
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000
- image: swift:5.7-jammy
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 323000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 161000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 163000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 170000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 170000
- image: swift:5.6-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 247000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 139000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 324000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 162000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 110000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 65000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 61000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 130000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 137000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 137000

MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 164000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 171000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 171000
name: Performance Tests on ${{ matrix.image }}
runs-on: ubuntu-latest
container:
Expand Down
8 changes: 0 additions & 8 deletions NOTICES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,3 @@ framework: 'test_01_allocation_counts.sh', 'run-nio-alloc-counter-tests.sh' and
* https://github.com/apple/swift-nio/blob/main/LICENSE.txt
* HOMEPAGE:
* https://github.com/apple/swift-nio

This product contains a simplified derivation of SwiftNIO HTTP/2's
'HTTP2FrameEncoder' for testing purposes.

* LICENSE (Apache License 2.0):
* https://github.com/apple/swift-nio-http2/blob/main/LICENSE.txt
* HOMEPAGE:
* https://github.com/apple/swift-nio-http2
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ let packageDependencies: [Package.Dependency] = [
),
.package(
url: "https://github.com/apple/swift-nio-http2.git",
from: "1.26.0"
from: "1.24.1"
),
.package(
url: "https://github.com/apple/swift-nio-transport-services.git",
Expand Down
91 changes: 43 additions & 48 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,28 @@ import SwiftProtobuf
/// │ DelegatingErrorHandler │
/// └──────────▲───────────────┘
/// HTTP2Frame│
/// │
/// │
/// │
/// │
/// │
/// HTTP2Frame│ ⠇ ⠇ ⠇ ⠇ ⠇
/// ┌─┴──────────────────▼─┐ ┌┴─▼┐ ┌┴─▼┐
/// │ GRPCIdleHandler │ │ | │ | HTTP/2 streams
/// └─▲──────────────────┬─┘ └▲─┬┘ └▲─┬┘
/// HTTP2Frame│ │ │ │ │ │ HTTP2Frame
/// ┌─┴──────────────────▼────────┴─▼───┴─▼┐
/// │ NIOHTTP2Handler │
/// └─▲──────────────────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// ┌─┴──────────────────────────────────▼─┐
/// │ NIOSSLHandler │
/// └─▲──────────────────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// │ ▼
/// │ ⠇ ⠇ ⠇ ⠇
/// │ ┌┴─▼┐ ┌┴─▼┐
/// │ │ | │ | HTTP/2 streams
/// │ └▲─┬┘ └▲─┬┘
/// │ │ │ │ │ HTTP2Frame
/// ┌─┴────────────────┴─▼───┴─▼┐
/// │ HTTP2StreamMultiplexer |
/// └─▲───────────────────────┬─┘
/// HTTP2Frame│ │HTTP2Frame
/// ┌─┴───────────────────────▼─┐
/// │ GRPCIdleHandler │
/// └─▲───────────────────────┬─┘
/// HTTP2Frame│ │HTTP2Frame
/// ┌─┴───────────────────────▼─┐
/// │ NIOHTTP2Handler │
/// └─▲───────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// ┌─┴───────────────────────▼─┐
/// │ NIOSSLHandler │
/// └─▲───────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// │ ▼
///
/// The 'GRPCIdleHandler' intercepts HTTP/2 frames and various events and is responsible for
/// informing and controlling the state of the connection (idling and keepalive). The HTTP/2 streams
Expand All @@ -85,7 +88,7 @@ public final class ClientConnection: Sendable {
private let connectionManager: ConnectionManager

/// HTTP multiplexer from the underlying channel handling gRPC calls.
internal func getMultiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
internal func getMultiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
return self.connectionManager.getHTTP2Multiplexer()
}

Expand Down Expand Up @@ -247,7 +250,7 @@ extension ClientConnection: GRPCChannel {
}

private static func makeStreamChannel(
using result: Result<NIOHTTP2Handler.StreamMultiplexer, Error>,
using result: Result<HTTP2StreamMultiplexer, Error>,
promise: EventLoopPromise<Channel>
) {
switch result {
Expand Down Expand Up @@ -618,31 +621,29 @@ extension ChannelPipeline.SynchronousOperations {
HTTP2Setting(parameter: .initialWindowSize, value: httpTargetWindowSize),
]

let grpcIdleHandler = GRPCIdleHandler(
// We could use 'configureHTTP2Pipeline' here, but we need to add a few handlers between the
// two HTTP/2 handlers so we'll do it manually instead.
try self.addHandler(NIOHTTP2Handler(mode: .client, initialSettings: initialSettings))

let h2Multiplexer = HTTP2StreamMultiplexer(
mode: .client,
channel: channel,
targetWindowSize: httpTargetWindowSize,
inboundStreamInitializer: nil
)

// The multiplexer is passed through the idle handler so it is only reported on
// successful channel activation - with happy eyeballs multiple pipelines can
// be constructed so it's not safe to report just yet.
try self.addHandler(GRPCIdleHandler(
connectionManager: connectionManager,
multiplexer: h2Multiplexer,
idleTimeout: connectionIdleTimeout,
keepalive: connectionKeepalive,
logger: logger
)

var connectionConfiguration = NIOHTTP2Handler.ConnectionConfiguration()
connectionConfiguration.initialSettings = initialSettings
var streamConfiguration = NIOHTTP2Handler.StreamConfiguration()
streamConfiguration.targetWindowSize = httpTargetWindowSize
let h2Handler = NIOHTTP2Handler(
mode: .client,
eventLoop: channel.eventLoop,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: grpcIdleHandler
) { channel in
channel.close()
}
try self.addHandler(h2Handler)

grpcIdleHandler.setMultiplexer(try h2Handler.syncMultiplexer())
try self.addHandler(grpcIdleHandler)
))

try self.addHandler(h2Multiplexer)
try self.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
}
}
Expand All @@ -652,13 +653,7 @@ extension Channel {
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> EventLoopFuture<Void> {
return self.configureHTTP2Pipeline(
mode: .client,
connectionConfiguration: .init(),
streamConfiguration: .init()
) { channel in
channel.eventLoop.makeSucceededVoidFuture()
}.flatMap { _ in
return self.configureHTTP2Pipeline(mode: .client, inboundStreamInitializer: nil).flatMap { _ in
self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
}
}
Expand Down
41 changes: 18 additions & 23 deletions Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,19 @@ internal final class ConnectionManager: @unchecked Sendable {
var reconnect: Reconnect

var candidate: EventLoopFuture<Channel>
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var candidateMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
}

internal struct ConnectedState {
var backoffIterator: ConnectionBackoffIterator?
var reconnect: Reconnect
var candidate: Channel
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var multiplexer: HTTP2StreamMultiplexer
var error: Error?

init(
from state: ConnectingState,
candidate: Channel,
multiplexer: NIOHTTP2Handler.StreamMultiplexer
) {
init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
self.backoffIterator = state.backoffIterator
self.reconnect = state.reconnect
self.candidate = candidate
Expand All @@ -62,7 +58,7 @@ internal final class ConnectionManager: @unchecked Sendable {

internal struct ReadyState {
var channel: Channel
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
var multiplexer: HTTP2StreamMultiplexer
var error: Error?

init(from state: ConnectedState) {
Expand All @@ -73,7 +69,7 @@ internal final class ConnectionManager: @unchecked Sendable {

internal struct TransientFailureState {
var backoffIterator: ConnectionBackoffIterator?
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var scheduled: Scheduled<Void>
var reason: Error

Expand Down Expand Up @@ -256,8 +252,8 @@ internal final class ConnectionManager: @unchecked Sendable {
}
}

/// Returns the `NIOHTTP2Handler.StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
private var multiplexer: NIOHTTP2Handler.StreamMultiplexer? {
/// Returns the `HTTP2StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
private var multiplexer: HTTP2StreamMultiplexer? {
self.eventLoop.assertInEventLoop()
switch self.state {
case let .ready(state):
Expand Down Expand Up @@ -365,8 +361,8 @@ internal final class ConnectionManager: @unchecked Sendable {
/// Get the multiplexer from the underlying channel handling gRPC calls.
/// if the `ConnectionManager` was configured to be `fastFailure` this will have
/// one chance to connect - if not reconnections are managed here.
internal func getHTTP2Multiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
func getHTTP2Multiplexer0() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
func getHTTP2Multiplexer0() -> EventLoopFuture<HTTP2StreamMultiplexer> {
switch self.callStartBehavior {
case .waitsForConnectivity:
return self.getHTTP2MultiplexerPatient()
Expand All @@ -386,8 +382,8 @@ internal final class ConnectionManager: @unchecked Sendable {

/// Returns a future for the multiplexer which succeeded when the channel is connected.
/// Reconnects are handled if necessary.
private func getHTTP2MultiplexerPatient() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer>
private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

switch self.state {
case .idle:
Expand Down Expand Up @@ -425,12 +421,11 @@ internal final class ConnectionManager: @unchecked Sendable {
/// attempt, or if the state is 'idle' returns the future for the next connection attempt.
///
/// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
private func getHTTP2MultiplexerOptimistic()
-> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
// `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
self.eventLoop.preconditionInEventLoop()

let muxFuture: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = { () in
let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = { () in
switch self.state {
case .idle:
self.startConnecting()
Expand Down Expand Up @@ -661,7 +656,7 @@ internal final class ConnectionManager: @unchecked Sendable {
}

/// The connecting channel became `active`. Must be called on the `EventLoop`.
internal func channelActive(channel: Channel, multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
self.eventLoop.preconditionInEventLoop()
self.logger.debug("activating connection", metadata: [
"connectivity_state": "\(self.state.label)",
Expand Down Expand Up @@ -978,7 +973,7 @@ extension ConnectionManager {

private func startConnecting(
backoffIterator: ConnectionBackoffIterator?,
muxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
) {
let timeoutAndBackoff = backoffIterator?.next()

Expand Down Expand Up @@ -1065,7 +1060,7 @@ extension ConnectionManager {

/// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is any
/// other state.
internal var multiplexer: NIOHTTP2Handler.StreamMultiplexer? {
internal var multiplexer: HTTP2StreamMultiplexer? {
return self.manager.multiplexer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ extension ConnectionPool {
}

@usableFromInline
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
var multiplexer: HTTP2StreamMultiplexer
/// Maximum number of available streams.
@usableFromInline
var maxAvailable: Int
Expand All @@ -78,7 +78,7 @@ extension ConnectionPool {

/// Increment the reserved streams and return the multiplexer.
@usableFromInline
mutating func reserve() -> NIOHTTP2Handler.StreamMultiplexer {
mutating func reserve() -> HTTP2StreamMultiplexer {
assert(!self.isQuiescing)
self.reserved += 1
return self.multiplexer
Expand Down Expand Up @@ -132,7 +132,7 @@ extension ConnectionPool {
///
/// The result may be safely unwrapped if `self.availableStreams > 0` when reserving a stream.
@usableFromInline
internal mutating func reserveStream() -> NIOHTTP2Handler.StreamMultiplexer? {
internal mutating func reserveStream() -> HTTP2StreamMultiplexer? {
return self._availability?.reserve()
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/GRPC/ConnectionPool/ConnectionPool+Waiter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extension ConnectionPool {

/// The channel initializer.
@usableFromInline
internal let _channelInitializer: @Sendable (Channel) -> EventLoopFuture<Void>
internal let _channelInitializer: (Channel) -> EventLoopFuture<Void>

/// The deadline at which the timeout is scheduled.
@usableFromInline
Expand All @@ -51,7 +51,7 @@ extension ConnectionPool {
internal init(
deadline: NIODeadline,
promise: EventLoopPromise<Channel>,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
channelInitializer: @escaping (Channel) -> EventLoopFuture<Void>
) {
self._deadline = deadline
self._promise = promise
Expand Down Expand Up @@ -83,7 +83,7 @@ extension ConnectionPool {

/// Succeed the waiter with the given multiplexer.
@usableFromInline
internal func succeed(with multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
internal func succeed(with multiplexer: HTTP2StreamMultiplexer) {
self._scheduledTimeout?.cancel()
self._scheduledTimeout = nil
multiplexer.createStreamChannel(promise: self._promise, self._channelInitializer)
Expand Down
Loading