Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl(pubsublite): resumable async streaming read write rpc #8233

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
394affc
init commit for read
18suresha Jan 31, 2022
7ec1ca4
add question
18suresha Jan 31, 2022
24c0da1
switch template types to interfaces
18suresha Feb 1, 2022
0e75e6c
more questions
18suresha Feb 1, 2022
69552cd
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 2, 2022
2591a8d
retry logic
18suresha Feb 2, 2022
8924e60
add todo comment
18suresha Feb 2, 2022
9694efc
update helper
18suresha Feb 2, 2022
4cd3b14
async reinitializer
18suresha Feb 2, 2022
4738065
lock guard Cancel
18suresha Feb 2, 2022
d1c5ccd
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 3, 2022
c216052
update StreamReinitializer type
18suresha Feb 3, 2022
5101ade
dry retry loop & Dan feedback
18suresha Feb 4, 2022
e65c295
have Status return RetryLoop
18suresha Feb 4, 2022
c55eaa1
optional response future
18suresha Feb 4, 2022
f421d26
feedback, single retry loop
18suresha Feb 5, 2022
4ec721c
ok status
18suresha Feb 5, 2022
fc5c18a
refactor to pubsub lite
18suresha Feb 8, 2022
faebf1f
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 8, 2022
e9ed257
update includes
18suresha Feb 8, 2022
3574d8e
feedback besides lifetime
18suresha Feb 8, 2022
7b6f4e8
remove unnecessary this
18suresha Feb 8, 2022
46f91e0
remove nested then
18suresha Feb 8, 2022
0e97b82
future feedback
18suresha Feb 9, 2022
bbdbf9f
Finish comment
18suresha Feb 9, 2022
9032b40
set_value outside of lock
18suresha Feb 9, 2022
c50a0d3
add reverse promise
18suresha Feb 9, 2022
c322473
return simple future within scoped lock
18suresha Feb 9, 2022
8adbf67
feedback
18suresha Feb 10, 2022
781479c
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 10, 2022
ead8824
clean up a bit
18suresha Feb 10, 2022
100b738
remove test include;
18suresha Feb 10, 2022
667c247
handle all Finish cases
18suresha Feb 10, 2022
51dba14
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 10, 2022
b9a8ca2
bail on shutdown for Read and Write
18suresha Feb 10, 2022
80d21da
set shutdown on failed retry_policy
18suresha Feb 10, 2022
6bd32da
Finish returns future<void>
18suresha Feb 11, 2022
4afb760
remove change in test file
18suresha Feb 11, 2022
cb98c8f
remove unused helper
18suresha Feb 11, 2022
54d0cab
compiles with test suite
18suresha Feb 14, 2022
66484b1
basic unit test compiles and passes
18suresha Feb 15, 2022
6ba0d60
mock RetryPolicy
18suresha Feb 15, 2022
ed82e3c
create factory function
18suresha Feb 15, 2022
fe3ca75
mock async stream directly
18suresha Feb 15, 2022
bb6f5a7
add write to test
18suresha Feb 15, 2022
c0799fb
response equal overload
18suresha Feb 15, 2022
a5ccffb
single read failure test
18suresha Feb 15, 2022
4215779
add start failure test
18suresha Feb 15, 2022
63f0a55
finish in middle of write test
18suresha Feb 15, 2022
30865ab
strict mock
18suresha Feb 16, 2022
1460e21
read fail while write test
18suresha Feb 16, 2022
fd7c61c
fail while shutdown test case
18suresha Feb 16, 2022
73d2c6c
finish during sleep
18suresha Feb 16, 2022
1d1f7f8
start fails during retry test
18suresha Feb 16, 2022
a563f35
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 16, 2022
de1efb4
lint
18suresha Feb 16, 2022
e862fa5
read during retry test
18suresha Feb 16, 2022
ebb2344
write finishes after shutdown test
18suresha Feb 16, 2022
bcc30a3
lint
18suresha Feb 16, 2022
fc88607
remove unnecessary var
18suresha Feb 16, 2022
2c5ff95
optimize locking
18suresha Feb 16, 2022
767c787
optimize locking II
18suresha Feb 16, 2022
e8e60d9
rename locking functions
18suresha Feb 16, 2022
184d78b
update test
18suresha Feb 16, 2022
3b9c792
formatting
18suresha Feb 16, 2022
02cae04
dry little with macro
18suresha Feb 16, 2022
dd5e175
DRY a bit more
18suresha Feb 16, 2022
3de4c78
bazel ide .gitignore
18suresha Feb 16, 2022
7c3f38f
put mocks in files
18suresha Feb 17, 2022
a1ec757
starting to use test harness
18suresha Feb 17, 2022
bc9340c
remove duplicate test
18suresha Feb 17, 2022
cbb6d16
test harness works on retry test
18suresha Feb 17, 2022
a82a463
test harness almost halfway
18suresha Feb 17, 2022
c13b427
test harness
18suresha Feb 17, 2022
8133ec4
remove unused header
18suresha Feb 17, 2022
f494a4f
use return in willonce
18suresha Feb 17, 2022
05a5d9f
use Return more
18suresha Feb 17, 2022
64bbcd7
use Return
18suresha Feb 17, 2022
93c8ef5
remove nested expect
18suresha Feb 17, 2022
114c4f3
fix comment
18suresha Feb 17, 2022
5747172
fix comment
18suresha Feb 17, 2022
d2af0cd
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 18, 2022
8406c27
check order of finishing
18suresha Feb 18, 2022
07471e8
stdfunction
18suresha Feb 18, 2022
657c3c3
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 18, 2022
212af94
behavior style
18suresha Feb 19, 2022
32ef328
optimize locking
18suresha Feb 19, 2022
6d11924
format
18suresha Feb 19, 2022
148cd25
feedback for class
18suresha Feb 22, 2022
2a28999
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 22, 2022
9e747be
some test feedback
18suresha Feb 22, 2022
da5ac34
write in middle of retry
18suresha Feb 22, 2022
a1e41d6
start permanent failure test
18suresha Feb 22, 2022
2bb56e5
start permanent failure
18suresha Feb 22, 2022
c12c7a0
initializer permanent failure
18suresha Feb 22, 2022
14a7e9d
initializer fail then good
18suresha Feb 22, 2022
2f31da7
feedback
18suresha Feb 22, 2022
a416bc2
Merge remote-tracking branch 'upstream/main' into feature/resumable_a…
18suresha Feb 22, 2022
8f342ee
destructor
18suresha Feb 23, 2022
595c4b9
format
18suresha Feb 23, 2022
e465ce0
explicit initialization for test sequencing
18suresha Feb 23, 2022
6cb63c7
format
18suresha Feb 23, 2022
a91fe0c
override destructor
18suresha Feb 23, 2022
3311414
feedback
18suresha Feb 23, 2022
9decde7
format
18suresha Feb 23, 2022
af1f268
format
18suresha Feb 23, 2022
3692420
refactored from other PR
18suresha Feb 24, 2022
39eb085
format
18suresha Feb 24, 2022
5309692
address left comment
18suresha Feb 24, 2022
9802f80
rebase from other PR
18suresha Feb 28, 2022
98123e8
use empty promise
18suresha Feb 28, 2022
9c3610e
return immediate value in callback
18suresha Feb 28, 2022
842db15
return immediate future
18suresha Feb 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,54 @@ class ResumableAsyncStreamingReadWriteRpcImpl
}

