Skip to content

Commit

Permalink
Adopt NIOAsyncSequenceProducer in grpc-swift
Browse files Browse the repository at this point in the history
We want to make use of `NIOAsyncSequenceProducer` (or its throwing counterpart) instead of using `PassthroughMessageSequence` and `PassthroughMessageSource`.

* Replaced usages of `PassthroughMessageSequence` and `PassthroughMessageSource` with `NIOThrowingAsyncSequenceProducer`

grpc-swift now uses `NIOAsyncSequenceProducer`
  • Loading branch information
gjcairo committed Sep 27, 2022
1 parent dfc2ab0 commit 534f80d
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 532 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
*/
#if compiler(>=5.6)

import NIOCore
import NIOHPACK

/// Async-await variant of ``BidirectionalStreamingCall``.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: Sendable>: Sendable {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>
private let responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

/// A request stream writer for sending messages to the server.
public let requestStream: GRPCAsyncRequestStreamWriter<Request>
Expand Down Expand Up @@ -80,8 +86,17 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
private init(call: Call<Request, Response>) {
self.call = call
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
self.responseSource = PassthroughMessageSource<Response, Error>()
self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
let sequence = NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.makeSequence(
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
delegate: GRPCAsyncSequenceProducerDelegate()
)
self.responseSource = sequence.source
self.responseStream = .init(sequence.sequence)
self.requestStream = call.makeRequestStreamWriter()
}

