Skip to content

Commit

Permalink
Adopt the coalescing writer for clients (#1539)
Browse files Browse the repository at this point in the history
Motivation:

In #1357 we introduced a message frames which coalesces writes into a
single buffer in a write-flush cycle to reduce the number of emitted
DATA frames. This PR adopts those changes for the client.

Modifications:

- Adjust the client state machine to use the coalescing writer

Results:

Small messages are coalesced in a flush cycle within a stream.

Co-authored-by: Cory Benfield <[email protected]>
  • Loading branch information
glbrntt and Lukasa authored Jan 4, 2023
1 parent ff24d24 commit 254ea13
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 70 deletions.
80 changes: 57 additions & 23 deletions Sources/GRPC/GRPCClientChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -529,29 +529,13 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
// Feed the request message into the state machine:
let result = self.stateMachine.sendRequest(
request.message,
compressed: request.compressed
compressed: request.compressed,
promise: promise
)
switch result {
case let .success((buffer, maybeBuffer)):
let frame1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
// If there's a second buffer, attach the promise to the second write.
let promise1 = maybeBuffer == nil ? promise : nil
context.write(self.wrapOutboundOut(frame1), promise: promise1)

if let actuallyBuffer = maybeBuffer {
let frame2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(actuallyBuffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
context.write(self.wrapOutboundOut(frame2), promise: promise)
}
switch result {
case .success:
()

case let .failure(writeError):
switch writeError {
Expand All @@ -572,13 +556,37 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
}

case .end:
// About to send end: write any outbound messages first.
while let (result, promise) = self.stateMachine.nextRequest() {
switch result {
case let .success(buffer):
let framePayload: HTTP2Frame.FramePayload = .data(
.init(data: .byteBuffer(buffer), endStream: false)
)

self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
context.write(self.wrapOutboundOut(framePayload), promise: promise)

case let .failure(error):
context.fireErrorCaught(error)
promise?.fail(error)
return
}
}

// Okay: can we close the request stream?
switch self.stateMachine.sendEndOfRequestStream() {
case .success:
// We can. Send an empty DATA frame with end-stream set.
let empty = context.channel.allocator.buffer(capacity: 0)
let framePayload = HTTP2Frame.FramePayload
.data(.init(data: .byteBuffer(empty), endStream: true))
let framePayload: HTTP2Frame.FramePayload = .data(
.init(data: .byteBuffer(empty), endStream: true)
)

self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "0",
Expand All @@ -605,4 +613,30 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
}
}
}

func flush(context: ChannelHandlerContext) {
// Drain any requests.
while let (result, promise) = self.stateMachine.nextRequest() {
switch result {
case let .success(buffer):
let framePayload: HTTP2Frame.FramePayload = .data(
.init(data: .byteBuffer(buffer), endStream: false)
)

self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
context.write(self.wrapOutboundOut(framePayload), promise: promise)

case let .failure(error):
context.fireErrorCaught(error)
promise?.fail(error)
return
}
}

context.flush()
}
}
49 changes: 41 additions & 8 deletions Sources/GRPC/GRPCClientStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,18 @@ struct GRPCClientStateMachine {
/// request will be written.
mutating func sendRequest(
_ message: ByteBuffer,
compressed: Bool
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
compressed: Bool,
promise: EventLoopPromise<Void>? = nil
) -> Result<Void, MessageWriteError> {
return self.withStateAvoidingCoWs { state in
state.sendRequest(message, compressed: compressed)
state.sendRequest(message, compressed: compressed, promise: promise)
}
}

mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
return self.state.nextRequest()
}

/// Closes the request stream.
///
/// The client must be streaming requests in order to terminate the request stream. Valid
Expand Down Expand Up @@ -394,18 +399,21 @@ extension GRPCClientStateMachine.State {
/// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
mutating func sendRequest(
_ message: ByteBuffer,
compressed: Bool
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError>
compressed: Bool,
promise: EventLoopPromise<Void>?
) -> Result<Void, MessageWriteError> {
let result: Result<Void, MessageWriteError>

switch self {
case .clientActiveServerIdle(var writeState, let pendingReadState):
result = writeState.write(message, compressed: compressed)
let result = writeState.write(message, compressed: compressed, promise: promise)
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
return result

case .clientActiveServerActive(var writeState, let readState):
result = writeState.write(message, compressed: compressed)
let result = writeState.write(message, compressed: compressed, promise: promise)
self = .clientActiveServerActive(writeState: writeState, readState: readState)
return result

case .clientClosedServerIdle,
.clientClosedServerActive,
Expand All @@ -422,6 +430,31 @@ extension GRPCClientStateMachine.State {
return result
}

mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
switch self {
case .clientActiveServerIdle(var writeState, let pendingReadState):
self = .modifying
let result = writeState.next()
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
return result

case .clientActiveServerActive(var writeState, let readState):
self = .modifying
let result = writeState.next()
self = .clientActiveServerActive(writeState: writeState, readState: readState)
return result

case .clientIdleServerIdle,
.clientClosedServerIdle,
.clientClosedServerActive,
.clientClosedServerClosed:
return nil

case .modifying:
preconditionFailure("State left as 'modifying'")
}
}

/// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
let result: Result<Void, SendEndOfRequestStreamError>
Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/LengthPrefixedMessageWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
import Foundation
import NIOCore
import NIOHPACK

internal struct LengthPrefixedMessageWriter {
static let metadataLength = 5
Expand Down
69 changes: 40 additions & 29 deletions Sources/GRPC/ReadWriteStates.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,50 +42,61 @@ struct PendingWriteState {
compression = nil
}

let writer = LengthPrefixedMessageWriter(compression: compression, allocator: allocator)
return .writing(self.arity, self.contentType, writer)
let writer = CoalescingLengthPrefixedMessageWriter(
compression: compression,
allocator: allocator
)
return .init(arity: self.arity, contentType: self.contentType, writer: writer)
}
}

/// The write state of a stream.
enum WriteState {
/// Writing may be attempted using the given writer.
case writing(MessageArity, ContentType, LengthPrefixedMessageWriter)

/// Writing may not be attempted: either a write previously failed or it is not valid for any
/// more messages to be written.
case notWriting
struct WriteState {
private var arity: MessageArity
private var contentType: ContentType
private var writer: CoalescingLengthPrefixedMessageWriter
private var canWrite: Bool

init(
arity: MessageArity,
contentType: ContentType,
writer: CoalescingLengthPrefixedMessageWriter
) {
self.arity = arity
self.contentType = contentType
self.writer = writer
self.canWrite = true
}

/// Writes a message into a buffer using the `writer`.
///
/// - Parameter message: The `Message` to write.
mutating func write(
_ message: ByteBuffer,
compressed: Bool
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
switch self {
case .notWriting:
compressed: Bool,
promise: EventLoopPromise<Void>?
) -> Result<Void, MessageWriteError> {
guard self.canWrite else {
return .failure(.cardinalityViolation)
}

case .writing(let writeArity, let contentType, var writer):
self = .notWriting
let buffers: (ByteBuffer, ByteBuffer?)
self.writer.append(buffer: message, compress: compressed, promise: promise)

do {
buffers = try writer.write(buffer: message, compressed: compressed)
} catch {
self = .notWriting
return .failure(.serializationFailed)
}
switch self.arity {
case .one:
self.canWrite = false
case .many:
()
}

// If we only expect to write one message then we're no longer writable.
if case .one = writeArity {
self = .notWriting
} else {
self = .writing(writeArity, contentType, writer)
}
return .success(())
}

return .success(buffers)
mutating func next() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
if let next = self.writer.next() {
return (next.0.mapError { _ in .serializationFailed }, next.1)
} else {
return nil
}
}
}
Expand Down
21 changes: 11 additions & 10 deletions Tests/GRPCTests/GRPCClientStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,7 @@ extension GRPCClientStateMachineTests {
stateMachine.sendRequest(
ByteBuffer(string: request),
compressed: false
).assertSuccess { buffers in
var buffer = buffers.0
XCTAssertNil(buffers.1)
// Remove the length and compression flag prefix.
buffer.moveReaderIndex(forwardBy: 5)
let data = buffer.readString(length: buffer.readableBytes)!
XCTAssertEqual(request, data)
}
).assertSuccess()
}

func testSendRequestFromIdle() {
Expand Down Expand Up @@ -1299,10 +1292,18 @@ extension PendingWriteState {

extension WriteState {
static func one() -> WriteState {
return .writing(.one, .protobuf, LengthPrefixedMessageWriter(compression: .none))
return .init(
arity: .one,
contentType: .protobuf,
writer: .init(compression: .none, allocator: .init())
)
}

static func many() -> WriteState {
return .writing(.many, .protobuf, LengthPrefixedMessageWriter(compression: .none))
return .init(
arity: .many,
contentType: .protobuf,
writer: .init(compression: .none, allocator: .init())
)
}
}

0 comments on commit 254ea13

Please sign in to comment.