kvserver, batcheval: pin Engine state during read-only command evaluation #76312

Merged
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -1440,7 +1440,7 @@ func TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS(t *testing.T)
require.NoError(t, err)
r, store, err := s.GetStores().(*kvserver.Stores).GetReplicaForRangeID(ctx, rd.RangeID)
require.NoError(t, err)
closedTS := r.GetClosedTimestamp(ctx)
closedTS := r.GetCurrentClosedTimestamp(ctx)
require.NotZero(t, closedTS)

// Add an SST writing below the closed timestamp. It should get pushed above it.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go
@@ -56,7 +56,7 @@ func QueryResolvedTimestamp(
// because QueryResolvedTimestamp requests are often run without acquiring
// latches (see roachpb.INCONSISTENT) and often also on follower replicas,
// so latches won't help them to synchronize with writes.
closedTS := cArgs.EvalCtx.GetClosedTimestamp(ctx)
closedTS := cArgs.EvalCtx.GetClosedTimestampOlderThanStorageSnapshot()

// Compute the minimum timestamp of any intent in the request's key span,
// which may span the entire range, but does not need to.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -152,7 +152,7 @@ func Subsume(
// think about.
priorReadSum.Merge(rspb.FromTimestamp(reply.FreezeStart.ToTimestamp()))
reply.ReadSummary = &priorReadSum
reply.ClosedTimestamp = cArgs.EvalCtx.GetClosedTimestamp(ctx)
reply.ClosedTimestamp = cArgs.EvalCtx.GetCurrentClosedTimestamp(ctx)

return result.Result{}, nil
}
32 changes: 25 additions & 7 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -49,6 +49,8 @@ type Limiters struct {
// underlying state.
type EvalContext interface {
fmt.Stringer
ImmutableEvalContext

ClusterSettings() *cluster.Settings
EvalKnobs() kvserverbase.BatchEvalTestingKnobs

@@ -111,12 +113,6 @@ type EvalContext interface {
// requests on the range.
GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary

// GetClosedTimestamp returns the current closed timestamp on the range.
// It is expected that a caller will have performed some action (either
// calling RevokeLease or WatchForMerge) to freeze further progression of
// the closed timestamp before calling this method.
GetClosedTimestamp(ctx context.Context) hlc.Timestamp

GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage,
error)
@@ -141,6 +137,24 @@ type EvalContext interface {
// GetEngineCapacity returns the store's underlying engine capacity; other
// StoreCapacity fields not related to engine capacity are not populated.
GetEngineCapacity() (roachpb.StoreCapacity, error)

// GetCurrentClosedTimestamp returns the current closed timestamp on the
// range. It is expected that a caller will have performed some action (either
// calling RevokeLease or WatchForMerge) to freeze further progression of the
// closed timestamp before calling this method.
GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp

// Release returns the memory allocated by the EvalContext implementation to a
// sync.Pool.
Release()
}

// ImmutableEvalContext is like EvalContext, but it encapsulates state that
// needs to be immutable during the course of command evaluation.
type ImmutableEvalContext interface {
// GetClosedTimestampOlderThanStorageSnapshot returns the closed timestamp
// that was active before the state of the storage engine was pinned.
GetClosedTimestampOlderThanStorageSnapshot() hlc.Timestamp
}

// MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
@@ -255,7 +269,10 @@ func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo {
func (m *mockEvalCtxImpl) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
return m.CurrentReadSummary
}
func (m *mockEvalCtxImpl) GetClosedTimestamp(ctx context.Context) hlc.Timestamp {
func (m *mockEvalCtxImpl) GetClosedTimestampOlderThanStorageSnapshot() hlc.Timestamp {
return m.ClosedTimestamp
}
func (m *mockEvalCtxImpl) GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp {
return m.ClosedTimestamp
}
func (m *mockEvalCtxImpl) GetExternalStorage(
@@ -287,3 +304,4 @@ func (m *mockEvalCtxImpl) GetMaxBytes() int64 {
func (m *mockEvalCtxImpl) GetEngineCapacity() (roachpb.StoreCapacity, error) {
return roachpb.StoreCapacity{Available: 1, Capacity: 1}, nil
}
func (m *mockEvalCtxImpl) Release() {}
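To make the interface split above concrete, here is a minimal, self-contained Go sketch (not part of the PR; `resolveTimestamp` and `mockEvalCtx` are invented names) of how a read-only command would consume the pre-snapshot closed timestamp while other callers keep using the current one:

```go
package main

import "fmt"

// Timestamp stands in for hlc.Timestamp in this sketch.
type Timestamp struct{ WallTime int64 }

// ImmutableEvalContext mirrors the new interface: state that must stay fixed
// while a command evaluates against a pinned storage snapshot.
type ImmutableEvalContext interface {
	GetClosedTimestampOlderThanStorageSnapshot() Timestamp
}

// EvalContext mirrors the full interface handed to command evaluation.
type EvalContext interface {
	ImmutableEvalContext
	GetCurrentClosedTimestamp() Timestamp
	Release()
}

// resolveTimestamp shows the intended split: a command that scans a pinned
// snapshot reports the pre-snapshot closed timestamp, so its answer never runs
// ahead of the data it actually read, while commands such as Subsume keep
// using the current closed timestamp.
func resolveTimestamp(rec EvalContext, minIntentTS Timestamp) Timestamp {
	closedTS := rec.GetClosedTimestampOlderThanStorageSnapshot()
	if minIntentTS.WallTime < closedTS.WallTime {
		return minIntentTS // an old intent holds the resolved timestamp back
	}
	return closedTS
}

// mockEvalCtx is a trivial implementation used only to run the example.
type mockEvalCtx struct{ preSnapshotTS, currentTS Timestamp }

func (m *mockEvalCtx) GetClosedTimestampOlderThanStorageSnapshot() Timestamp { return m.preSnapshotTS }
func (m *mockEvalCtx) GetCurrentClosedTimestamp() Timestamp                  { return m.currentTS }
func (m *mockEvalCtx) Release()                                              {}

func main() {
	rec := &mockEvalCtx{preSnapshotTS: Timestamp{90}, currentTS: Timestamp{100}}
	defer rec.Release()
	fmt.Println(resolveTimestamp(rec, Timestamp{WallTime: 95})) // prints {90}
}
```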
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
@@ -2149,7 +2149,7 @@ func TestStoreRangeMergeLHSLeaseTransfersAfterFreezeTime(t *testing.T) {
}
return nil
})
lhsClosedTS := lhsLeaseholder.GetClosedTimestamp(ctx)
lhsClosedTS := lhsLeaseholder.GetCurrentClosedTimestamp(ctx)
require.NotEmpty(t, lhsClosedTS)

// Finally, allow the merge to complete. It should complete successfully.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -998,7 +998,7 @@ func (cbt *circuitBreakerTest) FollowerRead(idx int) error {
repl := cbt.repls[idx]
get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */)
ctx := context.Background()
ts := repl.GetClosedTimestamp(ctx)
ts := repl.GetCurrentClosedTimestamp(ctx)
return cbt.SendCtxTS(ctx, idx, get, ts)
}

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -772,7 +772,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
require.NoError(t, err)
r, err := store.GetReplica(rightDesc.RangeID)
require.NoError(t, err)
maxClosed := r.GetClosedTimestamp(ctx)
maxClosed := r.GetCurrentClosedTimestamp(ctx)
// Note that maxClosed would not necessarily be below the freeze start if
// this was a LEAD_FOR_GLOBAL_READS range.
assert.True(t, maxClosed.LessEq(freezeStartTimestamp),
@@ -807,7 +807,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
mergedLeaseholder, err := leftLeaseholderStore.GetReplica(leftDesc.RangeID)
require.NoError(t, err)
writeTime := rhsLeaseStart.Prev()
require.True(t, mergedLeaseholder.GetClosedTimestamp(ctx).Less(writeTime))
require.True(t, mergedLeaseholder.GetCurrentClosedTimestamp(ctx).Less(writeTime))
var baWrite roachpb.BatchRequest
baWrite.Header.RangeID = leftDesc.RangeID
baWrite.Header.Timestamp = writeTime
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/replica.go
@@ -703,8 +703,6 @@ type Replica struct {
}
}

var _ batcheval.EvalContext = &Replica{}

// String returns the string representation of the replica using an
// inconsistent copy of the range descriptor. Therefore, String does not
// require a lock and its output may not be atomic with other ongoing work in
@@ -1232,7 +1230,7 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo {
// NB: this acquires an RLock(). Reentrant RLocks are deadlock prone, so do
// this first before RLocking below. Performance of this extra lock
// acquisition is not a concern.
ri.ActiveClosedTimestamp = r.GetClosedTimestamp(ctx)
ri.ActiveClosedTimestamp = r.GetCurrentClosedTimestamp(ctx)

// NB: numRangefeedRegistrations doesn't require Replica.mu to be locked.
// However, it does require coordination between multiple goroutines, so
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_closedts_internal_test.go
@@ -509,7 +509,8 @@ func (r *mockReceiver) HTML() string {
return ""
}

// Test that r.GetClosedTimestamp() mixes its sources of information correctly.
// Test that r.GetCurrentClosedTimestamp() mixes its sources of information
// correctly.
func TestReplicaClosedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -567,7 +568,7 @@ func TestReplicaClosedTimestamp(t *testing.T) {
tc.repl.mu.state.RaftClosedTimestamp = test.raftClosed
tc.repl.mu.state.LeaseAppliedIndex = uint64(test.applied)
tc.repl.mu.Unlock()
require.Equal(t, test.expClosed, tc.repl.GetClosedTimestamp(ctx))
require.Equal(t, test.expClosed, tc.repl.GetCurrentClosedTimestamp(ctx))
})
}
}
67 changes: 64 additions & 3 deletions pkg/kv/kvserver/replica_eval_context.go
@@ -12,10 +12,12 @@ package kvserver

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

@@ -28,19 +30,78 @@
// Do not introduce new uses of this.
var todoSpanSet = &spanset.SpanSet{}

var evalContextPool = sync.Pool{
New: func() interface{} {
return &evalContextImpl{}
},
}

// evalContextImpl implements the batcheval.EvalContext interface.
type evalContextImpl struct {
*Replica
// NB: We cannot use the emptiness of `closedTS` to determine whether the
// closed timestamp was elided during the creation of this eval context, so we
// track it separately.
closedTSElided bool
closedTS hlc.Timestamp
}

func newEvalContextImpl(
ctx context.Context, r *Replica, requiresClosedTSOlderThanStorageSnap bool,
) (ec *evalContextImpl) {
var closedTS hlc.Timestamp
if requiresClosedTSOlderThanStorageSnap {
// We elide this call to get the replica's current closed timestamp unless
// the request requires it, in order to avoid redundant mutex contention.
closedTS = r.GetCurrentClosedTimestamp(ctx)
}

ec = evalContextPool.Get().(*evalContextImpl)
*ec = evalContextImpl{
Replica: r,
closedTSElided: !requiresClosedTSOlderThanStorageSnap,
closedTS: closedTS,
}
return ec
}

// GetClosedTimestampOlderThanStorageSnapshot implements the EvalContext
// interface.
func (ec *evalContextImpl) GetClosedTimestampOlderThanStorageSnapshot() hlc.Timestamp {
if ec.closedTSElided {
panic("closed timestamp was elided during eval context creation; does the" +
" request set the requiresClosedTimestamp flag?")
}
return ec.closedTS
}

// Release implements the EvalContext interface.
func (ec *evalContextImpl) Release() {
*ec = evalContextImpl{}
evalContextPool.Put(ec)
}

var _ batcheval.EvalContext = &evalContextImpl{}

// NewReplicaEvalContext returns a batcheval.EvalContext to use for command
// evaluation. The supplied SpanSet will be ignored except for race builds, in
// which case state access is asserted against it. A SpanSet must always be
// passed.
func NewReplicaEvalContext(r *Replica, ss *spanset.SpanSet) batcheval.EvalContext {
// The caller must call rec.Release() once done with the evaluation context in
// order to return its memory back to a sync.Pool.
func NewReplicaEvalContext(
ctx context.Context, r *Replica, ss *spanset.SpanSet, requiresClosedTSOlderThanStorageSnap bool,
) (rec batcheval.EvalContext) {
if ss == nil {
log.Fatalf(r.AnnotateCtx(context.Background()), "can't create a ReplicaEvalContext with assertions but no SpanSet")
}

rec = newEvalContextImpl(ctx, r, requiresClosedTSOlderThanStorageSnap)
if util.RaceEnabled {
return &SpanSetReplicaEvalContext{
i: r,
i: rec,
ss: *ss,
}
}
return r
return rec
}
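For readers unfamiliar with the pooling-plus-elision pattern used in this file, here is a hedged, self-contained sketch of the same idea (`pooledEvalCtx`, `ctxPool`, and the `lookup` callback are stand-ins, not code from this PR):

```go
package main

import (
	"fmt"
	"sync"
)

// Timestamp stands in for hlc.Timestamp.
type Timestamp struct{ WallTime int64 }

// pooledEvalCtx mimics evalContextImpl: it is recycled through a sync.Pool and
// records whether the closed-timestamp lookup was elided at construction.
type pooledEvalCtx struct {
	closedTSElided bool
	closedTS       Timestamp
}

var ctxPool = sync.Pool{New: func() interface{} { return &pooledEvalCtx{} }}

// newPooledEvalCtx only pays for the closed-timestamp lookup (a mutex
// acquisition on the real Replica) when the request declares that it needs it.
func newPooledEvalCtx(needsClosedTS bool, lookup func() Timestamp) *pooledEvalCtx {
	var ts Timestamp
	if needsClosedTS {
		ts = lookup()
	}
	ec := ctxPool.Get().(*pooledEvalCtx)
	*ec = pooledEvalCtx{closedTSElided: !needsClosedTS, closedTS: ts}
	return ec
}

func (ec *pooledEvalCtx) GetClosedTimestampOlderThanStorageSnapshot() Timestamp {
	if ec.closedTSElided {
		panic("closed timestamp was elided; the request did not declare that it needs it")
	}
	return ec.closedTS
}

// Release zeroes the struct before returning it to the pool so stale state
// cannot leak into the next evaluation that reuses this allocation.
func (ec *pooledEvalCtx) Release() {
	*ec = pooledEvalCtx{}
	ctxPool.Put(ec)
}

func main() {
	ec := newPooledEvalCtx(true, func() Timestamp { return Timestamp{WallTime: 42} })
	defer ec.Release()
	fmt.Println(ec.GetClosedTimestampOlderThanStorageSnapshot()) // {42}
}
```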
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/replica_eval_context_span.go
@@ -222,9 +222,15 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary(ctx context.Context)
return rec.i.GetCurrentReadSummary(ctx)
}

// GetClosedTimestamp is part of the EvalContext interface.
func (rec *SpanSetReplicaEvalContext) GetClosedTimestamp(ctx context.Context) hlc.Timestamp {
return rec.i.GetClosedTimestamp(ctx)
// GetCurrentClosedTimestamp is part of the EvalContext interface.
func (rec *SpanSetReplicaEvalContext) GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp {
return rec.i.GetCurrentClosedTimestamp(ctx)
}

// GetClosedTimestampOlderThanStorageSnapshot is part of the EvalContext
// interface.
func (rec *SpanSetReplicaEvalContext) GetClosedTimestampOlderThanStorageSnapshot() hlc.Timestamp {
return rec.i.GetClosedTimestampOlderThanStorageSnapshot()
}

// GetExternalStorage returns an ExternalStorage object, based on
@@ -267,3 +273,6 @@ func (rec *SpanSetReplicaEvalContext) GetMaxBytes() int64 {
func (rec *SpanSetReplicaEvalContext) GetEngineCapacity() (roachpb.StoreCapacity, error) {
return rec.i.GetEngineCapacity()
}

// Release implements the batcheval.EvalContext interface.
func (rec *SpanSetReplicaEvalContext) Release() { rec.i.Release() }
25 changes: 12 additions & 13 deletions pkg/kv/kvserver/replica_follower_read.go
@@ -81,7 +81,7 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B
}

requiredFrontier := ba.RequiredFrontier()
maxClosed := r.getClosedTimestampRLocked(ctx, requiredFrontier /* sufficient */)
maxClosed := r.getCurrentClosedTimestampLocked(ctx, requiredFrontier /* sufficient */)
canServeFollowerRead := requiredFrontier.LessEq(maxClosed)
tsDiff := requiredFrontier.GoTime().Sub(maxClosed.GoTime())
if !canServeFollowerRead {
@@ -106,13 +106,13 @@
return true
}

// getClosedTimestampRLocked is like maxClosed, except that it requires r.mu to be
// rlocked. It also optionally takes a hint: if sufficient is not
// empty, getClosedTimestampRLocked might return a timestamp that's lower than the
// maximum closed timestamp that we know about, as long as the returned
// timestamp is still >= sufficient. This is a performance optimization because
// we can avoid consulting the ClosedTimestampReceiver.
func (r *Replica) getClosedTimestampRLocked(
// getCurrentClosedTimestampLocked is like GetCurrentClosedTimestamp, except
// that it requires r.mu to be RLocked. It also optionally takes a hint: if
// sufficient is not empty, getCurrentClosedTimestampLocked might return a timestamp
// that's lower than the maximum closed timestamp that we know about, as long as
// the returned timestamp is still >= sufficient. This is a performance
// optimization because we can avoid consulting the ClosedTimestampReceiver.
func (r *Replica) getCurrentClosedTimestampLocked(
ctx context.Context, sufficient hlc.Timestamp,
) hlc.Timestamp {
appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex)
Expand All @@ -126,11 +126,10 @@ func (r *Replica) getClosedTimestampRLocked(
return maxClosed
}

// GetClosedTimestamp returns the maximum closed timestamp for this range.
//
// GetClosedTimestamp is part of the EvalContext interface.
func (r *Replica) GetClosedTimestamp(ctx context.Context) hlc.Timestamp {
// GetCurrentClosedTimestamp returns the current maximum closed timestamp for
// this range.
func (r *Replica) GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp {
r.mu.RLock()
defer r.mu.RUnlock()
return r.getClosedTimestampRLocked(ctx, hlc.Timestamp{} /* sufficient */)
return r.getCurrentClosedTimestampLocked(ctx, hlc.Timestamp{} /* sufficient */)
}
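The `sufficient` hint described in the comment above can be summarized with a small, self-contained sketch (illustrative only; `closedTimestampWithHint` and `sideTransport` are invented names standing in for the replica's two sources of closed-timestamp information):

```go
package main

import "fmt"

// Timestamp stands in for hlc.Timestamp.
type Timestamp struct{ WallTime int64 }

func (t Timestamp) IsEmpty() bool           { return t.WallTime == 0 }
func (t Timestamp) LessEq(o Timestamp) bool { return t.WallTime <= o.WallTime }

// closedTimestampWithHint sketches the "sufficient" optimization: if the
// closed timestamp known from applied Raft state already covers the caller's
// requirement, skip the more expensive lookup (consulting the
// ClosedTimestampReceiver in the real code).
func closedTimestampWithHint(
	raftClosed, sufficient Timestamp, sideTransport func() Timestamp,
) Timestamp {
	if !sufficient.IsEmpty() && sufficient.LessEq(raftClosed) {
		return raftClosed // good enough for this caller; avoid the extra lookup
	}
	if fromReceiver := sideTransport(); raftClosed.LessEq(fromReceiver) {
		return fromReceiver
	}
	return raftClosed
}

func main() {
	ts := closedTimestampWithHint(Timestamp{50}, Timestamp{40}, func() Timestamp {
		panic("not consulted when the hint is already satisfied")
	})
	fmt.Println(ts) // {50}
}
```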
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/replica_gossip.go
@@ -174,7 +174,10 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(span)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
rec := NewReplicaEvalContext(
ctx, r, todoSpanSet, false, /* requiresClosedTSOlderThanStorageSnap */
)
defer rec.Release()
rw := r.Engine().NewReadOnly(storage.StandardDurability)
defer rw.Close()

@@ -217,7 +220,10 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(keys.SystemConfigSpan)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
rec := NewReplicaEvalContext(
ctx, r, todoSpanSet, false, /* requiresClosedTSOlderThanStorageSnap */
)
defer rec.Release()
rw := r.Engine().NewReadOnly(storage.StandardDurability)
defer rw.Close()

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
@@ -435,7 +435,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(

// Check for an initial closed timestamp update immediately to help
// initialize the rangefeed's resolved timestamp as soon as possible.
r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetClosedTimestamp(ctx))
r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetCurrentClosedTimestamp(ctx))

return p
}
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/replica_read.go
@@ -49,7 +49,8 @@ func (r *Replica) executeReadOnlyBatch(
ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset())

// Evaluate read-only batch command.
rec := NewReplicaEvalContext(r, g.LatchSpans())
rec := NewReplicaEvalContext(ctx, r, g.LatchSpans(), ba.RequiresClosedTSOlderThanStorageSnapshot())
defer rec.Release()

// TODO(irfansharif): It's unfortunate that in this read-only code path,
// we're stuck with a ReadWriter because of the way evaluateBatch is
Expand All @@ -60,6 +61,12 @@ func (r *Replica) executeReadOnlyBatch(
// may start relying on this, so we assert here.
panic("expected consistent iterators")
}
// Pin engine state eagerly so that all iterators created over this Reader are
// based off the state of the engine as of this point and are mutually
// consistent.
if err := rw.PinEngineStateForIterators(); err != nil {
return nil, g, roachpb.NewError(err)
}
if util.RaceEnabled {
rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp)
}
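As a closing note, a self-contained sketch of why the ordering in this hunk matters (the `engine`, `reader`, and `serveReadOnlyBatch` names are invented stand-ins, and the real `PinEngineStateForIterators` returns an error rather than a new reader):

```go
package main

import "fmt"

// Timestamp stands in for hlc.Timestamp.
type Timestamp struct{ WallTime int64 }

// engine and reader are toy stand-ins for storage.Engine and the read-only
// ReadWriter used during evaluation.
type engine struct{ appliedUpTo Timestamp }

type reader struct{ pinnedAt Timestamp }

// pinEngineStateForIterators mimics pinning: every iterator created from the
// returned reader sees exactly the engine state as of this call.
func (e *engine) pinEngineStateForIterators() reader { return reader{pinnedAt: e.appliedUpTo} }

// serveReadOnlyBatch sketches the ordering in executeReadOnlyBatch: capture the
// closed timestamp first, then pin the snapshot. Writes at or below the closed
// timestamp have already applied, so the pinned snapshot must contain them;
// that is the property GetClosedTimestampOlderThanStorageSnapshot hands to
// commands.
func serveReadOnlyBatch(e *engine, currentClosedTS func() Timestamp) (Timestamp, reader) {
	closedTS := currentClosedTS()       // step 1: closed timestamp first
	r := e.pinEngineStateForIterators() // step 2: pin the snapshot afterwards
	return closedTS, r                  // closedTS is never newer than r's state
}

func main() {
	e := &engine{appliedUpTo: Timestamp{100}}
	closedTS, r := serveReadOnlyBatch(e, func() Timestamp { return Timestamp{90} })
	fmt.Println(closedTS.WallTime <= r.pinnedAt.WallTime) // true
}
```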