Skip to content

Commit

Permalink
[#24596] YSQL: Introduce TransactionProvider for PgClientSession needs
Browse files Browse the repository at this point in the history
Summary:
The `TransactionProvider` is a wrapper over transaction pool used in `PgClientService`. This class allows to know id of transaction which will be used next for `PgClientSessionKind::kPlain` session type in `PgClientSession` object.  The `TransactionProvider::NextTxnIdForPlain` is used for this purpose.

The knowing of txn id is required to introduce object locks (will be implemented separately).
By design object locks are identified by distributed txn id. But it is too costly to start distributed txn in case current postgre txn starts using object locks because this will slowdown read only transactions.
In case current transaction is read only object locks will use next txn id for identification. After commit/abort of such a transaction all the object locks will be removed. As a result same distributed txn id could be used for object locks in context of next transaction.
In case of performing write operation by current transaction the distributed txn will be started. And it will have same txn id which was returned by the `TransactionProvider::NextTxnIdForPlain` method. This approach guaranties that all the object locks taken prior to performing of write operation will have same txn id like all the intents written by distributed txn started within current postgres's transaction.
Jira: DB-13634

Test Plan: Jenkins

Reviewers: bkolagani, pjain, esheng, sergei

Reviewed By: esheng

Subscribers: rthallam, sergei, yql, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D41282
  • Loading branch information
d-uspenskiy committed Jan 30, 2025
1 parent c5ab152 commit f9d389d
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 28 deletions.
9 changes: 6 additions & 3 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -724,9 +724,12 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
return true;
}

