diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java index f4b3db8d4264..d637d8815ffc 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java @@ -383,6 +383,58 @@ public PreparedStatement createStatement(Connection conn) throws Exception { }.runTest(); } + /** + * No restarts expected when yb_read_after_commit_visibility is relaxed. + * Note that this is a long read that exceeds the output buffer size. + */ + @Test + public void selectStarLong_relaxedReadAfterCommitVisibility() throws Exception { + new RegularStatementTester( + getConnectionBuilder(), + "SELECT * FROM test_rr", + getLongString(), + false /* expectRestartErrors */ + ) { + @Override + public String getReadAfterCommitVisibility() { + return "relaxed"; + } + }.runTest(); + } + + /* + * Ensures that we need not set the yb_read_after_commit_visibility GUC + * before the prepare statement. + * This test guards us against scenarios where the GUC + * is captured in the prepared statement to be used during execution. + * + * Example: pg_hint_plan is a planner time configuration and is to be + * captured in the prepare phase to be used during execution. + * + * Also, use simple query mode to test that simultaneously. + */ + @Test + public void selectStarLongExecute_relaxedReadAfterCommitVisibility() throws Exception { + new RegularStatementTester( + getConnectionBuilder().withPreferQueryMode("simple"), + "EXECUTE select_stmt(0)", + getLongString(), + false /* expectRestartErrors */) { + + @Override + public Statement createStatement(Connection conn) throws Exception { + Statement stmt = super.createStatement(conn); + stmt.execute("PREPARE select_stmt (int) AS SELECT * FROM test_rr WHERE i >= $1"); + return stmt; + }; + + @Override + public String getReadAfterCommitVisibility() { + return "relaxed"; + } + }.runTest(); + } + /** * The following two methods attempt to test retries on kReadRestart for all below combinations - * 1. Type of statement - UPDATE/DELETE. @@ -886,9 +938,16 @@ private Boolean expectConflictErrors(IsolationLevel isolation) throws Exception !is_wait_on_conflict_concurrency_control; } + // Override this function to set a different read window behavior. + public String getReadAfterCommitVisibility() { + return "strict"; + } + @Override public List<ThrowingRunnable> getRunnableThreads( ConnectionBuilder cb, BooleanSupplier isExecutionDone) { + String setReadAfterCommitVisibility = "SET yb_read_after_commit_visibility TO "; + List<ThrowingRunnable> runnables = new ArrayList<>(); // // Singular SELECT statement (equal probability of being either serializable/repeatable read/ @@ -926,6 +985,14 @@ public List<ThrowingRunnable> getRunnableThreads( auxSerializableStatement.execute(LOG_RESTARTS_SQL); auxRrStatement.execute(LOG_RESTARTS_SQL); auxRcStatement.execute(LOG_RESTARTS_SQL); + + // SET yb_read_after_commit_visibility + auxSerializableStatement.execute( + setReadAfterCommitVisibility + getReadAfterCommitVisibility()); + auxRrStatement.execute( + setReadAfterCommitVisibility + getReadAfterCommitVisibility()); + auxRcStatement.execute( + setReadAfterCommitVisibility + getReadAfterCommitVisibility()); } for (/* No setup */; !isExecutionDone.getAsBoolean(); /* NOOP */) { @@ -1020,8 +1087,18 @@ public List<ThrowingRunnable> getRunnableThreads( Stmt stmt = createStatement(selectTxnConn)) { try (Statement auxStmt = selectTxnConn.createStatement()) { auxStmt.execute(LOG_RESTARTS_SQL); + + // SET yb_read_after_commit_visibility + auxStmt.execute(setReadAfterCommitVisibility + getReadAfterCommitVisibility()); } selectTxnConn.setAutoCommit(false); + // This is a read only txn, so setReadOnly. + // Moreover, yb_read_after_commit_visibility option relies on txn being read only. + if (isolation != IsolationLevel.SERIALIZABLE) { + // SERIALIZABLE, READ ONLY txns are not actually serializable + // txns and we wish to test SERIALIZABLE txns too. + selectTxnConn.setReadOnly(true); + } for (/* No setup */; !isExecutionDone.getAsBoolean(); ++txnsAttempted) { int numCompletedOps = 0; try { diff --git a/src/postgres/src/backend/access/transam/xact.c b/src/postgres/src/backend/access/transam/xact.c index 5e45b0827f24..195d049ec32b 100644 --- a/src/postgres/src/backend/access/transam/xact.c +++ b/src/postgres/src/backend/access/transam/xact.c @@ -3829,6 +3829,10 @@ BeginTransactionBlock(void) BlockStateAsString(s->blockState)); break; } + + /* YB: Notify pggate that we are within a txn block. */ + if (IsYugaByteEnabled()) + HandleYBStatus(YBCPgSetInTxnBlock(true)); } /* @@ -4157,6 +4161,10 @@ BeginImplicitTransactionBlock(void) */ if (s->blockState == TBLOCK_STARTED) s->blockState = TBLOCK_IMPLICIT_INPROGRESS; + + /* YB: Notify pggate that we are within an (implicit) txn block. */ + if (IsYugaByteEnabled()) + HandleYBStatus(YBCPgSetInTxnBlock(true)); } /* @@ -5091,7 +5099,7 @@ CommitSubTransaction(void) /* Conserve sticky object count before popping transaction state. */ s->parent->ybUncommittedStickyObjectCount = s->ybUncommittedStickyObjectCount; - + PopTransaction(); } diff --git a/src/postgres/src/backend/executor/execMain.c b/src/postgres/src/backend/executor/execMain.c index 54bc967bac69..c167a92d1173 100644 --- a/src/postgres/src/backend/executor/execMain.c +++ b/src/postgres/src/backend/executor/execMain.c @@ -162,8 +162,10 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * We have lower-level defenses in CommandCounterIncrement and elsewhere * against performing unsafe operations in parallel mode, but this gives a * more user-friendly error message. + * + * YB: We also notify pggate whether the statement is read only. */ - if ((XactReadOnly || IsInParallelMode()) && + if ((IsYugaByteEnabled() || XactReadOnly || IsInParallelMode()) && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) ExecCheckXactReadOnly(queryDesc->plannedstmt); @@ -762,11 +764,14 @@ ExecCheckRTEPermsModified(Oid relOid, Oid userid, Bitmapset *modifiedCols, * Note: in a Hot Standby this would need to reject writes to temp * tables just as we do in parallel mode; but an HS standby can't have created * any temp tables in the first place, so no need to check that. + * + * YB: We also notify pggate whether the statement is read only. */ static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt) { ListCell *l; + bool yb_is_read_only = true; /* * Fail if write permissions are requested in parallel mode for table @@ -786,10 +791,21 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt) continue; PreventCommandIfReadOnly(CreateCommandTag((Node *) plannedstmt)); + yb_is_read_only = false; } if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE) + { PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt)); + yb_is_read_only = false; + } + + if (IsYugaByteEnabled()) + { + if (plannedstmt->rowMarks) + yb_is_read_only = false; + HandleYBStatus(YBCPgSetReadOnlyStmt(yb_is_read_only)); + } } diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index 668d104c6c13..37dabf25be60 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -219,6 +219,7 @@ static bool check_transaction_priority_upper_bound(double *newval, void **extra, extern void YBCAssignTransactionPriorityUpperBound(double newval, void* extra); extern double YBCGetTransactionPriority(); extern TxnPriorityRequirement YBCGetTransactionPriorityType(); +static bool yb_check_no_txn(int* newval, void **extra, GucSource source); static void assign_yb_pg_batch_detection_mechanism(int new_value, void *extra); static void assign_ysql_upgrade_mode(bool newval, void *extra); @@ -485,6 +486,12 @@ static struct config_enum_entry shared_memory_options[] = { {NULL, 0, false} }; +const struct config_enum_entry yb_read_after_commit_visibility_options[] = { + {"strict", YB_STRICT_READ_AFTER_COMMIT_VISIBILITY, false}, + {"relaxed", YB_RELAXED_READ_AFTER_COMMIT_VISIBILITY, false}, + {NULL, 0, false} +}; + /* * Options for enum values stored in other modules */ @@ -5444,6 +5451,46 @@ static struct config_enum ConfigureNamesEnum[] = NULL, assign_yb_pg_batch_detection_mechanism, NULL }, + { + /* + * Read-after-commit-visibility guarantee: any client issued read + * should see all data that was committed before the read request + * was issued (even in the presence of clock skew between nodes). + * In other words, the following example should always work: + * (1) User X commits some data (for which the db picks a commit + * timestamp say ht1) + * (2) Then user X communicates to user Y to inform about the commit + * via a channel outside the database (say a phone call) + * (3) Then user Y issues a read to some YB node which picks a + * read time (less than ht1 due to clock skew) + * (4) Then it should not happen that user Y gets an output without + * the data that user Y was informed about. + */ + { + "yb_read_after_commit_visibility", PGC_USERSET, CUSTOM_OPTIONS, + gettext_noop("Control read-after-commit-visibility guarantee."), + gettext_noop( + "This GUC is intended as a crutch for users migrating from PostgreSQL and new to" + " read restart errors. Users can now largely avoid these errors when" + " read-after-commit-visibility guarantee is not a strong requirement." + " This option cannot be set from within a transaction block." + " Configure one of the following options:" + " (a) strict: Default Behavior. The read-after-commit-visibility guarantee is" + " maintained by the database. However, users may see read restart errors that" + " show \"ERROR: Query error: Restart read required at: ...\". The database" + " attempts to retry on such errors internally but that is not always possible." + " (b) relaxed: With this option, the read-after-commit-visibility guarantee is" + " relaxed. Read only statements/transactions do not see read restart errors but" + " may miss recent updates with staleness bounded by clock skew." + ), + 0 + }, + &yb_read_after_commit_visibility, + YB_STRICT_READ_AFTER_COMMIT_VISIBILITY, + yb_read_after_commit_visibility_options, + yb_check_no_txn, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL @@ -12807,5 +12854,22 @@ yb_check_toast_catcache_threshold(int *newVal, void **extra, GucSource source) return true; } +/* + * YB: yb_check_no_txn + * + * Do not allow users to set yb_read_after_commit_visibility + * from within a txn block. + */ +static bool +yb_check_no_txn(int *newVal, void **extra, GucSource source) +{ + if (IsTransactionBlock()) + { + GUC_check_errdetail("Cannot be set within a txn block."); + return false; + } + return true; +} + #include "guc-file.c" diff --git a/src/yb/common/common_fwd.h b/src/yb/common/common_fwd.h index df5e40940f7f..b68ba71a91fc 100644 --- a/src/yb/common/common_fwd.h +++ b/src/yb/common/common_fwd.h @@ -21,6 +21,7 @@ #include "yb/common/ql_protocol.fwd.h" #include "yb/common/redis_protocol.fwd.h" #include "yb/common/wire_protocol.fwd.h" +#include "yb/util/strongly_typed_bool.h" namespace yb { @@ -65,6 +66,8 @@ enum class DataType; enum class SortingType; +YB_STRONGLY_TYPED_BOOL(ClampUncertaintyWindow); + namespace common { class Jsonb; diff --git a/src/yb/common/consistent_read_point.cc b/src/yb/common/consistent_read_point.cc index f27d7698e1d0..4a5decac7d27 100644 --- a/src/yb/common/consistent_read_point.cc +++ b/src/yb/common/consistent_read_point.cc @@ -33,8 +33,11 @@ void ConsistentReadPoint::SetReadTimeUnlocked( restarts_.clear(); } -void ConsistentReadPoint::SetCurrentReadTimeUnlocked() { - SetReadTimeUnlocked(ReadHybridTime::FromHybridTimeRange(clock_->NowRange())); +void ConsistentReadPoint::SetCurrentReadTimeUnlocked(const ClampUncertaintyWindow clamp) { + SetReadTimeUnlocked( + clamp + ? ReadHybridTime::SingleTime(clock_->Now()) + : ReadHybridTime::FromHybridTimeRange(clock_->NowRange())); } void ConsistentReadPoint::SetReadTime( @@ -43,9 +46,9 @@ void ConsistentReadPoint::SetReadTime( SetReadTimeUnlocked(read_time, &local_limits); } -void ConsistentReadPoint::SetCurrentReadTime() { +void ConsistentReadPoint::SetCurrentReadTime(const ClampUncertaintyWindow clamp) { std::lock_guard lock(mutex_); - SetCurrentReadTimeUnlocked(); + SetCurrentReadTimeUnlocked(clamp); } Status ConsistentReadPoint::TrySetDeferredCurrentReadTime() { diff --git a/src/yb/common/consistent_read_point.h b/src/yb/common/consistent_read_point.h index 1f4147563bce..d06fd0b97fd4 100644 --- a/src/yb/common/consistent_read_point.h +++ b/src/yb/common/consistent_read_point.h @@ -41,7 +41,9 @@ class ConsistentReadPoint { void MoveFrom(ConsistentReadPoint* rhs); // Set the current time as the read point. - void SetCurrentReadTime() EXCLUDES(mutex_); + // No uncertainty window when clamp is set. + void SetCurrentReadTime( + const ClampUncertaintyWindow clamp = ClampUncertaintyWindow::kFalse) EXCLUDES(mutex_); // If read point is not set, use the current time as the read point and defer it to the global // limit. If read point was already set, return error if it is not deferred. @@ -91,7 +93,8 @@ class ConsistentReadPoint { private: inline void SetReadTimeUnlocked( const ReadHybridTime& read_time, HybridTimeMap* local_limits = nullptr) REQUIRES(mutex_); - void SetCurrentReadTimeUnlocked() REQUIRES(mutex_); + void SetCurrentReadTimeUnlocked( + const ClampUncertaintyWindow clamp = ClampUncertaintyWindow::kFalse) REQUIRES(mutex_); void UpdateLimitsMapUnlocked( const TabletId& tablet, const HybridTime& local_limit, HybridTimeMap* map) REQUIRES(mutex_); void RestartRequiredUnlocked(const TabletId& tablet, const ReadHybridTime& restart_time) diff --git a/src/yb/integration-tests/mini_cluster.cc b/src/yb/integration-tests/mini_cluster.cc index 04b78c1f2826..feaa05752631 100644 --- a/src/yb/integration-tests/mini_cluster.cc +++ b/src/yb/integration-tests/mini_cluster.cc @@ -874,6 +874,11 @@ server::SkewedClockDeltaChanger JumpClock( return server::SkewedClockDeltaChanger(delta, skewed_clock); } +server::SkewedClockDeltaChanger JumpClock( + tserver::MiniTabletServer* server, std::chrono::milliseconds delta) { + return JumpClock(server->server(), delta); +} + std::vector<server::SkewedClockDeltaChanger> SkewClocks( MiniCluster* cluster, std::chrono::milliseconds clock_skew) { std::vector<server::SkewedClockDeltaChanger> delta_changers; diff --git a/src/yb/integration-tests/mini_cluster.h b/src/yb/integration-tests/mini_cluster.h index a4e51d332b52..c864e90ff84e 100644 --- a/src/yb/integration-tests/mini_cluster.h +++ b/src/yb/integration-tests/mini_cluster.h @@ -298,6 +298,15 @@ class MiniCluster : public MiniClusterBase { PortPicker port_picker_; }; +// Requires that skewed clock is registered as physical clock. +// Jumps the physical clock by delta +// i.e. no effect on hybrid ts unless by physical clock. +// new_clock = old_clock + delta (clocks are moving). +// Returns an RAII structure that resets the delta change +// when it goes out of scope. +server::SkewedClockDeltaChanger JumpClock( + tserver::MiniTabletServer* server, std::chrono::milliseconds delta); + MUST_USE_RESULT std::vector<server::SkewedClockDeltaChanger> SkewClocks( MiniCluster* cluster, std::chrono::milliseconds clock_skew); diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index d64fe4cad646..244dc5c8f16b 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -607,6 +607,12 @@ message PgPerformOptionsPB { int64 pg_txn_start_us = 22; AshMetadataPB ash_metadata = 23; + + // When set, + // - Sets the read time locally. + // - Clamps the read uncertainty window. + // See the commit desc for more info. + bool clamp_uncertainty_window = 24; } message PgPerformRequestPB { diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index dc293008593a..5869706170b1 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -1152,7 +1152,8 @@ Status PgClientSession::DoPerform(const DataPtr& data, CoarseTimePoint deadline, } void PgClientSession::ProcessReadTimeManipulation( - ReadTimeManipulation manipulation, uint64_t read_time_serial_no) { + ReadTimeManipulation manipulation, uint64_t read_time_serial_no, + ClampUncertaintyWindow clamp) { VLOG_WITH_PREFIX(2) << "ProcessReadTimeManipulation: " << manipulation << ", read_time_serial_no: " << read_time_serial_no << ", read_time_serial_no_: " << read_time_serial_no_; @@ -1165,7 +1166,8 @@ void PgClientSession::ProcessReadTimeManipulation( return; case ReadTimeManipulation::ENSURE_READ_TIME_IS_SET : if (!read_point.GetReadTime() || read_time_serial_no_ != read_time_serial_no) { - read_point.SetCurrentReadTime(); + // Clamp read uncertainty window when requested by the query layer. + read_point.SetCurrentReadTime(clamp); VLOG(1) << "Setting current ht as read point " << read_point.GetReadTime(); } return; @@ -1274,7 +1276,9 @@ PgClientSession::SetupSession( RSTATUS_DCHECK( kind == PgClientSessionKind::kPlain, IllegalState, "Read time manipulation can't be specified for non kPlain sessions"); - ProcessReadTimeManipulation(options.read_time_manipulation(), read_time_serial_no); + ProcessReadTimeManipulation( + options.read_time_manipulation(), read_time_serial_no, + ClampUncertaintyWindow(options.clamp_uncertainty_window())); } else if (options.has_read_time() && options.read_time().has_read_ht()) { const auto read_time = ReadHybridTime::FromPB(options.read_time()); session.SetReadPoint(read_time); @@ -1316,6 +1320,21 @@ PgClientSession::SetupSession( // TODO: Shouldn't the below logic for DDL transactions as well? session.SetInTxnLimit(in_txn_limit); } + + if (options.clamp_uncertainty_window() + && !session.read_point()->GetReadTime()) { + RSTATUS_DCHECK( + !(transaction && transaction->isolation() == SERIALIZABLE_ISOLATION), + IllegalState, "Clamping does not apply to SERIALIZABLE txns."); + // Set read time with clamped uncertainty window when reqeusted by + // the query layer. + // Do not mess with the read time if already set. + session.read_point()->SetCurrentReadTime(ClampUncertaintyWindow::kTrue); + VLOG_WITH_PREFIX_AND_FUNC(2) + << "Setting read time to " + << session.read_point()->GetReadTime() + << " for read only txn/stmt"; + } } session.SetDeadline(deadline); diff --git a/src/yb/tserver/pg_client_session.h b/src/yb/tserver/pg_client_session.h index 84a4a9120137..fae27c786dce 100644 --- a/src/yb/tserver/pg_client_session.h +++ b/src/yb/tserver/pg_client_session.h @@ -191,7 +191,8 @@ class PgClientSession { Status ProcessResponse( const PgClientSessionOperations& operations, const PgPerformRequestPB& req, PgPerformResponsePB* resp, rpc::RpcContext* context); - void ProcessReadTimeManipulation(ReadTimeManipulation manipulation, uint64_t txn_serial_no); + void ProcessReadTimeManipulation(ReadTimeManipulation manipulation, uint64_t txn_serial_no, + ClampUncertaintyWindow clamp); client::YBClient& client(); client::YBSessionPtr& EnsureSession(PgClientSessionKind kind, CoarseTimePoint deadline); diff --git a/src/yb/yql/pggate/pg_txn_manager.cc b/src/yb/yql/pggate/pg_txn_manager.cc index 6cf5d7f28bb0..91ffcaec4638 100644 --- a/src/yb/yql/pggate/pg_txn_manager.cc +++ b/src/yb/yql/pggate/pg_txn_manager.cc @@ -185,6 +185,9 @@ Status PgTxnManager::BeginTransaction(int64_t start_time) { return STATUS(IllegalState, "Transaction is already in progress"); } pg_txn_start_us_ = start_time; + // NOTE: Do not reset in_txn_blk_ when restarting txns internally + // (i.e., via PgTxnManager::RecreateTransaction). + in_txn_blk_ = false; return RecreateTransaction(SavePriority::kFalse); } @@ -255,6 +258,20 @@ Status PgTxnManager::SetDeferrable(bool deferrable) { return Status::OK(); } +Status PgTxnManager::SetInTxnBlock(bool in_txn_blk) { + in_txn_blk_ = in_txn_blk; + VLOG_WITH_FUNC(2) << (in_txn_blk ? "In " : "Not in ") << "txn block."; + return Status::OK(); +} + +Status PgTxnManager::SetReadOnlyStmt(bool read_only_stmt) { + read_only_stmt_ = read_only_stmt; + VLOG_WITH_FUNC(2) + << "Executing a " << (read_only_stmt ? "read only " : "read/write ") + << "stmt."; + return Status::OK(); +} + uint64_t PgTxnManager::NewPriority(TxnPriorityRequirement txn_priority_requirement) { if (txn_priority_requirement == kHighestPriority) { return yb::kHighPriTxnUpperBound; @@ -420,6 +437,7 @@ void PgTxnManager::ResetTxnAndSession() { enable_tracing_ = false; read_time_for_follower_reads_ = HybridTime(); read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; + read_only_stmt_ = false; } Status PgTxnManager::EnterSeparateDdlTxnMode() { @@ -525,6 +543,19 @@ void PgTxnManager::SetupPerformOptions( read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; // pg_txn_start_us is similarly only used for kPlain transactions. options->set_pg_txn_start_us(pg_txn_start_us_); + // Only clamp read-only txns/stmts. + // Do not clamp in the serializable case since + // - SERIALIZABLE reads do not pick read time until later. + // - SERIALIZABLE reads do not observe read restarts anyways. + if (yb_read_after_commit_visibility + == YB_RELAXED_READ_AFTER_COMMIT_VISIBILITY + && isolation_level_ != IsolationLevel::SERIALIZABLE_ISOLATION) + // We clamp uncertainty window when + // - either we are working with a read only txn + // - or we are working with a read only stmt + // i.e. no txn block and a pure SELECT stmt. + options->set_clamp_uncertainty_window( + read_only_ || (!in_txn_blk_ && read_only_stmt_)); } if (read_time_for_follower_reads_) { ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time()); diff --git a/src/yb/yql/pggate/pg_txn_manager.h b/src/yb/yql/pggate/pg_txn_manager.h index 2744a4d1d5c9..7afc1e7fd1de 100644 --- a/src/yb/yql/pggate/pg_txn_manager.h +++ b/src/yb/yql/pggate/pg_txn_manager.h @@ -73,6 +73,8 @@ class PgTxnManager : public RefCountedThreadSafe<PgTxnManager> { Status ExitSeparateDdlTxnModeWithAbort(); Status ExitSeparateDdlTxnModeWithCommit(uint32_t db_oid, bool is_silent_altering); void SetDdlHasSyscatalogChanges(); + Status SetInTxnBlock(bool in_txn_blk); + Status SetReadOnlyStmt(bool read_only_stmt); bool IsTxnInProgress() const { return txn_in_progress_; } IsolationLevel GetIsolationLevel() const { return isolation_level_; } @@ -127,6 +129,8 @@ class PgTxnManager : public RefCountedThreadSafe<PgTxnManager> { bool need_restart_ = false; bool need_defer_read_point_ = false; tserver::ReadTimeManipulation read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; + bool in_txn_blk_ = false; + bool read_only_stmt_ = false; // Postgres transaction characteristics. PgIsolationLevel pg_isolation_level_ = PgIsolationLevel::REPEATABLE_READ; diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index 34e8e78ce84a..bd62ba971342 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -2135,6 +2135,14 @@ Status PgApiImpl::SetTransactionDeferrable(bool deferrable) { return pg_txn_manager_->SetDeferrable(deferrable); } +Status PgApiImpl::SetInTxnBlock(bool in_txn_blk) { + return pg_txn_manager_->SetInTxnBlock(in_txn_blk); +} + +Status PgApiImpl::SetReadOnlyStmt(bool read_only_stmt) { + return pg_txn_manager_->SetReadOnlyStmt(read_only_stmt); +} + Status PgApiImpl::EnterSeparateDdlTxnMode() { // Flush all buffered operations as ddl txn use its own transaction session. RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index 13d4dcd11aae..eac067085e3c 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -651,6 +651,8 @@ class PgApiImpl { Status SetTransactionIsolationLevel(int isolation); Status SetTransactionReadOnly(bool read_only); Status SetTransactionDeferrable(bool deferrable); + Status SetInTxnBlock(bool in_txn_blk); + Status SetReadOnlyStmt(bool read_only_stmt); Status SetEnableTracing(bool tracing); Status EnableFollowerReads(bool enable_follower_reads, int32_t staleness_ms); Status EnterSeparateDdlTxnMode(); diff --git a/src/yb/yql/pggate/util/yb_guc.cc b/src/yb/yql/pggate/util/yb_guc.cc index 4bb46f0d667c..3d949d65d71d 100644 --- a/src/yb/yql/pggate/util/yb_guc.cc +++ b/src/yb/yql/pggate/util/yb_guc.cc @@ -73,3 +73,5 @@ int yb_reorderbuffer_max_changes_in_memory = 4096; uint64_t yb_read_time = 0; bool yb_is_read_time_ht = false; + +int yb_read_after_commit_visibility = 0; diff --git a/src/yb/yql/pggate/util/yb_guc.h b/src/yb/yql/pggate/util/yb_guc.h index 0150a0e623dc..1fe91886debe 100644 --- a/src/yb/yql/pggate/util/yb_guc.h +++ b/src/yb/yql/pggate/util/yb_guc.h @@ -176,6 +176,8 @@ extern int yb_reorderbuffer_max_changes_in_memory; */ extern int yb_walsender_poll_sleep_duration_empty_ms; +extern int yb_read_after_commit_visibility; + #ifdef __cplusplus } // extern "C" #endif diff --git a/src/yb/yql/pggate/util/ybc_util.h b/src/yb/yql/pggate/util/ybc_util.h index 5ca94f702476..4fd3348a221f 100644 --- a/src/yb/yql/pggate/util/ybc_util.h +++ b/src/yb/yql/pggate/util/ybc_util.h @@ -191,6 +191,29 @@ extern int yb_walsender_poll_sleep_duration_empty_ms; */ extern int yb_reorderbuffer_max_changes_in_memory; +/* + * Ease transition to YSQL by reducing read restart errors for new apps. + * + * This option doesn't affect SERIALIZABLE isolation level since + * SERIALIZABLE can't face read restart errors anyway. + * + * See the help text for yb_read_after_commit_visibility GUC for more + * information. + * + * XXX: This GUC is meant as a workaround only by relaxing the + * read-after-commit-visibility guarantee. Ideally, + * (a) Users should fix their apps to handle read restart errors, or + * (b) TODO(#22317): YB should use very accurate clocks to avoid read restart + * errors altogether. + */ +typedef enum { + YB_STRICT_READ_AFTER_COMMIT_VISIBILITY = 0, + YB_RELAXED_READ_AFTER_COMMIT_VISIBILITY = 1, +} YBReadAfterCommitVisibilityEnum; + +/* GUC for the enum above. */ +extern int yb_read_after_commit_visibility; + typedef struct YBCStatusStruct* YBCStatus; bool YBCStatusIsNotFound(YBCStatus s); diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index a7ca331e0bc8..fa7ffcd7ccd7 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -1706,6 +1706,14 @@ YBCStatus YBCPgSetTransactionDeferrable(bool deferrable) { return ToYBCStatus(pgapi->SetTransactionDeferrable(deferrable)); } +YBCStatus YBCPgSetInTxnBlock(bool in_txn_blk) { + return ToYBCStatus(pgapi->SetInTxnBlock(in_txn_blk)); +} + +YBCStatus YBCPgSetReadOnlyStmt(bool read_only_stmt) { + return ToYBCStatus(pgapi->SetReadOnlyStmt(read_only_stmt)); +} + YBCStatus YBCPgEnterSeparateDdlTxnMode() { return ToYBCStatus(pgapi->EnterSeparateDdlTxnMode()); } diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 700547976444..b51c701aa2c9 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -661,6 +661,8 @@ YBCStatus YBCPgAbortPlainTransaction(); YBCStatus YBCPgSetTransactionIsolationLevel(int isolation); YBCStatus YBCPgSetTransactionReadOnly(bool read_only); YBCStatus YBCPgSetTransactionDeferrable(bool deferrable); +YBCStatus YBCPgSetInTxnBlock(bool in_txn_blk); +YBCStatus YBCPgSetReadOnlyStmt(bool read_only_stmt); YBCStatus YBCPgSetEnableTracing(bool tracing); YBCStatus YBCPgEnableFollowerReads(bool enable_follower_reads, int32_t staleness_ms); YBCStatus YBCPgEnterSeparateDdlTxnMode(); diff --git a/src/yb/yql/pgwrapper/pg_read_time-test.cc b/src/yb/yql/pgwrapper/pg_read_time-test.cc index a3e377c01176..1fcb40ac9a51 100644 --- a/src/yb/yql/pgwrapper/pg_read_time-test.cc +++ b/src/yb/yql/pgwrapper/pg_read_time-test.cc @@ -445,4 +445,139 @@ TEST_F(PgMiniTestBase, YB_DISABLE_TEST_IN_SANITIZERS(TestYSQLDumpAsOfTime)) { ASSERT_STR_EQ(ground_truth, dump_as_of_time); } +// Mimics the CheckReadTimePickingLocation test for the relaxed +// yb_read_after_commit_visibility case. +// +// There are two primary effects of relaxed yb_read_after_commit_visibility: +// - SELECTs now always pick their read time on local proxy. +// - The read time is clamped whenever it is picked this way (not relevant). +// +// This implies the following changes compared to the vanilla test +// - Case 1: no pipeline, single operation in first batch, no distributed txn. +// Read time is picked on proxy for SELECTs and not DMLs. +// - Case 3: no pipeline, multiple operations to the same tablet in first batch, no distributed txn. +// Read time is picked on proxy. +TEST_F(PgReadTimeTest, CheckRelaxedReadAfterCommitVisibility) { + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute("SET DEFAULT_TRANSACTION_ISOLATION TO \"REPEATABLE READ\"")); + constexpr auto kTable = "test"sv; + constexpr auto kSingleTabletTable = "test_with_single_tablet"sv; + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (k INT PRIMARY KEY, v INT)", kTable)); + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kSingleTabletTable)); + + for (const auto& table_name : {kTable, kSingleTabletTable}) { + ASSERT_OK(conn.ExecuteFormat( + "INSERT INTO $0 SELECT generate_series(1, 100), 0", table_name)); + ASSERT_OK(conn.ExecuteFormat( + "CREATE OR REPLACE PROCEDURE insert_rows_$0(first integer, last integer) " + "LANGUAGE plpgsql " + "as $$body$$ " + "BEGIN " + " FOR i in first..last LOOP " + " INSERT INTO $0 VALUES (i, i); " + " END LOOP; " + "END; " + "$$body$$", table_name)); + } + + // Relax read-after-commit-visiblity guarantee. + ASSERT_OK(conn.Execute("SET yb_read_after_commit_visibility TO relaxed")); + + // 1. no pipeline, single operation in first batch, no distributed txn + // + // relaxed yb_read_after_commit_visibility does not affect DML queries. + for (const auto& table_name : {kTable, kSingleTabletTable}) { + CheckReadTimeProvidedToDocdb( + [&conn, table_name]() { + ASSERT_OK(conn.FetchFormat("SELECT * FROM $0 WHERE k=1", table_name)); + }); + + CheckReadTimePickedOnDocdb( + [&conn, table_name]() { + ASSERT_OK(conn.ExecuteFormat("UPDATE $0 SET v=1 WHERE k=1", table_name)); + }); + + CheckReadTimePickedOnDocdb( + [&conn, table_name]() { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1000, 1000)", table_name)); + }); + + CheckReadTimePickedOnDocdb( + [&conn, table_name]() { + ASSERT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE k=1000", table_name)); + }); + } + + // 2. no pipeline, multiple operations to various tablets in first batch, no distributed txn + CheckReadTimeProvidedToDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.FetchFormat("SELECT COUNT(*) FROM $0", kTable)); + }); + + // 3. no pipeline, multiple operations to the same tablet in first batch, no distributed txn + CheckReadTimeProvidedToDocdb( + [&conn, kSingleTabletTable]() { + ASSERT_OK(conn.FetchFormat("SELECT COUNT(*) FROM $0", kSingleTabletTable)); + }); + + // 4. no pipeline, single operation in first batch, starts a distributed transation + // + // expected_num_picked_read_time_on_doc_db_metric is set because in case of a SELECT FOR UPDATE, + // a read time is picked in read_query.cc, but an extra picking is done in write_query.cc just + // after conflict resolution is done (see DoTransactionalConflictsResolved()). + // + // relaxed yb_read_after_commit_visibility does not affect FOR UDPATE queries. + CheckReadTimePickedOnDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); + ASSERT_OK(conn.FetchFormat("SELECT * FROM $0 WHERE k=1 FOR UPDATE", kTable)); + ASSERT_OK(conn.CommitTransaction()); + }, 2 /* expected_num_picked_read_time_on_doc_db_metric */); + + // 5. no pipeline, multiple operations to various tablets in first batch, starts a distributed + // transation + ASSERT_OK(SetHighMaxBatchSize(&conn)); + CheckReadTimeProvidedToDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(101, 110)", kTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); + + // 6. no pipeline, multiple operations to the same tablet in first batch, starts a distributed + // transation + ASSERT_OK(SetHighMaxBatchSize(&conn)); + CheckReadTimeProvidedToDocdb( + [&conn, kSingleTabletTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(101, 110)", kSingleTabletTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); + + // 7. Pipeline, single operation in first batch, starts a distributed transation + ASSERT_OK(SetMaxBatchSize(&conn, 1)); + CheckReadTimeProvidedToDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(111, 120)", kTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); + + // 8. Pipeline, multiple operations to various tablets in first batch, starts a distributed + // transation + ASSERT_OK(SetMaxBatchSize(&conn, 10)); + CheckReadTimeProvidedToDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(121, 150)", kTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); + + // 9. Pipeline, multiple operations to the same tablet in first batch, starts a distributed + // transation + ASSERT_OK(SetMaxBatchSize(&conn, 10)); + CheckReadTimeProvidedToDocdb( + [&conn, kSingleTabletTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(121, 150)", kSingleTabletTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); +} + } // namespace yb::pgwrapper diff --git a/src/yb/yql/pgwrapper/pg_single_tserver-test.cc b/src/yb/yql/pgwrapper/pg_single_tserver-test.cc index e1fa2be1ff1d..a67d1f54f98b 100644 --- a/src/yb/yql/pgwrapper/pg_single_tserver-test.cc +++ b/src/yb/yql/pgwrapper/pg_single_tserver-test.cc @@ -912,6 +912,26 @@ TEST_F(PgSingleTServerTest, PagingSelectWithDelayedIntentsApply) { } } +// YB picks read time as safe time when the read time is picked on DocDB. +// Safe time is potentially a time point in the past and we should not ignore +// the uncertainty window for that reason. +// This test guards against SELECTs not seeing prior INSERTs from same session +// in relaxed read-after-commit-visibility for the above mentioned reason. +TEST_F(PgSingleTServerTest, NoSafeTimeClampingInRelaxedReadAfterCommit) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_inject_sleep_before_applying_intents_ms) = 100; + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute("SET yb_read_after_commit_visibility ='relaxed'")); + ASSERT_OK(conn.Execute("CREATE TABLE t (v INT) SPLIT INTO 2 TABLETS")); + for (int i = 0; i != 20; ++i) { + LOG(INFO) << "Delete iteration " << i; + ASSERT_OK(conn.Execute("DELETE FROM t")); + LOG(INFO) << "Insert iteration " << i; + ASSERT_OK(conn.Execute("INSERT INTO t VALUES (1)")); + LOG(INFO) << "Reading iteration " << i; + ASSERT_EQ(ASSERT_RESULT(conn.FetchRow<int32_t>("SELECT * FROM t")), 1); + } +} + TEST_F(PgSingleTServerTest, BoundedRangeScanWithLargeTransaction) { auto conn = ASSERT_RESULT(Connect()); ASSERT_OK(conn.Execute("CREATE TABLE t (r1 INT, r2 INT, PRIMARY KEY (r1 ASC, r2 ASC))")); diff --git a/src/yb/yql/pgwrapper/pg_txn-test.cc b/src/yb/yql/pgwrapper/pg_txn-test.cc index ba60d8622ded..a361fbdc7656 100644 --- a/src/yb/yql/pgwrapper/pg_txn-test.cc +++ b/src/yb/yql/pgwrapper/pg_txn-test.cc @@ -15,11 +15,19 @@ #include "yb/yql/pgwrapper/pg_mini_test_base.h" +#include "yb/common/pgsql_error.h" + +#include "yb/server/skewed_clock.h" + +#include "yb/tserver/mini_tablet_server.h" +#include "yb/tserver/tablet_server.h" + #include "yb/util/scope_exit.h" #include "yb/util/sync_point.h" #include "yb/util/test_macros.h" #include "yb/util/test_thread_holder.h" #include "yb/util/tsan_util.h" +#include "yb/util/yb_pg_errcodes.h" using std::string; @@ -28,6 +36,9 @@ using namespace std::literals; DECLARE_bool(TEST_fail_in_apply_if_no_metadata); DECLARE_bool(yb_enable_read_committed_isolation); DECLARE_bool(enable_wait_queues); +DECLARE_string(time_source); +DECLARE_int32(replication_factor); +DECLARE_bool(TEST_running_test); namespace yb { namespace pgwrapper { @@ -409,5 +420,515 @@ TEST_F_EX( PgTxnTest, SelectForUpdateExclusiveRead, PgTxnTestFailOnConflict) { EXPECT_OK(setup_conn.CommitTransaction()); } +// Helper class to test the semantics of yb_read_after_commit_visibility option. +// +// Additional infrastructure was required for the test. +// +// The test requires us to simulate two connections to separate postmaster +// processes on different tservers. Usually, we could get away with +// ExternalMiniCluster if we required two different postmaster processes. +// However, the test also requires that we register skewed clocks and jump +// the clocks as necessary. +// +// Here, we take the easier approach of using a MiniCluster (that supports +// skewed clocks out of the box) and then simulate multiple postmaster +// processes by explicitly spawning PgSupervisor processes for each tserver. +// +// Typical setup: +// 1. MiniCluster with 2 tservers. +// 2. One server hosts a test table with single tablet and RF 1. +// 3. The other server, proxy, is blacklisted to control hybrid propagation. +// This is the node that the external client connects to, for the read +// query and "expects" to the see the recent commit. +// 4. Ensure that the proxy is also not on the master node. +// 5. Pre-populate the catalog cache so there are no surprise communications +// between the servers. +// 6. Register skewed clocks. We only jump the clock on the node that hosts +// the data. +// +// Additional considerations/caveats: +// - Register a thread prefix for each supervisor. +// Otherwise, the callback registration fails with name conflicts. +// - Do NOT use PgPickTabletServer. It does not work. Moreover, it is +// insufficient for our usecase even if it did work as intended. +class PgReadAfterCommitVisibilityTest : public PgMiniTestBase { + public: + void SetUp() override { + server::SkewedClock::Register(); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_time_source) = server::SkewedClock::kName; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_replication_factor) = 1; + PgMiniTestBase::SetUp(); + SpawnSupervisors(); + } + + void DoTearDown() override { + // Exit supervisors cleanly ... + // Risk of false positive segfaults otherwise ... + for (auto&& supervisor : pg_supervisors_) { + if (supervisor) { + supervisor->Stop(); + } + } + PgMiniTestBase::DoTearDown(); + } + + size_t NumTabletServers() override { + // One server for a proxy and the other server to host the data. + return 2; + } + + void BeforePgProcessStart() override { + // Identify the tserver index that hosts the MiniCluster postmaster + // process so that we do NOT spawn a PgSupervisor for that tserver. + auto connParams = MakeConnSettings(); + auto ntservers = static_cast<int>(cluster_->num_tablet_servers()); + for (int idx = 0; idx < ntservers; idx++) { + auto server = cluster_->mini_tablet_server(idx); + if (server->bound_rpc_addr().address().to_string() == connParams.host) { + conn_idx_ = idx; + break; + } + } + } + + Result<PGConn> ConnectToIdx(int idx) const { + // postmaster hosted by PgMiniTestBase itself. + if (idx == conn_idx_) { + return Connect(); + } + + // We own the postmaster process for this tserver idx. + // Use the appropriate postmaster process to setup a pg connection. + auto connParams = PGConnSettings { + .host = pg_host_ports_[idx].host(), + .port = pg_host_ports_[idx].port() + }; + + auto result = VERIFY_RESULT(PGConnBuilder(connParams).Connect()); + RETURN_NOT_OK(SetupConnection(&result)); + return result; + } + + // Called for the first connection. + // Use ConnectToIdx() directly for subsequent connections. + Result<PGConn> ConnectToProxy() { + // Avoid proxy on the node that hosts the master because + // tservers and masters regularly exchange heartbeats with each other. + // This means there is constant hybrid time propagation between + // the master and the tservers. + // We wish to avoid hyrbid time from propagating to the proxy node. + if (static_cast<int>(cluster_->LeaderMasterIdx()) == proxy_idx_) { + return STATUS(IllegalState, "Proxy cannot be on the master node ..."); + } + + // Add proxy to the blacklist to limit hybrid time prop. + auto res = cluster_->AddTServerToBlacklist(proxy_idx_); + if (!res.ok()) { + return res; + } + + // Now, we are ready to connect to the proxy. + return ConnectToIdx(proxy_idx_); + } + + Result<PGConn> ConnectToDataHost() { + return ConnectToIdx(host_idx_); + } + + // Jump the clocks of the nodes hosting the data. + std::vector<server::SkewedClockDeltaChanger> JumpClockDataNodes( + std::chrono::milliseconds skew) { + std::vector<server::SkewedClockDeltaChanger> changers; + auto ntservers = static_cast<int>(cluster_->num_tablet_servers()); + for (int idx = 0; idx < ntservers; idx++) { + if (idx == proxy_idx_) { + continue; + } + changers.push_back(JumpClock(cluster_->mini_tablet_server(idx), skew)); + } + return changers; + } + + protected: + // Setup to create the postmaster process corresponding to the tserver idx. + Status CreateSupervisor(int idx) { + auto pg_ts = cluster_->mini_tablet_server(idx); + auto port = cluster_->AllocateFreePort(); + PgProcessConf pg_process_conf = VERIFY_RESULT(PgProcessConf::CreateValidateAndRunInitDb( + AsString(Endpoint(pg_ts->bound_rpc_addr().address(), port)), + pg_ts->options()->fs_opts.data_paths.front() + "/pg_data", + pg_ts->server()->GetSharedMemoryFd())); + + pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag; + pg_process_conf.force_disable_log_file = true; + pg_host_ports_[idx] = HostPort(pg_process_conf.listen_addresses, pg_process_conf.pg_port); + + pg_supervisors_[idx] = std::make_unique<PgSupervisor>( + pg_process_conf, nullptr); + + return Status::OK(); + } + + void SpawnSupervisors() { + auto ntservers = static_cast<int>(cluster_->num_tablet_servers()); + + // Allocate space for the host ports and supervisors. + pg_host_ports_.resize(ntservers); + pg_supervisors_.resize(ntservers); + + // Create and start the PgSupervisors. + for (int idx = 0; idx < ntservers; idx++) { + if (idx == conn_idx_) { + // Postmaster already started for this tserver. + continue; + } + // Prefix registered to avoid name clash among callback + // registrations. + TEST_SetThreadPrefixScoped prefix_se(std::to_string(idx)); + ASSERT_OK(CreateSupervisor(idx)); + ASSERT_OK(pg_supervisors_[idx]->Start()); + } + } + + int conn_idx_ = 0; + int proxy_idx_ = 1; + int host_idx_ = 0; + std::vector<HostPort> pg_host_ports_; + std::vector<std::unique_ptr<PgSupervisor>> pg_supervisors_; +}; + +// Ensures that clock skew does not affect read-after-commit guarantees on same +// session with relaxed yb_read_after_commit_visibility option. +// +// Test Setup: +// 1. Cluster with RF 1, skewed clocks, 2 tservers. +// 2. Add a pg process on tserver that does not have one. +// This is done since MiniCluster only creates one postmaster process +// on some random tserver. +// We wish to use the minicluster and not external minicluster since +// there is better test infrastructure to manipulate clock skew. +// This approach is the less painful one at the moment. +// 3. Add a tablet server to the blacklist +// so we can ensure hybrid time propagation doesn't occur between +// the data host node and the proxy +// 4. Connect to the proxy tserver that does not host the data. +// We simulate this by blacklisting the target tserver. +// 5. Create a table with a single tablet and a single row. +// 6. Populate the catalog cache on the pg backend so that +// catalog cache misses does not interfere with hybrid time propagation. +// 7. Jump the clock of the tserver hosting the table to the future. +// 8. Insert a row using the proxy conn and a fast path txn. +// Commit ts for the insert is picked on the server whose clock is ahead. +// The hybrid time is propagated to the proxy conn since the request +// originated from the proxy conn. +// 9. Read the table from the proxy connection. +// The read has a timestamp that is ahead of the physical clock accounting +// for the timestamp of the insert DML due hybrid time propagation. +// Hence, the read does not miss recent updates of the same connection. +// This property also applies when dealing with different pg connections +// but they all go through the same tserver proxy. +TEST_F(PgReadAfterCommitVisibilityTest, SameSessionRecency) { + // Connect to local proxy. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + + // Create table test. + ASSERT_OK(proxyConn.Execute( + "CREATE TABLE test (key INT) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit time + // on the data node. + // This hybrid time is propagated to the local proxy. + ASSERT_OK(proxyConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform a select using the the relaxed yb_read_after_commit_visibility option. + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto rows = ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Observe the recent insert despite the clock skew. + ASSERT_EQ(rows.size(), 1); +} + +// Similar to SameSessionRecency except we have +// two connections instead of one to the same node. +// +// This property is necessary to maintain same session guarantees even in the +// presence of server-side connection pooling. +TEST_F(PgReadAfterCommitVisibilityTest, SamePgNodeRecency) { + // Connect to local proxy. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + // Not calling ConnectToProxy() again since we already added + // the proxy to the blacklist. + auto proxyConn2 = ASSERT_RESULT(ConnectToIdx(proxy_idx_)); + + // Create table test. + ASSERT_OK(proxyConn.Execute( + "CREATE TABLE test (key INT) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(proxyConn2.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + // This ts is propagated to the local proxy. + ASSERT_OK(proxyConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform a select using the relaxed yb_read_after_commit_visibility option. + ASSERT_OK(proxyConn2.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto rows = ASSERT_RESULT(proxyConn2.FetchRows<int32_t>("SELECT * FROM test")); + + // Observe the recent insert despite the clock skew. + ASSERT_EQ(rows.size(), 1); +} + +// Demonstrate that read from a connection to a different node +// (than the one which had the Pg connection to write data) may miss the +// commit when using the relaxed yb_read_after_commit_visibility option. +// +// Test Setup: +// 1. Cluster with RF 1, skewed clocks, 2 tservers. +// 2. Add a pg process on tserver that does not have one. +// This is done since MiniCluster only creates one postmaster process +// on some random tserver. +// We wish to use the minicluster and not external minicluster since +// there is better test infrastructure to manipulate clock skew. +// This approach is the less painful one at the moment. +// 3. Add a tablet server to the blacklist. +// 4. Connect to both servers for Pg connection, data host and proxy. +// 5. Create a table with a single tablet and a single row. +// 6. Populate the catalog cache on both pg processes so that +// catalog cache misses does not interfere with hybrid time propagation. +// 7. Jump the clock of the tserver hosting the table to the future. +// 8. Insert a row using the host conn and a fast path txn. +// Commit ts for the insert is picked on the server whose clock is ahead. +// The hybrid time is not propagated to the proxy conn since +// there is no reason to touch proxy conn. +// 9. Read the table from the proxy connection. +// The read can miss the recent commit since there is no hybrid time +// propagation from the host conn before the read time is picked. +TEST_F(PgReadAfterCommitVisibilityTest, SessionOnDifferentNodeStaleRead) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit time + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform a select using the relaxed yb_read_after_commit_visibility option. + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto rows = ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Miss the recent insert due to clock skew. + ASSERT_EQ(rows.size(), 0); +} + +// Same test as SessionOnDifferentNodeStaleRead +// except we verify that the staleness is bounded +// by waiting out the clock skew. +TEST_F(PgReadAfterCommitVisibilityTest, + SessionOnDifferentNodeBoundedStaleness) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto skew = 100ms; + auto changers = JumpClockDataNodes(skew); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Sleep for a while to verify that the staleness is bounded. + // We sleep for the same duration that we jump the clock, i.e. 100ms. + SleepFor(skew); + + // Perform a select using the relaxed yb_read_after_commit_visibility option. + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto rows = ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + + // We do not miss the prior insert since the clock skew has passed. + ASSERT_EQ(rows.size(), 1); +} + +// Duplicate insert check. +// +// Inserts should not miss other recent inserts to +// avoid missing duplicate key violations. This is guaranteed because +// we don't apply "relaxed" to non-read transactions. +TEST_F(PgReadAfterCommitVisibilityTest, + SessionOnDifferentNodeDuplicateInsertCheck) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT PRIMARY KEY) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform an insert using the relaxed yb_read_after_commit_visibility option. + // Must still observe the duplicate key! + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto res = proxyConn.Execute("INSERT INTO test VALUES (1)"); + ASSERT_FALSE(res.ok()); + + auto pg_err_ptr = res.ErrorData(PgsqlError::kCategory); + ASSERT_NE(pg_err_ptr, nullptr); + YBPgErrorCode error_code = PgsqlErrorTag::Decode(pg_err_ptr); + ASSERT_EQ(error_code, YBPgErrorCode::YB_PG_UNIQUE_VIOLATION); +} + +// Updates should not miss recent DMLs either. This is guaranteed +// because we don't apply "relaxed" to non-read transactions. +TEST_F(PgReadAfterCommitVisibilityTest, SessionOnDifferentNodeUpdateKeyCheck) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT PRIMARY KEY) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform an update using the relaxed yb_read_after_commit_visibility option. + // Must still observe the key with value 1 + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto res = proxyConn.Execute("UPDATE test SET key = 2 WHERE key = 1"); + + // Ensure that the update happened. + auto row = ASSERT_RESULT(hostConn.FetchRow<int32_t>("SELECT key FROM test")); + ASSERT_EQ(row, 2); +} + +// Same for DELETEs. They should not miss recent DMLs. +// Otherwise, DELETE FROM table would not delete all the rows. +// This is guaranteed because we don't apply "relaxed" to non-read +// transactions. +TEST_F(PgReadAfterCommitVisibilityTest, SessionOnDifferentNodeDeleteKeyCheck) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT PRIMARY KEY) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform a delete using the relaxed yb_read_after_commit_visibility option. + // Must still observe the key with value 1 + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto res = proxyConn.Execute("DELETE FROM test WHERE key = 1"); + + // Ensure that the update happened. + auto rows = ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT key FROM test")); + ASSERT_EQ(rows.size(), 0); +} + +// We also consider the case where the query looks like +// a SELECT but there is an insert hiding underneath. +// We are guaranteed read-after-commit-visibility in this case +// since "relaxed" is not applied to non-read transactions. +TEST_F(PgReadAfterCommitVisibilityTest, SessionOnDifferentNodeDmlHidden) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT PRIMARY KEY) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform an insert using the relaxed yb_read_after_commit_visibility option. + // Must still observe the duplicate key! + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto res = proxyConn.Execute("WITH new_test AS (" + "INSERT INTO test (key) VALUES (1) RETURNING key" + ") SELECT key FROM new_test"); + ASSERT_FALSE(res.ok()); + + auto pg_err_ptr = res.ErrorData(PgsqlError::kCategory); + ASSERT_NE(pg_err_ptr, nullptr); + YBPgErrorCode error_code = PgsqlErrorTag::Decode(pg_err_ptr); + ASSERT_EQ(error_code, YBPgErrorCode::YB_PG_UNIQUE_VIOLATION); +} + } // namespace pgwrapper } // namespace yb diff --git a/src/yb/yql/pgwrapper/pg_wrapper.cc b/src/yb/yql/pgwrapper/pg_wrapper.cc index 47a621a3d822..d5b8e616198e 100644 --- a/src/yb/yql/pgwrapper/pg_wrapper.cc +++ b/src/yb/yql/pgwrapper/pg_wrapper.cc @@ -266,6 +266,9 @@ DEFINE_RUNTIME_PG_FLAG( DEFINE_RUNTIME_PG_FLAG(int32, yb_toast_catcache_threshold, -1, "Size threshold in bytes for a catcache tuple to be compressed."); +DEFINE_RUNTIME_PG_FLAG(string, yb_read_after_commit_visibility, "strict", + "Determines the behavior of read-after-commit-visibility guarantee."); + static bool ValidateXclusterConsistencyLevel(const char* flagname, const std::string& value) { if (value != "database" && value != "tablet") { fprintf(