Skip to content

Commit

Permalink
Provide a 'FakeChannel' (#864)
Browse files Browse the repository at this point in the history
Motivation:

When consuming gRPC it is often helpful to be able to write tests that
ensure the client is integrated correctly. At the moment this is only
possible by running a local gRPC server with a custom service handler to
return the responses you would like to test.

Modifications:

This is a continuation of the work started in #855.

- This change addes a 'FakeChannel' this is the glue that binds the Call
  objects with the 'fake responses' added in the aforementioned pull
  request.
- Also adds a 'write capturing' handler which forwards request parts to
  a handler provided on the fake response.
- Appropriate internal initializers on each of the call types.

Result:

- Users can manually create 'test' RPCs.
  • Loading branch information
glbrntt authored Jun 30, 2020
1 parent 2c149a2 commit 2732990
Show file tree
Hide file tree
Showing 10 changed files with 540 additions and 7 deletions.
31 changes: 30 additions & 1 deletion Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,34 @@ extension BidirectionalStreamingCall {

return BidirectionalStreamingCall(transport: transport, options: callOptions)
}
}

internal static func make(
fakeResponse: FakeStreamingResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, streamingResponseHandler: responseHandler)

let transport: ChannelTransport<RequestPayload, ResponsePayload>
if let fakeResponse = fakeResponse {
transport = .init(
fakeResponse: fakeResponse,
responseContainer: responseContainer,
timeLimit: callOptions.timeLimit,
logger: logger
)

fakeResponse.activate()
} else {
transport = .makeTransportForMissingFakeResponse(
eventLoop: eventLoop,
responseContainer: responseContainer,
logger: logger
)
}

return BidirectionalStreamingCall(transport: transport, options: callOptions)
}
}
40 changes: 40 additions & 0 deletions Sources/GRPC/ClientCalls/ClientCallTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,46 @@ internal class ChannelTransport<Request: GRPCPayload, Response: GRPCPayload> {
}
}
}

internal convenience init(
fakeResponse: _FakeResponseStream<Request, Response>,
responseContainer: ResponsePartContainer<Response>,
timeLimit: TimeLimit,
logger: Logger
) {
self.init(
eventLoop: fakeResponse.channel.eventLoop,
responseContainer: responseContainer,
timeLimit: timeLimit,
errorDelegate: nil,
logger: logger
) { call, streamPromise in
fakeResponse.channel.pipeline.addHandler(GRPCClientCallHandler(call: call)).map {
fakeResponse.channel
}.cascade(to: streamPromise)
}
}

/// Makes a transport whose channel promise is failed immediately.
internal static func makeTransportForMissingFakeResponse(
eventLoop: EventLoop,
responseContainer: ResponsePartContainer<Response>,
logger: Logger
) -> ChannelTransport<Request, Response> {
return .init(
eventLoop: eventLoop,
responseContainer: responseContainer,
timeLimit: .none,
errorDelegate: nil,
logger: logger
) { call, promise in
let error = GRPCStatus(
code: .unavailable,
message: "No fake response was registered before starting an RPC."
)
promise.fail(error)
}
}
}

// MARK: - Call API (i.e. called from {Unary,ClientStreaming,...}Call)
Expand Down
31 changes: 31 additions & 0 deletions Sources/GRPC/ClientCalls/ClientStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,35 @@ extension ClientStreamingCall {
)
return ClientStreamingCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
}

internal static func make(
fakeResponse: FakeUnaryResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger
) -> ClientStreamingCall<RequestPayload, ResponsePayload> {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, unaryResponsePromise: responsePromise)

let transport: ChannelTransport<RequestPayload, ResponsePayload>
if let fakeResponse = fakeResponse {
transport = .init(
fakeResponse: fakeResponse,
responseContainer: responseContainer,
timeLimit: callOptions.timeLimit,
logger: logger
)

fakeResponse.activate()
} else {
transport = .makeTransportForMissingFakeResponse(
eventLoop: eventLoop,
responseContainer: responseContainer,
logger: logger
)
}

return ClientStreamingCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
}

}
30 changes: 30 additions & 0 deletions Sources/GRPC/ClientCalls/ServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,34 @@ extension ServerStreamingCall {

return ServerStreamingCall(transport: transport, options: callOptions)
}

internal static func make(
fakeResponse: FakeStreamingResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> ServerStreamingCall<RequestPayload, ResponsePayload> {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, streamingResponseHandler: responseHandler)

let transport: ChannelTransport<RequestPayload, ResponsePayload>
if let callProxy = fakeResponse {
transport = .init(
fakeResponse: callProxy,
responseContainer: responseContainer,
timeLimit: callOptions.timeLimit,
logger: logger
)

callProxy.activate()
} else {
transport = .makeTransportForMissingFakeResponse(
eventLoop: eventLoop,
responseContainer: responseContainer,
logger: logger
)
}

return ServerStreamingCall(transport: transport, options: callOptions)
}
}
30 changes: 30 additions & 0 deletions Sources/GRPC/ClientCalls/UnaryCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,34 @@ extension UnaryCall {
)
return UnaryCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
}

internal static func make(
fakeResponse: FakeUnaryResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger
) -> UnaryCall<RequestPayload, ResponsePayload> {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, unaryResponsePromise: responsePromise)

