Skip to content

Commit

Permalink
Adopt h2handler multiplexer (grpc#1587)
Browse files Browse the repository at this point in the history
Motivation:

Switch from `HTTP2StreamMultiplexer` to `HTTP2Handler.StreamMultiplexer`
to benefit from improved performance due to reduced allocations.

Modifications:

- Replace all references to  `HTTP2StreamMultiplexer` with `HTTP2Handler.StreamMultiplexer`.
- `GRPCIdleHandler` is now a `NIOHTTP2StreamDelegate` to allow it to
  receive notifications when a stream is created or closed rather than
  using `UserInboundEvent`s.
- `GRPCIdleHandler` now has two states of configuration in the client
  case. This is required to break a cycle of dependencies which would
  otherwise exist at `init` because the `GRPCIdleHandler` holds a reference
  to the `HTTP2Handler.StreamMultiplexer` (which is a member of the
  `HTTP2Handler` itself) and the `HTTP2Handler` holds a reference back to the
  `GRPCIdleHandler` as its stream delegate.
- Several tests now insert `HTTP2Handler` into the pipeline where they
  used to use `HTTP2StreamMultiplexer`. This causes a few changes in
  behavior often around different assertions. This required some small
  supporting changes.
- Introduce test infrastructure `AsyncEventStreamConnectionPoolDelegate`
  to eliminate observed racey behavior in `EventRecordingConnectionPoolDelegate`.

Result:

Performance increase without Overall behavior changes.
  • Loading branch information
rnro authored Apr 18, 2023
1 parent 87fa9ed commit 75b390e
Show file tree
Hide file tree
Showing 18 changed files with 867 additions and 484 deletions.
40 changes: 20 additions & 20 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,44 +57,44 @@ jobs:
include:
- image: swiftlang/swift:nightly-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 347000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 167000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 109000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 64000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 60000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 169000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 176000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 176000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
- image: swift:5.8-jammy
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 347000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 167000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 109000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 64000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 60000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 169000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 176000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 176000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
- image: swift:5.7-jammy
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 347000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 167000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 246000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 138000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 109000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 64000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 60000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 169000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 176000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 176000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 129000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 136000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 136000
- image: swift:5.6-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 348000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 168000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 247000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 139000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 109000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 64000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 60000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 170000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 177000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 177000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 130000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 137000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 137000

name: Performance Tests on ${{ matrix.image }}
runs-on: ubuntu-latest
Expand Down
8 changes: 8 additions & 0 deletions NOTICES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@ framework: 'test_01_allocation_counts.sh', 'run-nio-alloc-counter-tests.sh' and
* https://github.com/apple/swift-nio/blob/main/LICENSE.txt
* HOMEPAGE:
* https://github.com/apple/swift-nio

This product contains a simplified derivation of SwiftNIO HTTP/2's
'HTTP2FrameEncoder' for testing purposes.

* LICENSE (Apache License 2.0):
* https://github.com/apple/swift-nio-http2/blob/main/LICENSE.txt
* HOMEPAGE:
* https://github.com/apple/swift-nio-http2
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ let packageDependencies: [Package.Dependency] = [
),
.package(
url: "https://github.com/apple/swift-nio-http2.git",
from: "1.24.1"
from: "1.26.0"
),
.package(
url: "https://github.com/apple/swift-nio-transport-services.git",
Expand Down
91 changes: 48 additions & 43 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,25 @@ import SwiftProtobuf
/// │ DelegatingErrorHandler │
/// └──────────▲───────────────┘
/// HTTP2Frame│
/// │ ⠇ ⠇ ⠇ ⠇
/// │ ┌┴─▼┐ ┌┴─▼┐
/// │ │ | │ | HTTP/2 streams
/// │ └▲─┬┘ └▲─┬┘
/// │ │ │ │ │ HTTP2Frame
/// ┌─┴────────────────┴─▼───┴─▼┐
/// │ HTTP2StreamMultiplexer |
/// └─▲───────────────────────┬─┘
/// HTTP2Frame│ │HTTP2Frame
/// ┌─┴───────────────────────▼─┐
/// │ GRPCIdleHandler │
/// └─▲───────────────────────┬─┘
/// HTTP2Frame│ │HTTP2Frame
/// ┌─┴───────────────────────▼─┐
/// │ NIOHTTP2Handler │
/// └─▲───────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// ┌─┴───────────────────────▼─┐
/// │ NIOSSLHandler │
/// └─▲───────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// │ ▼
/// │
/// │
/// │
/// │
/// │
/// HTTP2Frame│ ⠇ ⠇ ⠇ ⠇ ⠇
/// ┌─┴──────────────────▼─┐ ┌┴─▼┐ ┌┴─▼┐
/// │ GRPCIdleHandler │ │ | │ | HTTP/2 streams
/// └─▲──────────────────┬─┘ └▲─┬┘ └▲─┬┘
/// HTTP2Frame│ │ │ │ │ │ HTTP2Frame
/// ┌─┴──────────────────▼────────┴─▼───┴─▼┐
/// │ NIOHTTP2Handler │
/// └─▲──────────────────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// ┌─┴──────────────────────────────────▼─┐
/// │ NIOSSLHandler │
/// └─▲──────────────────────────────────┬─┘
/// ByteBuffer│ │ByteBuffer
/// │ ▼
///
/// The 'GRPCIdleHandler' intercepts HTTP/2 frames and various events and is responsible for
/// informing and controlling the state of the connection (idling and keepalive). The HTTP/2 streams
Expand All @@ -83,7 +80,7 @@ public final class ClientConnection: Sendable {
private let connectionManager: ConnectionManager

/// HTTP multiplexer from the underlying channel handling gRPC calls.
internal func getMultiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
internal func getMultiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
return self.connectionManager.getHTTP2Multiplexer()
}

Expand Down Expand Up @@ -245,7 +242,7 @@ extension ClientConnection: GRPCChannel {
}

private static func makeStreamChannel(
using result: Result<HTTP2StreamMultiplexer, Error>,
using result: Result<NIOHTTP2Handler.StreamMultiplexer, Error>,
promise: EventLoopPromise<Channel>
) {
switch result {
Expand Down Expand Up @@ -606,29 +603,31 @@ extension ChannelPipeline.SynchronousOperations {
HTTP2Setting(parameter: .initialWindowSize, value: httpTargetWindowSize),
]

// We could use 'configureHTTP2Pipeline' here, but we need to add a few handlers between the
// two HTTP/2 handlers so we'll do it manually instead.
try self.addHandler(NIOHTTP2Handler(mode: .client, initialSettings: initialSettings))

let h2Multiplexer = HTTP2StreamMultiplexer(
mode: .client,
channel: channel,
targetWindowSize: httpTargetWindowSize,
inboundStreamInitializer: nil
)

// The multiplexer is passed through the idle handler so it is only reported on
// successful channel activation - with happy eyeballs multiple pipelines can
// be constructed so it's not safe to report just yet.
try self.addHandler(GRPCIdleHandler(
let grpcIdleHandler = GRPCIdleHandler(
connectionManager: connectionManager,
multiplexer: h2Multiplexer,
idleTimeout: connectionIdleTimeout,
keepalive: connectionKeepalive,
logger: logger
))
)

var connectionConfiguration = NIOHTTP2Handler.ConnectionConfiguration()
connectionConfiguration.initialSettings = initialSettings
var streamConfiguration = NIOHTTP2Handler.StreamConfiguration()
streamConfiguration.targetWindowSize = httpTargetWindowSize
let h2Handler = NIOHTTP2Handler(
mode: .client,
eventLoop: channel.eventLoop,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: grpcIdleHandler
) { channel in
channel.close()
}
try self.addHandler(h2Handler)

grpcIdleHandler.setMultiplexer(try h2Handler.syncMultiplexer())
try self.addHandler(grpcIdleHandler)

try self.addHandler(h2Multiplexer)
try self.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
}
}
Expand All @@ -638,7 +637,13 @@ extension Channel {
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> EventLoopFuture<Void> {
return self.configureHTTP2Pipeline(mode: .client, inboundStreamInitializer: nil).flatMap { _ in
return self.configureHTTP2Pipeline(
mode: .client,
connectionConfiguration: .init(),
streamConfiguration: .init()
) { channel in
channel.eventLoop.makeSucceededVoidFuture()
}.flatMap { _ in
self.pipeline.addHandler(DelegatingErrorHandler(logger: logger, delegate: errorDelegate))
}
}
Expand Down
41 changes: 23 additions & 18 deletions Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,23 @@ internal final class ConnectionManager: @unchecked Sendable {
var reconnect: Reconnect

var candidate: EventLoopFuture<Channel>
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var candidateMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var candidateMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
}

internal struct ConnectedState {
var backoffIterator: ConnectionBackoffIterator?
var reconnect: Reconnect
var candidate: Channel
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var multiplexer: HTTP2StreamMultiplexer
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
var error: Error?

init(from state: ConnectingState, candidate: Channel, multiplexer: HTTP2StreamMultiplexer) {
init(
from state: ConnectingState,
candidate: Channel,
multiplexer: NIOHTTP2Handler.StreamMultiplexer
) {
self.backoffIterator = state.backoffIterator
self.reconnect = state.reconnect
self.candidate = candidate
Expand All @@ -58,7 +62,7 @@ internal final class ConnectionManager: @unchecked Sendable {

internal struct ReadyState {
var channel: Channel
var multiplexer: HTTP2StreamMultiplexer
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
var error: Error?

init(from state: ConnectedState) {
Expand All @@ -69,7 +73,7 @@ internal final class ConnectionManager: @unchecked Sendable {

internal struct TransientFailureState {
var backoffIterator: ConnectionBackoffIterator?
var readyChannelMuxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
var readyChannelMuxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
var scheduled: Scheduled<Void>
var reason: Error

Expand Down Expand Up @@ -252,8 +256,8 @@ internal final class ConnectionManager: @unchecked Sendable {
}
}

/// Returns the `HTTP2StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
private var multiplexer: HTTP2StreamMultiplexer? {
/// Returns the `NIOHTTP2Handler.StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
private var multiplexer: NIOHTTP2Handler.StreamMultiplexer? {
self.eventLoop.assertInEventLoop()
switch self.state {
case let .ready(state):
Expand Down Expand Up @@ -361,8 +365,8 @@ internal final class ConnectionManager: @unchecked Sendable {
/// Get the multiplexer from the underlying channel handling gRPC calls.
/// if the `ConnectionManager` was configured to be `fastFailure` this will have
/// one chance to connect - if not reconnections are managed here.
internal func getHTTP2Multiplexer() -> EventLoopFuture<HTTP2StreamMultiplexer> {
func getHTTP2Multiplexer0() -> EventLoopFuture<HTTP2StreamMultiplexer> {
internal func getHTTP2Multiplexer() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
func getHTTP2Multiplexer0() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
switch self.callStartBehavior {
case .waitsForConnectivity:
return self.getHTTP2MultiplexerPatient()
Expand All @@ -382,8 +386,8 @@ internal final class ConnectionManager: @unchecked Sendable {

/// Returns a future for the multiplexer which succeeded when the channel is connected.
/// Reconnects are handled if necessary.
private func getHTTP2MultiplexerPatient() -> EventLoopFuture<HTTP2StreamMultiplexer> {
let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
private func getHTTP2MultiplexerPatient() -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
let multiplexer: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer>

switch self.state {
case .idle:
Expand Down Expand Up @@ -421,11 +425,12 @@ internal final class ConnectionManager: @unchecked Sendable {
/// attempt, or if the state is 'idle' returns the future for the next connection attempt.
///
/// Note: if the state is 'transientFailure' or 'shutdown' then a failed future will be returned.
private func getHTTP2MultiplexerOptimistic() -> EventLoopFuture<HTTP2StreamMultiplexer> {
private func getHTTP2MultiplexerOptimistic()
-> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
// `getHTTP2Multiplexer` makes sure we're on the event loop but let's just be sure.
self.eventLoop.preconditionInEventLoop()

let muxFuture: EventLoopFuture<HTTP2StreamMultiplexer> = { () in
let muxFuture: EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> = { () in
switch self.state {
case .idle:
self.startConnecting()
Expand Down Expand Up @@ -656,7 +661,7 @@ internal final class ConnectionManager: @unchecked Sendable {
}

/// The connecting channel became `active`. Must be called on the `EventLoop`.
internal func channelActive(channel: Channel, multiplexer: HTTP2StreamMultiplexer) {
internal func channelActive(channel: Channel, multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
self.eventLoop.preconditionInEventLoop()
self.logger.debug("activating connection", metadata: [
"connectivity_state": "\(self.state.label)",
Expand Down Expand Up @@ -973,7 +978,7 @@ extension ConnectionManager {

private func startConnecting(
backoffIterator: ConnectionBackoffIterator?,
muxPromise: EventLoopPromise<HTTP2StreamMultiplexer>
muxPromise: EventLoopPromise<NIOHTTP2Handler.StreamMultiplexer>
) {
let timeoutAndBackoff = backoffIterator?.next()

Expand Down Expand Up @@ -1060,7 +1065,7 @@ extension ConnectionManager {

/// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is any
/// other state.
internal var multiplexer: HTTP2StreamMultiplexer? {
internal var multiplexer: NIOHTTP2Handler.StreamMultiplexer? {
return self.manager.multiplexer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ extension ConnectionPool {
}

@usableFromInline
var multiplexer: HTTP2StreamMultiplexer
var multiplexer: NIOHTTP2Handler.StreamMultiplexer
/// Maximum number of available streams.
@usableFromInline
var maxAvailable: Int
Expand All @@ -73,7 +73,7 @@ extension ConnectionPool {

/// Increment the reserved streams and return the multiplexer.
@usableFromInline
mutating func reserve() -> HTTP2StreamMultiplexer {
mutating func reserve() -> NIOHTTP2Handler.StreamMultiplexer {
assert(!self.isQuiescing)
self.reserved += 1
return self.multiplexer
Expand Down Expand Up @@ -127,7 +127,7 @@ extension ConnectionPool {
///
/// The result may be safely unwrapped if `self.availableStreams > 0` when reserving a stream.
@usableFromInline
internal mutating func reserveStream() -> HTTP2StreamMultiplexer? {
internal mutating func reserveStream() -> NIOHTTP2Handler.StreamMultiplexer? {
return self._availability?.reserve()
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/GRPC/ConnectionPool/ConnectionPool+Waiter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extension ConnectionPool {

/// The channel initializer.
@usableFromInline
internal let _channelInitializer: (Channel) -> EventLoopFuture<Void>
internal let _channelInitializer: @Sendable (Channel) -> EventLoopFuture<Void>

/// The deadline at which the timeout is scheduled.
@usableFromInline
Expand All @@ -51,7 +51,7 @@ extension ConnectionPool {
internal init(
deadline: NIODeadline,
promise: EventLoopPromise<Channel>,
channelInitializer: @escaping (Channel) -> EventLoopFuture<Void>
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
) {
self._deadline = deadline
self._promise = promise
Expand Down Expand Up @@ -83,7 +83,7 @@ extension ConnectionPool {

/// Succeed the waiter with the given multiplexer.
@usableFromInline
internal func succeed(with multiplexer: HTTP2StreamMultiplexer) {
internal func succeed(with multiplexer: NIOHTTP2Handler.StreamMultiplexer) {
self._scheduledTimeout?.cancel()
self._scheduledTimeout = nil
multiplexer.createStreamChannel(promise: self._promise, self._channelInitializer)
Expand Down
Loading

0 comments on commit 75b390e

Please sign in to comment.