Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose closeFuture in server interceptor context #1553

Merged
merged 1 commit into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ internal final class AsyncServerHandler<
callType: callType,
remoteAddress: context.remoteAddress,
userInfoRef: self.userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public final class BidirectionalStreamingServerHandler<
callType: .bidirectionalStreaming,
remoteAddress: context.remoteAddress,
userInfoRef: userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public final class ClientStreamingServerHandler<
callType: .clientStreaming,
remoteAddress: context.remoteAddress,
userInfoRef: userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final class ServerStreamingServerHandler<
callType: .serverStreaming,
remoteAddress: context.remoteAddress,
userInfoRef: userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/CallHandlers/UnaryServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public final class UnaryServerHandler<
callType: .unary,
remoteAddress: context.remoteAddress,
userInfoRef: userInfoRef,
closeFuture: context.closeFuture,
interceptors: interceptors,
onRequestPart: self.receiveInterceptedPart(_:),
onResponsePart: self.sendInterceptedPart(_:promise:)
Expand Down
6 changes: 6 additions & 0 deletions Sources/GRPC/Interceptor/ServerInterceptorContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public struct ServerInterceptorContext<Request, Response> {
return self._pipeline.remoteAddress
}

/// A future which completes when the call closes. This may be used to register callbacks which
/// free up resources used by the interceptor.
public var closeFuture: EventLoopFuture<Void> {
return self._pipeline.closeFuture
}

/// A 'UserInfo' dictionary.
///
/// - Important: While `UserInfo` has value-semantics, this property retrieves from, and sets a
Expand Down
7 changes: 7 additions & 0 deletions Sources/GRPC/Interceptor/ServerInterceptorPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ internal final class ServerInterceptorPipeline<Request, Response> {
@usableFromInline
internal let userInfoRef: Ref<UserInfo>

/// A future which completes when the call closes. This may be used to register callbacks which
/// free up resources used by the interceptor.
@usableFromInline
internal let closeFuture: EventLoopFuture<Void>

/// Called when a response part has traversed the interceptor pipeline.
@usableFromInline
internal let _onResponsePart: (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
Expand Down Expand Up @@ -99,6 +104,7 @@ internal final class ServerInterceptorPipeline<Request, Response> {
callType: GRPCCallType,
remoteAddress: SocketAddress?,
userInfoRef: Ref<UserInfo>,
closeFuture: EventLoopFuture<Void>,
interceptors: [ServerInterceptor<Request, Response>],
onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void,
onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
Expand All @@ -109,6 +115,7 @@ internal final class ServerInterceptorPipeline<Request, Response> {
self.type = callType
self.remoteAddress = remoteAddress
self.userInfoRef = userInfoRef
self.closeFuture = closeFuture

self._onResponsePart = onResponsePart
self._onRequestPart = onRequestPart
Expand Down
60 changes: 59 additions & 1 deletion Tests/GRPCTests/InterceptorsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Atomics
import EchoImplementation
import EchoModel
import GRPC
Expand All @@ -28,14 +29,15 @@ class InterceptorsTests: GRPCTestCase {
private var server: Server!
private var connection: ClientConnection!
private var echo: Echo_EchoNIOClient!
private let onCloseCounter = ManagedAtomic<Int>(0)

override func setUp() {
super.setUp()
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)

self.server = try! Server.insecure(group: self.group)
.withServiceProviders([
EchoProvider(),
EchoProvider(interceptors: CountOnCloseInterceptors(counter: self.onCloseCounter)),
HelloWorldProvider(interceptors: HelloWorldServerInterceptorFactory()),
])
.withLogger(self.serverLogger)
Expand Down Expand Up @@ -64,6 +66,8 @@ class InterceptorsTests: GRPCTestCase {
let get = self.echo.get(.with { $0.text = "hello" })
assertThat(try get.response.wait(), .is(.with { $0.text = "hello :teg ohce tfiwS" }))
assertThat(try get.status.wait(), .hasCode(.ok))

XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
}

func testCollect() {
Expand All @@ -73,6 +77,8 @@ class InterceptorsTests: GRPCTestCase {
collect.sendEnd(promise: nil)
assertThat(try collect.response.wait(), .is(.with { $0.text = "3 4 1 2 :tcelloc ohce tfiwS" }))
assertThat(try collect.status.wait(), .hasCode(.ok))

XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
}

func testExpand() {
Expand All @@ -81,6 +87,8 @@ class InterceptorsTests: GRPCTestCase {
assertThat(response, .is(.with { $0.text = "hello :)0( dnapxe ohce tfiwS" }))
}
assertThat(try expand.status.wait(), .hasCode(.ok))

XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
}

func testUpdate() {
Expand All @@ -91,6 +99,8 @@ class InterceptorsTests: GRPCTestCase {
update.sendMessage(.with { $0.text = "hello" }, promise: nil)
update.sendEnd(promise: nil)
assertThat(try update.status.wait(), .hasCode(.ok))

XCTAssertEqual(self.onCloseCounter.load(ordering: .sequentiallyConsistent), 1)
}

func testSayHello() {
Expand Down Expand Up @@ -360,6 +370,54 @@ final class ReversingInterceptors: Echo_EchoClientInterceptorFactoryProtocol {
}
}

final class CountOnCloseInterceptors: Echo_EchoServerInterceptorFactoryProtocol {
// This interceptor is stateless, let's just share it.
private let interceptors: [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>]

init(counter: ManagedAtomic<Int>) {
self.interceptors = [CountOnCloseServerInterceptor(counter: counter)]
}

func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.interceptors
}

func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.interceptors
}

func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.interceptors
}

func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.interceptors
}
}

final class CountOnCloseServerInterceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {
private let counter: ManagedAtomic<Int>

init(counter: ManagedAtomic<Int>) {
self.counter = counter
}

override func receive(
_ part: GRPCServerRequestPart<Echo_EchoRequest>,
context: ServerInterceptorContext<Echo_EchoRequest, Echo_EchoResponse>
) {
switch part {
case .metadata:
context.closeFuture.whenComplete { _ in
self.counter.wrappingIncrement(ordering: .sequentiallyConsistent)
}
default:
()
}
context.receive(part)
}
}

private enum MagicKey: UserInfo.Key {
typealias Value = String
}
Expand Down
1 change: 1 addition & 0 deletions Tests/GRPCTests/ServerInterceptorPipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ServerInterceptorPipelineTests: GRPCTestCase {
callType: callType,
remoteAddress: nil,
userInfoRef: Ref(UserInfo()),
closeFuture: self.embeddedEventLoop.makeSucceededVoidFuture(),
interceptors: interceptors,
onRequestPart: onRequestPart,
onResponsePart: onResponsePart
Expand Down