fix: extremely rare race conditions in retry loop #7789

Merged
Changes from 3 commits
253 changes: 181 additions & 72 deletions google/cloud/internal/async_retry_loop.h
@@ -59,6 +59,110 @@ struct FutureValueType<future<T>> {
* implementation of these stubs is very easy too.
*
* This class implements the retry loop for such an RPC.
*
* @par Cancellation and Thread Safety
*
* This class supports cancelling a retry loop (best-effort, as are most
* cancels with gRPC). Without cancels this class would require no
* synchronization, as
* each request, backoff timer, and their callbacks can only occur in sequence.
* Cancel requests, however, may be invoked by any thread and require some form
* of synchronization. The basic idea is that the class maintains a future
* (called `pending_operation_`) that represents the current pending operation,
* i.e., the current gRPC request or backoff timer. Cancelling the loop may
* require cancelling the pending operation. Using such a `pending_operation_`
* future is prone to subtle race conditions. This section explains how we
* ensure safety.
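*
* For reference, the synchronized state is small. These are the members, as
* declared at the bottom of this class:
*
* @code
* std::mutex mu_;
* bool cancelled_ = false;             // set (once) by Cancel()
* bool done_ = false;                  // the result has been delivered
* std::uint_fast32_t operation_ = 0;   // counts requests and backoff timers
* future<void> pending_operation_;     // the current RPC or backoff timer
* @endcode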
*
* @par The Race Condition
*
* The most common approach to solving race conditions is to use some kind of
* lock. Unfortunately, that does not work in this case. Consider this code:
*
* @code
* 1: void AsyncRetryLoopImpl::F() {
* 2: auto self = shared_from_this();
* 3: future<...> f = create_future();
* 4: std::unique_lock lk(mu_);
* 5: if (cancelled_) return HandleCancel();
* 6: auto pending = f.then([self](auto g) { self->Callback(); });
* 7: this->pending_operation_ = pending;
* 8: }
* 9:
* 10: void AsyncRetryLoopImpl::Cancel() {
* ....
* 11: this->pending_operation_.cancel();
* ....
* }
* 12:
* 13: void AsyncRetryLoopImpl::Callback() {
* ....
* 14: F();
* ...
* }
* @endcode
*
* Because futures are immediately active, setting up the callback on line (6)
* can result in an immediate call to `Callback()`. Since the mutex acquired
* on line (4) is still held, this can deadlock: the callback may invoke `F()`
* on line (14), which would try to acquire the mutex again.
*
* One could solve this deadlock using a recursive mutex, but that still leaves
* a second problem: as the stack unwinds, the `pending_operation_` member
* variable is set, on line (7), to the **first** operation, and we want it to
* remain set to the **last** operation, as the trace below illustrates.
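*
* To make the second problem concrete, here is one possible interleaving,
* using the line numbers from the snippet above (a hypothetical trace, not
* output from the library):
*
* @code
* // F() #1 creates f1 (line 3) and attaches its callback (line 6).
* // f1 is already satisfied, so the callback runs inline:
* //   Callback() -> F() #2 -> pending_operation_ = <operation #2>  // line 7
* // F() #1 now resumes and executes its own line (7):
* //   pending_operation_ = <operation #1>  // stale: #1 already completed
* // A later Cancel() would cancel the completed #1 and miss #2 entirely.
* @endcode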
*
* @par Solution
*
* We keep a counter reflecting the number of operations performed by the retry
* loop. This counter is incremented before starting a request and before
* starting a backoff timer.
*
* The `pending_operation_` variable is updated **only** if the current
* operation matches the operation counter. This means the `pending_operation_`
* variable always reflects the last pending operation; it can never be set to
* an older operation.
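*
* A minimal sketch of the counter guard (simplified from `StartOperation()`
* and `SetPending()` in the implementation below; cancellation and error
* handling are omitted here):
*
* @code
* std::uint_fast32_t StartOperation() {  // before each request or timer
*   std::lock_guard<std::mutex> lk(mu_);
*   return ++operation_;
* }
*
* void SetPending(std::uint_fast32_t id, future<void> op) {
*   std::lock_guard<std::mutex> lk(mu_);
*   // Only the most recently started operation may install itself; an
*   // older operation unwinding the stack is ignored.
*   if (operation_ == id) pending_operation_ = std::move(op);
* }
* @endcode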
*
* @par Observations
*
* - The initial value of `cancelled_` is false.
* - `Cancel()` is the only operation that changes `cancelled_`, and it holds
* the `mu_` mutex while doing so.
* - Once `cancelled_` is set to `true` it is never set back to `false`.
*
* While `cancelled_` is false the loop is (basically) single-threaded:
* - Each gRPC request or backoff timer is sequenced-after a call to
* `StartOperation()`, see `StartAttempt()` and `StartBackoff()`.
* - Each gRPC request or backoff timer must complete before the next one
* starts, as it is their callbacks (`OnAttempt()` and `OnBackoff()`) that
* start the next step.
* - `StartOperation()` is always sequenced-before calls to `SetPending()`.
* - `SetPending()` never sets `pending_operation_` to the `future<void>`
* representing an operation that has already completed.
*
* As to when the `cancelled_` flag changes to `true`:
* - `StartOperation()` and `SetPending()` both lock the same mutex `mu_` as
* `Cancel()`.
* - It follows that if `Cancel()` is invoked, then the `true` value will be
* visible to any future calls to `StartOperation()` or `SetPending()`.
* - If the next call is `StartOperation()` then no new operation is issued,
* as both `StartAttempt()` and `StartBackoff()` return immediately in this
* case.
* - Note that if `cancelled_` is true, `StartOperation()` terminates the
* retry loop by calling `SetDoneWithCancel()`.
* - If the next call is `SetPending()`, the pending operation is immediately
* cancelled, as sketched after this list.
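*
* A condensed sketch of that hand-off (the full `Cancel()` overload appears
* in the implementation below, and additionally satisfies `result_`):
*
* @code
* void Cancel(std::unique_lock<std::mutex> lk) {
*   cancelled_ = true;                               // visible to later calls
*   future<void> f = std::move(pending_operation_);  // steal the pending op
*   lk.unlock();  // do not hold the lock while cancelling
*   f.cancel();   // best-effort cancel of the RPC or timer
* }
* @endcode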
*
* @par Safety and Progress
*
* While the `cancelled_` flag is `false` the loop remains "safe" because it is
* fundamentally a sequence of calls without concurrency or parallelism.
* Multiple threads may be involved (i.e., each callback can happen in a
* different thread) but the completion queue provides enough synchronization.
*
* Once the `cancelled_` flag is set to `true` the new value will become
* visible to the threads running the `StartAttempt()` and/or `StartBackoff()`
* functions. If the value is visible, the retry loop will stop on the next
* callback and/or before the next request or timer is issued.
*/
template <typename Functor, typename Request, typename RetryPolicyType>
class AsyncRetryLoopImpl
@@ -93,120 +197,120 @@ class AsyncRetryLoopImpl
}

private:
enum State {
kIdle,
kWaiting,
kDone,
using TimerArgType = StatusOr<std::chrono::system_clock::time_point>;

struct State {
bool cancelled;
std::uint_fast32_t operation;
};

State StartOperation() {
std::unique_lock<std::mutex> lk(mu_);
if (!cancelled_) return State{false, ++operation_};
return SetDoneWithCancel(std::move(lk));
}

State OnOperation() {
std::unique_lock<std::mutex> lk(mu_);
if (!cancelled_) return State{false, operation_};
return SetDoneWithCancel(std::move(lk));
}

void StartAttempt() {
auto self = this->shared_from_this();
if (retry_policy_->IsExhausted()) {
SetDone(
return SetDone(
RetryLoopError("Retry policy exhausted in", location_, last_status_));
return;
}
auto self = this->shared_from_this();
auto state = StartOperation();
if (state.cancelled) return;
auto context = absl::make_unique<grpc::ClientContext>();
SetupContext<RetryPolicyType>::Setup(*retry_policy_, *context);
auto op =
SetPending(
state.operation,
functor_(cq_, std::move(context), request_).then([self](future<T> f) {
self->OnAttempt(f.get());
});
SetWaiting(std::move(op));
}));
}

void StartBackoff() {
auto self = this->shared_from_this();
auto state = StartOperation();
if (state.cancelled) return;
SetPending(state.operation,
cq_.MakeRelativeTimer(backoff_policy_->OnCompletion())
.then([self](future<TimerArgType> f) {
self->OnBackoff(f.get());
}));
}

void OnAttempt(T result) {
SetIdle();
// A successful attempt, set the value and finish the loop.
if (result.ok()) {
SetDone(std::move(result));
return;
}
if (result.ok()) return SetDone(std::move(result));
// Some kind of failure, first verify that it is retryable.
last_status_ = GetResultStatus(std::move(result));
if (idempotency_ == Idempotency::kNonIdempotent) {
SetDone(RetryLoopError("Error in non-idempotent operation", location_,
last_status_));
return;
return SetDone(RetryLoopError("Error in non-idempotent operation",
location_, last_status_));
}
if (!retry_policy_->OnFailure(last_status_)) {
if (retry_policy_->IsPermanentFailure(last_status_)) {
SetDone(RetryLoopError("Permanent error in", location_, last_status_));
} else {
SetDone(RetryLoopError("Retry policy exhausted in", location_,
last_status_));
return SetDone(
RetryLoopError("Permanent error in", location_, last_status_));
}
return;
return SetDone(
RetryLoopError("Retry policy exhausted in", location_, last_status_));
}
if (Cancelled()) return;
auto self = this->shared_from_this();
auto op =
cq_.MakeRelativeTimer(backoff_policy_->OnCompletion())
.then(
[self](
future<StatusOr<std::chrono::system_clock::time_point>> f) {
self->OnBackoffTimer(f.get());
});
SetWaiting(std::move(op));
StartBackoff();
}

void OnBackoffTimer(StatusOr<std::chrono::system_clock::time_point> tp) {
SetIdle();
if (Cancelled()) return;
void OnBackoff(TimerArgType tp) {
auto state = OnOperation();
// Check for the retry loop cancellation first. We want to report that
// status instead of the timer failure in that case.
if (state.cancelled) return;
if (!tp) {
// Some kind of error in the CompletionQueue, probably shutting down.
SetDone(RetryLoopError("Timer failure in", location_,
std::move(tp).status()));
return;
return SetDone(RetryLoopError("Timer failure in", location_,
std::move(tp).status()));
}
StartAttempt();
}

void SetIdle() {
std::unique_lock<std::mutex> lk(mu_);
switch (state_) {
case kIdle:
case kDone:
break;
case kWaiting:
state_ = kIdle;
break;
}
}

void SetWaiting(future<void> op) {
void SetPending(std::uint_fast32_t iteration, future<void> op) {
std::unique_lock<std::mutex> lk(mu_);
if (state_ != kIdle) return;
state_ = kWaiting;
pending_operation_ = std::move(op);
if (operation_ == iteration) pending_operation_ = std::move(op);
if (cancelled_) return Cancel(std::move(lk));
}

// Handle the case where the retry loop finishes due to a successful request
// or the retry policies getting exhausted.
void SetDone(T value) {
std::unique_lock<std::mutex> lk(mu_);
if (state_ == kDone) return;
state_ = kDone;
if (done_) return;
done_ = true;
lk.unlock();
result_.set_value(std::move(value));
}

void Cancel() {
std::unique_lock<std::mutex> lk(mu_);
cancelled_ = true;
if (state_ != kWaiting) return;
future<void> f = std::move(pending_operation_);
state_ = kIdle;
// Handle the case where the retry loop finishes due to a successful cancel
// request.
State SetDoneWithCancel(std::unique_lock<std::mutex> lk) {
if (done_) return State{true, 0};
done_ = true;
lk.unlock();
f.cancel();
result_.set_value(
RetryLoopError("Retry loop cancelled", location_, last_status_));
return State{true, 0};
}

bool Cancelled() {
std::unique_lock<std::mutex> lk(mu_);
if (!cancelled_) return false;
state_ = kDone;
void Cancel() { return Cancel(std::unique_lock<std::mutex>(mu_)); }

void Cancel(std::unique_lock<std::mutex> lk) {
cancelled_ = true;
future<void> f = std::move(pending_operation_);
lk.unlock();
result_.set_value(
RetryLoopError("Retry loop cancelled", location_, last_status_));
return true;
f.cancel();
}

std::unique_ptr<RetryPolicyType> retry_policy_;
@@ -218,9 +322,14 @@ class AsyncRetryLoopImpl
char const* location_ = "unknown";
Status last_status_ = Status(StatusCode::kUnknown, "Retry policy exhausted");
promise<T> result_;

// Only the following variables require synchronization, as they coordinate
// the work between the retry loop (which would otherwise be lock-free) and
// the cancel requests (which need locks).
std::mutex mu_;
State state_ = kIdle;
bool cancelled_ = false;
bool done_ = false;
std::uint_fast32_t operation_ = 0;
future<void> pending_operation_;
};

57 changes: 42 additions & 15 deletions google/cloud/internal/async_retry_loop_test.cc
@@ -188,6 +188,23 @@ class MockBackoffPolicy : public BackoffPolicy {
MOCK_METHOD(std::chrono::milliseconds, OnCompletion, (), (override));
};

class RetryPolicyWithSetup {
public:
virtual ~RetryPolicyWithSetup() = default;
virtual bool OnFailure(Status const&) = 0;
virtual void Setup(grpc::ClientContext&) const = 0;
virtual bool IsExhausted() const = 0;
virtual bool IsPermanentFailure(Status const&) const = 0;
};

class MockRetryPolicy : public RetryPolicyWithSetup {
public:
MOCK_METHOD(bool, OnFailure, (Status const&), (override));
MOCK_METHOD(void, Setup, (grpc::ClientContext&), (const, override));
MOCK_METHOD(bool, IsExhausted, (), (const, override));
MOCK_METHOD(bool, IsPermanentFailure, (Status const&), (const, override));
};

/// @test Verify the backoff policy is queried after each failure.
TEST(AsyncRetryLoopTest, UsesBackoffPolicy) {
using ms = std::chrono::milliseconds;
Expand Down Expand Up @@ -326,22 +343,32 @@ TEST(AsyncRetryLoopTest, ExhaustedDuringBackoff) {
HasSubstr("test-location"))));
}

class RetryPolicyWithSetup {
public:
virtual ~RetryPolicyWithSetup() = default;
virtual bool OnFailure(Status const&) = 0;
virtual void Setup(grpc::ClientContext&) const = 0;
virtual bool IsExhausted() const = 0;
virtual bool IsPermanentFailure(Status const&) const = 0;
};
TEST(AsyncRetryLoopTest, ExhaustedBeforeStart) {
auto mock = absl::make_unique<MockRetryPolicy>();
EXPECT_CALL(*mock, IsExhausted)
.WillOnce(Return(false))
.WillRepeatedly(Return(true));
EXPECT_CALL(*mock, OnFailure).WillOnce(Return(true));
EXPECT_CALL(*mock, IsPermanentFailure).WillRepeatedly(Return(false));
EXPECT_CALL(*mock, Setup).Times(1);

class MockRetryPolicy : public RetryPolicyWithSetup {
public:
MOCK_METHOD(bool, OnFailure, (Status const&), (override));
MOCK_METHOD(void, Setup, (grpc::ClientContext&), (const, override));
MOCK_METHOD(bool, IsExhausted, (), (const, override));
MOCK_METHOD(bool, IsPermanentFailure, (Status const&), (const, override));
};
AutomaticallyCreatedBackgroundThreads background;
StatusOr<int> actual =
AsyncRetryLoop(
std::unique_ptr<RetryPolicyWithSetup>(std::move(mock)),
TestBackoffPolicy(), Idempotency::kIdempotent, background.cq(),
[](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int) {
return make_ready_future(StatusOr<int>(
Status(StatusCode::kUnavailable, "test-message-try-again")));
},
42, "test-location")
.get();
EXPECT_THAT(actual, StatusIs(StatusCode::kUnavailable,
AllOf(HasSubstr("test-message-try-again"),
HasSubstr("Retry policy exhausted"),
HasSubstr("test-location"))));
}

TEST(AsyncRetryLoopTest, SetsTimeout) {
auto mock = absl::make_unique<MockRetryPolicy>();