Skip to content

Commit

Permalink
merge fix from etcd-io/etcd#12163
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Jun 15, 2021
1 parent 0de1420 commit eced5e5
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
5 changes: 5 additions & 0 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (l *raftLog) hasMoreNextEnts(appliedTo uint64) bool {
return l.committed > appliedTo
}

// hasPendingSnapshot returns if there is pending snapshot waiting for applying.
func (l *raftLog) hasPendingSnapshot() bool {
return l.unstable.snapshot != nil && !IsEmptySnap(*l.unstable.snapshot)
}

func (l *raftLog) snapshot() (pb.Snapshot, error) {
if l.unstable.snapshot != nil {
return *l.unstable.snapshot, nil
Expand Down
21 changes: 19 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,23 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
})
}

func (r *raft) advance(rd Ready) {
// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
r.raftLog.appliedTo(index)
}
if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
r.raftLog.stableTo(e.Index, e.Term)
}
if !IsEmptySnap(rd.Snapshot) {
r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
}
}

// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
Expand Down Expand Up @@ -1446,8 +1463,8 @@ func (r *raft) restoreNode(nodes []uint64, grpsConf []*pb.Group, isLearner bool)
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
_, ok := r.prs[r.id]
return ok
pr, ok := r.prs[r.id]
return ok && pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}

func (r *raft) updateNode(id uint64, g pb.Group) {
Expand Down
39 changes: 36 additions & 3 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2785,6 +2785,13 @@ func TestRestore(t *testing.T) {
if ok := sm.restore(s); ok {
t.Fatal("restore succeed, want fail")
}
// It should not campaign before actually applying data.
for i := 0; i < sm.randomizedElectionTimeout; i++ {
sm.tick()
}
if sm.state != StateFollower {
t.Errorf("state = %d, want %d", sm.state, StateFollower)
}
}

// TestRestoreWithLearner restores a snapshot which contains learners.
Expand Down Expand Up @@ -2901,12 +2908,16 @@ func TestLearnerReceiveSnapshot(t *testing.T) {
},
}

n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
store := NewMemoryStorage()
n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, store)
n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
defer closeAndFreeRaft(n1)
defer closeAndFreeRaft(n2)

n1.restore(s)
ready := newReady(n1, &SoftState{}, pb.HardState{}, true)
store.ApplySnapshot(ready.Snapshot)
n1.advance(ready)

// Force set n1 appplied index.
n1.raftLog.appliedTo(n1.raftLog.committed)
Expand Down Expand Up @@ -3567,10 +3578,32 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) {
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1)
}

filtered := pb.Message{}
// Snapshot needs to be applied before sending MsgAppResp
nt.msgHook = func(m pb.Message) bool {
if m.Type != pb.MsgAppResp || m.From != 3 || m.Reject {
return true
}
filtered = m
return false
}

// Transfer leadership to 3 when node 3 is lack of snapshot.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
// Send pb.MsgHeartbeatResp to leader to trigger a snapshot for node 3.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgHeartbeatResp})
if lead.state != StateLeader {
t.Fatalf("node 1 should still be leader as snapshot is not applied, got %x", lead.state)
}
if reflect.DeepEqual(filtered, pb.Message{}) {
t.Fatalf("Follower should report snapshot progress automatically.")
}

// Apply snapshot and resume progress
follower := nt.peers[3].(*raft)
ready := newReady(follower, &SoftState{}, pb.HardState{}, true)
nt.storage[3].ApplySnapshot(ready.Snapshot)
follower.advance(ready)
nt.msgHook = nil
nt.send(filtered)

checkLeaderTransferState(t, lead, StateFollower, 3)
}
Expand Down
2 changes: 1 addition & 1 deletion raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (rn *RawNode) HasReady() bool {
if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
return true
}
if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
if r.raftLog.hasPendingSnapshot() {
return true
}
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
Expand Down

0 comments on commit eced5e5

Please sign in to comment.