From 0ac8ab9046d01e8f89d953c439e738bd3e7f75e1 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Fri, 29 Jan 2021 00:03:18 -0500
Subject: [PATCH 1/2] kv: route present-time reads to global_read follower replicas

First commit from #59505.

This commit updates the kv client routing logic to account for the new
`LEAD_FOR_GLOBAL_READS` `RangeClosedTimestampPolicy` added in #59505. In
enterprise builds, non-stale read-only requests to ranges with this closed
timestamp policy can now be routed to follower replicas, just like stale
read-only requests to normal ranges currently are.

In addition to this main change, a few smaller changes were hard to split
out, so they are included in this commit.

First, non-transactional requests are no longer served by followers, even
if the follower replica detects that the request could be. Previously,
non-transactional requests were never intentionally routed to a follower
replica, but `Replica.canServeFollowerRead` would allow them through if
their timestamp was low enough. This change keeps the client and server
logic in sync, and serving such requests from followers does not seem safe
for non-transactional requests that get their timestamp assigned on the
server. We're planning to remove non-transactional requests soon anyway
(#58459), so this isn't a big deal. It mostly just caused some testing
fallout.

Second, transactions that have previously written intents are now allowed
to have their read-only requests served by followers, as long as those
followers have a closed timestamp above the transaction's read *and* write
timestamp. Previously, we had avoided this because it seemed to cause
issues with read-your-writes. However, it appears safe as long as the
write timestamp is below the closed timestamp, because we know all of the
transaction's intents are at or below its write timestamp. This is very
important for multi-region work, where we want a transaction to be able to
write to a REGIONAL table and then later perform local reads (maybe
foreign key checks) on GLOBAL tables.

Third, a max clock offset shift in `canUseFollowerRead` was removed. It
wasn't exactly clear what this was doing. It was introduced in the
original 70be833 and seemed to allow a follower read in cases where one
otherwise shouldn't be expected to succeed. I thought at first that it was
accounting for the fact that the kv client's clock might be leading the kv
server's clock and so it was being pessimistic about the expected closed
timestamp, but it actually seems to be shifting the other way, so I don't
know. I might be missing something.

Finally, the concept of a `replicaoracle.OracleFactoryFunc` was removed
and the existing `replicaoracle.OracleFactory` takes its place. We no
longer need the double-factory approach because the transaction object is
now passed directly to `Oracle.ChoosePreferredReplica`. This was a
necessary change, because determining whether a follower read can be
served now requires both transaction information and range information
(i.e. the closed timestamp policy), so the decision needs to be made in
the Oracle itself instead of in the OracleFactory. This all seems like a
simplification anyway.

This is still waiting on changes to the closed timestamp system to be able
to write end-to-end tests using this new functionality.
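For illustration only, here is a small, self-contained Go sketch of the
routing decision described above. This is not CockroachDB code: the types,
the 5s follower-read lag, and the 500ms maximum clock offset are simplified
stand-ins for `hlc.Timestamp`, `roachpb.Transaction`, and the relevant
cluster settings. The two functions mirror, in spirit, the
`Transaction.RequiredFrontier` and `closedTimestampLikelySufficient` logic
added in the diff below.

```go
// Hypothetical, simplified sketch; not the real kv client code.
package main

import (
	"fmt"
	"time"
)

// Timestamp is a stand-in for hlc.Timestamp (wall time in nanoseconds).
type Timestamp int64

// ClosedTimestampPolicy is a stand-in for roachpb.RangeClosedTimestampPolicy.
type ClosedTimestampPolicy int

const (
	LagByClusterSetting ClosedTimestampPolicy = iota // LAG_BY_CLUSTER_SETTING
	LeadForGlobalReads                               // LEAD_FOR_GLOBAL_READS
)

// Txn holds the fields of roachpb.Transaction that matter here.
type Txn struct {
	ReadTimestamp          Timestamp
	WriteTimestamp         Timestamp
	GlobalUncertaintyLimit Timestamp
}

// requiredFrontier mirrors Transaction.RequiredFrontier: the largest
// timestamp at which a read-only request from this transaction may observe
// values. It is the maximum of the read timestamp, the write timestamp (so
// the transaction can read its own intents on a follower), and the global
// uncertainty limit.
func (t *Txn) requiredFrontier() Timestamp {
	ts := t.ReadTimestamp
	if t.WriteTimestamp > ts {
		ts = t.WriteTimestamp
	}
	if t.GlobalUncertaintyLimit > ts {
		ts = t.GlobalUncertaintyLimit
	}
	return ts
}

// closedTimestampLikelySufficient mirrors the new client-side check: a
// follower is expected to have closed out all timestamps below now+offset,
// where the offset depends on the range's closed timestamp policy.
func closedTimestampLikelySufficient(
	now Timestamp,
	policy ClosedTimestampPolicy,
	followerReadLag time.Duration, // negative, derived from cluster settings
	maxClockOffset time.Duration, // positive, the clock's max offset
	required Timestamp,
) bool {
	var offset time.Duration
	switch policy {
	case LagByClusterSetting:
		offset = followerReadLag // followers lag the present by this much
	case LeadForGlobalReads:
		offset = maxClockOffset // followers lead the present by this much
	}
	expectedClosed := now + Timestamp(offset)
	return required <= expectedClosed
}

func main() {
	now := Timestamp(time.Now().UnixNano())
	// A present-time transaction whose uncertainty interval extends one max
	// clock offset into the future.
	presentTimeTxn := &Txn{
		ReadTimestamp:          now,
		WriteTimestamp:         now,
		GlobalUncertaintyLimit: now + Timestamp(500*time.Millisecond),
	}
	required := presentTimeTxn.requiredFrontier()

	// Under LAG_BY_CLUSTER_SETTING, a present-time read cannot expect the
	// follower's closed timestamp to be high enough.
	fmt.Println(closedTimestampLikelySufficient(
		now, LagByClusterSetting, -5*time.Second, 500*time.Millisecond, required)) // false

	// Under LEAD_FOR_GLOBAL_READS, followers are expected to close timestamps
	// slightly in the future, so the same read can be routed to a follower.
	fmt.Println(closedTimestampLikelySufficient(
		now, LeadForGlobalReads, -5*time.Second, 500*time.Millisecond, required)) // true
}
```

The real check is additionally gated on the enterprise license and the
`kv.closed_timestamp.follower_reads_enabled` setting, and is only reached
for non-locking, all-transactional batches; see `canSendToFollower` in the
diff below.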
--- pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel | 6 +- .../kvccl/kvfollowerreadsccl/followerreads.go | 156 ++++--- .../kvfollowerreadsccl/followerreads_test.go | 439 ++++++++++++++---- pkg/kv/kvclient/kvcoord/dist_sender.go | 8 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 10 +- pkg/kv/kvclient/kvcoord/range_iter.go | 13 + pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 7 + pkg/kv/kvserver/closed_timestamp_test.go | 110 +++-- pkg/kv/kvserver/replica_follower_read.go | 17 +- pkg/kv/kvserver/replica_learner_test.go | 5 +- pkg/kv/mock_transactional_sender.go | 5 + pkg/kv/sender.go | 5 + pkg/kv/txn.go | 8 + pkg/roachpb/data.go | 41 +- pkg/sql/physicalplan/replicaoracle/oracle.go | 84 ++-- .../physicalplan/replicaoracle/oracle_test.go | 26 +- pkg/sql/physicalplan/span_resolver.go | 17 +- 17 files changed, 691 insertions(+), 266 deletions(-) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel index de96531e8339..f1b4193fc025 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel @@ -19,7 +19,6 @@ go_library( "//pkg/sql/physicalplan/replicaoracle", "//pkg/sql/sem/builtins", "//pkg/util/hlc", - "//pkg/util/timeutil", "//pkg/util/uuid", ], ) @@ -39,6 +38,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/rpc", "//pkg/security", @@ -47,21 +47,21 @@ go_test( "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/physicalplan/replicaoracle", - "//pkg/storage/enginepb", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/syncutil", - "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go index 726a337c6487..63544da1696b 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go @@ -11,6 +11,7 @@ package kvfollowerreadsccl import ( + "context" "fmt" "time" @@ -27,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -49,11 +49,11 @@ var followerReadMultiple = settings.RegisterFloatSetting( }, ) -// getFollowerReadOffset returns the offset duration which should be used to as -// the offset from now to request a follower read. The same value less the clock -// uncertainty, then is used to determine at the kv layer if a query can use a -// follower read. -func getFollowerReadDuration(st *cluster.Settings) time.Duration { +// getFollowerReadLag returns the (negative) offset duration from hlc.Now() +// which should be used to request a follower read. The same value is used to +// determine at the kv layer if a query can use a follower read for ranges with +// the default LAG_BY_CLUSTER_SETTING closed timestamp policy. 
+func getFollowerReadLag(st *cluster.Settings) time.Duration { targetMultiple := followerReadMultiple.Get(&st.SV) targetDuration := closedts.TargetDuration.Get(&st.SV) closeFraction := closedts.CloseFraction.Get(&st.SV) @@ -61,91 +61,143 @@ func getFollowerReadDuration(st *cluster.Settings) time.Duration { (1+closeFraction*targetMultiple)) } +// getGlobalReadsLead returns the (positive) offset duration from hlc.Now() +// which clients can expect followers of a range with the LEAD_FOR_GLOBAL_READS +// closed timestamp policy to have closed off. This offset is equal to the +// maximum clock offset, allowing present-time (i.e. those not pushed into the +// future) transactions to serve reads from followers. +func getGlobalReadsLead(clock *hlc.Clock) time.Duration { + return clock.MaxOffset() +} + func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error { org := sql.ClusterOrganization.Get(&st.SV) return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads") } +func checkFollowerReadsEnabled(clusterID uuid.UUID, st *cluster.Settings) bool { + if !kvserver.FollowerReadsEnabled.Get(&st.SV) { + return false + } + return checkEnterpriseEnabled(clusterID, st) == nil +} + func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) { if err := checkEnterpriseEnabled(clusterID, st); err != nil { return 0, err } - return getFollowerReadDuration(st), nil + // NOTE: we assume that at least some of the ranges being queried use a + // LAG_BY_CLUSTER_SETTING closed timestamp policy. Otherwise, there would + // be no reason to use AS OF SYSTEM TIME follower_read_timestamp(). + return getFollowerReadLag(st), nil } // batchCanBeEvaluatedOnFollower determines if a batch consists exclusively of // requests that can be evaluated on a follower replica. func batchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { - return !ba.IsLocking() && ba.IsAllTransactional() -} - -// txnCanPerformFollowerRead determines if the provided transaction can perform -// follower reads. -func txnCanPerformFollowerRead(txn *roachpb.Transaction) bool { - // If the request is transactional and that transaction has acquired any - // locks then that request should not perform follower reads. Doing so could - // allow the request to miss its own writes or observe state that conflicts - // with its locks. - return txn != nil && !txn.IsLocking() + return ba.Txn != nil && !ba.IsLocking() && ba.IsAllTransactional() } -// canUseFollowerRead determines if a query can be sent to a follower. -func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timestamp) bool { - if !kvserver.FollowerReadsEnabled.Get(&st.SV) { - return false +// closedTimestampLikelySufficient determines if a request with a given required +// frontier timestamp is likely to be below a follower's closed timestamp and +// serviceable as a follower read were the request to be sent to a follower +// replica. 
+func closedTimestampLikelySufficient( + st *cluster.Settings, + clock *hlc.Clock, + ctPolicy roachpb.RangeClosedTimestampPolicy, + requiredFrontierTS hlc.Timestamp, +) bool { + var offset time.Duration + switch ctPolicy { + case roachpb.LAG_BY_CLUSTER_SETTING: + offset = getFollowerReadLag(st) + case roachpb.LEAD_FOR_GLOBAL_READS: + offset = getGlobalReadsLead(clock) + default: + panic("unknown RangeClosedTimestampPolicy") } - threshold := (-1 * getFollowerReadDuration(st)) - 1*base.DefaultMaxClockOffset - if timeutil.Since(ts.GoTime()) < threshold { - return false - } - return checkEnterpriseEnabled(clusterID, st) == nil + expectedClosedTS := clock.Now().Add(offset.Nanoseconds(), 0) + return requiredFrontierTS.LessEq(expectedClosedTS) } // canSendToFollower implements the logic for checking whether a batch request // may be sent to a follower. -// TODO(aayush): We should try to bind clusterID to the function here, rather -// than having callers plumb it in every time. -func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool { - return batchCanBeEvaluatedOnFollower(ba) && - txnCanPerformFollowerRead(ba.Txn) && - canUseFollowerRead(clusterID, st, forward(ba.Txn.ReadTimestamp, ba.Txn.GlobalUncertaintyLimit)) +func canSendToFollower( + clusterID uuid.UUID, + st *cluster.Settings, + clock *hlc.Clock, + ctPolicy roachpb.RangeClosedTimestampPolicy, + ba roachpb.BatchRequest, +) bool { + return checkFollowerReadsEnabled(clusterID, st) && + batchCanBeEvaluatedOnFollower(ba) && + closedTimestampLikelySufficient(st, clock, ctPolicy, ba.Txn.RequiredFrontier()) } -func forward(ts hlc.Timestamp, to hlc.Timestamp) hlc.Timestamp { - ts.Forward(to) - return ts -} - -type oracleFactory struct { +type followerReadOracle struct { clusterID *base.ClusterIDContainer st *cluster.Settings + clock *hlc.Clock - binPacking replicaoracle.OracleFactory - closest replicaoracle.OracleFactory + closest replicaoracle.Oracle + binPacking replicaoracle.Oracle } -func newOracleFactory(cfg replicaoracle.Config) replicaoracle.OracleFactory { - return &oracleFactory{ +func newFollowerReadOracle(cfg replicaoracle.Config) replicaoracle.Oracle { + return &followerReadOracle{ clusterID: &cfg.RPCContext.ClusterID, st: cfg.Settings, - binPacking: replicaoracle.NewOracleFactory(replicaoracle.BinPackingChoice, cfg), - closest: replicaoracle.NewOracleFactory(replicaoracle.ClosestChoice, cfg), + clock: cfg.RPCContext.Clock, + closest: replicaoracle.NewOracle(replicaoracle.ClosestChoice, cfg), + binPacking: replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg), } } -func (f oracleFactory) Oracle(txn *kv.Txn) replicaoracle.Oracle { - if txn != nil && canUseFollowerRead(f.clusterID.Get(), f.st, txn.ReadTimestamp()) { - return f.closest.Oracle(txn) +func (o *followerReadOracle) ChoosePreferredReplica( + ctx context.Context, + txn *kv.Txn, + desc *roachpb.RangeDescriptor, + leaseholder *roachpb.ReplicaDescriptor, + ctPolicy roachpb.RangeClosedTimestampPolicy, + queryState replicaoracle.QueryState, +) (roachpb.ReplicaDescriptor, error) { + var oracle replicaoracle.Oracle + if o.useClosestOracle(txn, ctPolicy) { + oracle = o.closest + } else { + oracle = o.binPacking } - return f.binPacking.Oracle(txn) + return oracle.ChoosePreferredReplica(ctx, txn, desc, leaseholder, ctPolicy, queryState) +} + +func (o *followerReadOracle) useClosestOracle( + txn *kv.Txn, ctPolicy roachpb.RangeClosedTimestampPolicy, +) bool { + // NOTE: this logic is almost identical to canSendToFollower, except that it + // 
operates on a *kv.Txn instead of a roachpb.BatchRequest. As a result, the + // function does not check batchCanBeEvaluatedOnFollower. This is because we + // assume that if a request is going to be executed in a distributed DistSQL + // flow (which is why it is consulting a replicaoracle.Oracle), then all of + // the individual BatchRequests that it send will be eligible to be served + // on follower replicas as follower reads. + // + // If we were to get this assumption wrong, the flow might be planned on a + // node with a follower replica, but individual BatchRequests would still be + // sent to the correct replicas once canSendToFollower is checked for each + // BatchRequests in the DistSender. This would hurt performance, but would + // not violate correctness. + return checkFollowerReadsEnabled(o.clusterID.Get(), o.st) && + txn != nil && + closedTimestampLikelySufficient(o.st, o.clock, ctPolicy, txn.RequiredFrontier()) } -// followerReadAwareChoice is a leaseholder choosing policy that detects +// followerReadOraclePolicy is a leaseholder choosing policy that detects // whether a query can be used with a follower read. -var followerReadAwareChoice = replicaoracle.RegisterPolicy(newOracleFactory) +var followerReadOraclePolicy = replicaoracle.RegisterPolicy(newFollowerReadOracle) func init() { - sql.ReplicaOraclePolicy = followerReadAwareChoice + sql.ReplicaOraclePolicy = followerReadOraclePolicy builtins.EvalFollowerReadOffset = evalFollowerReadOffset kvcoord.CanSendToFollower = canSendToFollower } diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 4d1665ea1393..98bec175ea26 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -10,7 +10,6 @@ package kvfollowerreadsccl import ( "context" - "reflect" "testing" "time" @@ -20,24 +19,25 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -69,72 +69,171 @@ func TestEvalFollowerReadOffset(t *testing.T) { func TestCanSendToFollower(t *testing.T) { defer leaktest.AfterTest(t)() - disableEnterprise := utilccl.TestingEnableEnterprise() - defer disableEnterprise() - st := cluster.MakeTestingClusterSettings() - kvserver.FollowerReadsEnabled.Override(&st.SV, true) + 
clock := hlc.NewClock(hlc.UnixNano, base.DefaultMaxClockOffset) + stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0) + current := clock.Now() + future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0) - old := hlc.Timestamp{ - WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(), - } - oldHeader := roachpb.Header{Txn: &roachpb.Transaction{ - ReadTimestamp: old, - }} - rw := roachpb.BatchRequest{Header: oldHeader} - rw.Add(&roachpb.PutRequest{}) - if canSendToFollower(uuid.MakeV4(), st, rw) { - t.Fatalf("should not be able to send a rw request to a follower") - } - roNonTxn := roachpb.BatchRequest{Header: oldHeader} - roNonTxn.Add(&roachpb.QueryTxnRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNonTxn) { - t.Fatalf("should not be able to send a non-transactional ro request to a follower") + txn := func(ts hlc.Timestamp) *roachpb.Transaction { + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) + return &txn } - roNoTxn := roachpb.BatchRequest{} - roNoTxn.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNoTxn) { - t.Fatalf("should not be able to send a batch with no txn to a follower") + withWriteTimestamp := func(txn *roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction { + txn.WriteTimestamp = ts + return txn } - roOld := roachpb.BatchRequest{Header: oldHeader} - roOld.Add(&roachpb.GetRequest{}) - if !canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should be able to send an old ro batch to a follower") + withUncertaintyLimit := func(txn *roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction { + txn.GlobalUncertaintyLimit = ts + return txn } - roRWTxnOld := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{Key: []byte("key")}, - ReadTimestamp: old, - }, - }} - roRWTxnOld.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roRWTxnOld) { - t.Fatalf("should not be able to send a ro request from a rw txn to a follower") + batch := func(txn *roachpb.Transaction, req roachpb.Request) roachpb.BatchRequest { + var ba roachpb.BatchRequest + ba.Txn = txn + ba.Add(req) + return ba } - kvserver.FollowerReadsEnabled.Override(&st.SV, false) - if canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled") - } - kvserver.FollowerReadsEnabled.Override(&st.SV, true) - roNew := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - ReadTimestamp: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, - }, - }} - if canSendToFollower(uuid.MakeV4(), st, roNew) { - t.Fatalf("should not be able to send a new ro batch to a follower") - } - roOldWithNewMax := roachpb.BatchRequest{Header: roachpb.Header{ - Txn: &roachpb.Transaction{ - GlobalUncertaintyLimit: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, - }, - }} - roOldWithNewMax.Add(&roachpb.GetRequest{}) - if canSendToFollower(uuid.MakeV4(), st, roNew) { - t.Fatalf("should not be able to send a ro batch with new GlobalUncertaintyLimit to a follower") + + testCases := []struct { + name string + ba roachpb.BatchRequest + ctPolicy roachpb.RangeClosedTimestampPolicy + disabledEnterprise bool + disabledFollowerReads bool + exp bool + }{ + { + name: "non-txn batch", + ba: batch(nil, &roachpb.GetRequest{}), + exp: false, + }, + { + name: "stale read", + ba: batch(txn(stale), &roachpb.GetRequest{}), + exp: true, + }, + { + name: "stale locking read", + ba: batch(txn(stale), 
&roachpb.ScanRequest{KeyLocking: lock.Exclusive}), + exp: false, + }, + { + name: "stale write", + ba: batch(txn(stale), &roachpb.PutRequest{}), + exp: false, + }, + { + name: "stale non-txn request", + ba: batch(txn(stale), &roachpb.QueryTxnRequest{}), + exp: false, + }, + { + name: "stale read with current-time writes", + ba: batch(withWriteTimestamp(txn(stale), current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "stale read with current-time uncertainty limit", + ba: batch(withUncertaintyLimit(txn(stale), current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "current-time read", + ba: batch(txn(current), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "future read", + ba: batch(txn(future), &roachpb.GetRequest{}), + exp: false, + }, + { + name: "non-txn batch, global reads policy", + ba: batch(nil, &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale read, global reads policy", + ba: batch(txn(stale), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "stale locking read, global reads policy", + ba: batch(txn(stale), &roachpb.ScanRequest{KeyLocking: lock.Exclusive}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale write, global reads policy", + ba: batch(txn(stale), &roachpb.PutRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale non-txn request, global reads policy", + ba: batch(txn(stale), &roachpb.QueryTxnRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale read with current-time writes, global reads policy", + ba: batch(withWriteTimestamp(txn(stale), current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "stale read with current-time uncertainty limit, global reads policy", + ba: batch(withUncertaintyLimit(txn(stale), current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "current-time read, global reads policy", + ba: batch(txn(current), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: true, + }, + { + name: "current-time read with future writes, global reads policy", + ba: batch(withWriteTimestamp(txn(current), future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "current-time read with future uncertainty limit, global reads policy", + ba: batch(withUncertaintyLimit(txn(current), future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "future read, global reads policy", + ba: batch(txn(future), &roachpb.GetRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "non-enterprise", + disabledEnterprise: true, + exp: false, + }, + { + name: "follower reads disabled", + disabledFollowerReads: true, + exp: false, + }, } - disableEnterprise() - if canSendToFollower(uuid.MakeV4(), st, roOld) { - t.Fatalf("should not be able to send an old ro batch to a follower without enterprise enabled") + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + if !c.disabledEnterprise { + defer utilccl.TestingEnableEnterprise()() + } + st := cluster.MakeTestingClusterSettings() + kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads) + + can := canSendToFollower(uuid.MakeV4(), st, clock, c.ctPolicy, c.ba) + require.Equal(t, c.exp, can) + }) } } @@ -149,43 +248,193 @@ func 
TestFollowerReadMultipleValidation(t *testing.T) { followerReadMultiple.Override(&st.SV, .1) } -// TestOracle tests the OracleFactory exposed by this package. -// This test ends up being rather indirect but works by checking if the type -// of the oracle returned from the factory differs between requests we'd -// expect to support follower reads and that which we'd expect not to. -func TestOracleFactory(t *testing.T) { +// mockNodeStore implements the kvcoord.NodeDescStore interface. +type mockNodeStore []roachpb.NodeDescriptor + +func (s mockNodeStore) GetNodeDescriptor(id roachpb.NodeID) (*roachpb.NodeDescriptor, error) { + for i := range s { + desc := &s[i] + if desc.NodeID == id { + return desc, nil + } + } + return nil, errors.Errorf("unable to look up descriptor for n%d", id) +} + +// TestOracle tests the Oracle exposed by this package. +func TestOracle(t *testing.T) { defer leaktest.AfterTest(t)() - disableEnterprise := utilccl.TestingEnableEnterprise() - defer disableEnterprise() - st := cluster.MakeTestingClusterSettings() - kvserver.FollowerReadsEnabled.Override(&st.SV, true) - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + + ctx := context.Background() stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) + defer stopper.Stop(ctx) + clock := hlc.NewClock(hlc.UnixNano, base.DefaultMaxClockOffset) + stale := clock.Now().Add(2*expectedFollowerReadOffset.Nanoseconds(), 0) + current := clock.Now() + future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0) + + c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper) + staleTxn := kv.NewTxn(ctx, c, 0) + staleTxn.SetFixedTimestamp(ctx, stale) + currentTxn := kv.NewTxn(ctx, c, 0) + currentTxn.SetFixedTimestamp(ctx, current) + futureTxn := kv.NewTxn(ctx, c, 0) + futureTxn.SetFixedTimestamp(ctx, future) + + nodes := mockNodeStore{ + {NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")}, + {NodeID: 2, Address: util.MakeUnresolvedAddr("tcp", "2")}, + {NodeID: 3, Address: util.MakeUnresolvedAddr("tcp", "3")}, + } + replicas := []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + } + desc := &roachpb.RangeDescriptor{ + InternalReplicas: replicas, + } + closestFollower := replicas[1] + leaseholder := replicas[2] + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - c := kv.NewDB(log.AmbientContext{ - Tracer: tracing.NewTracer(), - }, kv.MockTxnSenderFactory{}, hlc.NewClock(hlc.UnixNano, time.Nanosecond), stopper) - txn := kv.NewTxn(context.Background(), c, 0) - of := replicaoracle.NewOracleFactory(followerReadAwareChoice, replicaoracle.Config{ - Settings: st, - RPCContext: rpcContext, - }) - noFollowerReadOracle := of.Oracle(txn) - old := hlc.Timestamp{ - WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(), + setLatency := func(addr string, latency time.Duration) { + // All test cases have to have at least 11 measurement values in order for + // the exponentially-weighted moving average to work properly. See the + // comment on the WARMUP_SAMPLES const in the ewma package for details. 
+ for i := 0; i < 11; i++ { + rpcContext.RemoteClocks.UpdateOffset(ctx, addr, rpc.RemoteOffset{}, latency) + } } - txn.SetFixedTimestamp(context.Background(), old) - followerReadOracle := of.Oracle(txn) - if reflect.TypeOf(followerReadOracle) == reflect.TypeOf(noFollowerReadOracle) { - t.Fatalf("expected types of %T and %T to differ", followerReadOracle, - noFollowerReadOracle) + setLatency("1", 100*time.Millisecond) + setLatency("2", 2*time.Millisecond) + setLatency("3", 80*time.Millisecond) + + testCases := []struct { + name string + txn *kv.Txn + lh *roachpb.ReplicaDescriptor + ctPolicy roachpb.RangeClosedTimestampPolicy + disabledEnterprise bool + disabledFollowerReads bool + exp roachpb.ReplicaDescriptor + }{ + { + name: "non-txn, known leaseholder", + txn: nil, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "non-txn, unknown leaseholder", + txn: nil, + exp: closestFollower, + }, + { + name: "stale txn, known leaseholder", + txn: staleTxn, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "stale txn, unknown leaseholder", + txn: staleTxn, + exp: closestFollower, + }, + { + name: "current txn, known leaseholder", + txn: currentTxn, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "current txn, unknown leaseholder", + txn: currentTxn, + exp: closestFollower, + }, + { + name: "future txn, known leaseholder", + txn: futureTxn, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "future txn, unknown leaseholder", + txn: futureTxn, + exp: closestFollower, + }, + { + name: "stale txn, known leaseholder, global reads policy", + txn: staleTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "stale txn, unknown leaseholder, global reads policy", + txn: staleTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "current txn, known leaseholder, global reads policy", + txn: currentTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: closestFollower, + }, + { + name: "current txn, unknown leaseholder, global reads policy", + txn: currentTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "future txn, known leaseholder, global reads policy", + txn: futureTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + lh: &leaseholder, + exp: leaseholder, + }, + { + name: "future txn, unknown leaseholder, global reads policy", + txn: futureTxn, + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: closestFollower, + }, + { + name: "stale txn, non-enterprise", + txn: staleTxn, + lh: &leaseholder, + disabledEnterprise: true, + exp: leaseholder, + }, + { + name: "stale txn, follower reads disabled", + txn: staleTxn, + lh: &leaseholder, + disabledFollowerReads: true, + exp: leaseholder, + }, } - disableEnterprise() - disabledFollowerReadOracle := of.Oracle(txn) - if reflect.TypeOf(disabledFollowerReadOracle) != reflect.TypeOf(noFollowerReadOracle) { - t.Fatalf("expected types of %T and %T not to differ", disabledFollowerReadOracle, - noFollowerReadOracle) + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + if !c.disabledEnterprise { + defer utilccl.TestingEnableEnterprise()() + } + st := cluster.MakeTestingClusterSettings() + kvserver.FollowerReadsEnabled.Override(&st.SV, !c.disabledFollowerReads) + + o := replicaoracle.NewOracle(followerReadOraclePolicy, replicaoracle.Config{ + NodeDescs: nodes, + Settings: st, + RPCContext: rpcContext, + }) + + res, err := o.ChoosePreferredReplica(ctx, c.txn, desc, c.lh, c.ctPolicy, 
replicaoracle.QueryState{}) + require.NoError(t, err) + require.Equal(t, c.exp, res) + }) } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 5cfb8643af81..369af9ee5b58 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -128,7 +128,11 @@ var ( // followerreadsccl code to inject logic to check if follower reads are enabled. // By default, without CCL code, this function returns false. var CanSendToFollower = func( - clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest, + _ uuid.UUID, + _ *cluster.Settings, + _ *hlc.Clock, + _ roachpb.RangeClosedTimestampPolicy, + _ roachpb.BatchRequest, ) bool { return false } @@ -1771,7 +1775,7 @@ func (ds *DistSender) sendToReplicas( desc := routing.Desc() ba.RangeID = desc.RangeID leaseholder := routing.Leaseholder() - canFollowerRead := (ds.clusterID != nil) && CanSendToFollower(ds.clusterID.Get(), ds.st, ba) + canFollowerRead := CanSendToFollower(ds.clusterID.Get(), ds.st, ds.clock, routing.ClosedTimestampPolicy(), ba) var replicas ReplicaSlice var err error if canFollowerRead { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index fe63f3b2c5ec..052d163b7ef4 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -3432,8 +3432,6 @@ func TestErrorIndexAlignment(t *testing.T) { // TestCanSendToFollower tests that the DistSender abides by the result it // get from CanSendToFollower. -// TODO(nvanbenschoten): update this test once ClosedTimestampPolicy begins -// dictating the decision of CanSendToFollower. func TestCanSendToFollower(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -3444,7 +3442,13 @@ func TestCanSendToFollower(t *testing.T) { old := CanSendToFollower defer func() { CanSendToFollower = old }() canSend := true - CanSendToFollower = func(_ uuid.UUID, _ *cluster.Settings, ba roachpb.BatchRequest) bool { + CanSendToFollower = func( + _ uuid.UUID, + _ *cluster.Settings, + _ *hlc.Clock, + _ roachpb.RangeClosedTimestampPolicy, + ba roachpb.BatchRequest, + ) bool { return !ba.IsLocking() && canSend } diff --git a/pkg/kv/kvclient/kvcoord/range_iter.go b/pkg/kv/kvclient/kvcoord/range_iter.go index b1269974b895..c34a28c2b354 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter.go +++ b/pkg/kv/kvclient/kvcoord/range_iter.go @@ -86,6 +86,19 @@ func (ri *RangeIterator) Leaseholder() *roachpb.ReplicaDescriptor { return ri.token.Leaseholder() } +// ClosedTimestampPolicy returns the closed timestamp policy of the range at +// which the iterator is currently positioned. The iterator must be valid. +// +// The policy information comes from a cache, and so it can be stale. Returns +// the default policy of LAG_BY_CLUSTER_SETTING if no policy information is +// known. +func (ri *RangeIterator) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy { + if !ri.Valid() { + panic(ri.Error()) + } + return ri.token.ClosedTimestampPolicy() +} + // Token returns the eviction token corresponding to the range // descriptor for the current iteration. The iterator must be valid. 
func (ri *RangeIterator) Token() rangecache.EvictionToken { diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 5a028e0617d2..7da7be9041d1 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -931,6 +931,13 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam tc.mu.txn.MinTimestamp.Backward(ts) } +// RequiredFrontier is part of the client.TxnSender interface. +func (tc *TxnCoordSender) RequiredFrontier() hlc.Timestamp { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.mu.txn.RequiredFrontier() +} + // ManualRestart is part of the client.TxnSender interface. func (tc *TxnCoordSender) ManualRestart( ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index a312b88fe99f..4bc9133ad201 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -90,7 +90,7 @@ func TestClosedTimestampCanServe(t *testing.T) { repls := replsForRange(ctx, t, tc, desc, numNodes) ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -161,7 +161,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) { reqTS := tc.Server(0).Clock().Now() // Sleep for a sufficiently long time so that reqTS can be closed. time.Sleep(3 * testingTargetDuration) - baRead := makeReadBatchRequestForDesc(desc, reqTS) + baRead := makeTxnReadBatchForDesc(desc, reqTS) repls := replsForRange(ctx, t, tc, desc, numNodes) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) @@ -190,7 +190,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { t.Fatal(err) } ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -222,7 +222,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { // Attempt to send read requests to a replica in a tight loop until deadline // is reached. If an error is seen on any replica then it is returned to the // errgroup. - baRead = makeReadBatchRequestForDesc(desc, ts) + baRead = makeTxnReadBatchForDesc(desc, ts) ensureCanReadFromReplicaUntilDeadline := func(r *kvserver.Replica) { g.Go(func() error { for timeutil.Now().Before(deadline) { @@ -285,14 +285,7 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) { respCh := make(chan struct{}, len(keys)) for i, key := range keys { go func(repl *kvserver.Replica, key roachpb.Key) { - var baRead roachpb.BatchRequest - r := &roachpb.ScanRequest{} - r.Key = key - r.EndKey = key.Next() - baRead.Add(r) - baRead.Timestamp = ts - baRead.RangeID = desc.RangeID - + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { // Expect 0 rows, because the intents will be aborted. _, err := expectRows(0)(repl.Send(ctx, baRead)) @@ -356,7 +349,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { } // Start by ensuring that the values can be read from all replicas at ts. 
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(2)) }) @@ -382,10 +375,10 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { // Now immediately query both the ranges and there's 1 value per range. // We need to tolerate RangeNotFound as the split range may not have been // created yet. - baReadL := makeReadBatchRequestForDesc(lr, ts) + baReadL := makeTxnReadBatchForDesc(lr, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadL, lRepls, respFuncs(retryOnRangeNotFound, expectRows(1)))) - baReadR := makeReadBatchRequestForDesc(rr, ts) + baReadR := makeTxnReadBatchForDesc(rr, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadR, rRepls, respFuncs(retryOnRangeNotFound, expectRows(1)))) @@ -397,7 +390,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { // The hazard here is that a follower is not yet aware of the merge and will // return an error. We'll accept that because a client wouldn't see that error // from distsender. - baReadMerged := makeReadBatchRequestForDesc(merged, ts) + baReadMerged := makeTxnReadBatchForDesc(merged, ts) require.Nil(t, verifyCanReadFromAllRepls(ctx, t, baReadMerged, mergedRepls, respFuncs(retryOnRangeKeyMismatch, expectRows(2)))) } @@ -429,12 +422,12 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { } // Grab a timestamp before initiating a lease transfer, transfer the lease, - // then ensure that reads at that timestamp can occur from all the replicas. + // then ensure that reads at that timestamp can occur from all the replicas. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} lh := getCurrentLeaseholder(t, tc, desc) target := pickRandomTarget(tc, lh, desc) require.Nil(t, tc.TransferRangeLease(desc, target)) - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -448,7 +441,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { verifyNotLeaseHolderErrors(t, baRead, repls, 2) } -func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { +func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -469,17 +462,30 @@ func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { // Verify that we can serve a follower read at a timestamp. Wait if necessary. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) - // Create a read-only batch and attach a read-write transaction. - rwTxn := roachpb.MakeTransaction("test", []byte("key"), roachpb.NormalUserPriority, ts, 0) - baRead.Txn = &rwTxn + // Update the batch to simulate a transaction that has written an intent. + baRead.Txn.Key = []byte("key") + baRead.Txn.Sequence++ - // Send the request to all three replicas. One should succeed and - // the other two should return NotLeaseHolderErrors. + // The write timestamp of the transaction is still closed, so a read-only + // request by the transaction should be servicable by followers. 
This is + // because the writing transaction can still read its writes on the + // followers. + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) + }) + + // Update the batch to simulate a transaction that has written past its read + // timestamp and past the expected closed timestamp. This should prevent its + // reads from being served by followers. + baRead.Txn.WriteTimestamp = tc.Server(0).Clock().Now().Add(time.Second.Nanoseconds(), 0) + + // Send the request to all three replicas. One should succeed and the other + // two should return NotLeaseHolderErrors. verifyNotLeaseHolderErrors(t, baRead, repls, 2) } @@ -502,9 +508,10 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { t.Fatal(err) } - // Verify that we can serve a follower read at a timestamp. Wait if necessary + // Verify that we can serve a follower read at a timestamp. Wait if + // necessary. ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - baRead := makeReadBatchRequestForDesc(desc, ts) + baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) @@ -519,11 +526,43 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { baQueryTxn.Add(r) baQueryTxn.Timestamp = ts - // Send the request to all three replicas. One should succeed and - // the other two should return NotLeaseHolderErrors. + // Send the request to all three replicas. One should succeed and the other + // two should return NotLeaseHolderErrors. verifyNotLeaseHolderErrors(t, baQueryTxn, repls, 2) } +func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Limiting how long transactions can run does not work + // well with race unless we're extremely lenient, which + // drives up the test duration. + skip.UnderRace(t) + + ctx := context.Background() + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + defer tc.Stopper().Stop(ctx) + repls := replsForRange(ctx, t, tc, desc, numNodes) + + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { + t.Fatal(err) + } + + // Verify that we can serve a follower read at a timestamp with a + // transactional batch. Wait if necessary. + ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + baRead := makeTxnReadBatchForDesc(desc, ts) + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) + }) + + // Remove the transaction and send the request to all three replicas. One + // should succeed and the other two should return NotLeaseHolderErrors. + baRead.Txn = nil + verifyNotLeaseHolderErrors(t, baRead, repls, 2) +} + // TestClosedTimestampInactiveAfterSubsumption verifies that, during a merge, // replicas of the subsumed range (RHS) cannot serve follower reads for // timestamps after the subsumption time. 
@@ -694,7 +733,7 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { case <-time.After(30 * time.Second): t.Fatal("failed to receive next closed timestamp update") } - baReadAfterLeaseTransfer := makeReadBatchRequestForDesc(rightDesc, inactiveClosedTSBoundary.Next()) + baReadAfterLeaseTransfer := makeTxnReadBatchForDesc(rightDesc, inactiveClosedTSBoundary.Next()) rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary") verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) @@ -755,7 +794,7 @@ func forceLeaseTransferOnSubsumedRange( // that the current rightLeaseholder has stopped heartbeating. This will prompt // it to acquire the range lease for itself. g.Go(func() error { - leaseAcquisitionRequest := makeReadBatchRequestForDesc(rightDesc, freezeStartTimestamp) + leaseAcquisitionRequest := makeTxnReadBatchForDesc(rightDesc, freezeStartTimestamp) log.Infof(ctx, "sending a read request from a follower of RHS (store %d) in order to trigger lease acquisition", newRightLeaseholder.StoreID()) @@ -1355,15 +1394,16 @@ func verifyCanReadFromAllRepls( return g.Wait() } -func makeReadBatchRequestForDesc( - desc roachpb.RangeDescriptor, ts hlc.Timestamp, -) roachpb.BatchRequest { +func makeTxnReadBatchForDesc(desc roachpb.RangeDescriptor, ts hlc.Timestamp) roachpb.BatchRequest { + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) + var baRead roachpb.BatchRequest baRead.Header.RangeID = desc.RangeID + baRead.Header.Timestamp = ts + baRead.Header.Txn = &txn r := &roachpb.ScanRequest{} r.Key = desc.StartKey.AsRawKey() r.EndKey = desc.EndKey.AsRawKey() baRead.Add(r) - baRead.Timestamp = ts return baRead } diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 72c4ebdee4fc..ddfea460cc21 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -36,16 +36,15 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting( // canServeFollowerReadRLocked tests, when a range lease could not be acquired, // whether the batch can be served as a follower read despite the error. Only // non-locking, read-only requests can be served as follower reads. The batch -// must be composed exclusively only this kind of request to be accepted as a -// follower read. +// must be transactional and composed exclusively of this kind of request to be +// accepted as a follower read. 
func (r *Replica) canServeFollowerReadRLocked( ctx context.Context, ba *roachpb.BatchRequest, err error, ) bool { var lErr *roachpb.NotLeaseHolderError eligible := errors.As(err, &lErr) && lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch && - (!ba.IsLocking() && ba.IsAllTransactional()) && // followerreadsccl.batchCanBeEvaluatedOnFollower - (ba.Txn == nil || !ba.Txn.IsLocking()) && // followerreadsccl.txnCanPerformFollowerRead + (ba.Txn != nil && !ba.IsLocking() && ba.IsAllTransactional()) && // followerreadsccl.batchCanBeEvaluatedOnFollower FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) if !eligible { @@ -65,14 +64,10 @@ func (r *Replica) canServeFollowerReadRLocked( return false } - ts := ba.Timestamp - if ba.Txn != nil { - ts.Forward(ba.Txn.GlobalUncertaintyLimit) - } - + requiredFrontier := ba.Txn.RequiredFrontier() maxClosed, _ := r.maxClosedRLocked(ctx) - canServeFollowerRead := ts.LessEq(maxClosed) - tsDiff := ts.GoTime().Sub(maxClosed.GoTime()) + canServeFollowerRead := requiredFrontier.LessEq(maxClosed) + tsDiff := requiredFrontier.GoTime().Sub(maxClosed.GoTime()) if !canServeFollowerRead { uncertaintyLimitStr := "n/a" if ba.Txn != nil { diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 958c31c6170e..960968934d58 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -801,9 +801,12 @@ func TestLearnerAndVoterOutgoingFollowerRead(t *testing.T) { }) check := func() { + ts := tc.Server(0).Clock().Now() + txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0) req := roachpb.BatchRequest{Header: roachpb.Header{ RangeID: scratchDesc.RangeID, - Timestamp: tc.Server(0).Clock().Now(), + Timestamp: ts, + Txn: &txn, }} req.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{ Key: scratchDesc.StartKey.AsRawKey(), EndKey: scratchDesc.EndKey.AsRawKey(), diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 1a27c205172a..642bc02d4683 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -124,6 +124,11 @@ func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Ti m.txn.MinTimestamp.Backward(ts) } +// RequiredFrontier is part of the TxnSender interface. +func (m *MockTransactionalSender) RequiredFrontier() hlc.Timestamp { + return m.txn.RequiredFrontier() +} + // ManualRestart is part of the TxnSender interface. func (m *MockTransactionalSender) ManualRestart( ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 856fbc42e3b9..ec90604c83ee 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -230,6 +230,11 @@ type TxnSender interface { // field on TxnMeta. ProvisionalCommitTimestamp() hlc.Timestamp + // RequiredFrontier returns the largest timestamp at which the + // transaction may read values when performing a read-only + // operation. 
+ RequiredFrontier() hlc.Timestamp + // IsSerializablePushAndRefreshNotPossible returns true if the // transaction is serializable, its timestamp has been pushed and // there's no chance that refreshing the read spans will succeed diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index e09f39ea6117..f77e085e8500 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -309,6 +309,14 @@ func (txn *Txn) ProvisionalCommitTimestamp() hlc.Timestamp { return txn.mu.sender.ProvisionalCommitTimestamp() } +// RequiredFrontier returns the largest timestamp at which the transaction may +// read values when performing a read-only operation. +func (txn *Txn) RequiredFrontier() hlc.Timestamp { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.RequiredFrontier() +} + // SetSystemConfigTrigger sets the system db trigger to true on this transaction. // This will impact the EndTxnRequest. Note that this method takes a boolean // argument indicating whether this transaction is intended for the system diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 35c91d29ae0d..8266cf219d68 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -909,14 +909,47 @@ func MakeTransaction( // occurred, i.e. the maximum of ReadTimestamp and LastHeartbeat. func (t Transaction) LastActive() hlc.Timestamp { ts := t.LastHeartbeat - // Only forward by the ReadTimestamp if it is a clock timestamp. - // TODO(nvanbenschoten): replace this with look at the Synthetic bool. - if readTS, ok := t.ReadTimestamp.TryToClockTimestamp(); ok { - ts.Forward(readTS.ToTimestamp()) + if !t.ReadTimestamp.Synthetic { + ts.Forward(t.ReadTimestamp) } return ts } +// RequiredFrontier returns the largest timestamp at which the transaction may +// read values when performing a read-only operation. This is the maximum of the +// transaction's read timestamp, its write timestamp, and its global uncertainty +// limit. +func (t *Transaction) RequiredFrontier() hlc.Timestamp { + // A transaction can observe committed values up to its read timestamp. + ts := t.ReadTimestamp + // Forward to the transaction's write timestamp. The transaction will read + // committed values at its read timestamp but may perform reads up to its + // intent timestamps if the transaction is reading its own intent writes, + // which we know to all be at timestamps <= its current write timestamp. See + // the ownIntent cases in pebbleMVCCScanner.getAndAdvance for more. + // + // There is a case where an intent written by a transaction is above the + // transaction's write timestamp — after a successful intent push. Such + // cases do allow a transaction to read values above its required frontier. + // However, this is fine for the purposes of follower reads because an + // intent that was pushed to a higher timestamp must have at some point been + // stored with its original write timestamp. The means that a follower with + // a closed timestamp above the original write timestamp but below the new + // pushed timestamp will either store the pre-pushed intent or the + // post-pushed intent, depending on whether replication of the push has + // completed yet. Either way, the intent will exist in some form on the + // follower, so either way, the transaction will be able to read its own + // write. + ts.Forward(t.WriteTimestamp) + // Forward to the transaction's global uncertainty limit, because the + // transaction may observe committed writes from other transactions up to + // this time and consider them to be "uncertain". 
When a transaction begins, + // this will be above its read timestamp, but the read timestamp can surpass + // the global uncertainty limit due to refreshes or retries. + ts.Forward(t.GlobalUncertaintyLimit) + return ts +} + // Clone creates a copy of the given transaction. The copy is shallow because // none of the references held by a transaction allow interior mutability. func (t Transaction) Clone() *Transaction { diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go index 8a01bde6ed5b..38eddb98d07e 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle.go @@ -31,11 +31,11 @@ type Policy byte var ( // RandomChoice chooses lease replicas randomly. - RandomChoice = RegisterPolicy(newRandomOracleFactory) + RandomChoice = RegisterPolicy(newRandomOracle) // BinPackingChoice bin-packs the choices. - BinPackingChoice = RegisterPolicy(newBinPackingOracleFactory) + BinPackingChoice = RegisterPolicy(newBinPackingOracle) // ClosestChoice chooses the node closest to the current node. - ClosestChoice = RegisterPolicy(newClosestOracleFactory) + ClosestChoice = RegisterPolicy(newClosestOracle) ) // Config is used to construct an OracleFactory. @@ -63,43 +63,45 @@ type Oracle interface { // don't care about the leaseholder (e.g. when we're planning for follower // reads). // + // When the range's closed timestamp policy is known, it is passed in. + // Otherwise, the default closed timestamp policy is provided. + // // A RangeUnavailableError can be returned if there's no information in gossip // about any of the nodes that might be tried. ChoosePreferredReplica( - ctx context.Context, rng *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor, qState QueryState, + ctx context.Context, + txn *kv.Txn, + rng *roachpb.RangeDescriptor, + leaseholder *roachpb.ReplicaDescriptor, + ctPolicy roachpb.RangeClosedTimestampPolicy, + qState QueryState, ) (roachpb.ReplicaDescriptor, error) } -// OracleFactory creates an oracle for a Txn. -type OracleFactory interface { - Oracle(*kv.Txn) Oracle -} - -// OracleFactoryFunc creates an OracleFactory from a Config. -type OracleFactoryFunc func(Config) OracleFactory +// OracleFactory creates an oracle from a Config. +type OracleFactory func(Config) Oracle -// NewOracleFactory creates an oracle with the given policy. -func NewOracleFactory(policy Policy, cfg Config) OracleFactory { - ff, ok := oracleFactoryFuncs[policy] +// NewOracle creates an oracle with the given policy. +func NewOracle(policy Policy, cfg Config) Oracle { + ff, ok := oracleFactories[policy] if !ok { panic(errors.Errorf("unknown Policy %v", policy)) } return ff(cfg) } -// RegisterPolicy creates a new policy given a function which constructs an -// OracleFactory. RegisterPolicy is intended to be called only during init and -// is not safe for concurrent use. -func RegisterPolicy(f OracleFactoryFunc) Policy { - if len(oracleFactoryFuncs) == 255 { +// RegisterPolicy creates a new policy given an OracleFactory. RegisterPolicy is +// intended to be called only during init and is not safe for concurrent use. 
+func RegisterPolicy(f OracleFactory) Policy { + if len(oracleFactories) == 255 { panic("Can only register 255 Policy instances") } - r := Policy(len(oracleFactoryFuncs)) - oracleFactoryFuncs[r] = f + r := Policy(len(oracleFactories)) + oracleFactories[r] = f return r } -var oracleFactoryFuncs = map[Policy]OracleFactoryFunc{} +var oracleFactories = map[Policy]OracleFactory{} // QueryState encapsulates the history of assignments of ranges to nodes // done by an oracle on behalf of one particular query. @@ -122,18 +124,17 @@ type randomOracle struct { nodeDescs kvcoord.NodeDescStore } -var _ OracleFactory = &randomOracle{} - -func newRandomOracleFactory(cfg Config) OracleFactory { +func newRandomOracle(cfg Config) Oracle { return &randomOracle{nodeDescs: cfg.NodeDescs} } -func (o *randomOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *randomOracle) ChoosePreferredReplica( - ctx context.Context, desc *roachpb.RangeDescriptor, _ *roachpb.ReplicaDescriptor, _ QueryState, + ctx context.Context, + _ *kv.Txn, + desc *roachpb.RangeDescriptor, + _ *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, + _ QueryState, ) (roachpb.ReplicaDescriptor, error) { replicas, err := replicaSliceOrErr(ctx, o.nodeDescs, desc, kvcoord.OnlyPotentialLeaseholders) if err != nil { @@ -150,7 +151,7 @@ type closestOracle struct { latencyFunc kvcoord.LatencyFunc } -func newClosestOracleFactory(cfg Config) OracleFactory { +func newClosestOracle(cfg Config) Oracle { return &closestOracle{ nodeDescs: cfg.NodeDescs, nodeDesc: cfg.NodeDesc, @@ -158,12 +159,13 @@ func newClosestOracleFactory(cfg Config) OracleFactory { } } -func (o *closestOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *closestOracle) ChoosePreferredReplica( - ctx context.Context, desc *roachpb.RangeDescriptor, _ *roachpb.ReplicaDescriptor, _ QueryState, + ctx context.Context, + _ *kv.Txn, + desc *roachpb.RangeDescriptor, + _ *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, + _ QueryState, ) (roachpb.ReplicaDescriptor, error) { // We know we're serving a follower read request, so consider all non-outgoing // replicas. @@ -199,7 +201,7 @@ type binPackingOracle struct { latencyFunc kvcoord.LatencyFunc } -func newBinPackingOracleFactory(cfg Config) OracleFactory { +func newBinPackingOracle(cfg Config) Oracle { return &binPackingOracle{ maxPreferredRangesPerLeaseHolder: maxPreferredRangesPerLeaseHolder, nodeDescs: cfg.NodeDescs, @@ -208,16 +210,12 @@ func newBinPackingOracleFactory(cfg Config) OracleFactory { } } -var _ OracleFactory = &binPackingOracle{} - -func (o *binPackingOracle) Oracle(_ *kv.Txn) Oracle { - return o -} - func (o *binPackingOracle) ChoosePreferredReplica( ctx context.Context, + _ *kv.Txn, desc *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor, + _ roachpb.RangeClosedTimestampPolicy, queryState QueryState, ) (roachpb.ReplicaDescriptor, error) { // If we know the leaseholder, we choose it. diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go index c714094b46ed..7298f587362b 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go @@ -30,7 +30,7 @@ import ( // TestRandomOracle defeats TestUnused for RandomChoice. 
func TestRandomOracle(t *testing.T) { - _ = NewOracleFactory(RandomChoice, Config{}) + _ = NewOracle(RandomChoice, Config{}) } func TestClosest(t *testing.T) { @@ -40,24 +40,30 @@ func TestClosest(t *testing.T) { defer stopper.Stop(ctx) g, _ := makeGossip(t, stopper) nd, _ := g.GetNodeDescriptor(1) - of := NewOracleFactory(ClosestChoice, Config{ + o := NewOracle(ClosestChoice, Config{ NodeDescs: g, NodeDesc: *nd, }) - of.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { + o.(*closestOracle).latencyFunc = func(s string) (time.Duration, bool) { if strings.HasSuffix(s, "2") { return time.Nanosecond, true } return time.Millisecond, true } - o := of.Oracle(nil) - info, err := o.ChoosePreferredReplica(ctx, &roachpb.RangeDescriptor{ - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1}, - {NodeID: 2, StoreID: 2}, - {NodeID: 3, StoreID: 3}, + info, err := o.ChoosePreferredReplica( + ctx, + nil, /* txn */ + &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 2, StoreID: 2}, + {NodeID: 3, StoreID: 3}, + }, }, - }, nil /* leaseHolder */, QueryState{}) + nil, /* leaseHolder */ + roachpb.LAG_BY_CLUSTER_SETTING, + QueryState{}, + ) if err != nil { t.Fatalf("Failed to choose closest replica: %v", err) } diff --git a/pkg/sql/physicalplan/span_resolver.go b/pkg/sql/physicalplan/span_resolver.go index 9fac64c525bf..ce47cda2f6e1 100644 --- a/pkg/sql/physicalplan/span_resolver.go +++ b/pkg/sql/physicalplan/span_resolver.go @@ -115,10 +115,10 @@ type SpanResolverIterator interface { // spanResolver implements SpanResolver. type spanResolver struct { - st *cluster.Settings - distSender *kvcoord.DistSender - nodeDesc roachpb.NodeDescriptor - oracleFactory replicaoracle.OracleFactory + st *cluster.Settings + distSender *kvcoord.DistSender + nodeDesc roachpb.NodeDescriptor + oracle replicaoracle.Oracle } var _ SpanResolver = &spanResolver{} @@ -135,7 +135,7 @@ func NewSpanResolver( return &spanResolver{ st: st, nodeDesc: nodeDesc, - oracleFactory: replicaoracle.NewOracleFactory(policy, replicaoracle.Config{ + oracle: replicaoracle.NewOracle(policy, replicaoracle.Config{ NodeDescs: nodeDescs, NodeDesc: nodeDesc, Settings: st, @@ -147,6 +147,8 @@ func NewSpanResolver( // spanResolverIterator implements the SpanResolverIterator interface. type spanResolverIterator struct { + // txn is the transaction using the iterator. + txn *kv.Txn // it is a wrapped RangeIterator. it *kvcoord.RangeIterator // oracle is used to choose a lease holders for ranges when one isn't present @@ -167,8 +169,9 @@ var _ SpanResolverIterator = &spanResolverIterator{} // NewSpanResolverIterator creates a new SpanResolverIterator. 
func (sr *spanResolver) NewSpanResolverIterator(txn *kv.Txn) SpanResolverIterator { return &spanResolverIterator{ + txn: txn, it: kvcoord.NewRangeIterator(sr.distSender), - oracle: sr.oracleFactory.Oracle(txn), + oracle: sr.oracle, queryState: replicaoracle.MakeQueryState(), } } @@ -260,7 +263,7 @@ func (it *spanResolverIterator) ReplicaInfo( } repl, err := it.oracle.ChoosePreferredReplica( - ctx, it.it.Desc(), it.it.Leaseholder(), it.queryState) + ctx, it.txn, it.it.Desc(), it.it.Leaseholder(), it.it.ClosedTimestampPolicy(), it.queryState) if err != nil { return roachpb.ReplicaDescriptor{}, err } From 2c972f8c2a977920fe4d13920e30a089646aa7ad Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 17 Feb 2021 18:53:38 -0500 Subject: [PATCH 2/2] kv: use hlc.Clock in ClosedTimestamp tests Stop using timeutil.Now to create HLC timestamps. --- pkg/kv/kvserver/closed_timestamp_test.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 4bc9133ad201..0bbc8befec02 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -89,7 +89,7 @@ func TestClosedTimestampCanServe(t *testing.T) { } repls := replsForRange(ctx, t, tc, desc, numNodes) - ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + ts := tc.Server(0).Clock().Now() baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) @@ -189,7 +189,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) } - ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + ts := tc.Server(0).Clock().Now() baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) @@ -348,7 +348,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { t.Fatal(err) } // Start by ensuring that the values can be read from all replicas at ts. - ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + ts := tc.Server(0).Clock().Now() baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(2)) @@ -423,7 +423,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { // Grab a timestamp before initiating a lease transfer, transfer the lease, // then ensure that reads at that timestamp can occur from all the replicas. - ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + ts := tc.Server(0).Clock().Now() lh := getCurrentLeaseholder(t, tc, desc) target := pickRandomTarget(tc, lh, desc) require.Nil(t, tc.TransferRangeLease(desc, target)) @@ -431,11 +431,11 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) - // Make a non-writing transaction that has a GlobalUncertaintyLimit after - // the lease transfer but a timestamp before. - roTxn := roachpb.MakeTransaction("test", nil, roachpb.NormalUserPriority, ts, - timeutil.Now().UnixNano()-ts.WallTime) - baRead.Header.Txn = &roTxn + + // Update the batch to simulate a transaction that has a global uncertainty + // limit after the lease transfer. 
Keep its read timestamp from before the + // lease transfer. + baRead.Txn.GlobalUncertaintyLimit = tc.Server(0).Clock().Now().Add(time.Second.Nanoseconds(), 0) // Send the request to all three replicas. One should succeed and // the other two should return NotLeaseHolderErrors. verifyNotLeaseHolderErrors(t, baRead, repls, 2) @@ -461,7 +461,7 @@ func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) { } // Verify that we can serve a follower read at a timestamp. Wait if necessary. - ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + ts := tc.Server(0).Clock().Now() baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) @@ -510,7 +510,7 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { // Verify that we can serve a follower read at a timestamp. Wait if // necessary. - ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + ts := tc.Server(0).Clock().Now() baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) @@ -551,7 +551,7 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { // Verify that we can serve a follower read at a timestamp with a // transactional batch. Wait if necessary. - ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + ts := tc.Server(0).Clock().Now() baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
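Illustration (not part of the patch): a minimal sketch of how the reworked replicaoracle API in this diff gets called once the change lands. It is written only against identifiers visible above (NewOracle, Config, ChoosePreferredReplica, MakeQueryState, roachpb.LEAD_FOR_GLOBAL_READS); the wrapping function, its name, and its parameters are hypothetical scaffolding rather than code from this PR. The call shape is the point: the oracle is built once from a Config, and the per-request inputs, the txn and the range's closed timestamp policy, are passed straight to ChoosePreferredReplica instead of going through a per-txn factory.

package replicaoracleexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
)

// chooseReplicaForGlobalRead sketches the post-change call path: build the
// oracle from a Config, then hand the txn and the range's closed timestamp
// policy directly to ChoosePreferredReplica.
func chooseReplicaForGlobalRead(
	ctx context.Context,
	txn *kv.Txn,
	g kvcoord.NodeDescStore,
	nd roachpb.NodeDescriptor,
	st *cluster.Settings,
	desc *roachpb.RangeDescriptor,
) (roachpb.ReplicaDescriptor, error) {
	o := replicaoracle.NewOracle(replicaoracle.ClosestChoice, replicaoracle.Config{
		NodeDescs: g,
		NodeDesc:  nd,
		Settings:  st,
	})
	// No leaseholder hint is supplied here; for a LEAD_FOR_GLOBAL_READS range
	// the oracle may prefer a nearby replica over the leaseholder when the
	// transaction qualifies for a follower read.
	return o.ChoosePreferredReplica(
		ctx,
		txn,
		desc,
		nil, /* leaseholder */
		roachpb.LEAD_FOR_GLOBAL_READS,
		replicaoracle.MakeQueryState(),
	)
}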