fix: extremely rare race conditions in retry loop (#7789)

coryan authored Dec 28, 2021
1 parent d2c0a6b commit 2b9e0fb
Showing 2 changed files with 223 additions and 87 deletions.
253 changes: 181 additions & 72 deletions google/cloud/internal/async_retry_loop.h
@@ -59,6 +59,110 @@ struct FutureValueType<future<T>> {
* implementation of these stubs is very easy too.
*
* This class implements the retry loop for such an RPC.
*
* @par Cancellation and Thread Safety
*
 * This class supports cancelling a retry loop (on a best-effort basis, as is
 * the case for most cancellations with gRPC). Without cancels this class
 * would require no synchronization, as each request, backoff timer, and their
 * callbacks can only occur in sequence.
* Cancel requests, however, may be invoked by any thread and require some form
* of synchronization. The basic idea is that the class maintains a future
* (called `pending_operation_`) that represents the current pending operation,
* i.e., the current gRPC request or backoff timer. Cancelling the loop may
* require cancelling the pending operation. Using such a `pending_operation_`
* future is prone to subtle race conditions. This section explains how we
* ensure safety.
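 *
 * For illustration, a caller would cancel the loop roughly as follows. This is
 * only a sketch: the variable names are made up, and it assumes the
 * `future<T>` returned by `AsyncRetryLoop()` forwards `.cancel()` calls to
 * this class:
 *
 * @code
 * auto pending = AsyncRetryLoop(std::move(retry_policy), std::move(backoff_policy),
 *                               Idempotency::kIdempotent, cq, functor, request,
 *                               "the-call-site");
 * // Possibly from a different thread:
 * pending.cancel();             // best-effort, as described above
 * auto result = pending.get();  // either the RPC result or a
 *                               // "Retry loop cancelled" error status
 * @endcode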
*
* @par The Race Condition
*
 * The most common approach to solving race conditions is to use some kind of
 * lock; unfortunately, that does not work in this case. Consider this code:
*
* @code
* 1: void AsyncRetryLoopImpl::F() {
* 2: auto self = shared_from_this();
* 3: future<...> f = create_future();
* 4: std::unique_lock lk(mu_);
* 5: if (cancelled_) return HandleCancel();
 * 6:    auto pending = f.then([self](auto g) { self->Callback(); });
 * 7:    this->pending_operation_ = std::move(pending);
* 8: }
* 9:
* 10: void AsyncRetryLoopImpl::Cancel() {
* ....
* 11: this->pending_operation_.cancel();
* ....
* }
* 12:
* 13: void AsyncRetryLoopImpl::Callback() {
* ....
* 14: F();
* ...
* }
* @endcode
*
 * Because futures are immediately active, setting up the callback on line (6)
 * can result in an immediate call to `Callback()`. Since the mutex acquired on
 * line (4) is still held, this can result in a deadlock: the callback may
 * invoke `F()` on line (14), which would try to acquire the same mutex.
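 *
 * The deadlock hinges on futures being immediately active: a continuation
 * attached to an already-satisfied future runs in the thread that calls
 * `.then()`, before `.then()` returns. A standalone sketch of that behavior
 * (not part of this class):
 *
 * @code
 * auto done = make_ready_future(42);
 * bool ran_inline = false;
 * auto next = done.then([&ran_inline](future<int> f) {
 *   ran_inline = true;
 *   return f.get() + 1;
 * });
 * // `ran_inline` is already true here, so any lock held across `.then()`
 * // was also held while the continuation executed.
 * @endcode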
*
 * One could solve this deadlock using a recursive mutex, but that still leaves
 * a second problem: as the stack unwinds, the `pending_operation_` member
 * variable is set, on line (7), to the **first** operation, while we want it
 * to remain set to the **last** operation.
*
* @par Solution
*
* We keep a counter reflecting the number of operations performed by the retry
* loop. This counter is incremented before starting a request and before
* starting a backoff timer.
*
 * The `pending_operation_` variable is updated **only** if the current
 * operation matches the operation counter. This means the `pending_operation_`
 * variable always reflects the last pending operation; it can never be set to
 * an older operation.
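 *
 * In outline (a simplified sketch; the real `StartOperation()` and
 * `SetPending()` below also handle the cancelled and terminated cases):
 *
 * @code
 * State StartOperation() {
 *   std::unique_lock<std::mutex> lk(mu_);
 *   return State{false, ++operation_};  // each new operation gets a new id
 * }
 *
 * void SetPending(std::uint_fast32_t operation, future<void> op) {
 *   std::unique_lock<std::mutex> lk(mu_);
 *   // Ignore the future if a newer operation has started in the meantime.
 *   if (operation_ == operation) pending_operation_ = std::move(op);
 * }
 * @endcode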
*
* @par Observations
*
* - The initial value of `cancelled_` is false.
* - `Cancel()` is the only operation that changes `cancelled_`, and it holds
* the `mu_` mutex while doing so.
 * - Once `cancelled_` is set to `true`, it is never set back to `false`.
*
* While `cancelled_` is false the loop is (basically) single threaded:
* - Each gRPC request or backoff timer is sequenced-after a call to
* `StartOperation()`, see `StartAttempt()` and `StartBackoff()`.
* - Each gRPC request or backoff timer must complete before the next one
* starts, as it is their callbacks (`OnAttempt()` and `OnBackoff()`) that
* start the next step.
* - `StartOperation()` is always sequenced-before calls to `SetPending()`.
* - `SetPending()` never sets `pending_operation_` to the `future<void>`
* representing an operation that has already completed.
*
* As to when the `cancelled_` flag changes to `true`:
* - `StartOperation()` and `SetPending()` both lock the same mutex `mu_` as
* `Cancel()`.
* - It follows that if `Cancel()` is invoked, then the `true` value will be
* visible to any future calls to `StartOperation()` or `SetPending()`.
* - If the next call is `StartOperation()` then no new operation is issued,
* as both `StartAttempt()` and `StartBackoff()` return immediately in this
* case.
* - Note that if `cancelled_` is true, `StartOperation()` terminates the
* retry loop by calling `SetDoneWithCancel()`.
* - If the next call is `SetPending()` the pending operation is immediately
* cancelled.
*
* @par Safety and Progress
*
 * While the `cancelled_` flag is `false` the loop remains "safe" because it is
 * fundamentally a sequence of calls without concurrency or parallelism.
 * Multiple threads may be involved (each callback may run on a different
 * thread), but the completion queue provides enough synchronization.
*
 * Once the `cancelled_` flag is set to `true` the new value becomes visible to
 * the threads running the `StartAttempt()` and/or `StartBackoff()` functions.
 * Once the value is visible, the retry loop stops on the next callback and/or
 * before the next request or timer is issued.
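 *
 * For reference, the cancel path boils down to the following sketch. It folds
 * the two `Cancel()` overloads below into one function; the final
 * "Retry loop cancelled" status is reported later, by `SetDoneWithCancel()`,
 * once the loop observes the flag:
 *
 * @code
 * void Cancel() {
 *   std::unique_lock<std::mutex> lk(mu_);
 *   cancelled_ = true;  // observed by later StartOperation() / SetPending() calls
 *   future<void> f = std::move(pending_operation_);
 *   lk.unlock();
 *   f.cancel();         // best-effort cancel of the outstanding RPC or timer
 * }
 * @endcode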
*/
template <typename Functor, typename Request, typename RetryPolicyType>
class AsyncRetryLoopImpl
@@ -93,120 +197,120 @@ class AsyncRetryLoopImpl
}

private:
-  enum State {
-    kIdle,
-    kWaiting,
-    kDone,
+  using TimerArgType = StatusOr<std::chrono::system_clock::time_point>;
+
+  struct State {
+    bool cancelled;
+    std::uint_fast32_t operation;
};

+  State StartOperation() {
+    std::unique_lock<std::mutex> lk(mu_);
+    if (!cancelled_) return State{false, ++operation_};
+    return SetDoneWithCancel(std::move(lk));
+  }
+
+  State OnOperation() {
+    std::unique_lock<std::mutex> lk(mu_);
+    if (!cancelled_) return State{false, operation_};
+    return SetDoneWithCancel(std::move(lk));
+  }
+
void StartAttempt() {
+    auto self = this->shared_from_this();
    if (retry_policy_->IsExhausted()) {
-      SetDone(
+      return SetDone(
          RetryLoopError("Retry policy exhausted in", location_, last_status_));
-      return;
    }
-    auto self = this->shared_from_this();
+    auto state = StartOperation();
+    if (state.cancelled) return;
    auto context = absl::make_unique<grpc::ClientContext>();
    SetupContext<RetryPolicyType>::Setup(*retry_policy_, *context);
-    auto op =
+    SetPending(
+        state.operation,
        functor_(cq_, std::move(context), request_).then([self](future<T> f) {
          self->OnAttempt(f.get());
-        });
-    SetWaiting(std::move(op));
+        }));
}

+  void StartBackoff() {
+    auto self = this->shared_from_this();
+    auto state = StartOperation();
+    if (state.cancelled) return;
+    SetPending(state.operation,
+               cq_.MakeRelativeTimer(backoff_policy_->OnCompletion())
+                   .then([self](future<TimerArgType> f) {
+                     self->OnBackoff(f.get());
+                   }));
+  }
+
void OnAttempt(T result) {
-    SetIdle();
    // A successful attempt, set the value and finish the loop.
-    if (result.ok()) {
-      SetDone(std::move(result));
-      return;
-    }
+    if (result.ok()) return SetDone(std::move(result));
// Some kind of failure, first verify that it is retryable.
last_status_ = GetResultStatus(std::move(result));
if (idempotency_ == Idempotency::kNonIdempotent) {
SetDone(RetryLoopError("Error in non-idempotent operation", location_,
last_status_));
return;
return SetDone(RetryLoopError("Error in non-idempotent operation",
location_, last_status_));
}
if (!retry_policy_->OnFailure(last_status_)) {
if (retry_policy_->IsPermanentFailure(last_status_)) {
SetDone(RetryLoopError("Permanent error in", location_, last_status_));
} else {
SetDone(RetryLoopError("Retry policy exhausted in", location_,
last_status_));
return SetDone(
RetryLoopError("Permanent error in", location_, last_status_));
}
return;
return SetDone(
RetryLoopError("Retry policy exhausted in", location_, last_status_));
}
-    if (Cancelled()) return;
-    auto self = this->shared_from_this();
-    auto op =
-        cq_.MakeRelativeTimer(backoff_policy_->OnCompletion())
-            .then(
-                [self](
-                    future<StatusOr<std::chrono::system_clock::time_point>> f) {
-                  self->OnBackoffTimer(f.get());
-                });
-    SetWaiting(std::move(op));
+    StartBackoff();
}

-  void OnBackoffTimer(StatusOr<std::chrono::system_clock::time_point> tp) {
-    SetIdle();
-    if (Cancelled()) return;
+  void OnBackoff(TimerArgType tp) {
+    auto state = OnOperation();
+    // Check for the retry loop cancellation first. We want to report that
+    // status instead of the timer failure in that case.
+    if (state.cancelled) return;
if (!tp) {
// Some kind of error in the CompletionQueue, probably shutting down.
SetDone(RetryLoopError("Timer failure in", location_,
std::move(tp).status()));
return;
return SetDone(RetryLoopError("Timer failure in", location_,
std::move(tp).status()));
}
StartAttempt();
}

-  void SetIdle() {
-    std::unique_lock<std::mutex> lk(mu_);
-    switch (state_) {
-      case kIdle:
-      case kDone:
-        break;
-      case kWaiting:
-        state_ = kIdle;
-        break;
-    }
-  }
-
-  void SetWaiting(future<void> op) {
+  void SetPending(std::uint_fast32_t operation, future<void> op) {
    std::unique_lock<std::mutex> lk(mu_);
-    if (state_ != kIdle) return;
-    state_ = kWaiting;
-    pending_operation_ = std::move(op);
+    if (operation_ == operation) pending_operation_ = std::move(op);
+    if (cancelled_) return Cancel(std::move(lk));
}

+  // Handle the case where the retry loop finishes due to a successful request
+  // or the retry policies getting exhausted.
void SetDone(T value) {
std::unique_lock<std::mutex> lk(mu_);
-    if (state_ == kDone) return;
-    state_ = kDone;
+    if (done_) return;
+    done_ = true;
lk.unlock();
result_.set_value(std::move(value));
}

-  void Cancel() {
-    std::unique_lock<std::mutex> lk(mu_);
-    cancelled_ = true;
-    if (state_ != kWaiting) return;
-    future<void> f = std::move(pending_operation_);
-    state_ = kIdle;
+  // Handle the case where the retry loop finishes due to a successful cancel
+  // request.
+  State SetDoneWithCancel(std::unique_lock<std::mutex> lk) {
+    if (done_) return State{true, 0};
+    done_ = true;
    lk.unlock();
-    f.cancel();
    result_.set_value(
        RetryLoopError("Retry loop cancelled", location_, last_status_));
+    return State{true, 0};
}

-  bool Cancelled() {
-    std::unique_lock<std::mutex> lk(mu_);
-    if (!cancelled_) return false;
-    state_ = kDone;
+  void Cancel() { return Cancel(std::unique_lock<std::mutex>(mu_)); }
+
+  void Cancel(std::unique_lock<std::mutex> lk) {
+    cancelled_ = true;
+    future<void> f = std::move(pending_operation_);
    lk.unlock();
-    result_.set_value(
-        RetryLoopError("Retry loop cancelled", location_, last_status_));
-    return true;
+    f.cancel();
}

std::unique_ptr<RetryPolicyType> retry_policy_;
@@ -218,9 +322,14 @@ class AsyncRetryLoopImpl
char const* location_ = "unknown";
Status last_status_ = Status(StatusCode::kUnknown, "Retry policy exhausted");
promise<T> result_;

+  // Only the following variables require synchronization, as they coordinate
+  // the work between the retry loop (which would be lock-free) and the cancel
+  // requests (which need locks).
  std::mutex mu_;
-  State state_ = kIdle;
  bool cancelled_ = false;
+  bool done_ = false;
+  std::uint_fast32_t operation_ = 0;
future<void> pending_operation_;
};

57 changes: 42 additions & 15 deletions google/cloud/internal/async_retry_loop_test.cc
@@ -188,6 +188,23 @@ class MockBackoffPolicy : public BackoffPolicy {
MOCK_METHOD(std::chrono::milliseconds, OnCompletion, (), (override));
};

+class RetryPolicyWithSetup {
+ public:
+  virtual ~RetryPolicyWithSetup() = default;
+  virtual bool OnFailure(Status const&) = 0;
+  virtual void Setup(grpc::ClientContext&) const = 0;
+  virtual bool IsExhausted() const = 0;
+  virtual bool IsPermanentFailure(Status const&) const = 0;
+};
+
+class MockRetryPolicy : public RetryPolicyWithSetup {
+ public:
+  MOCK_METHOD(bool, OnFailure, (Status const&), (override));
+  MOCK_METHOD(void, Setup, (grpc::ClientContext&), (const, override));
+  MOCK_METHOD(bool, IsExhausted, (), (const, override));
+  MOCK_METHOD(bool, IsPermanentFailure, (Status const&), (const, override));
+};
+
/// @test Verify the backoff policy is queried after each failure.
TEST(AsyncRetryLoopTest, UsesBackoffPolicy) {
using ms = std::chrono::milliseconds;
@@ -326,22 +343,32 @@ TEST(AsyncRetryLoopTest, ExhaustedDuringBackoff) {
HasSubstr("test-location"))));
}

-class RetryPolicyWithSetup {
- public:
-  virtual ~RetryPolicyWithSetup() = default;
-  virtual bool OnFailure(Status const&) = 0;
-  virtual void Setup(grpc::ClientContext&) const = 0;
-  virtual bool IsExhausted() const = 0;
-  virtual bool IsPermanentFailure(Status const&) const = 0;
-};
+TEST(AsyncRetryLoopTest, ExhaustedBeforeStart) {
+  auto mock = absl::make_unique<MockRetryPolicy>();
+  EXPECT_CALL(*mock, IsExhausted)
+      .WillOnce(Return(false))
+      .WillRepeatedly(Return(true));
+  EXPECT_CALL(*mock, OnFailure).WillOnce(Return(true));
+  EXPECT_CALL(*mock, IsPermanentFailure).WillRepeatedly(Return(false));
+  EXPECT_CALL(*mock, Setup).Times(1);

-class MockRetryPolicy : public RetryPolicyWithSetup {
- public:
-  MOCK_METHOD(bool, OnFailure, (Status const&), (override));
-  MOCK_METHOD(void, Setup, (grpc::ClientContext&), (const, override));
-  MOCK_METHOD(bool, IsExhausted, (), (const, override));
-  MOCK_METHOD(bool, IsPermanentFailure, (Status const&), (const, override));
-};
+  AutomaticallyCreatedBackgroundThreads background;
+  StatusOr<int> actual =
+      AsyncRetryLoop(
+          std::unique_ptr<RetryPolicyWithSetup>(std::move(mock)),
+          TestBackoffPolicy(), Idempotency::kIdempotent, background.cq(),
+          [](google::cloud::CompletionQueue&,
+             std::unique_ptr<grpc::ClientContext>, int) {
+            return make_ready_future(StatusOr<int>(
+                Status(StatusCode::kUnavailable, "test-message-try-again")));
+          },
+          42, "test-location")
+          .get();
+  EXPECT_THAT(actual, StatusIs(StatusCode::kUnavailable,
+                               AllOf(HasSubstr("test-message-try-again"),
+                                     HasSubstr("Retry policy exhausted"),
+                                     HasSubstr("test-location"))));
+}

TEST(AsyncRetryLoopTest, SetsTimeout) {
auto mock = absl::make_unique<MockRetryPolicy>();
