Skip to content

Commit

Permalink
[#20321] docdb: Fix move transaction status RPC cleanup race with yb::Transaction destructor
Browse files Browse the repository at this point in the history

Summary:
It is currently possible for a move transaction status RPC to finish after the `yb::Transaction`
object has been destroyed, because the RPC handler only holds a weak_ptr to it. In this case, we
do not clean up the RPC handle, because we no longer have access to the `Rpcs` object or the handle.

This causes promotion tests that run into this case to occasionally time out: the
`Rpcs::Abort` call in the `yb::Transaction` destructor waits for the move transaction status RPCs
to be unregistered, but this never happens, because locking the weak_ptr fails once the destructor
has been entered.

This diff changes the move transaction status code path to hold strong references (shared_ptr) to
the transaction object instead, so that the object stays alive until the RPC callbacks have run and
the RPC cleanup can happen.
Jira: DB-9305

Test Plan:
Jenkins. Also ran GeoTransactionsPromotionTest.TestPromotionReturningToAbortedState 500 times
and confirmed that the timeout did not occur.

Reviewers: sergei

Reviewed By: sergei

Subscribers: rthallam, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D38792
  • Loading branch information
es1024 committed Oct 9, 2024
1 parent fa3efff commit 71412cf
Showing 1 changed file with 26 additions and 36 deletions.
62 changes: 26 additions & 36 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1967,23 +1967,17 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(2) << "SendUpdateTransactionStatusLocationRpcs()";

std::weak_ptr<YBTransaction> weak_transaction = transaction_->shared_from_this();
std::shared_ptr<YBTransaction> transaction = transaction_->shared_from_this();
manager_->client()->messenger()->scheduler().Schedule(
[this, weak_transaction, id = metadata_.transaction_id](const Status&) {
DoSendUpdateTransactionStatusLocationRpcs(weak_transaction, id);
[this, transaction = std::move(transaction), id = metadata_.transaction_id](const Status&) {
DoSendUpdateTransactionStatusLocationRpcs(std::move(transaction), id);
},
std::chrono::milliseconds(FLAGS_TEST_txn_status_moved_rpc_send_delay_ms));
}

void DoSendUpdateTransactionStatusLocationRpcs(
const std::weak_ptr<YBTransaction>& weak_transaction, const TransactionId& id)
std::shared_ptr<YBTransaction> transaction, const TransactionId& id)
EXCLUDES(mutex_) {
auto transaction = weak_transaction.lock();
if (!transaction) {
VLOG(1) << id << ": " << "Transaction destroyed, not sending status location updates";
return;
}

TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(2) << "DoSendUpdateTransactionStatusLocationRpcs()";

Expand All @@ -1997,10 +1991,10 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {

if (transaction_status_move_tablets_.empty()) {
auto old_status_tablet = old_status_tablet_;
auto transaction = transaction_->shared_from_this();
lock.unlock();
VLOG_WITH_PREFIX(1) << "No participants to send transaction status location updates to";
SendAbortToOldStatusTabletIfNeeded(TransactionRpcDeadline(), transaction, old_status_tablet);
SendAbortToOldStatusTabletIfNeeded(
TransactionRpcDeadline(), std::move(transaction), old_status_tablet);
return;
}

Expand Down Expand Up @@ -2035,12 +2029,13 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
for (const auto& participant_tablet : participant_tablets) {
VLOG_WITH_PREFIX(2) << "SendUpdateTransactionStatusLocationRpcs() to tablet "
<< participant_tablet;
LookupTabletForTransactionStatusLocationUpdate(weak_transaction, id, req, participant_tablet);
LookupTabletForTransactionStatusLocationUpdate(
std::move(transaction), id, req, participant_tablet);
}
}

void LookupTabletForTransactionStatusLocationUpdate(
const std::weak_ptr<YBTransaction>& weak_transaction,
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template,
const TabletId& tablet_id) EXCLUDES(mutex_) {
Expand All @@ -2051,9 +2046,11 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
master::IncludeInactive::kFalse,
master::IncludeDeleted::kFalse,
TransactionRpcDeadline(),
std::bind(
&Impl::LookupTabletForTransactionStatusLocationUpdateDone, this, _1, weak_transaction,
id, request_template, tablet_id),
[this, transaction = std::move(transaction), id, request_template, tablet_id](
const Result<internal::RemoteTabletPtr>& result) mutable {
LookupTabletForTransactionStatusLocationUpdateDone(
result, std::move(transaction), id, request_template, tablet_id);
},
client::UseCache::kTrue);
}

Expand All @@ -2064,7 +2061,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
}

rpc::RpcCommandPtr PrepareUpdateTransactionStatusLocationRpc(
const std::weak_ptr<YBTransaction>& weak_transaction,
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const TabletId &tablet_id, internal::RemoteTabletPtr participant_tablet,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template) {
Expand All @@ -2077,23 +2074,22 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
participant_tablet.get(),
manager_->client(),
&request,
std::bind(
&Impl::UpdateTransactionStatusLocationDone, this, weak_transaction, id, tablet_id,
participant_tablet, request_template, _1, _2));
[this, transaction = std::move(transaction), id, tablet_id, participant_tablet,
request_template](
const Status& status,
const tserver::UpdateTransactionStatusLocationResponsePB& response) mutable {
UpdateTransactionStatusLocationDone(
std::move(transaction), id, tablet_id, participant_tablet, request_template,
status, response);
});
}

void LookupTabletForTransactionStatusLocationUpdateDone(
const Result<internal::RemoteTabletPtr>& result,
const std::weak_ptr<YBTransaction>& weak_transaction,
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template,
const TabletId& tablet_id) EXCLUDES(mutex_) {
auto transaction = weak_transaction.lock();
if (!transaction) {
VLOG(1) << id << ": " << "Transaction destroyed, not sending status location updates";
return;
}

TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(1) << "Lookup tablet done: " << yb::ToString(result);

Expand All @@ -2106,25 +2102,19 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {

auto handle = GetTransactionStatusMoveHandle(tablet_id);
auto rpc = PrepareUpdateTransactionStatusLocationRpc(
weak_transaction, id, tablet_id, *result, request_template);
std::move(transaction), id, tablet_id, *result, request_template);

lock.unlock();
manager_->rpcs().RegisterAndStart(rpc, handle);
}

void UpdateTransactionStatusLocationDone(
const std::weak_ptr<YBTransaction>& weak_transaction,
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const TabletId &tablet_id, internal::RemoteTabletPtr participant_tablet,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template,
const Status& status,
const tserver::UpdateTransactionStatusLocationResponsePB& response) EXCLUDES(mutex_) {
auto transaction = weak_transaction.lock();
if (!transaction) {
VLOG(1) << id << ": " << "Transaction destroyed before status location update finished";
return;
}

TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(1) << "Transaction status update for participant tablet "
<< tablet_id << ": " << yb::ToString(status);
Expand Down

0 comments on commit 71412cf

Please sign in to comment.