From 025c2312bf852f4b7c778f981d8e3f992bfbe090 Mon Sep 17 00:00:00 2001 From: Bvsk Patnaik <bvsaikiran.patnaik@gmail.com> Date: Wed, 26 Jun 2024 18:56:57 -0700 Subject: [PATCH] [BACKPORT 2024.1][#20336] YSQL: GUC to avoid kReadRestart Errors with Bounded Staleness Guarantees Summary: Original commit: 2724346a3d7b15e71fcfeda204ffa25866441af7 / D34002 Read restart errors are a distributed database specific error scenario that do not occur in a single node database such as PostgreSQL. These errors occur due to clock skew, usually when there are reads with simultaneous writes to that same data (refer https://docs.yugabyte.com/preview/architecture/transactions/read-restart-error/ for details). Read restart errors are thrown to maintain the "read-after-commit-visibility" guarantee: any client issued read should see all data that was committed before the read request was issued (even in the presence of clock skew between nodes). In other words, the following example should always work: (1) User X commits some data (for which the db picks a commit timestamp say ht1) (2) Then user X communicates to user Y to inform about the commit via a channel outside the database (say a phone call) (3) Then user Y issues a read to some YB node which picks a read time (less than ht1 due to clock skew) (4) Then it should not happen that user Y gets an output without the data that user Y was informed about. To ensure this guarantee, when the database performs a read at read_time, it picks a global_limit (= read_time + max_clock_skew_us). If it finds any matching records for the read in the window (read_time, global_limit], there is a chance for the above guarantee to be broken. In this case docdb throws a kReadRestart error. However, users migrating from PostgreSQL are surprised by this error. Moreover, some users may not be in a position to change their application code to handle this new error scenario. The number of kReadRestart errors thrown to the external client are reduced currently by retrying the transaction/statement at the query layer or at the docdb layer. A retry at the docdb is layer is possible when this is the first RPC in a transaction/statement and no read time was picked yet on the query layer. The query layer retries have the following limitations: - Is limited by `ysql_output_buffer_size`: if the YSQL to client buffer fills up and some data was already sent to the client, YSQL can't retry the whole query on a new read point. - Has higher tail latency, sometimes, leading to statement timeouts or retries exhaustion. - kReadRestart errors are not retried for statements other than the first one in a transaction block in Repeatable Read isolation. This change aims to provide users with an opposite tradeoff mechanism of sacrificing the read-after-commit-visibility guarantee for ease of use instead. Minimizing read restart errors is a multi-stage plan and here we take the first step. Provide users with a GUC `yb_read_after_commit_visibility` to relax the guarantee. Configurable Options: 1. strict * Default. * Same behavior as current. 2. relaxed * Ignores clock skew and sets the global_limit to be the same as read_time, thus ignoring the uncertainty window. * Pick read time on the query layer and not on the storage layer. This is necessary so that users do not miss commits from their own session. That would be bad. For simplicity, the relaxed option does not affect transactions unless they are marked **READ ONLY**. Handling generic transactions is more involved because of read-write operations. This may be handled in a future change. Moreover, we ignore DDLs and catalog requests for the purposes of this revision. In the next section, we discuss the semantics of the relaxed option. In this section, we discuss what guarantees can be retained even in the relaxed mode. (1) Same Session Guarantees The reads never miss writes from its own session. |//conn1//| |--------| |INSERT | |SELECT | <--- always observes the preceding DML statements. Providing this guarantee is less obvious than one would think. (1a) The read time of SELECT should not be lower than the commit time of the preceding INSERT operation. The insert itself may pick its commit time at any node in the distributed database. However, the hybrid time is propagated back to the local proxy. As a result, the SELECT statement's read time will be higher than the preceding commit time as long as the read time is picked on local proxy, i.e. we do not pick the read time on some remote docdb. Corollary 1a: read time of read only queries must be picked on local proxy whenever we relax the yb_read_after_commit_visibility guarantee. Tested in **PgReadAfterCommitVisibilityTest.SameSessionRecency**. (1b) If hypothetically we were to pick a read time on DocDB even after corollary 1a, that would lead to another problem too: DocDB picks safe time as the read time. This is potentially a time in the past and might be smaller than the commit time of the INSERT before the SELECT. So, ignoring the uncertainty window on docdb might lead to the SELECT not seeing the prior INSERT from the same connection. Corollary 1b: Do not ignore the uncertainty window when the read time is picked on the storage layer. This cannot happen with read-only statements & transactions since we always pick read time on the local proxy. Tested in **PgSingleTServerTest.NoSafeTimeClampingInRelaxedReadAfterCommit**. (1c) Server side connection pooling should not sacrifice the above same session guarantee. Since - server side connection pooling multiplexes connections only within the same node, and - there is a common proxy tserver across all pg connections on the node, we are guaranteed to see commits within the same session even with server-side connection pooling in effect. Tested in **PgReadAfterCommitVisibilityTest.SamePgNodeRecency**. Client-side connection pooling is out of scope for discussion (especially in the case of node failures, smart drivers, etc). (2) Different Session guarantees Relaxed mode does not provide read-after-commit-visibility guarantee with writes from a different session. We still have good consistency guarantees, nonetheless. (2a) The first guarantee is consistent prefix. | //conn1// | //conn2// | | ... | INSERT 1 | | ... | INSERT 2 | | SELECT | ... | First things first, the SELECT statement on conn1 need not observe the `INSERT 2` statement on conn2 even though the insert happens before the SELECT in real time. This may happen in a distributed database because of clock skew between different machines (and no uncertainty window). Next, if SELECT does observe INSERT 2, it must also observe INSERT 1 (and all the preceding statements). This is the consistent prefix guarantee and is maintained by the fact that INSERT 2 will always have a higher commit time than INSERT 1. (2b) Monotonic Reads | //conn1// | //conn2// | | ... | INSERT 1 | | ... | INSERT 2 | | SELECT 1 | ... | | SELECT 2 | ... | A closely related consistency is that we guarantee monotonic reads. If SELECT 1 observes INSERT 1, then SELECT 2 also observes INSERT 1 (and maybe even more such as INSERT 2). This is because SELECT 2 has a higher read time than SELECT 1 because read time increases monotonically within the same session. Note that this would not be the case if we let SELECT pick the read time on the storage layer instead of force picking it on the proxy. Explanation: safe time is not //necessarily// affected by the most recent hybrid time propagation since it is potentially a time in the past. (2c) Bounded Staleness | //conn1// | //conn2// | | ... | INSERT 1 | <--- 500ms old | ... | INSERT 2 | | SELECT 1 | ... | Most intuitive property. Since physical clocks do not skew more than max_clock_skew_usec, the SELECTs always see INSERTs that are older than max_clock_skew_usec. In practice, the staleness bound is even lower since the skew between hybrid time (not physical time) across the machines is the more relevant metric here. hybrid time is close to each other across nodes since there is a regular exchange of messages across yb-tservers and yb-master. Tested in **PgReadAfterCommitVisibilityTest.SessionOnDifferentNodeStaleRead** and **PgReadAfterCommitVisibilityTest.SessionOnDifferentNodeBoundedStaleness**. (3) Thematic worst-case scenario Here, we discuss the type of workload that is most susceptible to stale reads. For a stale read to occur, - The read must touch a node with a higher time (than the pg connection). More likely when the read is touching a lot of nodes. - The writes don't touch enough nodes to ensure hybrid time is propagated to the query layer of the node that performs the read. Happens when the writes are single row inserts/updates. Therefore, thematically, we are most susceptible to miss recent writes with the relaxed option when there are high throughput single-row DML ops happening concurrently with a long read that touches a lot of rows. Backport-through: 2024.1 **Upgrade/Rollback safety:** Fortunately, the only change in proto files is in pg_client.proto. pg_client.proto is used exclusively for communication between postgres and local tserver proxy layer. During upgrades once a node is upgraded, both Pg and local tserver are upgraded. Therefore, both of them understand this new field. Moreover, even though the read behavior is changed in the new relaxed mode, it is only changed for upgraded nodes. Non upgraded nodes do not require any knowledge of changes in the upgraded nodes because the existing interface between the query and storage layers works well to support this new feature. No auto flags are necessary. Jira: DB-9323 Test Plan: Jenkins **In TestPgTransparentRestarts** 1. When yb_read_after_commit_visibility is strict, Long reads that exceed the ysql_output_buffer_size threshold raise a read restart error to the client since they cannot be handled transparently. ``` ./yb_build.sh --java-test TestPgTransparentRestarts#selectStarLong ``` 2. When yb_read_after_commit_visibility is relaxed, For read only txns/stmts, we silently ignore read restart errors. ``` ./yb_build.sh --java-test TestPgTransparentRestarts#selectStarLong_relaxedReadAfterCommitVisibility ``` 3. For execution of prepared statements, relaxed mode must be set before the execute command and not necessarily before the prepare command. ``` ./yb_build.sh --java-test TestPgTransparentRestarts#selectStarLongExecute_relaxedReadAfterCommitVisibility ``` 4. We raise no read restart errors even after transactions restarts due to conflicts. This is because we decided to relax the guarantee only for read only queries/transactions. In addition, read only ops do not run into any transaction conflicts. **In PgReadAfterCommitVisibilityTest** 1. Same session read-after-commit-visibility guarantee. ``` ./yb_build.sh --cxx-test pg_txn-test --gtest_filter PgReadAfterCommitVisibilityTest.SameSessionRecency ``` 2. Same node read-after-commit-visibility guarantee. ``` ./yb_build.sh --cxx-test pg_txn-test --gtest_filter PgReadAfterCommitVisibilityTest.SamePgNodeRecency ``` 3. Sessions connecting to Pg on different nodes - bounded staleness guarantee. ``` ./yb_build.sh --cxx-test pg_txn-test --gtest_filter PgReadAfterCommitVisibilityTest.SessionOnDifferentNodeStaleRead ./yb_build.sh --cxx-test pg_txn-test --gtest_filter PgReadAfterCommitVisibilityTest.SessionOnDifferentNodeBoundedStaleness ``` 1. Guard ourselves against this scenario - Read time is picked on docdb. - Read uncertainty window is ignored. - The picked time is safe time, which is a time before the previous statement in the same session. - We miss recent updates from the same session because the read time is in the past. ``` ./yb_build.sh --java-test PgSingleTServerTest.NoSafeTimeClampingInRelaxedReadAfterCommit ``` 2. We never ignore read uncertainty window (thus do not relax read-after-commit-visibility guarantee) with - INSERT/UPDATE/DELETE - inserts/updates in WITH clause ``` ./yb_build.sh --cxx-test pg_txn-test --gtest_filter PgReadAfterCommitVisibilityTest.NewSessionDuplicateInsertCheck ./yb_build.sh --cxx-test pg_txn-test --gtest_filter PgReadAfterCommitVisibilityTest.NewSessionUpdateKeyCheck ./yb_build.sh --cxx-test pg_txn-test --gtest_filter PgReadAfterCommitVisibilityTest.NewSessionDeleteKeyCheck ./yb_build.sh --cxx-test pg_txn-test --gtest_filter PgReadAfterCommitVisibilityTest.NewSessionDmlHidden ``` 3. We also avoid this scenario (because relaxed mode does not affect INSERTs) - Two concurrent inserts to the same key - both single shard. - The read time of the insert is picked on the local proxy (because this is in relaxed mode). - This read time is used for checking transaction conflicts happening to the same key. - However, the conflict resolution step on the RegularDB is skipped in single-shard inserts, see GitHub issue https://github.com/yugabyte/yugabyte-db/issues/19407. ``` ./yb_build.sh --cxx-test pgwrapper_pg_read_time-test --gtest_filter PgReadTimeTest.CheckRelaxedReadAfterCommitVisibility ``` Reviewers: pjain, smishra, rthallam Reviewed By: pjain Subscribers: ybase, yql, tnayak, hsunder, rthallam Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D36197 --- .../yb/pgsql/TestPgTransparentRestarts.java | 77 +++ .../src/backend/access/transam/xact.c | 10 +- src/postgres/src/backend/executor/execMain.c | 18 +- src/postgres/src/backend/utils/misc/guc.c | 64 +++ src/yb/common/common_fwd.h | 3 + src/yb/common/consistent_read_point.cc | 11 +- src/yb/common/consistent_read_point.h | 7 +- src/yb/integration-tests/mini_cluster.cc | 5 + src/yb/integration-tests/mini_cluster.h | 9 + src/yb/tserver/pg_client.proto | 6 + src/yb/tserver/pg_client_session.cc | 25 +- src/yb/tserver/pg_client_session.h | 3 +- src/yb/yql/pggate/pg_txn_manager.cc | 31 ++ src/yb/yql/pggate/pg_txn_manager.h | 4 + src/yb/yql/pggate/pggate.cc | 8 + src/yb/yql/pggate/pggate.h | 2 + src/yb/yql/pggate/util/yb_guc.cc | 2 + src/yb/yql/pggate/util/yb_guc.h | 2 + src/yb/yql/pggate/util/ybc_util.h | 23 + src/yb/yql/pggate/ybc_pggate.cc | 8 + src/yb/yql/pggate/ybc_pggate.h | 2 + src/yb/yql/pgwrapper/pg_read_time-test.cc | 135 +++++ .../yql/pgwrapper/pg_single_tserver-test.cc | 20 + src/yb/yql/pgwrapper/pg_txn-test.cc | 521 ++++++++++++++++++ src/yb/yql/pgwrapper/pg_wrapper.cc | 3 + 25 files changed, 987 insertions(+), 12 deletions(-) diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java index f4b3db8d4264..d637d8815ffc 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java @@ -383,6 +383,58 @@ public PreparedStatement createStatement(Connection conn) throws Exception { }.runTest(); } + /** + * No restarts expected when yb_read_after_commit_visibility is relaxed. + * Note that this is a long read that exceeds the output buffer size. + */ + @Test + public void selectStarLong_relaxedReadAfterCommitVisibility() throws Exception { + new RegularStatementTester( + getConnectionBuilder(), + "SELECT * FROM test_rr", + getLongString(), + false /* expectRestartErrors */ + ) { + @Override + public String getReadAfterCommitVisibility() { + return "relaxed"; + } + }.runTest(); + } + + /* + * Ensures that we need not set the yb_read_after_commit_visibility GUC + * before the prepare statement. + * This test guards us against scenarios where the GUC + * is captured in the prepared statement to be used during execution. + * + * Example: pg_hint_plan is a planner time configuration and is to be + * captured in the prepare phase to be used during execution. + * + * Also, use simple query mode to test that simultaneously. + */ + @Test + public void selectStarLongExecute_relaxedReadAfterCommitVisibility() throws Exception { + new RegularStatementTester( + getConnectionBuilder().withPreferQueryMode("simple"), + "EXECUTE select_stmt(0)", + getLongString(), + false /* expectRestartErrors */) { + + @Override + public Statement createStatement(Connection conn) throws Exception { + Statement stmt = super.createStatement(conn); + stmt.execute("PREPARE select_stmt (int) AS SELECT * FROM test_rr WHERE i >= $1"); + return stmt; + }; + + @Override + public String getReadAfterCommitVisibility() { + return "relaxed"; + } + }.runTest(); + } + /** * The following two methods attempt to test retries on kReadRestart for all below combinations - * 1. Type of statement - UPDATE/DELETE. @@ -886,9 +938,16 @@ private Boolean expectConflictErrors(IsolationLevel isolation) throws Exception !is_wait_on_conflict_concurrency_control; } + // Override this function to set a different read window behavior. + public String getReadAfterCommitVisibility() { + return "strict"; + } + @Override public List<ThrowingRunnable> getRunnableThreads( ConnectionBuilder cb, BooleanSupplier isExecutionDone) { + String setReadAfterCommitVisibility = "SET yb_read_after_commit_visibility TO "; + List<ThrowingRunnable> runnables = new ArrayList<>(); // // Singular SELECT statement (equal probability of being either serializable/repeatable read/ @@ -926,6 +985,14 @@ public List<ThrowingRunnable> getRunnableThreads( auxSerializableStatement.execute(LOG_RESTARTS_SQL); auxRrStatement.execute(LOG_RESTARTS_SQL); auxRcStatement.execute(LOG_RESTARTS_SQL); + + // SET yb_read_after_commit_visibility + auxSerializableStatement.execute( + setReadAfterCommitVisibility + getReadAfterCommitVisibility()); + auxRrStatement.execute( + setReadAfterCommitVisibility + getReadAfterCommitVisibility()); + auxRcStatement.execute( + setReadAfterCommitVisibility + getReadAfterCommitVisibility()); } for (/* No setup */; !isExecutionDone.getAsBoolean(); /* NOOP */) { @@ -1020,8 +1087,18 @@ public List<ThrowingRunnable> getRunnableThreads( Stmt stmt = createStatement(selectTxnConn)) { try (Statement auxStmt = selectTxnConn.createStatement()) { auxStmt.execute(LOG_RESTARTS_SQL); + + // SET yb_read_after_commit_visibility + auxStmt.execute(setReadAfterCommitVisibility + getReadAfterCommitVisibility()); } selectTxnConn.setAutoCommit(false); + // This is a read only txn, so setReadOnly. + // Moreover, yb_read_after_commit_visibility option relies on txn being read only. + if (isolation != IsolationLevel.SERIALIZABLE) { + // SERIALIZABLE, READ ONLY txns are not actually serializable + // txns and we wish to test SERIALIZABLE txns too. + selectTxnConn.setReadOnly(true); + } for (/* No setup */; !isExecutionDone.getAsBoolean(); ++txnsAttempted) { int numCompletedOps = 0; try { diff --git a/src/postgres/src/backend/access/transam/xact.c b/src/postgres/src/backend/access/transam/xact.c index 5e45b0827f24..195d049ec32b 100644 --- a/src/postgres/src/backend/access/transam/xact.c +++ b/src/postgres/src/backend/access/transam/xact.c @@ -3829,6 +3829,10 @@ BeginTransactionBlock(void) BlockStateAsString(s->blockState)); break; } + + /* YB: Notify pggate that we are within a txn block. */ + if (IsYugaByteEnabled()) + HandleYBStatus(YBCPgSetInTxnBlock(true)); } /* @@ -4157,6 +4161,10 @@ BeginImplicitTransactionBlock(void) */ if (s->blockState == TBLOCK_STARTED) s->blockState = TBLOCK_IMPLICIT_INPROGRESS; + + /* YB: Notify pggate that we are within an (implicit) txn block. */ + if (IsYugaByteEnabled()) + HandleYBStatus(YBCPgSetInTxnBlock(true)); } /* @@ -5091,7 +5099,7 @@ CommitSubTransaction(void) /* Conserve sticky object count before popping transaction state. */ s->parent->ybUncommittedStickyObjectCount = s->ybUncommittedStickyObjectCount; - + PopTransaction(); } diff --git a/src/postgres/src/backend/executor/execMain.c b/src/postgres/src/backend/executor/execMain.c index 54bc967bac69..c167a92d1173 100644 --- a/src/postgres/src/backend/executor/execMain.c +++ b/src/postgres/src/backend/executor/execMain.c @@ -162,8 +162,10 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * We have lower-level defenses in CommandCounterIncrement and elsewhere * against performing unsafe operations in parallel mode, but this gives a * more user-friendly error message. + * + * YB: We also notify pggate whether the statement is read only. */ - if ((XactReadOnly || IsInParallelMode()) && + if ((IsYugaByteEnabled() || XactReadOnly || IsInParallelMode()) && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) ExecCheckXactReadOnly(queryDesc->plannedstmt); @@ -762,11 +764,14 @@ ExecCheckRTEPermsModified(Oid relOid, Oid userid, Bitmapset *modifiedCols, * Note: in a Hot Standby this would need to reject writes to temp * tables just as we do in parallel mode; but an HS standby can't have created * any temp tables in the first place, so no need to check that. + * + * YB: We also notify pggate whether the statement is read only. */ static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt) { ListCell *l; + bool yb_is_read_only = true; /* * Fail if write permissions are requested in parallel mode for table @@ -786,10 +791,21 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt) continue; PreventCommandIfReadOnly(CreateCommandTag((Node *) plannedstmt)); + yb_is_read_only = false; } if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE) + { PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt)); + yb_is_read_only = false; + } + + if (IsYugaByteEnabled()) + { + if (plannedstmt->rowMarks) + yb_is_read_only = false; + HandleYBStatus(YBCPgSetReadOnlyStmt(yb_is_read_only)); + } } diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index 668d104c6c13..37dabf25be60 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -219,6 +219,7 @@ static bool check_transaction_priority_upper_bound(double *newval, void **extra, extern void YBCAssignTransactionPriorityUpperBound(double newval, void* extra); extern double YBCGetTransactionPriority(); extern TxnPriorityRequirement YBCGetTransactionPriorityType(); +static bool yb_check_no_txn(int* newval, void **extra, GucSource source); static void assign_yb_pg_batch_detection_mechanism(int new_value, void *extra); static void assign_ysql_upgrade_mode(bool newval, void *extra); @@ -485,6 +486,12 @@ static struct config_enum_entry shared_memory_options[] = { {NULL, 0, false} }; +const struct config_enum_entry yb_read_after_commit_visibility_options[] = { + {"strict", YB_STRICT_READ_AFTER_COMMIT_VISIBILITY, false}, + {"relaxed", YB_RELAXED_READ_AFTER_COMMIT_VISIBILITY, false}, + {NULL, 0, false} +}; + /* * Options for enum values stored in other modules */ @@ -5444,6 +5451,46 @@ static struct config_enum ConfigureNamesEnum[] = NULL, assign_yb_pg_batch_detection_mechanism, NULL }, + { + /* + * Read-after-commit-visibility guarantee: any client issued read + * should see all data that was committed before the read request + * was issued (even in the presence of clock skew between nodes). + * In other words, the following example should always work: + * (1) User X commits some data (for which the db picks a commit + * timestamp say ht1) + * (2) Then user X communicates to user Y to inform about the commit + * via a channel outside the database (say a phone call) + * (3) Then user Y issues a read to some YB node which picks a + * read time (less than ht1 due to clock skew) + * (4) Then it should not happen that user Y gets an output without + * the data that user Y was informed about. + */ + { + "yb_read_after_commit_visibility", PGC_USERSET, CUSTOM_OPTIONS, + gettext_noop("Control read-after-commit-visibility guarantee."), + gettext_noop( + "This GUC is intended as a crutch for users migrating from PostgreSQL and new to" + " read restart errors. Users can now largely avoid these errors when" + " read-after-commit-visibility guarantee is not a strong requirement." + " This option cannot be set from within a transaction block." + " Configure one of the following options:" + " (a) strict: Default Behavior. The read-after-commit-visibility guarantee is" + " maintained by the database. However, users may see read restart errors that" + " show \"ERROR: Query error: Restart read required at: ...\". The database" + " attempts to retry on such errors internally but that is not always possible." + " (b) relaxed: With this option, the read-after-commit-visibility guarantee is" + " relaxed. Read only statements/transactions do not see read restart errors but" + " may miss recent updates with staleness bounded by clock skew." + ), + 0 + }, + &yb_read_after_commit_visibility, + YB_STRICT_READ_AFTER_COMMIT_VISIBILITY, + yb_read_after_commit_visibility_options, + yb_check_no_txn, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL @@ -12807,5 +12854,22 @@ yb_check_toast_catcache_threshold(int *newVal, void **extra, GucSource source) return true; } +/* + * YB: yb_check_no_txn + * + * Do not allow users to set yb_read_after_commit_visibility + * from within a txn block. + */ +static bool +yb_check_no_txn(int *newVal, void **extra, GucSource source) +{ + if (IsTransactionBlock()) + { + GUC_check_errdetail("Cannot be set within a txn block."); + return false; + } + return true; +} + #include "guc-file.c" diff --git a/src/yb/common/common_fwd.h b/src/yb/common/common_fwd.h index df5e40940f7f..b68ba71a91fc 100644 --- a/src/yb/common/common_fwd.h +++ b/src/yb/common/common_fwd.h @@ -21,6 +21,7 @@ #include "yb/common/ql_protocol.fwd.h" #include "yb/common/redis_protocol.fwd.h" #include "yb/common/wire_protocol.fwd.h" +#include "yb/util/strongly_typed_bool.h" namespace yb { @@ -65,6 +66,8 @@ enum class DataType; enum class SortingType; +YB_STRONGLY_TYPED_BOOL(ClampUncertaintyWindow); + namespace common { class Jsonb; diff --git a/src/yb/common/consistent_read_point.cc b/src/yb/common/consistent_read_point.cc index f27d7698e1d0..4a5decac7d27 100644 --- a/src/yb/common/consistent_read_point.cc +++ b/src/yb/common/consistent_read_point.cc @@ -33,8 +33,11 @@ void ConsistentReadPoint::SetReadTimeUnlocked( restarts_.clear(); } -void ConsistentReadPoint::SetCurrentReadTimeUnlocked() { - SetReadTimeUnlocked(ReadHybridTime::FromHybridTimeRange(clock_->NowRange())); +void ConsistentReadPoint::SetCurrentReadTimeUnlocked(const ClampUncertaintyWindow clamp) { + SetReadTimeUnlocked( + clamp + ? ReadHybridTime::SingleTime(clock_->Now()) + : ReadHybridTime::FromHybridTimeRange(clock_->NowRange())); } void ConsistentReadPoint::SetReadTime( @@ -43,9 +46,9 @@ void ConsistentReadPoint::SetReadTime( SetReadTimeUnlocked(read_time, &local_limits); } -void ConsistentReadPoint::SetCurrentReadTime() { +void ConsistentReadPoint::SetCurrentReadTime(const ClampUncertaintyWindow clamp) { std::lock_guard lock(mutex_); - SetCurrentReadTimeUnlocked(); + SetCurrentReadTimeUnlocked(clamp); } Status ConsistentReadPoint::TrySetDeferredCurrentReadTime() { diff --git a/src/yb/common/consistent_read_point.h b/src/yb/common/consistent_read_point.h index 1f4147563bce..d06fd0b97fd4 100644 --- a/src/yb/common/consistent_read_point.h +++ b/src/yb/common/consistent_read_point.h @@ -41,7 +41,9 @@ class ConsistentReadPoint { void MoveFrom(ConsistentReadPoint* rhs); // Set the current time as the read point. - void SetCurrentReadTime() EXCLUDES(mutex_); + // No uncertainty window when clamp is set. + void SetCurrentReadTime( + const ClampUncertaintyWindow clamp = ClampUncertaintyWindow::kFalse) EXCLUDES(mutex_); // If read point is not set, use the current time as the read point and defer it to the global // limit. If read point was already set, return error if it is not deferred. @@ -91,7 +93,8 @@ class ConsistentReadPoint { private: inline void SetReadTimeUnlocked( const ReadHybridTime& read_time, HybridTimeMap* local_limits = nullptr) REQUIRES(mutex_); - void SetCurrentReadTimeUnlocked() REQUIRES(mutex_); + void SetCurrentReadTimeUnlocked( + const ClampUncertaintyWindow clamp = ClampUncertaintyWindow::kFalse) REQUIRES(mutex_); void UpdateLimitsMapUnlocked( const TabletId& tablet, const HybridTime& local_limit, HybridTimeMap* map) REQUIRES(mutex_); void RestartRequiredUnlocked(const TabletId& tablet, const ReadHybridTime& restart_time) diff --git a/src/yb/integration-tests/mini_cluster.cc b/src/yb/integration-tests/mini_cluster.cc index 04b78c1f2826..feaa05752631 100644 --- a/src/yb/integration-tests/mini_cluster.cc +++ b/src/yb/integration-tests/mini_cluster.cc @@ -874,6 +874,11 @@ server::SkewedClockDeltaChanger JumpClock( return server::SkewedClockDeltaChanger(delta, skewed_clock); } +server::SkewedClockDeltaChanger JumpClock( + tserver::MiniTabletServer* server, std::chrono::milliseconds delta) { + return JumpClock(server->server(), delta); +} + std::vector<server::SkewedClockDeltaChanger> SkewClocks( MiniCluster* cluster, std::chrono::milliseconds clock_skew) { std::vector<server::SkewedClockDeltaChanger> delta_changers; diff --git a/src/yb/integration-tests/mini_cluster.h b/src/yb/integration-tests/mini_cluster.h index a4e51d332b52..c864e90ff84e 100644 --- a/src/yb/integration-tests/mini_cluster.h +++ b/src/yb/integration-tests/mini_cluster.h @@ -298,6 +298,15 @@ class MiniCluster : public MiniClusterBase { PortPicker port_picker_; }; +// Requires that skewed clock is registered as physical clock. +// Jumps the physical clock by delta +// i.e. no effect on hybrid ts unless by physical clock. +// new_clock = old_clock + delta (clocks are moving). +// Returns an RAII structure that resets the delta change +// when it goes out of scope. +server::SkewedClockDeltaChanger JumpClock( + tserver::MiniTabletServer* server, std::chrono::milliseconds delta); + MUST_USE_RESULT std::vector<server::SkewedClockDeltaChanger> SkewClocks( MiniCluster* cluster, std::chrono::milliseconds clock_skew); diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index d64fe4cad646..244dc5c8f16b 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -607,6 +607,12 @@ message PgPerformOptionsPB { int64 pg_txn_start_us = 22; AshMetadataPB ash_metadata = 23; + + // When set, + // - Sets the read time locally. + // - Clamps the read uncertainty window. + // See the commit desc for more info. + bool clamp_uncertainty_window = 24; } message PgPerformRequestPB { diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index dc293008593a..5869706170b1 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -1152,7 +1152,8 @@ Status PgClientSession::DoPerform(const DataPtr& data, CoarseTimePoint deadline, } void PgClientSession::ProcessReadTimeManipulation( - ReadTimeManipulation manipulation, uint64_t read_time_serial_no) { + ReadTimeManipulation manipulation, uint64_t read_time_serial_no, + ClampUncertaintyWindow clamp) { VLOG_WITH_PREFIX(2) << "ProcessReadTimeManipulation: " << manipulation << ", read_time_serial_no: " << read_time_serial_no << ", read_time_serial_no_: " << read_time_serial_no_; @@ -1165,7 +1166,8 @@ void PgClientSession::ProcessReadTimeManipulation( return; case ReadTimeManipulation::ENSURE_READ_TIME_IS_SET : if (!read_point.GetReadTime() || read_time_serial_no_ != read_time_serial_no) { - read_point.SetCurrentReadTime(); + // Clamp read uncertainty window when requested by the query layer. + read_point.SetCurrentReadTime(clamp); VLOG(1) << "Setting current ht as read point " << read_point.GetReadTime(); } return; @@ -1274,7 +1276,9 @@ PgClientSession::SetupSession( RSTATUS_DCHECK( kind == PgClientSessionKind::kPlain, IllegalState, "Read time manipulation can't be specified for non kPlain sessions"); - ProcessReadTimeManipulation(options.read_time_manipulation(), read_time_serial_no); + ProcessReadTimeManipulation( + options.read_time_manipulation(), read_time_serial_no, + ClampUncertaintyWindow(options.clamp_uncertainty_window())); } else if (options.has_read_time() && options.read_time().has_read_ht()) { const auto read_time = ReadHybridTime::FromPB(options.read_time()); session.SetReadPoint(read_time); @@ -1316,6 +1320,21 @@ PgClientSession::SetupSession( // TODO: Shouldn't the below logic for DDL transactions as well? session.SetInTxnLimit(in_txn_limit); } + + if (options.clamp_uncertainty_window() + && !session.read_point()->GetReadTime()) { + RSTATUS_DCHECK( + !(transaction && transaction->isolation() == SERIALIZABLE_ISOLATION), + IllegalState, "Clamping does not apply to SERIALIZABLE txns."); + // Set read time with clamped uncertainty window when reqeusted by + // the query layer. + // Do not mess with the read time if already set. + session.read_point()->SetCurrentReadTime(ClampUncertaintyWindow::kTrue); + VLOG_WITH_PREFIX_AND_FUNC(2) + << "Setting read time to " + << session.read_point()->GetReadTime() + << " for read only txn/stmt"; + } } session.SetDeadline(deadline); diff --git a/src/yb/tserver/pg_client_session.h b/src/yb/tserver/pg_client_session.h index 84a4a9120137..fae27c786dce 100644 --- a/src/yb/tserver/pg_client_session.h +++ b/src/yb/tserver/pg_client_session.h @@ -191,7 +191,8 @@ class PgClientSession { Status ProcessResponse( const PgClientSessionOperations& operations, const PgPerformRequestPB& req, PgPerformResponsePB* resp, rpc::RpcContext* context); - void ProcessReadTimeManipulation(ReadTimeManipulation manipulation, uint64_t txn_serial_no); + void ProcessReadTimeManipulation(ReadTimeManipulation manipulation, uint64_t txn_serial_no, + ClampUncertaintyWindow clamp); client::YBClient& client(); client::YBSessionPtr& EnsureSession(PgClientSessionKind kind, CoarseTimePoint deadline); diff --git a/src/yb/yql/pggate/pg_txn_manager.cc b/src/yb/yql/pggate/pg_txn_manager.cc index 6cf5d7f28bb0..91ffcaec4638 100644 --- a/src/yb/yql/pggate/pg_txn_manager.cc +++ b/src/yb/yql/pggate/pg_txn_manager.cc @@ -185,6 +185,9 @@ Status PgTxnManager::BeginTransaction(int64_t start_time) { return STATUS(IllegalState, "Transaction is already in progress"); } pg_txn_start_us_ = start_time; + // NOTE: Do not reset in_txn_blk_ when restarting txns internally + // (i.e., via PgTxnManager::RecreateTransaction). + in_txn_blk_ = false; return RecreateTransaction(SavePriority::kFalse); } @@ -255,6 +258,20 @@ Status PgTxnManager::SetDeferrable(bool deferrable) { return Status::OK(); } +Status PgTxnManager::SetInTxnBlock(bool in_txn_blk) { + in_txn_blk_ = in_txn_blk; + VLOG_WITH_FUNC(2) << (in_txn_blk ? "In " : "Not in ") << "txn block."; + return Status::OK(); +} + +Status PgTxnManager::SetReadOnlyStmt(bool read_only_stmt) { + read_only_stmt_ = read_only_stmt; + VLOG_WITH_FUNC(2) + << "Executing a " << (read_only_stmt ? "read only " : "read/write ") + << "stmt."; + return Status::OK(); +} + uint64_t PgTxnManager::NewPriority(TxnPriorityRequirement txn_priority_requirement) { if (txn_priority_requirement == kHighestPriority) { return yb::kHighPriTxnUpperBound; @@ -420,6 +437,7 @@ void PgTxnManager::ResetTxnAndSession() { enable_tracing_ = false; read_time_for_follower_reads_ = HybridTime(); read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; + read_only_stmt_ = false; } Status PgTxnManager::EnterSeparateDdlTxnMode() { @@ -525,6 +543,19 @@ void PgTxnManager::SetupPerformOptions( read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; // pg_txn_start_us is similarly only used for kPlain transactions. options->set_pg_txn_start_us(pg_txn_start_us_); + // Only clamp read-only txns/stmts. + // Do not clamp in the serializable case since + // - SERIALIZABLE reads do not pick read time until later. + // - SERIALIZABLE reads do not observe read restarts anyways. + if (yb_read_after_commit_visibility + == YB_RELAXED_READ_AFTER_COMMIT_VISIBILITY + && isolation_level_ != IsolationLevel::SERIALIZABLE_ISOLATION) + // We clamp uncertainty window when + // - either we are working with a read only txn + // - or we are working with a read only stmt + // i.e. no txn block and a pure SELECT stmt. + options->set_clamp_uncertainty_window( + read_only_ || (!in_txn_blk_ && read_only_stmt_)); } if (read_time_for_follower_reads_) { ReadHybridTime::SingleTime(read_time_for_follower_reads_).ToPB(options->mutable_read_time()); diff --git a/src/yb/yql/pggate/pg_txn_manager.h b/src/yb/yql/pggate/pg_txn_manager.h index 2744a4d1d5c9..7afc1e7fd1de 100644 --- a/src/yb/yql/pggate/pg_txn_manager.h +++ b/src/yb/yql/pggate/pg_txn_manager.h @@ -73,6 +73,8 @@ class PgTxnManager : public RefCountedThreadSafe<PgTxnManager> { Status ExitSeparateDdlTxnModeWithAbort(); Status ExitSeparateDdlTxnModeWithCommit(uint32_t db_oid, bool is_silent_altering); void SetDdlHasSyscatalogChanges(); + Status SetInTxnBlock(bool in_txn_blk); + Status SetReadOnlyStmt(bool read_only_stmt); bool IsTxnInProgress() const { return txn_in_progress_; } IsolationLevel GetIsolationLevel() const { return isolation_level_; } @@ -127,6 +129,8 @@ class PgTxnManager : public RefCountedThreadSafe<PgTxnManager> { bool need_restart_ = false; bool need_defer_read_point_ = false; tserver::ReadTimeManipulation read_time_manipulation_ = tserver::ReadTimeManipulation::NONE; + bool in_txn_blk_ = false; + bool read_only_stmt_ = false; // Postgres transaction characteristics. PgIsolationLevel pg_isolation_level_ = PgIsolationLevel::REPEATABLE_READ; diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index 34e8e78ce84a..bd62ba971342 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -2135,6 +2135,14 @@ Status PgApiImpl::SetTransactionDeferrable(bool deferrable) { return pg_txn_manager_->SetDeferrable(deferrable); } +Status PgApiImpl::SetInTxnBlock(bool in_txn_blk) { + return pg_txn_manager_->SetInTxnBlock(in_txn_blk); +} + +Status PgApiImpl::SetReadOnlyStmt(bool read_only_stmt) { + return pg_txn_manager_->SetReadOnlyStmt(read_only_stmt); +} + Status PgApiImpl::EnterSeparateDdlTxnMode() { // Flush all buffered operations as ddl txn use its own transaction session. RETURN_NOT_OK(pg_session_->FlushBufferedOperations()); diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index 13d4dcd11aae..eac067085e3c 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -651,6 +651,8 @@ class PgApiImpl { Status SetTransactionIsolationLevel(int isolation); Status SetTransactionReadOnly(bool read_only); Status SetTransactionDeferrable(bool deferrable); + Status SetInTxnBlock(bool in_txn_blk); + Status SetReadOnlyStmt(bool read_only_stmt); Status SetEnableTracing(bool tracing); Status EnableFollowerReads(bool enable_follower_reads, int32_t staleness_ms); Status EnterSeparateDdlTxnMode(); diff --git a/src/yb/yql/pggate/util/yb_guc.cc b/src/yb/yql/pggate/util/yb_guc.cc index 4bb46f0d667c..3d949d65d71d 100644 --- a/src/yb/yql/pggate/util/yb_guc.cc +++ b/src/yb/yql/pggate/util/yb_guc.cc @@ -73,3 +73,5 @@ int yb_reorderbuffer_max_changes_in_memory = 4096; uint64_t yb_read_time = 0; bool yb_is_read_time_ht = false; + +int yb_read_after_commit_visibility = 0; diff --git a/src/yb/yql/pggate/util/yb_guc.h b/src/yb/yql/pggate/util/yb_guc.h index 0150a0e623dc..1fe91886debe 100644 --- a/src/yb/yql/pggate/util/yb_guc.h +++ b/src/yb/yql/pggate/util/yb_guc.h @@ -176,6 +176,8 @@ extern int yb_reorderbuffer_max_changes_in_memory; */ extern int yb_walsender_poll_sleep_duration_empty_ms; +extern int yb_read_after_commit_visibility; + #ifdef __cplusplus } // extern "C" #endif diff --git a/src/yb/yql/pggate/util/ybc_util.h b/src/yb/yql/pggate/util/ybc_util.h index 5ca94f702476..4fd3348a221f 100644 --- a/src/yb/yql/pggate/util/ybc_util.h +++ b/src/yb/yql/pggate/util/ybc_util.h @@ -191,6 +191,29 @@ extern int yb_walsender_poll_sleep_duration_empty_ms; */ extern int yb_reorderbuffer_max_changes_in_memory; +/* + * Ease transition to YSQL by reducing read restart errors for new apps. + * + * This option doesn't affect SERIALIZABLE isolation level since + * SERIALIZABLE can't face read restart errors anyway. + * + * See the help text for yb_read_after_commit_visibility GUC for more + * information. + * + * XXX: This GUC is meant as a workaround only by relaxing the + * read-after-commit-visibility guarantee. Ideally, + * (a) Users should fix their apps to handle read restart errors, or + * (b) TODO(#22317): YB should use very accurate clocks to avoid read restart + * errors altogether. + */ +typedef enum { + YB_STRICT_READ_AFTER_COMMIT_VISIBILITY = 0, + YB_RELAXED_READ_AFTER_COMMIT_VISIBILITY = 1, +} YBReadAfterCommitVisibilityEnum; + +/* GUC for the enum above. */ +extern int yb_read_after_commit_visibility; + typedef struct YBCStatusStruct* YBCStatus; bool YBCStatusIsNotFound(YBCStatus s); diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index a7ca331e0bc8..fa7ffcd7ccd7 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -1706,6 +1706,14 @@ YBCStatus YBCPgSetTransactionDeferrable(bool deferrable) { return ToYBCStatus(pgapi->SetTransactionDeferrable(deferrable)); } +YBCStatus YBCPgSetInTxnBlock(bool in_txn_blk) { + return ToYBCStatus(pgapi->SetInTxnBlock(in_txn_blk)); +} + +YBCStatus YBCPgSetReadOnlyStmt(bool read_only_stmt) { + return ToYBCStatus(pgapi->SetReadOnlyStmt(read_only_stmt)); +} + YBCStatus YBCPgEnterSeparateDdlTxnMode() { return ToYBCStatus(pgapi->EnterSeparateDdlTxnMode()); } diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 700547976444..b51c701aa2c9 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -661,6 +661,8 @@ YBCStatus YBCPgAbortPlainTransaction(); YBCStatus YBCPgSetTransactionIsolationLevel(int isolation); YBCStatus YBCPgSetTransactionReadOnly(bool read_only); YBCStatus YBCPgSetTransactionDeferrable(bool deferrable); +YBCStatus YBCPgSetInTxnBlock(bool in_txn_blk); +YBCStatus YBCPgSetReadOnlyStmt(bool read_only_stmt); YBCStatus YBCPgSetEnableTracing(bool tracing); YBCStatus YBCPgEnableFollowerReads(bool enable_follower_reads, int32_t staleness_ms); YBCStatus YBCPgEnterSeparateDdlTxnMode(); diff --git a/src/yb/yql/pgwrapper/pg_read_time-test.cc b/src/yb/yql/pgwrapper/pg_read_time-test.cc index a3e377c01176..1fcb40ac9a51 100644 --- a/src/yb/yql/pgwrapper/pg_read_time-test.cc +++ b/src/yb/yql/pgwrapper/pg_read_time-test.cc @@ -445,4 +445,139 @@ TEST_F(PgMiniTestBase, YB_DISABLE_TEST_IN_SANITIZERS(TestYSQLDumpAsOfTime)) { ASSERT_STR_EQ(ground_truth, dump_as_of_time); } +// Mimics the CheckReadTimePickingLocation test for the relaxed +// yb_read_after_commit_visibility case. +// +// There are two primary effects of relaxed yb_read_after_commit_visibility: +// - SELECTs now always pick their read time on local proxy. +// - The read time is clamped whenever it is picked this way (not relevant). +// +// This implies the following changes compared to the vanilla test +// - Case 1: no pipeline, single operation in first batch, no distributed txn. +// Read time is picked on proxy for SELECTs and not DMLs. +// - Case 3: no pipeline, multiple operations to the same tablet in first batch, no distributed txn. +// Read time is picked on proxy. +TEST_F(PgReadTimeTest, CheckRelaxedReadAfterCommitVisibility) { + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute("SET DEFAULT_TRANSACTION_ISOLATION TO \"REPEATABLE READ\"")); + constexpr auto kTable = "test"sv; + constexpr auto kSingleTabletTable = "test_with_single_tablet"sv; + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (k INT PRIMARY KEY, v INT)", kTable)); + ASSERT_OK(conn.ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kSingleTabletTable)); + + for (const auto& table_name : {kTable, kSingleTabletTable}) { + ASSERT_OK(conn.ExecuteFormat( + "INSERT INTO $0 SELECT generate_series(1, 100), 0", table_name)); + ASSERT_OK(conn.ExecuteFormat( + "CREATE OR REPLACE PROCEDURE insert_rows_$0(first integer, last integer) " + "LANGUAGE plpgsql " + "as $$body$$ " + "BEGIN " + " FOR i in first..last LOOP " + " INSERT INTO $0 VALUES (i, i); " + " END LOOP; " + "END; " + "$$body$$", table_name)); + } + + // Relax read-after-commit-visiblity guarantee. + ASSERT_OK(conn.Execute("SET yb_read_after_commit_visibility TO relaxed")); + + // 1. no pipeline, single operation in first batch, no distributed txn + // + // relaxed yb_read_after_commit_visibility does not affect DML queries. + for (const auto& table_name : {kTable, kSingleTabletTable}) { + CheckReadTimeProvidedToDocdb( + [&conn, table_name]() { + ASSERT_OK(conn.FetchFormat("SELECT * FROM $0 WHERE k=1", table_name)); + }); + + CheckReadTimePickedOnDocdb( + [&conn, table_name]() { + ASSERT_OK(conn.ExecuteFormat("UPDATE $0 SET v=1 WHERE k=1", table_name)); + }); + + CheckReadTimePickedOnDocdb( + [&conn, table_name]() { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1000, 1000)", table_name)); + }); + + CheckReadTimePickedOnDocdb( + [&conn, table_name]() { + ASSERT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE k=1000", table_name)); + }); + } + + // 2. no pipeline, multiple operations to various tablets in first batch, no distributed txn + CheckReadTimeProvidedToDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.FetchFormat("SELECT COUNT(*) FROM $0", kTable)); + }); + + // 3. no pipeline, multiple operations to the same tablet in first batch, no distributed txn + CheckReadTimeProvidedToDocdb( + [&conn, kSingleTabletTable]() { + ASSERT_OK(conn.FetchFormat("SELECT COUNT(*) FROM $0", kSingleTabletTable)); + }); + + // 4. no pipeline, single operation in first batch, starts a distributed transation + // + // expected_num_picked_read_time_on_doc_db_metric is set because in case of a SELECT FOR UPDATE, + // a read time is picked in read_query.cc, but an extra picking is done in write_query.cc just + // after conflict resolution is done (see DoTransactionalConflictsResolved()). + // + // relaxed yb_read_after_commit_visibility does not affect FOR UDPATE queries. + CheckReadTimePickedOnDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); + ASSERT_OK(conn.FetchFormat("SELECT * FROM $0 WHERE k=1 FOR UPDATE", kTable)); + ASSERT_OK(conn.CommitTransaction()); + }, 2 /* expected_num_picked_read_time_on_doc_db_metric */); + + // 5. no pipeline, multiple operations to various tablets in first batch, starts a distributed + // transation + ASSERT_OK(SetHighMaxBatchSize(&conn)); + CheckReadTimeProvidedToDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(101, 110)", kTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); + + // 6. no pipeline, multiple operations to the same tablet in first batch, starts a distributed + // transation + ASSERT_OK(SetHighMaxBatchSize(&conn)); + CheckReadTimeProvidedToDocdb( + [&conn, kSingleTabletTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(101, 110)", kSingleTabletTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); + + // 7. Pipeline, single operation in first batch, starts a distributed transation + ASSERT_OK(SetMaxBatchSize(&conn, 1)); + CheckReadTimeProvidedToDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(111, 120)", kTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); + + // 8. Pipeline, multiple operations to various tablets in first batch, starts a distributed + // transation + ASSERT_OK(SetMaxBatchSize(&conn, 10)); + CheckReadTimeProvidedToDocdb( + [&conn, kTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(121, 150)", kTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); + + // 9. Pipeline, multiple operations to the same tablet in first batch, starts a distributed + // transation + ASSERT_OK(SetMaxBatchSize(&conn, 10)); + CheckReadTimeProvidedToDocdb( + [&conn, kSingleTabletTable]() { + ASSERT_OK(conn.ExecuteFormat("CALL insert_rows_$0(121, 150)", kSingleTabletTable)); + }); + ASSERT_OK(ResetMaxBatchSize(&conn)); +} + } // namespace yb::pgwrapper diff --git a/src/yb/yql/pgwrapper/pg_single_tserver-test.cc b/src/yb/yql/pgwrapper/pg_single_tserver-test.cc index e1fa2be1ff1d..a67d1f54f98b 100644 --- a/src/yb/yql/pgwrapper/pg_single_tserver-test.cc +++ b/src/yb/yql/pgwrapper/pg_single_tserver-test.cc @@ -912,6 +912,26 @@ TEST_F(PgSingleTServerTest, PagingSelectWithDelayedIntentsApply) { } } +// YB picks read time as safe time when the read time is picked on DocDB. +// Safe time is potentially a time point in the past and we should not ignore +// the uncertainty window for that reason. +// This test guards against SELECTs not seeing prior INSERTs from same session +// in relaxed read-after-commit-visibility for the above mentioned reason. +TEST_F(PgSingleTServerTest, NoSafeTimeClampingInRelaxedReadAfterCommit) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_inject_sleep_before_applying_intents_ms) = 100; + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.Execute("SET yb_read_after_commit_visibility ='relaxed'")); + ASSERT_OK(conn.Execute("CREATE TABLE t (v INT) SPLIT INTO 2 TABLETS")); + for (int i = 0; i != 20; ++i) { + LOG(INFO) << "Delete iteration " << i; + ASSERT_OK(conn.Execute("DELETE FROM t")); + LOG(INFO) << "Insert iteration " << i; + ASSERT_OK(conn.Execute("INSERT INTO t VALUES (1)")); + LOG(INFO) << "Reading iteration " << i; + ASSERT_EQ(ASSERT_RESULT(conn.FetchRow<int32_t>("SELECT * FROM t")), 1); + } +} + TEST_F(PgSingleTServerTest, BoundedRangeScanWithLargeTransaction) { auto conn = ASSERT_RESULT(Connect()); ASSERT_OK(conn.Execute("CREATE TABLE t (r1 INT, r2 INT, PRIMARY KEY (r1 ASC, r2 ASC))")); diff --git a/src/yb/yql/pgwrapper/pg_txn-test.cc b/src/yb/yql/pgwrapper/pg_txn-test.cc index ba60d8622ded..a361fbdc7656 100644 --- a/src/yb/yql/pgwrapper/pg_txn-test.cc +++ b/src/yb/yql/pgwrapper/pg_txn-test.cc @@ -15,11 +15,19 @@ #include "yb/yql/pgwrapper/pg_mini_test_base.h" +#include "yb/common/pgsql_error.h" + +#include "yb/server/skewed_clock.h" + +#include "yb/tserver/mini_tablet_server.h" +#include "yb/tserver/tablet_server.h" + #include "yb/util/scope_exit.h" #include "yb/util/sync_point.h" #include "yb/util/test_macros.h" #include "yb/util/test_thread_holder.h" #include "yb/util/tsan_util.h" +#include "yb/util/yb_pg_errcodes.h" using std::string; @@ -28,6 +36,9 @@ using namespace std::literals; DECLARE_bool(TEST_fail_in_apply_if_no_metadata); DECLARE_bool(yb_enable_read_committed_isolation); DECLARE_bool(enable_wait_queues); +DECLARE_string(time_source); +DECLARE_int32(replication_factor); +DECLARE_bool(TEST_running_test); namespace yb { namespace pgwrapper { @@ -409,5 +420,515 @@ TEST_F_EX( PgTxnTest, SelectForUpdateExclusiveRead, PgTxnTestFailOnConflict) { EXPECT_OK(setup_conn.CommitTransaction()); } +// Helper class to test the semantics of yb_read_after_commit_visibility option. +// +// Additional infrastructure was required for the test. +// +// The test requires us to simulate two connections to separate postmaster +// processes on different tservers. Usually, we could get away with +// ExternalMiniCluster if we required two different postmaster processes. +// However, the test also requires that we register skewed clocks and jump +// the clocks as necessary. +// +// Here, we take the easier approach of using a MiniCluster (that supports +// skewed clocks out of the box) and then simulate multiple postmaster +// processes by explicitly spawning PgSupervisor processes for each tserver. +// +// Typical setup: +// 1. MiniCluster with 2 tservers. +// 2. One server hosts a test table with single tablet and RF 1. +// 3. The other server, proxy, is blacklisted to control hybrid propagation. +// This is the node that the external client connects to, for the read +// query and "expects" to the see the recent commit. +// 4. Ensure that the proxy is also not on the master node. +// 5. Pre-populate the catalog cache so there are no surprise communications +// between the servers. +// 6. Register skewed clocks. We only jump the clock on the node that hosts +// the data. +// +// Additional considerations/caveats: +// - Register a thread prefix for each supervisor. +// Otherwise, the callback registration fails with name conflicts. +// - Do NOT use PgPickTabletServer. It does not work. Moreover, it is +// insufficient for our usecase even if it did work as intended. +class PgReadAfterCommitVisibilityTest : public PgMiniTestBase { + public: + void SetUp() override { + server::SkewedClock::Register(); + ANNOTATE_UNPROTECTED_WRITE(FLAGS_time_source) = server::SkewedClock::kName; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_replication_factor) = 1; + PgMiniTestBase::SetUp(); + SpawnSupervisors(); + } + + void DoTearDown() override { + // Exit supervisors cleanly ... + // Risk of false positive segfaults otherwise ... + for (auto&& supervisor : pg_supervisors_) { + if (supervisor) { + supervisor->Stop(); + } + } + PgMiniTestBase::DoTearDown(); + } + + size_t NumTabletServers() override { + // One server for a proxy and the other server to host the data. + return 2; + } + + void BeforePgProcessStart() override { + // Identify the tserver index that hosts the MiniCluster postmaster + // process so that we do NOT spawn a PgSupervisor for that tserver. + auto connParams = MakeConnSettings(); + auto ntservers = static_cast<int>(cluster_->num_tablet_servers()); + for (int idx = 0; idx < ntservers; idx++) { + auto server = cluster_->mini_tablet_server(idx); + if (server->bound_rpc_addr().address().to_string() == connParams.host) { + conn_idx_ = idx; + break; + } + } + } + + Result<PGConn> ConnectToIdx(int idx) const { + // postmaster hosted by PgMiniTestBase itself. + if (idx == conn_idx_) { + return Connect(); + } + + // We own the postmaster process for this tserver idx. + // Use the appropriate postmaster process to setup a pg connection. + auto connParams = PGConnSettings { + .host = pg_host_ports_[idx].host(), + .port = pg_host_ports_[idx].port() + }; + + auto result = VERIFY_RESULT(PGConnBuilder(connParams).Connect()); + RETURN_NOT_OK(SetupConnection(&result)); + return result; + } + + // Called for the first connection. + // Use ConnectToIdx() directly for subsequent connections. + Result<PGConn> ConnectToProxy() { + // Avoid proxy on the node that hosts the master because + // tservers and masters regularly exchange heartbeats with each other. + // This means there is constant hybrid time propagation between + // the master and the tservers. + // We wish to avoid hyrbid time from propagating to the proxy node. + if (static_cast<int>(cluster_->LeaderMasterIdx()) == proxy_idx_) { + return STATUS(IllegalState, "Proxy cannot be on the master node ..."); + } + + // Add proxy to the blacklist to limit hybrid time prop. + auto res = cluster_->AddTServerToBlacklist(proxy_idx_); + if (!res.ok()) { + return res; + } + + // Now, we are ready to connect to the proxy. + return ConnectToIdx(proxy_idx_); + } + + Result<PGConn> ConnectToDataHost() { + return ConnectToIdx(host_idx_); + } + + // Jump the clocks of the nodes hosting the data. + std::vector<server::SkewedClockDeltaChanger> JumpClockDataNodes( + std::chrono::milliseconds skew) { + std::vector<server::SkewedClockDeltaChanger> changers; + auto ntservers = static_cast<int>(cluster_->num_tablet_servers()); + for (int idx = 0; idx < ntservers; idx++) { + if (idx == proxy_idx_) { + continue; + } + changers.push_back(JumpClock(cluster_->mini_tablet_server(idx), skew)); + } + return changers; + } + + protected: + // Setup to create the postmaster process corresponding to the tserver idx. + Status CreateSupervisor(int idx) { + auto pg_ts = cluster_->mini_tablet_server(idx); + auto port = cluster_->AllocateFreePort(); + PgProcessConf pg_process_conf = VERIFY_RESULT(PgProcessConf::CreateValidateAndRunInitDb( + AsString(Endpoint(pg_ts->bound_rpc_addr().address(), port)), + pg_ts->options()->fs_opts.data_paths.front() + "/pg_data", + pg_ts->server()->GetSharedMemoryFd())); + + pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag; + pg_process_conf.force_disable_log_file = true; + pg_host_ports_[idx] = HostPort(pg_process_conf.listen_addresses, pg_process_conf.pg_port); + + pg_supervisors_[idx] = std::make_unique<PgSupervisor>( + pg_process_conf, nullptr); + + return Status::OK(); + } + + void SpawnSupervisors() { + auto ntservers = static_cast<int>(cluster_->num_tablet_servers()); + + // Allocate space for the host ports and supervisors. + pg_host_ports_.resize(ntservers); + pg_supervisors_.resize(ntservers); + + // Create and start the PgSupervisors. + for (int idx = 0; idx < ntservers; idx++) { + if (idx == conn_idx_) { + // Postmaster already started for this tserver. + continue; + } + // Prefix registered to avoid name clash among callback + // registrations. + TEST_SetThreadPrefixScoped prefix_se(std::to_string(idx)); + ASSERT_OK(CreateSupervisor(idx)); + ASSERT_OK(pg_supervisors_[idx]->Start()); + } + } + + int conn_idx_ = 0; + int proxy_idx_ = 1; + int host_idx_ = 0; + std::vector<HostPort> pg_host_ports_; + std::vector<std::unique_ptr<PgSupervisor>> pg_supervisors_; +}; + +// Ensures that clock skew does not affect read-after-commit guarantees on same +// session with relaxed yb_read_after_commit_visibility option. +// +// Test Setup: +// 1. Cluster with RF 1, skewed clocks, 2 tservers. +// 2. Add a pg process on tserver that does not have one. +// This is done since MiniCluster only creates one postmaster process +// on some random tserver. +// We wish to use the minicluster and not external minicluster since +// there is better test infrastructure to manipulate clock skew. +// This approach is the less painful one at the moment. +// 3. Add a tablet server to the blacklist +// so we can ensure hybrid time propagation doesn't occur between +// the data host node and the proxy +// 4. Connect to the proxy tserver that does not host the data. +// We simulate this by blacklisting the target tserver. +// 5. Create a table with a single tablet and a single row. +// 6. Populate the catalog cache on the pg backend so that +// catalog cache misses does not interfere with hybrid time propagation. +// 7. Jump the clock of the tserver hosting the table to the future. +// 8. Insert a row using the proxy conn and a fast path txn. +// Commit ts for the insert is picked on the server whose clock is ahead. +// The hybrid time is propagated to the proxy conn since the request +// originated from the proxy conn. +// 9. Read the table from the proxy connection. +// The read has a timestamp that is ahead of the physical clock accounting +// for the timestamp of the insert DML due hybrid time propagation. +// Hence, the read does not miss recent updates of the same connection. +// This property also applies when dealing with different pg connections +// but they all go through the same tserver proxy. +TEST_F(PgReadAfterCommitVisibilityTest, SameSessionRecency) { + // Connect to local proxy. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + + // Create table test. + ASSERT_OK(proxyConn.Execute( + "CREATE TABLE test (key INT) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit time + // on the data node. + // This hybrid time is propagated to the local proxy. + ASSERT_OK(proxyConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform a select using the the relaxed yb_read_after_commit_visibility option. + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto rows = ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Observe the recent insert despite the clock skew. + ASSERT_EQ(rows.size(), 1); +} + +// Similar to SameSessionRecency except we have +// two connections instead of one to the same node. +// +// This property is necessary to maintain same session guarantees even in the +// presence of server-side connection pooling. +TEST_F(PgReadAfterCommitVisibilityTest, SamePgNodeRecency) { + // Connect to local proxy. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + // Not calling ConnectToProxy() again since we already added + // the proxy to the blacklist. + auto proxyConn2 = ASSERT_RESULT(ConnectToIdx(proxy_idx_)); + + // Create table test. + ASSERT_OK(proxyConn.Execute( + "CREATE TABLE test (key INT) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(proxyConn2.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + // This ts is propagated to the local proxy. + ASSERT_OK(proxyConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform a select using the relaxed yb_read_after_commit_visibility option. + ASSERT_OK(proxyConn2.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto rows = ASSERT_RESULT(proxyConn2.FetchRows<int32_t>("SELECT * FROM test")); + + // Observe the recent insert despite the clock skew. + ASSERT_EQ(rows.size(), 1); +} + +// Demonstrate that read from a connection to a different node +// (than the one which had the Pg connection to write data) may miss the +// commit when using the relaxed yb_read_after_commit_visibility option. +// +// Test Setup: +// 1. Cluster with RF 1, skewed clocks, 2 tservers. +// 2. Add a pg process on tserver that does not have one. +// This is done since MiniCluster only creates one postmaster process +// on some random tserver. +// We wish to use the minicluster and not external minicluster since +// there is better test infrastructure to manipulate clock skew. +// This approach is the less painful one at the moment. +// 3. Add a tablet server to the blacklist. +// 4. Connect to both servers for Pg connection, data host and proxy. +// 5. Create a table with a single tablet and a single row. +// 6. Populate the catalog cache on both pg processes so that +// catalog cache misses does not interfere with hybrid time propagation. +// 7. Jump the clock of the tserver hosting the table to the future. +// 8. Insert a row using the host conn and a fast path txn. +// Commit ts for the insert is picked on the server whose clock is ahead. +// The hybrid time is not propagated to the proxy conn since +// there is no reason to touch proxy conn. +// 9. Read the table from the proxy connection. +// The read can miss the recent commit since there is no hybrid time +// propagation from the host conn before the read time is picked. +TEST_F(PgReadAfterCommitVisibilityTest, SessionOnDifferentNodeStaleRead) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit time + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform a select using the relaxed yb_read_after_commit_visibility option. + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto rows = ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Miss the recent insert due to clock skew. + ASSERT_EQ(rows.size(), 0); +} + +// Same test as SessionOnDifferentNodeStaleRead +// except we verify that the staleness is bounded +// by waiting out the clock skew. +TEST_F(PgReadAfterCommitVisibilityTest, + SessionOnDifferentNodeBoundedStaleness) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto skew = 100ms; + auto changers = JumpClockDataNodes(skew); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Sleep for a while to verify that the staleness is bounded. + // We sleep for the same duration that we jump the clock, i.e. 100ms. + SleepFor(skew); + + // Perform a select using the relaxed yb_read_after_commit_visibility option. + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto rows = ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + + // We do not miss the prior insert since the clock skew has passed. + ASSERT_EQ(rows.size(), 1); +} + +// Duplicate insert check. +// +// Inserts should not miss other recent inserts to +// avoid missing duplicate key violations. This is guaranteed because +// we don't apply "relaxed" to non-read transactions. +TEST_F(PgReadAfterCommitVisibilityTest, + SessionOnDifferentNodeDuplicateInsertCheck) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT PRIMARY KEY) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform an insert using the relaxed yb_read_after_commit_visibility option. + // Must still observe the duplicate key! + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto res = proxyConn.Execute("INSERT INTO test VALUES (1)"); + ASSERT_FALSE(res.ok()); + + auto pg_err_ptr = res.ErrorData(PgsqlError::kCategory); + ASSERT_NE(pg_err_ptr, nullptr); + YBPgErrorCode error_code = PgsqlErrorTag::Decode(pg_err_ptr); + ASSERT_EQ(error_code, YBPgErrorCode::YB_PG_UNIQUE_VIOLATION); +} + +// Updates should not miss recent DMLs either. This is guaranteed +// because we don't apply "relaxed" to non-read transactions. +TEST_F(PgReadAfterCommitVisibilityTest, SessionOnDifferentNodeUpdateKeyCheck) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT PRIMARY KEY) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform an update using the relaxed yb_read_after_commit_visibility option. + // Must still observe the key with value 1 + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto res = proxyConn.Execute("UPDATE test SET key = 2 WHERE key = 1"); + + // Ensure that the update happened. + auto row = ASSERT_RESULT(hostConn.FetchRow<int32_t>("SELECT key FROM test")); + ASSERT_EQ(row, 2); +} + +// Same for DELETEs. They should not miss recent DMLs. +// Otherwise, DELETE FROM table would not delete all the rows. +// This is guaranteed because we don't apply "relaxed" to non-read +// transactions. +TEST_F(PgReadAfterCommitVisibilityTest, SessionOnDifferentNodeDeleteKeyCheck) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT PRIMARY KEY) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform a delete using the relaxed yb_read_after_commit_visibility option. + // Must still observe the key with value 1 + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto res = proxyConn.Execute("DELETE FROM test WHERE key = 1"); + + // Ensure that the update happened. + auto rows = ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT key FROM test")); + ASSERT_EQ(rows.size(), 0); +} + +// We also consider the case where the query looks like +// a SELECT but there is an insert hiding underneath. +// We are guaranteed read-after-commit-visibility in this case +// since "relaxed" is not applied to non-read transactions. +TEST_F(PgReadAfterCommitVisibilityTest, SessionOnDifferentNodeDmlHidden) { + // Connect to both proxy and data nodes. + auto proxyConn = ASSERT_RESULT(ConnectToProxy()); + auto hostConn = ASSERT_RESULT(ConnectToDataHost()); + + // Create table test. + ASSERT_OK(hostConn.Execute( + "CREATE TABLE test (key INT PRIMARY KEY) SPLIT INTO 1 TABLETS")); + + // Populate catalog cache. + ASSERT_RESULT(proxyConn.FetchRows<int32_t>("SELECT * FROM test")); + ASSERT_RESULT(hostConn.FetchRows<int32_t>("SELECT * FROM test")); + + // Jump the clock on the tserver hosting the table. + auto changers = JumpClockDataNodes(100ms); + + // Perform a fast path insert that picks the commit ts + // on the data node. + ASSERT_OK(hostConn.Execute("INSERT INTO test VALUES (1)")); + + // Perform an insert using the relaxed yb_read_after_commit_visibility option. + // Must still observe the duplicate key! + ASSERT_OK(proxyConn.Execute( + "SET yb_read_after_commit_visibility = relaxed")); + auto res = proxyConn.Execute("WITH new_test AS (" + "INSERT INTO test (key) VALUES (1) RETURNING key" + ") SELECT key FROM new_test"); + ASSERT_FALSE(res.ok()); + + auto pg_err_ptr = res.ErrorData(PgsqlError::kCategory); + ASSERT_NE(pg_err_ptr, nullptr); + YBPgErrorCode error_code = PgsqlErrorTag::Decode(pg_err_ptr); + ASSERT_EQ(error_code, YBPgErrorCode::YB_PG_UNIQUE_VIOLATION); +} + } // namespace pgwrapper } // namespace yb diff --git a/src/yb/yql/pgwrapper/pg_wrapper.cc b/src/yb/yql/pgwrapper/pg_wrapper.cc index 47a621a3d822..d5b8e616198e 100644 --- a/src/yb/yql/pgwrapper/pg_wrapper.cc +++ b/src/yb/yql/pgwrapper/pg_wrapper.cc @@ -266,6 +266,9 @@ DEFINE_RUNTIME_PG_FLAG( DEFINE_RUNTIME_PG_FLAG(int32, yb_toast_catcache_threshold, -1, "Size threshold in bytes for a catcache tuple to be compressed."); +DEFINE_RUNTIME_PG_FLAG(string, yb_read_after_commit_visibility, "strict", + "Determines the behavior of read-after-commit-visibility guarantee."); + static bool ValidateXclusterConsistencyLevel(const char* flagname, const std::string& value) { if (value != "database" && value != "tablet") { fprintf(