Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: remove apply pacing #133175

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 17 additions & 24 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,37 +73,27 @@ type raftLog struct {
applied uint64

logger raftlogger.Logger

// maxApplyingEntsSize limits the outstanding byte size of the messages
// returned from calls to nextCommittedEnts that have not been acknowledged
// by a call to appliedTo.
maxApplyingEntsSize entryEncodingSize
}

// newLog returns log using the given storage and default options. It
// recovers the log to the state that it just commits and applies the
// latest snapshot.
// newLog returns a raft log initialized to the state in the given storage.
func newLog(storage Storage, logger raftlogger.Logger) *raftLog {
return newLogWithSize(storage, logger, noLimit)
}

// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(
storage Storage, logger raftlogger.Logger, maxApplyingEntsSize entryEncodingSize,
) *raftLog {
firstIndex, lastIndex := storage.FirstIndex(), storage.LastIndex()
lastTerm, err := storage.Term(lastIndex)
if err != nil {
panic(err) // TODO(pav-kv): the storage should always cache the last term.
}
last := entryID{term: lastTerm, index: lastIndex}
return &raftLog{
storage: storage,
unstable: newUnstable(last, logger),
maxApplyingEntsSize: maxApplyingEntsSize,

// Initialize our committed and applied pointers to the time of the last compaction.
storage: storage,
unstable: newUnstable(last, logger),

// Initialize our committed and applied pointers to the time of the last
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏽

// compaction.
// TODO(pav-kv): this is insufficient, and the caller (newRaft) additionally
// moves the committed and applied indices forward, based on the loaded
// HardState and Config.Applied. We should consolidate initialization. All
// the related initial state can be read in one place, and passed in to
// newRaft/newLog instead of being read in 3 different places.
committed: firstIndex - 1,
applying: firstIndex - 1,
applied: firstIndex - 1,
Expand Down Expand Up @@ -260,12 +250,15 @@ func (l *raftLog) applyingEntsPaused() bool {
return l.applying > l.applied
}

// nextCommittedEnts returns all the available entries for execution.
// nextCommittedEnts returns a batch of committed entries to be applied to the
// state machine next. The total size of the returned entries does not exceed
// maxSize, except when the first entry is larger than this limit.
//
// Entries can be committed even when the local raft instance has not durably
// appended them to the local raft log yet. If allowUnstable is true, committed
// entries from the unstable log may be returned; otherwise, only entries known
// to reside locally on stable storage will be returned.
func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
func (l *raftLog) nextCommittedEnts(maxSize entryEncodingSize, allowUnstable bool) []pb.Entry {
if l.applyingEntsPaused() {
// Some entries are still being applied.
return nil
Expand All @@ -279,7 +272,7 @@ func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
// Nothing to apply.
return nil
}
ents, err := l.slice(lo, hi, l.maxApplyingEntsSize)
ents, err := l.slice(lo, hi, maxSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,14 @@ func TestNextCommittedEnts(t *testing.T) {
}

hasNext := raftLog.hasNextCommittedEnts(tt.allowUnstable)
next := raftLog.nextCommittedEnts(tt.allowUnstable)
next := raftLog.nextCommittedEnts(noLimit, tt.allowUnstable)
require.Equal(t, next != nil, hasNext)
require.Equal(t, tt.want, next)
})
}
}

