raft: send MsgAppResp to latest leader after handling snapshot #140233

Merged
32 changes: 32 additions & 0 deletions pkg/raft/raft.go
@@ -2467,8 +2467,40 @@ func (r *raft) handleSnapshot(m pb.Message) {
if r.restore(s) {
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, id.index, id.term)

// To send MsgAppResp to any leader, we must be sure that our log is
// consistent with that leader's log.
//
// After restore(s), our log ends at the entryID of this snapshot.
// The snapshot came from a node at m.Term (typically the leader).
// Everything in this snapshot has been committed during m.Term or earlier,
// and will be present in all future term logs. It is thus safe to send
// MsgAppResp to any leader at term >= m.Term.
//
// From section 5.4 of the Raft paper:
// if a log entry is committed in a given term, then that entry will be
// present in the logs of the leaders for all higher-numbered terms.
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex(),
Commit: r.raftLog.committed})

// A leadership change may have happened while the snapshot was in flight.
// Therefore we need to send the MsgAppResp to the new leader as well.
//
// If we don't send a response to the new leader, it will not know that we
// have committed up to the snapshot index. That delays the new leader's
// transition of this follower to StateReplicate, since the new leader may
// send its own snapshot to this follower and wait for the MsgAppResp of
// that snapshot, which may ultimately delay commits for clients.
//
// Sending this response to the new leader allows the new leader to know
// this follower is caught up ASAP.
//
// TODO(pav-kv): consider if we can only send to r.lead.
if r.lead != None && r.lead != m.From {
r.send(pb.Message{To: r.lead, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex(),
Commit: r.raftLog.committed})
}
} else {
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, id.index, id.term)
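The new branch above sends a second MsgAppResp when the known leader differs from the snapshot's sender. As a standalone illustration, here is a minimal Go sketch of which peers a follower may safely acknowledge after restoring a snapshot. This is not code from the PR; the helper name and parameters are invented. The safety argument is the one in the diff's comment: everything in the snapshot was committed at m.Term or earlier, so any leader at term >= m.Term also has it.

package main

import "fmt"

const None uint64 = 0

// ackTargets is a hypothetical helper illustrating the PR's logic: after a
// follower restores a snapshot sent at some term, it can acknowledge both the
// sender and, if different, the leader it currently knows about, because the
// snapshot's contents are committed and therefore present in the log of any
// leader at that term or higher.
func ackTargets(sender, lead uint64) []uint64 {
	targets := []uint64{sender}
	if lead != None && lead != sender {
		targets = append(targets, lead)
	}
	return targets
}

func main() {
	// Snapshot sent by the old leader (node 1) while node 2 is now leader:
	fmt.Println(ackTargets(1, 2)) // [1 2]
	// The sender is still the leader: a single MsgAppResp suffices.
	fmt.Println(ackTargets(2, 2)) // [2]
}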
196 changes: 196 additions & 0 deletions pkg/raft/testdata/snapshot_sent_and_leadership_change.txt
@@ -0,0 +1,196 @@
# This test sets up a 3-node Raft cluster to verify redundant snapshot handling
# across leadership changes. It ensures that a slow follower (Node 3) correctly
# processes snapshots from both the previous (Node 1) and current (Node 2)
# leader, and that the new leader (Node 2) does not waste time keeping Node 3
# in StateSnapshot.

# If this slow follower happens to become the leaseholder, it is crucial that
# the leader's progress tracker moves it from StateSnapshot to StateReplicate
# as soon as possible.

# Summary of steps:
# 1. Node 1 (previous leader) sends a snapshot to Node 3 (slow follower).
# 2. Leadership changes from Node 1 → Node 2.
# 3. Node 2 starts sending MsgApp to all followers, including Node 3.
# 4. Node 3, now aware of the leader change, responds with MsgAppResp to Node 2.
# 5. Node 2 discovers it has already compacted past Node 3's match index, so it
# sends a new snapshot to Node 3.
# 6. Node 3 receives the snapshot from the previous-term leader (Node 1) first.
# It sends MsgAppResp to both Node 1 and Node 2.
# 7. Node 2 had previously marked Node 3 as StateSnapshot when it sent its own
# snapshot 2->3. When Node 3's response to snapshot 1->3 arrives, Node 2 moves
# Node 3 back to StateReplicate, even though its own snapshot is still in
# flight.

# This lets the new leader (Node 2) move Node 3 from StateSnapshot back to
# StateReplicate earlier.

# Turn off output during the setup of the test.
log-level none
----
ok

# Start cluster of 3
add-nodes 3 voters=(1,2,3) index=10
----
ok

# Elect 1 as leader.
campaign 1
----
ok

stabilize
----
ok

propose 1 m13
----
ok

propose 1 m14
----
ok

propose 1 m15
----
ok

propose 1 m16
----
ok

# Create a network partition of (1,2) and (3)
stabilize 1 2
----
ok

deliver-msgs drop=(3)
----
ok

log-level debug
----
ok

# Follower 3 is far behind, so it will need a snapshot.
raft-log 3
----
1/11 EntryNormal ""

# Leader 1 sends a snapshot to 3.
send-snapshot 1 3
----
1->3 MsgSnap Term:1 Log:0/0
Snapshot: Index:15 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# Transfer leadership to 2, without 3 hearing about it.
transfer-leadership from=1 to=2
----
INFO 1 [term 1] starts to transfer leadership to 2
INFO 1 sends MsgTimeoutNow to 2 immediately as 2 already has up-to-date log
DEBUG 1 setting election elapsed to start from 3 ticks after store liveness support expired
INFO 1 became follower at term 1
DEBUG 1 reset election elapsed to 0

log-level none
----
ok

stabilize 1 2
----
ok

log-level debug
----
ok

# Deliver MsgApp 2->3 first
deliver-msgs type=MsgApp 3
----
2->3 MsgApp Term:2 Log:1/15 Commit:15 Entries:[2/16 EntryNormal ""]
INFO 3 [term: 1] received a MsgApp message with higher term from 2 [term: 2], new leader indicated, advancing term
DEBUG 3 setting election elapsed to start from 3 ticks after store liveness support expired
INFO 3 became follower at term 2
DEBUG 3 reset election elapsed to 0
DEBUG 3 [logterm: 0, index: 15] rejected MsgApp [logterm: 1, index: 15] from 2

# Deliver MsgSnap from 1->3 second
deliver-msgs type=MsgSnap 3
----
1->3 MsgSnap Term:1 Log:0/0
Snapshot: Index:15 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=11, applied=11, applying=11, unstable.offset=12, unstable.offsetInProgress=12, len(unstable.Entries)=0] starts to restore snapshot [index: 15, term: 1]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 [commit: 15, lastindex: 15, lastterm: 1] restored snapshot [index: 15, term: 1]
INFO 3 [commit: 15] restored snapshot [index: 15, term: 1]

# 3 first responds to MsgApp 2->3, with hint = 11
# 3 then responds to MsgSnap 1->3, with commit index = 15
stabilize 3
----
> 3 handling Ready
Ready MustSync=true:
HardState Term:2 Commit:15 Lead:2 LeadEpoch:0
Snapshot Index:15 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
Messages:
3->2 MsgAppResp Term:2 Log:1/15 Rejected (Hint: 11) Commit:11
3->1 MsgAppResp Term:2 Log:0/15 Commit:15
3->2 MsgAppResp Term:2 Log:0/15 Commit:15
> 3 receiving messages
2->3 MsgVote Term:2 Log:1/15
INFO 3 [logterm: 1, index: 15, vote: 0] rejected MsgVote from 2 [logterm: 1, index: 15] at term 2
2->3 MsgFortifyLeader Term:2 Log:0/0
> 3 handling Ready
Ready MustSync=true:
HardState Term:2 Commit:15 Lead:2 LeadEpoch:1
Messages:
3->2 MsgVoteResp Term:2 Log:0/0 Rejected (Hint: 0)
3->2 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1

# 3 has applied the snapshot
raft-log 3
----
log is empty: first index=16, last index=15

# Do compaction on 2
compact 2 15
----
2/16 EntryNormal ""


# 2 processes the rejected MsgAppResp from 3 (hint 11). Since 2 has already
# compacted its log past index 11, it sends MsgSnap 2->3 up to index 16.
# 2 then processes the MsgAppResp from 3 with commit index 15. That response
# was triggered by the snapshot 1->3, not the snapshot 2 just sent, but 2
# still sees that 3 has caught up.
# 2 moves follower 3 from StateSnapshot to StateReplicate in its progress
# tracker, then sends MsgApp 2->3 for entry 16, since 3 is caught up and in
# StateReplicate from 2's perspective.
stabilize 2
----
> 2 receiving messages
3->2 MsgAppResp Term:2 Log:1/15 Rejected (Hint: 11) Commit:11
DEBUG 2 received MsgAppResp(rejected, hint: (index 11, term 1)) from 3 for index 15
DEBUG 2 decreased progress of 3 to [StateProbe match=0 next=12 sentCommit=11 matchCommit=11]
DEBUG 2 [firstindex: 16, commit: 16] sent snapshot[index: 16, term: 2] to 3 [StateProbe match=0 next=12 sentCommit=11 matchCommit=11]
DEBUG 2 paused sending replication messages to 3 [StateSnapshot match=0 next=17 sentCommit=16 matchCommit=11 paused pendingSnap=16]
3->2 MsgAppResp Term:2 Log:0/15 Commit:15
DEBUG 2 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=15 next=17 sentCommit=16 matchCommit=15 paused pendingSnap=16]
3->2 MsgVoteResp Term:2 Log:0/0 Rejected (Hint: 0)
3->2 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1
> 2 handling Ready
Ready MustSync=false:
Messages:
2->3 MsgSnap Term:2 Log:0/0
Snapshot: Index:16 Term:2 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
2->3 MsgApp Term:2 Log:1/15 Commit:16 Entries:[2/16 EntryNormal ""]
2->3 MsgApp Term:2 Log:2/16 Commit:16

status 2
----
1: StateReplicate match=16 next=17 sentCommit=16 matchCommit=16
2: StateReplicate match=16 next=17 sentCommit=15 matchCommit=15
3: StateReplicate match=15 next=17 sentCommit=16 matchCommit=15 inflight=1
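For reference, here is a small hypothetical Go sketch (invented names, not the real tracker code) of the idea behind the "recovered from needing snapshot" step above: a follower in StateSnapshot can return to StateReplicate once its acknowledged match index is high enough that the leader's remaining, uncompacted log can supply the rest. In the run above, leader 2's first index after compaction is 16 and follower 3 acknowledges match 15, so replication resumes.

package main

import "fmt"

// canResumeReplication is a hypothetical predicate capturing the leader-side
// condition illustrated by this test: the leader would need to send entries
// starting at match+1, which is only possible if they have not been compacted
// away, i.e. match+1 >= the leader's first log index.
func canResumeReplication(match, leaderFirstIndex uint64) bool {
	return match+1 >= leaderFirstIndex
}

func main() {
	// After "compact 2 15", leader 2's first index is 16.
	// The rejected MsgAppResp left match at 0: a snapshot is still needed.
	fmt.Println(canResumeReplication(0, 16)) // false
	// The MsgAppResp triggered by snapshot 1->3 raises match to 15: resume.
	fmt.Println(canResumeReplication(15, 16)) // true
}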