Skip to content

Commit

Permalink
Adopt NIOAsyncSequenceProducer in grpc-swift (grpc#1477)
Browse files Browse the repository at this point in the history
We want to make use of `NIOAsyncSequenceProducer` (or its throwing counterpart) instead of using `PassthroughMessageSequence` and `PassthroughMessageSource`.

* Replaced usages of `PassthroughMessageSequence` and `PassthroughMessageSource` with `NIOThrowingAsyncSequenceProducer`

grpc-swift now uses `NIOAsyncSequenceProducer`
  • Loading branch information
gjcairo authored and pinlin168 committed Aug 24, 2023
1 parent f6eff2c commit 56a742b
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 484 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
*/
#if compiler(>=5.6)

import NIOCore
import NIOHPACK

/// Async-await variant of ``BidirectionalStreamingCall``.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: Sendable>: Sendable {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>
private let responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

/// A request stream writer for sending messages to the server.
public let requestStream: GRPCAsyncRequestStreamWriter<Request>
Expand Down Expand Up @@ -80,8 +86,17 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
private init(call: Call<Request, Response>) {
self.call = call
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
self.responseSource = PassthroughMessageSource<Response, Error>()
self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
let sequence = NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.makeSequence(
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
delegate: GRPCAsyncSequenceProducerDelegate()
)
self.responseSource = sequence.source
self.responseStream = .init(sequence.sequence)
self.requestStream = call.makeRequestStreamWriter()
}

Expand All @@ -96,7 +111,7 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
},
onError: { error in
asyncCall.responseParts.handleError(error)
asyncCall.responseSource.finish(throwing: error)
asyncCall.responseSource.finish(error)
asyncCall.requestStream.asyncWriter.cancelAsynchronously(withError: error)
},
onResponsePart: AsyncCall.makeResponsePartHandler(
Expand All @@ -114,7 +129,12 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
internal enum AsyncCall {
internal static func makeResponsePartHandler<Response, Request>(
responseParts: StreamingResponseParts<Response>,
responseSource: PassthroughMessageSource<Response, Error>,
responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source,
requestStream: GRPCAsyncRequestStreamWriter<Request>?,
requestType: Request.Type = Request.self
) -> (GRPCClientResponsePart<Response>) -> Void {
Expand All @@ -135,7 +155,7 @@ internal enum AsyncCall {
if status.isOk {
responseSource.finish()
} else {
responseSource.finish(throwing: status)
responseSource.finish(status)
}

requestStream?.asyncWriter.cancelAsynchronously(withError: status)
Expand Down
33 changes: 25 additions & 8 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#if compiler(>=5.6)
import NIOCore

/// A type for the stream of request messages send to a gRPC server method.
///
/// To enable testability this type provides a static ``GRPCAsyncRequestStream/makeTestingRequestStream()``
Expand Down Expand Up @@ -77,16 +79,26 @@ public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {

@usableFromInline
enum Backing: Sendable {
case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>)
case asyncStream(AsyncThrowingStream<Element, Error>)
case nioThrowingAsyncSequence(NIOThrowingAsyncSequenceProducer<
Element,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>)
}

@usableFromInline
internal let backing: Backing

@inlinable
internal init(_ sequence: PassthroughMessageSequence<Element, Error>) {
self.backing = .passthroughMessageSequence(sequence)
internal init(_ sequence: NIOThrowingAsyncSequenceProducer<
Element,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>) {
self.backing = .nioThrowingAsyncSequence(sequence)
}

@inlinable
Expand All @@ -112,18 +124,23 @@ public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
@inlinable
public func makeAsyncIterator() -> Iterator {
switch self.backing {
case let .passthroughMessageSequence(sequence):
return Self.AsyncIterator(.passthroughMessageSequence(sequence.makeAsyncIterator()))
case let .asyncStream(stream):
return Self.AsyncIterator(.asyncStream(stream.makeAsyncIterator()))
case let .nioThrowingAsyncSequence(sequence):
return Self.AsyncIterator(.nioThrowingAsyncSequence(sequence.makeAsyncIterator()))
}
}

public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
enum BackingIterator {
case passthroughMessageSequence(PassthroughMessageSequence<Element, Error>.Iterator)
case asyncStream(AsyncThrowingStream<Element, Error>.Iterator)
case nioThrowingAsyncSequence(NIOThrowingAsyncSequenceProducer<
Element,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.AsyncIterator)
}

@usableFromInline
Expand All @@ -137,12 +154,12 @@ public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
@inlinable
public mutating func next() async throws -> Element? {
switch self.iterator {
case let .passthroughMessageSequence(iterator):
return try await iterator.next()
case var .asyncStream(iterator):
let element = try await iterator.next()
self.iterator = .asyncStream(iterator)
return element
case let .nioThrowingAsyncSequence(iterator):
return try await iterator.next()
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
*/
#if compiler(>=5.6)

import NIOCore

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncResponseStream<Element: Sendable>: AsyncSequence {
@usableFromInline
internal typealias WrappedStream = PassthroughMessageSequence<Element, Error>
internal typealias WrappedStream = NIOThrowingAsyncSequenceProducer<
Element,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>

@usableFromInline
internal let stream: WrappedStream
Expand Down Expand Up @@ -52,7 +59,5 @@ public struct GRPCAsyncResponseStream<Element: Sendable>: AsyncSequence {

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncResponseStream: Sendable {}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncResponseStream.Iterator: Sendable {}

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.6)

import NIOCore

@usableFromInline
internal struct GRPCAsyncSequenceProducerDelegate: NIOAsyncSequenceProducerDelegate {
@inlinable
internal init() {}

// TODO: this method will have to be implemented when we add support for backpressure.
@inlinable
internal func produceMore() {}

// TODO: this method will have to be implemented when we add support for backpressure.
@inlinable
internal func didTerminate() {}
}

#endif
43 changes: 33 additions & 10 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ internal final class AsyncServerHandler<
GRPCAsyncServerCallContext
) async throws -> Void

@usableFromInline
internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>

@inlinable
internal init(
context: CallHandlerContext,
Expand Down Expand Up @@ -422,7 +430,10 @@ internal final class AsyncServerHandler<
contextProvider: self
)

let requestSource = PassthroughMessageSource<Request, Error>()
let sequenceProducer = AsyncSequenceProducer.makeSequence(
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
delegate: GRPCAsyncSequenceProducerDelegate()
)

let writerDelegate = AsyncResponseStreamWriterDelegate(
send: self.interceptResponseMessage(_:compression:),
Expand All @@ -448,12 +459,13 @@ internal final class AsyncServerHandler<
// Update our state before invoke the handler.
self.handlerStateMachine.handlerInvoked(requestHeaders: headers)
self.handlerComponents = ServerHandlerComponents(
requestSource: requestSource,
requestSource: sequenceProducer.source,
responseWriter: writer,
task: promise.completeWithTask {
// We don't have a task cancellation handler here: we do it in `self.cancel()`.
try await self.invokeUserHandler(
requestStreamSource: requestSource,
sequence: sequenceProducer.sequence,
sequenceSource: sequenceProducer.source,
responseStreamWriter: writer,
callContext: handlerContext
)
Expand All @@ -468,18 +480,19 @@ internal final class AsyncServerHandler<
@Sendable
@usableFromInline
internal func invokeUserHandler(
requestStreamSource: PassthroughMessageSource<Request, Error>,
sequence: AsyncSequenceProducer,
sequenceSource: AsyncSequenceProducer.Source,
responseStreamWriter: AsyncWriter<AsyncResponseStreamWriterDelegate<Response>>,
callContext: GRPCAsyncServerCallContext
) async throws {
defer {
// It's possible the user handler completed before the end of the request stream. We
// explicitly finish it to drop any unconsumed inbound messages.
requestStreamSource.finish()
sequenceSource.finish()
}

do {
let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))
let requestStream = GRPCAsyncRequestStream(sequence)
let responseStream = GRPCAsyncResponseStreamWriter(wrapping: responseStreamWriter)
try await self.userHandler(requestStream, responseStream, callContext)

Expand Down Expand Up @@ -530,7 +543,7 @@ internal final class AsyncServerHandler<
case .forward:
switch self.handlerStateMachine.handleMessage() {
case .forward:
self.handlerComponents?.requestSource.yield(request)
_ = self.handlerComponents?.requestSource.yield(request)
case .cancel:
self.cancel(error: nil)
}
Expand Down Expand Up @@ -809,11 +822,21 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
@usableFromInline
internal let responseWriter: AsyncWriter<Delegate>
@usableFromInline
internal let requestSource: PassthroughMessageSource<Request, Error>
internal let requestSource: NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

@inlinable
init(
requestSource: PassthroughMessageSource<Request, Error>,
requestSource: NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source,
responseWriter: AsyncWriter<Delegate>,
task: Task<Void, Never>
) {
Expand All @@ -830,7 +853,7 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
// to the request stream, and cancelling the writer will ensure no more responses are
// written. This should reduce how long the user handler runs for as it can no longer do
// anything useful.
self.requestSource.finish(throwing: CancellationError())
self.requestSource.finish()
self.responseWriter.cancelAsynchronously(withError: CancellationError())
self.task.cancel()
}
Expand Down
23 changes: 19 additions & 4 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
*/
#if compiler(>=5.6)

import NIOCore
import NIOHPACK

/// Async-await variant of ``ServerStreamingCall``.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable> {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>
private let responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

/// The stream of responses from the server.
public let responseStream: GRPCAsyncResponseStream<Response>
Expand Down Expand Up @@ -79,8 +85,17 @@ public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable
// We ignore messages in the closure and instead feed them into the response source when we
// invoke the `call`.
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
self.responseSource = PassthroughMessageSource<Response, Error>()
self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
let sequence = NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.makeSequence(
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
delegate: GRPCAsyncSequenceProducerDelegate()
)
self.responseSource = sequence.source
self.responseStream = .init(sequence.sequence)
}

/// We expose this as the only non-private initializer so that the caller
Expand All @@ -96,7 +111,7 @@ public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable
onStart: {},
onError: { error in
asyncCall.responseParts.handleError(error)
asyncCall.responseSource.finish(throwing: error)
asyncCall.responseSource.finish(error)
},
onResponsePart: AsyncCall.makeResponsePartHandler(
responseParts: asyncCall.responseParts,
Expand Down
Loading

0 comments on commit 56a742b

Please sign in to comment.