Skip to content

Commit 7a8ab37

Browse files
committed
raft: fix correctness bug in CommittedEntries pagination
In #9982, a mechanism to limit the size of `CommittedEntries` was introduced. The way this mechanism worked was that it would load applicable entries (passing the max size hint) and would emit a `HardState` whose commit index was truncated to match the limitation applied to the entries. Unfortunately, this was subtly incorrect when the user-provided `Entries` implementation didn't exactly match what Raft uses internally. Depending on whether a `Node` or a `RawNode` was used, this would either lead to regressing the HardState's commit index or outright forgetting to apply entries, respectively. Asking implementers to precisely match the Raft size limitation semantics was considered but looks like a bad idea as it puts correctness squarely in the hands of downstream users. Instead, this PR removes the truncation of `HardState` when limiting is active and tracks the applied index separately. This removes the old paradigm (that the previous code tried to work around) that the client will always apply all the way to the commit index, which isn't true when commit entries are paginated. See [1] for more on the discovery of this bug (CockroachDB's implementation of `Entries` returns one more entry than Raft's when the size limit hits). [1]: cockroachdb/cockroach#28918 (comment)
1 parent 6143c13 commit 7a8ab37

File tree

5 files changed

+188
-22
lines changed

5 files changed

+188
-22
lines changed

raft/node.go

+20-11
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool {
109109
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
110110
}
111111

