From 7ff05f15725617368a175cce187b15c5aba81698 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Thu, 4 Aug 2022 18:08:50 -0400 Subject: [PATCH 1/2] kvserver: allow certain read-only requests to drop latches before evaluation This commit introduces a change to the way certain types of read-only requests are evaluated. Traditionally, read-only requests have held their latches throughout their execution. This commit allows certain qualifying reads to be able to release their latches earlier. At a high level, reads may attempt to resolve all conflicts upfront by performing a sort of "validation" phase before they perform their MVCC scan. This validation phase performs a scan of the lock table keyspace in order to find any conflicting intents that may need to be resolved before the actual evaluation of the request over the MVCC keyspace. If no conflicting intents are found, then (since https://github.com/cockroachdb/cockroach/pull/76312), the request is guaranteed to be fully isolated against all other concurrent requests and can be allowed to release its latches at this point. This allows the actual evaluation of the read (over the MVCC part of the keyspace) to proceed without latches being held, which is the main motivation of this work. This validation phase could be thought of as an extension to the validation that the concurrency manager already performs when requests are sequenced through it, by trying to detect any conflicting intents that have already been pulled into the in-memory lock table. Additionally, for certain types of requests that can drop their latches early, and do not need to access the `IntentHistory` for any of their parent txn's intents, this commit attempts to make their MVCC scan cheaper by eliminating the need for an `intentInterleavingIterator`. This is enabled by the observation that once the validation phase is complete, the only remaining intents in the read's declared span must be intents belonging to the reader's transaction. 
So if the reader doesn't need to read an intent that isn't the latest intent on a key, then it doesn't need access to the key's `IntentHistory` (which lives in the lock-table keyspace), and doesn't need to use an `intentInterleavingIterator`. Release note (performance improvement): certain types of reads will now have a far smaller contention footprint with conflicting concurrent writers Resolves https://github.com/cockroachdb/cockroach/issues/66485 --- pkg/kv/kvserver/batcheval/cmd_export.go | 9 + pkg/kv/kvserver/batcheval/cmd_get.go | 15 +- pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 25 +-- pkg/kv/kvserver/batcheval/cmd_scan.go | 27 +-- pkg/kv/kvserver/batcheval/declare.go | 7 +- pkg/kv/kvserver/client_replica_test.go | 46 ++-- pkg/kv/kvserver/replica_evaluate.go | 89 +++++++- pkg/kv/kvserver/replica_evaluate_test.go | 10 +- pkg/kv/kvserver/replica_gossip.go | 5 +- pkg/kv/kvserver/replica_read.go | 199 +++++++++++++++--- pkg/kv/kvserver/replica_send.go | 54 ++--- pkg/kv/kvserver/replica_test.go | 101 +++++++-- pkg/kv/kvserver/replica_tscache.go | 17 +- pkg/kv/kvserver/replica_write.go | 2 +- pkg/kv/kvserver/store_test.go | 116 +++++++++- pkg/sql/explain_test.go | 5 +- pkg/storage/engine.go | 92 +++++++- pkg/storage/mvcc.go | 173 +++++++++------ 18 files changed, 771 insertions(+), 221 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 8a4c491c76dd..36e450df72c4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -90,6 +90,11 @@ func declareKeysExport( ) { DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans, maxOffset) latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeGCThresholdKey(header.RangeID)}) + // Export requests will usually not hold latches during their evaluation. + // + // See call to `AssertAllowed()` in GetGCThreshold() to understand why we need + // to disable these assertions for export requests. 
+ latchSpans.DisableUndeclaredAccessAssertions() } // evalExport dumps the requested keys into files of non-overlapping key ranges @@ -123,6 +128,10 @@ func evalExport( // *revisions* since the gc threshold, so noting that in the reply allows the // BACKUP to correctly note the supported time bounds for RESTORE AS OF SYSTEM // TIME. + // + // NOTE: Since export requests may not be holding latches during evaluation, + // this `GetGCThreshold()` call is going to potentially return a higher GC + // threshold than the pebble state we're evaluating over. This is copacetic. if args.MVCCFilter == roachpb.MVCCFilter_All { reply.StartTime = args.StartTime if args.StartTime.IsEmpty() { diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index 78b270643e55..8a6506ce11aa 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -53,13 +53,14 @@ func Get( var intent *roachpb.Intent var err error val, intent, err = storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - FailOnMoreRecent: args.KeyLocking != lock.None, - Uncertainty: cArgs.Uncertainty, - MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + FailOnMoreRecent: args.KeyLocking != lock.None, + Uncertainty: cArgs.Uncertainty, + MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, }) if err != nil { return result.Result{}, err diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index cb83a622c4f5..8ae129f7976e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ 
b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -40,18 +40,19 @@ func ReverseScan( var err error opts := storage.MVCCScanOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), - TargetBytes: h.TargetBytes, - AllowEmpty: h.AllowEmpty, - WholeRowsOfSize: h.WholeRowsOfSize, - FailOnMoreRecent: args.KeyLocking != lock.None, - Reverse: true, - MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + MaxKeys: h.MaxSpanRequestKeys, + MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + TargetBytes: h.TargetBytes, + AllowEmpty: h.AllowEmpty, + WholeRowsOfSize: h.WholeRowsOfSize, + FailOnMoreRecent: args.KeyLocking != lock.None, + Reverse: true, + MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, } switch args.ScanFormat { diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index ffac770c5232..19f2f6aa48c4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -40,19 +40,20 @@ func Scan( var err error opts := storage.MVCCScanOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - Uncertainty: cArgs.Uncertainty, - MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), - TargetBytes: h.TargetBytes, - AllowEmpty: h.AllowEmpty, - WholeRowsOfSize: h.WholeRowsOfSize, - FailOnMoreRecent: args.KeyLocking != lock.None, - Reverse: false, - 
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + Uncertainty: cArgs.Uncertainty, + MaxKeys: h.MaxSpanRequestKeys, + MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + TargetBytes: h.TargetBytes, + AllowEmpty: h.AllowEmpty, + WholeRowsOfSize: h.WholeRowsOfSize, + FailOnMoreRecent: args.KeyLocking != lock.None, + Reverse: false, + MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, } switch args.ScanFormat { diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go index 52979d9d808f..2e230d3b20a5 100644 --- a/pkg/kv/kvserver/batcheval/declare.go +++ b/pkg/kv/kvserver/batcheval/declare.go @@ -121,7 +121,8 @@ type CommandArgs struct { Args roachpb.Request Now hlc.ClockTimestamp // *Stats should be mutated to reflect any writes made by the command. - Stats *enginepb.MVCCStats - Concurrency *concurrency.Guard - Uncertainty uncertainty.Interval + Stats *enginepb.MVCCStats + Concurrency *concurrency.Guard + Uncertainty uncertainty.Interval + DontInterleaveIntents bool } diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 6e99bcfdac89..1e7607ec35b1 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -254,17 +254,13 @@ func TestTxnPutOutOfOrder(t *testing.T) { restartKey = "restart" ) // Set up a filter to so that the get operation at Step 3 will return an error. 
- var numGets int32 + var shouldFailGet atomic.Value testingEvalFilter := func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { if _, ok := filterArgs.Req.(*roachpb.GetRequest); ok && filterArgs.Req.Header().Key.Equal(roachpb.Key(key)) && filterArgs.Hdr.Txn == nil { - // The Reader executes two get operations, each of which triggers two get requests - // (the first request fails and triggers txn push, and then the second request - // succeeds). Returns an error for the fourth get request to avoid timestamp cache - // update after the third get operation pushes the txn timestamp. - if atomic.AddInt32(&numGets, 1) == 4 { + if shouldFail := shouldFailGet.Load(); shouldFail != nil && shouldFail.(bool) { return roachpb.NewErrorWithTxn(errors.Errorf("Test"), filterArgs.Hdr.Txn) } } @@ -403,6 +399,7 @@ func TestTxnPutOutOfOrder(t *testing.T) { manual.Increment(100) h.Timestamp = s.Clock().Now() + shouldFailGet.Store(true) if _, err := kv.SendWrappedWith( context.Background(), store.TestSender(), h, &roachpb.GetRequest{RequestHeader: requestHeader}, ); err == nil { @@ -4582,20 +4579,6 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { var txn2ID atomic.Value var txn2BBlockOnce sync.Once txn2BlockedC := make(chan chan struct{}) - postEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { - if txn := args.Hdr.Txn; txn != nil && txn.ID == txn2ID.Load() { - txn2BBlockOnce.Do(func() { - if !errors.HasType(args.Err, (*roachpb.WriteIntentError)(nil)) { - t.Errorf("expected WriteIntentError; got %v", args.Err) - } - - unblockCh := make(chan struct{}) - txn2BlockedC <- unblockCh - <-unblockCh - }) - } - return nil - } // Detect when txn4 discovers txn3's intent and begins to push. 
var txn4ID atomic.Value @@ -4616,10 +4599,20 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ - TestingPostEvalFilter: postEvalFilter, - }, TestingRequestFilter: requestFilter, + TestingConcurrencyRetryFilter: func(ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error) { + if txn := ba.Txn; txn != nil && txn.ID == txn2ID.Load() { + txn2BBlockOnce.Do(func() { + if !errors.HasType(pErr.GoError(), (*roachpb.WriteIntentError)(nil)) { + t.Errorf("expected WriteIntentError; got %v", pErr) + } + + unblockCh := make(chan struct{}) + txn2BlockedC <- unblockCh + <-unblockCh + }) + } + }, // Required by TestCluster.MoveRangeLeaseNonCooperatively. AllowLeaseRequestProposalsWhenNotLeader: true, }, @@ -4652,7 +4645,12 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { _, err := txn2.Get(ctx, key) err2C <- err }() - txn2UnblockC := <-txn2BlockedC + var txn2UnblockC chan struct{} + select { + case txn2UnblockC = <-txn2BlockedC: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for txn2 to block") + } // Transfer the lease to Server 1. 
Do so non-cooperatively instead of using // a lease transfer, because the cooperative lease transfer would get stuck diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index d3b555bc80ba..59c39c1cb987 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -13,10 +13,12 @@ package kvserver import ( "bytes" "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" @@ -153,7 +155,7 @@ func evaluateBatch( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, - readOnly bool, + evalPath batchEvalPath, ) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) { defer func() { // Ensure that errors don't carry the WriteTooOld flag set. The client @@ -176,7 +178,7 @@ func evaluateBatch( br := ba.CreateReply() // Optimize any contiguous sequences of put and conditional put ops. - if len(baReqs) >= optimizePutThreshold && !readOnly { + if len(baReqs) >= optimizePutThreshold && evalPath == readWrite { baReqs = optimizePuts(readWriter, baReqs, baHeader.DistinctSpans) } @@ -271,7 +273,8 @@ func evaluateBatch( // may carry a response transaction and in the case of WriteTooOldError // (which is sometimes deferred) it is fully populated. 
curResult, err := evaluateCommand( - ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui) + ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui, evalPath, + ) if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil { filterArgs := kvserverbase.FilterArgs{ @@ -481,6 +484,7 @@ func evaluateCommand( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, + evalPath batchEvalPath, ) (result.Result, error) { var err error var pd result.Result @@ -491,13 +495,14 @@ func evaluateCommand( now = st.Now } cArgs := batcheval.CommandArgs{ - EvalCtx: rec, - Header: h, - Args: args, - Now: now, - Stats: ms, - Concurrency: g, - Uncertainty: ui, + EvalCtx: rec, + Header: h, + Args: args, + Now: now, + Stats: ms, + Concurrency: g, + Uncertainty: ui, + DontInterleaveIntents: evalPath == readOnlyWithoutInterleavedIntents, } if cmd.EvalRW != nil { @@ -608,3 +613,67 @@ func canDoServersideRetry( } return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp) } + +// canReadOnlyRequestDropLatchesBeforeEval determines whether the batch request +// can potentially resolve its conflicts upfront (by scanning just the lock +// table first), bump the ts cache, release latches and then proceed with +// evaluation. Only non-locking read requests that aren't being evaluated under +// the `OptimisticEval` path are eligible for this optimization. +func canReadOnlyRequestDropLatchesBeforeEval(ba *roachpb.BatchRequest, g *concurrency.Guard) bool { + if g == nil { + // NB: A nil guard indicates that the caller is not holding latches. + return false + } + switch ba.Header.ReadConsistency { + case roachpb.CONSISTENT: + // TODO(aayush): INCONSISTENT and READ_UNCOMMITTED reads do not care about + // resolving lock conflicts at all. Yet, they can still drop latches early and + // evaluate once they've pinned their pebble engine state. 
We should consider + // supporting this by letting these kinds of requests drop latches early while + // also skipping the initial validation step of scanning the lock table. + case roachpb.INCONSISTENT, roachpb.READ_UNCOMMITTED: + return false + default: + panic(fmt.Sprintf("unexpected ReadConsistency: %s", ba.Header.ReadConsistency)) + } + switch g.EvalKind { + case concurrency.PessimisticEval, concurrency.PessimisticAfterFailedOptimisticEval: + case concurrency.OptimisticEval: + // Requests going through the optimistic path are not allowed to drop their + // latches before evaluation since we do not know upfront the extent to + // which they will end up reading, and thus we cannot determine how much of + // the timestamp cache to update. + return false + default: + panic(fmt.Sprintf("unexpected EvalKind: %v", g.EvalKind)) + } + // Only non-locking reads are eligible. This is because requests that need to + // lock the keys that they end up reading need to be isolated against other + // conflicting requests during their execution. Thus, they cannot release + // their latches before evaluation. + if ba.IsLocking() { + return false + } + switch ba.WaitPolicy { + case lock.WaitPolicy_Block, lock.WaitPolicy_Error: + case lock.WaitPolicy_SkipLocked: + // SkipLocked requests should only bump the timestamp cache over the keys + // that they actually ended up reading, and not the keys they ended up + // skipping over. Thus, they are not allowed to drop their latches before + // evaluation. + return false + default: + panic(fmt.Sprintf("unexpected WaitPolicy: %s", ba.WaitPolicy)) + } + // We allow all non-locking, pessimistically evaluating read requests to try + // and resolve their conflicts upfront. 
+ for _, req := range ba.Requests { + inner := req.GetInner() + switch inner.(type) { + case *roachpb.ExportRequest, *roachpb.GetRequest, *roachpb.ScanRequest, *roachpb.ReverseScanRequest: + default: + return false + } + } + return true +} diff --git a/pkg/kv/kvserver/replica_evaluate_test.go b/pkg/kv/kvserver/replica_evaluate_test.go index 312bde10a818..31f0241e2eba 100644 --- a/pkg/kv/kvserver/replica_evaluate_test.go +++ b/pkg/kv/kvserver/replica_evaluate_test.go @@ -674,6 +674,10 @@ func TestEvaluateBatch(t *testing.T) { var r resp r.d = d + evalPath := readWrite + if d.readOnly { + evalPath = readOnlyDefault + } r.br, r.res, r.pErr = evaluateBatch( ctx, d.idKey, @@ -681,10 +685,10 @@ func TestEvaluateBatch(t *testing.T) { d.MockEvalCtx.EvalContext(), &d.ms, &d.ba, - nil, /* g */ - nil, /* st */ + nil, + nil, uncertainty.Interval{}, - d.readOnly, + evalPath, ) tc.check(t, r) diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go index b42af3cde957..2521f8e63536 100644 --- a/pkg/kv/kvserver/replica_gossip.go +++ b/pkg/kv/kvserver/replica_gossip.go @@ -97,7 +97,10 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked( defer rw.Close() br, result, pErr := - evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* g */, nil /* st */, uncertainty.Interval{}, true /* readOnly */) + evaluateBatch( + ctx, kvserverbase.CmdIDKey(""), rw, rec, nil /* ms */, &ba, + nil /* g */, nil /* st */, uncertainty.Interval{}, readOnlyDefault, + ) if pErr != nil { return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span) } diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 50a5aa1c6044..2e9b3012445a 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -24,12 +24,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" 
"github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/kr/pretty" ) @@ -89,27 +91,38 @@ func (r *Replica) executeReadOnlyBatch( if err := r.checkExecutionCanProceedAfterStorageSnapshot(ctx, ba, st); err != nil { return nil, g, nil, roachpb.NewError(err) } - // TODO(nvanbenschoten): once all replicated intents are pulled into the - // concurrency manager's lock-table, we can be sure that if we reached this - // point, we will not conflict with any of them during evaluation. This in - // turn means that we can bump the timestamp cache *before* evaluation - // without risk of starving writes. Once we start doing that, we're free to - // release latches immediately after we acquire an engine iterator as long - // as we're performing a non-locking read. Note that this also requires that - // the request is not being optimistically evaluated (optimistic evaluation - // does not wait for latches or check locks). It would also be nice, but not - // required for correctness, that the read-only engine eagerly create an - // iterator (that is later cloned) while the latches are held, so that this - // request does not "see" the effect of any later requests that happen after - // the latches are released. 
+ ok, stillNeedsInterleavedIntents, pErr := r.canDropLatchesBeforeEval(ctx, rw, ba, g, st) + if pErr != nil { + return nil, g, nil, pErr + } + evalPath := readOnlyDefault + if ok { + // Since the concurrency manager has sequenced this request against all the intents + // that are in the concurrency manager's lock table, and we've scanned the + // replicated lock-table keyspace above in `canDropLatchesBeforeEval`, we + // can be sure that if we reached this point, we will not conflict with any + // of them during evaluation. This in turn means that we can bump the + // timestamp cache *before* evaluation without risk of starving writes. + // Consequently, we're free to release latches here since we've acquired a + // pebble iterator as long as we're performing a non-locking read (also + // checked in `canDropLatchesBeforeEval`). Note that this also requires that + // the request is not being optimistically evaluated (optimistic evaluation + // does not wait for latches or check locks). + log.VEventf(ctx, 3, "lock table scan complete without conflicts; dropping latches early") + if !stillNeedsInterleavedIntents { + evalPath = readOnlyWithoutInterleavedIntents + } + r.updateTimestampCacheAndDropLatches(ctx, g, ba, nil /* br */, nil /* pErr */, st) + g = nil + } var result result.Result - br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui) + br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath) // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. if isConcurrencyRetryError(pErr) { - if g.EvalKind == concurrency.OptimisticEval { + if g != nil && g.EvalKind == concurrency.OptimisticEval { // Since this request was not holding latches, it could have raced with // intent resolution. So we can't trust it to add discovered locks, if // there is a latch conflict. 
This means that a discovered lock plus a @@ -130,7 +143,7 @@ func (r *Replica) executeReadOnlyBatch( return nil, g, nil, pErr } - if g.EvalKind == concurrency.OptimisticEval { + if g != nil && g.EvalKind == concurrency.OptimisticEval { if pErr == nil { // Gather the spans that were read -- we distinguish the spans in the // request from the spans that were actually read, using resume spans in @@ -162,17 +175,11 @@ func (r *Replica) executeReadOnlyBatch( if pErr == nil { pErr = r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local) } - - // Otherwise, update the timestamp cache and release the concurrency guard. - // Note: - // - The update to the timestamp cache is not gated on pErr == nil, - // since certain semantic errors (e.g. ConditionFailedError on CPut) - // require updating the timestamp cache (see updatesTSCacheOnErr). - // - For optimistic evaluation, used for limited scans, the update to the - // timestamp cache limits itself to the spans that were read, by using - // the ResumeSpans. - ec, g := endCmds{repl: r, g: g, st: st}, nil - ec.done(ctx, ba, br, pErr) + if g != nil { + // If we didn't already drop latches earlier, do so now. + r.updateTimestampCacheAndDropLatches(ctx, g, ba, br, pErr, st) + g = nil + } // Semi-synchronously process any intents that need resolving here in // order to apply back pressure on the client which generated them. The @@ -192,7 +199,11 @@ func (r *Replica) executeReadOnlyBatch( // prohibits any concurrent requests for the same range. See #17760. 
allowSyncProcessing := ba.ReadConsistency == roachpb.CONSISTENT && ba.WaitPolicy != lock.WaitPolicy_SkipLocked - if err := r.store.intentResolver.CleanupIntentsAsync(ctx, intents, allowSyncProcessing); err != nil { + if err := r.store.intentResolver.CleanupIntentsAsync( + ctx, + intents, + allowSyncProcessing, + ); err != nil { log.Warningf(ctx, "%v", err) } } @@ -208,6 +219,109 @@ func (r *Replica) executeReadOnlyBatch( return br, nil, nil, pErr } +// updateTimestampCacheAndDropLatches updates the timestamp cache and releases +// the concurrency guard. +// Note: +// - If `br` is nil, then this method assumes that latches are being released +// before evaluation of the request, and the timestamp cache is updated based +// only on the spans declared in the request. +// - The update to the timestamp cache is not gated on pErr == nil, since +// certain semantic errors (e.g. ConditionFailedError on CPut) require updating +// the timestamp cache (see updatesTSCacheOnErr). +// - For optimistic evaluation, used for limited scans, the update to the +// timestamp cache limits itself to the spans that were read, by using the +// ResumeSpans. +func (r *Replica) updateTimestampCacheAndDropLatches( + ctx context.Context, + g *concurrency.Guard, + ba *roachpb.BatchRequest, + br *roachpb.BatchResponse, + pErr *roachpb.Error, + st kvserverpb.LeaseStatus, +) { + ec := endCmds{repl: r, g: g, st: st} + ec.done(ctx, ba, br, pErr) +} + +var allowDroppingLatchesBeforeEval = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.transaction.dropping_latches_before_eval.enabled", + "if enabled, allows certain read-only KV requests to drop latches before they evaluate", + true, +) + +// canDropLatchesBeforeEval determines whether a given batch request can proceed +// with evaluation without continuing to hold onto its latches[1] and if so, +// whether the evaluation of the requests in the batch needs an intent +// interleaving iterator[2]. 
+// +// [1] whether the request can safely release latches at this point in the +// execution. +// For certain qualifying types of requests (certain types of read-only +// requests: see `canReadOnlyRequestDropLatchesBeforeEval`), this method +// performs a scan of the lock table keyspace corresponding to the latch spans +// declared by the BatchRequest. +// If no conflicting intents are found, then it is deemed safe for this request +// to release its latches at this point. This is because read-only requests +// evaluate over a stable pebble snapshot (see the call to +// `PinEngineStateForIterators` in `executeReadOnlyBatch`), so if there are no +// lock conflicts, the rest of the execution is guaranteed to be isolated from +// the effects of other requests. +// If any conflicting intents are found, then it returns a WriteIntentError +// which needs to be handled by the caller before proceeding. +// +// [2] if the request can drop its latches early, whether it needs an intent +// interleaving iterator to perform its evaluation. +// If the aforementioned lock table scan determines that any of the requests in +// the batch may need access to the intent history of a key, then an intent +// interleaving iterator is needed to perform the evaluation. +func (r *Replica) canDropLatchesBeforeEval( + ctx context.Context, + rw storage.ReadWriter, + ba *roachpb.BatchRequest, + g *concurrency.Guard, + st kvserverpb.LeaseStatus, +) (ok, stillNeedsIntentInterleaving bool, pErr *roachpb.Error) { + if !allowDroppingLatchesBeforeEval.Get(&r.store.cfg.Settings.SV) || + !canReadOnlyRequestDropLatchesBeforeEval(ba, g) { + // If the request does not qualify, we neither drop latches nor use a + // non-interleaving iterator. 
+ return false /* ok */, true /* stillNeedsIntentInterleaving */, nil + } + + log.VEventf( + ctx, 3, "can drop latches early for batch (%v); scanning lock table first to detect conflicts", ba, + ) + + maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&r.store.cfg.Settings.SV) + var intents []roachpb.Intent + // Check if any of the requests within the batch need to resolve any intents + // or if any of them need to use an intent interleaving iterator. + for _, req := range ba.Requests { + start, end := req.GetInner().Header().Key, req.GetInner().Header().EndKey + needsIntentInterleavingForThisRequest, err := storage.ScanConflictingIntents( + ctx, rw, ba.Txn, ba.Header.Timestamp, start, end, &intents, maxIntents, + ) + if err != nil { + return false /* ok */, true /* stillNeedsIntentInterleaving */, roachpb.NewError( + errors.Wrap(err, "scanning intents"), + ) + } + stillNeedsIntentInterleaving = stillNeedsIntentInterleaving || needsIntentInterleavingForThisRequest + if maxIntents != 0 && int64(len(intents)) >= maxIntents { + break + } + } + if len(intents) > 0 { + return false /* ok */, false /* stillNeedsIntentInterleaving */, maybeAttachLease( + roachpb.NewError(&roachpb.WriteIntentError{Intents: intents}), &st.Lease, + ) + } + // If there were no conflicts, then the request can drop its latches and + // proceed with evaluation. + return true /* ok */, stillNeedsIntentInterleaving, nil +} + // evalContextWithAccount wraps an EvalContext to provide a non-nil // mon.BoundAccount. This wrapping is conditional on various factors, and // specific to a request (see executeReadOnlyBatchWithServersideRefreshes), @@ -256,6 +370,19 @@ func (e evalContextWithAccount) GetResponseMemoryAccount() *mon.BoundAccount { return e.memAccount } +// batchEvalPath enumerates the different evaluation paths that can be taken by +// a batch. +type batchEvalPath int + +const ( + // readOnlyDefault is the default evaluation path taken by read only requests. 
+ readOnlyDefault batchEvalPath = iota + // readOnlyWithoutInterleavedIntents indicates that the request does not need + // an intent interleaving iterator during its evaluation. + readOnlyWithoutInterleavedIntents + readWrite +) + // executeReadOnlyBatchWithServersideRefreshes invokes evaluateBatch and retries // at a higher timestamp in the event of some retriable errors if allowed by the // batch/txn. @@ -267,6 +394,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, + evalPath batchEvalPath, ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { log.Event(ctx, "executing read-only batch") @@ -317,14 +445,25 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( log.VEventf(ctx, 2, "server-side retry of batch") } now := timeutil.Now() - br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, g, st, ui, true /* readOnly */) + br, res, pErr = evaluateBatch( + ctx, kvserverbase.CmdIDKey(""), rw, rec, nil /* ms */, ba, g, st, ui, evalPath, + ) r.store.metrics.ReplicaReadBatchEvaluationLatency.RecordValue(timeutil.Since(now).Nanoseconds()) // Allow only one retry. if pErr == nil || retries > 0 { break } // If we can retry, set a higher batch timestamp and continue. - if !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{} /* deadline */) { + // + // Note that if the batch request has already released its latches (as + // indicated by the latch guard being nil) before this point, then it cannot + // retry at a higher timestamp because it is not isolated at higher + // timestamps. + latchesHeld := g != nil + if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{}) { + // TODO(aayush,arul): These metrics are incorrect at the moment since + // hitting this branch does not mean that we won't serverside retry, it + // just means that we will have to reacquire latches. 
r.store.Metrics().ReadEvaluationServerSideRetryFailure.Inc(1) break } else { diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 70796469ce40..8f123c74d4d5 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -487,10 +487,22 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } - // The batch execution func returned a server-side concurrency retry - // error. It must have also handed back ownership of the concurrency - // guard without having already released the guard's latches. - g.AssertLatches() + // The batch execution func returned a server-side concurrency retry error. + // It may have either handed back ownership of the concurrency guard without + // having already released the guard's latches, or in case of certain types + // of read-only requests (see `canReadOnlyRequestDropLatchesBeforeEval`), it + // may have released the guard's latches. + dropLatchesAndLockWaitQueues := func(reuseLatchAndLockSpans bool) { + if g != nil { + latchSpans, lockSpans = nil, nil + if reuseLatchAndLockSpans { + latchSpans, lockSpans = g.TakeSpanSets() + } + r.concMgr.FinishReq(g) + g = nil + } + } + if filter := r.store.cfg.TestingKnobs.TestingConcurrencyRetryFilter; filter != nil { filter(ctx, ba, pErr) } @@ -505,19 +517,18 @@ func (r *Replica) executeBatchWithConcurrencyRetries( switch t := pErr.GetDetail().(type) { case *roachpb.WriteIntentError: // Drop latches, but retain lock wait-queues. + g.AssertLatches() if g, pErr = r.handleWriteIntentError(ctx, ba, g, pErr, t); pErr != nil { return nil, nil, pErr } case *roachpb.TransactionPushError: // Drop latches, but retain lock wait-queues. + g.AssertLatches() if g, pErr = r.handleTransactionPushError(ctx, ba, g, pErr, t); pErr != nil { return nil, nil, pErr } case *roachpb.IndeterminateCommitError: - // Drop latches and lock wait-queues. 
- latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues(true /* reuseLatchAndLockSpans */) // Then launch a task to handle the indeterminate commit error. No error // is returned if the transaction is recovered successfully to either a // COMMITTED or ABORTED state. @@ -525,13 +536,10 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.ReadWithinUncertaintyIntervalError: - // Drop latches and lock wait-queues. - r.concMgr.FinishReq(g) - g = nil // If the batch is able to perform a server-side retry in order to avoid // the uncertainty error, it will have a new timestamp. Force a refresh of // the latch and lock spans. - latchSpans, lockSpans = nil, nil + dropLatchesAndLockWaitQueues(false /* reuseLatchAndLockSpans */) // Attempt to adjust the batch's timestamp to avoid the uncertainty error // and allow for a server-side retry. For transactional requests, there // are strict conditions that must be met for this to be permitted. For @@ -544,10 +552,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.InvalidLeaseError: - // Drop latches and lock wait-queues. - latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues(true /* reuseLatchAndLockSpans */) // Then attempt to acquire the lease if not currently held by any // replica or redirect to the current leaseholder if currently held // by a different replica. @@ -555,10 +560,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.MergeInProgressError: - // Drop latches and lock wait-queues. - latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues(true /* reuseLatchAndLockSpans */) // Then listen for the merge to complete. 
if pErr = r.handleMergeInProgressError(ctx, ba, pErr, t); pErr != nil { return nil, nil, pErr @@ -1255,7 +1257,9 @@ func (ec *endCmds) poison() { } // done releases the latches acquired by the command and updates the timestamp -// cache using the final timestamp of each command. +// cache using the final timestamp of each command. If `br` is nil, it is +// assumed that `done` is being called by a request that's dropping its latches +// before evaluation. // // No-op if the receiver has been zeroed out by a call to move. Idempotent and // is safe to call more than once. @@ -1268,10 +1272,10 @@ func (ec *endCmds) done( } defer ec.move() // clear - // Update the timestamp cache. Each request within the batch is considered - // in turn; only those marked as affecting the cache are processed. However, - // only do so if the request is consistent and was operating on the - // leaseholder under a valid range lease. + // Update the timestamp cache. Each request within the batch is considered in + // turn; only those marked as affecting the cache are processed. However, only + // do so if the request is consistent and was operating on the leaseholder + // under a valid range lease. if ba.ReadConsistency == roachpb.CONSISTENT && ec.st.State == kvserverpb.LeaseState_VALID { ec.repl.updateTimestampCache(ctx, &ec.st, ba, br, pErr) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 5c22a46bff4a..17e6df7121e5 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -2133,8 +2133,8 @@ func TestReplicaLatching(t *testing.T) { }{ // Read/read doesn't wait. {true, true, false, false}, - // A write doesn't wait for an earlier read (except for local keys). - {true, false, false, true}, + // A write doesn't wait for an earlier read. + {true, false, false, false}, // A read must wait for an earlier write. {false, true, true, true}, // Writes always wait for other writes. 
@@ -2458,7 +2458,7 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { blockReader.Store(false) blockWriter.Store(false) blockCh := make(chan struct{}, 1) - blockedCh := make(chan struct{}, 1) + waitForRequestBlocked := make(chan struct{}, 1) tc := testContext{} tsc := TestStoreConfig(nil) @@ -2469,10 +2469,10 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { return nil } if filterArgs.Req.Method() == roachpb.Get && blockReader.Load().(bool) { - blockedCh <- struct{}{} + waitForRequestBlocked <- struct{}{} <-blockCh } else if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) { - blockedCh <- struct{}{} + waitForRequestBlocked <- struct{}{} <-blockCh } return nil @@ -2490,17 +2490,79 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { interferes bool }{ // Reader & writer have same timestamps. - {makeTS(1, 0), makeTS(1, 0), roachpb.Key("a"), true, true}, - {makeTS(1, 0), makeTS(1, 0), roachpb.Key("b"), false, true}, - // Reader has earlier timestamp. - {makeTS(1, 0), makeTS(1, 1), roachpb.Key("c"), true, false}, - {makeTS(1, 0), makeTS(1, 1), roachpb.Key("d"), false, false}, - // Writer has earlier timestamp. - {makeTS(1, 1), makeTS(1, 0), roachpb.Key("e"), true, true}, - {makeTS(1, 1), makeTS(1, 0), roachpb.Key("f"), false, true}, - // Local keys always interfere. - {makeTS(1, 0), makeTS(1, 1), keys.RangeDescriptorKey(roachpb.RKey("a")), true, true}, - {makeTS(1, 0), makeTS(1, 1), keys.RangeDescriptorKey(roachpb.RKey("b")), false, true}, + // + // Reader goes first, but the reader does not need to hold latches during + // evaluation, so we expect no interference. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 0), + key: roachpb.Key("a"), + readerFirst: true, + interferes: false, + }, + // Writer goes first, but the writer does need to hold latches during + // evaluation, so it should block the reader. 
+ { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 0), + key: roachpb.Key("b"), + readerFirst: false, + interferes: true, + }, + // Reader has earlier timestamp, so it doesn't interfere with the write + // that's in its future. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: roachpb.Key("c"), + readerFirst: true, + interferes: false, + }, + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: roachpb.Key("d"), + readerFirst: false, + interferes: false, + }, + // Writer has an earlier timestamp. We expect no interference for the writer + // as the reader will be evaluating over a pebble snapshot. We'd expect the + // writer to be able to continue without interference but to get bumped by + // the timestamp cache. + { + readerTS: makeTS(1, 1), + writerTS: makeTS(1, 0), + key: roachpb.Key("e"), + readerFirst: true, + interferes: false, + }, + // We expect the reader to block for the writer that's writing in the + // reader's past. + { + readerTS: makeTS(1, 1), + writerTS: makeTS(1, 0), + key: roachpb.Key("f"), + readerFirst: false, + interferes: true, + }, + // Even though local key accesses are NonMVCC, the reader should not block + // the writer because it should not need to hold its latches during + // evaluation. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: keys.RangeDescriptorKey(roachpb.RKey("a")), + readerFirst: true, + interferes: false, + }, + // The writer will block the reader since it holds NonMVCC latches during + // evaluation. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: keys.RangeDescriptorKey(roachpb.RKey("b")), + interferes: true, + }, } for _, test := range testCases { t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { @@ -2524,7 +2586,8 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { _, pErr := tc.Sender().Send(context.Background(), baR) errCh <- pErr }() - <-blockedCh + // Wait for the above read to get blocked on blockCh. 
+ <-waitForRequestBlocked go func() { _, pErr := tc.Sender().Send(context.Background(), baW) errCh <- pErr @@ -2535,7 +2598,9 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { _, pErr := tc.Sender().Send(context.Background(), baW) errCh <- pErr }() - <-blockedCh + // Wait for the above write to get blocked on blockCh while it's holding + // latches. + <-waitForRequestBlocked go func() { _, pErr := tc.Sender().Send(context.Background(), baR) errCh <- pErr diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 309a4850089a..58f22029b9a1 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -51,9 +51,11 @@ func (r *Replica) addToTSCacheChecked( r.store.tsCache.Add(start, end, ts, txnID) } -// updateTimestampCache updates the timestamp cache in order to set a low water -// mark for the timestamp at which mutations to keys overlapping the provided -// request can write, such that they don't re-write history. +// updateTimestampCache updates the timestamp cache in order to set a low +// watermark for the timestamp at which mutations to keys overlapping the +// provided request can write, such that they don't re-write history. It can be +// called before or after a batch is done evaluating. A nil `br` with a nil +// `pErr` indicates that it is being called before the batch is evaluated. 
func (r *Replica) updateTimestampCache( ctx context.Context, st *kvserverpb.LeaseStatus, @@ -75,6 +77,7 @@ func (r *Replica) updateTimestampCache( if ba.Txn != nil { txnID = ba.Txn.ID } + beforeEval := br == nil && pErr == nil for i, union := range ba.Requests { req := union.GetInner() if !roachpb.UpdatesTimestampCache(req) { @@ -220,13 +223,14 @@ func (r *Replica) updateTimestampCache( addToTSCache(start, end, ts, txnID) } case *roachpb.GetRequest: - if resume := resp.(*roachpb.GetResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.GetResponse).ResumeSpan != nil { // The request did not evaluate. Ignore it. continue } addToTSCache(start, end, ts, txnID) case *roachpb.ScanRequest: - if resume := resp.(*roachpb.ScanResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.ScanResponse).ResumeSpan != nil { + resume := resp.(*roachpb.ScanResponse).ResumeSpan if start.Equal(resume.Key) { // The request did not evaluate. Ignore it. continue @@ -238,7 +242,8 @@ func (r *Replica) updateTimestampCache( } addToTSCache(start, end, ts, txnID) case *roachpb.ReverseScanRequest: - if resume := resp.(*roachpb.ReverseScanResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.ReverseScanResponse).ResumeSpan != nil { + resume := resp.(*roachpb.ReverseScanResponse).ResumeSpan if end.Equal(resume.EndKey) { // The request did not evaluate. Ignore it. 
continue diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 8686b2b54696..ad969cd1d0e8 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -677,7 +677,7 @@ func (r *Replica) evaluateWriteBatchWrapper( ) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) { batch, opLogger := r.newBatchedEngine(ba, g) now := timeutil.Now() - br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, g, st, ui, false /* readOnly */) + br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, g, st, ui, readWrite) r.store.metrics.ReplicaWriteBatchEvaluationLatency.RecordValue(timeutil.Since(now).Nanoseconds()) if pErr == nil { if opLogger != nil { diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 20b6860da1e4..b2e18d0e995b 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -48,6 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -1904,7 +1905,11 @@ func TestStoreScanResumeTSCache(t *testing.T) { ctx := context.Background() stopper := stop.NewStopper() defer stopper.Stop(ctx) - store, manualClock := createTestStore(ctx, t, testStoreOpts{createSystemRanges: true}, stopper) + manualClock := timeutil.NewManualTime(timeutil.Unix(0, 123)) + cfg := TestStoreConfig(hlc.NewClock(manualClock, time.Nanosecond)) + cfg.Settings = cluster.MakeTestingClusterSettings() + allowDroppingLatchesBeforeEval.Override(ctx, &cfg.Settings.SV, false) + store := createTestStoreWithConfig(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) // Write three keys at time t0. 
t0 := timeutil.Unix(1, 0) @@ -2115,8 +2120,8 @@ func TestStoreScanIntents(t *testing.T) { expFinish bool // do we expect the scan to finish? expCount int32 // how many times do we expect to scan? }{ - // Consistent which can push will make two loops. - {true, true, true, 2}, + // Consistent which can push will detect conflicts and resolve them. + {true, true, true, 1}, // Consistent but can't push will backoff and retry and not finish. {true, false, false, -1}, // Inconsistent and can push will make one loop, with async resolves. @@ -2204,6 +2209,111 @@ func TestStoreScanIntents(t *testing.T) { } } +// TestStoreScanIntentsRespectsLimit verifies that when reads are allowed to +// resolve their conflicts before eval (i.e. when they are allowed to drop their +// latches early), the scan for conflicting intents respects the max intent +// limits. +// +// The test proceeds as follows: a writer lays down more than +// `MaxIntentsPerWriteIntentErrorDefault` intents, and a reader is expected to +// encounter these intents and raise a `WriteIntentError` with exactly +// `MaxIntentsPerWriteIntentErrorDefault` intents in the error. +func TestStoreScanIntentsRespectsLimit(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + skip.UnderRace( + t, "this test writes a ton of intents and tries to clean them up, too slow under race", + ) + + var interceptWriteIntentErrors atomic.Value + // `commitCh` is used to block the writer from committing until the reader has + // encountered the intents laid down by the writer. + commitCh := make(chan struct{}) + // intentsLaidDownCh is signalled when the writer is done laying down intents. 
+ intentsLaidDownCh := make(chan struct{}) + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &StoreTestingKnobs{ + TestingConcurrencyRetryFilter: func( + ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error, + ) { + if errors.HasType(pErr.GoError(), (*roachpb.WriteIntentError)(nil)) { + // Assert that the error has MaxIntentsPerWriteIntentErrorDefault intents. + if trap := interceptWriteIntentErrors.Load(); trap != nil && trap.(bool) { + require.Equal( + t, storage.MaxIntentsPerWriteIntentErrorDefault, + len(pErr.GetDetail().(*roachpb.WriteIntentError).Intents), + ) + interceptWriteIntentErrors.Store(false) + // Allow the writer to commit. + t.Logf("allowing writer to commit") + close(commitCh) + } + } + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + store, err := tc.Server(0).GetStores().(*Stores).GetStore(tc.Server(0).GetFirstStoreID()) + require.NoError(t, err) + var intentKeys []roachpb.Key + var wg sync.WaitGroup + wg.Add(2) + + // Lay down more than `MaxIntentsPerWriteIntentErrorDefault` intents. + go func() { + defer wg.Done() + txn := newTransaction( + "test", roachpb.Key("test-key"), roachpb.NormalUserPriority, tc.Server(0).Clock(), + ) + for j := 0; j < storage.MaxIntentsPerWriteIntentErrorDefault+10; j++ { + var key roachpb.Key + key = append(key, keys.ScratchRangeMin...) + key = append(key, []byte(fmt.Sprintf("%d", j))...) + intentKeys = append(intentKeys, key) + args := putArgs(key, []byte(fmt.Sprintf("value%07d", j))) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn}, &args) + require.Nil(t, pErr) + } + intentsLaidDownCh <- struct{}{} + <-commitCh // Wait for the test to tell us to commit the txn. 
+ args, header := endTxnArgs(txn, true /* commit */) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), header, &args) + require.Nil(t, pErr) + }() + + select { + case <-intentsLaidDownCh: + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatal("timed out waiting for intents to be laid down") + } + + // Now, expect a conflicting reader to encounter the intents and raise a + // WriteIntentError with exactly `MaxIntentsPerWriteIntentErrorDefault` + // intents. See the TestingConcurrencyRetryFilter above. + var ba kv.Batch + for i := 0; i < storage.MaxIntentsPerWriteIntentErrorDefault+10; i += 10 { + for _, key := range intentKeys[i : i+10] { + args := getArgs(key) + ba.AddRawRequest(&args) + } + } + t.Logf("issuing gets while intercepting WriteIntentErrors") + interceptWriteIntentErrors.Store(true) + go func() { + defer wg.Done() + err := store.DB().Run(ctx, &ba) + require.NoError(t, err) + }() + + wg.Wait() +} + // TestStoreScanInconsistentResolvesIntents lays down 10 intents, // commits the txn without resolving intents, then does repeated // inconsistent reads until the data shows up, showing that the diff --git a/pkg/sql/explain_test.go b/pkg/sql/explain_test.go index 9e5f5ec2e55c..a5e90b12a727 100644 --- a/pkg/sql/explain_test.go +++ b/pkg/sql/explain_test.go @@ -337,11 +337,12 @@ func TestExplainMVCCSteps(t *testing.T) { srv, godb, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true}) defer srv.Stopper().Stop(ctx) r := sqlutils.MakeSQLRunner(godb) + r.Exec(t, "CREATE TABLE ab (a PRIMARY KEY, b) AS SELECT g, g FROM generate_series(1,1000) g(g)") r.Exec(t, "CREATE TABLE bc (b PRIMARY KEY, c) AS SELECT g, g FROM generate_series(1,1000) g(g)") scanQuery := "SELECT count(*) FROM ab" - expectedSteps, expectedSeeks := 1000, 2 + expectedSteps, expectedSeeks := 1000, 1 foundSteps, foundSeeks := getMVCCStats(t, r, scanQuery) assert.Equal(t, expectedSteps, foundSteps) @@ -352,7 +353,7 @@ func TestExplainMVCCSteps(t *testing.T) { // 
Update all rows. r.Exec(t, "UPDATE ab SET b=b+1 WHERE true") - expectedSteps, expectedSeeks = 2000, 2 + expectedSteps, expectedSeeks = 2000, 1 foundSteps, foundSeeks = getMVCCStats(t, r, scanQuery) assert.Equal(t, expectedSteps, foundSteps) diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 82eb2787f863..e0644148100d 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -1627,8 +1627,10 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error { // UnsafeRawMVCCKey must match Key. if r, err := DecodeMVCCKey(iter.UnsafeRawMVCCKey()); err != nil { - return errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode UnsafeRawMVCCKey at %s", - key) + return errors.NewAssertionErrorWithWrappedErrf( + err, "failed to decode UnsafeRawMVCCKey at %s", + key, + ) } else if !r.Equal(key) { return errors.AssertionFailedf("UnsafeRawMVCCKey %s does not match Key %s", r, key) } @@ -1650,7 +1652,8 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error { return errors.AssertionFailedf("UnsafeRawKey lock table key %s does not match Key %s", k, key) } else if !key.Timestamp.IsEmpty() { return errors.AssertionFailedf( - "UnsafeRawKey lock table key %s for Key %s with non-zero timestamp", k, key) + "UnsafeRawKey lock table key %s for Key %s with non-zero timestamp", k, key, + ) } } else { return errors.AssertionFailedf("unknown type for engine key %s", engineKey) @@ -1673,3 +1676,86 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error { return nil } + +// ScanConflictingIntents scans intents using only the separated intents lock +// table. The result set is added to the given `intents` slice. It ignores +// intents that do not conflict with `txn`. If it encounters intents that were +// written by `txn` that are either at a higher sequence number than txn's or at +// a lower sequence number but at a higher timestamp, `needIntentHistory` is set +// to true. 
This flag is used to signal to the caller that a subsequent scan +// over the MVCC key space (for the batch in question) will need to be performed +// using an intent interleaving iterator in order to be able to read the correct +// provisional value. +func ScanConflictingIntents( + ctx context.Context, + reader Reader, + txn *roachpb.Transaction, + ts hlc.Timestamp, + start, end roachpb.Key, + intents *[]roachpb.Intent, + maxIntents int64, +) (needIntentHistory bool, err error) { + if err := ctx.Err(); err != nil { + return false, err + } + + upperBoundUnset := len(end) == 0 // NB: Get requests do not set the end key. + if !upperBoundUnset && bytes.Compare(start, end) >= 0 { + return true, errors.AssertionFailedf("start key must be less than end key") + } + ltStart, _ := keys.LockTableSingleKey(start, nil) + opts := IterOptions{LowerBound: ltStart} + if upperBoundUnset { + opts.Prefix = true + } else { + ltEnd, _ := keys.LockTableSingleKey(end, nil) + opts.UpperBound = ltEnd + } + iter := reader.NewEngineIterator(opts) + defer iter.Close() + + var meta enginepb.MVCCMetadata + var ok bool + for ok, err = iter.SeekEngineKeyGE(EngineKey{Key: ltStart}); ok; ok, err = iter.NextEngineKey() { + if maxIntents != 0 && int64(len(*intents)) >= maxIntents { + break + } + if err = protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil { + return false, err + } + if meta.Txn == nil { + return false, errors.Errorf("intent without transaction") + } + ownIntent := txn != nil && txn.ID == meta.Txn.ID + if ownIntent { + // If we ran into one of our own intents, check whether the intent has a + // higher (or equal) sequence number or a higher (or equal) timestamp. If + // either of these conditions is true, a corresponding scan over the MVCC + // key space will need access to the key's intent history in order to read + // the correct provisional value. So we set `needIntentHistory` to true. 
+ if txn.Sequence <= meta.Txn.Sequence || ts.LessEq(meta.Timestamp.ToTimestamp()) { + needIntentHistory = true + } + continue + } + if conflictingIntent := meta.Timestamp.ToTimestamp().LessEq(ts); !conflictingIntent { + continue + } + key, err := iter.EngineKey() + if err != nil { + return false, err + } + lockedKey, err := keys.DecodeLockTableSingleKey(key.Key) + if err != nil { + return false, err + } + *intents = append(*intents, roachpb.MakeIntent(meta.Txn, lockedKey)) + } + if err != nil { + return false, err + } + if err := ctx.Err(); err != nil { + return false, err + } + return needIntentHistory, nil /* err */ +} diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 349988ce44ea..1f6a7cd7ae85 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -57,12 +57,12 @@ const ( // minimum total for a single store node must be under 2048 for Windows // compatibility. MinimumMaxOpenFiles = 1700 - // Default value for maximum number of intents reported by ExportToSST - // and Scan operations in WriteIntentError is set to half of the maximum - // lock table size. - // This value is subject to tuning in real environment as we have more - // data available. - maxIntentsPerWriteIntentErrorDefault = 5000 + // MaxIntentsPerWriteIntentErrorDefault is the default value for the maximum + // number of intents reported by ExportToSST and Scan operations in a + // WriteIntentError. It is set to half of the maximum lock table size. + // This value is subject to tuning in real environments as we have more data + // available. 
+ MaxIntentsPerWriteIntentErrorDefault = 5000 ) var minWALSyncInterval = settings.RegisterDurationSetting( @@ -103,7 +103,8 @@ var MaxIntentsPerWriteIntentError = settings.RegisterIntSetting( settings.TenantWritable, "storage.mvcc.max_intents_per_error", "maximum number of intents returned in error during export of scan requests", - maxIntentsPerWriteIntentErrorDefault) + MaxIntentsPerWriteIntentErrorDefault, +) var rocksdbConcurrency = envutil.EnvOrDefaultInt( "COCKROACH_ROCKSDB_CONCURRENCY", func() int { @@ -889,6 +890,14 @@ type MVCCGetOptions struct { // LockTable is used to determine whether keys are locked in the in-memory // lock table when scanning with the SkipLocked option. LockTable LockTableView + // DontInterleaveIntents, when set, makes it such that intent metadata is not + // interleaved with the results of the scan. Setting this option means that + // the underlying pebble iterator will only scan over the MVCC keyspace and + // will not use an `intentInterleavingIter`. It is only appropriate to use + // this when the caller does not need to know whether a given key is an intent + // or not. It is usually set by read-only requests that have resolved their + // conflicts before they begin their MVCC scan. + DontInterleaveIntents bool } func (opts *MVCCGetOptions) validate() error { @@ -901,6 +910,9 @@ func (opts *MVCCGetOptions) validate() error { if opts.Inconsistent && opts.FailOnMoreRecent { return errors.Errorf("cannot allow inconsistent reads with fail on more recent option") } + if opts.DontInterleaveIntents && opts.SkipLocked { + return errors.Errorf("cannot disable interleaved intents with skip locked option") + } return nil } @@ -909,12 +921,16 @@ func (opts *MVCCGetOptions) errOnIntents() bool { } // newMVCCIterator sets up a suitable iterator for high-level MVCC operations -// operating at the given timestamp. If timestamp is empty, the iterator is -// considered to be used for inline values, disabling intents and range keys. 
-// If rangeKeyMasking is true, IterOptions.RangeKeyMaskingBelow is set to the -// given timestamp. +// operating at the given timestamp. If timestamp is empty, the iterator is +// considered to be used for inline values, disabling intents and range keys. +// If `noInterleavedIntents` is set, the iterator does not interleave intents. +// If rangeKeyMasking is true, opts.RangeKeyMaskingBelow is set to timestamp. func newMVCCIterator( - reader Reader, timestamp hlc.Timestamp, rangeKeyMasking bool, opts IterOptions, + reader Reader, + timestamp hlc.Timestamp, + rangeKeyMasking bool, + noInterleavedIntents bool, + opts IterOptions, ) MVCCIterator { // If reading inline then just return a plain MVCCIterator without intents. // However, we allow the caller to enable range keys, since they may be needed @@ -927,7 +943,11 @@ func newMVCCIterator( opts.RangeKeyMaskingBelow.IsEmpty() { opts.RangeKeyMaskingBelow = timestamp } - return reader.NewMVCCIterator(MVCCKeyAndIntentsIterKind, opts) + iterKind := MVCCKeyAndIntentsIterKind + if noInterleavedIntents { + iterKind = MVCCKeyIterKind + } + return reader.NewMVCCIterator(iterKind, opts) } // MVCCGet returns the most recent value for the specified key whose timestamp @@ -971,10 +991,12 @@ func newMVCCIterator( func MVCCGet( ctx context.Context, reader Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { - iter := newMVCCIterator(reader, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + reader, timestamp, false /* rangeKeyMasking */, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() value, intent, err := mvccGet(ctx, iter, key, timestamp, opts) return value.ToPointer(), intent, err @@ -1311,10 +1333,12 @@ func MVCCPut( var iter MVCCIterator blind := ms == nil && timestamp.IsEmpty() if !blind { - iter = newMVCCIterator(rw, 
timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter = newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() } return mvccPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, value, txn, nil) @@ -1361,10 +1385,12 @@ func MVCCDelete( localTimestamp hlc.ClockTimestamp, txn *roachpb.Transaction, ) (foundKey bool, err error) { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() buf := newPutBuffer() @@ -2094,10 +2120,12 @@ func MVCCIncrement( txn *roachpb.Transaction, inc int64, ) (int64, error) { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() var int64Val int64 @@ -2171,10 +2199,12 @@ func MVCCConditionalPut( allowIfDoesNotExist CPutMissingBehavior, txn *roachpb.Transaction, ) error { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() return mvccConditionalPutUsingIter( @@ -2256,10 +2286,12 @@ func MVCCInitPut( failOnTombstones bool, txn *roachpb.Transaction, ) error { - iter 
:= newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() return mvccInitPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, value, failOnTombstones, txn) } @@ -2855,10 +2887,12 @@ func MVCCDeleteRange( buf := newPutBuffer() defer buf.release() - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() var keys []roachpb.Key @@ -3018,10 +3052,12 @@ func MVCCPredicateDeleteRange( // Create some reusable machinery for flushing a run with point tombstones // that is typically used in a single MVCCPut call. - pointTombstoneIter := newMVCCIterator(rw, endTime, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + pointTombstoneIter := newMVCCIterator( + rw, endTime, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer pointTombstoneIter.Close() pointTombstoneBuf := newPutBuffer() defer pointTombstoneBuf.release() @@ -3678,6 +3714,14 @@ type MVCCScanOptions struct { // LockTable is used to determine whether keys are locked in the in-memory // lock table when scanning with the SkipLocked option. LockTable LockTableView + // DontInterleaveIntents, when set, makes it such that intent metadata is not + // interleaved with the results of the scan. 
Setting this option means that + // the underlying pebble iterator will only scan over the MVCC keyspace and + // will not use an `intentInterleavingIter`. It is only appropriate to use + // this when the caller does not need to know whether a given key is an intent + // or not. It is usually set by read-only requests that have resolved their + // conflicts before they begin their MVCC scan. + DontInterleaveIntents bool } func (opts *MVCCScanOptions) validate() error { @@ -3690,6 +3734,9 @@ func (opts *MVCCScanOptions) validate() error { if opts.Inconsistent && opts.FailOnMoreRecent { return errors.Errorf("cannot allow inconsistent reads with fail on more recent option") } + if opts.DontInterleaveIntents && opts.SkipLocked { + return errors.Errorf("cannot disable interleaved intents with skip locked option") + } return nil } @@ -3776,11 +3823,13 @@ func MVCCScan( timestamp hlc.Timestamp, opts MVCCScanOptions, ) (MVCCScanResult, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() return mvccScanToKvs(ctx, iter, key, endKey, timestamp, opts) } @@ -3793,11 +3842,13 @@ func MVCCScanToBytes( timestamp hlc.Timestamp, opts MVCCScanOptions, ) (MVCCScanResult, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() return mvccScanToBytes(ctx, iter, key, endKey, timestamp, opts) } @@ -3842,11 +3893,13 @@ func MVCCIterate( 
opts MVCCScanOptions, f func(roachpb.KeyValue) error, ) ([]roachpb.Intent, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() var intents []roachpb.Intent From 52a0e006c4fd31bc0c71d8d876b9e6528b39c8ae Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Mon, 7 Nov 2022 15:51:19 -0500 Subject: [PATCH 2/2] kvserver: add metrics for when read-only batches drop latches Release note: None --- pkg/kv/kvserver/metrics.go | 19 ++++++++++++++++++- pkg/kv/kvserver/replica_read.go | 2 ++ pkg/ts/catalog/chart_catalog.go | 12 ++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index dd1498d5c424..368fb07989a9 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1651,7 +1651,6 @@ Note that the measurement does not include the duration for replicating the eval Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } - metaPopularKeyCount = metric.Metadata{ Name: "kv.loadsplitter.popularkey", Help: "Load-based splitter could not find a split key and the most popular sampled split key occurs in >= 25% of the samples.", @@ -1679,6 +1678,18 @@ Note that the measurement does not include the duration for replicating the eval Measurement: "Fsync Latency", Unit: metric.Unit_NANOSECONDS, } + metaReplicaReadBatchDroppedLatchesBeforeEval = metric.Metadata{ + Name: "kv.replica_read_batch_evaluate.dropped_latches_before_eval", + Help: `Number of times read-only batches dropped latches before evaluation.`, + Measurement: "Batches", + Unit: metric.Unit_COUNT, + } + metaReplicaReadBatchWithoutInterleavingIter = metric.Metadata{ + Name: 
"kv.replica_read_batch_evaluate.without_interleaving_iter", + Help: `Number of read-only batches evaluated without an intent interleaving iter.`, + Measurement: "Batches", + Unit: metric.Unit_COUNT, + } ) // StoreMetrics is the set of metrics for a given store. @@ -1975,6 +1986,9 @@ type StoreMetrics struct { ReplicaReadBatchEvaluationLatency *metric.Histogram ReplicaWriteBatchEvaluationLatency *metric.Histogram + ReplicaReadBatchDroppedLatchesBeforeEval *metric.Counter + ReplicaReadBatchWithoutInterleavingIter *metric.Counter + FlushUtilization *metric.GaugeFloat64 FsyncLatency *metric.ManualWindowHistogram } @@ -2517,6 +2531,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { ), FlushUtilization: metric.NewGaugeFloat64(metaStorageFlushUtilization), FsyncLatency: metric.NewManualWindowHistogram(metaStorageFsyncLatency, pebble.FsyncLatencyBuckets), + + ReplicaReadBatchDroppedLatchesBeforeEval: metric.NewCounter(metaReplicaReadBatchDroppedLatchesBeforeEval), + ReplicaReadBatchWithoutInterleavingIter: metric.NewCounter(metaReplicaReadBatchWithoutInterleavingIter), } { diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 2e9b3012445a..de7e521ba75f 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -109,7 +109,9 @@ func (r *Replica) executeReadOnlyBatch( // the request is not being optimistically evaluated (optimistic evaluation // does not wait for latches or check locks). 
log.VEventf(ctx, 3, "lock table scan complete without conflicts; dropping latches early") + r.store.metrics.ReplicaReadBatchDroppedLatchesBeforeEval.Inc(1) if !stillNeedsInterleavedIntents { + r.store.metrics.ReplicaReadBatchWithoutInterleavingIter.Inc(1) evalPath = readOnlyWithoutInterleavedIntents } r.updateTimestampCacheAndDropLatches(ctx, g, ba, nil /* br */, nil /* pErr */, st) diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 0ca5853957e5..5368c0788b26 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3750,4 +3750,16 @@ var charts = []sectionDescription{ }, }, }, + { + Organization: [][]string{{ReplicationLayer, "Batches"}}, + Charts: []chartDescription{ + { + Title: "Total number of attempts to evaluate read-only batches", + Metrics: []string{ + "kv.replica_read_batch_evaluate.dropped_latches_before_eval", + "kv.replica_read_batch_evaluate.without_interleaving_iter", + }, + }, + }, + }, }