raft: characterization test for leader commit term storage access
Previously, the storage accesses a leader performs to look up an entry's term before committing were characterized only by the long, scenario-style TestStorageUsageForTermCheckBeforeCommit in pkg/raft/rawnode_test.go.
This was inadequate because that test was verbose, hard to follow, and duplicated coverage that fits naturally into the existing TestLeaderAppResp table test.
To address this, this patch extends TestLeaderAppResp to count storage term lookups per case (via storage.callStats.term), removes the standalone rawnode test, and tightens the idxPreLeading comments in raft.go.

Epic: none

Release note: None
hakuuww committed Jan 27, 2025
1 parent e9b16c1 commit bd7dc5b
Showing 3 changed files with 54 additions and 249 deletions.
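
For orientation, here is a minimal, self-contained sketch of the behavior this commit characterizes: a leader may only directly commit entries from its own term, and checking the term of an older entry can require a storage read once that entry has left the in-memory unstable log. This is a toy model, not the pkg/raft code; the types and helpers (entry, toyLeader, termAt) are invented for illustration, and only the shape of the idxPreLeading comparison mirrors the comments in the diff below.

package main

import "fmt"

// entry is a toy stand-in for a raft log entry (not the real pkg/raft type).
type entry struct {
	index, term uint64
}

// toyLeader models only the pieces relevant to the commit-time term check.
type toyLeader struct {
	term          uint64  // current leader term
	idxPreLeading uint64  // last log index when this node became leader
	unstable      []entry // entries still held in memory
	stable        []entry // entries already persisted ("storage")
	storageReads  int     // how many times a term was fetched from "storage"
}

// termAt returns the term of the entry at index, preferring the in-memory
// suffix and counting a (simulated) storage read otherwise.
func (l *toyLeader) termAt(index uint64) (uint64, bool) {
	for _, e := range l.unstable {
		if e.index == index {
			return e.term, true
		}
	}
	l.storageReads++ // slow path: the entry only exists in stable storage
	for _, e := range l.stable {
		if e.index == index {
			return e.term, true
		}
	}
	return 0, false
}

// maybeCommit mirrors the rule that a leader may only directly commit entries
// from its own term. Entries above idxPreLeading are known to carry the
// leader's term, so no term lookup (and no storage read) is needed for them.
func (l *toyLeader) maybeCommit(quorumIndex uint64) bool {
	if quorumIndex <= l.idxPreLeading {
		t, ok := l.termAt(quorumIndex)
		if !ok || t != l.term {
			return false // entry proposed by an earlier leader
		}
	}
	return true
}

func main() {
	// Entries 1 and 2 are from term 2 and already persisted; entry 3 is the
	// new leader's dummy entry at term 3 and still in memory.
	l := &toyLeader{
		term:          3,
		idxPreLeading: 2,
		unstable:      []entry{{index: 3, term: 3}},
		stable:        []entry{{index: 1, term: 2}, {index: 2, term: 2}},
	}
	fmt.Println(l.maybeCommit(2), l.storageReads) // false 1: old-term entry, needs a storage read
	fmt.Println(l.maybeCommit(3), l.storageReads) // true 1: current-term entry, no extra read
}

Running it prints "false 1" and then "true 1": committing the old-term entry costs a simulated storage read, while the current-term entry above idxPreLeading needs none.
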
22 changes: 12 additions & 10 deletions pkg/raft/raft.go
@@ -330,9 +330,14 @@ type raft struct {

state pb.StateType

// idxPreLeading is the highest log index on the raft node before the leader became leader
// Updated only when the raft node becomes leader
// Only used for term check comparison before committing entry(s) in maybeCommit() when raft is in leader state
// idxPreLeading is the last log index as of when this node became the
// leader. Separates entries proposed by previous leaders from the entries
// proposed by the current leader. Used only in StateLeader, and updated
// when entering StateLeader (in becomeLeader()).
//
// Invariants (when in StateLeader at raft.Term):
// - entries at indices <= idxPreLeading have term < raft.Term
// - entries at indices > idxPreLeading have term == raft.Term
idxPreLeading uint64

// isLearner is true if the local raft node is a learner.
@@ -1039,13 +1044,10 @@ func (r *raft) maybeCommit() bool {
// way, then all prior entries are committed indirectly because of the Log
// Matching Property.
//
// Instead of comparing the term of the entry we want to commit with the leader's
// current term (which may require a storage read to learn that entry's term,
// because the entry may already have been persisted to stable storage and
// removed from memory), we track the highest log index this node had before it
// became leader.
if index <= r.idxPreLeading {
if !r.raftLog.matchTerm(entryID{term: r.Term, index: index}) {
return false
}

r.raftLog.commitTo(LogMark{Term: r.Term, Index: index})
return true
}
@@ -2314,10 +2316,10 @@ func stepFollower(r *raft, m pb.Message) error {
// may never be safe for MsgTimeoutNow to come from anyone but the leader.
// We need to think about this more.
//
//if r.supportingFortifiedLeader() && r.lead != m.From {
// if r.supportingFortifiedLeader() && r.lead != m.From {
// r.logger.Infof("%x [term %d] ignored MsgTimeoutNow from %x due to leader fortification", r.id, r.Term, m.From)
// return nil
//}
// }
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership", r.id, r.Term, m.From)
// Leadership transfers never use pre-vote even if r.preVote is true; we
// know we are not recovering from a partition so there is no need for the
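
To make the invariants in the new idxPreLeading comment concrete, here is a small hypothetical becomeLeader-style helper (again a toy, not the real pkg/raft code; becomeLeader and entry are invented names). Recording the last index at election time and appending the leader's empty entry at the new term is what makes "index <= idxPreLeading implies an older term" hold.

package main

import "fmt"

type entry struct{ index, term uint64 }

// becomeLeader is a toy version of the transition: it records the last log
// index as idxPreLeading and appends the leader's empty "dummy" entry at the
// new term (1-based indices, no overwrites).
func becomeLeader(log []entry, newTerm uint64) (updated []entry, idxPreLeading uint64) {
	idxPreLeading = uint64(len(log))
	updated = append(log, entry{index: idxPreLeading + 1, term: newTerm})
	return updated, idxPreLeading
}

func main() {
	log := []entry{{1, 1}, {2, 2}, {3, 2}} // entries proposed by earlier leaders
	log, idxPreLeading := becomeLeader(log, 3)

	// Check the documented invariants for a leader at term 3.
	for _, e := range log {
		if e.index <= idxPreLeading && e.term >= 3 {
			panic("invariant violated: old index with new term")
		}
		if e.index > idxPreLeading && e.term != 3 {
			panic("invariant violated: new index with old term")
		}
	}
	fmt.Println("idxPreLeading =", idxPreLeading, "log =", log)
}
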
54 changes: 42 additions & 12 deletions pkg/raft/raft_test.go
@@ -2572,22 +2572,49 @@ func TestLeaderAppResp(t *testing.T) {
reject bool
// progress
wmatch uint64
wnext uint64
// message
wmsgNum int
windex uint64
// log index of the next entry to send to this follower.
wnext uint64
// number of messages the leader sends out
wmsgNum int
// prevLogIndex in the MsgApp the leader sends to followers
windex uint64
// leader's commit index
wcommitted uint64
// number of storage accesses made to read an entry's term
ctStgTerm int
}{
{2, true, 0, 4, 0, 0, 0}, // stale resp; no replies
{6, true, 0, 4, 0, 0, 0}, // stale resp; no replies
{3, true, 0, 3, 1, 2, 0}, // denied resp; leader does not commit; decrease next and send probing msg

{4, false, 4, 7, 1, 4, 4}, // accept resp; leader commits; broadcast with commit index
// Follower is StateProbing at 4, it sends MsgAppResp for 4, and is moved to
{2, true, 0, 4, 0, 0, 0, 1}, // stale resp; no replies
{6, true, 0, 4, 0, 0, 0, 1}, // stale resp; no replies

// denied resp; leader does not commit; decrease next and send probing msg
// An additional term storage access is needed for an entry that has
// already been persisted, since we are probing backwards
{3, true, 0, 3, 1, 2, 0, 2},

// Follower 2 responds to the leader, indicating that log index 2 is replicated.
// The leader tries to commit, but the commit index doesn't advance since the entry is from a previous term.
// We hit maybeCommit() and perform the term check by reading the term number from storage.
{2, false, 2, 7, 1, 2, 0, 3},

// NB: For the following cases, we skip the MsgAppResp for the first 3 entries by directly processing MsgAppResp for later entries.
//
// Follower2 is StateProbing at 4, it sends MsgAppResp for 4, and is moved to
// StateReplicate and as many entries as possible are sent to it (5, 6).
// Correspondingly the Next is then 7 (entry 7 does not exist, indicating
// the follower will be up to date should it process the emitted MsgApp).
{4, false, 4, 7, 1, 4, 4},
// accept resp; leader commits; broadcast with commit index
{4, false, 4, 7, 1, 4, 4, 1},

// Follower 2 says entry{term:2, index:5} is already replicated.
// The leader broadcasts to followers that entry{term:2, index:5} has been committed, with commit index 5.
// The leader sends entry{term:2, index:6} to follower 2.
// The leader sends entries 4, 5, and 6 to follower 3, since follower 3 hasn't sent a MsgAppResp to the leader.
{5, false, 5, 7, 1, 5, 5, 1},
// Follower 2 says entry{term:2, index:6} is already replicated.
// The leader broadcasts to followers that entry{term:2, index:6} has been committed, with commit index 6.
// The leader sends no additional entries to follower 2 since its log is up to date.
// The leader sends entries 4, 5, and 6 to follower 3, since follower 3 hasn't sent a MsgAppResp to the leader.
{6, false, 6, 7, 1, 6, 6, 1},
} {
t.Run("", func(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2, 3))
@@ -2600,9 +2627,10 @@ func TestLeaderAppResp(t *testing.T) {
sm.becomeLeader()
require.Equal(t, uint64(4), sm.raftLog.lastIndex()) // appended a dummy
sm.appendEntry(index(5).terms(2, 2)...)
require.Equal(t, uint64(0), sm.raftLog.committed)
sm.bcastAppend()

sm.readMessages()

require.NoError(t, sm.Step(pb.Message{
From: 2,
Type: pb.MsgAppResp,
@@ -2622,6 +2650,8 @@
require.Equal(t, tt.windex, msg.Index, "%v", DescribeMessage(msg, nil))
require.Equal(t, tt.wcommitted, msg.Commit, "%v", DescribeMessage(msg, nil))
}

assert.Equal(t, tt.ctStgTerm, storage.callStats.term)
})
}
}
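
The assertion on storage.callStats.term above relies on the test MemoryStorage counting its own Term lookups. A generic way to obtain the same signal, sketched under the assumption of a minimal Storage-like interface (termStore, countingStore, and mapStore are invented for this example), is to wrap the storage and count calls:

package main

import "fmt"

// termStore is a minimal, hypothetical slice of a raft Storage interface:
// just the call the characterization cares about.
type termStore interface {
	Term(index uint64) (uint64, error)
}

// countingStore wraps a termStore and counts how often Term is consulted,
// mirroring what storage.callStats.term records in the test above.
type countingStore struct {
	termStore
	termCalls int
}

func (c *countingStore) Term(index uint64) (uint64, error) {
	c.termCalls++
	return c.termStore.Term(index)
}

// mapStore is a trivial in-memory termStore used for the example.
type mapStore map[uint64]uint64

func (m mapStore) Term(index uint64) (uint64, error) {
	t, ok := m[index]
	if !ok {
		return 0, fmt.Errorf("index %d not found", index)
	}
	return t, nil
}

func main() {
	s := &countingStore{termStore: mapStore{1: 2, 2: 2, 3: 3}}

	// Code under test would call s.Term(...) while deciding whether an entry
	// can be committed; the test then asserts on the observed count.
	if t, err := s.Term(2); err == nil {
		fmt.Println("term of entry 2:", t)
	}
	fmt.Println("term lookups:", s.termCalls)
}

The wrapper leaves behavior unchanged and only records how often an entry's term had to be fetched, which is exactly the quantity the characterization test pins down.
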
227 changes: 0 additions & 227 deletions pkg/raft/rawnode_test.go
@@ -1023,233 +1023,6 @@ func TestRawNodeConsumeReady(t *testing.T) {
require.Equal(t, m2, rn.raft.msgs[0])
}

func TestStorageUsageForTermCheckBeforeCommit(t *testing.T) {
// Checks that we eliminate the storage access needed to read the term of an entry that is no
// longer in the unstable log (because it has already been persisted to storage and removed
// from the unstable log).
//
// Before the raft.idxPreLeading optimization, the leader node often needed to read storage
// to learn the term of a specific entry.
//
// High-level overview of the scenario before the optimization:
// Within a cluster, node "A" campaigns and wins the election.
// The new leader A is now in term X.
// A has some uncommitted entries from previous term(s) < X.
//
// The following (can) happen asynchronously through "Ready messages" that are picked up by the application layer:
// A tries to replicate all its uncommitted entries to followers (sends MsgApp to followers).
// A also tells the application layer what is currently in A's unstable log (MsgStorageAppend).
//
// The following happens asynchronously:
// A receives MsgAppResp from a follower; eventually A receives MsgAppResp from a quorum of followers.
// A MsgAppResp from an individual follower indicates that
// the follower has replicated the entry to its stable storage.
//
// A keeps track of whether each follower has persisted the entry in its progress tracker.
// If A sees that a quorum of followers has persisted the entry,
// A tries to commit the entry with raft.maybeCommit().
// A receives MsgStorageAppendResp,
// indicating that the application layer has persisted the entries currently in unstable
// (which were also sent out to followers in MsgApp)
// to storage.
// A then removes those entries from the unstable log (since A knows they are safely in storage).
//
//
// As you can see, if "A receives MsgStorageAppendResp" happens first, A will remove the entries from unstable.
// After this has happened, if A receives MsgAppResp (for the entry) from a quorum of followers,
// A tries to commit the entry(s) because it has a quorum, but A still needs to know that the entry
// it is trying to commit is from the current term.
//
// So it checks the term of the entry, but the entry is no longer in the unstable log, so A calls storage
// to ask for the term. This disk access is very slow compared to just knowing the term in memory
// (or an invariant that serves the same purpose).

// Implementation of the unit test:
// We have two nodes, node 1 and node 2; node 1 will be the leader.
// We manually operate/trigger the actions of node 1 (id: 1).
// Node 1 is aware of the existence of node 2 (id: 2),
// but we won't actually create a raw node for node 2 or do anything on node 2.
// We mimic node 2 sending messages to node 1
// by manually stepping messages on node 1 that appear to have been sent from node 2,
// by calling rn.Step(message). rn is short for rawnode, which represents node 1 in this test.

// Create an in-memory storage/network scenario with two nodes: node 1 (id: 1) and node 2 (id: 2).
// s is the persistent storage object of node1
s := newTestMemoryStorage(withPeers(1, 2))

// Node 1 starts with commit index 0
hs := pb.HardState{
Term: 2,
Vote: 1,
Commit: 0,
}
require.NoError(t, s.SetHardState(hs))
// node 1 starts with two entries already in its storage;
// they are uncommitted, since the commit index starts at 0
require.NoError(t, s.Append(entryID{}.append(1, 2).entries))

// config for node 1 (id: 1); election timeout and heartbeat interval don't matter here
// cfg is given the storage object s, whose peer list tells node 1 that node 2 is also in the cluster
cfg := newTestConfig(1, 3, 1, s)
cfg.AsyncStorageWrites = true

// rn is node1
rn, err := NewRawNode(cfg)
require.NoError(t, err)
require.Equal(t, uint64(0), rn.raft.raftLog.committed)

// node 1 starts campaigning
rn.Campaign() // prepares the ready with state StateCandidate
t.Log("node 1 becomes leader")
// Generate a Ready object
rd := rn.Ready()
t.Log("describe node1 1st ready")
t.Logf("%s",
DescribeReady(rd, func(bytes []byte) string { return "" }))
//Ready MustSync=true:
//State:StateCandidate
//HardState Term:3 Vote:1 Commit:0 Lead:0 LeadEpoch:0
//Messages:
//1->2 MsgVote Term:3 Log:2/2
//1->AppendThread MsgStorageAppend Term:3 Log:0/0 Vote:1 Responses:[
// 1->1 MsgVoteResp Term:3 Log:0/0
//]
s.SetHardState(rd.HardState) // actually becomes candidate and sends out the vote request

rn.Step(rd.Messages[1].Responses[0]) // 1->1 MsgVoteResp. Mimic node 1 voting for itself
rn.Step(pb.Message{To: 1, From: 2, Term: 3, Type: pb.MsgVoteResp}) // 2->1 MsgVoteResp. Mimic node 2 voting for node 1
// node 1 receives a quorum of votes (itself and node 2), and therefore becomes the leader
require.Equal(t, pb.StateLeader, rn.raft.state)
// a dummy entry is appended to the local unstable log and to the ready struct,
// which will be picked up by the application and sent out to followers via MsgApp

t.Log(rn.raft.trk.Progress(1).String())
t.Log(rn.raft.trk.Progress(2).String())

// Mimic node 2 having replicated the two uncommitted entries (from the perspective of node 1).
// This will invoke maybeCommit,
// but cannot commit because the term of those two uncommitted entries is less than node 1's current leader term.
// There is also no storage read for the term involved, since node 1 still has those entries in unstable.
// 2->1 MsgAppResp. Mimic node 2 having persisted the log up to index 2
rn.Step(pb.Message{To: 1, From: 2, Term: 3, Type: pb.MsgAppResp, Index: 2})

t.Log(rn.raft.trk.Progress(1).String())
t.Log(rn.raft.trk.Progress(2).String())

t.Log("describe node1 2nd ready")
// A new dummy entry should be in the ready, to be replicated to followers and persisted, because node 1 just became leader
t.Log("dummy")
rd = rn.Ready()
t.Logf("%s",
DescribeReady(rd, func(bytes []byte) string { return "" }))
//Ready MustSync=true:
//State:StateLeader
//HardState Term:3 Vote:1 Commit:0 Lead:1 LeadEpoch:1
//Entries:
//3/3 EntryNormal
//Messages:
//1->2 MsgFortifyLeader Term:3 Log:0/0
//1->2 MsgApp Term:3 Log:2/2 Entries:[3/3 EntryNormal]
//1->2 MsgApp Term:3 Log:2/2 Entries:[3/3 EntryNormal]
//1->AppendThread MsgStorageAppend Term:3 Log:3/3 Vote:1 Lead:1 LeadEpoch:1 Entries:[3/3 EntryNormal] Responses:[
// AppendThread->1 MsgStorageAppendResp Term:0 Log:3/3
// 1->1 MsgAppResp Term:3 Log:0/3
// 1->1 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1
//]

// a dummy entry is generated after node1 becomes leader, which is added to ready for replicating to followers
s.SetHardState(rd.HardState)
s.Append(rd.Entries)

// MsgStorageAppend
require.Equal(t, pb.MsgStorageAppend, rd.Messages[3].Type)

// Process MsgStorageAppendResp
// We are telling the leader node 1 that the new dummy entry (index 3, term 3) has been persisted to the log.
// The leader rawnode therefore flushes its unstable log,
// and its prev.entry variable, which is still in memory, now points to the dummy entry (which was just flushed).
rn.Step(rd.Messages[3].Responses[0]) // AppendThread->1 MsgStorageAppendResp Term:0 Log:3/4

// The leader node's application layer tells the leader that it has received the MsgApp (for the dummy entry) from itself.
// This will invoke maybeCommit again for the previous 2 entries, because there is still a quorum for index 2
// (maybeCommit() is invoked whenever there is an uncommitted quorum index, which may itself warrant optimization).
// The action is the same as before,
// only this time a storage access is involved, since we no longer have in-memory access to the entry we are checking:
// "Returned term from storage. Slow access. index: 2 , term: 2"
rn.Step(rd.Messages[3].Responses[1]) // 1->1 MsgAppResp Term:3 Log:0/3,

// Leader can't commit anything, commit index should still stay at 0
require.Equal(t, uint64(0), rn.raft.raftLog.committed)

// Add new entry 4 to the Ready message. This is mimicking client's request for new entry
rn.Propose([]byte("foo")) // entry 4
rd = rn.Ready()

t.Logf("%s",
DescribeReady(rd, func(bytes []byte) string { return "" }))
// Ready MustSync=true:
// Entries:
// 3/4 EntryNormal
// Messages:
// 1->2 MsgApp Term:3 Log:3/3 Entries:[3/4 EntryNormal]
// 1->AppendThread MsgStorageAppend Term:0 Log:3/4 Entries:[3/4 EntryNormal] Responses:[
// AppendThread->1 MsgStorageAppendResp Term:0 Log:3/4
// 1->1 MsgAppResp Term:3 Log:0/4
// ]

s.SetHardState(rd.HardState)
s.Append(rd.Entries)
t.Log("entry 4")
// Local store has persisted entry 4
rn.Step(rd.Messages[1].Responses[0]) //AppendThread->1 MsgStorageAppendResp Term:0 Log:3/4
// t.Log("entry 4 second response")
// Right now entry 4, with entry id{term:3, index:4}, is removed from the unstable log of node 1 (the leader),
// because storage has responded saying the log is persisted.
// The unstable log of node 1 (the leader) is empty right now.
// The node still has access to the term of entry id{term:3, index:4}, because prev.term is kept in memory,
// but it doesn't have access to the term of entry id{term:3, index:3}.

// This will invoke maybeCommit and a term check.
// The leader calls maybeCommit() every time it receives a MsgAppResp.
// The quorum commit index is still 2,
// because we never heard back from node 2 saying it has persisted any entry greater than 2.
// "Returned term from storage. Slow access. index: 2 , term: 2"
// maybeCommit will return false because the term check failed; the leader is currently in term 3. The commit index will stay at zero.
rn.Step(rd.Messages[1].Responses[1]) // 1->1 MsgAppResp Term:3 Log:0/4
// leader still can't commit anything
require.Equal(t, uint64(0), rn.raft.raftLog.committed)

t.Log("step MsgAppResp 3")
// The follower replies to the MsgApp of entry id{term:3, index:3}.
// leader attempts to commit entry id{term:3, index:3}, and has to verify the term in storage

// Since it is a MsgAppResp, this will invoke maybeCommit and a term check.
// The term check is a fetch from storage, since we lost access to the term of entry index 3:
// "Returned term from storage. Slow access. index: 3 , term: 3"
// maybeCommit will return true because the leader is currently in term 3.
//
// And we are trying to advance commit index to 3, which is the dummy entry generated automatically by leader node1,
// with entry id{term:3, index:3}
//
// Even though the previous 2 entries are from term 2, we can still commit them along with the dummy entry (from term 3).
// Section 5.4.2 of the Raft paper has more details (https://raft.github.io/raft.pdf):
//
// 2->1 MsgAppResp. Mimic node 2 having persisted the log up to index 3 (index 3 is the dummy entry with term 3).
rn.Step(pb.Message{To: 1, From: 2, Type: pb.MsgAppResp, Index: 3})

// This time we can commit; the commit index is advanced to 3.
// Not 4, because we don't have a follower quorum for entry 4 (index: 4, term: 3).
//
// If node 2 sends another MsgAppResp with index: 4, indicating that it has persisted entry 4,
// then we can advance the commit index to 4.
require.Equal(t, uint64(3), rn.raft.raftLog.committed)

// Should only trigger a storage read for a term number once, after the optimization.
require.Equal(t, 1, s.callStats.term)
// Before the idxPreLeading optimization, we would fetch a term number from storage exactly 4 times,
// 3 times more than after the optimization.
// Storage fetches for terms happen quite often, so this minor optimization can help a bit.
}

func BenchmarkRawNode(b *testing.B) {
cases := []struct {
name string
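
The deleted test above narrates an ordering race: when the local MsgStorageAppendResp arrives before the quorum of MsgAppResp, the leader has already dropped the entry from its unstable log, so the commit-time term check must go to storage. A compact sketch of that ordering effect, with invented names (step, run) and no dependence on pkg/raft:

package main

import "fmt"

// step names the two events the deleted test interleaves.
type step int

const (
	localPersist step = iota // MsgStorageAppendResp: unstable log is truncated
	quorumAck                // MsgAppResp from a quorum: maybeCommit runs
)

// run simulates one ordering and reports whether the commit-time term check
// had to read the term from storage (i.e. the entry had left the unstable log).
func run(order []step) (storageRead bool) {
	inUnstable := true // the entry starts out in the leader's unstable log
	for _, s := range order {
		switch s {
		case localPersist:
			inUnstable = false // persisted locally, dropped from memory
		case quorumAck:
			if !inUnstable {
				storageRead = true // term must be fetched from storage
			}
		}
	}
	return storageRead
}

func main() {
	fmt.Println(run([]step{quorumAck, localPersist})) // false: term still in memory
	fmt.Println(run([]step{localPersist, quorumAck})) // true: storage read needed
}
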
