Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support protobuf separately to GRPCPayload for the client #889

Merged
merged 4 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Sources/Examples/Echo/Model/echo.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,3 @@ extension Echo_EchoProvider {
}
}


// Provides conformance to `GRPCPayload`
extension Echo_EchoRequest: GRPCProtobufPayload {}
extension Echo_EchoResponse: GRPCProtobufPayload {}
4 changes: 0 additions & 4 deletions Sources/Examples/HelloWorld/Model/helloworld.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,3 @@ extension Helloworld_GreeterProvider {
}
}


// Provides conformance to `GRPCPayload`
extension Helloworld_HelloRequest: GRPCProtobufPayload {}
extension Helloworld_HelloReply: GRPCProtobufPayload {}
7 changes: 0 additions & 7 deletions Sources/Examples/RouteGuide/Model/route_guide.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,3 @@ extension Routeguide_RouteGuideProvider {
}
}


// Provides conformance to `GRPCPayload`
extension Routeguide_Point: GRPCProtobufPayload {}
extension Routeguide_Rectangle: GRPCProtobufPayload {}
extension Routeguide_Feature: GRPCProtobufPayload {}
extension Routeguide_RouteNote: GRPCProtobufPayload {}
extension Routeguide_RouteSummary: GRPCProtobufPayload {}
19 changes: 11 additions & 8 deletions Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import Logging
///
/// Messages should be sent via the `sendMessage` and `sendMessages` methods; the stream of messages
/// must be terminated by calling `sendEnd` to indicate the final message has been sent.
public final class BidirectionalStreamingCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: StreamingRequestClientCall {
public final class BidirectionalStreamingCall<RequestPayload, ResponsePayload>: StreamingRequestClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
Expand Down Expand Up @@ -147,16 +144,20 @@ public final class BidirectionalStreamingCall<
}

