Skip to content

Commit

Permalink
Expose closeFuture in server interceptor context (#1553)
Browse files Browse the repository at this point in the history
Motivation:

Server calls expose a `closeFuture` where users can register callbacks
to tear things down when the RPC ends. Interceptors don't have this
capability and must rely on observing an `.end`.

Modifications:

Expose the `closeFuture` from `ServerCallContext` to
the `ServerInterceptorContext`.

Result:

- Users can be notified in interceptors when the call ends.
- Resolves #1552
  • Loading branch information
glbrntt authored Jan 11, 2023
1 parent 1629de1 commit bcca31f
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ internal final class AsyncServerHandler<
callType: callType,
remoteAddress: context.remoteAddress,
userInfoRef: self.userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public final class BidirectionalStreamingServerHandler<
callType: .bidirectionalStreaming,
remoteAddress: context.remoteAddress,
userInfoRef: userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public final class ClientStreamingServerHandler<
callType: .clientStreaming,
remoteAddress: context.remoteAddress,
userInfoRef: userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final class ServerStreamingServerHandler<
callType: .serverStreaming,
remoteAddress: context.remoteAddress,
userInfoRef: userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/CallHandlers/UnaryServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public final class UnaryServerHandler<
callType: .unary,
remoteAddress: context.remoteAddress,
userInfoRef: userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
6 changes: 6 additions & 0 deletions Sources/GRPC/Interceptor/ServerInterceptorContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public struct ServerInterceptorContext<Request, Response> {
return self._pipeline.remoteAddress
}

/// A future which completes when the call closes. This may be used to register callbacks which
/// free up resources used by the interceptor.
public var closeFuture: EventLoopFuture<Void> {
return self._pipeline.closeFuture
}

/// A 'UserInfo' dictionary.
///
/// - Important: While `UserInfo` has value-semantics, this property retrieves from, and sets a
Expand Down
7 changes: 7 additions & 0 deletions Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ internal final class ServerInterceptorPipeline<Request, Response> {
@usableFromInline
internal let userInfoRef: Ref<UserInfo>

/// A future which completes when the call closes. This may be used to register callbacks which
/// free up resources used by the interceptor.
@usableFromInline
internal let closeFuture: EventLoopFuture<Void>

/// Called when a response part has traversed the interceptor pipeline.
@usableFromInline
internal let _onResponsePart: (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
Expand Down Expand Up @@ -99,6 +104,7 @@ internal final class ServerInterceptorPipeline<Request, Response> {
callType: GRPCCallType,
remoteAddress: SocketAddress?,
userInfoRef: Ref<UserInfo>,
closeFuture: EventLoopFuture<Void>,
interceptors: [ServerInterceptor<Request, Response>],
onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void,
onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
Expand All @@ -109,6 +115,7 @@ internal final class ServerInterceptorPipeline<Request, Response> {
self.type = callType
self.remoteAddress = remoteAddress
self.userInfoRef = userInfoRef
self.closeFuture = closeFuture

self._onResponsePart = onResponsePart
self._onRequestPart = onRequestPart
Expand Down
60 changes: 59 additions & 1 deletion Tests/GRPCTests/InterceptorsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Atomics
import EchoImplementation
import EchoModel
import GRPC
Expand All @@ -28,14 +29,15 @@ class InterceptorsTests: GRPCTestCase {
private var server: Server!
private var connection: ClientConnection!
private var echo: Echo_EchoNIOClient!
private let onCloseCounter = ManagedAtomic<Int>(0)

override func setUp() {
super.setUp()
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)

self.server = try! Server.insecure(group: self.group)
.withServiceProviders([
EchoProvider(),
EchoProvider(interceptors: CountOnCloseInterceptors(counter: self.onCloseCounter)),
HelloWorldProvider(interceptors: HelloWorldServerInterceptorFactory()),
])
.withLogger(self.serverLogger)
Expand Down Expand Up @@ -64,6 +66,8 @@ class InterceptorsTests: GRPCTestCase {
let get = self.echo.get(.with { $0.text = "hello" })
assertThat(try get.response.wait(), .is(.with { $0.text = "hello :teg ohce tfiwS" }))
assertThat(try get.status.wait(), .hasCode(.ok))

XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
}

func testCollect() {
Expand All @@ -73,6 +77,8 @@ class InterceptorsTests: GRPCTestCase {
collect.sendEnd(promise: nil)
assertThat(try collect.response.wait(), .is(.with { $0.text = "3 4 1 2 :tcelloc ohce tfiwS" }))
assertThat(try collect.status.wait(), .hasCode(.ok))

XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
}

func testExpand() {
Expand All @@ -81,6 +87,8 @@ class InterceptorsTests: GRPCTestCase {
assertThat(response, .is(.with { $0.text = "hello :)0( dnapxe ohce tfiwS" }))
}
assertThat(try expand.status.wait(), .hasCode(.ok))

XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
}

func testUpdate() {
Expand All @@ -91,6 +99,8 @@ class InterceptorsTests: GRPCTestCase {
update.sendMessage(.with { $0.text = "hello" }, promise: nil)
update.sendEnd(promise: nil)
assertThat(try update.status.wait(), .hasCode(.ok))

XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
}

func testSayHello() {
Expand Down Expand Up @@ -360,6 +370,54 @@ final class ReversingInterceptors: Echo_EchoClientInterceptorFactoryProtocol {
}
}

final class CountOnCloseInterceptors: Echo_EchoServerInterceptorFactoryProtocol {
// This interceptor is stateless, let's just share it.
private let interceptors: [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>]

init(counter: ManagedAtomic<Int>) {
self.interceptors = [CountOnCloseServerInterceptor(counter: counter)]
}

func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.interceptors
}

func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.interceptors
}

func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.interceptors
}

func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.interceptors
}
}

final class CountOnCloseServerInterceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
private let counter: ManagedAtomic<Int>

init(counter: ManagedAtomic<Int>) {
self.counter = counter
}

override func receive(
_ part: GRPCServerRequestPart<Echo_EchoRequest>,
context: ServerInterceptorContext<Echo_EchoRequest, Echo_EchoResponse>
) {
switch part {
case .metadata:
context.closeFuture.whenComplete { _ in
self.counter.wrappingIncrement(ordering: .sequentiallyConsistent)
}
default:
()
}
context.receive(part)
}
}

private enum MagicKey: UserInfo.Key {
typealias Value = String
}
Expand Down
1 change: 1 addition & 0 deletions Tests/GRPCTests/ServerInterceptorPipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ServerInterceptorPipelineTests: GRPCTestCase {
callType: callType,
remoteAddress: nil,
userInfoRef: Ref(UserInfo()),
closeFuture: self.embeddedEventLoop.makeSucceededVoidFuture(),
interceptors: interceptors,
onRequestPart: onRequestPart,
onResponsePart: onResponsePart
Expand Down

0 comments on commit bcca31f

Please sign in to comment.