Skip to content

Commit

Permalink
Delay creating event observers for client streaming calls
Browse files Browse the repository at this point in the history
Motivation:

Providers for client and bidirectional streaming calls require
the user provide a future stream-event handler to handle requests
from the client. However, these methods get called as the pipeline
handling an incoming call is being configured, as these methods
also expose promises for response (for client streaming) and
call status (for bidirectional streaming) it is possible for these
to be fulfilled before the pipeline has been configured. Since
no handler is in place to deal with the promised types the server
will fatal error as the first handler in place will fail to unwrap
the promised type.

Modifications:

Delay the creation of event observers for client and
bidirectional streaming calls until their handlers have been added
to the pipeline.

Result:

Client streaming and bidirectional streaming calls can fulfill
their response and status promises outside of their stream handlers.
  • Loading branch information
glbrntt committed Jul 24, 2019
1 parent 7416161 commit 0296e3e
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 12 deletions.
19 changes: 13 additions & 6 deletions Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,23 @@ import NIOHTTP1
public class BidirectionalStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
private var eventObserver: EventLoopFuture<EventObserver>?
private let eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>

private var callContext: StreamingResponseCallContext<ResponseMessage>?

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
super.init(errorDelegate: errorDelegate)
public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: @escaping (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
// Delay the creation of the event observer until `handlerAdded(context:)`, otherwise it is
// possible for the service to write into the pipeline (by fulfilling the status promise
// of the call context outside of the observer) before it has been configured.
self.eventObserverFactory = eventObserverFactory

let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request, errorDelegate: errorDelegate)
self.callContext = context
let eventObserver = eventObserverFactory(context)
self.eventObserver = eventObserver

super.init(errorDelegate: errorDelegate)

context.statusPromise.futureResult.whenComplete { _ in
// When done, reset references to avoid retain cycles.
self.eventObserver = nil
Expand All @@ -46,8 +52,9 @@ public class BidirectionalStreamingCallHandler<RequestMessage: Message, Response
}