Expand All @@ -96,7 +111,7 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
},
onError: { error in
asyncCall.responseParts.handleError(error)
asyncCall.responseSource.finish(throwing: error)
asyncCall.responseSource.finish(error)
asyncCall.requestStream.asyncWriter.cancelAsynchronously(withError: error)
},
onResponsePart: AsyncCall.makeResponsePartHandler(
Expand All @@ -114,7 +129,12 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
internal enum AsyncCall {
internal static func makeResponsePartHandler<Response, Request>(
responseParts: StreamingResponseParts<Response>,
responseSource: PassthroughMessageSource<Response, Error>,
responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source,
requestStream: GRPCAsyncRequestStreamWriter<Request>?,
requestType: Request.Type = Request.self
) -> (GRPCClientResponsePart<Response>) -> Void {
Expand All @@ -135,7 +155,7 @@ internal enum AsyncCall {
if status.isOk {
responseSource.finish()
} else {
responseSource.finish(throwing: status)
responseSource.finish(status)
}

requestStream?.asyncWriter.cancelAsynchronously(withError: status)
Expand Down
66 changes: 10 additions & 56 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,65 +15,19 @@
*/

#if compiler(>=5.6)
/// A type for the stream of request messages send to a gRPC server method.
///
/// To enable testability this type provides a static ``GRPCAsyncRequestStream/makeTestingRequestStream()``
/// method which allows you to create a stream that you can drive.
///
/// - Note: This is currently a wrapper around AsyncThrowingStream because we want to be
import NIOCore

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
/// A source used for driving a ``GRPCAsyncRequestStream`` during tests.
public struct Source {
@usableFromInline
internal let continuation: AsyncThrowingStream<Element, Error>.Continuation

@inlinable
init(continuation: AsyncThrowingStream<Element, Error>.Continuation) {
self.continuation = continuation
}

/// Yields the element to the request stream.
///
/// - Parameter element: The element to yield to the request stream.
@inlinable
public func yield(_ element: Element) {
self.continuation.yield(element)
}

/// Finished the request stream.
@inlinable
public func finish() {
self.continuation.finish()
}

/// Finished the request stream.
///
/// - Parameter error: An optional `Error` to finish the request stream with.
@inlinable
public func finish(throwing error: Error?) {
self.continuation.finish(throwing: error)
}
}

/// Simple struct for the return type of ``GRPCAsyncRequestStream/makeTestingRequestStream()``.
///
/// This struct contains two properties:
/// 1. The ``stream`` which is the actual ``GRPCAsyncRequestStream`` and should be passed to the method under testing.
/// 2. The ``source`` which can be used to drive the stream.
public struct TestingStream {
/// The actual stream.
public let stream: GRPCAsyncRequestStream<Element>
/// The source used to drive the stream.
public let source: Source

@inlinable
init(stream: GRPCAsyncRequestStream<Element>, source: Source) {
self.stream = stream
self.source = source
}
}
@usableFromInline
internal typealias _WrappedStream = NIOThrowingAsyncSequenceProducer<
Element,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>

@usableFromInline
enum Backing: Sendable {
Expand Down
11 changes: 8 additions & 3 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
*/
#if compiler(>=5.6)

import NIOCore

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncResponseStream<Element: Sendable>: AsyncSequence {
@usableFromInline
internal typealias WrappedStream = PassthroughMessageSequence<Element, Error>
internal typealias WrappedStream = NIOThrowingAsyncSequenceProducer<
Element,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>

@usableFromInline
internal let stream: WrappedStream
Expand Down Expand Up @@ -52,7 +59,5 @@ public struct GRPCAsyncResponseStream<Element: Sendable>: AsyncSequence {

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncResponseStream: Sendable {}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncResponseStream.Iterator: Sendable {}

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.6)

import NIOCore

@usableFromInline
internal struct GRPCAsyncSequenceProducerDelegate: NIOAsyncSequenceProducerDelegate {
@inlinable
internal init() {}

// TODO: this method will have to be implemented when we add support for backpressure.
@inlinable
internal func produceMore() {}

// TODO: this method will have to be implemented when we add support for backpressure.
@inlinable
internal func didTerminate() {}
}

#endif
43 changes: 33 additions & 10 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ internal final class AsyncServerHandler<
GRPCAsyncServerCallContext
) async throws -> Void

@usableFromInline
internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>

@inlinable
internal init(
context: CallHandlerContext,
Expand Down Expand Up @@ -422,7 +430,10 @@ internal final class AsyncServerHandler<
contextProvider: self
)

let requestSource = PassthroughMessageSource<Request, Error>()
let sequenceProducer = AsyncSequenceProducer.makeSequence(
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
delegate: GRPCAsyncSequenceProducerDelegate()
)

let writerDelegate = AsyncResponseStreamWriterDelegate(
send: self.interceptResponseMessage(_:compression:),
Expand All @@ -448,12 +459,13 @@ internal final class AsyncServerHandler<
// Update our state before invoke the handler.
self.handlerStateMachine.handlerInvoked(requestHeaders: headers)
self.handlerComponents = ServerHandlerComponents(
requestSource: requestSource,
requestSource: sequenceProducer.source,
responseWriter: writer,
task: promise.completeWithTask {
// We don't have a task cancellation handler here: we do it in `self.cancel()`.
try await self.invokeUserHandler(
requestStreamSource: requestSource,
sequence: sequenceProducer.sequence,
sequenceSource: sequenceProducer.source,
responseStreamWriter: writer,
callContext: handlerContext
)
Expand All @@ -468,18 +480,19 @@ internal final class AsyncServerHandler<
@Sendable
@usableFromInline
internal func invokeUserHandler(
requestStreamSource: PassthroughMessageSource<Request, Error>,
sequence: AsyncSequenceProducer,
sequenceSource: AsyncSequenceProducer.Source,
responseStreamWriter: AsyncWriter<AsyncResponseStreamWriterDelegate<Response>>,
callContext: GRPCAsyncServerCallContext
) async throws {
defer {
// It's possible the user handler completed before the end of the request stream. We
// explicitly finish it to drop any unconsumed inbound messages.
requestStreamSource.finish()
sequenceSource.finish()
}

do {
let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))
let requestStream = GRPCAsyncRequestStream(sequence)
let responseStream = GRPCAsyncResponseStreamWriter(wrapping: responseStreamWriter)
try await self.userHandler(requestStream, responseStream, callContext)

Expand Down Expand Up @@ -530,7 +543,7 @@ internal final class AsyncServerHandler<
case .forward:
switch self.handlerStateMachine.handleMessage() {
case .forward:
self.handlerComponents?.requestSource.yield(request)
_ = self.handlerComponents?.requestSource.yield(request)
case .cancel:
self.cancel(error: nil)
}
Expand Down Expand Up @@ -809,11 +822,21 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
@usableFromInline
internal let responseWriter: AsyncWriter<Delegate>
@usableFromInline
internal let requestSource: PassthroughMessageSource<Request, Error>
internal let requestSource: NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

@inlinable
init(
requestSource: PassthroughMessageSource<Request, Error>,
requestSource: NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source,
responseWriter: AsyncWriter<Delegate>,
task: Task<Void, Never>
) {
Expand All @@ -830,7 +853,7 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
// to the request stream, and cancelling the writer will ensure no more responses are
// written. This should reduce how long the user handler runs for as it can no longer do
// anything useful.
self.requestSource.finish(throwing: CancellationError())
self.requestSource.finish()
self.responseWriter.cancelAsynchronously(withError: CancellationError())
self.task.cancel()
}
Expand Down
23 changes: 19 additions & 4 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
*/
#if compiler(>=5.6)

import NIOCore
import NIOHPACK

/// Async-await variant of ``ServerStreamingCall``.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable> {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>
private let responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

/// The stream of responses from the server.
public let responseStream: GRPCAsyncResponseStream<Response>
Expand Down Expand Up @@ -79,8 +85,17 @@ public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable
// We ignore messages in the closure and instead feed them into the response source when we
// invoke the `call`.
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
self.responseSource = PassthroughMessageSource<Response, Error>()
self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
let sequence = NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.makeSequence(
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
delegate: GRPCAsyncSequenceProducerDelegate()
)
self.responseSource = sequence.source
self.responseStream = .init(sequence.sequence)
}

/// We expose this as the only non-private initializer so that the caller
Expand All @@ -96,7 +111,7 @@ public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable
onStart: {},
onError: { error in
asyncCall.responseParts.handleError(error)
asyncCall.responseSource.finish(throwing: error)
asyncCall.responseSource.finish(error)
},
onResponsePart: AsyncCall.makeResponsePartHandler(
responseParts: asyncCall.responseParts,
Expand Down
Loading

0 comments on commit 534f80d

Please sign in to comment.