Skip to content

Commit

Permalink
fix(common): revamp the async polling loop (#7762)
Browse files Browse the repository at this point in the history
* Do not drop a cancellation request made before we receive
  the `longrunning::Operation`.

  Instead, remember that a cancellation request was made,
  and then cancel the operation once we know it has started.

  This (probabilistically) saves a significant chunk of time
  in the `spanner_cancel_backup_create` sample, as we now
  always cancel the `CreateBackup()`.

* Do not fabricate a cancelled `Status`.

  The only way we really know an operation has been cancelled
  is by examining the `GetOperation()` response.
  • Loading branch information
devbww authored Dec 19, 2021
1 parent 5f3fcf9 commit b2fe267
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ TEST(GoldenThingAdminConnectionTest, CreateDatabaseCancel) {
get.PopFront().set_value(op);
auto g = get.PopFront();
fut.cancel();
g.set_value(op);
g.set_value(Status{StatusCode::kCancelled, "cancelled"});
auto db = fut.get();
EXPECT_THAT(db, StatusIs(StatusCode::kCancelled));
}
Expand Down Expand Up @@ -355,7 +355,7 @@ TEST(GoldenThingAdminConnectionTest, UpdateDatabaseDdlCancel) {
get.PopFront().set_value(op);
auto g = get.PopFront();
fut.cancel();
g.set_value(op);
g.set_value(Status{StatusCode::kCancelled, "cancelled"});
auto db = fut.get();
EXPECT_THAT(db, StatusIs(StatusCode::kCancelled));
}
Expand Down Expand Up @@ -696,7 +696,7 @@ TEST(GoldenThingAdminConnectionTest, CreateBackupCancel) {
get.PopFront().set_value(op);
auto g = get.PopFront();
fut.cancel();
g.set_value(op);
g.set_value(Status{StatusCode::kCancelled, "cancelled"});
auto db = fut.get();
EXPECT_THAT(db, StatusIs(StatusCode::kCancelled));
}
Expand Down Expand Up @@ -1006,7 +1006,7 @@ TEST(GoldenThingAdminConnectionTest, RestoreBackupCancel) {
get.PopFront().set_value(op);
auto g = get.PopFront();
fut.cancel();
g.set_value(op);
g.set_value(Status{StatusCode::kCancelled, "cancelled"});
auto db = fut.get();
EXPECT_THAT(db, StatusIs(StatusCode::kCancelled));
}
Expand Down
18 changes: 10 additions & 8 deletions google/cloud/internal/async_long_running_operation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ using ::google::cloud::testing_util::IsProtoEqual;
using ::google::cloud::testing_util::MockCompletionQueueImpl;
using ::google::cloud::testing_util::StatusIs;
using ::google::longrunning::Operation;
using ::testing::AtLeast;
using ::testing::Return;

class MockStub {
Expand Down Expand Up @@ -197,8 +196,6 @@ TEST(AsyncLongRunningTest, RequestPollThenSuccessResponse) {
}

TEST(AsyncLongRunningTest, RequestPollThenCancel) {
Instance expected;
expected.set_name("test-instance-name");
google::longrunning::Operation starting_op;
starting_op.set_name("test-op-name");

Expand All @@ -219,11 +216,14 @@ TEST(AsyncLongRunningTest, RequestPollThenCancel) {
return make_ready_future(make_status_or(starting_op));
});
EXPECT_CALL(*mock, AsyncGetOperation)
.Times(AtLeast(1))
.WillRepeatedly([&](CompletionQueue&,
std::unique_ptr<grpc::ClientContext>,
google::longrunning::GetOperationRequest const&) {
.WillOnce([&](CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
google::longrunning::GetOperationRequest const&) {
return make_ready_future(make_status_or(starting_op));
})
.WillOnce([&](CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
google::longrunning::GetOperationRequest const&) {
return make_ready_future(StatusOr<google::longrunning::Operation>(
Status{StatusCode::kCancelled, "cancelled"}));
});
EXPECT_CALL(*mock, AsyncCancelOperation)
.WillOnce([&](CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
Expand All @@ -232,7 +232,9 @@ TEST(AsyncLongRunningTest, RequestPollThenCancel) {
});
auto policy = absl::make_unique<MockPollingPolicy>();
EXPECT_CALL(*policy, clone()).Times(0);
EXPECT_CALL(*policy, OnFailure).WillRepeatedly(Return(true));
EXPECT_CALL(*policy, OnFailure).WillRepeatedly([](Status const& status) {
return status.code() != StatusCode::kCancelled;
});
EXPECT_CALL(*policy, WaitPeriod)
.WillRepeatedly(Return(std::chrono::milliseconds(1)));
CreateInstanceRequest request;
Expand Down
38 changes: 17 additions & 21 deletions google/cloud/internal/async_polling_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "google/cloud/internal/async_polling_loop.h"
#include "google/cloud/log.h"
#include <algorithm>
#include <mutex>
#include <string>

Expand All @@ -38,7 +39,7 @@ class AsyncPollingLoopImpl
polling_policy_(std::move(polling_policy)),
location_(std::move(location)),
promise_(null_promise_t{}),
canceled_(false) {}
delayed_cancel_(false) {}

future<StatusOr<Operation>> Start(future<StatusOr<Operation>> op) {
auto self = shared_from_this();
Expand All @@ -61,7 +62,10 @@ class AsyncPollingLoopImpl
google::longrunning::CancelOperationRequest request;
{
std::unique_lock<std::mutex> lk(mu_);
if (op_name_.empty()) return;
if (op_name_.empty()) {
delayed_cancel_ = true; // Wait for OnStart() to set `op_name_`.
return;
}
request.set_name(op_name_);
}
// Cancels are best effort, so we use weak pointers.
Expand All @@ -73,27 +77,21 @@ class AsyncPollingLoopImpl
}

void OnCancel(Status const& status) {
if (!status.ok()) return;
std::unique_lock<std::mutex> lk(mu_);
canceled_ = true;
GCP_LOG(DEBUG) << location_ << "() cancelled: " << status;
}

void OnStart(StatusOr<Operation> op) {
GCP_LOG(DEBUG) << location_ << "() polling loop starting";
if (!op || op->done()) return promise_.set_value(std::move(op));
GCP_LOG(DEBUG) << location_ << "() polling loop starting for "
<< op->name();
bool do_cancel = false;
{
std::unique_lock<std::mutex> lk(mu_);
if (canceled_) return Cancelled();
std::swap(delayed_cancel_, do_cancel);
op_name_ = std::move(*op->mutable_name());
}
Wait();
}

void Cancelled() {
GCP_LOG(DEBUG) << location_ << "() polling loop cancelled";
promise_.set_value(
Status{StatusCode::kCancelled,
location_ + "() - polling loop terminated via cancel()"});
if (do_cancel) DoCancel();
return Wait();
}

void Wait() {
Expand All @@ -112,7 +110,6 @@ class AsyncPollingLoopImpl
google::longrunning::GetOperationRequest request;
{
std::unique_lock<std::mutex> lk(mu_);
if (canceled_) return Cancelled();
request.set_name(op_name_);
}
auto self = shared_from_this();
Expand Down Expand Up @@ -140,10 +137,9 @@ class AsyncPollingLoopImpl
}
if (op) {
std::unique_lock<std::mutex> lk(mu_);
if (canceled_) return Cancelled();
op_name_ = std::move(*op->mutable_name());
}
Wait();
return Wait();
}

// These member variables are initialized in the constructor or from
Expand All @@ -156,10 +152,10 @@ class AsyncPollingLoopImpl
std::string location_;
promise<StatusOr<Operation>> promise_;

// `canceled_` and `op_name_`, in contrast, are also used from `DoCancel()`,
// which is called asynchronously, so they require synchronization.
// `delayed_cancel_` and `op_name_`, in contrast, are also used from
// `DoCancel()`, which is called asynchronously, so they need locking.
std::mutex mu_;
bool canceled_; // GUARDED_BY(mu_)
bool delayed_cancel_; // GUARDED_BY(mu_)
std::string op_name_; // GUARDED_BY(mu_)
};

Expand Down
81 changes: 64 additions & 17 deletions google/cloud/internal/async_polling_loop_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,52 @@ TEST(AsyncPollingLoopTest, ImmediateSuccess) {
EXPECT_THAT(*actual, IsProtoEqual(op));
}

TEST(AsyncPollingLoopTest, ImmediateCancel) {
google::longrunning::Operation starting_op;
starting_op.set_name("test-op-name");

auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer)
.WillOnce([](std::chrono::nanoseconds) {
return make_ready_future(
make_status_or(std::chrono::system_clock::now()));
});
CompletionQueue cq(mock_cq);

auto mock = std::make_shared<MockStub>();
EXPECT_CALL(*mock, AsyncGetOperation)
.WillOnce([](CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
google::longrunning::GetOperationRequest const&) {
return make_ready_future(StatusOr<Operation>(Status{
StatusCode::kCancelled, "test-function: operation cancelled"}));
});
EXPECT_CALL(*mock, AsyncCancelOperation)
.WillOnce([](CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
google::longrunning::CancelOperationRequest const& request) {
EXPECT_EQ(request.name(), "test-op-name");
return make_ready_future(Status{});
});
auto policy = absl::make_unique<MockPollingPolicy>();
EXPECT_CALL(*policy, clone()).Times(0);
EXPECT_CALL(*policy, OnFailure).WillOnce([](Status const& status) {
EXPECT_THAT(status, StatusIs(StatusCode::kCancelled));
return false;
});
EXPECT_CALL(*policy, WaitPeriod)
.WillOnce(Return(std::chrono::milliseconds(1)));

promise<StatusOr<google::longrunning::Operation>> p;
auto pending =
AsyncPollingLoop(cq, p.get_future(), MakePoll(mock), MakeCancel(mock),
std::move(policy), "test-function");
pending.cancel();
p.set_value(starting_op);
auto actual = pending.get();
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled,
AllOf(HasSubstr("test-function"),
HasSubstr("operation cancelled"))));
}