extension BidirectionalStreamingCall {
internal static func makeOnHTTP2Stream(
internal static func makeOnHTTP2Stream<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: Serializer,
deserializer: Deserializer,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> {
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> where Serializer.Input == RequestPayload, Deserializer.Output == ResponsePayload {
let eventLoop = multiplexer.eventLoop
let transport = ChannelTransport<RequestPayload, ResponsePayload>(
multiplexer: multiplexer,
serializer: serializer,
deserializer: deserializer,
responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: responseHandler),
callType: .bidirectionalStreaming,
timeLimit: callOptions.timeLimit,
Expand All @@ -167,12 +168,14 @@ extension BidirectionalStreamingCall {
return BidirectionalStreamingCall(transport: transport, options: callOptions)
}

internal static func make(
internal static func make<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
fakeResponse: FakeStreamingResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> {
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> where Serializer.Input == RequestPayload, Deserializer.Output == ResponsePayload {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, streamingResponseHandler: responseHandler)

Expand Down
8 changes: 4 additions & 4 deletions Sources/GRPC/ClientCalls/ClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import SwiftProtobuf
/// Base protocol for a client call to a gRPC service.
public protocol ClientCall {
/// The type of the request message for the call.
associatedtype RequestPayload: GRPCPayload
associatedtype RequestPayload
/// The type of the response message for the call.
associatedtype ResponsePayload: GRPCPayload
associatedtype ResponsePayload

/// The event loop this call is running on.
var eventLoop: EventLoop { get }
Expand Down Expand Up @@ -159,7 +159,7 @@ public protocol UnaryResponseClientCall: ClientCall {
// a NIO HTTP/2 stream channel.

internal protocol ClientCallInbound {
associatedtype Response: GRPCPayload
associatedtype Response
typealias ResponsePart = _GRPCClientResponsePart<Response>

/// Receive an error.
Expand All @@ -170,7 +170,7 @@ internal protocol ClientCallInbound {
}

internal protocol ClientCallOutbound {
associatedtype Request: GRPCPayload
associatedtype Request
typealias RequestPart = _GRPCClientRequestPart<Request>

/// Send a single request part and complete the promise once the part has been sent.
Expand Down
11 changes: 7 additions & 4 deletions Sources/GRPC/ClientCalls/ClientCallTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import Logging
///```
///
/// Note: the "main" pipeline provided by the channel in `ClientConnection`.
internal class ChannelTransport<Request: GRPCPayload, Response: GRPCPayload> {
internal class ChannelTransport<Request, Response> {
internal typealias RequestPart = _GRPCClientRequestPart<Request>
internal typealias ResponsePart = _GRPCClientResponsePart<Response>

Expand Down Expand Up @@ -148,14 +148,16 @@ internal class ChannelTransport<Request: GRPCPayload, Response: GRPCPayload> {
channelProvider(self, channelPromise)
}

internal convenience init(
internal convenience init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: Serializer,
deserializer: Deserializer,
responseContainer: ResponsePartContainer<Response>,
callType: GRPCCallType,
timeLimit: TimeLimit,
errorDelegate: ClientErrorDelegate?,
logger: Logger
) {
) where Serializer.Input == Request, Deserializer.Output == Response {
self.init(
eventLoop: multiplexer.eventLoop,
responseContainer: responseContainer,
Expand All @@ -168,7 +170,8 @@ internal class ChannelTransport<Request: GRPCPayload, Response: GRPCPayload> {
case .success(let mux):
mux.createStreamChannel(promise: streamPromise) { stream, streamID in
stream.pipeline.addHandlers([
_GRPCClientChannelHandler<Request, Response>(streamID: streamID, callType: callType, logger: logger),
_GRPCClientChannelHandler(streamID: streamID, callType: callType, logger: logger),
GRPCClientCodecHandler(serializer: serializer, deserializer: deserializer),
GRPCClientCallHandler(call: call)
])
}
Expand Down
19 changes: 11 additions & 8 deletions Sources/GRPC/ClientCalls/ClientStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import Logging
///
/// Messages should be sent via the `sendMessage` and `sendMessages` methods; the stream of messages
/// must be terminated by calling `sendEnd` to indicate the final message has been sent.
public final class ClientStreamingCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
> : StreamingRequestClientCall, UnaryResponseClientCall {
public final class ClientStreamingCall<RequestPayload, ResponsePayload>: StreamingRequestClientCall, UnaryResponseClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
Expand Down Expand Up @@ -152,16 +149,20 @@ public final class ClientStreamingCall<
}

extension ClientStreamingCall {
internal static func makeOnHTTP2Stream(
internal static func makeOnHTTP2Stream<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: Serializer,
deserializer: Deserializer,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> ClientStreamingCall<RequestPayload, ResponsePayload> {
) -> ClientStreamingCall<RequestPayload, ResponsePayload> where Serializer.Input == RequestPayload, Deserializer.Output == ResponsePayload {
let eventLoop = multiplexer.eventLoop
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let transport = ChannelTransport<RequestPayload, ResponsePayload>(
multiplexer: multiplexer,
serializer: serializer,
deserializer: deserializer,
responseContainer: .init(eventLoop: eventLoop, unaryResponsePromise: responsePromise),
callType: .clientStreaming,
timeLimit: callOptions.timeLimit,
Expand All @@ -171,11 +172,13 @@ extension ClientStreamingCall {
return ClientStreamingCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
}

internal static func make(
internal static func make<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
fakeResponse: FakeUnaryResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger
) -> ClientStreamingCall<RequestPayload, ResponsePayload> {
) -> ClientStreamingCall<RequestPayload, ResponsePayload> where Serializer.Input == RequestPayload, Deserializer.Output == ResponsePayload {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, unaryResponsePromise: responsePromise)
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPC/ClientCalls/GRPCClientCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import NIO

/// An inbound channel handler which forwards events and messages to a client call.
internal class GRPCClientCallHandler<Request: GRPCPayload, Response: GRPCPayload>: ChannelInboundHandler {
internal class GRPCClientCallHandler<Request, Response>: ChannelInboundHandler {
typealias InboundIn = _GRPCClientResponsePart<Response>
private var call: ChannelTransport<Request, Response>

Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPC/ClientCalls/ResponsePartContainer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import NIO
import NIOHPACK

/// A container for RPC response parts.
internal struct ResponsePartContainer<Response: GRPCPayload> {
internal struct ResponsePartContainer<Response> {
/// The type of handler for response message part.
enum ResponseHandler {
case unary(EventLoopPromise<Response>)
Expand Down
19 changes: 11 additions & 8 deletions Sources/GRPC/ClientCalls/ServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import Logging

/// A server-streaming gRPC call. The request is sent on initialization, each response is passed to
/// the provided observer block.
public final class ServerStreamingCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: ClientCall {
public final class ServerStreamingCall<RequestPayload, ResponsePayload>: ClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
Expand Down Expand Up @@ -93,16 +90,20 @@ public final class ServerStreamingCall<
}

extension ServerStreamingCall {
internal static func makeOnHTTP2Stream(
internal static func makeOnHTTP2Stream<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: Serializer,
deserializer: Deserializer,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> ServerStreamingCall<RequestPayload, ResponsePayload> {
) -> ServerStreamingCall<RequestPayload, ResponsePayload> where Serializer.Input == RequestPayload, Deserializer.Output == ResponsePayload {
let eventLoop = multiplexer.eventLoop
let transport = ChannelTransport<RequestPayload, ResponsePayload>(
multiplexer: multiplexer,
serializer: serializer,
deserializer: deserializer,
responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: responseHandler),
callType: .serverStreaming,
timeLimit: callOptions.timeLimit,
Expand All @@ -113,12 +114,14 @@ extension ServerStreamingCall {
return ServerStreamingCall(transport: transport, options: callOptions)
}

internal static func make(
internal static func make<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
fakeResponse: FakeStreamingResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> ServerStreamingCall<RequestPayload, ResponsePayload> {
) -> ServerStreamingCall<RequestPayload, ResponsePayload> where Serializer.Input == RequestPayload, Deserializer.Output == ResponsePayload {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, streamingResponseHandler: responseHandler)

Expand Down
19 changes: 11 additions & 8 deletions Sources/GRPC/ClientCalls/UnaryCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import NIOHPACK
import Logging

/// A unary gRPC call. The request is sent on initialization.
public final class UnaryCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: UnaryResponseClientCall {
public final class UnaryCall<RequestPayload, ResponsePayload>: UnaryResponseClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
Expand Down Expand Up @@ -100,16 +97,20 @@ public final class UnaryCall<
}

extension UnaryCall {
internal static func makeOnHTTP2Stream(
internal static func makeOnHTTP2Stream<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: Serializer,
deserializer: Deserializer,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> UnaryCall<RequestPayload, ResponsePayload> {
) -> UnaryCall<RequestPayload, ResponsePayload> where Serializer.Input == RequestPayload, Deserializer.Output == ResponsePayload {
let eventLoop = multiplexer.eventLoop
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let transport = ChannelTransport<RequestPayload, ResponsePayload>(
multiplexer: multiplexer,
serializer: serializer,
deserializer: deserializer,
responseContainer: .init(eventLoop: eventLoop, unaryResponsePromise: responsePromise),
callType: .unary,
timeLimit: callOptions.timeLimit,
Expand All @@ -119,11 +120,13 @@ extension UnaryCall {
return UnaryCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
}

internal static func make(
internal static func make<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
fakeResponse: FakeUnaryResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger
) -> UnaryCall<RequestPayload, ResponsePayload> {
) -> UnaryCall<RequestPayload, ResponsePayload> where Serializer.Input == RequestPayload, Deserializer.Output == ResponsePayload {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, unaryResponsePromise: responsePromise)
Expand Down
Loading