func TestAcceptApplying(t *testing.T) {
maxSize := entryEncodingSize(100)
snap := pb.Snapshot{
Metadata: pb.SnapshotMetadata{Term: 1, Index: 3},
}
Expand All @@ -376,7 +375,7 @@ func TestAcceptApplying(t *testing.T) {
t.Run("", func(t *testing.T) {
storage := NewMemoryStorage()
require.NoError(t, storage.ApplySnapshot(snap))
raftLog := newLogWithSize(storage, raftlogger.DiscardLogger, maxSize)
raftLog := newLog(storage, raftlogger.DiscardLogger)
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

Expand All @@ -392,7 +391,6 @@ func TestAcceptApplying(t *testing.T) {
}

func TestAppliedTo(t *testing.T) {
maxSize := entryEncodingSize(100)
snap := pb.Snapshot{
Metadata: pb.SnapshotMetadata{Term: 1, Index: 3},
}
Expand All @@ -409,7 +407,7 @@ func TestAppliedTo(t *testing.T) {
t.Run("", func(t *testing.T) {
storage := NewMemoryStorage()
require.NoError(t, storage.ApplySnapshot(snap))
raftLog := newLogWithSize(storage, raftlogger.DiscardLogger, maxSize)
raftLog := newLog(storage, raftlogger.DiscardLogger)
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

Expand Down
8 changes: 6 additions & 2 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ type raft struct {

maxMsgSize entryEncodingSize
maxUncommittedSize entryPayloadSize
// maxApplyingEntsSize limits the byte size of MsgStorageApply messages
// returned from Ready calls.
maxApplyingEntsSize entryEncodingSize

config quorum.Config
trk tracker.ProgressTracker
Expand Down Expand Up @@ -433,7 +436,7 @@ func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLogWithSize(c.Storage, c.Logger, entryEncodingSize(c.MaxCommittedSizePerReady))
raftlog := newLog(c.Storage, c.Logger)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
Expand All @@ -446,6 +449,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize),
maxApplyingEntsSize: entryEncodingSize(c.MaxCommittedSizePerReady),
lazyReplication: c.LazyReplication,
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
Expand Down Expand Up @@ -1337,7 +1341,7 @@ func (r *raft) hasUnappliedConfChanges() bool {
// via the Ready struct for application.
// TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan
// outside the raft package.
pageSize := r.raftLog.maxApplyingEntsSize
pageSize := r.maxApplyingEntsSize
if err := r.raftLog.scan(lo, hi, pageSize, func(ents []pb.Entry) error {
for i := range ents {
if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
Expand Down
6 changes: 3 additions & 3 deletions pkg/raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func TestLeaderCommitEntry(t *testing.T) {
assert.Equal(t, li+1, r.raftLog.committed)
assert.Equal(t, []pb.Entry{
{Index: li + 1, Term: 1, Data: []byte("some data")},
}, r.raftLog.nextCommittedEnts(true))
}, r.raftLog.nextCommittedEnts(noLimit, true))
msgs := r.readMessages()
slices.SortFunc(msgs, cmpMessages)
for i, m := range msgs {
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {
assert.Equal(t, append(tt,
pb.Entry{Term: 3, Index: li + 1},
pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")},
), r.raftLog.nextCommittedEnts(true), "#%d", i)
), r.raftLog.nextCommittedEnts(noLimit, true), "#%d", i)
}
}

Expand Down Expand Up @@ -562,7 +562,7 @@ func TestFollowerCommitEntry(t *testing.T) {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})

assert.Equal(t, tt.commit, r.raftLog.committed, "#%d", i)
assert.Equal(t, tt.ents[:int(tt.commit)], r.raftLog.nextCommittedEnts(true), "#%d", i)
assert.Equal(t, tt.ents[:int(tt.commit)], r.raftLog.nextCommittedEnts(noLimit, true), "#%d", i)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
r.advanceMessagesAfterAppend()

// Return committed entries.
ents = r.raftLog.nextCommittedEnts(true)
ents = r.raftLog.nextCommittedEnts(noLimit, true)
r.raftLog.appliedTo(r.raftLog.committed)
return ents
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (rn *RawNode) readyWithoutAccept() Ready {

rd := Ready{
Entries: r.raftLog.nextUnstableEnts(),
CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),
CommittedEntries: r.raftLog.nextCommittedEnts(r.maxApplyingEntsSize, rn.applyUnstableEntries()),
Messages: r.msgs,
}
if softSt := r.softState(); !softSt.equal(rn.prevSoftSt) {
Expand Down
Loading