kvserver: avoid races where replication changes can get interrupted
This commit adds a safeguard inside
`Replica.maybeLeaveAtomicChangeReplicasAndRemoveLearners()` to avoid removing
a learner replica _when we know_ that it is in the process of receiving its
initial snapshot (as indicated by an in-memory lock on log truncations that we
place while the snapshot is in flight).

This change should considerably reduce the instances where `AdminRelocateRange`
calls are interrupted by the mergeQueue or the replicateQueue (and vice versa).

Fixes cockroachdb#57129
Relates to cockroachdb#79118

Release note: none
aayushshah15 committed Apr 4, 2022
1 parent bb2c29c commit ca87e67
Showing 6 changed files with 134 additions and 8 deletions.
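Before the per-file diff, here is a minimal, self-contained Go sketch of the safeguard described in the commit message. The `rangeState` type and its `snapshotInFlightTo` map are hypothetical stand-ins, not CockroachDB code; the real check lives in `maybeLeaveAtomicChangeReplicasAndRemoveLearners()` and `hasOutstandingLearnerSnapshotInFlight()` in the hunks below.

package main

import (
	"errors"
	"fmt"
)

// Hypothetical, simplified stand-ins for the range descriptor and the
// in-flight snapshot bookkeeping that the real code keeps on the Replica.
type learner struct{ storeID int32 }

type rangeState struct {
	learners           []learner
	snapshotInFlightTo map[int32]bool // storeID -> initial snapshot still in flight?
}

// maybeRemoveLearners mirrors the new guard: if any learner is still
// receiving its initial snapshot, bail out instead of removing it out from
// under the snapshot.
func (rs *rangeState) maybeRemoveLearners() error {
	for _, l := range rs.learners {
		if rs.snapshotInFlightTo[l.storeID] {
			return errors.New("cannot remove learner while a snapshot is in flight")
		}
	}
	// Otherwise, proceed to remove the (demoted) learners.
	return nil
}

func main() {
	rs := &rangeState{
		learners:           []learner{{storeID: 2}},
		snapshotInFlightTo: map[int32]bool{2: true},
	}
	fmt.Println(rs.maybeRemoveLearners()) // cannot remove learner while a snapshot is in flight
}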
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/raft_log_queue.go
@@ -263,7 +263,11 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
raftStatus := r.raftStatusRLocked()

const anyRecipientStore roachpb.StoreID = 0
pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore)
pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(
now,
anyRecipientStore,
false, /* ignoreDeadline */
)
lastIndex := r.mu.lastIndex
// NB: raftLogSize above adjusts for pending truncations that have already
// been successfully replicated via raft, but logSizeTrusted does not see if
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/raft_log_queue_test.go
@@ -669,7 +669,9 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
assertMin := func(exp uint64, now time.Time) {
t.Helper()
const anyRecipientStore roachpb.StoreID = 0
if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore); maxIndex != exp {
if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(
now, anyRecipientStore, false, /* ignoreDeadline */
); maxIndex != exp {
t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
@@ -116,7 +116,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
return nil
}
if index := repl.getAndGCSnapshotLogTruncationConstraints(
timeutil.Now(), repDesc.StoreID,
timeutil.Now(), repDesc.StoreID, false, /* ignoreDeadline */
); index > 0 {
// There is a snapshot being transferred. It's probably an INITIAL snap,
// so bail for now and try again later.
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/replica_command.go
@@ -1289,8 +1289,15 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
return nil, err
}

// If we detect that there are learners in the process of receiving their
// initial upreplication snapshot, we don't want to remove them, so we bail
// out.
if r.hasOutstandingLearnerSnapshotInFlight() {
return desc, errors.New("cannot remove learner while a snapshot is in flight")
}

// Now the config isn't joint any more, but we may have demoted some voters
// into learners. These learners should go as well.
// into learners. If so, we need to remove them.
learners := desc.Replicas().LearnerDescriptors()
if len(learners) == 0 {
return desc, nil
82 changes: 81 additions & 1 deletion pkg/kv/kvserver/replica_learner_test.go
@@ -1166,9 +1166,90 @@ func TestLearnerAndJointConfigAdminMerge(t *testing.T) {
checkFails()
}

func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
var activateSnapshotTestingKnob int64
blockSnapshot := make(chan struct{})
tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Disable load-based splitting, so that the absence of sufficient
// QPS measurements does not prevent ranges from merging.
DisableLoadBasedSplitting: true,
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&activateSnapshotTestingKnob) == 1 {
<-blockSnapshot
}
return nil
},
},
},
},
},
)
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
splitKey := scratchKey.Next()

// Split and then unsplit the range to clear the sticky bit; otherwise the
// mergeQueue will ignore the range.
tc.SplitRangeOrFatal(t, splitKey)
require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey))

atomic.StoreInt64(&activateSnapshotTestingKnob, 1)
replicationChange := make(chan error)
go func() {
_, err := tc.AddVoters(scratchKey, makeReplicationTargets(2)...)
replicationChange <- err
}()

select {
case <-time.After(100 * time.Millisecond):
// Continue.
case <-replicationChange:
t.Fatal("did not expect the replication change to complete")
}
defer func() {
// Unblock the replication change and ensure that it succeeds.
close(blockSnapshot)
require.NoError(t, <-replicationChange)
}()

db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
// TestCluster currently overrides this when used with ReplicationManual.
db.Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = true`)

testutils.SucceedsWithin(
t, func() error {
// While this replication change is stalled, we'll trigger a merge and
// ensure that the merge correctly notices that there is a snapshot in
// flight and ignores the range.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
trace, _, _ := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
formattedTrace := trace.String()
matched, err := regexp.MatchString(
"cannot remove learner while a snapshot is in flight", formattedTrace,
)
require.NoError(t, err)
if !matched {
return errors.Errorf("expected trace to contain 'cannot remove learner while a snapshot is in flight'")
}
return nil
}, 20*time.Second,
)
}

func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
knobs, ltk := makeReplicationTestKnobs()
// Disable load-based splitting, so that the absence of sufficient QPS
@@ -1202,7 +1283,6 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
ltk.withStopAfterLearnerAtomic(func() {
_ = tc.AddVotersOrFatal(t, scratchStartKey, tc.Target(1))
})

store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
require.NoError(t, err)
39 changes: 36 additions & 3 deletions pkg/kv/kvserver/replica_raft.go
@@ -1536,6 +1536,10 @@ func (r *Replica) addSnapshotLogTruncationConstraintLocked(
r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
index: index,
recipientStore: recipientStore,
// NB: We explicitly do not set a deadline here. The logic inside
// getAndGCSnapshotLogTruncationConstraintsLocked() takes the presence of a
// deadline to mean that the snapshot has been successfully transmitted
// (but perhaps not yet applied).
}
}

@@ -1559,20 +1563,46 @@ func (r *Replica) completeSnapshotLogTruncationConstraint(
r.mu.snapshotLogTruncationConstraints[snapUUID] = item
}

// hasOutstandingLearnerSnapshotInFlight returns true if there is a snapshot in
// progress from this replica to a learner replica for this range.
func (r *Replica) hasOutstandingLearnerSnapshotInFlight() bool {
learners := r.Desc().Replicas().LearnerDescriptors()
for _, repl := range learners {
if yes := r.hasOutstandingSnapshotInFlightToStore(repl.StoreID); yes {
return yes
}
}
return false
}

// hasOutstandingSnapshotInFlightToStore returns true if there is a snapshot in
// flight from this replica to the store with the given ID.
func (r *Replica) hasOutstandingSnapshotInFlightToStore(storeID roachpb.StoreID) bool {
return r.getAndGCSnapshotLogTruncationConstraints(
// NB: We do not want to consider truncation constraints that have been
// marked completed but have not expired.
timeutil.Now(), storeID, true, /* ignoreDeadline */
) > 0
}

// getAndGCSnapshotLogTruncationConstraints returns the minimum index of any
// currently outstanding snapshot being sent from this replica to the specified
// recipient or 0 if there isn't one. Passing 0 for recipientStore means any
// recipient.
//
// If ignoreDeadline is true, constraints that already have a deadline set are
// skipped. In other words, truncation constraints for snapshots that have been
// marked completed (but have not yet expired) are not returned, so only
// snapshots that are still in flight are considered.
func (r *Replica) getAndGCSnapshotLogTruncationConstraints(
now time.Time, recipientStore roachpb.StoreID,
now time.Time, recipientStore roachpb.StoreID, ignoreDeadline bool,
) (minSnapIndex uint64) {
r.mu.Lock()
defer r.mu.Unlock()
return r.getAndGCSnapshotLogTruncationConstraintsLocked(now, recipientStore)
return r.getAndGCSnapshotLogTruncationConstraintsLocked(now, recipientStore, ignoreDeadline)
}

func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
now time.Time, recipientStore roachpb.StoreID,
now time.Time, recipientStore roachpb.StoreID, ignoreDeadline bool,
) (minSnapIndex uint64) {
for snapUUID, item := range r.mu.snapshotLogTruncationConstraints {
if item.deadline != (time.Time{}) && item.deadline.Before(now) {
Expand All @@ -1584,6 +1614,9 @@ func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
if recipientStore != 0 && item.recipientStore != recipientStore {
continue
}
if item.deadline != (time.Time{}) && ignoreDeadline {
continue
}
if minSnapIndex == 0 || minSnapIndex > item.index {
minSnapIndex = item.index
}
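To make the deadline convention above concrete, here is a small, self-contained Go sketch of the constraint bookkeeping. The `tracker` type and its methods are hypothetical simplifications of `addSnapshotLogTruncationConstraintLocked`, `completeSnapshotLogTruncationConstraint`, and `getAndGCSnapshotLogTruncationConstraintsLocked`; the handling of `ignoreDeadline` follows the hunk above.

package main

import (
	"fmt"
	"time"
)

// Hypothetical, simplified model of the snapshot log-truncation constraint
// bookkeeping in replica_raft.go. The real code keys constraints by snapshot
// UUID under Replica.mu; this sketch only illustrates the deadline convention:
// a zero deadline means the snapshot is still in flight, a set deadline means
// it has been transmitted and the constraint is kept until the deadline passes.
type constraint struct {
	index          uint64
	recipientStore int32
	deadline       time.Time // zero while the snapshot is in flight
}

type tracker struct {
	constraints map[string]constraint
}

// add mirrors addSnapshotLogTruncationConstraintLocked: no deadline is set.
func (t *tracker) add(snapUUID string, index uint64, store int32) {
	t.constraints[snapUUID] = constraint{index: index, recipientStore: store}
}

// complete mirrors completeSnapshotLogTruncationConstraint: the deadline marks
// the snapshot as transmitted and starts its grace period.
func (t *tracker) complete(snapUUID string, now time.Time, grace time.Duration) {
	c := t.constraints[snapUUID]
	c.deadline = now.Add(grace)
	t.constraints[snapUUID] = c
}

// minIndex mirrors getAndGCSnapshotLogTruncationConstraintsLocked: expired
// constraints are garbage-collected, and when ignoreDeadline is true,
// constraints that already carry a deadline (completed snapshots) are skipped,
// so only snapshots that are truly in flight are reported.
func (t *tracker) minIndex(now time.Time, store int32, ignoreDeadline bool) uint64 {
	var min uint64
	for id, c := range t.constraints {
		if !c.deadline.IsZero() && c.deadline.Before(now) {
			delete(t.constraints, id) // expired: garbage-collect
			continue
		}
		if store != 0 && c.recipientStore != store {
			continue
		}
		if ignoreDeadline && !c.deadline.IsZero() {
			continue // completed but unexpired: not "in flight"
		}
		if min == 0 || c.index < min {
			min = c.index
		}
	}
	return min
}

func main() {
	tr := &tracker{constraints: map[string]constraint{}}
	now := time.Now()
	tr.add("snap-1", 42, 3)
	fmt.Println(tr.minIndex(now, 3, true))  // 42: still in flight
	tr.complete("snap-1", now, time.Minute)
	fmt.Println(tr.minIndex(now, 3, true))  // 0: transmitted, no longer "in flight"
	fmt.Println(tr.minIndex(now, 3, false)) // 42: but it still constrains log truncation
}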
