Skip to content

Commit

Permalink
Remove the requirement that messages conform to GRPCPayload on the se…
Browse files Browse the repository at this point in the history
…rver

Motivation:

To support payloads other than `SwiftProtobuf.Message` we required that
all messages conform to `GRPCPayload`. For protobuf messages we added
`GRPCProtobufPayload` which provides a default implemenation of
`GRPCPayload` for protobuf messages. We generated this conformance for
all protobuf messages we saw. This lead to a number issues and
workarounds including: grpc#738, grpc#778, grpc#801, grpc#837, grpc#877, grpc#881.

The intention is to continue to support `GRPCPayload` in addition to
protobuf, however, support for protobuf will not be via the
`GRPCProtobufPayload` protocol.

This PR adjust the server components such they only support
SwiftProtobuf. Once the client side has had the same treatment (and
`GRPCProtobufPayload` no longer inherits from `SwiftProtobuf.Message`),
support for `GRPCPayload` will be added back.

Modifications:

- The `HTTP1ToGRPCServerCodec` has had the message encoding and decoding
  removed. It now deals in `ByteBuffer`s rather than request/response
  messages.
- An additional `GRPCServerCodecHandler` which sits between the
  `HTTP1ToGRPCServerCodec` and `_BaseCallHandler` has been added which
  serializes/deserializes messages.
- Custom payload tests have been commented out. They will return when
  the transition has completed.

Result:

- Servers only support SwiftProtobuf
- Genertic constraints on the server have been removed; the constraints
  are place on the `init` of public handlers instead.
- `GRPCProtobufPayload` is no longer required on the server.
  • Loading branch information
