kvserver: avoid races where replication changes can get interrupted
This commit adds a safeguard inside
`Replica.maybeLeaveAtomicChangeReplicasAndRemoveLearners()` to avoid removing
a learner replica _when we know_ that it is in the process of receiving its
initial snapshot (as indicated by an in-memory lock on log truncations that we
place while the snapshot is in flight).

This change should considerably reduce the instances where `AdminRelocateRange`
calls are interrupted by the mergeQueue or the replicateQueue (and vice versa).

Fixes cockroachdb#57129
Relates to cockroachdb#79118

Release note: none
aayushshah15 committed Apr 7, 2022
1 parent bb2c29c commit 5fc368e
Showing 8 changed files with 187 additions and 72 deletions.
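
Before the per-file diffs, here is a minimal, self-contained sketch of the safeguard the commit message describes. All names in it (replica, truncationConstraint, maybeRemoveLearners, and so on) are simplified stand-ins invented for illustration, not the kvserver API; the actual logic lives in maybeLeaveAtomicChangeReplicasAndRemoveLearners and hasOutstandingLearnerSnapshotInFlight in the diffs below.

package main

import (
	"errors"
	"fmt"
	"time"
)

type storeID int

// truncationConstraint mirrors the idea of snapTruncationInfo below:
// completedAt stays zero while the snapshot is still being streamed.
type truncationConstraint struct {
	recipientStore storeID
	completedAt    time.Time
}

type replica struct {
	learnerStores []storeID                       // stores currently holding learner replicas
	constraints   map[string]truncationConstraint // keyed by snapshot UUID (string here)
}

var errSnapshotInFlight = errors.New("cannot remove learner while snapshot is in flight")

// hasOutstandingLearnerSnapshotInFlight reports whether any in-flight
// (not-yet-completed) snapshot targets a store that holds a learner.
func (r *replica) hasOutstandingLearnerSnapshotInFlight() bool {
	for _, c := range r.constraints {
		if !c.completedAt.IsZero() {
			continue // completed: relevant for log truncation, not for this check
		}
		for _, s := range r.learnerStores {
			if c.recipientStore == s {
				return true
			}
		}
	}
	return false
}

// maybeRemoveLearners models the guard added to
// maybeLeaveAtomicChangeReplicasAndRemoveLearners: skip learner removal while
// an initial snapshot is still in flight.
func (r *replica) maybeRemoveLearners() (learnersRemoved int64, err error) {
	if r.hasOutstandingLearnerSnapshotInFlight() {
		return 0, errSnapshotInFlight
	}
	learnersRemoved = int64(len(r.learnerStores))
	r.learnerStores = nil
	return learnersRemoved, nil
}

func main() {
	r := &replica{
		learnerStores: []storeID{2},
		constraints: map[string]truncationConstraint{
			"snap-1": {recipientStore: 2}, // zero completedAt: still in flight
		},
	}
	if _, err := r.maybeRemoveLearners(); err != nil {
		fmt.Println("removal blocked:", err)
	}

	// Once the snapshot completes, removal proceeds.
	c := r.constraints["snap-1"]
	c.completedAt = time.Now()
	r.constraints["snap-1"] = c
	n, err := r.maybeRemoveLearners()
	fmt.Println("removed:", n, "err:", err)
}

The design point mirrored here is that a truncation constraint with an unset completedAt timestamp marks an in-flight snapshot, so learner removal backs off instead of racing with the up-replication.
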
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/merge_queue.go
@@ -296,8 +296,9 @@ func (mq *mergeQueue) process(
// performed by the LHS leaseholder, so it can easily do this for LHS.
// We deal with the RHS, whose leaseholder may be remote, further down.
var err error
lhsDesc, err =
lhsRepl.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, lhsDesc)
// TODO(aayush): Separately track metrics for how many learners were removed
// by the mergeQueue here.
lhsDesc, _, err = lhsRepl.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, lhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return false, err
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/raft_log_queue.go
@@ -263,7 +263,11 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
raftStatus := r.raftStatusRLocked()

const anyRecipientStore roachpb.StoreID = 0
pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore)
pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(
now,
anyRecipientStore,
false, /* onlyInFlight */
)
lastIndex := r.mu.lastIndex
// NB: raftLogSize above adjusts for pending truncations that have already
// been successfully replicated via raft, but logSizeTrusted does not see if
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/raft_log_queue_test.go
@@ -33,7 +33,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
raft "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/tracker"
)

@@ -669,7 +669,9 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
assertMin := func(exp uint64, now time.Time) {
t.Helper()
const anyRecipientStore roachpb.StoreID = 0
if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore); maxIndex != exp {
if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(
now, anyRecipientStore, false, /* onlyInFlight */
); maxIndex != exp {
t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
@@ -116,7 +116,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
return nil
}
if index := repl.getAndGCSnapshotLogTruncationConstraints(
timeutil.Now(), repDesc.StoreID,
timeutil.Now(), repDesc.StoreID, false, /* onlyInFlight */
); index > 0 {
// There is a snapshot being transferred. It's probably an INITIAL snap,
// so bail for now and try again later.
34 changes: 24 additions & 10 deletions pkg/kv/kvserver/replica_command.go
@@ -1144,7 +1144,8 @@ func (r *Replica) changeReplicasImpl(
// If we demoted or swapped any voters with non-voters, we likely are in a
// joint config or have learners on the range. Let's exit the joint config
// and remove the learners.
return r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, desc)
desc, _, err = r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, desc)
return desc, err
}
return desc, nil
}
@@ -1278,22 +1279,34 @@ func (r *Replica) maybeLeaveAtomicChangeReplicas(
})
}

// ErrCannotRemoveLearnerWhileSnapshotInFlight is returned when we cannot remove
// a learner replica because it is in the process of receiving its initial
// snapshot.
var ErrCannotRemoveLearnerWhileSnapshotInFlight = errors.New("cannot remove learner while snapshot is in flight")

// maybeLeaveAtomicChangeReplicasAndRemoveLearners transitions out of the joint
// config (if there is one), and then removes all learners. After this function
// returns, all remaining replicas will be of type VOTER_FULL or NON_VOTER.
func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
ctx context.Context, desc *roachpb.RangeDescriptor,
) (rangeDesc *roachpb.RangeDescriptor, err error) {
) (rangeDesc *roachpb.RangeDescriptor, learnersRemoved int64, err error) {
desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc)
if err != nil {
return nil, err
return nil, learnersRemoved, err
}

// If we detect that there are learners in the process of receiving their
// initial upreplication snapshot, we don't want to remove them and we bail
// out.
if r.hasOutstandingLearnerSnapshotInFlight() {
return desc, learnersRemoved, ErrCannotRemoveLearnerWhileSnapshotInFlight
}

// Now the config isn't joint any more, but we may have demoted some voters
// into learners. These learners should go as well.
// into learners. If so, we need to remove them.
learners := desc.Replicas().LearnerDescriptors()
if len(learners) == 0 {
return desc, nil
return desc, learnersRemoved, nil
}
targets := make([]roachpb.ReplicationTarget, len(learners))
for i := range learners {
@@ -1320,10 +1333,11 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
},
)
if err != nil {
return nil, errors.Wrapf(err, `removing learners from %s`, origDesc)
return nil, learnersRemoved, errors.Wrapf(err, `removing learners from %s`, origDesc)
}
learnersRemoved++
}
return desc, nil
return desc, learnersRemoved, nil
}

// validateAdditionsPerStore ensures that we're not trying to add the same type
@@ -1855,7 +1869,8 @@ func (r *Replica) execReplicationChangesForVoters(

// Leave the joint config if we entered one. Also, remove any learners we
// might have picked up due to removal-via-demotion.
return r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, desc)
desc, _, err = r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, desc)
return desc, err
}

// tryRollbackRaftLearner attempts to remove a learner specified by the target.
@@ -2768,8 +2783,7 @@ func (r *Replica) AdminRelocateRange(

// Remove learners so we don't have to think about relocating them, and leave
// the joint config if we're in one.
newDesc, err :=
r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, &rangeDesc)
newDesc, _, err := r.maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, &rangeDesc)
if err != nil {
log.Warningf(ctx, "%v", err)
return err
85 changes: 85 additions & 0 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -1166,6 +1166,91 @@ func TestLearnerAndJointConfigAdminMerge(t *testing.T) {
checkFails()
}

// TestMergeQueueDoesNotInterruptReplicationChange verifies that the merge queue
// will correctly ignore a range that has an in-flight snapshot to a learner
// replica.
func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
var activateSnapshotTestingKnob int64
blockSnapshot := make(chan struct{})
tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Disable load-based splitting, so that the absence of sufficient
// QPS measurements does not prevent ranges from merging.
DisableLoadBasedSplitting: true,
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&activateSnapshotTestingKnob) == 1 {
<-blockSnapshot
}
return nil
},
},
},
},
},
)
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
splitKey := scratchKey.Next()

// Split and then unsplit the range to clear the sticky bit, otherwise the
// mergeQueue will ignore the range.
tc.SplitRangeOrFatal(t, splitKey)
require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey))

atomic.StoreInt64(&activateSnapshotTestingKnob, 1)
replicationChange := make(chan error)
err := tc.Stopper().RunAsyncTask(ctx, "test", func(ctx context.Context) {
_, err := tc.AddVoters(scratchKey, makeReplicationTargets(2)...)
replicationChange <- err
})
require.NoError(t, err)

select {
case <-time.After(100 * time.Millisecond):
// Continue.
case <-replicationChange:
t.Fatal("did not expect the replication change to complete")
}
defer func() {
// Unblock the replication change and ensure that it succeeds.
close(blockSnapshot)
require.NoError(t, <-replicationChange)
}()

db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
// TestCluster currently overrides this when used with ReplicationManual.
db.Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = true`)

testutils.SucceedsSoon(t, func() error {
// While this replication change is stalled, we'll trigger a merge and
// ensure that the merge correctly notices that there is a snapshot in
// flight and ignores the range.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
trace, _, _ := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
formattedTrace := trace.String()
matched, err := regexp.MatchString(
kvserver.ErrCannotRemoveLearnerWhileSnapshotInFlight.Error(), formattedTrace,
)
require.NoError(t, err)
if !matched {
return errors.Errorf(
"expected the mergeQueue to detect that there was an in-flight snapshot to a learner; got %v",
formattedTrace,
)
}
return nil
})
}

func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
57 changes: 50 additions & 7 deletions pkg/kv/kvserver/replica_raft.go
@@ -1506,7 +1506,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
type snapTruncationInfo struct {
index uint64
recipientStore roachpb.StoreID
deadline time.Time
completedAt time.Time
}

func (r *Replica) addSnapshotLogTruncationConstraint(
@@ -1536,6 +1536,10 @@ func (r *Replica) addSnapshotLogTruncationConstraintLocked(
r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
index: index,
recipientStore: recipientStore,
// NB: We explicitly do not want to set the `completedAt` field here, since
// the presence of that field is assumed to indicate that the snapshot has
// been successfully transmitted (but perhaps not yet applied) by the logic
// inside getAndGCSnapshotLogTruncationConstraintsLocked().
}
}

@@ -1554,28 +1558,55 @@ func (r *Replica) completeSnapshotLogTruncationConstraint(
return
}

deadline := now.Add(RaftLogQueuePendingSnapshotGracePeriod)
item.deadline = deadline
item.completedAt = now
r.mu.snapshotLogTruncationConstraints[snapUUID] = item
}

// hasOutstandingLearnerSnapshotInFlight returns true if there is a snapshot in
// progress from this replica to a learner replica for this range.
func (r *Replica) hasOutstandingLearnerSnapshotInFlight() bool {
learners := r.Desc().Replicas().LearnerDescriptors()
for _, repl := range learners {
if yes := r.hasOutstandingSnapshotInFlightToStore(repl.StoreID); yes {
return yes
}
}
return false
}

// hasOutstandingSnapshotInFlightToStore returns true if there is a snapshot in
// flight from this replica to the store with the given ID.
func (r *Replica) hasOutstandingSnapshotInFlightToStore(storeID roachpb.StoreID) bool {
return r.getAndGCSnapshotLogTruncationConstraints(
// NB: We do not want to consider truncation constraints that have been
// marked completed but have not expired.
timeutil.Now(), storeID, true, /* onlyInFlight */
) > 0
}

// getAndGCSnapshotLogTruncationConstraints returns the minimum index of any
// currently outstanding snapshot being sent from this replica to the specified
// recipient or 0 if there isn't one. Passing 0 for recipientStore means any
// recipient.
//
// If onlyInFlight is true, only truncation constraints corresponding to
// snapshots currently in flight are considered. In other words, truncation
// constraints that have been marked completed (but have not expired their grace
// period yet) are not returned.
func (r *Replica) getAndGCSnapshotLogTruncationConstraints(
now time.Time, recipientStore roachpb.StoreID,
now time.Time, recipientStore roachpb.StoreID, onlyInFlight bool,
) (minSnapIndex uint64) {
r.mu.Lock()
defer r.mu.Unlock()
return r.getAndGCSnapshotLogTruncationConstraintsLocked(now, recipientStore)
return r.getAndGCSnapshotLogTruncationConstraintsLocked(now, recipientStore, onlyInFlight)
}

func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
now time.Time, recipientStore roachpb.StoreID,
now time.Time, recipientStore roachpb.StoreID, onlyInFlight bool,
) (minSnapIndex uint64) {
constraintGCThreshold := now.Add(-RaftLogQueuePendingSnapshotGracePeriod)
for snapUUID, item := range r.mu.snapshotLogTruncationConstraints {
if item.deadline != (time.Time{}) && item.deadline.Before(now) {
if item.completedAt != (time.Time{}) && item.completedAt.Before(constraintGCThreshold) {
// The snapshot has finished and its grace period has passed.
// Ignore it when making truncation decisions.
delete(r.mu.snapshotLogTruncationConstraints, snapUUID)
@@ -1584,6 +1615,18 @@ func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
if recipientStore != 0 && item.recipientStore != recipientStore {
continue
}
// NB: `completedAt` is only assigned to truncation constraints that
// correspond to snapshots that have been transmitted to the recipient.
// Above, we allow for a grace period before GC-ing a truncation constraint
// after its snapshot has been marked completed in order to avoid ill-timed
// log truncations where the recipient has received the snapshot, but hasn't
// yet applied it.
//
// Ignore such completed constraints if the caller only cares about
// in-flight snapshots.
if item.completedAt != (time.Time{}) && onlyInFlight {
continue
}
if minSnapIndex == 0 || minSnapIndex > item.index {
minSnapIndex = item.index
}
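
As a reading aid for the onlyInFlight parameter introduced above, here is a minimal standalone sketch of the constraint filtering. The names gracePeriod, constraint, and minConstrainedIndex are invented for illustration and only approximate the behavior of getAndGCSnapshotLogTruncationConstraintsLocked described in the comments above.

package main

import (
	"fmt"
	"time"
)

// gracePeriod is a stand-in for RaftLogQueuePendingSnapshotGracePeriod.
const gracePeriod = 3 * time.Second

// constraint is a simplified stand-in for snapTruncationInfo: completedAt
// stays zero while the snapshot is still in flight.
type constraint struct {
	index       uint64
	completedAt time.Time
}

// minConstrainedIndex models the filtering above: constraints whose grace
// period has expired are dropped, completed-but-unexpired constraints are
// skipped when onlyInFlight is true, and the minimum index of the remaining
// constraints (or 0) is returned.
func minConstrainedIndex(
	constraints map[string]constraint, now time.Time, onlyInFlight bool,
) uint64 {
	gcThreshold := now.Add(-gracePeriod)
	var minIndex uint64
	for id, c := range constraints {
		if !c.completedAt.IsZero() && c.completedAt.Before(gcThreshold) {
			// Snapshot finished and its grace period has passed; it no longer
			// constrains log truncation.
			delete(constraints, id)
			continue
		}
		if onlyInFlight && !c.completedAt.IsZero() {
			// Completed but unexpired: still relevant for truncation decisions,
			// but not an in-flight snapshot.
			continue
		}
		if minIndex == 0 || c.index < minIndex {
			minIndex = c.index
		}
	}
	return minIndex
}

func main() {
	now := time.Now()
	constraints := map[string]constraint{
		"in-flight": {index: 20},                                         // still streaming
		"just-done": {index: 10, completedAt: now.Add(-time.Second)},     // completed, within grace period
		"long-done": {index: 5, completedAt: now.Add(-10 * time.Second)}, // grace period expired
	}
	fmt.Println(minConstrainedIndex(constraints, now, false)) // 10: truncation still held back by "just-done"
	fmt.Println(minConstrainedIndex(constraints, now, true))  // 20: only the in-flight snapshot counts
}

With onlyInFlight=false (the log-truncation callers), a completed-but-unexpired constraint still holds back truncation; with onlyInFlight=true (the new learner-removal check), only snapshots that are actually still streaming count.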
