roachpb: rename Txn.MaxTimestamp to Txn.GlobalUncertaintyLimit
This commit renames Txn.MaxTimestamp to Txn.GlobalUncertaintyLimit, to
parallel the existing localUncertaintyLimit. The commit updates various
commentary to reflect this change.
nvanbenschoten committed Feb 10, 2021
1 parent b039421 commit 06618a2
Showing 61 changed files with 809 additions and 803 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
@@ -108,7 +108,7 @@ func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timest
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return batchCanBeEvaluatedOnFollower(ba) &&
txnCanPerformFollowerRead(ba.Txn) &&
canUseFollowerRead(clusterID, st, forward(ba.Txn.ReadTimestamp, ba.Txn.MaxTimestamp))
canUseFollowerRead(clusterID, st, forward(ba.Txn.ReadTimestamp, ba.Txn.GlobalUncertaintyLimit))
}

func forward(ts hlc.Timestamp, to hlc.Timestamp) hlc.Timestamp {
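
The body of the forward helper is elided above. As a rough sketch of the check, using plain int64 nanos in place of hlc.Timestamp (the helper body below is an assumption about what forward does, not the actual implementation): follower-read eligibility is evaluated at the later of the transaction's read timestamp and its global uncertainty limit, because the read may observe values up to that limit.

    // Simplified sketch; int64 nanos stand in for hlc.Timestamp.
    package main

    import "fmt"

    // forward returns the later of two timestamps, mirroring what the elided
    // forward() helper above presumably does via hlc.Timestamp.Forward.
    func forward(ts, to int64) int64 {
        if to > ts {
            return to
        }
        return ts
    }

    func main() {
        readTS := int64(100)                 // transaction read timestamp
        globalUncertaintyLimit := int64(150) // read timestamp + max clock offset
        // A follower can serve the read only if its closed timestamp has advanced
        // past everything the read might observe, i.e. past the uncertainty limit.
        fmt.Println("evaluate follower-read eligibility at:", forward(readTS, globalUncertaintyLimit)) // 150
    }
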
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -125,12 +125,12 @@ func TestCanSendToFollower(t *testing.T) {
}
roOldWithNewMax := roachpb.BatchRequest{Header: roachpb.Header{
Txn: &roachpb.Transaction{
MaxTimestamp: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
GlobalUncertaintyLimit: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
},
}}
roOldWithNewMax.Add(&roachpb.GetRequest{})
if canSendToFollower(uuid.MakeV4(), st, roNew) {
t.Fatalf("should not be able to send a ro batch with new MaxTimestamp to a follower")
t.Fatalf("should not be able to send a ro batch with new GlobalUncertaintyLimit to a follower")
}
disableEnterprise()
if canSendToFollower(uuid.MakeV4(), st, roOld) {
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -923,7 +923,7 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam
defer tc.mu.Unlock()
tc.mu.txn.ReadTimestamp = ts
tc.mu.txn.WriteTimestamp = ts
tc.mu.txn.MaxTimestamp = ts
tc.mu.txn.GlobalUncertaintyLimit = ts
tc.mu.txn.CommitTimestampFixed = true

// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/declare.go
@@ -48,8 +48,8 @@ func DefaultDeclareIsolatedKeys(
access = spanset.SpanReadOnly
if header.Txn != nil {
// For transactional reads, acquire read latches all the way up to
// the transaction's MaxTimestamp, because reads may observe writes
// all the way up to this timestamp.
// the transaction's uncertainty limit, because reads may observe
// writes all the way up to this timestamp.
//
// It is critical that reads declare latches up through their
// uncertainty interval so that they are properly synchronized with
@@ -62,7 +62,7 @@
// writer (see AckCommittedEntriesBeforeApplication). Latching is
// the only mechanism that ensures that any observers of the write
// wait for the write apply before reading.
timestamp.Forward(header.Txn.MaxTimestamp)
timestamp.Forward(header.Txn.GlobalUncertaintyLimit)
}
}
latchSpans.AddMVCC(access, req.Header().Span(), timestamp)
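
A worked sketch of the latch widening in this hunk, with int64 nanos standing in for hlc.Timestamp (an illustration of the idea, not the real declare logic):

    // A read at 10 with uncertainty limit 15 declares its latch at 15, so it
    // conflicts with (and waits for) a concurrent writer at 12 that it might
    // otherwise miss while the write is still applying.
    package main

    import "fmt"

    func main() {
        readTS := int64(10)
        globalUncertaintyLimit := int64(15) // header.Txn.GlobalUncertaintyLimit
        writerTS := int64(12)               // concurrent writer inside the uncertainty window

        latchTS := readTS
        if globalUncertaintyLimit > latchTS { // timestamp.Forward(...)
            latchTS = globalUncertaintyLimit
        }
        fmt.Println("read declares latches up to:", latchTS,
            "conflicts with writer at", writerTS, ":", latchTS >= writerTS)
    }
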
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -408,7 +408,7 @@ func getTableID(db *gosql.DB, dbName, tableName string) (tableID descpb.ID, err
return
}

func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) {
func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Limiting how long transactions can run does not work
@@ -438,8 +438,8 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})
// Make a non-writing transaction that has a MaxTimestamp after the lease
// transfer but a timestamp before.
// Make a non-writing transaction that has a GlobalUncertaintyLimit after
// the lease transfer but a timestamp before.
roTxn := roachpb.MakeTransaction("test", nil, roachpb.NormalUserPriority, ts,
timeutil.Now().UnixNano()-ts.WallTime)
baRead.Header.Txn = &roTxn
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -52,7 +52,7 @@ import (
//
// The input files use the following DSL:
//
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [maxts=<int>[,<int>]]
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [uncertainty-limit=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>]
// <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
// sequence req=<req-name>
Expand Down Expand Up @@ -97,9 +97,9 @@ func TestConcurrencyManagerBasic(t *testing.T) {
var epoch int
d.ScanArgs(t, "epoch", &epoch)

maxTS := ts
if d.HasArg("maxts") {
maxTS = scanTimestampWithName(t, d, "maxts")
uncertaintyLimit := ts
if d.HasArg("uncertainty-limit") {
uncertaintyLimit = scanTimestampWithName(t, d, "uncertainty-limit")
}

txn, ok := c.txnsByName[txnName]
@@ -117,8 +117,8 @@
MinTimestamp: ts,
Priority: 1, // not min or max
},
ReadTimestamp: ts,
MaxTimestamp: maxTS,
ReadTimestamp: ts,
GlobalUncertaintyLimit: uncertaintyLimit,
}
c.registerTxn(txnName, txn)
return ""
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -600,9 +600,9 @@ func (w *lockTableWaiterImpl) pushHeader(req Request) roachpb.Header {
// need to push again, but expect to eventually succeed in reading,
// either after lease movement subsides or after the reader's read
// timestamp surpasses its global uncertainty limit.
localMaxTS := req.Txn.MaxTimestamp
localMaxTS.Backward(w.clock.Now())
h.Timestamp.Forward(localMaxTS)
localUncertaintyLimit := req.Txn.GlobalUncertaintyLimit
localUncertaintyLimit.Backward(w.clock.Now())
h.Timestamp.Forward(localUncertaintyLimit)
}
return h
}
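
A worked sketch of the push-timestamp computation in this hunk, again with int64 nanos in place of hlc.Timestamp; the numbers mirror the testdata case further down (read 120, uncertainty limit 150, clock 135):

    // The pusher pushes only to present time, not all the way to its
    // uncertainty limit. Illustration only.
    package main

    import "fmt"

    func main() {
        readTS := int64(120)                 // pusher's read timestamp
        globalUncertaintyLimit := int64(150) // pusher's uncertainty limit
        now := int64(135)                    // w.clock.Now()

        // localUncertaintyLimit := req.Txn.GlobalUncertaintyLimit
        // localUncertaintyLimit.Backward(w.clock.Now())
        localUncertaintyLimit := globalUncertaintyLimit
        if now < localUncertaintyLimit {
            localUncertaintyLimit = now
        }

        // h.Timestamp.Forward(localUncertaintyLimit)
        pushTS := readTS
        if localUncertaintyLimit > pushTS {
            pushTS = localUncertaintyLimit
        }
        fmt.Println("push to:", pushTS) // 135: present time, not the full limit of 150
    }
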
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
@@ -121,10 +121,10 @@ func TestLockTableWaiterWithTxn(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()

maxTS := hlc.Timestamp{WallTime: 15}
uncertaintyLimit := hlc.Timestamp{WallTime: 15}
makeReq := func() Request {
txn := makeTxnProto("request")
txn.MaxTimestamp = maxTS
txn.GlobalUncertaintyLimit = uncertaintyLimit
return Request{
Txn: &txn,
Timestamp: txn.ReadTimestamp,
@@ -133,15 +133,15 @@

t.Run("state", func(t *testing.T) {
t.Run("waitFor", func(t *testing.T) {
testWaitPush(t, waitFor, makeReq, maxTS)
testWaitPush(t, waitFor, makeReq, uncertaintyLimit)
})

t.Run("waitForDistinguished", func(t *testing.T) {
testWaitPush(t, waitForDistinguished, makeReq, maxTS)
testWaitPush(t, waitForDistinguished, makeReq, uncertaintyLimit)
})

t.Run("waitElsewhere", func(t *testing.T) {
testWaitPush(t, waitElsewhere, makeReq, maxTS)
testWaitPush(t, waitElsewhere, makeReq, uncertaintyLimit)
})

t.Run("waitSelf", func(t *testing.T) {
@@ -381,10 +381,10 @@ func TestLockTableWaiterWithErrorWaitPolicy(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()

maxTS := hlc.Timestamp{WallTime: 15}
uncertaintyLimit := hlc.Timestamp{WallTime: 15}
makeReq := func() Request {
txn := makeTxnProto("request")
txn.MaxTimestamp = maxTS
txn.GlobalUncertaintyLimit = uncertaintyLimit
return Request{
Txn: &txn,
Timestamp: txn.ReadTimestamp,
@@ -394,15 +394,15 @@

t.Run("state", func(t *testing.T) {
t.Run("waitFor", func(t *testing.T) {
testErrorWaitPush(t, waitFor, makeReq, maxTS)
testErrorWaitPush(t, waitFor, makeReq, uncertaintyLimit)
})

t.Run("waitForDistinguished", func(t *testing.T) {
testErrorWaitPush(t, waitForDistinguished, makeReq, maxTS)
testErrorWaitPush(t, waitForDistinguished, makeReq, uncertaintyLimit)
})

t.Run("waitElsewhere", func(t *testing.T) {
testErrorWaitPush(t, waitElsewhere, makeReq, maxTS)
testErrorWaitPush(t, waitElsewhere, makeReq, uncertaintyLimit)
})

t.Run("waitSelf", func(t *testing.T) {
@@ -8,7 +8,7 @@
new-txn name=txn1 ts=10,1 epoch=0
----

new-txn name=txn2 ts=12,1 epoch=0 maxts=15,1
new-txn name=txn2 ts=12,1 epoch=0 uncertainty-limit=15,1
----

new-request name=req1 txn=txn2 ts=12,1
@@ -63,7 +63,7 @@ reset namespace
# Same situation as above, only here, the read-only transaction
# has an uncertainty interval that extends past present time.
# The transaction only pushes to present time, not all the way
# to its max timestamp. See lockTableWaiterImpl.pushHeader.
# to its uncertainty limit. See lockTableWaiterImpl.pushHeader.
# -------------------------------------------------------------

debug-set-clock ts=135
@@ -72,7 +72,7 @@
new-txn name=txn1 ts=100,1 epoch=0
----

new-txn name=txn2 ts=120,1 epoch=0 maxts=150,1
new-txn name=txn2 ts=120,1 epoch=0 uncertainty-limit=150,1
----

new-request name=req1 txn=txn2 ts=120,1
@@ -145,7 +145,7 @@ new-txn name=txn1 ts=14,1 epoch=0
new-txn name=txn2 ts=15,1 epoch=0
----

new-txn name=txn3 ts=12,1 epoch=0 maxts=15,1
new-txn name=txn3 ts=12,1 epoch=0 uncertainty-limit=15,1
----

new-request name=req1 txn=txn2 ts=15,1
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/gc/data_distribution_test.go
@@ -136,9 +136,9 @@ func newDataDistribution(
var txn *roachpb.Transaction
if len(timestamps) == 0 && haveIntent {
txn = &roachpb.Transaction{
Status: roachpb.PENDING,
ReadTimestamp: ts,
MaxTimestamp: ts.Next().Next(),
Status: roachpb.PENDING,
ReadTimestamp: ts,
GlobalUncertaintyLimit: ts.Next().Next(),
}
txn.ID = uuid.MakeV4()
txn.WriteTimestamp = ts
47 changes: 22 additions & 25 deletions pkg/kv/kvserver/observedts/doc.go
@@ -18,15 +18,15 @@ import "github.com/cockroachdb/cockroach/pkg/roachpb"

// D1 ————————————————————————————————————————————————
//
// Transaction.MaxTimestamp
// Transaction.GlobalUncertaintyLimit
//
// A transaction's "max timestamp" is the upper bound of its uncertainty
// interval. The value is set to the transaction's initial timestamp + the
// cluster's maximum clock skew. Assuming maximum clock skew bounds are
// A transaction's global uncertainty limit is the inclusive upper bound of its
// uncertainty interval. The value is set to the transaction's initial timestamp
// + the cluster's maximum clock skew. Assuming maximum clock skew bounds are
// respected, this maximum timestamp places an upper bound on the commit
// timestamp of any other transaction that committed causally before the new
// transaction.
var D1 = roachpb.Transaction{}.MaxTimestamp
var D1 = roachpb.Transaction{}.GlobalUncertaintyLimit
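
A sketch of the D1 definition in plain nanosecond arithmetic; the 500ms offset is only an example value, not a claim about any particular cluster's configuration:

    // global uncertainty limit = initial timestamp + maximum clock offset.
    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        initialTS := time.Now().UnixNano()                   // transaction's initial timestamp
        maxOffset := (500 * time.Millisecond).Nanoseconds()  // cluster's maximum clock skew (example)
        globalUncertaintyLimit := initialTS + maxOffset

        // Assuming clock skew bounds hold, any transaction that committed causally
        // before this one has a commit timestamp <= globalUncertaintyLimit.
        fmt.Println("uncertainty interval:", initialTS, "to", globalUncertaintyLimit)
    }
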

// D2 ————————————————————————————————————————————————
//
@@ -39,7 +39,7 @@
// This error forces the transaction to increase its read timestamp, either
// through a refresh or a retry, in order to ensure that the transaction
// observes the "uncertain" value. In doing so, the reading transaction is
// guaranteed to observe any value written by an other transaction with a
// guaranteed to observe any value written by any other transaction with a
// happened-before relation to it, which is paramount to ensure single-key
// linearizability and avoid stale reads.
var D2 = roachpb.ReadWithinUncertaintyIntervalError{}
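
A sketch of the D2 condition: a committed value is uncertain to a reader when its timestamp falls inside (ReadTimestamp, uncertainty limit]. Illustration only, not the MVCC scan code itself.

    package main

    import "fmt"

    func uncertain(valueTS, readTS, uncertaintyLimit int64) bool {
        return valueTS > readTS && valueTS <= uncertaintyLimit
    }

    func main() {
        readTS, limit := int64(10), int64(20)
        fmt.Println(uncertain(15, readTS, limit)) // true: forces an uncertainty restart/refresh
        fmt.Println(uncertain(25, readTS, limit)) // false: above the limit, safely ignored by this read
        fmt.Println(uncertain(5, readTS, limit))  // false: at or below the read timestamp, simply observed
    }
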
@@ -71,18 +71,20 @@ var D4 = (&roachpb.Transaction{}).UpdateObservedTimestamp
// Transaction.ObservedTimestamps
//
// The observed timestamps are collected in a list on the transaction proto. The
// purpose of this list is to avoid uncertainty related restarts which normally
// occur when reading a value in the near future as per the max_timestamp field.
// purpose of this list is to avoid uncertainty related restarts which occur
// when reading a value in the near future, per the global_uncertainty_limit
// field. The list helps avoid these restarts by establishing a lower
// local_uncertainty_limit when evaluating a request on a node in the list.
//
// Meaning
//
// Morally speaking, having an entry for a node in this list means that this
// node has been visited before, and that no more uncertainty restarts are
// expected for operations served from it. However, this is not entirely
// accurate. For example, say a txn starts with read_timestamp=1 (and some large
// max_timestamp). It then reads key "a" from node A, registering an entry `A ->
// 5` in the process (`5` happens to be a timestamp taken off that node's clock
// at the start of the read).
// global_uncertainty_limit). It then reads key "a" from node A, registering an
// entry `A -> 5` in the process (`5` happens to be a timestamp taken off that
// node's clock at the start of the read).
//
// Now assume that some other transaction writes and commits a value at key "b"
// and timestamp 4 (again, served by node A), and our transaction attempts to
@@ -134,15 +136,16 @@ var D4 = (&roachpb.Transaction{}).UpdateObservedTimestamp
// node who owns the lease that the current request is executing under, we can
// run the request with the list's timestamp as the upper bound for its
// uncertainty interval, limiting (and often avoiding) uncertainty restarts. We
// do this by lowering the request's max_timestamp down to the timestamp in the
// observed timestamp entry, which is done in Replica.limitTxnMaxTimestamp.
// do this by establishing a separate local_uncertainty_limit, which is set to
// the minimum of the global_uncertainty_limit and the node's observed timestamp
// entry in ComputeLocalUncertaintyLimit.
//
// However, as stated, the correctness property only holds for values at higher
// timestamps than the observed timestamp written *by leaseholders on this
// node*. This is critical, as the property tells us nothing about values
// written by leaseholders on different nodes, even if a lease for one of those
// Ranges has since moved to a node that we have an observed timestamp entry
// for. To accommodate this limitation, Replica.limitTxnMaxTimestamp first
// for. To accommodate this limitation, ComputeLocalUncertaintyLimit first
// forwards the timestamp in the observed timestamp entry by the start timestamp
// of the lease that the request is executing under before using it to limit the
// request's uncertainty interval.
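
A sketch of the computation described above (the real logic is ComputeLocalUncertaintyLimit in limit.go below; this only illustrates the arithmetic): forward the observed timestamp by the lease start, then use it to ratchet the global limit down.

    package main

    import "fmt"

    func localUncertaintyLimit(globalLimit, observedTS, leaseStart int64) int64 {
        // Forward the observed timestamp by the lease start: the observation says
        // nothing about writes by leaseholders that previously lived on other nodes.
        if leaseStart > observedTS {
            observedTS = leaseStart
        }
        // The observed timestamp can only lower the limit, never raise it.
        if observedTS < globalLimit {
            return observedTS
        }
        return globalLimit
    }

    func main() {
        fmt.Println(localUncertaintyLimit(20, 15, 5))  // 15: observation lowers the limit
        fmt.Println(localUncertaintyLimit(20, 15, 18)) // 18: capped by the lease start instead
        fmt.Println(localUncertaintyLimit(20, 25, 5))  // 20: never exceeds the global limit
    }
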
@@ -175,17 +178,11 @@ var D5 = roachpb.Transaction{}.ObservedTimestamps
// ComputeLocalUncertaintyLimit
//
// Observed timestamps allow transactions to avoid uncertainty related restarts
// because they allow transactions to bound their "effective max timestamp" when
// reading on a node which they have previously collected an observed timestamp
// from. Similarly, observed timestamps can also assist a transaction even on
// its first visit to a node in cases where it gets stuck waiting on locks for
// long periods of time.
//
// TODO(nvanbenschoten): update this documentation to discuss the local
// uncertainty limit in place of the "effective max timestamp" once we start
// returning this value instead of updating the max timestamp field. Also,
// update the documentation to reflect to rename from "max timestamp" to
// "global uncertainty limit".
// because they allow transactions to bound their uncertainty limit when reading
// on a node which they have previously collected an observed timestamp from.
// Similarly, observed timestamps can also assist a transaction even on its
// first visit to a node in cases where it gets stuck waiting on locks for long
// periods of time.
var D6 = ComputeLocalUncertaintyLimit

// Ignore unused warnings.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/observedts/limit.go
@@ -44,7 +44,7 @@ func ComputeLocalUncertaintyLimit(
return hlc.Timestamp{}
}

localUncertaintyLimit := txn.MaxTimestamp
localUncertaintyLimit := txn.GlobalUncertaintyLimit
if status.State != kvserverpb.LeaseState_VALID {
return localUncertaintyLimit
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/observedts/limit_test.go
@@ -24,8 +24,8 @@ func TestComputeLocalUncertaintyLimit(t *testing.T) {
defer leaktest.AfterTest(t)()

txn := &roachpb.Transaction{
ReadTimestamp: hlc.Timestamp{WallTime: 10},
MaxTimestamp: hlc.Timestamp{WallTime: 20},
ReadTimestamp: hlc.Timestamp{WallTime: 10},
GlobalUncertaintyLimit: hlc.Timestamp{WallTime: 20},
}
txn.UpdateObservedTimestamp(1, hlc.ClockTimestamp{WallTime: 15})

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
@@ -1332,7 +1332,7 @@ func (r *Replica) shouldWaitForPendingMergeRLocked(
freezeStart := r.getFreezeStartRLocked()
ts := ba.Timestamp
if ba.Txn != nil {
ts.Forward(ba.Txn.MaxTimestamp)
ts.Forward(ba.Txn.GlobalUncertaintyLimit)
}
if ts.Less(freezeStart) {
// When the max timestamp of a read request is less than the subsumption
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_follower_read.go
@@ -67,23 +67,23 @@ func (r *Replica) canServeFollowerReadRLocked(

ts := ba.Timestamp
if ba.Txn != nil {
ts.Forward(ba.Txn.MaxTimestamp)
ts.Forward(ba.Txn.GlobalUncertaintyLimit)
}

maxClosed, _ := r.maxClosedRLocked(ctx)
canServeFollowerRead := ts.LessEq(maxClosed)
tsDiff := ts.GoTime().Sub(maxClosed.GoTime())
if !canServeFollowerRead {
maxTsStr := "n/a"
uncertaintyLimitStr := "n/a"
if ba.Txn != nil {
maxTsStr = ba.Txn.MaxTimestamp.String()
uncertaintyLimitStr = ba.Txn.GlobalUncertaintyLimit.String()
}

// We can't actually serve the read based on the closed timestamp.
// Signal the clients that we want an update so that future requests can succeed.
r.store.cfg.ClosedTimestamp.Clients.Request(lErr.LeaseHolder.NodeID, r.RangeID)
log.Eventf(ctx, "can't serve follower read; closed timestamp too low by: %s; maxClosed: %s ts: %s maxTS: %s",
tsDiff, maxClosed, ba.Timestamp, maxTsStr)
log.Eventf(ctx, "can't serve follower read; closed timestamp too low by: %s; maxClosed: %s ts: %s uncertaintyLimit: %s",
tsDiff, maxClosed, ba.Timestamp, uncertaintyLimitStr)

if false {
// NB: this can't go behind V(x) because the log message created by the
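
A simplified sketch of the eligibility check in this hunk (int64 nanos in place of hlc.Timestamp; maxClosed would come from the closed timestamp subsystem):

    package main

    import "fmt"

    func canServeFollowerRead(batchTS, globalUncertaintyLimit, maxClosed int64) bool {
        ts := batchTS
        if globalUncertaintyLimit > ts { // ts.Forward(ba.Txn.GlobalUncertaintyLimit)
            ts = globalUncertaintyLimit
        }
        return ts <= maxClosed // ts.LessEq(maxClosed)
    }

    func main() {
        fmt.Println(canServeFollowerRead(100, 150, 160)) // true: closed timestamp has passed the uncertainty limit
        fmt.Println(canServeFollowerRead(100, 150, 120)) // false: an uncertain value could still land in (120, 150]
    }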