TEST(AsyncPollingLoopTest, PollThenSuccess) {
Instance instance;
instance.set_name("test-instance-name");
Expand Down Expand Up @@ -352,13 +398,8 @@ TEST(AsyncPollingLoopTest, PollLifetime) {
}

TEST(AsyncPollingLoopTest, PollThenCancelDuringTimer) {
Instance instance;
instance.set_name("test-instance-name");
google::longrunning::Operation starting_op;
starting_op.set_name("test-op-name");
google::longrunning::Operation expected = starting_op;
expected.set_done(true);
expected.mutable_metadata()->PackFrom(instance);

using TimerType = StatusOr<std::chrono::system_clock::time_point>;
AsyncSequencer<TimerType> timer_sequencer;
Expand All @@ -385,7 +426,11 @@ TEST(AsyncPollingLoopTest, PollThenCancelDuringTimer) {
});
auto policy = absl::make_unique<MockPollingPolicy>();
EXPECT_CALL(*policy, clone()).Times(0);
EXPECT_CALL(*policy, OnFailure).WillRepeatedly(Return(true));
EXPECT_CALL(*policy, OnFailure)
.Times(2)
.WillRepeatedly([](Status const& status) {
return status.code() != StatusCode::kCancelled;
});
EXPECT_CALL(*policy, WaitPeriod)
.WillRepeatedly(Return(std::chrono::milliseconds(1)));

Expand All @@ -398,21 +443,18 @@ TEST(AsyncPollingLoopTest, PollThenCancelDuringTimer) {
auto t = timer_sequencer.PopFront();
pending.cancel();
t.set_value(std::chrono::system_clock::now());
get_sequencer.PopFront().set_value(
Status{StatusCode::kCancelled, "test-function: operation cancelled"});

auto actual = pending.get();
EXPECT_THAT(actual, StatusIs(Not(Eq(StatusCode::kOk)),
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled,
AllOf(HasSubstr("test-function"),
HasSubstr("terminated via cancel"))));
HasSubstr("operation cancelled"))));
}

