Merge #59423
59423: kvserver: use combined store/repl critical section in applySnapshot r=nvanbenschoten,irfansharif a=tbg

This PR addresses #58378, which highlighted insufficient locking in `(*Replica).applySnapshot`. Prior to this PR, `applySnapshot` carried out operations in roughly this order:

1. ingest SSTs into the LSM
2. update the replica's descriptor and the store's map of span to replica
3. update the replica state

There was no critical section wrapping steps 2) and 3). As a result, it was
possible (in theory, and as it turned out also in practice) to obtain a
replica from the store under descriptor `X` whose state had not yet been
fully updated to reflect `X`.
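
Purely as an illustration (simplified stand-in types, not the actual kvserver
code), the shape of the combined critical section this PR introduces in
`applySnapshot` looks roughly like this; every type, field, and value below is
an assumption made for the sketch:

```go
package main

import "sync"

// Simplified stand-ins for the real kvserver types; all names here are
// illustrative only.
type rangeDesc struct{ startKey string }

type replicaState struct {
	desc    *rangeDesc
	version string
}

type replica struct {
	mu    sync.Mutex
	state replicaState
}

type store struct {
	mu            sync.Mutex
	replicasByKey map[string]*replica
}

// applySnapshot mirrors the new structure: on-disk data is ingested first,
// then the descriptor, the store's map of span to replica, and the rest of
// the in-memory state are swapped under a single critical section, so a
// reader going through the store can never see a descriptor without the
// matching replica state.
func (s *store) applySnapshot(r *replica, desc *rangeDesc, version string) {
	// 1. Ingest SSTs into the LSM (omitted here; from this point on the
	//    on-disk state leads the in-memory state).

	// 2.+3. One critical section covering both the replica and the store.
	r.mu.Lock()
	s.mu.Lock()
	r.state.desc = desc                // update the replica's descriptor
	s.replicasByKey[desc.startKey] = r // update the store's span-to-replica map
	r.state.version = version          // populate the rest of the replica state
	s.mu.Unlock()
	r.mu.Unlock()
}

func main() {
	s := &store{replicasByKey: map[string]*replica{}}
	s.applySnapshot(&replica{}, &rangeDesc{startKey: "a"}, "v21.1")
}
```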

It's hard to say exactly where issues would surface, but one instance did
cause a crash: when a replica is created in response to an incoming snapshot,
it starts out uninitialized, with a completely empty state, and such
uninitialized replicas are never handed out by the store.

Due to the race described above, the store would sometimes hand out such a
replica after its descriptor had been initialized but before its state was
populated, which caused at least two crashes related to calls to
`(*Replica).Version`:

1. from `PurgeOutdatedReplicas`, and
2. during evaluation of a lease request, triggering [this assertion]

[this assertion]: ef64519

Both call sites previously had workarounds in place to avoid the crash; those workarounds are now removed.
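
For context, the race those workarounds papered over can be modeled roughly as
follows (again with simplified stand-in fields, not the real code): a reader
can hit the window between the two separate critical sections and observe a
descriptor without the matching state.

```go
package main

import (
	"fmt"
	"sync"
)

// replica models only the two fields relevant to the race; in the real code
// both live under r.mu in r.mu.state. All names are illustrative.
type replica struct {
	mu      sync.RWMutex
	desc    *string // non-nil once the descriptor has been installed
	version *string // populated in a later, separate critical section
}

func main() {
	r := &replica{}
	done := make(chan struct{})

	go func() {
		defer close(done)
		// Writer: two separate critical sections, as in the old applySnapshot.
		d := "descriptor X"
		r.mu.Lock()
		r.desc = &d
		r.mu.Unlock()

		v := "some version"
		r.mu.Lock()
		r.version = &v
		r.mu.Unlock()
	}()

	// Reader: may catch the window between the two critical sections and see
	// an "initialized" replica whose state is not populated yet -- the
	// pattern that crashed (*Replica).Version before this change.
	r.mu.RLock()
	if r.desc != nil && r.version == nil {
		fmt.Println("observed descriptor without matching state")
	}
	r.mu.RUnlock()

	<-done
}
```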

Release note: None


Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Tobias Schottdorf <[email protected]>
3 people committed Feb 4, 2021
2 parents 40c51a1 + 6a282ce commit 11db4d0
Showing 15 changed files with 174 additions and 83 deletions.
49 changes: 28 additions & 21 deletions pkg/kv/kvserver/replica.go
@@ -194,6 +194,22 @@ type Replica struct {
log.AmbientContext

RangeID roachpb.RangeID // Only set by the constructor
// The start key of a Range remains constant throughout its lifetime (it does
// not change through splits or merges). This field carries a copy of
// r.mu.state.Desc.StartKey (and nil if the replica is not initialized). The
// copy is maintained to allow inserting locked Replicas into
// Store.mu.replicasByKey (keyed on start key) without the risk of deadlock.
// The synchronization for this field works as follows:
//
// - the field must not be accessed for uninitialized replicas, except:
// - when setting the field (i.e. when initializing the replica), under `mu`.
//
// Due to the first rule, any access to the field is preceded by an
// acquisition of `mu` (Replica.IsInitialized) which serializes the write and
// any subsequent reads of the field.
//
// The writes to this key happen in Replica.setStartKeyLocked.
startKey roachpb.RKey

store *Store
abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort
@@ -792,11 +808,6 @@ func (r *Replica) GetGCThreshold() hlc.Timestamp {
func (r *Replica) Version() roachpb.Version {
r.mu.RLock()
defer r.mu.RUnlock()

if r.mu.state.Version == nil {
// TODO(irfansharif): This is a stop-gap for #58523.
return roachpb.Version{}
}
return *r.mu.state.Version
}

@@ -1089,8 +1100,6 @@ func (r *Replica) State() kvserverpb.RangeInfo {
// assertStateLocked can be called from the Raft goroutine to check that the
// in-memory and on-disk states of the Replica are congruent.
// Requires that both r.raftMu and r.mu are held.
//
// TODO(tschottdorf): Consider future removal (for example, when #7224 is resolved).
func (r *Replica) assertStateLocked(ctx context.Context, reader storage.Reader) {
diskState, err := r.mu.stateLoader.Load(ctx, reader, r.mu.state.Desc)
if err != nil {
@@ -1106,6 +1115,11 @@ func (r *Replica) assertStateLocked(ctx context.Context, reader storage.Reader)
log.Fatalf(ctx, "on-disk and in-memory state diverged: %s",
log.Safe(pretty.Diff(diskState, r.mu.state)))
}
if r.isInitializedRLocked() {
if !r.startKey.Equal(r.mu.state.Desc.StartKey) {
log.Fatalf(ctx, "denormalized start key %s diverged from %s", r.startKey, r.mu.state.Desc.StartKey)
}
}
}

// checkExecutionCanProceed returns an error if a batch request cannot be
@@ -1407,7 +1421,13 @@ func (ec *endCmds) done(
// except for read-only requests that are older than `freezeStart`, until the
// merge completes.
func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timestamp) error {
desc := r.Desc()
r.mu.Lock()
defer r.mu.Unlock()
return r.maybeWatchForMergeLocked(ctx, freezeStart)
}

func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc.Timestamp) error {
desc := r.descRLocked()
descKey := keys.RangeDescriptorKey(desc.StartKey)
_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(),
storage.MVCCGetOptions{Inconsistent: true})
@@ -1430,12 +1450,10 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timest
// whether the merge succeeded or not.

mergeCompleteCh := make(chan struct{})
r.mu.Lock()
if r.mu.mergeComplete != nil {
// Another request already noticed the merge, installed a mergeComplete
// channel, and launched a goroutine to watch for the merge's completion.
// Nothing more to do.
r.mu.Unlock()
return nil
}
// Note that if the merge txn retries for any reason (for example, if the
@@ -1451,7 +1469,6 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timest
// range in case it managed to quiesce between when the Subsume request
// arrived and now, which is rare but entirely legal.
r.unquiesceLocked()
r.mu.Unlock()

taskCtx := r.AnnotateCtx(context.Background())
err = r.store.stopper.RunAsyncTask(taskCtx, "wait-for-merge", func(ctx context.Context) {
@@ -1578,12 +1595,6 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timest
return err
}

func (r *Replica) maybeTransferRaftLeadershipToLeaseholder(ctx context.Context) {
r.mu.Lock()
r.maybeTransferRaftLeadershipToLeaseholderLocked(ctx)
r.mu.Unlock()
}

// maybeTransferRaftLeadershipToLeaseholderLocked attempts to transfer the
// leadership away from this node to the leaseholder, if this node is the
// current raft leader but not the leaseholder. We don't attempt to transfer
@@ -1655,10 +1666,6 @@ func checkIfTxnAborted(
return nil
}

func (r *Replica) startKey() roachpb.RKey {
return r.Desc().StartKey
}

// GetLeaseHistory returns the lease history stored on this replica.
func (r *Replica) GetLeaseHistory() []roachpb.Lease {
if r.leaseHistory == nil {
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_application_result.go
@@ -257,7 +257,9 @@ func (r *Replica) handleDescResult(ctx context.Context, desc *roachpb.RangeDescr
}

func (r *Replica) handleLeaseResult(ctx context.Context, lease *roachpb.Lease) {
r.leasePostApply(ctx, *lease, false /* permitJump */)
r.mu.Lock()
defer r.mu.Unlock()
r.leasePostApplyLocked(ctx, *lease, false /* permitJump */)
}

func (r *Replica) handleTruncatedStateResult(
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_gossip.go
@@ -29,6 +29,10 @@ const configGossipTTL = 0 // does not expire
func (r *Replica) gossipFirstRange(ctx context.Context) {
r.mu.Lock()
defer r.mu.Unlock()
r.gossipFirstRangeLocked(ctx)
}

func (r *Replica) gossipFirstRangeLocked(ctx context.Context) {
// Gossip is not provided for the bootstrap store and for some tests.
if r.store.Gossip() == nil {
return
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -122,6 +122,20 @@ func newUnloadedReplica(
return r
}

// setStartKeyLocked sets r.startKey. Note that this field has special semantics
// described on its comment. Callers to this method are initializing an
// uninitialized Replica and hold Replica.mu.
func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) {
r.mu.AssertHeld()
if r.startKey != nil {
log.Fatalf(
r.AnnotateCtx(context.Background()),
"start key written twice: was %s, now %s", r.startKey, startKey,
)
}
r.startKey = startKey
}

// loadRaftMuLockedReplicaMuLocked will load the state of the replica from disk.
// If desc is initialized, the Replica will be initialized when this method
// returns. An initialized Replica may not be reloaded. If this method is called
@@ -142,6 +156,9 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
// NB: This is just a defensive check as r.mu.replicaID should never be 0.
log.Fatalf(ctx, "r%d: cannot initialize replica without a replicaID", desc.RangeID)
}
if desc.IsInitialized() {
r.setStartKeyLocked(desc.StartKey)
}

// Clear the internal raft group in case we're being reset. Since we're
// reloading the raft state below, it isn't safe to use the existing raft
59 changes: 30 additions & 29 deletions pkg/kv/kvserver/replica_proposal.go
@@ -293,24 +293,23 @@ A file preventing this node from restarting was placed at:
}
}

// leasePostApply updates the Replica's internal state to reflect the
// leasePostApplyLocked updates the Replica's internal state to reflect the
// application of a new Range lease. The method is idempotent, so it can be
// called repeatedly for the same lease safely. However, the method will panic
// if passed a lease with a lower sequence number than the current lease. By
// default, the method will also panic if passed a lease that indicates a
// forward sequence number jump (i.e. a skipped lease). This behavior can
// be disabled by passing permitJump as true.
func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, permitJump bool) {
r.mu.RLock()
replicaID := r.mu.replicaID
func (r *Replica) leasePostApplyLocked(
ctx context.Context, newLease roachpb.Lease, permitJump bool,
) {
// Pull out the last lease known to this Replica. It's possible that this is
// not actually the last lease in the Range's lease sequence because the
// Replica may have missed the application of a lease between prevLease and
// newLease. However, this should only be possible if a snapshot includes a
// lease update. All other forms of lease updates should be continuous
// without jumps (see permitJump).
prevLease := *r.mu.state.Lease
r.mu.RUnlock()

// Sanity check to make sure that the lease sequence is moving in the right
// direction.
@@ -337,7 +336,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
}
}

iAmTheLeaseHolder := newLease.Replica.ReplicaID == replicaID
iAmTheLeaseHolder := newLease.Replica.ReplicaID == r.mu.replicaID
// NB: in the case in which a node restarts, minLeaseProposedTS forces it to
// get a new lease and we make sure it gets a new sequence number, thus
// causing the right half of the disjunction to fire so that we update the
@@ -363,7 +362,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
// TODO(aayush): In the future, if we permit co-operative lease transfers
// when a range is subsumed, it should be relatively straightforward to
// allow historical reads on the subsumed RHS after such lease transfers.
if err := r.maybeWatchForMerge(ctx, hlc.Timestamp{} /* freezeStart */); err != nil {
if err := r.maybeWatchForMergeLocked(ctx, hlc.Timestamp{} /* freezeStart */); err != nil {
// We were unable to determine whether a merge was in progress. We cannot
// safely proceed.
log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s",
@@ -381,7 +380,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
// requests, this is kosher). This means that we don't use the old
// lease's expiration but instead use the new lease's start to initialize
// the timestamp cache low water.
setTimestampCacheLowWaterMark(r.store.tsCache, r.Desc(), newLease.Start.ToTimestamp())
setTimestampCacheLowWaterMark(r.store.tsCache, r.descRLocked(), newLease.Start.ToTimestamp())

// Reset the request counts used to make lease placement decisions whenever
// starting a new lease.
@@ -391,30 +390,32 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
}

// Inform the concurrency manager that the lease holder has been updated.
// We do this before installing the new lease in `r.mu.state` as we have
// an invariant that any replica with a lease has the concurrency manager
// enabled. (In practice, since both happen under `r.mu`, it is likely
// to not matter).
r.concMgr.OnRangeLeaseUpdated(newLease.Sequence, iAmTheLeaseHolder)

// Ordering is critical here. We only install the new lease after we've
// checked for an in-progress merge and updated the timestamp cache. If the
// ordering were reversed, it would be possible for requests to see the new
// lease but not the updated merge or timestamp cache state, which can result
// in serializability violations.
r.mu.Lock()
r.mu.state.Lease = &newLease
expirationBasedLease := r.requiresExpiringLeaseRLocked()
r.mu.Unlock()

// Gossip the first range whenever its lease is acquired. We check to make
// sure the lease is active so that a trailing replica won't process an old
// lease request and attempt to gossip the first range.
now := r.store.Clock().NowAsClockTimestamp()
if leaseChangingHands && iAmTheLeaseHolder && r.IsFirstRange() && r.OwnsValidLease(ctx, now) {
r.gossipFirstRange(ctx)
if leaseChangingHands && iAmTheLeaseHolder && r.IsFirstRange() && r.ownsValidLeaseRLocked(ctx, now) {
r.gossipFirstRangeLocked(ctx)
}

// Whenever we first acquire an expiration-based lease, notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires.
if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.OwnsValidLease(ctx, now) {
if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
select {
case r.store.renewableLeasesSignal <- struct{}{}:
@@ -425,7 +426,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
// If we're the current raft leader, may want to transfer the leadership to
// the new leaseholder. Note that this condition is also checked periodically
// when ticking the replica.
r.maybeTransferRaftLeadershipToLeaseholder(ctx)
r.maybeTransferRaftLeadershipToLeaseholderLocked(ctx)

// Notify the store that a lease change occurred and it may need to
// gossip the updated store descriptor (with updated capacity).
@@ -450,18 +451,21 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
// will be gossiped rarely because it falls on a range with an epoch-based
// range lease that is only reacquired extremely infrequently.
if iAmTheLeaseHolder {
if err := r.MaybeGossipSystemConfig(ctx); err != nil {
log.Errorf(ctx, "%v", err)
}
if err := r.MaybeGossipNodeLiveness(ctx, keys.NodeLivenessSpan); err != nil {
log.Errorf(ctx, "%v", err)
}

// Emit an MLAI on the leaseholder replica, as follower will be looking
// for one and if we went on to quiesce, they wouldn't necessarily get
// one otherwise (unless they ask for it, which adds latency).
r.EmitMLAI()
// NB: run these in an async task to keep them out of the critical section
// (r.mu is held here).
_ = r.store.stopper.RunAsyncTask(ctx, "lease-triggers", func(ctx context.Context) {
if err := r.MaybeGossipSystemConfig(ctx); err != nil {
log.Errorf(ctx, "%v", err)
}
if err := r.MaybeGossipNodeLiveness(ctx, keys.NodeLivenessSpan); err != nil {
log.Errorf(ctx, "%v", err)
}

// Emit an MLAI on the leaseholder replica, as follower will be looking
// for one and if we went on to quiesce, they wouldn't necessarily get
// one otherwise (unless they ask for it, which adds latency).
r.EmitMLAI()
})
if leaseChangingHands && log.V(1) {
// This logging is useful to troubleshoot incomplete drains.
log.Info(ctx, "is now leaseholder")
@@ -781,10 +785,7 @@ func (r *Replica) evaluateProposal(
activeVersion := r.ClusterSettings().Version.ActiveVersion(ctx).Version
migrationVersion := clusterversion.ByKey(clusterversion.TruncatedAndRangeAppliedStateMigration)
if migrationVersion.Less(activeVersion) {
// TODO(irfansharif): This was originally a fatal, which seems
// to have been a racey assertion (see #58378). We've downgraded
// it to an error to not trip up builds while we investigate.
log.Error(ctx, "not using applied state key in v21.1")
log.Fatal(ctx, "not using applied state key in v21.1")
}
// The range applied state was introduced in v2.1. It's possible to
// still find ranges that haven't activated it. If so, activate it.
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -964,13 +964,18 @@ func (r *Replica) applySnapshot(
}

// Atomically swap the placeholder, if any, for the replica, and update the
// replica's descriptor.
// replica's state. Note that this is intentionally in one critical section,
// to avoid exposing an inconsistent in-memory state. We did however already
// consume the SSTs above, meaning that at this point the in-memory state lags
// the on-disk state.

r.mu.Lock()
r.store.mu.Lock()
if r.store.removePlaceholderLocked(ctx, r.RangeID) {
atomic.AddInt32(&r.store.counts.filledPlaceholders, 1)
}
r.setDescRaftMuLocked(ctx, s.Desc)
if err := r.store.maybeMarkReplicaInitializedLocked(ctx, r); err != nil {
r.setDescLockedRaftMuLocked(ctx, s.Desc)
if err := r.store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
}
r.store.mu.Unlock()
@@ -979,12 +984,11 @@ func (r *Replica) applySnapshot(
// replica according to whether it holds the lease. We allow jumps in the
// lease sequence because there may be multiple lease changes accounted for
// in the snapshot.
r.leasePostApply(ctx, *s.Lease, true /* permitJump */)
r.leasePostApplyLocked(ctx, *s.Lease, true /* permitJump */)

// Inform the concurrency manager that this replica just applied a snapshot.
r.concMgr.OnReplicaSnapshotApplied()

r.mu.Lock()
// We set the persisted last index to the last applied index. This is
// not a correctness issue, but means that we may have just transferred
// some entries we're about to re-request from the leader and overwrite.
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/replica_test.go
@@ -1481,6 +1481,25 @@ func TestReplicaDrainLease(t *testing.T) {
require.Equal(t, r1.RaftStatus().Lead, uint64(r1.ReplicaID()),
"expected leadership to still be on the first replica")

// Wait until n1 has heartbeat its liveness record (epoch >= 1) and n2
// knows about it. Otherwise, the following could occur:
//
// - n1 heartbeats its liveness to epoch 1 and acquires the lease
// - n2 doesn't receive this yet (gossip)
// - when n2 is asked to acquire the lease, it uses a lease with epoch 1
// but the liveness record with epoch zero
// - lease status is ERROR, lease acquisition (and thus test) fails.
testutils.SucceedsSoon(t, func() error {
nl, ok := s2.NodeLiveness().(*liveness.NodeLiveness).GetLiveness(s1.NodeID())
if !ok {
return errors.New("no liveness record for n1")
}
if nl.Epoch < 1 {
return errors.New("epoch for n1 still zero")
}
return nil
})

// Mark the stores as draining. We'll then start checking how acquiring leases
// behaves while draining.
store1.draining.Store(true)
1 change: 1 addition & 0 deletions pkg/kv/kvserver/scanner_test.go
@@ -61,6 +61,7 @@ func newTestRangeSet(count int, t *testing.T) *testRangeSet {
LiveCount: 1,
}
repl.mu.state.Desc = desc
repl.startKey = desc.StartKey // actually used by replicasByKey
if exRngItem := rs.replicasByKey.ReplaceOrInsert((*btreeReplica)(repl)); exRngItem != nil {
t.Fatalf("failed to insert range %s", repl)
}
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/store.go
@@ -2719,10 +2719,6 @@ func (s *Store) PurgeOutdatedReplicas(ctx context.Context, version roachpb.Versi
qp := quotapool.NewIntPool("purge-outdated-replicas", 50)
g := ctxgroup.WithContext(ctx)
s.VisitReplicas(func(repl *Replica) (wantMore bool) {
if (repl.Version() == roachpb.Version{}) {
// TODO(irfansharif): This is a stop gap for #58523.
return true
}
if !repl.Version().Less(version) {
// Nothing to do here.
return true