let transport: ChannelTransport<RequestPayload, ResponsePayload>
if let fakeResponse = fakeResponse {
transport = .init(
fakeResponse: fakeResponse,
responseContainer: responseContainer,
timeLimit: callOptions.timeLimit,
logger: logger
)

fakeResponse.activate()
} else {
transport = .makeTransportForMissingFakeResponse(
eventLoop: eventLoop,
responseContainer: responseContainer,
logger: logger
)
}

return UnaryCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
}
}
165 changes: 165 additions & 0 deletions Sources/GRPC/FakeChannel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NIO
import Logging

/// A fake channel for use with generated test clients.
///
/// The `FakeChannel` provides factories for calls which avoid most of the gRPC stack and don't do
/// real networking. Each call relies on either a `FakeUnaryResponse` or a `FakeStreamingResponse`
/// to get responses or errors. The fake response of each type should be registered with the channel
/// prior to making a call via `makeFakeUnaryResponse` or `makeFakeStreamingResponse` respectively.
///
/// Users will typically not be required to interact with the channel directly, instead they should
/// do so via a generated test client.
public class FakeChannel: GRPCChannel {
/// Fake response streams keyed by their path.
private var responseStreams: [String: CircularBuffer<Any>]

/// A logger.
public let logger: Logger

public init(logger: Logger = Logger(label: "io.grpc.testing")) {
self.responseStreams = [:]
self.logger = logger
}

/// Make and store a fake unary response for the given path. Users should prefer making a response
/// stream for their RPC directly via the appropriate method on their generated test client.
public func makeFakeUnaryResponse<Request: GRPCPayload, Response: GRPCPayload>(
path: String,
requestHandler: @escaping (FakeRequestPart<Request>) -> ()
) -> FakeUnaryResponse<Request, Response> {
let proxy = FakeUnaryResponse<Request, Response>(requestHandler: requestHandler)
self.responseStreams[path, default: []].append(proxy)
return proxy
}

/// Make and store a fake streaming response for the given path. Users should prefer making a
/// response stream for their RPC directly via the appropriate method on their generated test
/// client.
public func makeFakeStreamingResponse<Request: GRPCPayload, Response: GRPCPayload>(
path: String,
requestHandler: @escaping (FakeRequestPart<Request>) -> ()
) -> FakeStreamingResponse<Request, Response> {
let proxy = FakeStreamingResponse<Request, Response>(requestHandler: requestHandler)
self.responseStreams[path, default: []].append(proxy)
return proxy
}

// (Docs inherited from `GRPCChannel`)
public func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
path: String,
request: Request,
callOptions: CallOptions
) -> UnaryCall<Request, Response> {
let call = UnaryCall<Request, Response>.make(
fakeResponse: self.dequeueResponseStream(forPath: path),
callOptions: callOptions,
logger: self.logger
)

call.send(self.makeRequestHead(path: path, callOptions: callOptions), request: request)

return call
}

// (Docs inherited from `GRPCChannel`)
public func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
path: String,
request: Request,
callOptions: CallOptions,
handler: @escaping (Response) -> Void
) -> ServerStreamingCall<Request, Response> {
let call = ServerStreamingCall<Request, Response>.make(
fakeResponse: self.dequeueResponseStream(forPath: path),
callOptions: callOptions,
logger: self.logger,
responseHandler: handler
)

call.send(self.makeRequestHead(path: path, callOptions: callOptions), request: request)

return call
}

// (Docs inherited from `GRPCChannel`)
public func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
path: String,
callOptions: CallOptions
) -> ClientStreamingCall<Request, Response> {
let call = ClientStreamingCall<Request, Response>.make(
fakeResponse: self.dequeueResponseStream(forPath: path),
callOptions: callOptions,
logger: self.logger
)

call.sendHead(self.makeRequestHead(path: path, callOptions: callOptions))

return call
}

// (Docs inherited from `GRPCChannel`)
public func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
path: String,
callOptions: CallOptions,
handler: @escaping (Response) -> Void
) -> BidirectionalStreamingCall<Request, Response> {
let call = BidirectionalStreamingCall<Request, Response>.make(
fakeResponse: self.dequeueResponseStream(forPath: path),
callOptions: callOptions,
logger: self.logger,
responseHandler: handler
)

call.sendHead(self.makeRequestHead(path: path, callOptions: callOptions))

return call
}

public func close() -> EventLoopFuture<Void> {
// We don't have anything to close.
return EmbeddedEventLoop().makeSucceededFuture(())
}
}

extension FakeChannel {
/// Dequeue a proxy for the given path and casts it to the given type, if one exists.
private func dequeueResponseStream<Stream>(
forPath path: String,
as: Stream.Type = Stream.self
) -> Stream? {
guard var streams = self.responseStreams[path], !streams.isEmpty else {
return nil
}

// This is fine: we know we're non-empty.
let first = streams.removeFirst()
self.responseStreams.updateValue(streams, forKey: path)

return first as? Stream
}

private func makeRequestHead(path: String, callOptions: CallOptions) -> _GRPCRequestHead {
return _GRPCRequestHead(
scheme: "http",
path: path,
host: "localhost",
requestID: callOptions.requestIDProvider.requestID(),
options: callOptions
)
}
}
Loading

0 comments on commit 2732990

Please sign in to comment.