112+
// appliedCursor extracts from the Ready the highest index the client has
113+
// applied (once the Ready is confirmed via Advance). If no information is
114+
// contained in the Ready, returns zero.
115+
func (rd Ready) appliedCursor() uint64 {
116+
if n := len(rd.CommittedEntries); n > 0 {
117+
return rd.CommittedEntries[n-1].Index
118+
}
119+
if index := rd.Snapshot.Metadata.Index; index > 0 {
120+
return index
121+
}
122+
return 0
123+
}
124+
112125
// Node represents a node in a raft cluster.
113126
type Node interface {
114127
// Tick increments the internal logical clock for the Node by a single tick. Election
@@ -282,6 +295,7 @@ func (n *node) run(r *raft) {
282295
var prevLastUnstablei, prevLastUnstablet uint64
283296
var havePrevLastUnstablei bool
284297
var prevSnapi uint64
298+
var applyingToI uint64
285299
var rd Ready
286300

287301
lead := None
@@ -381,13 +395,17 @@ func (n *node) run(r *raft) {
381395
if !IsEmptySnap(rd.Snapshot) {
382396
prevSnapi = rd.Snapshot.Metadata.Index
383397
}
398+
if index := rd.appliedCursor(); index != 0 {
399+
applyingToI = index
400+
}
384401

385402
r.msgs = nil
386403
r.readStates = nil
387404
advancec = n.advancec
388405
case <-advancec:
389-
if prevHardSt.Commit != 0 {
390-
r.raftLog.appliedTo(prevHardSt.Commit)
406+
if applyingToI != 0 {
407+
r.raftLog.appliedTo(applyingToI)
408+
applyingToI = 0
391409
}
392410
if havePrevLastUnstablei {
393411
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
@@ -559,15 +577,6 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
559577
}
560578
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
561579
rd.HardState = hardSt
562-
// If we hit a size limit when loadaing CommittedEntries, clamp
563-
// our HardState.Commit to what we're actually returning. This is
564-
// also used as our cursor to resume for the next Ready batch.
565-
if len(rd.CommittedEntries) > 0 {
566-
lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
567-
if rd.HardState.Commit > lastCommit.Index {
568-
rd.HardState.Commit = lastCommit.Index
569-
}
570-
}
571580
}
572581
if r.raftLog.unstable.snapshot != nil {
573582
rd.Snapshot = *r.raftLog.unstable.snapshot

raft/node_test.go

+71
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package raft
1717
import (
1818
"bytes"
1919
"context"
20+
"fmt"
21+
"math"
2022
"reflect"
2123
"strings"
2224
"testing"
@@ -926,3 +928,72 @@ func TestCommitPagination(t *testing.T) {
926928
s.Append(rd.Entries)
927929
n.Advance()
928930
}
931+
932+
type ignoreSizeHintMemStorage struct {
933+
*MemoryStorage
934+
}
935+
936+
func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
937+
return s.MemoryStorage.Entries(lo, hi, math.MaxUint64)
938+
}
939+
940+
// TestNodeCommitPaginationAfterRestart regression tests a scenario in which the
941+
// Storage's Entries size limitation is slightly more permissive than Raft's
942+
// internal one. The original bug was the following:
943+
//
944+
// - node learns that index 11 (or 100, doesn't matter) is committed
945+
// - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However,
946+
// index 10 already exceeds maxBytes, due to a user-provided impl of Entries.
947+
// - Commit index gets bumped to 10
948+
// - the node persists the HardState, but crashes before applying the entries
949+
// - upon restart, the storage returns the same entries, but `slice` takes a different code path
950+
// (since it is now called with an upper bound of 10) and removes the last entry.
951+
// - Raft emits a HardState with a regressing commit index.
952+
//
953+
// A simpler version of this test would have the storage return a lot less entries than dictated
954+
// by maxSize (for example, exactly one entry) after the restart, resulting in a larger regression.
955+
// This wouldn't need to exploit anything about Raft-internal code paths to fail.
956+
func TestNodeCommitPaginationAfterRestart(t *testing.T) {
957+
s := &ignoreSizeHintMemStorage{
958+
MemoryStorage: NewMemoryStorage(),
959+
}
960+
persistedHardState := raftpb.HardState{
961+
Term: 1,
962+
Vote: 1,
963+
Commit: 10,
964+
}
965+
966+
s.hardState = persistedHardState
967+
s.ents = make([]raftpb.Entry, 10)
968+
var size uint64
969+
for i := range s.ents {
970+
ent := raftpb.Entry{
971+
Term: 1,
972+
Index: uint64(i + 1),
973+
Type: raftpb.EntryNormal,
974+
Data: []byte("a"),
975+
}
976+
977+
s.ents[i] = ent
978+
size += uint64(ent.Size())
979+
}
980+
981+
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
982+
// Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
983+
// not be included in the initial rd.CommittedEntries. However, our storage will ignore
984+
// this and *will* return it (which is how the Commit index ended up being 10 initially).
985+
cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
986+
987+
r := newRaft(cfg)
988+
n := newNode()
989+
go n.run(r)
990+
defer n.Stop()
991+
992+
rd := readyWithTimeout(&n)
993+
if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit {
994+
t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v",
995+
persistedHardState.Commit, rd.HardState.Commit,
996+
DescribeEntries(rd.CommittedEntries, func(data []byte) string { return fmt.Sprintf("%q", data) }),
997+
)
998+
}
999+
}

raft/rawnode.go

+8-11
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,15 @@ func (rn *RawNode) commitReady(rd Ready) {
4747
if !IsEmptyHardState(rd.HardState) {
4848
rn.prevHardSt = rd.HardState
4949
}
50-
if rn.prevHardSt.Commit != 0 {
51-
// In most cases, prevHardSt and rd.HardState will be the same
52-
// because when there are new entries to apply we just sent a
53-
// HardState with an updated Commit value. However, on initial
54-
// startup the two are different because we don't send a HardState
55-
// until something changes, but we do send any un-applied but
56-
// committed entries (and previously-committed entries may be
57-
// incorporated into the snapshot, even if rd.CommittedEntries is
58-
// empty). Therefore we mark all committed entries as applied
59-
// whether they were included in rd.HardState or not.
60-
rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit)
50+
51+
// If entries were applied (or a snapshot), update our cursor for
52+
// the next Ready. Note that if the current HardState contains a
53+
// new Commit index, this does not mean that we're also applying
54+
// all of the new entries due to commit pagination by size.
55+
if index := rd.appliedCursor(); index > 0 {
56+
rn.raft.raftLog.appliedTo(index)
6157
}
58+
6259
if len(rd.Entries) > 0 {
6360
e := rd.Entries[len(rd.Entries)-1]
6461
rn.raft.raftLog.stableTo(e.Index, e.Term)

raft/rawnode_test.go

+79
Original file line numberDiff line numberDiff line change
@@ -401,3 +401,82 @@ func TestRawNodeStatus(t *testing.T) {
401401
t.Errorf("expected status struct, got nil")
402402
}
403403
}
404+
405+
// TestRawNodeCommitPaginationAfterRestart is the RawNode version of
406+
// TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the
407+
// Raft group would forget to apply entries:
408+
//
409+
// - node learns that index 11 is committed
410+
// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
411+
// exceeds maxBytes), which isn't noticed internally by Raft
412+
// - Commit index gets bumped to 10
413+
// - the node persists the HardState, but crashes before applying the entries
414+
// - upon restart, the storage returns the same entries, but `slice` takes a
415+
// different code path and removes the last entry.
416+
// - Raft does not emit a HardState, but when the app calls Advance(), it bumps
417+
// its internal applied index cursor to 10 (when it should be 9)
418+
// - the next Ready asks the app to apply index 11 (omitting index 10), losing a
419+
// write.
420+
func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
421+
s := &ignoreSizeHintMemStorage{
422+
MemoryStorage: NewMemoryStorage(),
423+
}
424+
persistedHardState := raftpb.HardState{
425+
Term: 1,
426+
Vote: 1,
427+
Commit: 10,
428+
}
429+
430+
s.hardState = persistedHardState
431+
s.ents = make([]raftpb.Entry, 10)
432+
var size uint64
433+
for i := range s.ents {
434+
ent := raftpb.Entry{
435+
Term: 1,
436+
Index: uint64(i + 1),
437+
Type: raftpb.EntryNormal,
438+
Data: []byte("a"),
439+
}
440+
441+
s.ents[i] = ent
442+
size += uint64(ent.Size())
443+
}
444+
445+
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
446+
// Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
447+
// not be included in the initial rd.CommittedEntries. However, our storage will ignore
448+
// this and *will* return it (which is how the Commit index ended up being 10 initially).
449+
cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
450+
451+
s.ents = append(s.ents, raftpb.Entry{
452+
Term: 1,
453+
Index: uint64(11),
454+
Type: raftpb.EntryNormal,
455+
Data: []byte("boom"),
456+
})
457+
458+
rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
459+
if err != nil {
460+
t.Fatal(err)
461+
}
462+
463+
for highestApplied := uint64(0); highestApplied != 11; {
464+
rd := rawNode.Ready()
465+
n := len(rd.CommittedEntries)
466+
if n == 0 {
467+
t.Fatalf("stopped applying entries at index %d", highestApplied)
468+
}
469+
if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next {
470+
t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied)
471+
}
472+
highestApplied = rd.CommittedEntries[n-1].Index
473+
rawNode.Advance(rd)
474+
rawNode.Step(raftpb.Message{
475+
Type: raftpb.MsgHeartbeat,
476+
To: 1,
477+
From: 1, // illegal, but we get away with it
478+
Term: 1,
479+
Commit: 11,
480+
})
481+
}
482+
}

raft/util.go

+10
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string {
113113
return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
114114
}
115115

116+
// DescribeEntries calls DescribeEntry for each Entry, adding a newline to
117+
// each.
118+
func DescribeEntries(ents []pb.Entry, f EntryFormatter) string {
119+
var buf bytes.Buffer
120+
for _, e := range ents {
121+
_, _ = buf.WriteString(DescribeEntry(e, f) + "\n")
122+
}
123+
return buf.String()
124+
}
125+
116126
func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
117127
if len(ents) == 0 {
118128
return ents

0 commit comments

Comments
 (0)