glbrntt committed Jul 10, 2020
1 parent e461a84 commit e0656ee
Show file tree
Hide file tree
Showing 18 changed files with 586 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ import Logging
/// If the framework user wants to return a call error (e.g. in case of authentication failure),
/// they can fail the observer block future.
/// - To close the call and send the status, complete `context.statusPromise`.
public class BidirectionalStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public class BidirectionalStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias Context = StreamingResponseCallContext<ResponsePayload>
public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
Expand All @@ -42,12 +39,16 @@ public class BidirectionalStreamingCallHandler<
public init(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventLoopFuture<EventObserver>
) {
) where RequestPayload: SwiftProtobuf.Message, ResponsePayload: SwiftProtobuf.Message {
// Delay the creation of the event observer until we actually get a request head, otherwise it
// would be possible for the observer to write into the pipeline (by completing the status
// promise) before the pipeline is configured.
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
protobufRequest: RequestPayload.self,
protobufResponse: ResponsePayload.self,
callHandlerContext: callHandlerContext
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
16 changes: 10 additions & 6 deletions Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ import Logging
/// If the framework user wants to return a call error (e.g. in case of authentication failure),
/// they can fail the observer block future.
/// - To close the call and send the response, complete `context.responsePromise`.
public final class ClientStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class ClientStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias Context = UnaryResponseCallContext<ResponsePayload>
public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
Expand All @@ -39,9 +36,16 @@ public final class ClientStreamingCallHandler<

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(callHandlerContext: CallHandlerContext, eventObserverFactory: @escaping EventObserverFactory) {
public init(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping EventObserverFactory
) where RequestPayload: SwiftProtobuf.Message, ResponsePayload: SwiftProtobuf.Message {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
protobufRequest: RequestPayload.self,
protobufResponse: ResponsePayload.self,
callHandlerContext: callHandlerContext
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
13 changes: 7 additions & 6 deletions Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ import Logging
///
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
/// - To close the call and send the status, complete the status future returned by the observer block.
public final class ServerStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class ServerStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias EventObserver = (RequestPayload) -> EventLoopFuture<GRPCStatus>

private var eventObserver: EventObserver?
Expand All @@ -36,12 +33,16 @@ public final class ServerStreamingCallHandler<
public init(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
) {
) where RequestPayload: SwiftProtobuf.Message, ResponsePayload: SwiftProtobuf.Message {
// Delay the creation of the event observer until we actually get a request head, otherwise it
// would be possible for the observer to write into the pipeline (by completing the status
// promise) before the pipeline is configured.
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
protobufRequest: RequestPayload.self,
protobufResponse: ResponsePayload.self,
callHandlerContext: callHandlerContext
)
}

override internal func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
13 changes: 7 additions & 6 deletions Sources/GRPC/CallHandlers/UnaryCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ import Logging
/// - The observer block is implemented by the framework user and returns a future containing the call result.
/// - To return a response to the client, the framework user should complete that future
/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
public final class UnaryCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class UnaryCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias EventObserver = (RequestPayload) -> EventLoopFuture<ResponsePayload>
private var eventObserver: EventObserver?
private var callContext: UnaryResponseCallContext<ResponsePayload>?
Expand All @@ -36,9 +33,13 @@ public final class UnaryCallHandler<
public init(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
) {
) where RequestPayload: SwiftProtobuf.Message, ResponsePayload: SwiftProtobuf.Message {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
protobufRequest: RequestPayload.self,
protobufResponse: ResponsePayload.self,
callHandlerContext: callHandlerContext
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
27 changes: 17 additions & 10 deletions Sources/GRPC/CallHandlers/_BaseCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import Logging
///
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
/// - Important: This is **NOT** part of the public API.
public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>: GRPCCallHandler {
public class _BaseCallHandler<Request, Response>: GRPCCallHandler {
private let codec: ChannelHandler

public func makeGRPCServerCodec() -> ChannelHandler {
return HTTP1ToGRPCServerCodec<RequestPayload, ResponsePayload>(
encoding: self.callHandlerContext.encoding,
logger: self.logger
)
return self.codec
}

/// Called when the request head has been received.
Expand All @@ -41,7 +40,7 @@ public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPC
/// Called whenever a message has been received.
///
/// Overridden by subclasses.
internal func processMessage(_ message: RequestPayload) throws {
internal func processMessage(_ message: Request) throws {
fatalError("needs to be overridden")
}

Expand Down Expand Up @@ -69,13 +68,21 @@ public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPC
return self.callHandlerContext.logger
}

internal init(callHandlerContext: CallHandlerContext) {
internal init(
protobufRequest: Request.Type,
protobufResponse: Response.Type,
callHandlerContext: CallHandlerContext
) where Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message {
self.callHandlerContext = callHandlerContext
self.codec = GRPCServerCodecHandler(
serializer: ProtobufSerializer<Response>(),
deserializer: ProtobufDeserializer<Request>()
)
}
}

extension _BaseCallHandler: ChannelInboundHandler {
public typealias InboundIn = _GRPCServerRequestPart<RequestPayload>
public typealias InboundIn = _GRPCServerRequestPart<Request>

/// Passes errors to the user-provided `errorHandler`. After an error has been received an
/// appropriate status is written. Errors which don't conform to `GRPCStatusTransformable`
Expand Down Expand Up @@ -122,8 +129,8 @@ extension _BaseCallHandler: ChannelInboundHandler {
}

extension _BaseCallHandler: ChannelOutboundHandler {
public typealias OutboundIn = _GRPCServerResponsePart<ResponsePayload>
public typealias OutboundOut = _GRPCServerResponsePart<ResponsePayload>
public typealias OutboundIn = _GRPCServerResponsePart<Response>
public typealias OutboundOut = _GRPCServerResponsePart<Response>

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
guard self.serverCanWrite else {
Expand Down
77 changes: 77 additions & 0 deletions Sources/GRPC/GRPCServerCodecHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NIO

class GRPCServerCodecHandler<Serializer: MessageSerializer, Deserializer: MessageDeserializer> {
/// The response serializer.
private let serializer: Serializer

/// The request deserializer.
private let deserializer: Deserializer

internal init(serializer: Serializer, deserializer: Deserializer) {
self.serializer = serializer
self.deserializer = deserializer
}
}

extension GRPCServerCodecHandler: ChannelInboundHandler {
typealias InboundIn = _RawGRPCServerRequestPart
typealias InboundOut = _GRPCServerRequestPart<Deserializer.Output>

internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
switch self.unwrapInboundIn(data) {
case .head(let head):
context.fireChannelRead(self.wrapInboundOut(.head(head)))

case .message(let buffer):
do {
let deserialized = try self.deserializer.deserialize(byteBuffer: buffer)
context.fireChannelRead(self.wrapInboundOut(.message(deserialized)))
} catch {
context.fireErrorCaught(error)
}

case .end:
context.fireChannelRead(self.wrapInboundOut(.end))
}
}
}

extension GRPCServerCodecHandler: ChannelOutboundHandler {
typealias OutboundIn = _GRPCServerResponsePart<Serializer.Input>
typealias OutboundOut = _RawGRPCServerResponsePart

internal func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
switch self.unwrapOutboundIn(data) {
case .headers(let headers):
context.write(self.wrapOutboundOut(.headers(headers)), promise: promise)

case .message(let messageContext):
do {
let buffer = try self.serializer.serialize(messageContext.message, allocator: context.channel.allocator)
context.write(self.wrapOutboundOut(.message(.init(buffer, compressed: messageContext.compressed))), promise: promise)
} catch {
let error = GRPCError.SerializationFailure().captureContext()
promise?.fail(error)
context.fireErrorCaught(error)
}

case .statusAndTrailers(let status, let trailers):
context.write(self.wrapOutboundOut(.statusAndTrailers(status, trailers)), promise: promise)
}
}
}
3 changes: 2 additions & 1 deletion Sources/GRPC/GRPCServerRequestRoutingHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,9 @@ extension GRPCServerRequestRoutingHandler: ChannelInboundHandler, RemovableChann
self.state = .configuring([requestPart])

// Configure the rest of the pipeline to serve the RPC.
let httpToGRPC = HTTP1ToGRPCServerCodec(encoding: self.encoding, logger: self.logger)
let codec = callHandler.makeGRPCServerCodec()
context.pipeline.addHandlers([codec, callHandler], position: .after(self)).whenSuccess {
context.pipeline.addHandlers([httpToGRPC, codec, callHandler], position: .after(self)).whenSuccess {
context.pipeline.removeHandler(self, promise: nil)
}

Expand Down
36 changes: 20 additions & 16 deletions Sources/GRPC/HTTP1ToGRPCServerCodec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,38 @@ import NIO
import NIOHTTP1
import NIOFoundationCompat
import Logging
import SwiftProtobuf

/// Incoming gRPC package with a fixed message type.
///
/// - Important: This is **NOT** part of the public API.
public enum _GRPCServerRequestPart<RequestPayload: GRPCPayload> {
public enum _GRPCServerRequestPart<Request> {
case head(HTTPRequestHead)
case message(RequestPayload)
case message(Request)
case end
}

public typealias _RawGRPCServerRequestPart = _GRPCServerRequestPart<ByteBuffer>

/// Outgoing gRPC package with a fixed message type.
///
/// - Important: This is **NOT** part of the public API.
public enum _GRPCServerResponsePart<ResponsePayload: GRPCPayload> {
public enum _GRPCServerResponsePart<Response> {
case headers(HTTPHeaders)
case message(_MessageContext<ResponsePayload>)
case message(_MessageContext<Response>)
case statusAndTrailers(GRPCStatus, HTTPHeaders)
}

public typealias _RawGRPCServerResponsePart = _GRPCServerResponsePart<ByteBuffer>

/// A simple channel handler that translates HTTP1 data types into gRPC packets, and vice versa.
///
/// We use HTTP1 (instead of HTTP2) primitives, as these are easier to work with than raw HTTP2
/// primitives while providing all the functionality we need. In addition, it allows us to support
/// gRPC-Web (gRPC over HTTP1).
///
/// The translation from HTTP2 to HTTP1 is done by `HTTP2ToHTTP1ServerCodec`.
public final class HTTP1ToGRPCServerCodec<Request: GRPCPayload, Response: GRPCPayload> {
public final class HTTP1ToGRPCServerCodec {
public init(encoding: ServerMessageEncoding, logger: Logger) {
self.encoding = encoding
self.encodingHeaderValidator = MessageEncodingHeaderValidator(encoding: encoding)
Expand Down Expand Up @@ -118,7 +123,7 @@ extension HTTP1ToGRPCServerCodec {

extension HTTP1ToGRPCServerCodec: ChannelInboundHandler {
public typealias InboundIn = HTTPServerRequestPart
public typealias InboundOut = _GRPCServerRequestPart<Request>
public typealias InboundOut = _RawGRPCServerRequestPart

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
if case .ignore = inboundState {
Expand Down Expand Up @@ -247,10 +252,10 @@ extension HTTP1ToGRPCServerCodec: ChannelInboundHandler {
}

self.messageReader.append(buffer: &body)
var requests: [Request] = []
var requests: [ByteBuffer] = []
do {
while var buffer = try self.messageReader.nextMessage() {
requests.append(try Request(serializedByteBuffer: &buffer))
while let buffer = try self.messageReader.nextMessage() {
requests.append(buffer)
}
} catch let grpcError as GRPCError.WithContext {
context.fireErrorCaught(grpcError)
Expand Down Expand Up @@ -280,7 +285,7 @@ extension HTTP1ToGRPCServerCodec: ChannelInboundHandler {
}

extension HTTP1ToGRPCServerCodec: ChannelOutboundHandler {
public typealias OutboundIn = _GRPCServerResponsePart<Response>
public typealias OutboundIn = _RawGRPCServerResponsePart
public typealias OutboundOut = HTTPServerResponsePart

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
Expand Down Expand Up @@ -337,7 +342,7 @@ extension HTTP1ToGRPCServerCodec: ChannelOutboundHandler {
// the base64 response properly encoded in a single byte stream.
precondition(self.responseTextBuffer != nil)
try self.messageWriter.write(
messageContext.message,
buffer: messageContext.message,
into: &self.responseTextBuffer,
compressed: messageContext.compressed
)
Expand All @@ -346,13 +351,12 @@ extension HTTP1ToGRPCServerCodec: ChannelOutboundHandler {
// ServerStreaming provider continues sending the data.
promise?.succeed(())
} else {
var lengthPrefixedMessageBuffer = context.channel.allocator.buffer(capacity: 0)
try self.messageWriter.write(
messageContext.message,
into: &lengthPrefixedMessageBuffer,
let messageBuffer = try self.messageWriter.write(
buffer: messageContext.message,
allocator: context.channel.allocator,
compressed: messageContext.compressed
)
context.write(self.wrapOutboundOut(.body(.byteBuffer(lengthPrefixedMessageBuffer))), promise: promise)
context.write(self.wrapOutboundOut(.body(.byteBuffer(messageBuffer))), promise: promise)
}
} catch {
let error = GRPCError.SerializationFailure().captureContext()
Expand Down
Loading

0 comments on commit e0656ee

Please sign in to comment.