Skip to content

Commit

Permalink
Adopt NIOAsyncSequenceProducer in grpc-swift
Browse files Browse the repository at this point in the history
We want to make use of `NIOAsyncSequenceProducer` (or its throwing counterpart) instead of using `PassthroughMessageSequence` and `PassthroughMessageSource`.

* Replaced usages of `PassthroughMessageSequence` and `PassthroughMessageSource` with `NIOThrowingAsyncSequenceProducer`

grpc-swift now uses `NIOAsyncSequenceProducer`
  • Loading branch information
gjcairo committed Sep 5, 2022
1 parent 20bba23 commit f833d14
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 481 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ let includeNIOSSL = ProcessInfo.processInfo.environment["GRPC_NO_NIO_SSL"] == ni
let packageDependencies: [Package.Dependency] = [
.package(
url: "https://github.com/apple/swift-nio.git",
from: "2.36.0"
branch: "main"
),
.package(
url: "https://github.com/apple/swift-nio-http2.git",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@
#if compiler(>=5.6)

import NIOHPACK
import NIOCore

/// Async-await variant of ``BidirectionalStreamingCall``.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: Sendable>: Sendable {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>
private let responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

/// A request stream writer for sending messages to the server.
public let requestStream: GRPCAsyncRequestStreamWriter<Request>
Expand Down Expand Up @@ -80,8 +86,17 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
private init(call: Call<Request, Response>) {
self.call = call
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
self.responseSource = PassthroughMessageSource<Response, Error>()
self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
let sequence = NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.makeSequence(
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
delegate: GRPCAsyncSequenceProducerDelegate()
)
self.responseSource = sequence.source
self.responseStream = .init(sequence.sequence)
self.requestStream = call.makeRequestStreamWriter()
}

Expand All @@ -96,7 +111,7 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
},
onError: { error in
asyncCall.responseParts.handleError(error)
asyncCall.responseSource.finish(throwing: error)
asyncCall.responseSource.finish(error)
asyncCall.requestStream.asyncWriter.cancelAsynchronously(withError: error)
},
onResponsePart: AsyncCall.makeResponsePartHandler(
Expand All @@ -114,7 +129,12 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: S
internal enum AsyncCall {
internal static func makeResponsePartHandler<Response, Request>(
responseParts: StreamingResponseParts<Response>,
responseSource: PassthroughMessageSource<Response, Error>,
responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source,
requestStream: GRPCAsyncRequestStreamWriter<Request>?,
requestType: Request.Type = Request.self
) -> (GRPCClientResponsePart<Response>) -> Void {
Expand All @@ -135,7 +155,7 @@ internal enum AsyncCall {
if status.isOk {
responseSource.finish()
} else {
responseSource.finish(throwing: status)
responseSource.finish(status)
}

requestStream?.asyncWriter.cancelAsynchronously(withError: status)
Expand Down
11 changes: 8 additions & 3 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@

#if compiler(>=5.6)

import NIOCore

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
@usableFromInline
internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>
internal typealias _WrappedStream = NIOThrowingAsyncSequenceProducer<
Element,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>

@usableFromInline
internal let _stream: _WrappedStream
Expand Down Expand Up @@ -54,7 +61,5 @@ public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncRequestStream: Sendable where Element: Sendable {}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncRequestStream.Iterator: Sendable where Element: Sendable {}

#endif
11 changes: 8 additions & 3 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
*/
#if compiler(>=5.6)

import NIOCore

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncResponseStream<Element: Sendable>: AsyncSequence {
@usableFromInline
internal typealias WrappedStream = PassthroughMessageSequence<Element, Error>
internal typealias WrappedStream = NIOThrowingAsyncSequenceProducer<
Element,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>

@usableFromInline
internal let stream: WrappedStream
Expand Down Expand Up @@ -52,7 +59,5 @@ public struct GRPCAsyncResponseStream<Element: Sendable>: AsyncSequence {

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncResponseStream: Sendable {}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension GRPCAsyncResponseStream.Iterator: Sendable {}

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.6)

import NIOCore

@usableFromInline
internal struct GRPCAsyncSequenceProducerDelegate: NIOAsyncSequenceProducerDelegate {

@inlinable
internal init() {}

// TODO: this method will have to be implemented when we add support for backpressure.
@inlinable
internal func produceMore() {}

// TODO: this method will have to be implemented when we add support for backpressure.
@inlinable
internal func didTerminate() {}
}

#endif
43 changes: 33 additions & 10 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ internal final class AsyncServerHandler<
GRPCAsyncServerCallContext
) async throws -> Void

@usableFromInline
internal typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>

@inlinable
internal init(
context: CallHandlerContext,
Expand Down Expand Up @@ -422,7 +430,10 @@ internal final class AsyncServerHandler<
contextProvider: self
)

let requestSource = PassthroughMessageSource<Request, Error>()
let sequenceProducer = AsyncSequenceProducer.makeSequence(
backPressureStrategy: .init(lowWatermark: 0, highWatermark: 0),
delegate: GRPCAsyncSequenceProducerDelegate()
)

let writerDelegate = AsyncResponseStreamWriterDelegate(
send: self.interceptResponseMessage(_:compression:),
Expand All @@ -448,12 +459,13 @@ internal final class AsyncServerHandler<
// Update our state before invoke the handler.
self.handlerStateMachine.handlerInvoked(requestHeaders: headers)
self.handlerComponents = ServerHandlerComponents(
requestSource: requestSource,
requestSource: sequenceProducer.source,
responseWriter: writer,
task: promise.completeWithTask {
// We don't have a task cancellation handler here: we do it in `self.cancel()`.
try await self.invokeUserHandler(
requestStreamSource: requestSource,
sequence: sequenceProducer.sequence,
sequenceSource: sequenceProducer.source,
responseStreamWriter: writer,
callContext: handlerContext
)
Expand All @@ -468,18 +480,19 @@ internal final class AsyncServerHandler<
@Sendable
@usableFromInline
internal func invokeUserHandler(
requestStreamSource: PassthroughMessageSource<Request, Error>,
sequence: AsyncSequenceProducer,
sequenceSource: AsyncSequenceProducer.Source,
responseStreamWriter: AsyncWriter<AsyncResponseStreamWriterDelegate<Response>>,
callContext: GRPCAsyncServerCallContext
) async throws {
defer {
// It's possible the user handler completed before the end of the request stream. We
// explicitly finish it to drop any unconsumed inbound messages.
requestStreamSource.finish()
sequenceSource.finish()
}

do {
let requestStream = GRPCAsyncRequestStream(.init(consuming: requestStreamSource))
let requestStream = GRPCAsyncRequestStream(sequence)
let responseStream = GRPCAsyncResponseStreamWriter(wrapping: responseStreamWriter)
try await self.userHandler(requestStream, responseStream, callContext)

Expand Down Expand Up @@ -530,7 +543,7 @@ internal final class AsyncServerHandler<
case .forward:
switch self.handlerStateMachine.handleMessage() {
case .forward:
self.handlerComponents?.requestSource.yield(request)
_ = self.handlerComponents?.requestSource.yield(request)
case .cancel:
self.cancel(error: nil)
}
Expand Down Expand Up @@ -809,11 +822,21 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
@usableFromInline
internal let responseWriter: AsyncWriter<Delegate>
@usableFromInline
internal let requestSource: PassthroughMessageSource<Request, Error>
internal let requestSource: NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

@inlinable
init(
requestSource: PassthroughMessageSource<Request, Error>,
requestSource: NIOThrowingAsyncSequenceProducer<
Request,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source,
responseWriter: AsyncWriter<Delegate>,
task: Task<Void, Never>
) {
Expand All @@ -830,7 +853,7 @@ internal struct ServerHandlerComponents<Request: Sendable, Delegate: AsyncWriter
// to the request stream, and cancelling the writer will ensure no more responses are
// written. This should reduce how long the user handler runs for as it can no longer do
// anything useful.
self.requestSource.finish(throwing: CancellationError())
self.requestSource.finish(CancellationError())
self.responseWriter.cancelAsynchronously(withError: CancellationError())
self.task.cancel()
}
Expand Down
23 changes: 19 additions & 4 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
*/
#if compiler(>=5.6)

import NIOCore
import NIOHPACK

/// Async-await variant of ``ServerStreamingCall``.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable> {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>
private let responseSource: NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.Source

/// The stream of responses from the server.
public let responseStream: GRPCAsyncResponseStream<Response>
Expand Down Expand Up @@ -79,8 +85,17 @@ public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable
// We ignore messages in the closure and instead feed them into the response source when we
// invoke the `call`.
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
self.responseSource = PassthroughMessageSource<Response, Error>()
self.responseStream = .init(PassthroughMessageSequence(consuming: self.responseSource))
let sequence = NIOThrowingAsyncSequenceProducer<
Response,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
GRPCAsyncSequenceProducerDelegate
>.makeSequence(
backPressureStrategy: .init(lowWatermark: 10, highWatermark: 50),
delegate: GRPCAsyncSequenceProducerDelegate()
)
self.responseSource = sequence.source
self.responseStream = .init(sequence.sequence)
}

/// We expose this as the only non-private initializer so that the caller
Expand All @@ -96,7 +111,7 @@ public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable
onStart: {},
onError: { error in
asyncCall.responseParts.handleError(error)
asyncCall.responseSource.finish(throwing: error)
asyncCall.responseSource.finish(error)
},
onResponsePart: AsyncCall.makeResponsePartHandler(
responseParts: asyncCall.responseParts,
Expand Down
Loading

0 comments on commit f833d14

Please sign in to comment.