Status PromoteToGlobal(const CoarseTimePoint& deadline) EXCLUDES(mutex_) {
Status EnsureGlobal(const CoarseTimePoint& deadline) EXCLUDES(mutex_) {
{
UniqueLock lock(mutex_);
if (metadata_.locality == TransactionLocality::GLOBAL) {
return Status::OK();
}
RETURN_NOT_OK(StartPromotionToGlobal());
}
DoPromoteToGlobal(deadline);
Expand Down Expand Up @@ -2534,8 +2537,8 @@ void YBTransaction::Abort(CoarseTimePoint deadline) {
impl_->Abort(AdjustDeadline(deadline));
}

Status YBTransaction::PromoteToGlobal(CoarseTimePoint deadline) {
return impl_->PromoteToGlobal(AdjustDeadline(deadline));
Status YBTransaction::EnsureGlobal(CoarseTimePoint deadline) {
return impl_->EnsureGlobal(AdjustDeadline(deadline));
}

bool YBTransaction::IsRestartRequired() const {
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ class YBTransaction : public std::enable_shared_from_this<YBTransaction> {
// Aborts this transaction.
void Abort(CoarseTimePoint deadline = CoarseTimePoint());

// Promote a local transaction into a global transaction.
Status PromoteToGlobal(CoarseTimePoint deadline = CoarseTimePoint());
// Make sure transaction is global.
Status EnsureGlobal(CoarseTimePoint deadline = CoarseTimePoint());

// Returns transaction ID.
const TransactionId& id() const;
Expand Down
85 changes: 63 additions & 22 deletions src/yb/tserver/pg_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <mutex>
#include <queue>
#include <set>
#include <tuple>

#include "yb/cdc/cdc_service.h"

Expand Down Expand Up @@ -860,6 +861,50 @@ std::ostream& operator<<(std::ostream& str, const PgClientSession::PrefixLogger&
return str << "Session id " << logger.id_ << ": ";
}

PgClientSession::TransactionProvider::TransactionProvider(TransactionBuilder&& builder)
: builder_(std::move(builder)) {}

client::YBTransactionPtr PgClientSession::TransactionProvider::TakeForPgSession(
CoarseTimePoint deadline) {
// The transaction coordinator needs to know that this is a session level transaction as the
// handling on deadlocks and heartbeats etc are different for regular docdb transactions and
// session level transactions. Hence, we create a new transaction instead of using a ready
// transaction (whose state at the coordinator would be different from what we want to set).
//
// Advisory locks table is not placement local, hence we need a global transaction for tagging
// the requested session advisory locks.
return Build(deadline, {.force_global = true, .force_create = true});
}

client::YBTransactionPtr PgClientSession::TransactionProvider::TakeForDdl(
CoarseTimePoint deadline) {
return Build(deadline, {.is_ddl = true, .force_global = true});
}

auto PgClientSession::TransactionProvider::TakeForPlain(
client::ForceGlobalTransaction force_global,
CoarseTimePoint deadline) -> TakeForPlainReturnType {
using RT = TakeForPlainReturnType;
return next_plain_
? RT{std::exchange(next_plain_, {}), EnsureGlobal{force_global}}
: RT{Build(deadline, {.force_global = force_global}), EnsureGlobal::kFalse};
}

const TransactionId& PgClientSession::TransactionProvider::NextTxnIdForPlain(
CoarseTimePoint deadline) {
if (!next_plain_) {
next_plain_ = Build(deadline, {});
}
return next_plain_->id();
}

client::YBTransactionPtr PgClientSession::TransactionProvider::Build(
CoarseTimePoint deadline, const BuildStrategy& strategy) {
return builder_(
IsDDL{strategy.is_ddl}, client::ForceGlobalTransaction{strategy.force_global}, deadline,
client::ForceCreateTransaction{strategy.force_create});
}

bool PgClientSession::ReadPointHistory::Restore(
ConsistentReadPoint* read_point, uint64_t read_time_serial_no) {
auto result = false;
Expand Down Expand Up @@ -903,7 +948,7 @@ PgClientSession::PgClientSession(
context_(context),
shared_this_(std::shared_ptr<PgClientSession>(std::move(shared_this_source), this)),
id_(id),
transaction_builder_(std::move(transaction_builder)),
transaction_provider_(std::move(transaction_builder)),
big_shared_mem_expiration_task_(&scheduler),
read_point_history_(PrefixLogger(id_)) {}

Expand Down Expand Up @@ -1613,20 +1658,12 @@ Status PgClientSession::UpdateReadPointForXClusterConsistentReads(

Result<const PgClientSession::SessionData&> PgClientSession::BeginPgSessionLevelTxnIfNecessary(
CoarseTimePoint deadline) {
EnsureSession(PgClientSessionKind::kPgSession, deadline);
auto& session_data = GetSessionData(PgClientSessionKind::kPgSession);
constexpr auto kSessionKind = PgClientSessionKind::kPgSession;
EnsureSession(kSessionKind, deadline);
auto& session_data = GetSessionData(kSessionKind);
auto& txn = session_data.transaction;
if (!txn) {
// The transaction coordinator needs to know that this is a session level transaction as the
// handling on deadlocks and heartbeats etc are different for regular docdb transactions and
// session level transactions. Hence, we create a new transaction instead of using a ready
// transaction (whose state at the coordinator would be different from what we want to set).
//
// Advisory locks table is not placement local, hence we need a global transaction for tagging
// the requested session advisory locks.
txn = transaction_builder_(
IsDDL::kFalse, client::ForceGlobalTransaction::kTrue, deadline,
client::ForceCreateTransaction::kTrue);
txn = transaction_provider_.Take<kSessionKind>(deadline);
txn->SetLogPrefixTag(kTxnLogPrefixTag, id_);
txn->InitPgSessionRequestVersion();
// Isolation level doesn't matter but we need to set it for conflict resolution to not treat
Expand Down Expand Up @@ -1814,8 +1851,9 @@ Status PgClientSession::DoBeginTransactionIfNecessary(
const auto isolation = static_cast<IsolationLevel>(options.isolation());

auto priority = options.priority();
auto& session = EnsureSession(PgClientSessionKind::kPlain, deadline);
auto& txn = GetSessionData(PgClientSessionKind::kPlain).transaction;
constexpr auto kSessionKind = PgClientSessionKind::kPlain;
auto& session = EnsureSession(kSessionKind, deadline);
auto& txn = GetSessionData(kSessionKind).transaction;
if (txn && txn_serial_no_ != options.txn_serial_no()) {
VLOG_WITH_PREFIX(2)
<< "Abort previous transaction, use existing priority: " << options.use_existing_priority()
Expand Down Expand Up @@ -1849,9 +1887,9 @@ Status PgClientSession::DoBeginTransactionIfNecessary(
: Status::OK();
}

txn = transaction_builder_(
IsDDL::kFalse, client::ForceGlobalTransaction(options.force_global_transaction()), deadline,
client::ForceCreateTransaction::kFalse);
TransactionProvider::EnsureGlobal ensure_global{false};
std::tie(txn, ensure_global) = transaction_provider_.Take<kSessionKind>(
client::ForceGlobalTransaction{options.force_global_transaction()}, deadline);
txn->SetLogPrefixTag(kTxnLogPrefixTag, id_);
RETURN_NOT_OK(txn->SetPgTxnStart(options.pg_txn_start_us()));
if ((isolation == IsolationLevel::SNAPSHOT_ISOLATION ||
Expand All @@ -1868,6 +1906,9 @@ Status PgClientSession::DoBeginTransactionIfNecessary(
<< ", new read time";
RETURN_NOT_OK(txn->Init(isolation));
}
if (ensure_global) {
RETURN_NOT_OK(txn->EnsureGlobal(deadline));
}

RETURN_NOT_OK(UpdateReadPointForXClusterConsistentReads(options, deadline, &txn->read_point()));

Expand All @@ -1887,16 +1928,16 @@ Result<const TransactionMetadata*> PgClientSession::GetDdlTransactionMetadata(
return nullptr;
}

auto& txn = GetSessionData(PgClientSessionKind::kDdl).transaction;
constexpr auto kSessionKind = PgClientSessionKind::kDdl;
auto& txn = GetSessionData(kSessionKind).transaction;
if (!txn) {
const auto isolation = FLAGS_ysql_serializable_isolation_for_ddl_txn
? IsolationLevel::SERIALIZABLE_ISOLATION : IsolationLevel::SNAPSHOT_ISOLATION;
txn = transaction_builder_(IsDDL::kTrue, client::ForceGlobalTransaction::kTrue, deadline,
client::ForceCreateTransaction::kFalse);
txn = transaction_provider_.Take<kSessionKind>(deadline);
RETURN_NOT_OK(txn->Init(isolation));
txn->SetLogPrefixTag(kTxnLogPrefixTag, id_);
ddl_txn_metadata_ = VERIFY_RESULT(Copy(txn->GetMetadata(deadline).get()));
EnsureSession(PgClientSessionKind::kDdl, deadline)->SetTransaction(txn);
EnsureSession(kSessionKind, deadline)->SetTransaction(txn);
}

return &ddl_txn_metadata_;
Expand Down
44 changes: 43 additions & 1 deletion src/yb/tserver/pg_client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,53 @@ class PgClientSession final {
std::unordered_map<uint64_t, ConsistentReadPoint::Momento> read_points_;
};

class TransactionProvider {
public:
YB_STRONGLY_TYPED_BOOL(EnsureGlobal);
using TakeForPlainReturnType = std::pair<client::YBTransactionPtr, EnsureGlobal>;

explicit TransactionProvider(TransactionBuilder&& builder);

template<PgClientSessionKind kind, class... Args>
requires(
kind == PgClientSessionKind::kPlain ||
kind == PgClientSessionKind::kDdl ||
kind == PgClientSessionKind::kPgSession)
auto Take(Args&&... args) {
if constexpr (kind == PgClientSessionKind::kPlain) {
return TakeForPlain(std::forward<Args>(args)...);
} else if constexpr (kind == PgClientSessionKind::kDdl) {
return TakeForDdl(std::forward<Args>(args)...);
} else if constexpr (kind == PgClientSessionKind::kPgSession) {
return TakeForPgSession(std::forward<Args>(args)...);
}
}

const TransactionId& NextTxnIdForPlain(CoarseTimePoint deadline);

private:
struct BuildStrategy {
bool is_ddl = false;
bool force_global = false;
bool force_create = false;
};

client::YBTransactionPtr TakeForPgSession(CoarseTimePoint deadline);
client::YBTransactionPtr TakeForDdl(CoarseTimePoint deadline);
TakeForPlainReturnType TakeForPlain(
client::ForceGlobalTransaction force_global, CoarseTimePoint deadline);

client::YBTransactionPtr Build(CoarseTimePoint deadline, const BuildStrategy& strategy);

const TransactionBuilder builder_;
client::YBTransactionPtr next_plain_;
};

client::YBClient& client_;
const PgClientSessionContext& context_;
const std::weak_ptr<PgClientSession> shared_this_;
const uint64_t id_;
const TransactionBuilder transaction_builder_;
TransactionProvider transaction_provider_;
std::mutex big_shared_mem_mutex_;
std::atomic<CoarseTimePoint> last_big_shared_memory_access_;
SharedMemorySegmentHandle big_shared_mem_handle_ GUARDED_BY(big_shared_mem_mutex_);
Expand Down

0 comments on commit f9d389d

Please sign in to comment.