Skip to content

Commit fa92397

Browse files
authored
Merge pull request #10258 from ajwerner/ajwerner/raft_committed_entries_size
raft: separate MaxCommittedSizePerReady config from MaxSizePerMsg
2 parents ee9dcbc + e4af2be commit fa92397

File tree

3 files changed

+19
-8
lines changed

3 files changed

+19
-8
lines changed

raft/log.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ type raftLog struct {
3939

4040
logger Logger
4141

42-
maxMsgSize uint64
42+
// maxNextEntsSize is the maximum number aggregate byte size of the messages
43+
// returned from calls to nextEnts.
44+
maxNextEntsSize uint64
4345
}
4446

4547
// newLog returns log using the given storage and default options. It
@@ -51,14 +53,14 @@ func newLog(storage Storage, logger Logger) *raftLog {
5153

5254
// newLogWithSize returns a log using the given storage and max
5355
// message size.
54-
func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
56+
func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
5557
if storage == nil {
5658
log.Panic("storage must not be nil")
5759
}
5860
log := &raftLog{
59-
storage: storage,
60-
logger: logger,
61-
maxMsgSize: maxMsgSize,
61+
storage: storage,
62+
logger: logger,
63+
maxNextEntsSize: maxNextEntsSize,
6264
}
6365
firstIndex, err := storage.FirstIndex()
6466
if err != nil {
@@ -149,7 +151,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
149151
func (l *raftLog) nextEnts() (ents []pb.Entry) {
150152
off := max(l.applied+1, l.firstIndex())
151153
if l.committed+1 > off {
152-
ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
154+
ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
153155
if err != nil {
154156
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
155157
}

raft/node_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,7 @@ func TestAppendPagination(t *testing.T) {
894894
func TestCommitPagination(t *testing.T) {
895895
s := NewMemoryStorage()
896896
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
897-
cfg.MaxSizePerMsg = 2048
897+
cfg.MaxCommittedSizePerReady = 2048
898898
r := newRaft(cfg)
899899
n := newNode()
900900
go n.run(r)

raft/raft.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ type Config struct {
154154
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
155155
// 0 for at most one entry per message.
156156
MaxSizePerMsg uint64
157+
// MaxCommittedSizePerReady limits the size of the committed entries which
158+
// can be applied.
159+
MaxCommittedSizePerReady uint64
157160
// MaxUncommittedEntriesSize limits the aggregate byte size of the
158161
// uncommitted entries that may be appended to a leader's log. Once this
159162
// limit is exceeded, proposals will begin to return ErrProposalDropped
@@ -224,6 +227,12 @@ func (c *Config) validate() error {
224227
c.MaxUncommittedEntriesSize = noLimit
225228
}
226229

230+
// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
231+
// previously the same parameter.
232+
if c.MaxCommittedSizePerReady == 0 {
233+
c.MaxCommittedSizePerReady = c.MaxSizePerMsg
234+
}
235+
227236
if c.MaxInflightMsgs <= 0 {
228237
return errors.New("max inflight messages must be greater than 0")
229238
}
@@ -316,7 +325,7 @@ func newRaft(c *Config) *raft {
316325
if err := c.validate(); err != nil {
317326
panic(err.Error())
318327
}
319-
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
328+
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
320329
hs, cs, err := c.Storage.InitialState()
321330
if err != nil {
322331
panic(err) // TODO(bdarnell)

0 commit comments

Comments
 (0)