TEST(AsyncPollingLoopTest, PollThenCancelDuringPoll) {
Instance instance;
instance.set_name("test-instance-name");
google::longrunning::Operation starting_op;
starting_op.set_name("test-op-name");
google::longrunning::Operation expected = starting_op;
expected.set_done(true);
expected.mutable_metadata()->PackFrom(instance);

using TimerType = StatusOr<std::chrono::system_clock::time_point>;
AsyncSequencer<TimerType> timer_sequencer;
Expand All @@ -439,7 +481,11 @@ TEST(AsyncPollingLoopTest, PollThenCancelDuringPoll) {
});
auto policy = absl::make_unique<MockPollingPolicy>();
EXPECT_CALL(*policy, clone()).Times(0);
EXPECT_CALL(*policy, OnFailure).WillRepeatedly(Return(true));
EXPECT_CALL(*policy, OnFailure)
.Times(2)
.WillRepeatedly([](Status const& status) {
return status.code() != StatusCode::kCancelled;
});
EXPECT_CALL(*policy, WaitPeriod)
.WillRepeatedly(Return(std::chrono::milliseconds(1)));

Expand All @@ -452,12 +498,13 @@ TEST(AsyncPollingLoopTest, PollThenCancelDuringPoll) {
timer_sequencer.PopFront().set_value(std::chrono::system_clock::now());
auto g = get_sequencer.PopFront();
pending.cancel();
g.set_value(starting_op);
g.set_value(
Status{StatusCode::kCancelled, "test-function: operation cancelled"});

auto actual = pending.get();
EXPECT_THAT(actual, StatusIs(Not(Eq(StatusCode::kOk)),
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled,
AllOf(HasSubstr("test-function"),
HasSubstr("terminated via cancel"))));
HasSubstr("operation cancelled"))));
}

} // namespace
Expand Down

0 comments on commit b2fe267

Please sign in to comment.