Skip to content

Commit

Permalink
raft: Asserting current state storage access for term number when lea…
Browse files Browse the repository at this point in the history
…der commits

Currently, when the raft leader wants to commit an entry(advance commit index), it must check the term of the entry it wants to commit and ensure the term number is the same as the current leader term(term of itself). This may involve a storage access for the term number if the entry has been persisted on stable storage and no longer in the unstable log of the leader node.

This change adds an assertion for this existing storage access for term by adding a scenario that triggers storage access in TestLeaderAppResp() and asserting storage has been called(logged by MemoryStorage.callStats.term)

Jira issue: CRDB-45763
Fixes: #137826

Release note: None
  • Loading branch information
hakuuww committed Jan 27, 2025
1 parent eb0fc95 commit 27d717f
Showing 1 changed file with 47 additions and 13 deletions.
60 changes: 47 additions & 13 deletions pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2572,22 +2572,53 @@ func TestLeaderAppResp(t *testing.T) {
reject bool
// progress
wmatch uint64
wnext uint64
// message
wmsgNum int
windex uint64
// log index of the next entry to send to this follower.
wnext uint64
// number of messages the leader sends out
wmsgNum int
// prevLogIndex in MsgApp from leader to followers
windex uint64
// leader's commit index
wcommitted uint64
// storage access counts for getting term number
ctStgTerm int
}{
{2, true, 0, 4, 0, 0, 0}, // stale resp; no replies
{6, true, 0, 4, 0, 0, 0}, // stale resp; no replies
{3, true, 0, 3, 1, 2, 0}, // denied resp; leader does not commit; decrease next and send probing msg

{4, false, 4, 7, 1, 4, 4}, // accept resp; leader commits; broadcast with commit index
// Follower is StateProbing at 4, it sends MsgAppResp for 4, and is moved to
// StateReplicate and as many entries as possible are sent to it (5, 6).
// stale resp; no replies
{2, true, 0, 4, 0, 0, 0, 1},
// stale resp; no replies
{6, true, 0, 4, 0, 0, 0, 1},

// denied resp; leader does not commit; decrease next and send probing msg
// An additional term storage access is involved for an entry
// that's already persisted since we are probing backwards
{3, true, 0, 3, 1, 2, 0, 2},

// Follower 2 responds to leader, indicating log index 2 is replicated.
// Leader tries to commit, but commit index doesn't advance since the index
// is from a previous term
// We hit maybeCommit() and do term check by getting the term number from
// storage
{2, false, 2, 7, 1, 2, 0, 3},

// NB: For the following tests, we are skipping the MsgAppResp for the first
// 3 entries, by directly processing MsgAppResp for later entries
//
// Follower 2 is StateProbing at 4, it sends MsgAppResp for 4, and is moved
// to StateReplicate and as many entries as possible are sent to it (5, 6).
// Correspondingly the Next is then 7 (entry 7 does not exist, indicating
// the follower will be up to date should it process the emitted MsgApp).
{4, false, 4, 7, 1, 4, 4},
// accept resp; leader commits; broadcast with commit index
{4, false, 4, 7, 1, 4, 4, 1},

// Follower 2 says term2, index5 is already replicated
// The leader responds with the updated commit index to follower 2
// The leader sends entry{term:2, index:6} to follower 2
{5, false, 5, 7, 1, 5, 5, 1},
// Follower 2 says term2, index6 is already replicated
// The leader responds with the updated commit index to follower 2
// The leader sends no additional entries to follower 2 since all logs are
// up to date
{6, false, 6, 7, 1, 6, 6, 1},
} {
t.Run("", func(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2, 3))
Expand All @@ -2600,9 +2631,10 @@ func TestLeaderAppResp(t *testing.T) {
sm.becomeLeader()
require.Equal(t, uint64(4), sm.raftLog.lastIndex()) // appended a dummy
sm.appendEntry(index(5).terms(2, 2)...)
require.Equal(t, uint64(0), sm.raftLog.committed)
sm.bcastAppend()

sm.readMessages()

require.NoError(t, sm.Step(pb.Message{
From: 2,
Type: pb.MsgAppResp,
Expand All @@ -2622,6 +2654,8 @@ func TestLeaderAppResp(t *testing.T) {
require.Equal(t, tt.windex, msg.Index, "%v", DescribeMessage(msg, nil))
require.Equal(t, tt.wcommitted, msg.Commit, "%v", DescribeMessage(msg, nil))
}

assert.Equal(t, tt.ctStgTerm, storage.callStats.term)
})
}
}
Expand Down

0 comments on commit 27d717f

Please sign in to comment.