From ac79222e366cbd225900abe691de2e51d365a5fc Mon Sep 17 00:00:00 2001 From: George Barnett Date: Wed, 4 Jan 2023 13:04:00 +0000 Subject: [PATCH] Adopt the coalescing writer for clients (#1539) Motivation: In #1357 we introduced a message frames which coalesces writes into a single buffer in a write-flush cycle to reduce the number of emitted DATA frames. This PR adopts those changes for the client. Modifications: - Adjust the client state machine to use the coalescing writer Results: Small messages are coalesced in a flush cycle within a stream. Co-authored-by: Cory Benfield --- Sources/GRPC/GRPCClientChannelHandler.swift | 80 +++++++++++++------ Sources/GRPC/GRPCClientStateMachine.swift | 49 ++++++++++-- .../GRPC/LengthPrefixedMessageWriter.swift | 1 + Sources/GRPC/ReadWriteStates.swift | 69 +++++++++------- .../GRPCClientStateMachineTests.swift | 21 ++--- 5 files changed, 150 insertions(+), 70 deletions(-) diff --git a/Sources/GRPC/GRPCClientChannelHandler.swift b/Sources/GRPC/GRPCClientChannelHandler.swift index 052ec8e5b..d8bb8d999 100644 --- a/Sources/GRPC/GRPCClientChannelHandler.swift +++ b/Sources/GRPC/GRPCClientChannelHandler.swift @@ -529,29 +529,13 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler { // Feed the request message into the state machine: let result = self.stateMachine.sendRequest( request.message, - compressed: request.compressed + compressed: request.compressed, + promise: promise ) - switch result { - case let .success((buffer, maybeBuffer)): - let frame1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))) - self.logger.trace("writing HTTP2 frame", metadata: [ - MetadataKey.h2Payload: "DATA", - MetadataKey.h2DataBytes: "\(buffer.readableBytes)", - MetadataKey.h2EndStream: "false", - ]) - // If there's a second buffer, attach the promise to the second write. - let promise1 = maybeBuffer == nil ? promise : nil - context.write(self.wrapOutboundOut(frame1), promise: promise1) - if let actuallyBuffer = maybeBuffer { - let frame2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer))) - self.logger.trace("writing HTTP2 frame", metadata: [ - MetadataKey.h2Payload: "DATA", - MetadataKey.h2DataBytes: "\(actuallyBuffer.readableBytes)", - MetadataKey.h2EndStream: "false", - ]) - context.write(self.wrapOutboundOut(frame2), promise: promise) - } + switch result { + case .success: + () case let .failure(writeError): switch writeError { @@ -572,13 +556,37 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler { } case .end: + // About to send end: write any outbound messages first. + while let (result, promise) = self.stateMachine.nextRequest() { + switch result { + case let .success(buffer): + let framePayload: HTTP2Frame.FramePayload = .data( + .init(data: .byteBuffer(buffer), endStream: false) + ) + + self.logger.trace("writing HTTP2 frame", metadata: [ + MetadataKey.h2Payload: "DATA", + MetadataKey.h2DataBytes: "\(buffer.readableBytes)", + MetadataKey.h2EndStream: "false", + ]) + context.write(self.wrapOutboundOut(framePayload), promise: promise) + + case let .failure(error): + context.fireErrorCaught(error) + promise?.fail(error) + return + } + } + // Okay: can we close the request stream? switch self.stateMachine.sendEndOfRequestStream() { case .success: // We can. Send an empty DATA frame with end-stream set. let empty = context.channel.allocator.buffer(capacity: 0) - let framePayload = HTTP2Frame.FramePayload - .data(.init(data: .byteBuffer(empty), endStream: true)) + let framePayload: HTTP2Frame.FramePayload = .data( + .init(data: .byteBuffer(empty), endStream: true) + ) + self.logger.trace("writing HTTP2 frame", metadata: [ MetadataKey.h2Payload: "DATA", MetadataKey.h2DataBytes: "0", @@ -605,4 +613,30 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler { } } } + + func flush(context: ChannelHandlerContext) { + // Drain any requests. + while let (result, promise) = self.stateMachine.nextRequest() { + switch result { + case let .success(buffer): + let framePayload: HTTP2Frame.FramePayload = .data( + .init(data: .byteBuffer(buffer), endStream: false) + ) + + self.logger.trace("writing HTTP2 frame", metadata: [ + MetadataKey.h2Payload: "DATA", + MetadataKey.h2DataBytes: "\(buffer.readableBytes)", + MetadataKey.h2EndStream: "false", + ]) + context.write(self.wrapOutboundOut(framePayload), promise: promise) + + case let .failure(error): + context.fireErrorCaught(error) + promise?.fail(error) + return + } + } + + context.flush() + } } diff --git a/Sources/GRPC/GRPCClientStateMachine.swift b/Sources/GRPC/GRPCClientStateMachine.swift index 1a8031a27..3d79ebd5a 100644 --- a/Sources/GRPC/GRPCClientStateMachine.swift +++ b/Sources/GRPC/GRPCClientStateMachine.swift @@ -196,13 +196,18 @@ struct GRPCClientStateMachine { /// request will be written. mutating func sendRequest( _ message: ByteBuffer, - compressed: Bool - ) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> { + compressed: Bool, + promise: EventLoopPromise? = nil + ) -> Result { return self.withStateAvoidingCoWs { state in - state.sendRequest(message, compressed: compressed) + state.sendRequest(message, compressed: compressed, promise: promise) } } + mutating func nextRequest() -> (Result, EventLoopPromise?)? { + return self.state.nextRequest() + } + /// Closes the request stream. /// /// The client must be streaming requests in order to terminate the request stream. Valid @@ -394,18 +399,21 @@ extension GRPCClientStateMachine.State { /// See `GRPCClientStateMachine.sendRequest(_:allocator:)`. mutating func sendRequest( _ message: ByteBuffer, - compressed: Bool - ) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> { - let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError> + compressed: Bool, + promise: EventLoopPromise? + ) -> Result { + let result: Result switch self { case .clientActiveServerIdle(var writeState, let pendingReadState): - result = writeState.write(message, compressed: compressed) + let result = writeState.write(message, compressed: compressed, promise: promise) self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState) + return result case .clientActiveServerActive(var writeState, let readState): - result = writeState.write(message, compressed: compressed) + let result = writeState.write(message, compressed: compressed, promise: promise) self = .clientActiveServerActive(writeState: writeState, readState: readState) + return result case .clientClosedServerIdle, .clientClosedServerActive, @@ -422,6 +430,31 @@ extension GRPCClientStateMachine.State { return result } + mutating func nextRequest() -> (Result, EventLoopPromise?)? { + switch self { + case .clientActiveServerIdle(var writeState, let pendingReadState): + self = .modifying + let result = writeState.next() + self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState) + return result + + case .clientActiveServerActive(var writeState, let readState): + self = .modifying + let result = writeState.next() + self = .clientActiveServerActive(writeState: writeState, readState: readState) + return result + + case .clientIdleServerIdle, + .clientClosedServerIdle, + .clientClosedServerActive, + .clientClosedServerClosed: + return nil + + case .modifying: + preconditionFailure("State left as 'modifying'") + } + } + /// See `GRPCClientStateMachine.sendEndOfRequestStream()`. mutating func sendEndOfRequestStream() -> Result { let result: Result diff --git a/Sources/GRPC/LengthPrefixedMessageWriter.swift b/Sources/GRPC/LengthPrefixedMessageWriter.swift index fc3e326a3..8fef6efd7 100644 --- a/Sources/GRPC/LengthPrefixedMessageWriter.swift +++ b/Sources/GRPC/LengthPrefixedMessageWriter.swift @@ -15,6 +15,7 @@ */ import Foundation import NIOCore +import NIOHPACK internal struct LengthPrefixedMessageWriter { static let metadataLength = 5 diff --git a/Sources/GRPC/ReadWriteStates.swift b/Sources/GRPC/ReadWriteStates.swift index cdef94f75..62456f7cf 100644 --- a/Sources/GRPC/ReadWriteStates.swift +++ b/Sources/GRPC/ReadWriteStates.swift @@ -42,50 +42,61 @@ struct PendingWriteState { compression = nil } - let writer = LengthPrefixedMessageWriter(compression: compression, allocator: allocator) - return .writing(self.arity, self.contentType, writer) + let writer = CoalescingLengthPrefixedMessageWriter( + compression: compression, + allocator: allocator + ) + return .init(arity: self.arity, contentType: self.contentType, writer: writer) } } /// The write state of a stream. -enum WriteState { - /// Writing may be attempted using the given writer. - case writing(MessageArity, ContentType, LengthPrefixedMessageWriter) - - /// Writing may not be attempted: either a write previously failed or it is not valid for any - /// more messages to be written. - case notWriting +struct WriteState { + private var arity: MessageArity + private var contentType: ContentType + private var writer: CoalescingLengthPrefixedMessageWriter + private var canWrite: Bool + + init( + arity: MessageArity, + contentType: ContentType, + writer: CoalescingLengthPrefixedMessageWriter + ) { + self.arity = arity + self.contentType = contentType + self.writer = writer + self.canWrite = true + } /// Writes a message into a buffer using the `writer`. /// /// - Parameter message: The `Message` to write. mutating func write( _ message: ByteBuffer, - compressed: Bool - ) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> { - switch self { - case .notWriting: + compressed: Bool, + promise: EventLoopPromise? + ) -> Result { + guard self.canWrite else { return .failure(.cardinalityViolation) + } - case .writing(let writeArity, let contentType, var writer): - self = .notWriting - let buffers: (ByteBuffer, ByteBuffer?) + self.writer.append(buffer: message, compress: compressed, promise: promise) - do { - buffers = try writer.write(buffer: message, compressed: compressed) - } catch { - self = .notWriting - return .failure(.serializationFailed) - } + switch self.arity { + case .one: + self.canWrite = false + case .many: + () + } - // If we only expect to write one message then we're no longer writable. - if case .one = writeArity { - self = .notWriting - } else { - self = .writing(writeArity, contentType, writer) - } + return .success(()) + } - return .success(buffers) + mutating func next() -> (Result, EventLoopPromise?)? { + if let next = self.writer.next() { + return (next.0.mapError { _ in .serializationFailed }, next.1) + } else { + return nil } } } diff --git a/Tests/GRPCTests/GRPCClientStateMachineTests.swift b/Tests/GRPCTests/GRPCClientStateMachineTests.swift index dc31e25cf..2357ca1a8 100644 --- a/Tests/GRPCTests/GRPCClientStateMachineTests.swift +++ b/Tests/GRPCTests/GRPCClientStateMachineTests.swift @@ -154,14 +154,7 @@ extension GRPCClientStateMachineTests { stateMachine.sendRequest( ByteBuffer(string: request), compressed: false - ).assertSuccess { buffers in - var buffer = buffers.0 - XCTAssertNil(buffers.1) - // Remove the length and compression flag prefix. - buffer.moveReaderIndex(forwardBy: 5) - let data = buffer.readString(length: buffer.readableBytes)! - XCTAssertEqual(request, data) - } + ).assertSuccess() } func testSendRequestFromIdle() { @@ -1299,10 +1292,18 @@ extension PendingWriteState { extension WriteState { static func one() -> WriteState { - return .writing(.one, .protobuf, LengthPrefixedMessageWriter(compression: .none)) + return .init( + arity: .one, + contentType: .protobuf, + writer: .init(compression: .none, allocator: .init()) + ) } static func many() -> WriteState { - return .writing(.many, .protobuf, LengthPrefixedMessageWriter(compression: .none)) + return .init( + arity: .many, + contentType: .protobuf, + writer: .init(compression: .none, allocator: .init()) + ) } }