-
Notifications
You must be signed in to change notification settings - Fork 420
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[async-await] Base types for server implementation #1249
[async-await] Base types for server implementation #1249
Conversation
let requestStream = GRPCAsyncStream<Request>(AsyncThrowingStream { continuation in | ||
self.state = .observing({ streamEvent in | ||
switch streamEvent { | ||
case let .message(request): continuation.yield(request) | ||
case .end: continuation.finish() | ||
} | ||
}, context) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is our synchronisation mechanism for self.state
here. On first sight it looks like this handler works in an EL context. But I would bet that the AsyncThrowingStream calls us not on any EL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right that this type is a bit obscure because it is the glue layer from NIO world to async-await world.
This handler uses an EL internally because it interacts with the rest of the internal GRPC library. Its internal implementation is similar in structure to the extant BidirectionalStreamingServerHandler
.
In order to call the user function, we construct an AsyncSequence
of requests and set the state
to .observing
with a closure that handles incoming stream events (message or end).
The closure passed into the .obeserving
enum value will always be called on the right EL.
If I've misunderstood this then I guess we'll need to do something different here.
Worth noting that when we have #1245 we will not be doing things this way and the code here is a stopgap so may not be worth agonising over.
#if compiler(>=5.5) | ||
|
||
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) | ||
public final class GRPCAsyncServerHandler< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who creates an GRPCAsyncServerHandler
? For which request/response type is this class used? For all? Only unary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who creates an
GRPCAsyncServerHandler
?
It's created in in generated code (an example of which is not present in this PR). The generated code will make use of the initializers defined in the extension near the bottom of this file: https://github.com/grpc/grpc-swift/pull/1249/files#diff-fcf632a779a41e9ebc055fb40ac26d96379a72ca8506cc6e43d022834b9b287fR402-R517.
For which request/response type is this class used? For all? Only unary?
There is one such initializer per RPC type, so this is a common handler for all of them, which was something @glbrntt was keen we try and do with this API (the existing one has different handler API types per RPC type which we wanted to avoid).
Taking the Echo service, the codegen would generate an Echo_AsyncEchoProvider
protocol that they would need to conform to (provide just the user functions). The generated code would also contain the following extension which glues it all together:
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension Echo_AsyncEchoProvider {
public var serviceName: Substring { return "echo.Echo" }
/// Determines, calls and returns the appropriate request handler, depending on the request's method.
/// Returns nil for methods not handled by this service.
public func handle(
method name: Substring,
context: CallHandlerContext
) -> GRPCServerHandlerProtocol? {
switch name {
case "Get":
return AsyncServerHandler(
context: context,
requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
interceptors: self.interceptors?.makeGetInterceptors() ?? [],
wrapping: self.get(request:context:)
)
case "Expand":
return AsyncServerHandler(
context: context,
requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
interceptors: self.interceptors?.makeExpandInterceptors() ?? [],
wrapping: self.expand(request:responseStreamWriter:context:)
)
case "Collect":
return AsyncServerHandler(
context: context,
requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
interceptors: self.interceptors?.makeCollectInterceptors() ?? [],
wrapping: self.collect(requests:context:)
)
case "Update":
return AsyncServerHandler(
context: context,
requestDeserializer: ProtobufDeserializer<Echo_EchoRequest>(),
responseSerializer: ProtobufSerializer<Echo_EchoResponse>(),
interceptors: self.interceptors?.makeUpdateInterceptors() ?? [],
wrapping: self.update(requests:responseStreamWriter:context:)
)
default:
return nil
}
}
}
This is structurally similar to what we have today except that we implement everything in terms of a single handler and use the various init(...:wrapping:)
initializers in the codegen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies: this was spread over a couple of days so some comments are probably out-of-date / answered already.
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift
Outdated
Show resolved
Hide resolved
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift
Outdated
Show resolved
Hide resolved
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) | ||
public protocol GRPCAsyncServerCallContext { | ||
/// Request headers for this request. | ||
var headers: HPACKHeaders { get } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need a way to set response headers too (we don't currently support this but we should!) so we'll need clearer names to distinguish between them.
One idea which might be worth exploring is whether namespacing parts of the context by request and response makes sense:
let traceID = context.request.headers["x-uuid"]
// (Not currently in context, could be added in the future)
let path = context.request.path
context.response.headers["..."] = "..."
context.response.trailers["..."] = "..."
context.response.compressMessages = false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need a way to set response headers too (we don't currently support this but we should!)
Do you mean it isn't even supported in the existing, non-async-await, handlers? I took a look and I don't see it; I just see (request) headers
and (response) trailers
.
I could play around with what you suggest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean it isn't even supported in the existing, non-async-await, handlers?
That's right (although there have been requests to add it). We don't have to add this now but we should leave ourselves enough room in the API that we can add it without the naming being ambiguous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could play around with what you suggest.
Up to you; play around and see what feels most natural.
Side note: I wonder also if we should use "metadata" rather than "headers" and "{initial,trailing}metadata" rather than response headers and response trailers to align more with the client API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side note: I wonder also if we should use "metadata" rather than "headers" and "{initial,trailing}metadata" rather than response headers and response trailers to align more with the client API.
That would be my thinking if we decide to add this.
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift
Outdated
Show resolved
Hide resolved
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift
Outdated
Show resolved
Hide resolved
|
||
// Create a request stream to pass to the user function and capture the | ||
// handler in the updated state to allow us to produce more results. | ||
let requestStream = GRPCAsyncRequestStream<Request>(AsyncThrowingStream { continuation in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI #1252 has been merged now. It has two parts a PassthroughMessageSource
and a PassthroughMessageSequence
(an adapter for the source to turn it into an AsyncSequence
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉 Would you like me to rework this PR to make use of it or save that for a future PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't mind. It shouldn't be a particularly large change though.
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift
Outdated
Show resolved
Hide resolved
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of small questions...
Urgh. @glbrntt in order to incorporate the new stream source and writers I merged from the feature branch but this now makes the files-changed view look like these are part of this PR. Apologies for that. I'm happy to rebase or do something else if it would clean things up for review? I only made use of the new stream source in the server code in this PR. Intend to follow up with another PR for client once we land this. I have added the additional state which should take care of ensuring that the response stream writer has been emptied before we send |
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
…Tests Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
…r wrappers Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
4ce6646
to
16e880c
Compare
@glbrntt as requested I rebased on |
Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift
Outdated
Show resolved
Hide resolved
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
…ction) Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
…atus Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
…leError Signed-off-by: Si Beaumont <[email protected]>
Signed-off-by: Si Beaumont <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is good to be merged now! 🎉
Modulo any feedback from @fabianfett |
This commit implements some of the types required by the proposal for async/await support, added in grpc#1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to grpc#1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from grpc#1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
This commit implements some of the types required by the proposal for async/await support, added in grpc#1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to grpc#1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from grpc#1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
This commit implements some of the types required by the proposal for async/await support, added in #1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to #1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from #1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
This commit implements some of the types required by the proposal for async/await support, added in #1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to #1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from #1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
This PR implements some of the types required by the proposal for async/await support, added in #1231.
To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to #1243 ("Async-await: Base types for client implementation").
It provides a unified
AsyncServerHandler
for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existingBidirectionalStreamingServerHandler
.It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically:
AsyncResponseStreamWriter
is expected to be superseded by theAsyncWriter
from Add an internal pausable writer #1245.AsyncServerCallContext
conformance has been added to the existingServerCallContextBase
. It is expected that we will provide a new implementation ofAsyncServerCallContext
that is independent from the existing call context types.