Commit 58413cb

storage: replace remote proposal tracking with uncommitted log size protection
This change reverts most of the non-testing code from 03b116f and f2f3fd2 and
replaces it with use of the MaxUncommittedEntriesSize config.

This configuration was added in etcd-io/etcd#10167 and provides protection
against unbounded Raft log growth when a Raft group stops being able to commit
entries. It makes proposals into Raft safer because proposers don't need to
verify before the fact that the proposal isn't a duplicate that might be
blowing up the size of the Raft group.

By default, the configuration is set to double the Replica's proposal quota.
The logic here is that the quotaPool should be responsible for throttling
proposals in all cases except for unbounded Raft re-proposals because it
queues efficiently instead of dropping proposals on the floor indiscriminately.

Release note (bug fix): Fix a bug where Raft proposals could get stuck if
forwarded to a leader who could not itself append a new entry to its log.
1 parent e4003e1 commit 58413cb
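
For orientation, below is a minimal sketch of the etcd/raft knob this commit relies on. The raft.Config field MaxUncommittedEntriesSize and the raft.ErrProposalDropped error are real parts of the etcd-io/etcd raft package; the helper name and the concrete tick/size values are illustrative assumptions, not code from this commit.

package example

import "github.com/coreos/etcd/raft"

// newIllustrativeRaftConfig shows where the uncommitted-size cap plugs in.
// Once the leader's uncommitted log tail exceeds maxUncommitted bytes, new
// proposals are rejected with raft.ErrProposalDropped instead of growing the
// log without bound. The helper and the numeric values are hypothetical.
func newIllustrativeRaftConfig(id uint64, storage raft.Storage, maxUncommitted uint64) *raft.Config {
	return &raft.Config{
		ID:                        id,
		ElectionTick:              10,       // assumed value
		HeartbeatTick:             1,        // assumed value
		Storage:                   storage,
		MaxSizePerMsg:             16 << 10, // assumed value
		MaxInflightMsgs:           64,       // assumed value
		MaxUncommittedEntriesSize: maxUncommitted,
	}
}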

8 files changed: +108 -362 lines

pkg/base/config.go  (+29)
@@ -446,6 +446,23 @@ type RaftConfig struct {
 	// performing log truncations.
 	RaftLogMaxSize int64
 
+	// RaftProposalQuota controls the maximum aggregate size of Raft commands
+	// that a leader is allowed to propose concurrently.
+	//
+	// By default, the quota is set to a fraction of the RaftLogMaxSize. In
+	// doing so, we ensure all replicas have sufficiently up to date logs so
+	// that when the log gets truncated, the followers do not need
+	// non-preemptive snapshots. Changing this deserves care. Too low and
+	// everything comes to a grinding halt, too high and we're not really
+	// throttling anything (we'll still generate snapshots).
+	RaftProposalQuota int64
+
+	// RaftMaxUncommittedEntriesSize controls how large the uncommitted tail of
+	// the Raft log can grow. The limit is meant to provide protection against
+	// unbounded Raft log growth when quorum is lost and entries stop being
+	// committed but continue to be proposed.
+	RaftMaxUncommittedEntriesSize uint64
+
 	// RaftMaxSizePerMsg controls how many Raft log entries the leader will send to
 	// followers in a single MsgApp.
 	RaftMaxSizePerMsg uint64
@@ -474,6 +491,18 @@ func (cfg *RaftConfig) SetDefaults() {
 	if cfg.RaftLogMaxSize == 0 {
 		cfg.RaftLogMaxSize = defaultRaftLogMaxSize
 	}
+	if cfg.RaftProposalQuota == 0 {
+		// By default, set this to a fraction of RaftLogMaxSize. See comment
+		// above for the tradeoffs of setting this higher or lower.
+		cfg.RaftProposalQuota = cfg.RaftLogMaxSize / 4
+	}
+	if cfg.RaftMaxUncommittedEntriesSize == 0 {
+		// By default, set this to twice the RaftProposalQuota. The logic here
+		// is that the quotaPool should be responsible for throttling proposals
+		// in all cases except for unbounded Raft re-proposals because it queues
+		// efficiently instead of dropping proposals on the floor indiscriminately.
+		cfg.RaftMaxUncommittedEntriesSize = uint64(2 * cfg.RaftProposalQuota)
+	}
 	if cfg.RaftMaxSizePerMsg == 0 {
 		cfg.RaftMaxSizePerMsg = uint64(defaultRaftMaxSizePerMsg)
 	}
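
To make the defaulting above concrete, here is a small worked example, assuming a defaultRaftLogMaxSize of 4 MiB (the actual constant may differ):

package example

// Mirrors the SetDefaults arithmetic above with an assumed 4 MiB log cap.
const (
	assumedRaftLogMaxSize = int64(4 << 20)                   // 4 MiB (assumption)
	derivedProposalQuota  = assumedRaftLogMaxSize / 4        // 1 MiB
	derivedMaxUncommitted = uint64(2 * derivedProposalQuota) // 2 MiB
)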

pkg/storage/client_raft_test.go  (+4, -3)
@@ -1155,6 +1155,8 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
 	sc.RaftTickInterval = 10 * time.Millisecond
 	// Don't timeout raft leader. We don't want leadership moving.
 	sc.RaftElectionTimeoutTicks = 1000000
+	// Reduce the max uncommitted entry size.
+	sc.RaftMaxUncommittedEntriesSize = 64 << 10 // 64 KB
 	// Disable leader transfers during leaseholder changes so that we
 	// can easily create leader-not-leaseholder scenarios.
 	sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
@@ -1233,7 +1235,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
 	// While a majority nodes are down, write some data.
 	putRes := make(chan *roachpb.Error)
 	go func() {
-		putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
+		putArgs := putArgs([]byte("b"), make([]byte, sc.RaftMaxUncommittedEntriesSize/8))
 		_, err := client.SendWrapped(context.Background(), propNode, putArgs)
 		putRes <- err
 	}()
@@ -1254,11 +1256,10 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
 	}
 
 	// Check raft log size.
-	const logSizeLimit = 64 << 10 // 64 KB
 	curlogSize := leaderRepl.GetRaftLogSize()
 	logSize := curlogSize - initLogSize
 	logSizeStr := humanizeutil.IBytes(logSize)
-	if logSize > logSizeLimit {
+	if uint64(logSize) > sc.RaftMaxUncommittedEntriesSize {
 		t.Fatalf("raft log size grew to %s", logSizeStr)
 	}
 	t.Logf("raft log size grew to %s", logSizeStr)
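
For a feel for the test's sizing, a small worked example using the constants from the diff above; the interpretation is ours, not part of the test:

package example

import "fmt"

// testSizing restates the numbers the test uses: the knob is lowered to
// 64 KiB and each put proposes a payload of one eighth of that, so without
// the cap a handful of duplicate re-proposals would already blow past it.
func testSizing() {
	maxUncommitted := uint64(64 << 10) // sc.RaftMaxUncommittedEntriesSize in the test
	putSize := maxUncommitted / 8      // 8 KiB payload per proposal
	fmt.Printf("cap=%d putSize=%d duplicatesToExceed=%d\n",
		maxUncommitted, putSize, maxUncommitted/putSize+1)
}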

pkg/storage/replica.go  (+3, -108)
@@ -387,12 +387,7 @@ type Replica struct {
 		// map must only be referenced while Replica.mu is held, except if the
 		// element is removed from the map first. The notable exception is the
 		// contained RaftCommand, which we treat as immutable.
-		localProposals map[storagebase.CmdIDKey]*ProposalData
-		// remoteProposals is maintained by Raft leaders and stores in-flight
-		// commands that were forwarded to the leader during its current term.
-		// The set allows leaders to detect duplicate forwarded commands and
-		// avoid re-proposing the same forwarded command multiple times.
-		remoteProposals map[storagebase.CmdIDKey]struct{}
+		localProposals map[storagebase.CmdIDKey]*ProposalData
 		internalRaftGroup *raft.RawNode
 		// The ID of the replica within the Raft group. May be 0 if the replica has
 		// been created from a preemptive snapshot (i.e. before being added to the
@@ -883,7 +878,6 @@ func (r *Replica) cancelPendingCommandsLocked() {
 		r.cleanupFailedProposalLocked(p)
 		p.finishApplication(pr)
 	}
-	r.mu.remoteProposals = nil
 }
 
 // cleanupFailedProposalLocked cleans up after a proposal that has failed. It
@@ -1118,22 +1112,12 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 			log.Fatalf(ctx, "len(r.mu.commandSizes) = %d, expected 0", commandSizesLen)
 		}
 
-		// We set the defaultProposalQuota to be less than RaftLogMaxSize,
-		// in doing so we ensure all replicas have sufficiently up to date
-		// logs so that when the log gets truncated, the followers do not
-		// need non-preemptive snapshots. Changing this deserves care. Too
-		// low and everything comes to a grinding halt, too high and we're
-		// not really throttling anything (we'll still generate snapshots).
-		//
-		// TODO(nvanbenschoten): clean this up in later commits.
-		proposalQuota := r.store.cfg.RaftLogMaxSize / 4
-
 		// Raft may propose commands itself (specifically the empty
 		// commands when leadership changes), and these commands don't go
 		// through the code paths where we acquire quota from the pool. To
 		// offset this we reset the quota pool whenever leadership changes
 		// hands.
-		r.mu.proposalQuota = newQuotaPool(proposalQuota)
+		r.mu.proposalQuota = newQuotaPool(r.store.cfg.RaftProposalQuota)
 		r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time)
 		r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
 	} else if r.mu.proposalQuota != nil {
@@ -1913,7 +1897,6 @@ func (r *Replica) State() storagepb.RangeInfo {
 	ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagepb.ReplicaState)
 	ri.LastIndex = r.mu.lastIndex
 	ri.NumPending = uint64(len(r.mu.localProposals))
-	ri.NumRemotePending = uint64(len(r.mu.remoteProposals))
 	ri.RaftLogSize = r.mu.raftLogSize
 	ri.NumDropped = uint64(r.mu.droppedMessages)
 	if r.mu.proposalQuota != nil {
@@ -4042,20 +4025,7 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
 		// we expect the originator to campaign instead.
 		r.unquiesceWithOptionsLocked(false /* campaignOnWake */)
 		r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
-
-		// Check if the message is a proposal that should be dropped.
-		if r.shouldDropForwardedProposalLocked(req) {
-			// If we could signal to the sender that its proposal was accepted
-			// or dropped then we wouldn't need to track anything.
-			return false /* unquiesceAndWakeLeader */, nil
-		}
-
 		err := raftGroup.Step(req.Message)
-		if err == nil {
-			// If we stepped successfully and the request is a proposal, consider
-			// tracking it so that we can ignore identical proposals in the future.
-			r.maybeTrackForwardedProposalLocked(raftGroup, req)
-		}
 		if err == raft.ErrProposalDropped {
 			// A proposal was forwarded to this replica but we couldn't propose it.
 			// Swallow the error since we don't have an effective way of signaling
@@ -4068,68 +4038,6 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
 	})
 }
 
-func (r *Replica) shouldDropForwardedProposalLocked(req *RaftMessageRequest) bool {
-	if req.Message.Type != raftpb.MsgProp {
-		// Not a proposal.
-		return false
-	}
-
-	for _, e := range req.Message.Entries {
-		switch e.Type {
-		case raftpb.EntryNormal:
-			cmdID, _ := DecodeRaftCommand(e.Data)
-			if _, ok := r.mu.remoteProposals[cmdID]; !ok {
-				// Untracked remote proposal. Don't drop.
-				return false
-			}
-		case raftpb.EntryConfChange:
-			// Never drop EntryConfChange proposals.
-			return false
-		default:
-			log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
-		}
-	}
-	// All entries tracked.
-	return true
-}
-
-func (r *Replica) maybeTrackForwardedProposalLocked(rg *raft.RawNode, req *RaftMessageRequest) {
-	if req.Message.Type != raftpb.MsgProp {
-		// Not a proposal.
-		return
-	}
-
-	if rg.Status().RaftState != raft.StateLeader {
-		// We're not the leader. We can't be sure that the proposal made it into
-		// the Raft log, so don't track it.
-		return
-	}
-
-	// Record that each of the proposal's entries was seen and appended. This
-	// allows us to catch duplicate forwarded proposals in the future and
-	// prevent them from being repeatedly appended to a leader's raft log.
-	for _, e := range req.Message.Entries {
-		switch e.Type {
-		case raftpb.EntryNormal:
-			cmdID, data := DecodeRaftCommand(e.Data)
-			if len(data) == 0 {
-				// An empty command is proposed to unquiesce a range and
-				// wake the leader. Don't keep track of these forwarded
-				// proposals because they will never be cleaned up.
-			} else {
-				if r.mu.remoteProposals == nil {
-					r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
-				}
-				r.mu.remoteProposals[cmdID] = struct{}{}
-			}
-		case raftpb.EntryConfChange:
-			// Don't track EntryConfChanges.
-		default:
-			log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
-		}
-	}
-}
-
 type handleRaftReadyStats struct {
 	processed int
 }
@@ -4394,7 +4302,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		r.mu.leaderID = leaderID
 		// Clear the remote proposal set. Would have been nil already if not
 		// previously the leader.
-		r.mu.remoteProposals = nil
 		becameLeader = r.mu.leaderID == r.mu.replicaID
 	}
 	r.mu.Unlock()
@@ -4599,22 +4506,13 @@ func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) {
 	if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 {
 		refreshAtDelta = knob
 	}
-	if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
-		r.mu.replicaID != r.mu.leaderID &&
-		r.mu.ticks%refreshAtDelta == 0 {
+	if !r.store.TestingKnobs().DisableRefreshReasonTicks && r.mu.ticks%refreshAtDelta == 0 {
 		// RaftElectionTimeoutTicks is a reasonable approximation of how long we
 		// should wait before deciding that our previous proposal didn't go
 		// through. Note that the combination of the above condition and passing
 		// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
 		// will be refreshed when they have been pending for 1 to 2 election
 		// cycles.
-		//
-		// However, we don't refresh proposals if we are the leader because
-		// doing so would be useless. The commands tracked by a leader replica
-		// were either all proposed when the replica was a leader or were
-		// re-proposed when the replica became a leader. Either way, they are
-		// guaranteed to be in the leader's Raft log so re-proposing won't do
-		// anything.
 		r.refreshProposalsLocked(refreshAtDelta, reasonTicks)
 	}
 	return true, nil
@@ -5407,9 +5305,6 @@ func (r *Replica) processRaftCommand(
 		delete(r.mu.localProposals, idKey)
 	}
 
-	// Delete the entry for a forwarded proposal set.
-	delete(r.mu.remoteProposals, idKey)
-
 	leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)
 
 	r.mu.Unlock()
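
For reference, a hedged sketch of the behavior the stepRaftGroup hunk above now leans on: when the uncommitted tail is over the cap, etcd/raft rejects the proposal with raft.ErrProposalDropped, and the replica swallows it and lets the periodic refresh re-propose later. The function below is illustrative only, not CockroachDB's actual proposal path.

package example

import "github.com/coreos/etcd/raft"

// proposeWithDropTolerance proposes data through a RawNode and treats
// ErrProposalDropped as a retryable condition rather than a hard failure.
// Hypothetical helper; the real code re-proposes via its refresh loop.
func proposeWithDropTolerance(rn *raft.RawNode, data []byte) error {
	if err := rn.Propose(data); err != nil {
		if err == raft.ErrProposalDropped {
			// The uncommitted log is full (or there is currently no leader).
			// Drop silently; a later refresh tick will re-propose the command.
			return nil
		}
		return err
	}
	return nil
}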
