From f91f6d3d488c0860210082360421953960fec8d3 Mon Sep 17 00:00:00 2001 From: Michael Schwarz Date: Fri, 14 Oct 2022 23:02:39 +0200 Subject: [PATCH 1/3] Add sync method variant for finishing RequestStreamWriter --- .../AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift index 499075e41..58450f7bf 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift @@ -90,6 +90,11 @@ public struct GRPCAsyncRequestStreamWriter: Sendable { self.asyncWriter.finish() } + /// Sync variant for finishing the request stream for the RPC. This must be called when there are no more requests to be sent. + public func finish() { + self.asyncWriter.finish() + } + /// Finish the request stream for the RPC with the given error. internal func finish(_ error: Error) { self.asyncWriter.finish(error: error) From 116fbe8edf273737f7d966dbb839e22e411c976b Mon Sep 17 00:00:00 2001 From: Michael Schwarz Date: Fri, 14 Oct 2022 23:08:28 +0200 Subject: [PATCH 2/3] Adjust call sites to use sync RequestStreamWriter finish --- Sources/Examples/RouteGuide/Client/RouteGuideClient.swift | 4 ++-- .../AsyncAwaitSupport/GRPCClient+AsyncAwaitSupport.swift | 4 ++-- Tests/GRPCTests/AsyncAwaitSupport/AsyncClientTests.swift | 4 ++-- .../AsyncAwaitSupport/AsyncIntegrationTests.swift | 6 +++--- .../AsyncAwaitSupport/InterceptorsAsyncTests.swift | 4 ++-- Tests/GRPCTests/GRPCAsyncClientCallTests.swift | 8 ++++---- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/Sources/Examples/RouteGuide/Client/RouteGuideClient.swift b/Sources/Examples/RouteGuide/Client/RouteGuideClient.swift index 93252c238..7b34777d0 100644 --- a/Sources/Examples/RouteGuide/Client/RouteGuideClient.swift +++ b/Sources/Examples/RouteGuide/Client/RouteGuideClient.swift @@ -143,7 +143,7 @@ extension RouteGuideExample { } } - try await recordRoute.requestStream.finish() + recordRoute.requestStream.finish() let summary = try await recordRoute.response print( @@ -188,7 +188,7 @@ extension RouteGuideExample { try await Task.sleep(nanoseconds: UInt64.random(in: UInt64(2e8) ... UInt64(1e9))) } - try await routeChat.requestStream.finish() + routeChat.requestStream.finish() } // Add a task to print each message received on the response stream. diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCClient+AsyncAwaitSupport.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCClient+AsyncAwaitSupport.swift index 423174f17..8f675cf78 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCClient+AsyncAwaitSupport.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCClient+AsyncAwaitSupport.swift @@ -421,7 +421,7 @@ extension GRPCClient { for try await request in requests { try await call.requestStream.send(request) } - try await call.requestStream.finish() + call.requestStream.finish() } catch { // If we throw then cancel the call. We will rely on the response throwing an appropriate // error below. @@ -452,7 +452,7 @@ extension GRPCClient { for try await request in requests { try await call.requestStream.send(request) } - try await call.requestStream.finish() + call.requestStream.finish() } onCancel: { call.cancel() } diff --git a/Tests/GRPCTests/AsyncAwaitSupport/AsyncClientTests.swift b/Tests/GRPCTests/AsyncAwaitSupport/AsyncClientTests.swift index 5833e35e5..df009a6cf 100644 --- a/Tests/GRPCTests/AsyncAwaitSupport/AsyncClientTests.swift +++ b/Tests/GRPCTests/AsyncAwaitSupport/AsyncClientTests.swift @@ -222,7 +222,7 @@ final class AsyncClientCancellationTests: GRPCTestCase { let collect = echo.makeCollectCall() // Send and close. try await collect.requestStream.send(.with { $0.text = "foo" }) - try await collect.requestStream.finish() + collect.requestStream.finish() // Await the response and status. _ = try await collect.response @@ -294,7 +294,7 @@ final class AsyncClientCancellationTests: GRPCTestCase { let update = echo.makeUpdateCall() // Send and close. try await update.requestStream.send(.with { $0.text = "foo" }) - try await update.requestStream.finish() + update.requestStream.finish() // Await the response and status. let responseCount = try await update.responseStream.count() diff --git a/Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift b/Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift index 34c960116..4f41ebef0 100644 --- a/Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift +++ b/Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift @@ -82,7 +82,7 @@ final class AsyncIntegrationTests: GRPCTestCase { try await collect.requestStream.send(.with { $0.text = "boyle" }) try await collect.requestStream.send(.with { $0.text = "jeffers" }) try await collect.requestStream.send(.with { $0.text = "holt" }) - try await collect.requestStream.finish() + collect.requestStream.finish() let initialMetadata = try await collect.initialMetadata initialMetadata.assertFirst("200", forName: ":status") @@ -149,7 +149,7 @@ final class AsyncIntegrationTests: GRPCTestCase { XCTAssertEqual(response, "Swift echo update (\(i)): \(name)") } - try await update.requestStream.finish() + update.requestStream.finish() // This isn't right after we make the call as servers are not guaranteed to send metadata back // immediately. Concretely, we don't send initial metadata back until the first response @@ -186,7 +186,7 @@ final class AsyncIntegrationTests: GRPCTestCase { _ = try await update.responseStream.first(where: { _ in true }) XCTAssertNoThrow(try self.server.close().wait()) self.server = nil // So that tearDown() does not call close() again. - try await update.requestStream.finish() + update.requestStream.finish() } } diff --git a/Tests/GRPCTests/AsyncAwaitSupport/InterceptorsAsyncTests.swift b/Tests/GRPCTests/AsyncAwaitSupport/InterceptorsAsyncTests.swift index 9375ee15e..8ff554ebc 100644 --- a/Tests/GRPCTests/AsyncAwaitSupport/InterceptorsAsyncTests.swift +++ b/Tests/GRPCTests/AsyncAwaitSupport/InterceptorsAsyncTests.swift @@ -105,7 +105,7 @@ class InterceptorsAsyncTests: GRPCTestCase { let call = self.echo.makeCollectCall(callOptions: .init()) try await call.requestStream.send(.with { $0.text = "1 2" }) try await call.requestStream.send(.with { $0.text = "3 4" }) - try await call.requestStream.finish() + call.requestStream.finish() await assertThat( try await call.response, @@ -153,7 +153,7 @@ class InterceptorsAsyncTests: GRPCTestCase { let call = self.echo.makeUpdateCall(callOptions: .init()) try await call.requestStream.send(.with { $0.text = "1 2" }) try await call.requestStream.send(.with { $0.text = "3 4" }) - try await call.requestStream.finish() + call.requestStream.finish() var count = 0 for try await response in call.responseStream { diff --git a/Tests/GRPCTests/GRPCAsyncClientCallTests.swift b/Tests/GRPCTests/GRPCAsyncClientCallTests.swift index d64b2032a..067c41504 100644 --- a/Tests/GRPCTests/GRPCAsyncClientCallTests.swift +++ b/Tests/GRPCTests/GRPCAsyncClientCallTests.swift @@ -98,7 +98,7 @@ class GRPCAsyncClientCallTests: GRPCTestCase { for word in ["boyle", "jeffers", "holt"] { try await collect.requestStream.send(.with { $0.text = word }) } - try await collect.requestStream.finish() + collect.requestStream.finish() await assertThat(try await collect.initialMetadata, .is(.equalTo(Self.OKInitialMetadata))) await assertThat(try await collect.response, .doesNotThrow()) @@ -138,7 +138,7 @@ class GRPCAsyncClientCallTests: GRPCTestCase { try await update.requestStream.send(request) } try await update.requestStream.send(requests) - try await update.requestStream.finish() + update.requestStream.finish() let numResponses = try await update.responseStream.map { _ in 1 }.reduce(0, +) @@ -163,7 +163,7 @@ class GRPCAsyncClientCallTests: GRPCTestCase { await assertThat(try await responseStreamIterator.next(), .is(.notNil())) } - try await update.requestStream.finish() + update.requestStream.finish() await assertThat(try await responseStreamIterator.next(), .is(.nil())) @@ -191,7 +191,7 @@ class GRPCAsyncClientCallTests: GRPCTestCase { try await update.requestStream.send(.with { $0.text = word }) await counter.incrementRequests() } - try await update.requestStream.finish() + update.requestStream.finish() } // Get responses in a separate task. taskGroup.addTask { From 6e52439c9b2e27d4b7b1bd1fb3dad054956bf129 Mon Sep 17 00:00:00 2001 From: Michael Schwarz Date: Wed, 19 Oct 2022 17:21:52 +0200 Subject: [PATCH 3/3] Remove async variant of GRPCAsyncRequestStreamWriter's .finish() --- .../AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift index 58450f7bf..7bfe524c0 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift @@ -86,11 +86,6 @@ public struct GRPCAsyncRequestStreamWriter: Sendable { } /// Finish the request stream for the RPC. This must be called when there are no more requests to be sent. - public func finish() async throws { - self.asyncWriter.finish() - } - - /// Sync variant for finishing the request stream for the RPC. This must be called when there are no more requests to be sent. public func finish() { self.asyncWriter.finish() }