Skip to content

Commit

Permalink
[async-await] Base types for server implementation (#1249)
Browse files Browse the repository at this point in the history
This commit implements some of the types required by the proposal for async/await support, added in #1231.

To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to #1243 ("Async-await: Base types for client implementation").

It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`.

It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically:

* `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from #1245.
* `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
  • Loading branch information
simonjbeaumont authored and glbrntt committed Nov 26, 2021
1 parent c4054f0 commit bd7f40a
Show file tree
Hide file tree
Showing 10 changed files with 1,331 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.5)

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension CancellationError: GRPCStatusTransformable {
public func makeGRPCStatus() -> GRPCStatus {
return GRPCStatus(code: .unavailable, message: nil)
}
}

#endif
55 changes: 55 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.5)

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncRequestStream<Element>: AsyncSequence {
@usableFromInline
internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>

@usableFromInline
internal let _stream: _WrappedStream

@inlinable
internal init(_ stream: _WrappedStream) {
self._stream = stream
}

@inlinable
public func makeAsyncIterator() -> Iterator {
Self.AsyncIterator(self._stream)
}

public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
internal var iterator: _WrappedStream.AsyncIterator

@usableFromInline
internal init(_ stream: _WrappedStream) {
self.iterator = stream.makeAsyncIterator()
}

@inlinable
public mutating func next() async throws -> Element? {
try await self.iterator.next()
}
}
}

#endif
112 changes: 112 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.5)

/// Writer for server-streaming RPC handlers to provide responses.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncResponseStreamWriter<Response> {
@usableFromInline
internal typealias Element = (Response, Compression)

@usableFromInline
internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>

@usableFromInline
internal let asyncWriter: AsyncWriter<Delegate>

@inlinable
internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
self.asyncWriter = asyncWriter
}

@inlinable
public func send(
_ response: Response,
compression: Compression = .deferToCallDefault
) async throws {
try await self.asyncWriter.write((response, compression))
}
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@usableFromInline
internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDelegate {
@usableFromInline
internal typealias Element = (Response, Compression)

@usableFromInline
internal typealias End = GRPCStatus

@usableFromInline
internal let _context: GRPCAsyncServerCallContext

@usableFromInline
internal let _send: (Response, MessageMetadata) -> Void

@usableFromInline
internal let _finish: (GRPCStatus) -> Void

@usableFromInline
internal let _compressionEnabledOnServer: Bool

// Create a new AsyncResponseStreamWriterDelegate.
//
// - Important: the `send` and `finish` closures must be thread-safe.
@inlinable
internal init(
context: GRPCAsyncServerCallContext,
compressionIsEnabled: Bool,
send: @escaping (Response, MessageMetadata) -> Void,
finish: @escaping (GRPCStatus) -> Void
) {
self._context = context
self._compressionEnabledOnServer = compressionIsEnabled
self._send = send
self._finish = finish
}

@inlinable
internal func _shouldCompress(_ compression: Compression) -> Bool {
guard self._compressionEnabledOnServer else {
return false
}
return compression.isEnabled(callDefault: self._context.compressionEnabled)
}

@inlinable
internal func _send(
_ response: Response,
compression: Compression = .deferToCallDefault
) {
let compress = self._shouldCompress(compression)
self._send(response, .init(compress: compress, flush: true))
}

// MARK: - AsyncWriterDelegate conformance.

@inlinable
internal func write(_ element: (Response, Compression)) {
self._send(element.0, compression: element.1)
}

@inlinable
internal func writeEnd(_ end: GRPCStatus) {
self._finish(end)
}
}

#endif
111 changes: 111 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.5)

import Logging
import NIOConcurrencyHelpers
import NIOHPACK

// We use a `class` here because we do not want copy-on-write semantics. The instance that the async
// handler holds must not diverge from the instance the implementor of the RPC holds. They hold these
// instances on different threads (EventLoop vs Task).
//
// We considered wrapping this in a `struct` and pass it `inout` to the RPC. This would communicate
// explicitly that it stores mutable state. However, without copy-on-write semantics, this could
// make for a surprising API.
//
// We also considered an `actor` but that felt clunky at the point of use since adopters would need
// to `await` the retrieval of a logger or the updating of the trailers and each would requrie a
// promise to glue the NIO and async-await paradigms in the handler.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public final class GRPCAsyncServerCallContext {
private let lock = Lock()

/// Request headers for this request.
public let headers: HPACKHeaders

/// The logger used for this call.
public var logger: Logger {
get { self.lock.withLock {
self._logger
} }
set { self.lock.withLock {
self._logger = newValue
} }
}

@usableFromInline
internal var _logger: Logger

/// Whether compression should be enabled for responses, defaulting to `true`. Note that for
/// this value to take effect compression must have been enabled on the server and a compression
/// algorithm must have been negotiated with the client.
public var compressionEnabled: Bool {
get { self.lock.withLock {
self._compressionEnabled
} }
set { self.lock.withLock {
self._compressionEnabled = newValue
} }
}

private var _compressionEnabled: Bool = true

/// A `UserInfo` dictionary which is shared with the interceptor contexts for this RPC.
///
/// - Important: While `UserInfo` has value-semantics, this property retrieves from, and sets a
/// reference wrapped `UserInfo`. The contexts passed to interceptors provide the same
/// reference. As such this may be used as a mechanism to pass information between interceptors
/// and service providers.
public var userInfo: UserInfo {
get { self.lock.withLock {
self.userInfoRef.value
} }
set { self.lock.withLock {
self.userInfoRef.value = newValue
} }
}

/// A reference to an underlying `UserInfo`. We share this with the interceptors.
@usableFromInline
internal let userInfoRef: Ref<UserInfo>

/// Metadata to return at the end of the RPC. If this is required it should be updated before
/// the `responsePromise` or `statusPromise` is fulfilled.
public var trailers: HPACKHeaders {
get { self.lock.withLock {
return self._trailers
} }
set { self.lock.withLock {
self._trailers = newValue
} }
}

private var _trailers: HPACKHeaders = [:]

@inlinable
internal init(
headers: HPACKHeaders,
logger: Logger,
userInfoRef: Ref<UserInfo>
) {
self.headers = headers
self.userInfoRef = userInfoRef
self._logger = logger
}
}

#endif
Loading

0 comments on commit bd7f40a

Please sign in to comment.