future<absl::optional<ResponseType>> Read() override {
// TODO(18suresha) implement
return make_ready_future(absl::optional<ResponseType>());
future<absl::optional<ResponseType>> read_future;

{
std::lock_guard<std::mutex> g{mu_};
switch (stream_state_) {
case State::kShutdown:
return make_ready_future(absl::optional<ResponseType>());
case State::kRetrying:
assert(!read_reinit_done_.has_value());
read_reinit_done_.emplace();
return read_reinit_done_->get_future().then(
[](future<void>) { return absl::optional<ResponseType>(); });
case State::kInitialized:
read_future = stream_->Read();
assert(!in_progress_read_.has_value());
in_progress_read_.emplace();
}
}

return read_future.then(
[this](future<absl::optional<ResponseType>> optional_response_future) {
return OnReadFutureFinish(optional_response_future.get());
});
}

future<bool> Write(RequestType const&, grpc::WriteOptions) override {
// TODO(18suresha) implement
return make_ready_future(false);
future<bool> Write(RequestType const& r, grpc::WriteOptions o) override {
future<bool> write_future;

{
std::lock_guard<std::mutex> g{mu_};
switch (stream_state_) {
case State::kShutdown:
return make_ready_future(false);
case State::kRetrying:
assert(!write_reinit_done_.has_value());
write_reinit_done_.emplace();
return write_reinit_done_->get_future().then(
[](future<void>) { return false; });
case State::kInitialized:
write_future = stream_->Write(r, o);
assert(!in_progress_write_.has_value());
in_progress_write_.emplace();
}
}

return write_future.then([this](future<bool> write_fu) {
return OnWriteFutureFinish(write_fu.get());
});
}

future<void> Shutdown() override {
Expand All @@ -252,6 +293,121 @@ class ResumableAsyncStreamingReadWriteRpcImpl

enum class State { kRetrying, kInitialized, kShutdown };

future<absl::optional<ResponseType>> OnReadFutureFinish(
absl::optional<ResponseType> optional_response) {
promise<void> in_progress_read(null_promise_t{});
future<void> read_reinit_done;
bool shutdown;

{
std::lock_guard<std::mutex> g{mu_};
assert(in_progress_read_.has_value());
in_progress_read = std::move(*in_progress_read_);
in_progress_read_.reset();
shutdown = stream_state_ == State::kShutdown;
if (!optional_response.has_value() && !shutdown) {
assert(!read_reinit_done_.has_value());
read_reinit_done_.emplace();
read_reinit_done = read_reinit_done_->get_future();
}
}

in_progress_read.set_value();

if (shutdown) {
return make_ready_future(absl::optional<ResponseType>());
}

if (optional_response.has_value()) {
return make_ready_future(std::move(optional_response));
}

ReadWriteRetryFailedStream();

return read_reinit_done.then(
[](future<void>) { return absl::optional<ResponseType>(); });
}

future<bool> OnWriteFutureFinish(bool write_response) {
promise<void> in_progress_write(null_promise_t{});
future<void> write_reinit_done;
bool shutdown;

{
std::lock_guard<std::mutex> g{mu_};
assert(in_progress_write_.has_value());
in_progress_write = std::move(*in_progress_write_);
in_progress_write_.reset();
shutdown = stream_state_ == State::kShutdown;
if (!write_response && !shutdown) {
assert(!write_reinit_done_.has_value());
write_reinit_done_.emplace();
write_reinit_done = write_reinit_done_->get_future();
}
}

in_progress_write.set_value();

if (shutdown) return make_ready_future(false);

if (write_response) return make_ready_future(true);

ReadWriteRetryFailedStream();

return write_reinit_done.then([](future<void>) { return false; });
}

void ReadWriteRetryFailedStream() {
promise<void> root;
{
std::lock_guard<std::mutex> g{mu_};
if (stream_state_ != State::kInitialized) return;

stream_state_ = State::kRetrying;
assert(!retry_promise_.has_value());
retry_promise_.emplace();

// Assuming that a `Read` fails:
// If an outstanding operation is present, we can't enter the retry
// loop, so we defer it until the outstanding `Write` finishes at
// which point we can enter the retry loop. Since we will return
// `reinit_done_`, we guarantee that another operation of the same type
// is not called while we're waiting for the outstanding operation to
// finish and the retry loop to finish afterward.

future<void> root_future = root.get_future();

// at most one of these will be set
if (in_progress_read_.has_value()) {
root_future =
root_future.then(ChainFuture(in_progress_read_->get_future()));
}
if (in_progress_write_.has_value()) {
root_future =
root_future.then(ChainFuture(in_progress_write_->get_future()));
}

root_future.then([this](future<void>) { FinishOnStreamFail(); });
}

root.set_value();
}

void FinishOnStreamFail() {
future<Status> fail_finish;
{
std::lock_guard<std::mutex> g{mu_};
fail_finish = stream_->Finish();
}
fail_finish.then([this](future<Status> finish_status) {
// retry policy refactor
auto retry_policy = retry_factory_();
auto backoff_policy = backoff_policy_prototype_->clone();
AttemptRetry(finish_status.get(), std::move(retry_policy),
std::move(backoff_policy));
});
}

future<void> ConfigureShutdownOrder(future<void> root_future) {
std::unique_lock<std::mutex> lk{mu_};
switch (stream_state_) {
Expand Down
Loading