From ea078ddba529609439253ee8fa33915081d8c605 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 28 Feb 2019 21:09:15 -0500 Subject: [PATCH] storage: assert that timestamp cache updates are safe This new assertion would fire if bug like the one hypothesized in the following comment exist: https://github.com/cockroachdb/cockroach/pull/22315#discussion_r252834889 Release note: None --- pkg/storage/replica_test.go | 11 +++++++-- pkg/storage/replica_tscache.go | 43 +++++++++++++++++++++++++++------- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 77df56288c0f..0138e35b355c 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -292,6 +292,7 @@ func (tc *testContext) Sender() client.Sender { tc.Fatal(err) } } + tc.Clock().Update(ba.Timestamp) return ba }) } @@ -4842,7 +4843,10 @@ func TestPushTxnUpgradeExistingTxn(t *testing.T) { pushee.Timestamp = test.ts args := pushTxnArgs(pusher, pushee, roachpb.PUSH_ABORT) - resp, pErr := tc.SendWrapped(&args) + // Set header timestamp to the maximum of the pusher and pushee timestamps. + h := roachpb.Header{Timestamp: args.PushTo} + h.Timestamp.Forward(pushee.Timestamp) + resp, pErr := tc.SendWrappedWith(h, &args) if pErr != nil { t.Fatal(pErr) } @@ -5120,7 +5124,10 @@ func TestPushTxnPriorities(t *testing.T) { // Now, attempt to push the transaction with intent epoch set appropriately. args := pushTxnArgs(pusher, pushee, test.pushType) - _, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: args.PushTo}, &args) + // Set header timestamp to the maximum of the pusher and pushee timestamps. + h := roachpb.Header{Timestamp: args.PushTo} + h.Timestamp.Forward(pushee.Timestamp) + _, pErr := tc.SendWrappedWith(h, &args) if test.expSuccess != (pErr == nil) { t.Errorf("expected success on trial %d? %t; got err %s", i, test.expSuccess, pErr) diff --git a/pkg/storage/replica_tscache.go b/pkg/storage/replica_tscache.go index 1e66465433ec..577d712e2286 100644 --- a/pkg/storage/replica_tscache.go +++ b/pkg/storage/replica_tscache.go @@ -16,9 +16,12 @@ package storage import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/tscache" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -29,7 +32,10 @@ import ( func (r *Replica) updateTimestampCache( ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error, ) { - tc := r.store.tsCache + addToTSCache := r.store.tsCache.Add + if util.RaceEnabled { + addToTSCache = checkedTSCacheUpdate(r.store.Clock().Now(), r.store.tsCache, ba, br, pErr) + } // Update the timestamp cache using the timestamp at which the batch // was executed. Note this may have moved forward from ba.Timestamp, // as when the request is retried locally on WriteTooOldErrors. @@ -66,7 +72,7 @@ func (r *Replica) updateTimestampCache( // to or greater than the transaction's OrigTimestamp, // which is consulted in CanCreateTxnRecord. key := keys.TransactionKey(start, txnID) - tc.Add(key, nil, ts, txnID, false /* readCache */) + addToTSCache(key, nil, ts, txnID, false /* readCache */) case *roachpb.PushTxnRequest: // A successful PushTxn request bumps the timestamp cache for // the pushee's transaction key. The pushee will consult the @@ -92,7 +98,7 @@ func (r *Replica) updateTimestampCache( continue } key := keys.TransactionKey(start, pushee.ID) - tc.Add(key, nil, pushee.Timestamp, t.PusherTxn.ID, readCache) + addToTSCache(key, nil, pushee.Timestamp, t.PusherTxn.ID, readCache) case *roachpb.ConditionalPutRequest: if pErr != nil { // ConditionalPut still updates on ConditionFailedErrors. @@ -100,7 +106,7 @@ func (r *Replica) updateTimestampCache( continue } } - tc.Add(start, end, ts, txnID, true /* readCache */) + addToTSCache(start, end, ts, txnID, true /* readCache */) case *roachpb.InitPutRequest: if pErr != nil { // InitPut still updates on ConditionFailedErrors. @@ -108,7 +114,7 @@ func (r *Replica) updateTimestampCache( continue } } - tc.Add(start, end, ts, txnID, true /* readCache */) + addToTSCache(start, end, ts, txnID, true /* readCache */) case *roachpb.ScanRequest: resp := br.Responses[i].GetInner().(*roachpb.ScanResponse) if resp.ResumeSpan != nil { @@ -117,7 +123,7 @@ func (r *Replica) updateTimestampCache( // end key for the span to update the timestamp cache. end = resp.ResumeSpan.Key } - tc.Add(start, end, ts, txnID, true /* readCache */) + addToTSCache(start, end, ts, txnID, true /* readCache */) case *roachpb.ReverseScanRequest: resp := br.Responses[i].GetInner().(*roachpb.ReverseScanResponse) if resp.ResumeSpan != nil { @@ -127,7 +133,7 @@ func (r *Replica) updateTimestampCache( // the span to update the timestamp cache. start = resp.ResumeSpan.EndKey } - tc.Add(start, end, ts, txnID, true /* readCache */) + addToTSCache(start, end, ts, txnID, true /* readCache */) case *roachpb.QueryIntentRequest: if t.IfMissing == roachpb.QueryIntentRequest_PREVENT { resp := br.Responses[i].GetInner().(*roachpb.QueryIntentResponse) @@ -140,16 +146,35 @@ func (r *Replica) updateTimestampCache( // transaction ID so that we block the intent regardless // of whether it is part of the current batch's transaction // or not. - tc.Add(start, end, t.Txn.Timestamp, uuid.UUID{}, true /* readCache */) + addToTSCache(start, end, t.Txn.Timestamp, uuid.UUID{}, true /* readCache */) } } default: - tc.Add(start, end, ts, txnID, !roachpb.UpdatesWriteTimestampCache(args)) + addToTSCache(start, end, ts, txnID, !roachpb.UpdatesWriteTimestampCache(args)) } } } } +// checkedTSCacheUpdate wraps tscache.Cache and asserts that any update to the +// cache is at or below the specified time. +func checkedTSCacheUpdate( + now hlc.Timestamp, + tc tscache.Cache, + ba *roachpb.BatchRequest, + br *roachpb.BatchResponse, + pErr *roachpb.Error, +) func(roachpb.Key, roachpb.Key, hlc.Timestamp, uuid.UUID, bool) { + return func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID, readCache bool) { + if now.Less(ts) { + panic(fmt.Sprintf("Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+ + "cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+ + "The timestamp cache update could be lost on a lease transfer.", ts, ba, br, pErr, now)) + } + tc.Add(start, end, ts, txnID, readCache) + } +} + // applyTimestampCache moves the batch timestamp forward depending on // the presence of overlapping entries in the timestamp cache. If the // batch is transactional, the txn timestamp and the txn.WriteTooOld