Skip to content

Commit

Permalink
Add Sendable conformance to core async/await API (#1378)
Browse files Browse the repository at this point in the history
Motivation:

The core async/await API should have appropriate 'Sendable' conformance
before we publish it to main. This change adds it to the core parts of
the new API.

There are still numerous types which can be made 'Sendable' but are
either less significant (e.g. configuration) or require depdendencies to
adopt 'Sendable' first (and would otherwise require `@preconcurrency`)

Modifications:

- Add `GRPCSendable` to ease adoption
- Make some plain-old-data types `Sendable` including `GRPCStatus`
- Require `Request` and `Response` to be `Sendable` in the client and
  server async APIs
- Make the async server context `@unchecked Sendable`
- Make the async writer and stream reader `Sendable`

Result:

Core async API is `Sendable`.
  • Loading branch information
glbrntt authored Apr 1, 2022
1 parent 8a11a2e commit b341dbb
Show file tree
Hide file tree
Showing 17 changed files with 108 additions and 52 deletions.
10 changes: 5 additions & 5 deletions Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import NIOCore
/// may suspend if the writer has been paused.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
internal final actor AsyncWriter<Delegate: AsyncWriterDelegate>: Sendable {
@usableFromInline
internal typealias Element = Delegate.Element

Expand All @@ -36,7 +36,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {

/// A value pending a write.
@usableFromInline
internal struct _Pending<Value> {
internal struct _Pending<Value: Sendable>: Sendable {
@usableFromInline
var value: Value

Expand Down Expand Up @@ -323,9 +323,9 @@ public struct GRPCAsyncWriterError: Error, Hashable {
}

@usableFromInline
internal protocol AsyncWriterDelegate: AnyObject {
associatedtype Element
associatedtype End
internal protocol AsyncWriterDelegate: AnyObject, Sendable {
associatedtype Element: Sendable
associatedtype End: Sendable

@inlinable
func write(_ element: Element)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import NIOHPACK

/// Async-await variant of BidirectionalStreamingCall.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
public struct GRPCAsyncBidirectionalStreamingCall<Request: Sendable, Response: Sendable> {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import NIOHPACK

/// Async-await variant of `ClientStreamingCall`.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncClientStreamingCall<Request, Response> {
public struct GRPCAsyncClientStreamingCall<Request: Sendable, Response: Sendable> {
private let call: Call<Request, Response>
private let responseParts: UnaryResponseParts<Response>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncRequestStream<Element>: AsyncSequence {
public struct GRPCAsyncRequestStream<Element: Sendable>: AsyncSequence {
@usableFromInline
internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/// try await stream.finish()
/// ```
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncRequestStreamWriter<Request> {
public struct GRPCAsyncRequestStreamWriter<Request: Sendable> {
@usableFromInline
internal let asyncWriter: AsyncWriter<Delegate<Request>>

Expand Down Expand Up @@ -78,7 +78,7 @@ public struct GRPCAsyncRequestStreamWriter<Request> {
extension GRPCAsyncRequestStreamWriter {
/// A delegate for the writer which writes messages to an underlying receiver.`
@usableFromInline
internal final class Delegate<Request>: AsyncWriterDelegate {
internal final class Delegate<Request: Sendable>: AsyncWriterDelegate, Sendable {
@usableFromInline
internal typealias Element = (Request, Compression)

Expand All @@ -89,16 +89,16 @@ extension GRPCAsyncRequestStreamWriter {
internal let _compressionEnabled: Bool

@usableFromInline
internal let _send: (Request, MessageMetadata) -> Void
internal let _send: @Sendable(Request, MessageMetadata) -> Void

@usableFromInline
internal let _finish: () -> Void
internal let _finish: @Sendable() -> Void

@inlinable
internal init(
compressionEnabled: Bool,
send: @escaping (Request, MessageMetadata) -> Void,
finish: @escaping () -> Void
send: @Sendable @escaping (Request, MessageMetadata) -> Void,
finish: @Sendable @escaping () -> Void
) {
self._compressionEnabled = compressionEnabled
self._send = send
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

/// Writer for server-streaming RPC handlers to provide responses.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncResponseStreamWriter<Response> {
public struct GRPCAsyncResponseStreamWriter<Response: Sendable> {
@usableFromInline
internal typealias Element = (Response, Compression)

Expand All @@ -44,7 +44,7 @@ public struct GRPCAsyncResponseStreamWriter<Response> {

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDelegate {
internal final class AsyncResponseStreamWriterDelegate<Response: Sendable>: AsyncWriterDelegate {
@usableFromInline
internal typealias Element = (Response, Compression)

Expand All @@ -55,10 +55,10 @@ internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDel
internal let _context: GRPCAsyncServerCallContext

@usableFromInline
internal let _send: (Response, MessageMetadata) -> Void
internal let _send: @Sendable(Response, MessageMetadata) -> Void

@usableFromInline
internal let _finish: (GRPCStatus) -> Void
internal let _finish: @Sendable(GRPCStatus) -> Void

@usableFromInline
internal let _compressionEnabledOnServer: Bool
Expand All @@ -70,8 +70,8 @@ internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDel
internal init(
context: GRPCAsyncServerCallContext,
compressionIsEnabled: Bool,
send: @escaping (Response, MessageMetadata) -> Void,
finish: @escaping (GRPCStatus) -> Void
send: @escaping @Sendable(Response, MessageMetadata) -> Void,
finish: @escaping @Sendable(GRPCStatus) -> Void
) {
self._context = context
self._compressionEnabledOnServer = compressionIsEnabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import NIOHPACK
// make for a surprising API.
//
// We also considered an `actor` but that felt clunky at the point of use since adopters would need
// to `await` the retrieval of a logger or the updating of the trailers and each would requrie a
// to `await` the retrieval of a logger or the updating of the trailers and each would require a
// promise to glue the NIO and async-await paradigms in the handler.
//
// Note: this is `@unchecked Sendable`; all mutable state is protected by a lock.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public final class GRPCAsyncServerCallContext {
public final class GRPCAsyncServerCallContext: @unchecked Sendable {
private let lock = Lock()

/// Metadata for this request.
Expand Down
23 changes: 12 additions & 11 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import NIOHPACK
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncServerHandler<
Serializer: MessageSerializer,
Deserializer: MessageDeserializer
>: GRPCServerHandlerProtocol {
Deserializer: MessageDeserializer,
Request: Sendable,
Response: Sendable
>: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
@usableFromInline
internal let _handler: AsyncServerHandler<Serializer, Deserializer>
internal let _handler: AsyncServerHandler<Serializer, Deserializer, Request, Response>

public func receiveMetadata(_ metadata: HPACKHeaders) {
self._handler.receiveMetadata(metadata)
Expand Down Expand Up @@ -153,13 +155,10 @@ extension GRPCAsyncServerHandler {
@usableFromInline
internal final class AsyncServerHandler<
Serializer: MessageSerializer,
Deserializer: MessageDeserializer
>: GRPCServerHandlerProtocol {
@usableFromInline
internal typealias Request = Deserializer.Output
@usableFromInline
internal typealias Response = Serializer.Input

Deserializer: MessageDeserializer,
Request: Sendable,
Response: Sendable
>: GRPCServerHandlerProtocol where Serializer.Input == Response, Deserializer.Output == Request {
/// A response serializer.
@usableFromInline
internal let serializer: Serializer
Expand All @@ -182,7 +181,7 @@ internal final class AsyncServerHandler<

/// The user provided function to execute.
@usableFromInline
internal let userHandler: (
internal let userHandler: @Sendable(
GRPCAsyncRequestStream<Request>,
GRPCAsyncResponseStreamWriter<Response>,
GRPCAsyncServerCallContext
Expand Down Expand Up @@ -531,6 +530,7 @@ internal final class AsyncServerHandler<
}
}

@Sendable
@inlinable
internal func interceptResponse(_ response: Response, metadata: MessageMetadata) {
if self.context.eventLoop.inEventLoop {
Expand Down Expand Up @@ -587,6 +587,7 @@ internal final class AsyncServerHandler<
}
}

@Sendable
@inlinable
internal func responseStreamDrained(_ status: GRPCStatus) {
if self.context.eventLoop.inEventLoop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import NIOHPACK

/// Async-await variant of `ServerStreamingCall`.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncServerStreamingCall<Request, Response> {
public struct GRPCAsyncServerStreamingCall<Request: Sendable, Response: Sendable> {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPC/AsyncAwaitSupport/GRPCAsyncUnaryCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import NIOHPACK
/// Note: while this object is a `struct`, its implementation delegates to `Call`. It therefore
/// has reference semantics.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct GRPCAsyncUnaryCall<Request, Response> {
public struct GRPCAsyncUnaryCall<Request: Sendable, Response: Sendable> {
private let call: Call<Request, Response>
private let responseParts: UnaryResponseParts<Response>

Expand Down
40 changes: 32 additions & 8 deletions Sources/GRPC/AsyncAwaitSupport/GRPCChannel+AsyncAwaitSupport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ extension GRPCChannel {
/// - request: The request to send.
/// - callOptions: Options for the RPC.
/// - interceptors: A list of interceptors to intercept the request and response stream with.
internal func makeAsyncUnaryCall<Request: Message, Response: Message>(
internal func makeAsyncUnaryCall<
Request: Message & Sendable,
Response: Message & Sendable
>(
path: String,
request: Request,
callOptions: CallOptions,
Expand All @@ -50,7 +53,10 @@ extension GRPCChannel {
/// - request: The request to send.
/// - callOptions: Options for the RPC.
/// - interceptors: A list of interceptors to intercept the request and response stream with.
internal func makeAsyncUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
internal func makeAsyncUnaryCall<
Request: GRPCPayload & Sendable,
Response: GRPCPayload & Sendable
>(
path: String,
request: Request,
callOptions: CallOptions,
Expand All @@ -73,7 +79,10 @@ extension GRPCChannel {
/// - path: Path of the RPC, e.g. "/echo.Echo/Get"
/// - callOptions: Options for the RPC.
/// - interceptors: A list of interceptors to intercept the request and response stream with.
internal func makeAsyncClientStreamingCall<Request: Message, Response: Message>(
internal func makeAsyncClientStreamingCall<
Request: Message & Sendable,
Response: Message & Sendable
>(
path: String,
callOptions: CallOptions,
interceptors: [ClientInterceptor<Request, Response>] = []
Expand All @@ -94,7 +103,10 @@ extension GRPCChannel {
/// - path: Path of the RPC, e.g. "/echo.Echo/Get"
/// - callOptions: Options for the RPC.
/// - interceptors: A list of interceptors to intercept the request and response stream with.
internal func makeAsyncClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
internal func makeAsyncClientStreamingCall<
Request: GRPCPayload & Sendable,
Response: GRPCPayload & Sendable
>(
path: String,
callOptions: CallOptions,
interceptors: [ClientInterceptor<Request, Response>] = []
Expand All @@ -116,7 +128,10 @@ extension GRPCChannel {
/// - request: The request to send.
/// - callOptions: Options for the RPC.
/// - interceptors: A list of interceptors to intercept the request and response stream with.
internal func makeAsyncServerStreamingCall<Request: Message, Response: Message>(
internal func makeAsyncServerStreamingCall<
Request: Message & Sendable,
Response: Message & Sendable
>(
path: String,
request: Request,
callOptions: CallOptions,
Expand All @@ -140,7 +155,10 @@ extension GRPCChannel {
/// - request: The request to send.
/// - callOptions: Options for the RPC.
/// - interceptors: A list of interceptors to intercept the request and response stream with.
internal func makeAsyncServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
internal func makeAsyncServerStreamingCall<
Request: GRPCPayload & Sendable,
Response: GRPCPayload & Sendable
>(
path: String,
request: Request,
callOptions: CallOptions,
Expand All @@ -163,7 +181,10 @@ extension GRPCChannel {
/// - path: Path of the RPC, e.g. "/echo.Echo/Get"
/// - callOptions: Options for the RPC.
/// - interceptors: A list of interceptors to intercept the request and response stream with.
internal func makeAsyncBidirectionalStreamingCall<Request: Message, Response: Message>(
internal func makeAsyncBidirectionalStreamingCall<
Request: Message & Sendable,
Response: Message & Sendable
>(
path: String,
callOptions: CallOptions,
interceptors: [ClientInterceptor<Request, Response>] = []
Expand All @@ -184,7 +205,10 @@ extension GRPCChannel {
/// - path: Path of the RPC, e.g. "/echo.Echo/Get"
/// - callOptions: Options for the RPC.
/// - interceptors: A list of interceptors to intercept the request and response stream with.
internal func makeAsyncBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
internal func makeAsyncBidirectionalStreamingCall<
Request: GRPCPayload & Sendable,
Response: GRPCPayload & Sendable
>(
path: String,
callOptions: CallOptions,
interceptors: [ClientInterceptor<Request, Response>] = []
Expand Down
21 changes: 21 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCSendable.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.6)
@preconcurrency public typealias GRPCSendable = Swift.Sendable
#else
public typealias GRPCSendable = Any
#endif // compiler(>=5.6)
4 changes: 2 additions & 2 deletions Sources/GRPC/Compression/MessageEncoding.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/

/// Whether compression should be enabled for the message.
public struct Compression: Hashable {
public struct Compression: Hashable, GRPCSendable {
@usableFromInline
internal enum _Wrapped: Hashable {
internal enum _Wrapped: Hashable, GRPCSendable {
case enabled
case disabled
case deferToCallDefault
Expand Down
Loading

0 comments on commit b341dbb

Please sign in to comment.