Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sendMessages and flush to StreamingRequestClientCall #538

Merged
merged 6 commits into from
Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions Sources/GRPC/ClientCalls/ClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ public protocol StreamingRequestClientCall: ClientCall {
/// - flush: Whether the buffer should be flushed after writing the message.
func sendMessage(_ message: RequestMessage, promise: EventLoopPromise<Void>?, flush: Bool)

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - flush: Whether the buffer should be flushed after writing the message.
func sendMessages<S: Sequence>(_ messages: S, flush: Bool) -> EventLoopFuture<Void> where S.Element == RequestMessage

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - promise: A promise to be fulfilled when all messages have been sent successfully.
/// - flush: Whether the buffer should be flushed after writing the message.
func sendMessages<S: Sequence>(_ messages: S, promise: EventLoopPromise<Void>?, flush: Bool) where S.Element == RequestMessage
MrMage marked this conversation as resolved.
Show resolved Hide resolved

/// Returns a future which can be used as a message queue.
///
/// Callers may use this as such:
Expand All @@ -100,6 +119,9 @@ public protocol StreamingRequestClientCall: ClientCall {
/// - Important: This should only ever be called once.
/// - Parameter promise: A promise to be fulfilled when the end has been sent.
func sendEnd(promise: EventLoopPromise<Void>?)

/// Flush the buffer of messages.
func flush()
}

/// A `ClientCall` with a unary response; i.e. unary and client-streaming.
Expand Down Expand Up @@ -131,6 +153,33 @@ extension StreamingRequestClientCall {
}
}

public func sendMessages<S: Sequence>(_ messages: S, flush: Bool = true) -> EventLoopFuture<Void> where S.Element == RequestMessage {
self.subchannel.flatMap { channel in
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
let writeFutures = messages.map { message in
channel.write(GRPCClientRequestPart.message(_Box(message)))
}
if flush {
channel.flush()
}
return EventLoopFuture.andAllSucceed(writeFutures, on: channel.eventLoop)
}
}

public func sendMessages<S: Sequence>(_ messages: S, promise: EventLoopPromise<Void>?, flush: Bool = true) where S.Element == RequestMessage {
if let promise = promise {
self.sendMessages(messages, flush: flush).cascade(to: promise)
} else {
self.subchannel.whenSuccess { channel in
MrMage marked this conversation as resolved.
Show resolved Hide resolved
for message in messages {
channel.write(GRPCClientRequestPart.message(_Box(message)), promise: nil)
}
if flush {
channel.flush()
}
}
}
}

public func sendEnd() -> EventLoopFuture<Void> {
return self.subchannel.flatMap { channel in
return channel.writeAndFlush(GRPCClientRequestPart<RequestMessage>.end)
Expand All @@ -142,4 +191,10 @@ extension StreamingRequestClientCall {
channel.writeAndFlush(GRPCClientRequestPart<RequestMessage>.end, promise: promise)
}
}

public func flush() {
self.subchannel.whenSuccess { channel in
channel.flush()
}
}
}
77 changes: 77 additions & 0 deletions Tests/GRPCTests/StreamingRequestClientCallTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2019, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
import GRPC
import XCTest

class StreamingRequestClientCallTests: EchoTestCaseBase {
class ResponseCounter {
var expectation: XCTestExpectation

init(expectation: XCTestExpectation) {
self.expectation = expectation
}

func increment() {
self.expectation.fulfill()
}
}

func testSendMessages() throws {
let firstBatchReceived = self.expectation(description: "first batch received")
let counter = ResponseCounter(expectation: firstBatchReceived)

let update = self.client.update { _ in
counter.increment()
}

// Send the first batch.
let firstBatch = ["foo", "bar", "baz"].map { Echo_EchoRequest(text: $0) }
firstBatchReceived.expectedFulfillmentCount = firstBatch.count
XCTAssertNoThrow(try update.sendMessages(firstBatch).wait())

// Wait for the first batch of resonses.
self.wait(for: [firstBatchReceived], timeout: 0.5)

// Send more messages, but don't flush.
let secondBatchNotReceived = self.expectation(description: "second batch not received")
secondBatchNotReceived.isInverted = true
counter.expectation = secondBatchNotReceived

let secondBatch = (0..<3).map { Echo_EchoRequest(text: "\($0)") }
update.sendMessages(secondBatch, promise: nil, flush: false)

// Wait and check that the expectation hasn't been fulfilled (because we haven't flushed).
self.wait(for: [secondBatchNotReceived], timeout: 0.5)
glbrntt marked this conversation as resolved.
Show resolved Hide resolved

let secondBatchReceived = self.expectation(description: "second batch received")
secondBatchReceived.expectedFulfillmentCount = secondBatch.count
counter.expectation = secondBatchReceived

// Flush the messages: we should get responses now.
update.flush()
self.wait(for: [secondBatchReceived], timeout: 0.5)

// End the call.
update.sendEnd(promise: nil)

let statusReceived = self.expectation(description: "status received")
update.status.map { $0.code }.assertEqual(.ok, fulfill: statusReceived)

self.wait(for: [statusReceived], timeout: 1.0)
}

glbrntt marked this conversation as resolved.
Show resolved Hide resolved
}
10 changes: 10 additions & 0 deletions Tests/GRPCTests/XCTestManifests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,15 @@ extension ServerWebTests {
]
}

extension StreamingRequestClientCallTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__StreamingRequestClientCallTests = [
("testSendMessages", testSendMessages),
]
}

public func __allTests() -> [XCTestCaseEntry] {
return [
testCase(AnyServiceClientTests.__allTests__AnyServiceClientTests),
Expand Down Expand Up @@ -464,6 +473,7 @@ public func __allTests() -> [XCTestCaseEntry] {
testCase(ServerErrorTransformingTests.__allTests__ServerErrorTransformingTests),
testCase(ServerThrowingTests.__allTests__ServerThrowingTests),
testCase(ServerWebTests.__allTests__ServerWebTests),
testCase(StreamingRequestClientCallTests.__allTests__StreamingRequestClientCallTests),
]
}
#endif