Merge pull request cockroachdb#10690 from petermattis/pmattis/deflake-test-raft-remove-race

storage: deflake TestRaftRemoveRace
petermattis authored Nov 22, 2016
2 parents 6e7af0e + dc6c5f6 commit ffe9a33
Showing 4 changed files with 91 additions and 21 deletions.
18 changes: 9 additions & 9 deletions pkg/storage/client_raft_test.go
@@ -1905,19 +1905,19 @@ func TestRaftAfterRemoveRange(t *testing.T) {
 	mtc.expireLeases()
 }
 
-// TestRaftRemoveRace adds and removes a replica repeatedly in an
-// attempt to reproduce a race
-// (https://github.com/cockroachdb/cockroach/issues/1911). Note that
-// 10 repetitions is not enough to reliably reproduce the problem, but
-// it's better than any other tests we have for this (increasing the
-// number of repetitions adds an unacceptable amount of test runtime).
+// TestRaftRemoveRace adds and removes a replica repeatedly in an attempt to
+// reproduce a race (see #1911 and #9037).
 func TestRaftRemoveRace(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	mtc := startMultiTestContext(t, 3)
+	mtc := startMultiTestContext(t, 10)
 	defer mtc.Stop()
 
-	rangeID := roachpb.RangeID(1)
-	mtc.replicateRange(rangeID, 1, 2)
+	const rangeID = roachpb.RangeID(1)
+	// Up-replicate to a bunch of nodes which stresses a condition where a
+	// replica created via a preemptive snapshot receives a message for a
+	// previous incarnation of the replica (i.e. has a smaller replica ID) that
+	// existed on the same store.
+	mtc.replicateRange(rangeID, 1, 2, 3, 4, 5, 6, 7, 8, 9)
 
 	for i := 0; i < 10; i++ {
 		mtc.unreplicateRange(rangeID, 2)
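The body of the for loop is truncated in this view. From the visible lines and the test's stated intent, it presumably alternates removing and re-adding the same replica to maximize the chance of hitting the race; a minimal sketch of that shape, reusing the multiTestContext helpers from the hunk above:

	// Hypothetical reconstruction of the truncated loop body: repeatedly
	// remove and re-add replica 2 so a fresh incarnation can race with
	// messages addressed to the old one.
	for i := 0; i < 10; i++ {
		mtc.unreplicateRange(rangeID, 2)
		mtc.replicateRange(rangeID, 2)
	}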
43 changes: 31 additions & 12 deletions pkg/storage/replica.go
@@ -349,6 +349,9 @@ type Replica struct {
 		// Raft group). The replica ID will be non-zero whenever the replica is
 		// part of a Raft group.
 		replicaID roachpb.ReplicaID
+		// The minimum allowed ID for this replica. Initialized from
+		// RaftTombstone.NextReplicaID.
+		minReplicaID roachpb.ReplicaID
 		// The ID of the leader replica within the Raft group. Used to determine
 		// when the leadership changes.
 		leaderID roachpb.ReplicaID
@@ -714,18 +717,27 @@ func (r *Replica) destroyDataRaftMuLocked() error {
 
 	// Save a tombstone. The range cannot be re-replicated onto this
 	// node without having a replica ID of at least desc.NextReplicaID.
-	tombstoneKey := keys.RaftTombstoneKey(desc.RangeID)
-	tombstone := &roachpb.RaftTombstone{
-		NextReplicaID: desc.NextReplicaID,
-	}
 	ctx := r.AnnotateCtx(context.TODO())
-	if err := engine.MVCCPutProto(ctx, batch, nil, tombstoneKey, hlc.ZeroTimestamp, nil, tombstone); err != nil {
+	if err := r.setTombstoneKey(ctx, batch, desc); err != nil {
 		return err
 	}
 
 	return batch.Commit()
 }
 
+func (r *Replica) setTombstoneKey(
+	ctx context.Context, eng engine.ReadWriter, desc *roachpb.RangeDescriptor,
+) error {
+	r.mu.Lock()
+	r.mu.minReplicaID = desc.NextReplicaID
+	r.mu.Unlock()
+	tombstoneKey := keys.RaftTombstoneKey(desc.RangeID)
+	tombstone := &roachpb.RaftTombstone{
+		NextReplicaID: desc.NextReplicaID,
+	}
+	return engine.MVCCPutProto(ctx, eng, nil, tombstoneKey, hlc.ZeroTimestamp, nil, tombstone)
+}
+
 func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -734,16 +746,23 @@ func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error {
 
 // setReplicaIDLocked requires that the replica lock is held.
 func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error {
-	if replicaID == 0 {
-		// If the incoming message didn't give us a new replica ID,
-		// there's nothing to do (this is only expected for preemptive snapshots).
+	if r.mu.replicaID == replicaID {
+		// The common case: the replica ID is unchanged.
 		return nil
 	}
-	if r.mu.replicaID == replicaID {
+	if replicaID == 0 {
+		// If the incoming message does not have a new replica ID it is a
+		// preemptive snapshot. We'll set a tombstone for the old replica ID if the
+		// snapshot is accepted.
 		return nil
-	} else if r.mu.replicaID > replicaID {
+	}
+	if replicaID < r.mu.minReplicaID {
+		return &roachpb.RaftGroupDeletedError{}
+	}
+	if r.mu.replicaID > replicaID {
 		return errors.Errorf("replicaID cannot move backwards from %d to %d", r.mu.replicaID, replicaID)
-	} else if r.mu.replicaID != 0 {
+	}
+	if r.mu.replicaID != 0 {
 		// TODO(bdarnell): clean up previous raftGroup (update peers)
 	}
 
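Taken together, the new checks give setReplicaIDLocked a strict evaluation order: an unchanged ID is accepted first, then a zero ID (preemptive snapshot), then the tombstone floor, then the monotonicity rule. A self-contained sketch of that ordering with simplified stand-in types (checkReplicaID is a hypothetical name; the real method also wires up the Raft group):

	package main

	import "fmt"

	type replicaID int32

	// checkReplicaID mirrors the validation order of setReplicaIDLocked in the
	// hunk above: unchanged and zero (preemptive-snapshot) IDs are accepted
	// before the tombstone floor is consulted, so re-applying the current ID
	// never trips the "raft group deleted" error.
	func checkReplicaID(cur, min, next replicaID) error {
		if cur == next {
			return nil // common case: the replica ID is unchanged
		}
		if next == 0 {
			return nil // preemptive snapshot; tombstone written on acceptance
		}
		if next < min {
			return fmt.Errorf("raft group deleted")
		}
		if cur > next {
			return fmt.Errorf("replicaID cannot move backwards from %d to %d", cur, next)
		}
		return nil
	}

	func main() {
		fmt.Println(checkReplicaID(0, 2, 1)) // raft group deleted
		fmt.Println(checkReplicaID(1, 2, 1)) // <nil>: equality is checked first
	}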
41 changes: 41 additions & 0 deletions pkg/storage/replica_test.go
@@ -6032,6 +6032,47 @@ func TestReplicaIDChangePending(t *testing.T) {
 	<-commandProposed
 }
 
+func TestSetReplicaID(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	tsc := TestStoreConfig(nil)
+	tc := testContext{}
+	tc.StartWithStoreConfig(t, tsc)
+	defer tc.Stop()
+
+	repl := tc.repl
+
+	testCases := []struct {
+		replicaID    roachpb.ReplicaID
+		minReplicaID roachpb.ReplicaID
+		newReplicaID roachpb.ReplicaID
+		expected     string
+	}{
+		{0, 0, 1, ""},
+		{0, 1, 1, ""},
+		{0, 2, 1, "raft group deleted"},
+		{1, 2, 1, ""}, // not an error; replicaID == newReplicaID is checked first
+		{2, 0, 1, "replicaID cannot move backwards"},
+	}
+	for i, c := range testCases {
+		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+			repl.mu.Lock()
+			repl.mu.replicaID = c.replicaID
+			repl.mu.minReplicaID = c.minReplicaID
+			repl.mu.Unlock()
+
+			err := repl.setReplicaID(c.newReplicaID)
+			if c.expected == "" {
+				if err != nil {
+					t.Fatalf("expected success, but found %v", err)
+				}
+			} else if !testutils.IsError(err, c.expected) {
+				t.Fatalf("expected %s, but found %v", c.expected, err)
+			}
+		})
+	}
+}
+
 func TestReplicaRetryRaftProposal(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
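Because the new test names its subtests by case index via t.Run, a single case can be run in isolation with a stock go test invocation (assuming the usual package path; the repository's own build wrappers work too), e.g.:

	go test ./pkg/storage -run 'TestSetReplicaID/2'

This runs only the "raft group deleted" case.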
10 changes: 10 additions & 0 deletions pkg/storage/store.go
@@ -2877,6 +2877,7 @@ func (s *Store) processRaftRequest(
 		// would limit the effectiveness of RaftTransport.SendSync for
 		// preemptive snapshots.
 		r.mu.internalRaftGroup = nil
+		needTombstone := r.mu.state.Desc.NextReplicaID != 0
 		r.mu.Unlock()
 
 		appliedIndex, _, err := loadAppliedIndex(ctx, r.store.Engine(), r.RangeID)
@@ -2911,6 +2912,14 @@ func (s *Store) processRaftRequest(
 			ready = raftGroup.Ready()
 		}
 
+		if needTombstone {
+			// Write a tombstone key in order to prevent the replica from receiving
+			// messages from its previous incarnation.
+			if err := r.setTombstoneKey(ctx, r.store.Engine(), r.mu.state.Desc); err != nil {
+				return roachpb.NewError(err)
+			}
+		}
+
 		// Apply the snapshot, as Raft told us to.
 		if err := r.applySnapshot(ctx, inSnap, ready.Snapshot, ready.HardState); err != nil {
 			return roachpb.NewError(err)
@@ -3490,6 +3499,7 @@ func (s *Store) tryGetOrCreateReplica(
 	// replica even outside of raft processing. Have to do this after grabbing
 	// Store.mu to maintain lock ordering invariant.
 	repl.mu.Lock()
+	repl.mu.minReplicaID = tombstone.NextReplicaID
 	// Add the range to range map, but not replicasByKey since the range's start
 	// key is unknown. The range will be added to replicasByKey later when a
 	// snapshot is applied. After unlocking Store.mu above, another goroutine
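The tombstone value read above is loaded earlier in tryGetOrCreateReplica, outside the bounds of this hunk. A hedged sketch of what that lookup plausibly looks like with this package's MVCC helpers (the surrounding code is not shown in this diff, so treat the names and error handling here as assumptions):

	// Read the tombstone left behind by a previous incarnation, if any.
	tombstone := &roachpb.RaftTombstone{}
	tombstoneKey := keys.RaftTombstoneKey(rangeID)
	if ok, err := engine.MVCCGetProto(
		ctx, s.Engine(), tombstoneKey, hlc.ZeroTimestamp, true /* consistent */, nil, tombstone,
	); err != nil {
		return nil, false, err
	} else if ok && replicaID != 0 && replicaID < tombstone.NextReplicaID {
		// An incoming replica ID below the tombstone's floor can never be recreated.
		return nil, false, &roachpb.RaftGroupDeletedError{}
	}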
