kv: make Lease.Start and LeaseStatus.Now ClockTimestamps
This commit strengthens the Go type of the `Lease.Start` and `LeaseStatus.Now`
fields. Instead of the general `Timestamp` type, they are now `ClockTimestamp`
types, which more effectively ensures that they come from a real clock. This
prevents mistakes and makes their role in the leasing protocol clearer. For
instance, it makes clear that it is not possible to create a lease that starts
in the future.

This results in a large amount of plumbing, which is all fine.
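
To make the type distinction concrete, here is a minimal Go sketch (not part of this commit) of the `hlc` API that the diffs below rely on. `NowAsClockTimestamp` and `ToTimestamp` appear verbatim in the changes; the `NewClock` constructor signature is an assumption about the package at this point in history.

```go
package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	// A ClockTimestamp can only be obtained as a reading of a real clock.
	// (Assumption: NewClock takes a physical clock source and a max offset.)
	clock := hlc.NewClock(hlc.UnixNano, 500*time.Millisecond)
	start := clock.NowAsClockTimestamp() // hlc.ClockTimestamp

	// Widening to a general Timestamp is always safe, but must be explicit;
	// this is the source of the many ToTimestamp() calls in the diffs below.
	ts := start.ToTimestamp() // hlc.Timestamp

	// There is no implicit conversion in the other direction, so an arbitrary
	// Timestamp (e.g. one taken from a request) cannot silently become a
	// lease start time. This is how the commit rules out future-dated leases.
	fmt.Println(start, ts)
}
```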
nvanbenschoten committed Jan 28, 2021
1 parent 71de4f7 commit fa32402
Showing 63 changed files with 500 additions and 484 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease.go
@@ -102,7 +102,7 @@ func evalNewLease(
// a newFailedLeaseTrigger() to satisfy stats.

// Ensure either an Epoch is set or Start < Expiration.
- if (lease.Type() == roachpb.LeaseExpiration && lease.GetExpiration().LessEq(lease.Start)) ||
+ if (lease.Type() == roachpb.LeaseExpiration && lease.GetExpiration().LessEq(lease.Start.ToTimestamp())) ||
(lease.Type() == roachpb.LeaseEpoch && lease.Expiration != nil) {
// This amounts to a bug.
return newFailedLeaseTrigger(isTransfer),
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -106,11 +106,11 @@ func RequestLease(
// The bug prevented with this is unlikely to occur in practice
// since earlier commands usually apply before this lease will.
if ts := args.MinProposedTS; isExtension && ts != nil {
- effectiveStart.Forward(ts.ToTimestamp())
+ effectiveStart.Forward(*ts)
}

} else if prevLease.Type() == roachpb.LeaseExpiration {
- effectiveStart.Backward(prevLease.Expiration.Next())
+ effectiveStart.BackwardWithTimestamp(prevLease.Expiration.Next())
}

if isExtension {
@@ -125,7 +125,7 @@ func RequestLease(
newLease.Expiration = &t
newLease.Expiration.Forward(prevLease.GetExpiration())
}
- } else if prevLease.Type() == roachpb.LeaseExpiration && effectiveStart.Less(prevLease.GetExpiration()) {
+ } else if prevLease.Type() == roachpb.LeaseExpiration && effectiveStart.ToTimestamp().Less(prevLease.GetExpiration()) {
rErr.Message = "requested lease overlaps previous lease"
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_closed_timestamp_test.go
@@ -96,7 +96,7 @@ func TestClosedTimestampWorksWhenRequestsAreSentToNonLeaseHolders(t *testing.T)
target := tc.Target(serverIdx)
transferLease(repl.Desc(), target)
testutils.SucceedsSoon(t, func() error {
- if !repl.OwnsValidLease(ctx, db1.Clock().Now()) {
+ if !repl.OwnsValidLease(ctx, db1.Clock().NowAsClockTimestamp()) {
return errors.Errorf("don't yet have the lease")
}
return nil
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/client_merge_test.go
@@ -423,7 +423,7 @@ func mergeCheckingTimestampCaches(t *testing.T, disjointLeaseholders bool) {
if err != nil {
return err
}
- if !rhsRepl.OwnsValidLease(ctx, tc.Servers[1].Clock().Now()) {
+ if !rhsRepl.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
return errors.New("rhs store does not own valid lease for rhs range")
}
return nil
@@ -639,7 +639,7 @@ func TestStoreRangeMergeTimestampCacheCausality(t *testing.T) {
if err != nil {
return err
}
- if !lhsRepl1.OwnsValidLease(ctx, tc.Servers[1].Clock().Now()) {
+ if !lhsRepl1.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
return errors.New("s2 does not own valid lease for lhs range")
}
return nil
@@ -2127,7 +2127,7 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {
if err != nil {
return err
}
- if !rhsRepl.OwnsValidLease(ctx, mtc.clock().Now()) {
+ if !rhsRepl.OwnsValidLease(ctx, mtc.clock().NowAsClockTimestamp()) {
return errors.New("rhs store does not own valid lease for rhs range")
}
return nil
@@ -2359,7 +2359,7 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi
if err != nil {
return err
}
- if !rhsRepl.OwnsValidLease(ctx, mtc.clock().Now()) {
+ if !rhsRepl.OwnsValidLease(ctx, mtc.clock().NowAsClockTimestamp()) {
return errors.New("store2 does not own valid lease for rhs range")
}
return nil
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_replica_test.go
@@ -912,7 +912,7 @@ func TestLeaseExpirationBasedRangeTransfer(t *testing.T) {
// low water mark, we make sure that the high water mark is equal to or
// greater than the new lease start time, which is less than the
// previous lease's expiration time.
- if highWater := l.replica1.GetTSCacheHighWater(); highWater.Less(replica1Lease.Start) {
+ if highWater := l.replica1.GetTSCacheHighWater(); highWater.Less(replica1Lease.Start.ToTimestamp()) {
t.Fatalf("expected timestamp cache high water %s, but found %s",
replica1Lease.Start, highWater)
}
@@ -1420,7 +1420,7 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
Key: key,
},
Lease: roachpb.Lease{
- Start: s.Clock().Now(),
+ Start: s.Clock().NowAsClockTimestamp(),
Expiration: s.Clock().Now().Add(time.Second.Nanoseconds(), 0).Clone(),
Replica: replDesc,
},
@@ -3135,7 +3135,7 @@ func TestStrictGCEnforcement(t *testing.T) {
ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
_, r := getFirstStoreReplica(t, tc.Server(i), tableKey)
l, _ := r.GetLease()
- require.NoError(t, ptp.Refresh(ctx, l.Start.Next()))
+ require.NoError(t, ptp.Refresh(ctx, l.Start.ToTimestamp().Next()))
r.ReadProtectedTimestamps(ctx)
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/closed_timestamp_test.go
@@ -736,7 +736,7 @@ func forceLeaseTransferOnSubsumedRange(
if newLease.Sequence == oldLease.Sequence {
return errors.New("RHS lease not updated")
}
- leaseStart = newLease.Start
+ leaseStart = newLease.Start.ToTimestamp()
return nil
})
if err != nil {
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/consistency_queue.go
@@ -97,7 +97,7 @@ func newConsistencyQueue(store *Store, gossip *gossip.Gossip) *consistencyQueue
}

func (q *consistencyQueue) shouldQueue(
- ctx context.Context, now hlc.Timestamp, repl *Replica, _ *config.SystemConfig,
+ ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ *config.SystemConfig,
) (bool, float64) {
return consistencyQueueShouldQueueImpl(ctx, now,
consistencyShouldQueueData{
@@ -120,7 +120,7 @@ func (q *consistencyQueue) shouldQueue(
// ConsistencyQueueShouldQueueImpl is exposed for testability without having
// to setup a fully fledged replica.
func consistencyQueueShouldQueueImpl(
- ctx context.Context, now hlc.Timestamp, data consistencyShouldQueueData,
+ ctx context.Context, now hlc.ClockTimestamp, data consistencyShouldQueueData,
) (bool, float64) {
if data.interval <= 0 {
return false, 0
@@ -132,7 +132,7 @@ func consistencyQueueShouldQueueImpl(
if err != nil {
return false, 0
}
- if shouldQ, priority = shouldQueueAgain(now, lpTS, data.interval); !shouldQ {
+ if shouldQ, priority = shouldQueueAgain(now.ToTimestamp(), lpTS, data.interval); !shouldQ {
return false, 0
}
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/consistency_queue_test.go
@@ -75,15 +75,15 @@ func TestConsistencyQueueRequiresLive(t *testing.T) {
}

if shouldQ, priority := kvserver.ConsistencyQueueShouldQueue(
- context.Background(), clock.Now(), desc, getQueueLastProcessed, isNodeLive,
+ context.Background(), clock.NowAsClockTimestamp(), desc, getQueueLastProcessed, isNodeLive,
false, interval); !shouldQ {
t.Fatalf("expected shouldQ true; got %t, %f", shouldQ, priority)
}

live = false

if shouldQ, priority := kvserver.ConsistencyQueueShouldQueue(
- context.Background(), clock.Now(), desc, getQueueLastProcessed, isNodeLive,
+ context.Background(), clock.NowAsClockTimestamp(), desc, getQueueLastProcessed, isNodeLive,
false, interval); shouldQ {
t.Fatalf("expected shouldQ false; got %t, %f", shouldQ, priority)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc_queue.go
@@ -152,7 +152,7 @@ func (r gcQueueScore) String() string {
// in the event that the cumulative ages of GC'able bytes or extant
// intents exceed thresholds.
func (gcq *gcQueue) shouldQueue(
- ctx context.Context, now hlc.Timestamp, repl *Replica, _ *config.SystemConfig,
+ ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ *config.SystemConfig,
) (bool, float64) {

// Consult the protected timestamp state to determine whether we can GC and
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/helpers_test.go
@@ -94,7 +94,7 @@ func (s *Store) ComputeMVCCStats() (enginepb.MVCCStats, error) {
// store's consistency queue.
func ConsistencyQueueShouldQueue(
ctx context.Context,
- now hlc.Timestamp,
+ now hlc.ClockTimestamp,
desc *roachpb.RangeDescriptor,
getQueueLastProcessed func(ctx context.Context) (hlc.Timestamp, error),
isNodeLive func(nodeID roachpb.NodeID) (bool, error),
80 changes: 42 additions & 38 deletions pkg/kv/kvserver/kvserverpb/lease_status.pb.go

Some generated files are not rendered by default.

7 changes: 4 additions & 3 deletions pkg/kv/kvserver/kvserverpb/lease_status.proto
@@ -77,14 +77,15 @@ enum LeaseState {
}

// LeaseStatus holds the lease state, the timestamp at which the state
- // is accurate, the lease and optionally the liveness if the lease is
+ // is accurate, the lease, and optionally the liveness if the lease is
// epoch-based.
message LeaseStatus {
// Lease which this status describes.
roachpb.Lease lease = 1 [(gogoproto.nullable) = false];
// Timestamp that the lease was evaluated at.
- util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];
- // State of the lease at timestamp.
+ util.hlc.Timestamp now = 2 [(gogoproto.nullable) = false,
+     (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"];
+ // State of the lease at now.
LeaseState state = 3;
// Liveness if this is an epoch-based lease.
kv.kvserver.liveness.livenesspb.Liveness liveness = 4 [(gogoproto.nullable) = false];
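
For readers unfamiliar with `gogoproto.casttype`: the option tells the protobuf code generator to emit the field with the named Go type rather than the default. A simplified, hypothetical sketch of the resulting struct in the regenerated `lease_status.pb.go` (the real generated file also carries protobuf struct tags and generated methods):

```go
package kvserverpb

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// LeaseStatus as regenerated, simplified.
type LeaseStatus struct {
	// Lease which this status describes.
	Lease roachpb.Lease
	// Timestamp that the lease was evaluated at. Previously `Timestamp
	// hlc.Timestamp`; the casttype option retypes it at the Go level.
	Now hlc.ClockTimestamp
	// State of the lease at now.
	State LeaseState
	// Liveness if this is an epoch-based lease.
	Liveness livenesspb.Liveness
}
```

Because `hlc.ClockTimestamp` is a Go-level redefinition of `hlc.Timestamp`, the wire encoding is unchanged; only the compile-time type checking tightens.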
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/merge_queue.go
@@ -130,7 +130,7 @@ func (mq *mergeQueue) enabled() bool {
}

func (mq *mergeQueue) shouldQueue(
- ctx context.Context, now hlc.Timestamp, repl *Replica, sysCfg *config.SystemConfig,
+ ctx context.Context, now hlc.ClockTimestamp, repl *Replica, sysCfg *config.SystemConfig,
) (shouldQ bool, priority float64) {
if !mq.enabled() {
return false, 0
@@ -217,8 +217,8 @@ func (mq *mergeQueue) process(
}

// Range was manually split and not expired, so skip merging.
- now := mq.store.Clock().Now()
- if now.Less(rhsDesc.GetStickyBit()) {
+ now := mq.store.Clock().NowAsClockTimestamp()
+ if now.ToTimestamp().Less(rhsDesc.GetStickyBit()) {
log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
// repeatedly processing ranges that cannot be merged.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue_test.go
@@ -155,7 +155,7 @@ func TestMergeQueueShouldQueue(t *testing.T) {
zoneConfig := zonepb.DefaultZoneConfigRef()
zoneConfig.RangeMinBytes = proto.Int64(tc.minBytes)
repl.SetZoneConfig(zoneConfig)
- shouldQ, priority := mq.shouldQueue(ctx, hlc.Timestamp{}, repl, config.NewSystemConfig(zoneConfig))
+ shouldQ, priority := mq.shouldQueue(ctx, hlc.ClockTimestamp{}, repl, config.NewSystemConfig(zoneConfig))
if tc.expShouldQ != shouldQ {
t.Errorf("incorrect shouldQ: expected %v but got %v", tc.expShouldQ, shouldQ)
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/observedts/limit.go
@@ -58,12 +58,12 @@ func LimitTxnMaxTimestamp(
if !ok {
return txn
}
- obsTS := obsClockTS.ToTimestamp()
// If the lease is valid, we use the greater of the observed timestamp and
// the lease start time, up to the max timestamp. This ensures we avoid
// incorrect assumptions about when data was written, in absolute time on a
// different node, which held the lease before this replica acquired it.
- obsTS.Forward(status.Lease.Start)
+ obsClockTS.Forward(status.Lease.Start)
+ obsTS := obsClockTS.ToTimestamp()
// If the observed timestamp reduces the transaction's uncertainty interval,
// update the transaction proto.
if obsTS.Less(txn.MaxTimestamp) {
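
The reordering in this last hunk is the subtle part of the change: the observed timestamp must be forwarded by `Lease.Start` while both values are still `ClockTimestamp`s, and only then widened. A sketch under assumed `hlc` semantics (`Forward` ratchets the receiver up to the maximum of the two values); `limitObserved` is a hypothetical helper, not code from this commit:

```go
package observedts

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// limitObserved (hypothetical) shows why the fix forwards before widening.
func limitObserved(obsClockTS, leaseStart hlc.ClockTimestamp) hlc.Timestamp {
	// Forward in ClockTimestamp space: the maximum of two real clock readings
	// is itself a real clock reading, so the stronger type is preserved.
	obsClockTS.Forward(leaseStart)
	// Widen explicitly, only once the value is final. The old code widened
	// first and then forwarded in general Timestamp space, which the stricter
	// types now make impossible to do by accident.
	return obsClockTS.ToTimestamp()
}
```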