Skip to content

Commit

Permalink
Put the client connection behind a protocol (#727)
Browse files Browse the repository at this point in the history
Motivation:

Generated client stubs may only be initialized using a
`ClientConnection` which makes the assumption that a connection will
only ever be singular, and that all RPCs for that stub will run on a
single event loop.

However, there are scenarios where we may want to drive a stub and have
multiple connections (load balancing), or no real connection at all
(testing).

Modifications:

- Generated client stubs now depend on the implementation of a `GRPCChannel`
- The first argument label for the `init` in generated clients has
  changed from `connection` to `client`
- Added fixits for the above
- Updated generated code
- `GRPCChannel`s contract ensures that RPCs can be made but do not
  specify the transport.
- `GRPCClient` now requires a `GRPCChannel`
- `GRPCClient` provides higher level wrappers for the factory methods on
  `GRPCChannel` (i.e. provide defaults such as for call options)

Result:

- We have a better ability, in the future, to use already-generated
  stubs with different `GRPCChannel`s.
- Cleaner abstraction between call and connection.
  • Loading branch information
glbrntt authored Feb 26, 2020
1 parent 9243527 commit 50ccad2
Show file tree
Hide file tree
Showing 42 changed files with 580 additions and 322 deletions.
9 changes: 5 additions & 4 deletions Sources/Examples/Echo/Model/echo.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ public protocol Echo_EchoService {
}

public final class Echo_EchoServiceClient: GRPCClient, Echo_EchoService {
public let connection: ClientConnection
public let channel: GRPCChannel
public var defaultCallOptions: CallOptions

/// Creates a client for the echo.Echo service.
///
/// - Parameters:
/// - connection: `ClientConnection` to the service host.
/// - channel: `GRPCChannel` to the service host.
/// - defaultCallOptions: Options to use for each service call if the user doesn't provide them.
public init(connection: ClientConnection, defaultCallOptions: CallOptions = CallOptions()) {
self.connection = connection
public init(channel: GRPCChannel, defaultCallOptions: CallOptions = CallOptions()) {
self.channel = channel
self.defaultCallOptions = defaultCallOptions
}

Expand Down Expand Up @@ -153,3 +153,4 @@ extension Echo_EchoProvider {
/// Provides conformance to `GRPCPayload` for the request and response messages
extension Echo_EchoRequest: GRPCProtobufPayload {}
extension Echo_EchoResponse: GRPCProtobufPayload {}

2 changes: 1 addition & 1 deletion Sources/Examples/Echo/Runtime/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func makeClient(group: EventLoopGroup, host: String, port: Int, useTLS: Bool) ->

// Start the connection and create the client:
let connection = ClientConnection(configuration: configuration)
return Echo_EchoServiceClient(connection: connection)
return Echo_EchoServiceClient(channel: connection)
}

func callRPC(_ rpc: RPC, using client: Echo_EchoServiceClient, message: String) {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Examples/HelloWorld/Client/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func main(args: [String]) {
let connection = ClientConnection(configuration: configuration)

// Provide the connection to the generated client.
let greeter = Helloworld_GreeterServiceClient(connection: connection)
let greeter = Helloworld_GreeterServiceClient(channel: connection)

// Do the greeting.
greet(name: name, client: greeter)
Expand Down
9 changes: 5 additions & 4 deletions Sources/Examples/HelloWorld/Model/helloworld.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ public protocol Helloworld_GreeterService {
}

public final class Helloworld_GreeterServiceClient: GRPCClient, Helloworld_GreeterService {
public let connection: ClientConnection
public let channel: GRPCChannel
public var defaultCallOptions: CallOptions

/// Creates a client for the helloworld.Greeter service.
///
/// - Parameters:
/// - connection: `ClientConnection` to the service host.
/// - channel: `GRPCChannel` to the service host.
/// - defaultCallOptions: Options to use for each service call if the user doesn't provide them.
public init(connection: ClientConnection, defaultCallOptions: CallOptions = CallOptions()) {
self.connection = connection
public init(channel: GRPCChannel, defaultCallOptions: CallOptions = CallOptions()) {
self.channel = channel
self.defaultCallOptions = defaultCallOptions
}

Expand Down Expand Up @@ -88,3 +88,4 @@ extension Helloworld_GreeterProvider {
/// Provides conformance to `GRPCPayload` for the request and response messages
extension Helloworld_HelloRequest: GRPCProtobufPayload {}
extension Helloworld_HelloReply: GRPCProtobufPayload {}

4 changes: 2 additions & 2 deletions Sources/Examples/RouteGuide/Client/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func makeClient(port: Int, group: EventLoopGroup) -> Routeguide_RouteGuideServic
)

let connection = ClientConnection(configuration: config)
return Routeguide_RouteGuideServiceClient(connection: connection)
return Routeguide_RouteGuideServiceClient(channel: connection)
}

/// Unary call example. Calls `getFeature` and prints the response.
Expand Down Expand Up @@ -214,7 +214,7 @@ func main(args: [String]) throws {
// Make a client, make sure we close it when we're done.
let routeGuide = makeClient(port: port, group: group)
defer {
try? routeGuide.connection.close().wait()
try? routeGuide.channel.close().wait()
}

// Look for a valid feature.
Expand Down
9 changes: 5 additions & 4 deletions Sources/Examples/RouteGuide/Model/route_guide.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ public protocol Routeguide_RouteGuideService {
}

public final class Routeguide_RouteGuideServiceClient: GRPCClient, Routeguide_RouteGuideService {
public let connection: ClientConnection
public let channel: GRPCChannel
public var defaultCallOptions: CallOptions

/// Creates a client for the routeguide.RouteGuide service.
///
/// - Parameters:
/// - connection: `ClientConnection` to the service host.
/// - channel: `GRPCChannel` to the service host.
/// - defaultCallOptions: Options to use for each service call if the user doesn't provide them.
public init(connection: ClientConnection, defaultCallOptions: CallOptions = CallOptions()) {
self.connection = connection
public init(channel: GRPCChannel, defaultCallOptions: CallOptions = CallOptions()) {
self.channel = channel
self.defaultCallOptions = defaultCallOptions
}

Expand Down Expand Up @@ -156,3 +156,4 @@ extension Routeguide_Feature: GRPCProtobufPayload {}
extension Routeguide_Rectangle: GRPCProtobufPayload {}
extension Routeguide_RouteSummary: GRPCProtobufPayload {}
extension Routeguide_RouteNote: GRPCProtobufPayload {}

31 changes: 17 additions & 14 deletions Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP2
import Logging

/// A bidirectional-streaming gRPC call. Each response is passed to the provided observer block.
Expand All @@ -32,42 +31,46 @@ public final class BidirectionalStreamingCall<RequestPayload: GRPCPayload, Respo
StreamingRequestClientCall {
private var messageQueue: EventLoopFuture<Void>

public init(
connection: ClientConnection,
init(
path: String,
scheme: String,
authority: String,
callOptions: CallOptions,
eventLoop: EventLoop,
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
errorDelegate: ClientErrorDelegate?,
logger: Logger,
handler: @escaping (ResponsePayload) -> Void
) {
self.messageQueue = connection.channel.eventLoop.makeSucceededFuture(())
self.messageQueue = eventLoop.makeSucceededFuture(())
let requestID = callOptions.requestIDProvider.requestID()

let logger = Logger(subsystem: .clientChannelCall, metadata: [MetadataKey.requestID: "\(requestID)"])
var logger = logger
logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
logger.debug("starting rpc", metadata: ["path": "\(path)"])

let responseHandler = GRPCClientStreamingResponseChannelHandler(
initialMetadataPromise: connection.channel.eventLoop.makePromise(),
trailingMetadataPromise: connection.channel.eventLoop.makePromise(),
statusPromise: connection.channel.eventLoop.makePromise(),
initialMetadataPromise: eventLoop.makePromise(),
trailingMetadataPromise: eventLoop.makePromise(),
statusPromise: eventLoop.makePromise(),
errorDelegate: errorDelegate,
timeout: callOptions.timeout,
logger: logger,
responseHandler: handler
)

let requestHead = _GRPCRequestHead(
scheme: connection.configuration.httpProtocol.scheme,
scheme: scheme,
path: path,
host: connection.configuration.target.host,
host: authority,
requestID: requestID,
options: callOptions
)

let requestHandler = _StreamingRequestChannelHandler<RequestPayload>(requestHead: requestHead)

super.init(
eventLoop: connection.eventLoop,
multiplexer: connection.multiplexer,
eventLoop: eventLoop,
multiplexer: multiplexer,
callType: .bidirectionalStreaming,
callOptions: callOptions,
responseHandler: responseHandler,
Expand Down
34 changes: 19 additions & 15 deletions Sources/GRPC/ClientCalls/ClientStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP2
import Logging

/// A client-streaming gRPC call.
Expand All @@ -35,43 +34,48 @@ public final class ClientStreamingCall<RequestPayload: GRPCPayload, ResponsePayl
public let response: EventLoopFuture<ResponsePayload>
private var messageQueue: EventLoopFuture<Void>

public init(
connection: ClientConnection,
init(
path: String,
scheme: String,
authority: String,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?
eventLoop: EventLoop,
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
errorDelegate: ClientErrorDelegate?,
logger: Logger
) {
let requestID = callOptions.requestIDProvider.requestID()
let logger = Logger(subsystem: .clientChannelCall, metadata: [MetadataKey.requestID: "\(requestID)"])
var logger = logger
logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
logger.debug("starting rpc", metadata: ["path": "\(path)"])

self.messageQueue = connection.eventLoop.makeSucceededFuture(())
let responsePromise = connection.eventLoop.makePromise(of: ResponsePayload.self)
self.messageQueue = eventLoop.makeSucceededFuture(())
let responsePromise = eventLoop.makePromise(of: ResponsePayload.self)
self.response = responsePromise.futureResult

let responseHandler = GRPCClientUnaryResponseChannelHandler(
initialMetadataPromise: connection.channel.eventLoop.makePromise(),
trailingMetadataPromise: connection.channel.eventLoop.makePromise(),
initialMetadataPromise: eventLoop.makePromise(),
trailingMetadataPromise: eventLoop.makePromise(),
responsePromise: responsePromise,
statusPromise: connection.channel.eventLoop.makePromise(),
statusPromise: eventLoop.makePromise(),
errorDelegate: errorDelegate,
timeout: callOptions.timeout,
logger: logger
)

let requestHead = _GRPCRequestHead(
scheme: connection.configuration.httpProtocol.scheme,
scheme: scheme,
path: path,
host: connection.configuration.target.host,
host: authority,
requestID: requestID,
options: callOptions
)

let requestHandler = _StreamingRequestChannelHandler<RequestPayload>(requestHead: requestHead)

super.init(
eventLoop: connection.eventLoop,
multiplexer: connection.multiplexer,
eventLoop: eventLoop,
multiplexer: multiplexer,
callType: .clientStreaming,
callOptions: callOptions,
responseHandler: responseHandler,
Expand Down
31 changes: 18 additions & 13 deletions Sources/GRPC/ClientCalls/ServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
import SwiftProtobuf
import NIO
import NIOHTTP2
import Logging

/// A server-streaming gRPC call. The request is sent on initialization, each response is passed to the provided observer block.
Expand All @@ -25,32 +24,38 @@ import Logging
/// - `status`: the status of the gRPC call after it has ended,
/// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
public final class ServerStreamingCall<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>: BaseClientCall<RequestPayload, ResponsePayload> {
public init(
connection: ClientConnection,
init(
path: String,
request: RequestPayload,
scheme: String,
authority: String,
callOptions: CallOptions,
eventLoop: EventLoop,
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
errorDelegate: ClientErrorDelegate?,
logger: Logger,
request: RequestPayload,
handler: @escaping (ResponsePayload) -> Void
) {
let requestID = callOptions.requestIDProvider.requestID()
let logger = Logger(subsystem: .clientChannelCall, metadata: [MetadataKey.requestID: "\(requestID)"])
var logger = logger
logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
logger.debug("starting rpc", metadata: ["path": "\(path)"])


let responseHandler = GRPCClientStreamingResponseChannelHandler(
initialMetadataPromise: connection.channel.eventLoop.makePromise(),
trailingMetadataPromise: connection.channel.eventLoop.makePromise(),
statusPromise: connection.channel.eventLoop.makePromise(),
initialMetadataPromise: eventLoop.makePromise(),
trailingMetadataPromise: eventLoop.makePromise(),
statusPromise: eventLoop.makePromise(),
errorDelegate: errorDelegate,
timeout: callOptions.timeout,
logger: logger,
responseHandler: handler
)

let requestHead = _GRPCRequestHead(
scheme: connection.configuration.httpProtocol.scheme,
scheme: scheme,
path: path,
host: connection.configuration.target.host,
host: authority,
requestID: requestID,
options: callOptions
)
Expand All @@ -61,8 +66,8 @@ public final class ServerStreamingCall<RequestPayload: GRPCPayload, ResponsePayl
)

super.init(
eventLoop: connection.eventLoop,
multiplexer: connection.multiplexer,
eventLoop: eventLoop,
multiplexer: multiplexer,
callType: .serverStreaming,
callOptions: callOptions,
responseHandler: responseHandler,
Expand Down
31 changes: 18 additions & 13 deletions Sources/GRPC/ClientCalls/UnaryCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,39 @@ public final class UnaryCall<RequestPayload: GRPCPayload, ResponsePayload: GRPCP
UnaryResponseClientCall {
public let response: EventLoopFuture<ResponsePayload>

public init(
connection: ClientConnection,
init(
path: String,
request: RequestPayload,
scheme: String,
authority: String,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?
eventLoop: EventLoop,
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
errorDelegate: ClientErrorDelegate?,
logger: Logger,
request: RequestPayload
) {
let requestID = callOptions.requestIDProvider.requestID()
let logger = Logger(subsystem: .clientChannelCall, metadata: [MetadataKey.requestID: "\(requestID)"])
var logger = logger
logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
logger.debug("starting rpc", metadata: ["path": "\(path)"])

let responsePromise = connection.channel.eventLoop.makePromise(of: ResponsePayload.self)
let responsePromise = eventLoop.makePromise(of: ResponsePayload.self)
self.response = responsePromise.futureResult

let responseHandler = GRPCClientUnaryResponseChannelHandler<ResponsePayload>(
initialMetadataPromise: connection.channel.eventLoop.makePromise(),
trailingMetadataPromise: connection.channel.eventLoop.makePromise(),
initialMetadataPromise: eventLoop.makePromise(),
trailingMetadataPromise: eventLoop.makePromise(),
responsePromise: responsePromise,
statusPromise: connection.channel.eventLoop.makePromise(),
statusPromise: eventLoop.makePromise(),
errorDelegate: errorDelegate,
timeout: callOptions.timeout,
logger: logger
)

let requestHead = _GRPCRequestHead(
scheme: connection.configuration.httpProtocol.scheme,
scheme: scheme,
path: path,
host: connection.configuration.target.host,
host: authority,
requestID: requestID,
options: callOptions
)
Expand All @@ -70,8 +75,8 @@ public final class UnaryCall<RequestPayload: GRPCPayload, ResponsePayload: GRPCP
)

super.init(
eventLoop: connection.channel.eventLoop,
multiplexer: connection.multiplexer,
eventLoop: eventLoop,
multiplexer: multiplexer,
callType: .unary,
callOptions: callOptions,
responseHandler: responseHandler,
Expand Down
Loading

0 comments on commit 50ccad2

Please sign in to comment.