Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more properties to ClientContext and have the ClientTransport provide it #2158

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extension VariableDescription {
/// static let descriptor = GRPCCore.MethodDescriptor(
/// service: GRPCCore.ServiceDescriptor(fullyQualifiedServiceName: "<literalFullyQualifiedService>"),
/// method: "<literalMethodName>"
/// )
/// ```
package static func methodDescriptor(
accessModifier: AccessModifier? = nil,
Expand Down
34 changes: 33 additions & 1 deletion Sources/GRPCCore/Call/Client/ClientContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,40 @@ public struct ClientContext: Sendable {
/// A description of the method being called.
public var descriptor: MethodDescriptor

/// A description of the remote peer.
///
/// The format of the description should follow the pattern "<transport>:<address>" where
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
/// "in-process"). This is a guideline for how descriptions should be formatted; different
/// implementations may not follow this format so you shouldn't make assumptions based on it.
///
/// Some examples include:
/// - "ipv4:127.0.0.1:31415",
/// - "ipv6:[::1]:443",
/// - "in-process:27182".
public var remotePeer: String

/// A description of the local peer.
///
/// The format of the description should follow the pattern "<transport>:<address>" where
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
/// "in-process"). This is a guideline for how descriptions should be formatted; different
/// implementations may not follow this format so you shouldn't make assumptions based on it.
///
/// Some examples include:
/// - "ipv4:127.0.0.1:31415",
/// - "ipv6:[::1]:443",
/// - "in-process:27182".
public var localPeer: String

/// Create a new client interceptor context.
public init(descriptor: MethodDescriptor) {
public init(
descriptor: MethodDescriptor,
remotePeer: String,
localPeer: String
) {
self.descriptor = descriptor
self.remotePeer = remotePeer
self.localPeer = localPeer
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ extension ClientRPCExecutor.HedgingExecutor {
return try await self.transport.withStream(
descriptor: method,
options: options
) { stream -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
) { stream, context -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
return await withTaskGroup(of: _HedgingAttemptTaskResult<R, Output>.self) { group in
group.addTask {
do {
Expand All @@ -348,8 +348,8 @@ extension ClientRPCExecutor.HedgingExecutor {

let response = await ClientRPCExecutor._execute(
in: &group,
context: context,
request: request,
method: method,
attempt: attempt,
serializer: self.serializer,
deserializer: self.deserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,14 @@ extension ClientRPCExecutor.OneShotExecutor {
) async -> Result<R, any Error> {
return await withTaskGroup(of: Void.self, returning: Result<R, any Error>.self) { group in
do {
return try await self.transport.withStream(descriptor: method, options: options) { stream in
return try await self.transport.withStream(
descriptor: method,
options: options
) { stream, context in
let response = await ClientRPCExecutor._execute(
in: &group,
context: context,
request: request,
method: method,
attempt: 1,
serializer: self.serializer,
deserializer: self.deserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ extension ClientRPCExecutor.RetryExecutor {
let attemptResult = try await self.transport.withStream(
descriptor: method,
options: options
) { stream in
) { stream, context in
group.addTask {
var metadata = request.metadata
// Work out the timeout from the deadline.
Expand All @@ -127,6 +127,7 @@ extension ClientRPCExecutor.RetryExecutor {
}

return await self.executeAttempt(
context: context,
stream: stream,
metadata: metadata,
retryStream: retry.stream,
Expand Down Expand Up @@ -194,6 +195,7 @@ extension ClientRPCExecutor.RetryExecutor {

@inlinable
func executeAttempt<R: Sendable>(
context: ClientContext,
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
metadata: Metadata,
retryStream: BroadcastAsyncSequence<Input>,
Expand All @@ -211,8 +213,8 @@ extension ClientRPCExecutor.RetryExecutor {

let response = await ClientRPCExecutor._execute(
in: &group,
context: context,
request: request,
method: method,
attempt: attempt,
serializer: self.serializer,
deserializer: self.deserializer,
Expand Down
6 changes: 3 additions & 3 deletions Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,25 @@ extension ClientRPCExecutor {
///
/// - Parameters:
/// - request: The request to execute.
/// - method: A description of the method to execute the request against.
/// - context: The ``ClientContext`` related to this request.
/// - attempt: The attempt number of the request.
/// - serializer: A serializer to convert input messages to bytes.
/// - deserializer: A deserializer to convert bytes to output messages.
/// - interceptors: An array of interceptors which the request and response pass through. The
/// interceptors will be called in the order of the array.
/// - stream: The stream to excecute the RPC on.
/// - Returns: The deserialized response.
@inlinable // would be private
static func _execute<Input: Sendable, Output: Sendable>(
in group: inout TaskGroup<Void>,
context: ClientContext,
request: StreamingClientRequest<Input>,
method: MethodDescriptor,
attempt: Int,
serializer: some MessageSerializer<Input>,
deserializer: some MessageDeserializer<Output>,
interceptors: [any ClientInterceptor],
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
) async -> StreamingClientResponse<Output> {
let context = ClientContext(descriptor: method)

if interceptors.isEmpty {
return await ClientStreamExecutor.execute(
Expand Down
4 changes: 2 additions & 2 deletions Sources/GRPCCore/Transport/ClientTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public protocol ClientTransport: Sendable {
/// - Parameters:
/// - descriptor: A description of the method to open a stream for.
/// - options: Options specific to the stream.
/// - closure: A closure that takes the opened stream as parameter.
/// - closure: A closure that takes the opened stream and the client context as its parameters.
/// - Returns: Whatever value was returned from `closure`.
func withStream<T: Sendable>(
descriptor: MethodDescriptor,
options: CallOptions,
_ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
_ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
) async throws -> T

/// Returns the configuration for a given method.
Expand Down
18 changes: 14 additions & 4 deletions Sources/GRPCInProcessTransport/InProcessTransport+Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,23 @@ extension InProcessTransport {

private let methodConfig: MethodConfigs
private let state: Mutex<State>
private let peer: String

/// Creates a new in-process client transport.
///
/// - Parameters:
/// - server: The in-process server transport to connect to.
/// - serviceConfig: Service configuration.
/// - peer: The system's PID for the running client and server.
package init(
server: InProcessTransport.Server,
serviceConfig: ServiceConfig = ServiceConfig()
serviceConfig: ServiceConfig = ServiceConfig(),
peer: String
) {
self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
self.methodConfig = MethodConfigs(serviceConfig: serviceConfig)
self.state = Mutex(.unconnected(.init(serverTransport: server)))
self.peer = peer
}

/// Establish and maintain a connection to the remote destination.
Expand Down Expand Up @@ -225,12 +229,12 @@ extension InProcessTransport {
/// - Parameters:
/// - descriptor: A description of the method to open a stream for.
/// - options: Options specific to the stream.
/// - closure: A closure that takes the opened stream as parameter.
/// - closure: A closure that takes the opened stream and the client context as its parameters.
/// - Returns: Whatever value was returned from `closure`.
public func withStream<T>(
descriptor: MethodDescriptor,
options: CallOptions,
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
) async throws -> T {
let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self)
let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self)
Expand Down Expand Up @@ -297,11 +301,17 @@ extension InProcessTransport {
}
}

let clientContext = ClientContext(
descriptor: descriptor,
remotePeer: self.peer,
localPeer: self.peer
)

switch acceptStream {
case .success(let streamID):
let streamHandlingResult: Result<T, any Error>
do {
let result = try await closure(clientStream)
let result = try await closure(clientStream, clientContext)
streamHandlingResult = .success(result)
} catch {
streamHandlingResult = .failure(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ extension InProcessTransport {
private let handles: Mutex<State>

/// Creates a new instance of ``Server``.
///
/// - Parameters:
/// - peer: The system's PID for the running client and server.
package init(peer: String) {
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
self.handles = Mutex(State())
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPCInProcessTransport/InProcessTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public struct InProcessTransport: Sendable {
public init(serviceConfig: ServiceConfig = ServiceConfig()) {
let peer = "in-process:\(System.pid())"
self.server = Self.Server(peer: peer)
self.client = Self.Client(server: self.server, serviceConfig: serviceConfig)
self.client = Self.Client(server: self.server, serviceConfig: serviceConfig, peer: peer)
}
}
30 changes: 15 additions & 15 deletions Tests/GRPCCoreTests/GRPCServerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: BinaryEcho.Methods.get,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
await stream.outbound.finish()
Expand All @@ -75,7 +75,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: BinaryEcho.Methods.collect,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message([3]))
try await stream.outbound.write(.message([1]))
Expand Down Expand Up @@ -106,7 +106,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: BinaryEcho.Methods.expand,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
await stream.outbound.finish()
Expand Down Expand Up @@ -135,7 +135,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: BinaryEcho.Methods.update,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
for byte in [3, 1, 4, 1, 5] as [UInt8] {
try await stream.outbound.write(.message([byte]))
Expand Down Expand Up @@ -166,7 +166,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
await stream.outbound.finish()

Expand All @@ -187,7 +187,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: BinaryEcho.Methods.get,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message([i]))
await stream.outbound.finish()
Expand Down Expand Up @@ -225,7 +225,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: BinaryEcho.Methods.get,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
await stream.outbound.finish()

Expand All @@ -250,7 +250,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
await stream.outbound.finish()

Expand All @@ -277,7 +277,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: BinaryEcho.Methods.get,
options: .defaults
) { stream in
) { stream, _ in
XCTFail("Stream shouldn't be opened")
}
} errorHandler: { error in
Expand All @@ -291,7 +291,7 @@ final class GRPCServerTests: XCTestCase {
try await client.withStream(
descriptor: BinaryEcho.Methods.update,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
var iterator = stream.inbound.makeAsyncIterator()
// Don't need to validate the response, just that the server is running.
Expand Down Expand Up @@ -364,7 +364,7 @@ final class GRPCServerTests: XCTestCase {
try await transport.withStream(
descriptor: BinaryEcho.Methods.get,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message([0]))
await stream.outbound.finish()
Expand Down Expand Up @@ -407,7 +407,7 @@ struct ServerTests {
try await client.withStream(
descriptor: BinaryEcho.Methods.get,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message(Array("hello".utf8)))
await stream.outbound.finish()
Expand Down Expand Up @@ -437,7 +437,7 @@ struct ServerTests {
try await client.withStream(
descriptor: HelloWorld.Methods.sayHello,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message(Array("Swift".utf8)))
await stream.outbound.finish()
Expand Down Expand Up @@ -494,7 +494,7 @@ struct ServerTests {
try await client.withStream(
descriptor: BinaryEcho.Methods.get,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message(Array("hello".utf8)))
await stream.outbound.finish()
Expand Down Expand Up @@ -524,7 +524,7 @@ struct ServerTests {
try await client.withStream(
descriptor: BinaryEcho.Methods.collect,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
try await stream.outbound.write(.message(Array("hello".utf8)))
await stream.outbound.finish()
Expand Down
Loading
Loading