diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go
index 408009389d78..4a6fa99eaa23 100644
--- a/pkg/kv/kvserver/raft_log_queue.go
+++ b/pkg/kv/kvserver/raft_log_queue.go
@@ -263,7 +263,11 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
 	raftStatus := r.raftStatusRLocked()
 
 	const anyRecipientStore roachpb.StoreID = 0
-	pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore)
+	pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(
+		now,
+		anyRecipientStore,
+		false, /* ignoreDeadline */
+	)
 	lastIndex := r.mu.lastIndex
 	// NB: raftLogSize above adjusts for pending truncations that have already
 	// been successfully replicated via raft, but logSizeTrusted does not see if
diff --git a/pkg/kv/kvserver/raft_log_queue_test.go b/pkg/kv/kvserver/raft_log_queue_test.go
index d2ba4982f4e3..2d7c1c78f6a0 100644
--- a/pkg/kv/kvserver/raft_log_queue_test.go
+++ b/pkg/kv/kvserver/raft_log_queue_test.go
@@ -669,7 +669,9 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
 	assertMin := func(exp uint64, now time.Time) {
 		t.Helper()
 		const anyRecipientStore roachpb.StoreID = 0
-		if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore); maxIndex != exp {
+		if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(
+			now, anyRecipientStore, false, /* ignoreDeadline */
+		); maxIndex != exp {
 			t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
 		}
 	}
diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go
index 82dc076a0098..85367ce43443 100644
--- a/pkg/kv/kvserver/raft_snapshot_queue.go
+++ b/pkg/kv/kvserver/raft_snapshot_queue.go
@@ -116,7 +116,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
 			return nil
 		}
 		if index := repl.getAndGCSnapshotLogTruncationConstraints(
-			timeutil.Now(), repDesc.StoreID,
+			timeutil.Now(), repDesc.StoreID, false, /* ignoreDeadline */
 		); index > 0 {
 			// There is a snapshot being transferred. It's probably an INITIAL snap,
 			// so bail for now and try again later.
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 5fdeb2c41816..ef82c62f971b 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -1289,8 +1289,15 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
 		return nil, err
 	}
 
+	// If we detect that there are learners in the process of receiving their
+	// initial upreplication snapshot, we don't want to remove them and we bail
+	// out.
+	if r.hasOutstandingLearnerSnapshotInFlight() {
+		return desc, errors.New("cannot remove learner while a snapshot is in flight")
+	}
+
 	// Now the config isn't joint any more, but we may have demoted some voters
-	// into learners. These learners should go as well.
+	// into learners. If so, we need to remove them.
 	learners := desc.Replicas().LearnerDescriptors()
 	if len(learners) == 0 {
 		return desc, nil
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 3edeb38dae50..c6491fd20807 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -1166,9 +1166,90 @@ func TestLearnerAndJointConfigAdminMerge(t *testing.T) {
 	checkFails()
 }
 
+func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	var activateSnapshotTestingKnob int64
+	blockSnapshot := make(chan struct{})
+	tc := testcluster.StartTestCluster(
+		t, 2, base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					Store: &kvserver.StoreTestingKnobs{
+						// Disable load-based splitting, so that the absence of sufficient
+						// QPS measurements does not prevent ranges from merging.
+						DisableLoadBasedSplitting: true,
+						ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
+							if atomic.LoadInt64(&activateSnapshotTestingKnob) == 1 {
+								<-blockSnapshot
+							}
+							return nil
+						},
+					},
+				},
+			},
+		},
+	)
+	defer tc.Stopper().Stop(ctx)
+
+	scratchKey := tc.ScratchRange(t)
+	splitKey := scratchKey.Next()
+
+	// Split and then unsplit the range to clear the sticky bit, otherwise the
+	// mergeQueue will ignore the range.
+	tc.SplitRangeOrFatal(t, splitKey)
+	require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey))
+
+	atomic.StoreInt64(&activateSnapshotTestingKnob, 1)
+	replicationChange := make(chan error)
+	go func() {
+		_, err := tc.AddVoters(scratchKey, makeReplicationTargets(2)...)
+		replicationChange <- err
+	}()
+
+	select {
+	case <-time.After(100 * time.Millisecond):
+		// Continue.
+	case <-replicationChange:
+		t.Fatal("did not expect the replication change to complete")
+	}
+	defer func() {
+		// Unblock the replication change and ensure that it succeeds.
+		close(blockSnapshot)
+		require.NoError(t, <-replicationChange)
+	}()
+
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	// TestCluster currently overrides this when used with ReplicationManual.
+	db.Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = true`)
+
+	testutils.SucceedsWithin(
+		t, func() error {
+			// While this replication change is stalled, we'll trigger a merge and
+			// ensure that the merge correctly notices that there is a snapshot in
+			// flight and ignores the range.
+			store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
+			trace, _, _ := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+			formattedTrace := trace.String()
+			matched, err := regexp.MatchString(
+				"cannot remove learner while a snapshot is in flight", formattedTrace,
+			)
+			require.NoError(t, err)
+			if !matched {
+				return errors.Errorf("expected trace to contain 'cannot remove learner while a snapshot is in flight'")
+			}
+			return nil
+		}, 20*time.Second,
+	)
+}
+
 func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+
 	ctx := context.Background()
 	knobs, ltk := makeReplicationTestKnobs()
 	// Disable load-based splitting, so that the absence of sufficient QPS
@@ -1202,7 +1283,6 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
 	ltk.withStopAfterLearnerAtomic(func() {
 		_ = tc.AddVotersOrFatal(t, scratchStartKey, tc.Target(1))
 	})
-
 	store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
 	trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
 	require.NoError(t, err)
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 148734cf2874..ec160c73f60d 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -1536,6 +1536,10 @@ func (r *Replica) addSnapshotLogTruncationConstraintLocked(
 	r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
 		index:          index,
 		recipientStore: recipientStore,
+		// NB: We explicitly do not want to set a deadline here, since the presence
+		// of a deadline is assumed to indicate that the snapshot has been
+		// successfully transmitted (but perhaps not yet applied) by the logic
+		// inside getAndGCSnapshotLogTruncationConstraintsLocked().
 	}
 }
 
@@ -1559,20 +1563,46 @@ func (r *Replica) completeSnapshotLogTruncationConstraint(
 	r.mu.snapshotLogTruncationConstraints[snapUUID] = item
 }
 
+// hasOutstandingLearnerSnapshotInFlight returns true if there is a snapshot in
+// progress from this replica to a learner replica for this range.
+func (r *Replica) hasOutstandingLearnerSnapshotInFlight() bool {
+	learners := r.Desc().Replicas().LearnerDescriptors()
+	for _, repl := range learners {
+		if yes := r.hasOutstandingSnapshotInFlightToStore(repl.StoreID); yes {
+			return yes
+		}
+	}
+	return false
+}
+
+// hasOutstandingSnapshotInFlightToStore returns true if there is a snapshot in
+// flight from this replica to the store with the given ID.
+func (r *Replica) hasOutstandingSnapshotInFlightToStore(storeID roachpb.StoreID) bool {
+	return r.getAndGCSnapshotLogTruncationConstraints(
+		// NB: We do not want to consider truncation constraints that have been
+		// marked completed but have not expired.
+		timeutil.Now(), storeID, true, /* ignoreDeadline */
+	) > 0
+}
+
 // getAndGCSnapshotLogTruncationConstraints returns the minimum index of any
 // currently outstanding snapshot being sent from this replica to the specified
 // recipient or 0 if there isn't one. Passing 0 for recipientStore means any
 // recipient.
+//
+// If ignoreDeadline is true, constraints whose deadline has already been set
+// are skipped. In other words, truncation constraints that have been marked
+// completed (but have not yet expired) are not returned.
 func (r *Replica) getAndGCSnapshotLogTruncationConstraints(
-	now time.Time, recipientStore roachpb.StoreID,
+	now time.Time, recipientStore roachpb.StoreID, ignoreDeadline bool,
 ) (minSnapIndex uint64) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	return r.getAndGCSnapshotLogTruncationConstraintsLocked(now, recipientStore)
+	return r.getAndGCSnapshotLogTruncationConstraintsLocked(now, recipientStore, ignoreDeadline)
 }
 
 func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
-	now time.Time, recipientStore roachpb.StoreID,
+	now time.Time, recipientStore roachpb.StoreID, ignoreDeadline bool,
 ) (minSnapIndex uint64) {
 	for snapUUID, item := range r.mu.snapshotLogTruncationConstraints {
 		if item.deadline != (time.Time{}) && item.deadline.Before(now) {
@@ -1584,6 +1614,9 @@ func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
 		if recipientStore != 0 && item.recipientStore != recipientStore {
 			continue
 		}
+		if item.deadline != (time.Time{}) && ignoreDeadline {
+			continue
+		}
 		if minSnapIndex == 0 || minSnapIndex > item.index {
 			minSnapIndex = item.index
 		}
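
For readers skimming the diff, here is a standalone sketch (not part of the patch, and not CockroachDB code) of how the ignoreDeadline filtering above behaves. snapTruncationInfo is trimmed down to just the fields used here, and minConstrainedIndex is a hypothetical stand-in for getAndGCSnapshotLogTruncationConstraintsLocked that operates on a plain map instead of r.mu.snapshotLogTruncationConstraints:

package main

import (
	"fmt"
	"time"
)

type storeID int32

type snapTruncationInfo struct {
	index          uint64
	recipientStore storeID
	// A zero deadline means the snapshot is still in flight; a non-zero
	// deadline means it was transmitted and the constraint is only kept
	// around until the deadline expires.
	deadline time.Time
}

// minConstrainedIndex mirrors the filtering in the patched function: expired
// constraints are garbage-collected, constraints whose snapshot has already
// been transmitted (deadline set) are skipped when ignoreDeadline is true,
// and the minimum index among the remaining constraints is returned (0 if
// there are none).
func minConstrainedIndex(
	constraints map[string]snapTruncationInfo,
	now time.Time,
	recipient storeID, // 0 means any recipient
	ignoreDeadline bool,
) (minSnapIndex uint64) {
	for id, item := range constraints {
		if !item.deadline.IsZero() && item.deadline.Before(now) {
			// Expired: drop the constraint.
			delete(constraints, id)
			continue
		}
		if recipient != 0 && item.recipientStore != recipient {
			continue
		}
		if !item.deadline.IsZero() && ignoreDeadline {
			// Already transmitted; callers that only care about snapshots
			// still in flight skip it.
			continue
		}
		if minSnapIndex == 0 || minSnapIndex > item.index {
			minSnapIndex = item.index
		}
	}
	return minSnapIndex
}

func main() {
	now := time.Now()
	constraints := map[string]snapTruncationInfo{
		"in-flight":   {index: 10, recipientStore: 2},                                 // still being sent
		"transmitted": {index: 20, recipientStore: 3, deadline: now.Add(time.Minute)}, // sent, not yet applied
	}

	// The log truncation decision considers both constraints.
	fmt.Println(minConstrainedIndex(constraints, now, 0, false)) // 10

	// The learner-removal check only cares about snapshots still in flight,
	// so the transmitted one is skipped.
	fmt.Println(minConstrainedIndex(constraints, now, 3, true)) // 0
}

The second call corresponds to the learner-removal path: hasOutstandingSnapshotInFlightToStore passes true for ignoreDeadline precisely so that a constraint only blocks learner removal while its snapshot is still in flight, not after it has been transmitted.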