Skip to content

Commit

Permalink
Adopt ClientContext changes (#56)
Browse files Browse the repository at this point in the history
This PR adopts the changes introduced in
grpc/grpc-swift#2158, requiring client
transports to provide a `ClientContext` alongside the stream.
  • Loading branch information
gjcairo authored Jan 16, 2025
1 parent 92ad3c0 commit b8a3c3b
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ let products: [Product] = [
let dependencies: [Package.Dependency] = [
.package(
url: "https://github.com/grpc/grpc-swift.git",
exact: "2.0.0-beta.2"
branch: "main"
),
.package(
url: "https://github.com/apple/swift-nio.git",
Expand Down
24 changes: 18 additions & 6 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ package final class Connection: Sendable {
descriptor: MethodDescriptor,
options: CallOptions
) async throws -> Stream {
let (multiplexer, scheme) = try self.state.withLock { state in
let (multiplexer, scheme, remotePeer, localPeer) = try self.state.withLock { state in
switch state {
case .connected(let connected):
return (connected.multiplexer, connected.scheme)
return (connected.multiplexer, connected.scheme, connected.remotePeer, connected.localPeer)
case .notConnected, .closing, .closed:
throw RPCError(code: .unavailable, message: "subchannel isn't ready")
}
Expand Down Expand Up @@ -246,7 +246,13 @@ package final class Connection: Sendable {
}
}

return Stream(wrapping: stream, descriptor: descriptor)
let context = ClientContext(
descriptor: descriptor,
remotePeer: remotePeer,
localPeer: localPeer
)

return Stream(wrapping: stream, context: context)
} catch {
throw RPCError(code: .unavailable, message: "subchannel is unavailable", cause: error)
}
Expand Down Expand Up @@ -417,16 +423,16 @@ extension Connection {
}
}

let descriptor: MethodDescriptor
let context: ClientContext

private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>

init(
wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
descriptor: MethodDescriptor
context: ClientContext
) {
self.http2Stream = stream
self.descriptor = descriptor
self.context = context
}

package func execute<T>(
Expand Down Expand Up @@ -457,13 +463,19 @@ extension Connection {
struct Connected: Sendable {
/// The connection channel.
var channel: NIOAsyncChannel<ClientConnectionEvent, Void>
/// The connection's remote peer information.
var remotePeer: String
/// The connection's local peer information.
var localPeer: String
/// Multiplexer for creating HTTP/2 streams.
var multiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Void>
/// Whether the connection is plaintext, `false` implies TLS is being used.
var scheme: Scheme

init(_ connection: HTTP2Connection) {
self.channel = connection.channel
self.remotePeer = connection.channel.remoteAddressInfo
self.localPeer = connection.channel.localAddressInfo
self.multiplexer = connection.multiplexer
self.scheme = connection.isPlaintext ? .http : .https
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ package final class GRPCChannel: ClientTransport {
self.input.continuation.yield(.close)
}

/// Opens a stream using the transport, and uses it as input into a user-provided closure.
/// Opens a stream using the transport, and uses it as input into a user-provided closure, alongside the client's context.
package func withStream<T: Sendable>(
descriptor: MethodDescriptor,
options: CallOptions,
_ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
_ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
) async throws -> T {
// Merge options from the call with those from the service config.
let methodConfig = self.config(forMethod: descriptor)
Expand All @@ -214,11 +214,11 @@ package final class GRPCChannel: ClientTransport {
case .created(let stream):
return try await stream.execute { inbound, outbound in
let rpcStream = RPCStream(
descriptor: stream.descriptor,
descriptor: stream.context.descriptor,
inbound: RPCAsyncSequence<RPCResponsePart, any Error>(wrapping: inbound),
outbound: RPCWriter.Closable(wrapping: outbound)
)
return try await closure(rpcStream)
return try await closure(rpcStream, stream.context)
}

case .tryAgain(let error):
Expand Down
93 changes: 93 additions & 0 deletions Sources/GRPCNIOTransportCore/Internal/Channel+AddressInfo.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2025, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

internal import NIOCore

extension NIOAsyncChannel {
var remoteAddressInfo: String {
guard let remote = self.channel.remoteAddress else {
return "<unknown>"
}

switch remote {
case .v4(let address):
// '!' is safe, v4 always has a port.
return "ipv4:\(address.host):\(remote.port!)"

case .v6(let address):
// '!' is safe, v6 always has a port.
return "ipv6:[\(address.host)]:\(remote.port!)"

case .unixDomainSocket:
// '!' is safe, UDS always has a path.
if remote.pathname!.isEmpty {
guard let local = self.channel.localAddress else {
return "unix:<unknown>"
}

switch local {
case .unixDomainSocket:
// '!' is safe, UDS always has a path.
return "unix:\(local.pathname!)"

case .v4, .v6:
// Remote address is UDS but local isn't. This shouldn't ever happen.
return "unix:<unknown>"
}
} else {
// '!' is safe, UDS always has a path.
return "unix:\(remote.pathname!)"
}
}
}

var localAddressInfo: String {
guard let local = self.channel.localAddress else {
return "<unknown>"
}

switch local {
case .v4(let address):
// '!' is safe, v4 always has a port.
return "ipv4:\(address.host):\(local.port!)"

case .v6(let address):
// '!' is safe, v6 always has a port.
return "ipv6:[\(address.host)]:\(local.port!)"

case .unixDomainSocket:
// '!' is safe, UDS always has a path.
if local.pathname!.isEmpty {
guard let remote = self.channel.remoteAddress else {
return "unix:<unknown>"
}

switch remote {
case .unixDomainSocket:
// '!' is safe, UDS always has a path.
return "unix:\(remote.pathname!)"

case .v4, .v6:
// Remote address is UDS but local isn't. This shouldn't ever happen.
return "unix:<unknown>"
}
} else {
// '!' is safe, UDS always has a path.
return "unix:\(local.pathname!)"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,40 +191,6 @@ package final class CommonHTTP2ServerTransport<
}
}

private func peerInfo(channel: any Channel) -> String {
guard let remote = channel.remoteAddress else {
return "<unknown>"
}

switch remote {
case .v4(let address):
// '!' is safe, v4 always has a port.
return "ipv4:\(address.host):\(remote.port!)"

case .v6(let address):
// '!' is safe, v6 always has a port.
return "ipv6:[\(address.host)]:\(remote.port!)"

case .unixDomainSocket:
// The pathname will be on the local address.
guard let local = channel.localAddress else {
// UDS but no local address; this shouldn't ever happen but at least note the transport
// as being UDS.
return "unix:<unknown>"
}

switch local {
case .unixDomainSocket:
// '!' is safe, UDS always has a path.
return "unix:\(local.pathname!)"

case .v4, .v6:
// Remote address is UDS but local isn't. This shouldn't ever happen.
return "unix:<unknown>"
}
}
}

private func handleConnection(
_ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
Expand All @@ -233,7 +199,7 @@ package final class CommonHTTP2ServerTransport<
_ context: ServerContext
) async -> Void
) async throws {
let peer = self.peerInfo(channel: connection.channel)
let peer = connection.remoteAddressInfo
try await connection.executeThenClose { inbound, _ in
await withDiscardingTaskGroup { group in
group.addTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ extension HTTP2ClientTransport {
public func withStream<T: Sendable>(
descriptor: MethodDescriptor,
options: CallOptions,
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
) async throws -> T {
try await self.channel.withStream(descriptor: descriptor, options: options, closure)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ extension HTTP2ClientTransport {
public func withStream<T: Sendable>(
descriptor: MethodDescriptor,
options: CallOptions,
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
) async throws -> T {
try await self.channel.withStream(descriptor: descriptor, options: options, closure)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ final class GRPCChannelTests: XCTestCase {
await channel.connect()
}

try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream, _ in
try await stream.outbound.write(.metadata([:]))

var iterator = stream.inbound.makeAsyncIterator()
Expand Down Expand Up @@ -441,7 +441,7 @@ final class GRPCChannelTests: XCTestCase {
// be queued though.
for _ in 1 ... 100 {
group.addTask {
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream, _ in
try await stream.outbound.write(.metadata([:]))
await stream.outbound.finish()

Expand Down Expand Up @@ -510,7 +510,7 @@ final class GRPCChannelTests: XCTestCase {
options.waitForReady = false

await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
try await channel.withStream(descriptor: .echoGet, options: options) { _ in
try await channel.withStream(descriptor: .echoGet, options: options) { _, _ in
XCTFail("Unexpected stream")
}
} errorHandler: { error in
Expand Down Expand Up @@ -780,7 +780,7 @@ final class GRPCChannelTests: XCTestCase {

// Try to open a new stream.
await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream, _ in
XCTFail("Unexpected new stream")
}
} errorHandler: { error in
Expand Down Expand Up @@ -823,7 +823,7 @@ final class GRPCChannelTests: XCTestCase {
}

func doAnRPC() async throws {
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream, _ in
try await stream.outbound.write(.metadata([:]))
await stream.outbound.finish()

Expand Down Expand Up @@ -873,7 +873,7 @@ extension GRPCChannel {
let values: Metadata.StringValues? = try await self.withStream(
descriptor: .echoGet,
options: .defaults
) { stream in
) { stream, _ in
try await stream.outbound.write(.metadata([:]))
await stream.outbound.finish()

Expand Down
2 changes: 1 addition & 1 deletion Tests/GRPCNIOTransportHTTP2Tests/ControlClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ internal struct ControlClient {
internal func peerInfo<R>(
options: GRPCCore.CallOptions = .defaults,
_ body: @Sendable @escaping (
_ response: GRPCCore.ClientResponse<String>
_ response: GRPCCore.ClientResponse<ControlService.PeerInfoResponse>
) async throws -> R = { try $0.message }
) async throws -> R where R: Sendable {
try await self.client.unary(
Expand Down
Loading

0 comments on commit b8a3c3b

Please sign in to comment.