diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 01bfa8bdd79a..352c7b1807bd 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -1440,7 +1440,7 @@ func TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS(t *testing.T) require.NoError(t, err) r, store, err := s.GetStores().(*kvserver.Stores).GetReplicaForRangeID(ctx, rd.RangeID) require.NoError(t, err) - closedTS := r.GetClosedTimestamp(ctx) + closedTS := r.GetCurrentClosedTimestamp(ctx) require.NotZero(t, closedTS) // Add an SST writing below the closed timestamp. It should get pushed above it. diff --git a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go index ecbf7b80754d..eea2c6e0e8a8 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go @@ -56,7 +56,7 @@ func QueryResolvedTimestamp( // because QueryResolvedTimestamp requests are often run without acquiring // latches (see roachpb.INCONSISTENT) and often also on follower replicas, // so latches won't help them to synchronize with writes. - closedTS := cArgs.EvalCtx.GetClosedTimestamp(ctx) + closedTS := cArgs.EvalCtx.GetClosedTimestampOlderThanStorageSnapshot() // Compute the minimum timestamp of any intent in the request's key span, // which may span the entire range, but does not need to. diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index cfd51563854b..5818c29fa318 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -152,7 +152,7 @@ func Subsume( // think about. priorReadSum.Merge(rspb.FromTimestamp(reply.FreezeStart.ToTimestamp())) reply.ReadSummary = &priorReadSum - reply.ClosedTimestamp = cArgs.EvalCtx.GetClosedTimestamp(ctx) + reply.ClosedTimestamp = cArgs.EvalCtx.GetCurrentClosedTimestamp(ctx) return result.Result{}, nil } diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index d8536d9781a0..ff14661a6dc6 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -49,6 +49,8 @@ type Limiters struct { // underlying state. type EvalContext interface { fmt.Stringer + ImmutableEvalContext + ClusterSettings() *cluster.Settings EvalKnobs() kvserverbase.BatchEvalTestingKnobs @@ -111,12 +113,6 @@ type EvalContext interface { // requests on the range. GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary - // GetClosedTimestamp returns the current closed timestamp on the range. - // It is expected that a caller will have performed some action (either - // calling RevokeLease or WatchForMerge) to freeze further progression of - // the closed timestamp before calling this method. - GetClosedTimestamp(ctx context.Context) hlc.Timestamp - GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, error) @@ -141,6 +137,24 @@ type EvalContext interface { // GetEngineCapacity returns the store's underlying engine capacity; other // StoreCapacity fields not related to engine capacity are not populated. GetEngineCapacity() (roachpb.StoreCapacity, error) + + // GetCurrentClosedTimestamp returns the current closed timestamp on the + // range. 
It is expected that a caller will have performed some action (either + // calling RevokeLease or WatchForMerge) to freeze further progression of the + // closed timestamp before calling this method. + GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp + + // Release returns the memory allocated by the EvalContext implementation to a + // sync.Pool. + Release() +} + +// ImmutableEvalContext is like EvalContext, but it encapsulates state that +// needs to be immutable during the course of command evaluation. +type ImmutableEvalContext interface { + // GetClosedTimestampOlderThanStorageSnapshot returns the closed timestamp + // that was active before the state of the storage engine was pinned. + GetClosedTimestampOlderThanStorageSnapshot() hlc.Timestamp } // MockEvalCtx is a dummy implementation of EvalContext for testing purposes. @@ -255,7 +269,10 @@ func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo { func (m *mockEvalCtxImpl) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary { return m.CurrentReadSummary } -func (m *mockEvalCtxImpl) GetClosedTimestamp(ctx context.Context) hlc.Timestamp { +func (m *mockEvalCtxImpl) GetClosedTimestampOlderThanStorageSnapshot() hlc.Timestamp { + return m.ClosedTimestamp +} +func (m *mockEvalCtxImpl) GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp { return m.ClosedTimestamp } func (m *mockEvalCtxImpl) GetExternalStorage( @@ -287,3 +304,4 @@ func (m *mockEvalCtxImpl) GetMaxBytes() int64 { func (m *mockEvalCtxImpl) GetEngineCapacity() (roachpb.StoreCapacity, error) { return roachpb.StoreCapacity{Available: 1, Capacity: 1}, nil } +func (m *mockEvalCtxImpl) Release() {} diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index c2e662ceaeca..d4a6b0d0e10a 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -2149,7 +2149,7 @@ func TestStoreRangeMergeLHSLeaseTransfersAfterFreezeTime(t *testing.T) { } return nil }) - lhsClosedTS := lhsLeaseholder.GetClosedTimestamp(ctx) + lhsClosedTS := lhsLeaseholder.GetCurrentClosedTimestamp(ctx) require.NotEmpty(t, lhsClosedTS) // Finally, allow the merge to complete. It should complete successfully. diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 795d6d79be80..8181bd7044dd 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -998,7 +998,7 @@ func (cbt *circuitBreakerTest) FollowerRead(idx int) error { repl := cbt.repls[idx] get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) ctx := context.Background() - ts := repl.GetClosedTimestamp(ctx) + ts := repl.GetCurrentClosedTimestamp(ctx) return cbt.SendCtxTS(ctx, idx, get, ts) } diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 862d12b24879..ac846b12e8c3 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -772,7 +772,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true; require.NoError(t, err) r, err := store.GetReplica(rightDesc.RangeID) require.NoError(t, err) - maxClosed := r.GetClosedTimestamp(ctx) + maxClosed := r.GetCurrentClosedTimestamp(ctx) // Note that maxClosed would not necessarily be below the freeze start if // this was a LEAD_FOR_GLOBAL_READS range. 
assert.True(t, maxClosed.LessEq(freezeStartTimestamp), @@ -807,7 +807,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true; mergedLeaseholder, err := leftLeaseholderStore.GetReplica(leftDesc.RangeID) require.NoError(t, err) writeTime := rhsLeaseStart.Prev() - require.True(t, mergedLeaseholder.GetClosedTimestamp(ctx).Less(writeTime)) + require.True(t, mergedLeaseholder.GetCurrentClosedTimestamp(ctx).Less(writeTime)) var baWrite roachpb.BatchRequest baWrite.Header.RangeID = leftDesc.RangeID baWrite.Header.Timestamp = writeTime diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index d35bd5c91e93..70514440d40a 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -703,8 +703,6 @@ type Replica struct { } } -var _ batcheval.EvalContext = &Replica{} - // String returns the string representation of the replica using an // inconsistent copy of the range descriptor. Therefore, String does not // require a lock and its output may not be atomic with other ongoing work in @@ -1232,7 +1230,7 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo { // NB: this acquires an RLock(). Reentrant RLocks are deadlock prone, so do // this first before RLocking below. Performance of this extra lock // acquisition is not a concern. - ri.ActiveClosedTimestamp = r.GetClosedTimestamp(ctx) + ri.ActiveClosedTimestamp = r.GetCurrentClosedTimestamp(ctx) // NB: numRangefeedRegistrations doesn't require Replica.mu to be locked. // However, it does require coordination between multiple goroutines, so diff --git a/pkg/kv/kvserver/replica_closedts_internal_test.go b/pkg/kv/kvserver/replica_closedts_internal_test.go index 97df78100077..627bd4963d2e 100644 --- a/pkg/kv/kvserver/replica_closedts_internal_test.go +++ b/pkg/kv/kvserver/replica_closedts_internal_test.go @@ -509,7 +509,8 @@ func (r *mockReceiver) HTML() string { return "" } -// Test that r.GetClosedTimestamp() mixes its sources of information correctly. +// Test that r.GetCurrentClosedTimestamp() mixes its sources of information +// correctly. func TestReplicaClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -567,7 +568,7 @@ func TestReplicaClosedTimestamp(t *testing.T) { tc.repl.mu.state.RaftClosedTimestamp = test.raftClosed tc.repl.mu.state.LeaseAppliedIndex = uint64(test.applied) tc.repl.mu.Unlock() - require.Equal(t, test.expClosed, tc.repl.GetClosedTimestamp(ctx)) + require.Equal(t, test.expClosed, tc.repl.GetCurrentClosedTimestamp(ctx)) }) } } diff --git a/pkg/kv/kvserver/replica_eval_context.go b/pkg/kv/kvserver/replica_eval_context.go index 47744e471d5f..28a59ed0d6dd 100644 --- a/pkg/kv/kvserver/replica_eval_context.go +++ b/pkg/kv/kvserver/replica_eval_context.go @@ -12,10 +12,12 @@ package kvserver import ( "context" + "sync" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -28,19 +30,78 @@ import ( // Do not introduce new uses of this. var todoSpanSet = &spanset.SpanSet{} +var evalContextPool = sync.Pool{ + New: func() interface{} { + return &evalContextImpl{} + }, +} + +// evalContextImpl implements the batcheval.EvalContext interface. 
+type evalContextImpl struct { + *Replica + // NB: We cannot use the emptiness of `closedTS` to determine whether the + // closed timestamp was elided during the creation of this eval context, so we + // track it separately. + closedTSElided bool + closedTS hlc.Timestamp +} + +func newEvalContextImpl( + ctx context.Context, r *Replica, requiresClosedTSOlderThanStorageSnap bool, +) (ec *evalContextImpl) { + var closedTS hlc.Timestamp + if requiresClosedTSOlderThanStorageSnap { + // We elide this call to get the replica's current closed timestamp unless + // the request requires it, in order to avoid redundant mutex contention. + closedTS = r.GetCurrentClosedTimestamp(ctx) + } + + ec = evalContextPool.Get().(*evalContextImpl) + *ec = evalContextImpl{ + Replica: r, + closedTSElided: !requiresClosedTSOlderThanStorageSnap, + closedTS: closedTS, + } + return ec +} + +// GetClosedTimestampOlderThanStorageSnapshot implements the EvalContext +// interface. +func (ec *evalContextImpl) GetClosedTimestampOlderThanStorageSnapshot() hlc.Timestamp { + if ec.closedTSElided { + panic("closed timestamp was elided during eval context creation; does the" + + " request set the requiresClosedTimestamp flag?") + } + return ec.closedTS +} + +// Release implements the EvalContext interface. +func (ec *evalContextImpl) Release() { + *ec = evalContextImpl{} + evalContextPool.Put(ec) +} + +var _ batcheval.EvalContext = &evalContextImpl{} + // NewReplicaEvalContext returns a batcheval.EvalContext to use for command // evaluation. The supplied SpanSet will be ignored except for race builds, in // which case state access is asserted against it. A SpanSet must always be // passed. -func NewReplicaEvalContext(r *Replica, ss *spanset.SpanSet) batcheval.EvalContext { +// The caller must call rec.Release() once done with the evaluation context in +// order to return its memory back to a sync.Pool. +func NewReplicaEvalContext( + ctx context.Context, r *Replica, ss *spanset.SpanSet, requiresClosedTSOlderThanStorageSnap bool, +) (rec batcheval.EvalContext) { if ss == nil { log.Fatalf(r.AnnotateCtx(context.Background()), "can't create a ReplicaEvalContext with assertions but no SpanSet") } + + rec = newEvalContextImpl(ctx, r, requiresClosedTSOlderThanStorageSnap) if util.RaceEnabled { return &SpanSetReplicaEvalContext{ - i: r, + i: rec, ss: *ss, } } - return r + return rec } diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index 21f767adafbd..80a7fb42af26 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -222,9 +222,15 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary(ctx context.Context) return rec.i.GetCurrentReadSummary(ctx) } -// GetClosedTimestamp is part of the EvalContext interface. -func (rec *SpanSetReplicaEvalContext) GetClosedTimestamp(ctx context.Context) hlc.Timestamp { - return rec.i.GetClosedTimestamp(ctx) +// GetCurrentClosedTimestamp is part of the EvalContext interface. +func (rec *SpanSetReplicaEvalContext) GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp { + return rec.i.GetCurrentClosedTimestamp(ctx) +} + +// GetClosedTimestampOlderThanStorageSnapshot is part of the EvalContext +// interface. 
+func (rec *SpanSetReplicaEvalContext) GetClosedTimestampOlderThanStorageSnapshot() hlc.Timestamp { + return rec.i.GetClosedTimestampOlderThanStorageSnapshot() } // GetExternalStorage returns an ExternalStorage object, based on @@ -267,3 +273,6 @@ func (rec *SpanSetReplicaEvalContext) GetMaxBytes() int64 { func (rec *SpanSetReplicaEvalContext) GetEngineCapacity() (roachpb.StoreCapacity, error) { return rec.i.GetEngineCapacity() } + +// Release implements the batcheval.EvalContext interface. +func (rec *SpanSetReplicaEvalContext) Release() { rec.i.Release() } diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index a189a6798280..3edd73aa51b5 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -81,7 +81,7 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B } requiredFrontier := ba.RequiredFrontier() - maxClosed := r.getClosedTimestampRLocked(ctx, requiredFrontier /* sufficient */) + maxClosed := r.getCurrentClosedTimestampLocked(ctx, requiredFrontier /* sufficient */) canServeFollowerRead := requiredFrontier.LessEq(maxClosed) tsDiff := requiredFrontier.GoTime().Sub(maxClosed.GoTime()) if !canServeFollowerRead { @@ -106,13 +106,13 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B return true } -// getClosedTimestampRLocked is like maxClosed, except that it requires r.mu to be -// rlocked. It also optionally takes a hint: if sufficient is not -// empty, getClosedTimestampRLocked might return a timestamp that's lower than the -// maximum closed timestamp that we know about, as long as the returned -// timestamp is still >= sufficient. This is a performance optimization because -// we can avoid consulting the ClosedTimestampReceiver. -func (r *Replica) getClosedTimestampRLocked( +// getCurrentClosedTimestampLocked is like GetCurrentClosedTimestamp, except +// that it requires r.mu to be RLocked. It also optionally takes a hint: if +// sufficient is not empty, getCurrentClosedTimestampLocked might return a timestamp +// that's lower than the maximum closed timestamp that we know about, as long as +// the returned timestamp is still >= sufficient. This is a performance +// optimization because we can avoid consulting the ClosedTimestampReceiver. +func (r *Replica) getCurrentClosedTimestampLocked( ctx context.Context, sufficient hlc.Timestamp, ) hlc.Timestamp { appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex) @@ -126,11 +126,10 @@ func (r *Replica) getCurrentClosedTimestampLocked( return maxClosed } -// GetClosedTimestamp returns the maximum closed timestamp for this range. -// -// GetClosedTimestamp is part of the EvalContext interface. -func (r *Replica) GetClosedTimestamp(ctx context.Context) hlc.Timestamp { +// GetCurrentClosedTimestamp returns the current maximum closed timestamp for +// this range.
+func (r *Replica) GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp { r.mu.RLock() defer r.mu.RUnlock() - return r.getClosedTimestampRLocked(ctx, hlc.Timestamp{} /* sufficient */) + return r.getCurrentClosedTimestampLocked(ctx, hlc.Timestamp{} /* sufficient */) } diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go index f22a76ced01f..2ce320d8effc 100644 --- a/pkg/kv/kvserver/replica_gossip.go +++ b/pkg/kv/kvserver/replica_gossip.go @@ -174,7 +174,10 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked( ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(span)}) // Call evaluateBatch instead of Send to avoid reacquiring latches. - rec := NewReplicaEvalContext(r, todoSpanSet) + rec := NewReplicaEvalContext( + ctx, r, todoSpanSet, false, /* requiresClosedTSOlderThanStorageSnap */ + ) + defer rec.Release() rw := r.Engine().NewReadOnly(storage.StandardDurability) defer rw.Close() @@ -217,7 +220,10 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(keys.SystemConfigSpan)}) // Call evaluateBatch instead of Send to avoid reacquiring latches. - rec := NewReplicaEvalContext(r, todoSpanSet) + rec := NewReplicaEvalContext( + ctx, r, todoSpanSet, false, /* requiresClosedTSOlderThanStorageSnap */ + ) + defer rec.Release() rw := r.Engine().NewReadOnly(storage.StandardDurability) defer rw.Close() diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 762fc3eec052..495185544a4a 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -435,7 +435,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Check for an initial closed timestamp update immediately to help // initialize the rangefeed's resolved timestamp as soon as possible. - r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetClosedTimestamp(ctx)) + r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetCurrentClosedTimestamp(ctx)) return p } diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 9e59fab0b8c9..5879925c5cef 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -49,7 +49,8 @@ func (r *Replica) executeReadOnlyBatch( ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset()) // Evaluate read-only batch command. - rec := NewReplicaEvalContext(r, g.LatchSpans()) + rec := NewReplicaEvalContext(ctx, r, g.LatchSpans(), ba.RequiresClosedTSOlderThanStorageSnapshot()) + defer rec.Release() // TODO(irfansharif): It's unfortunate that in this read-only code path, // we're stuck with a ReadWriter because of the way evaluateBatch is @@ -60,6 +61,12 @@ func (r *Replica) executeReadOnlyBatch( // may start relying on this, so we assert here. panic("expected consistent iterators") } + // Pin engine state eagerly so that all iterators created over this Reader are + // based off the state of the engine as of this point and are mutually + // consistent. 
+ if err := rw.PinEngineStateForIterators(); err != nil { + return nil, g, roachpb.NewError(err) + } if util.RaceEnabled { rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index d2daa30d2744..d26af6432d2c 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -71,7 +71,7 @@ import ( "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - raft "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/raft/v3/tracker" "golang.org/x/net/trace" @@ -945,7 +945,9 @@ func TestReplicaLease(t *testing.T) { } { if _, err := batcheval.RequestLease(ctx, tc.store.Engine(), batcheval.CommandArgs{ - EvalCtx: NewReplicaEvalContext(tc.repl, allSpans()), + EvalCtx: NewReplicaEvalContext( + ctx, tc.repl, allSpans(), false, /* requiresClosedTSOlderThanStorageSnap */ + ), Args: &roachpb.RequestLeaseRequest{ Lease: lease, }, @@ -4984,7 +4986,9 @@ func TestEndTxnDirectGC(t *testing.T) { var gr roachpb.GetResponse if _, err := batcheval.Get( ctx, tc.engine, batcheval.CommandArgs{ - EvalCtx: NewReplicaEvalContext(tc.repl, allSpans()), + EvalCtx: NewReplicaEvalContext( + ctx, tc.repl, allSpans(), false, /* requiresClosedTSOlderThanStorageSnap */ + ), Args: &roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{ Key: keys.TransactionKey(txn.Key, txn.ID), }}, @@ -5367,7 +5371,8 @@ func TestAbortSpanError(t *testing.T) { t.Fatal(err) } - rec := &SpanSetReplicaEvalContext{tc.repl, *allSpans()} + ec := newEvalContextImpl(ctx, tc.repl, false /* requireClosedTS */) + rec := &SpanSetReplicaEvalContext{ec, *allSpans()} pErr := checkIfTxnAborted(ctx, rec, tc.engine, txn) if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok { expected := txn.Clone() @@ -5777,7 +5782,12 @@ func TestResolveIntentPushTxnReplyTxn(t *testing.T) { // return args.PusherTxn. h = roachpb.Header{Timestamp: tc.Clock().Now()} var reply roachpb.PushTxnResponse - if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: tc.repl, Stats: &ms, Header: h, Args: &pa}, &reply); err != nil { + ec := newEvalContextImpl( + ctx, + tc.repl, + false, /* requireClosedTS */ + ) + if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: ec, Stats: &ms, Header: h, Args: &pa}, &reply); err != nil { t.Fatal(err) } else if reply.Txn != nil { t.Fatalf("expected nil response txn, but got %s", reply.Txn) @@ -8462,10 +8472,11 @@ func TestGCWithoutThreshold(t *testing.T) { rw := spanset.NewBatch(batch, &spans) var resp roachpb.GCResponse - if _, err := batcheval.GC(ctx, rw, batcheval.CommandArgs{ - Args: &gc, - EvalCtx: NewReplicaEvalContext(tc.repl, &spans), + Args: &gc, + EvalCtx: NewReplicaEvalContext( + ctx, tc.repl, &spans, false, /* requiresClosedTSOlderThanStorageSnap */ + ), }, &resp); err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 29ee8a8665ab..e71a8b32d8e8 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -594,7 +594,7 @@ func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary { // Forward the read summary by the range's closed timestamp, because any // replica could have served reads below this time. We also return the // closed timestamp separately, in case callers want it split out. 
- closedTS := r.GetClosedTimestamp(ctx) + closedTS := r.GetCurrentClosedTimestamp(ctx) sum.Merge(rspb.FromTimestamp(closedTS)) return sum } diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 8b9ffc849c38..c12fe8375f53 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -424,7 +424,8 @@ func (r *Replica) evaluateWriteBatch( } ms := new(enginepb.MVCCStats) - rec := NewReplicaEvalContext(r, g.LatchSpans()) + rec := NewReplicaEvalContext(ctx, r, g.LatchSpans(), ba.RequiresClosedTSOlderThanStorageSnapshot()) + defer rec.Release() batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes( ctx, idKey, rec, ms, ba, ui, g, nil /* deadline */) return batch, *ms, br, res, pErr @@ -489,7 +490,8 @@ func (r *Replica) evaluate1PC( // Is this relying on the batch being write-only? ui := uncertainty.Interval{} - rec := NewReplicaEvalContext(r, g.LatchSpans()) + rec := NewReplicaEvalContext(ctx, r, g.LatchSpans(), ba.RequiresClosedTSOlderThanStorageSnapshot()) + defer rec.Release() var br *roachpb.BatchResponse var res result.Result var pErr *roachpb.Error diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 82793f033400..44dcd2245d74 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2294,7 +2294,7 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) { if r == nil { continue } - r.handleClosedTimestampUpdate(ctx, r.GetClosedTimestamp(ctx)) + r.handleClosedTimestampUpdate(ctx, r.GetCurrentClosedTimestamp(ctx)) } case <-confCh: // Loop around to use the updated timer. @@ -3156,7 +3156,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { if w := metrics.LockTableMetrics.TopKLocksByWaitDuration[0].MaxWaitDurationNanos; w > maxLockWaitDurationNanos { maxLockWaitDurationNanos = w } - mc := rep.GetClosedTimestamp(ctx) + mc := rep.GetCurrentClosedTimestamp(ctx) if minMaxClosedTS.IsEmpty() || mc.Less(minMaxClosedTS) { minMaxClosedTS = mc } diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 45327e102047..434fac27cd2c 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -136,15 +136,15 @@ func splitPreApply( // Persist the closed timestamp. // // In order to tolerate a nil initClosedTS input, let's forward to - // r.GetClosedTimestamp(). Generally, initClosedTS is not expected to be nil - // (and is expected to be in advance of r.GetClosedTimestamp() since it's - // coming hot off a Raft command), but let's not rely on the non-nil. Note - // that r.GetClosedTimestamp() does not yet incorporate initClosedTS because - // the split command has not been applied yet. + // r.GetCurrentClosedTimestamp(). Generally, initClosedTS is not expected to + // be nil (and is expected to be in advance of r.GetCurrentClosedTimestamp() + // since it's coming hot off a Raft command), but let's not rely on the + // non-nil. Note that r.GetCurrentClosedTimestamp() does not yet incorporate + // initClosedTS because the split command has not been applied yet. 
if initClosedTS == nil { initClosedTS = &hlc.Timestamp{} } - initClosedTS.Forward(r.GetClosedTimestamp(ctx)) + initClosedTS.Forward(r.GetCurrentClosedTimestamp(ctx)) if err := rsl.SetClosedTimestamp(ctx, readWriter, initClosedTS); err != nil { log.Fatalf(ctx, "%s", err) } diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 4d3c9cddfa42..6661fbd6b02e 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -77,24 +77,25 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error { type flag int const ( - isAdmin flag = 1 << iota // admin cmds don't go through raft, but run on lease holder - isRead // read-only cmds don't go through raft, but may run on lease holder - isWrite // write cmds go through raft and must be proposed on lease holder - isTxn // txn commands may be part of a transaction - isLocking // locking cmds acquire locks for their transaction - isIntentWrite // intent write cmds leave intents when they succeed - isRange // range commands may span multiple keys - isReverse // reverse commands traverse ranges in descending direction - isAlone // requests which must be alone in a batch - isPrefix // requests which, in a batch, must not be split from the following request - isUnsplittable // range command that must not be split during sending - skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease - appliesTSCache // commands which apply the timestamp cache and closed timestamp - updatesTSCache // commands which update the timestamp cache - updatesTSCacheOnErr // commands which make read data available on errors - needsRefresh // commands which require refreshes to avoid serializable retries - canBackpressure // commands which deserve backpressure when a Range grows too large - bypassesReplicaCircuitBreaker // commands which bypass the replica circuit breaker, i.e. opt out of fail-fast + isAdmin flag = 1 << iota // admin cmds don't go through raft, but run on lease holder + isRead // read-only cmds don't go through raft, but may run on lease holder + isWrite // write cmds go through raft and must be proposed on lease holder + isTxn // txn commands may be part of a transaction + isLocking // locking cmds acquire locks for their transaction + isIntentWrite // intent write cmds leave intents when they succeed + isRange // range commands may span multiple keys + isReverse // reverse commands traverse ranges in descending direction + isAlone // requests which must be alone in a batch + isPrefix // requests which, in a batch, must not be split from the following request + isUnsplittable // range command that must not be split during sending + skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease + appliesTSCache // commands which apply the timestamp cache and closed timestamp + updatesTSCache // commands which update the timestamp cache + updatesTSCacheOnErr // commands which make read data available on errors + needsRefresh // commands which require refreshes to avoid serializable retries + canBackpressure // commands which deserve backpressure when a Range grows too large + bypassesReplicaCircuitBreaker // commands which bypass the replica circuit breaker, i.e. opt out of fail-fast + requiresClosedTSOlderThanStorageSnapshot // commands which read a replica's closed timestamp that is older than the state of the storage engine ) // flagDependencies specifies flag dependencies, asserted by TestFlagCombinations. 
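The reason this flag exists is an ordering requirement that is spread across several files in this diff: the replica's closed timestamp must be captured before the storage engine state is pinned, so the value handed to evaluation is provably older than the snapshot it reads from. The sketch below condenses that read-path sequence; it is illustrative only (the wrapper name evaluateReadSketch and its signature are invented, and error handling is simplified), but the calls it makes — NewReplicaEvalContext, Release, RequiresClosedTSOlderThanStorageSnapshot, NewReadOnly, and PinEngineStateForIterators — are the ones introduced or touched by this patch.

// evaluateReadSketch is a hypothetical condensation of the read path, assumed
// to live in package kvserver; it is not part of the patch.
func evaluateReadSketch(
    ctx context.Context, r *Replica, ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet,
) error {
    // Capture the closed timestamp (only when the batch carries the new flag,
    // e.g. a QueryResolvedTimestamp request) while constructing the eval context.
    rec := NewReplicaEvalContext(ctx, r, latchSpans, ba.RequiresClosedTSOlderThanStorageSnapshot())
    defer rec.Release() // hand the pooled evalContextImpl back to the sync.Pool

    // Only afterwards pin the engine state; every iterator created over this
    // reader observes state at least as new as the captured closed timestamp.
    rw := r.Engine().NewReadOnly(storage.StandardDurability)
    defer rw.Close()
    if err := rw.PinEngineStateForIterators(); err != nil {
        return err
    }

    // Evaluation may now safely call
    // rec.GetClosedTimestampOlderThanStorageSnapshot(), which panics only if
    // the flag above was not set and the closed timestamp was therefore elided.
    return nil
}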
@@ -1404,9 +1405,11 @@ func (r *RefreshRangeRequest) flags() flag { return isRead | isTxn | isRange | updatesTSCache } -func (*SubsumeRequest) flags() flag { return isRead | isAlone | updatesTSCache } -func (*RangeStatsRequest) flags() flag { return isRead } -func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange } +func (*SubsumeRequest) flags() flag { return isRead | isAlone | updatesTSCache } +func (*RangeStatsRequest) flags() flag { return isRead } +func (*QueryResolvedTimestampRequest) flags() flag { + return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot +} func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange } func (*BarrierRequest) flags() flag { return isWrite | isRange } diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index 20d65e9d4049..de58e1cef531 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -275,6 +275,15 @@ func (ba *BatchRequest) Require1PC() bool { return etArg.Require1PC } +// RequiresClosedTSOlderThanStorageSnapshot returns true if the batch contains a +// request that needs to read a replica's closed timestamp that is older than +// the state of the storage snapshot the request is evaluating over. +// +// NB: This is only used by QueryResolvedTimestampRequest at the moment. +func (ba *BatchRequest) RequiresClosedTSOlderThanStorageSnapshot() bool { + return ba.hasFlag(requiresClosedTSOlderThanStorageSnapshot) +} + // IsSingleAbortTxnRequest returns true iff the batch contains a single request, // and that request is an EndTxnRequest(commit=false). func (ba *BatchRequest) IsSingleAbortTxnRequest() bool { diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index ccc54793d8ce..6bad3dfeeb7b 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1767,9 +1767,11 @@ type pebbleReadOnly struct { normalIter pebbleIterator prefixEngineIter pebbleIterator normalEngineIter pebbleIterator - iter cloneableIter - durability DurabilityRequirement - closed bool + + iter cloneableIter + iterUnused bool + durability DurabilityRequirement + closed bool } var _ ReadWriter = &pebbleReadOnly{} @@ -1811,6 +1813,13 @@ func (p *pebbleReadOnly) Close() { panic("closing an already-closed pebbleReadOnly") } p.closed = true + if p.iterUnused { + err := p.iter.Close() + if err != nil { + panic(err) + } + } + // Setting iter to nil is sufficient since it will be closed by one of the // subsequent destroy calls. p.iter = nil @@ -1929,11 +1938,12 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions if iter.iter != nil { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { - iter.init(p.parent.db, p.iter, opts, p.durability) + iter.init(p.parent.db, p.iter, p.iterUnused, opts, p.durability) if p.iter == nil { // For future cloning. p.iter = iter.iter } + p.iterUnused = false iter.reusable = true } @@ -1964,11 +1974,12 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { if iter.iter != nil { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { - iter.init(p.parent.db, p.iter, opts, p.durability) + iter.init(p.parent.db, p.iter, p.iterUnused, opts, p.durability) if p.iter == nil { // For future cloning. 
p.iter = iter.iter } + p.iterUnused = false iter.reusable = true } @@ -2001,6 +2012,10 @@ func (p *pebbleReadOnly) PinEngineStateForIterators() error { o = &pebble.IterOptions{OnlyReadGuaranteedDurable: true} } p.iter = p.parent.db.NewIter(o) + // Since the iterator is being created just to pin the state of the engine + // for future iterators, we'll avoid cloning it the next time we want an + // iterator and instead just re-use what we created here. + p.iterUnused = true } return nil } diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 371f430dd590..ffb853472074 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -47,9 +47,11 @@ type pebbleBatch struct { normalIter pebbleIterator prefixEngineIter pebbleIterator normalEngineIter pebbleIterator - iter cloneableIter - writeOnly bool - closed bool + + iter cloneableIter + writeOnly bool + iterUnused bool + closed bool wrappedIntentWriter intentDemuxWriter // scratch space for wrappedIntentWriter. @@ -104,6 +106,12 @@ func (p *pebbleBatch) Close() { } p.closed = true + if p.iterUnused { + if err := p.iter.Close(); err != nil { + panic(err) + } + } + // Setting iter to nil is sufficient since it will be closed by one of the // subsequent destroy calls. p.iter = nil @@ -230,14 +238,15 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M iter.setBounds(opts.LowerBound, opts.UpperBound) } else { if p.batch.Indexed() { - iter.init(p.batch, p.iter, opts, StandardDurability) + iter.init(p.batch, p.iter, p.iterUnused, opts, StandardDurability) } else { - iter.init(p.db, p.iter, opts, StandardDurability) + iter.init(p.db, p.iter, p.iterUnused, opts, StandardDurability) } if p.iter == nil { // For future cloning. p.iter = iter.iter } + p.iterUnused = false } iter.inuse = true @@ -272,14 +281,15 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { if p.batch.Indexed() { - iter.init(p.batch, p.iter, opts, StandardDurability) + iter.init(p.batch, p.iter, p.iterUnused, opts, StandardDurability) } else { - iter.init(p.db, p.iter, opts, StandardDurability) + iter.init(p.db, p.iter, p.iterUnused, opts, StandardDurability) } if p.iter == nil { // For future cloning. p.iter = iter.iter } + p.iterUnused = false } iter.inuse = true @@ -299,6 +309,10 @@ func (p *pebbleBatch) PinEngineStateForIterators() error { } else { p.iter = p.db.NewIter(nil) } + // Since the iterator is being created just to pin the state of the engine + // for future iterators, we'll avoid cloning it the next time we want an + // iterator and instead just re-use what we created here. 
+ p.iterUnused = true } return nil } diff --git a/pkg/storage/pebble_iterator.go b/pkg/storage/pebble_iterator.go index 1797cd4cb1de..8fc315668657 100644 --- a/pkg/storage/pebble_iterator.go +++ b/pkg/storage/pebble_iterator.go @@ -76,6 +76,7 @@ var pebbleIterPool = sync.Pool{ type cloneableIter interface { Clone() (*pebble.Iterator, error) + Close() error } type testingSetBoundsListener interface { @@ -91,7 +92,7 @@ func newPebbleIterator( ) *pebbleIterator { iter := pebbleIterPool.Get().(*pebbleIterator) iter.reusable = false // defensive - iter.init(handle, iterToClone, opts, durability) + iter.init(handle, iterToClone, false /* iterUnused */, opts, durability) return iter } @@ -106,6 +107,7 @@ func newPebbleIterator( func (p *pebbleIterator) init( handle pebble.Reader, iterToClone cloneableIter, + iterUnused bool, opts IterOptions, durability DurabilityRequirement, ) { @@ -183,8 +185,15 @@ func (p *pebbleIterator) init( if doClone { var err error - if p.iter, err = iterToClone.Clone(); err != nil { - panic(err) + if iterUnused { + // NB: If the iterator was never used (at the time of writing, this means + // that the iterator was created by `PinEngineStateForIterators()`), we + // don't need to clone it. + p.iter = iterToClone.(*pebble.Iterator) + } else { + if p.iter, err = iterToClone.Clone(); err != nil { + panic(err) + } } p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound) } else {
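To make the iterator-pinning handshake above easier to follow from the caller's side, here is a small usage sketch. It is not part of the patch: the function name consistentIteratorsSketch, the iterator kind, and the bound are illustrative choices, but the methods used (NewReadOnly, PinEngineStateForIterators, NewMVCCIterator, Close) are the ones whose interaction this diff changes.

// consistentIteratorsSketch shows the intended lifecycle against a storage.Engine.
func consistentIteratorsSketch(eng storage.Engine) error {
    ro := eng.NewReadOnly(storage.StandardDurability)
    defer ro.Close()

    // Pinning creates the underlying pebble iterator eagerly and marks it
    // iterUnused. The first iterator below adopts it instead of cloning it, and
    // if no iterator is ever created, Close() now closes it so it cannot leak.
    if err := ro.PinEngineStateForIterators(); err != nil {
        return err
    }

    // All iterators created from ro observe the engine state as of the pin.
    iter := ro.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{UpperBound: keys.MaxKey})
    defer iter.Close()
    // ... iterate ...
    return nil
}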