public override func handlerAdded(context: ChannelHandlerContext) {
guard let eventObserver = self.eventObserver,
let callContext = self.callContext else { return }
guard let callContext = self.callContext else { return }
let eventObserver = self.eventObserverFactory(callContext)
self.eventObserver = eventObserver
// Terminate the call if the future providing an observer fails.
// This is being done _after_ we have been added as a handler to ensure that the `GRPCServerCodec` required to
// translate our outgoing `GRPCServerResponsePart<ResponseMessage>` message is already present on the channel.
Expand Down
20 changes: 14 additions & 6 deletions Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,23 @@ import NIOHTTP1
public class ClientStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> {
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void
private var eventObserver: EventLoopFuture<EventObserver>?
private let eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>

private var callContext: UnaryResponseCallContext<ResponseMessage>?

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
super.init(errorDelegate: errorDelegate)
public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: @escaping (UnaryResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) {
// Delay the creation of the event observer until `handlerAdded(context:)`, otherwise it is
// possible for the service to write into the pipeline (by fulfilling the response promise
// of the call context outside of the observer) before it has been configured.
self.eventObserverFactory = eventObserverFactory

let callContext = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request, errorDelegate: errorDelegate)
self.callContext = callContext
let eventObserver = eventObserverFactory(callContext)
self.eventObserver = eventObserver

super.init(errorDelegate: errorDelegate)

callContext.responsePromise.futureResult.whenComplete { _ in
// When done, reset references to avoid retain cycles.
self.eventObserver = nil
Expand All @@ -46,8 +52,10 @@ public class ClientStreamingCallHandler<RequestMessage: Message, ResponseMessage
}

public override func handlerAdded(context: ChannelHandlerContext) {
guard let eventObserver = self.eventObserver,
let callContext = self.callContext else { return }
guard let callContext = self.callContext else { return }
let eventObserver = self.eventObserverFactory(callContext)
self.eventObserver = eventObserver

// Terminate the call if the future providing an observer fails.
// This is being done _after_ we have been added as a handler to ensure that the `GRPCServerCodec` required to
// translate our outgoing `GRPCServerResponsePart<ResponseMessage>` message is already present on the channel.
Expand Down
97 changes: 97 additions & 0 deletions Tests/GRPCTests/ImmediateServerFailureTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2019, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
import GRPC
import NIO
import XCTest

class ImmediatelyFailingEchoProvider: Echo_EchoProvider {
static let status: GRPCStatus = .init(code: .unavailable, message: nil)

func get(
request: Echo_EchoRequest,
context: StatusOnlyCallContext
) -> EventLoopFuture<Echo_EchoResponse> {
return context.eventLoop.makeFailedFuture(ImmediatelyFailingEchoProvider.status)
}

func expand(
request: Echo_EchoRequest,
context: StreamingResponseCallContext<Echo_EchoResponse>
) -> EventLoopFuture<GRPCStatus> {
return context.eventLoop.makeFailedFuture(ImmediatelyFailingEchoProvider.status)
}

func collect(
context: UnaryResponseCallContext<Echo_EchoResponse>
) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
context.responsePromise.fail(ImmediatelyFailingEchoProvider.status)
return context.eventLoop.makeSucceededFuture({ _ in
// no-op
})
}

func update(
context: StreamingResponseCallContext<Echo_EchoResponse>
) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
context.statusPromise.fail(ImmediatelyFailingEchoProvider.status)
return context.eventLoop.makeSucceededFuture({ _ in
// no-op
})
}
}

class ImmediatelyFailingProviderTests: EchoTestCaseBase {
override func makeEchoProvider() -> Echo_EchoProvider {
return ImmediatelyFailingEchoProvider()
}

func testUnary() throws {
let expcectation = self.makeStatusExpectation()
let call = self.client.get(Echo_EchoRequest(text: "foo"))
call.status.map { $0.code }.assertEqual(.unavailable, fulfill: expcectation)

self.wait(for: [expcectation], timeout: self.defaultTestTimeout)
}

func testServerStreaming() throws {
let expcectation = self.makeStatusExpectation()
let call = self.client.expand(Echo_EchoRequest(text: "foo")) { response in
XCTFail("unexpected response: \(response)")
}

call.status.map { $0.code }.assertEqual(.unavailable, fulfill: expcectation)
self.wait(for: [expcectation], timeout: self.defaultTestTimeout)
}

func testClientStreaming() throws {
let expcectation = self.makeStatusExpectation()
let call = self.client.collect()

call.status.map { $0.code }.assertEqual(.unavailable, fulfill: expcectation)
self.wait(for: [expcectation], timeout: self.defaultTestTimeout)
}

func testBidirectionalStreaming() throws {
let expcectation = self.makeStatusExpectation()
let call = self.client.update { response in
XCTFail("unexpected response: \(response)")
}

call.status.map { $0.code }.assertEqual(.unavailable, fulfill: expcectation)
self.wait(for: [expcectation], timeout: self.defaultTestTimeout)
}
}
13 changes: 13 additions & 0 deletions Tests/GRPCTests/XCTestManifests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,18 @@ extension HTTP1ToRawGRPCServerCodecTests {
]
}

extension ImmediatelyFailingProviderTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__ImmediatelyFailingProviderTests = [
("testBidirectionalStreaming", testBidirectionalStreaming),
("testClientStreaming", testClientStreaming),
("testServerStreaming", testServerStreaming),
("testUnary", testUnary),
]
}

extension LengthPrefixedMessageReaderTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
Expand Down Expand Up @@ -435,6 +447,7 @@ public func __allTests() -> [XCTestCaseEntry] {
testCase(GRPCStatusMessageMarshallerTests.__allTests__GRPCStatusMessageMarshallerTests),
testCase(GRPCTypeSizeTests.__allTests__GRPCTypeSizeTests),
testCase(HTTP1ToRawGRPCServerCodecTests.__allTests__HTTP1ToRawGRPCServerCodecTests),
testCase(ImmediatelyFailingProviderTests.__allTests__ImmediatelyFailingProviderTests),
testCase(LengthPrefixedMessageReaderTests.__allTests__LengthPrefixedMessageReaderTests),
testCase(ServerDelayedThrowingTests.__allTests__ServerDelayedThrowingTests),
testCase(ServerErrorTransformingTests.__allTests__ServerErrorTransformingTests),
Expand Down

0 comments on commit 0296e3e

Please sign in to comment.