Skip to content

Commit 25081c6

Browse files
craig[bot] and nvanbenschoten committed
Merge #31408
31408: storage: replace remote proposal tracking with uncommitted log size protection r=nvanbenschoten a=nvanbenschoten Closes #30064. This change reverts most of the non-testing code from 03b116f and f2f3fd2 and replaces it with use of the MaxUncommittedEntriesSize config. This configuration was added in etcd-io/etcd#10167 and provides protection against unbounded Raft log growth when a Raft group stops being able to commit entries. It makes proposals into Raft safer because proposers don't need to verify before the fact that the proposal isn't a duplicate that might be blowing up the size of the Raft group. By default, the configuration is set to double the Replica's proposal quota. The logic here is that the quotaPool should be responsible for throttling proposals in all cases except for unbounded Raft re-proposals because it queues efficiently instead of dropping proposals on the floor indiscriminately. Release note (bug fix): Fix a bug where Raft proposals could get stuck if forwarded to a leader who could not itself append a new entry to its log. This will be backported, but not to 2.1.0. The plan is to get it into 2.1.1. Co-authored-by: Nathan VanBenschoten <[email protected]>
2 parents 6a8999f + 0ffdb68 commit 25081c6

18 files changed

+226
-427
lines changed

Gopkg.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/base/config.go

+78-2
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,28 @@ const (
9090
DefaultTableDescriptorLeaseRenewalTimeout = time.Minute
9191
)
9292

93-
var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
94-
"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)
93+
var (
94+
// defaultRaftElectionTimeoutTicks specifies the number of Raft Tick
95+
// invocations that must pass between elections.
96+
defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
97+
"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)
98+
99+
// defaultRaftLogTruncationThreshold specifies the upper bound that a single
100+
// Range's Raft log can grow to before log truncations are triggered, even
101+
// if that means a snapshot will be required for a straggling follower.
102+
defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
103+
"COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 4<<20 /* 4 MB */)
104+
105+
// defaultRaftMaxSizePerMsg specifies the maximum aggregate byte size of Raft
106+
// log entries that a leader will send to followers in a single MsgApp.
107+
defaultRaftMaxSizePerMsg = envutil.EnvOrDefaultInt(
108+
"COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16<<10 /* 16 KB */)
109+
110+
// defaultRaftMaxInflightMsgs specifies how many "inflight" messages a leader
111+
// will send to a follower without hearing a response.
112+
defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
113+
"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
114+
)
95115

96116
type lazyHTTPClient struct {
97117
once sync.Once
@@ -421,6 +441,41 @@ type RaftConfig struct {
421441
// RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader
422442
// lease active duration should be of the raft election timeout.
423443
RangeLeaseRaftElectionTimeoutMultiplier float64
444+
445+
// RaftLogTruncationThreshold controls how large a single Range's Raft log
446+
// can grow. When a Range's Raft log grows above this size, the Range will
447+
// begin performing log truncations.
448+
RaftLogTruncationThreshold int64
449+
450+
// RaftProposalQuota controls the maximum aggregate size of Raft commands
451+
// that a leader is allowed to propose concurrently.
452+
//
453+
// By default, the quota is set to a fraction of the Raft log truncation
454+
// threshold. In doing so, we ensure all replicas have sufficiently up to
455+
// date logs so that when the log gets truncated, the followers do not need
456+
// non-preemptive snapshots. Changing this deserves care. Too low and
457+
// everything comes to a grinding halt, too high and we're not really
458+
// throttling anything (we'll still generate snapshots).
459+
RaftProposalQuota int64
460+
461+
// RaftMaxUncommittedEntriesSize controls how large the uncommitted tail of
462+
// the Raft log can grow. The limit is meant to provide protection against
463+
// unbounded Raft log growth when quorum is lost and entries stop being
464+
// committed but continue to be proposed.
465+
RaftMaxUncommittedEntriesSize uint64
466+
467+
// RaftMaxSizePerMsg controls the maximum aggregate byte size of Raft log
468+
// entries the leader will send to followers in a single MsgApp.
469+
RaftMaxSizePerMsg uint64
470+
471+
// RaftMaxInflightMsgs controls how many "inflight" messages Raft will send
472+
// to a follower without hearing a response. The total number of Raft log
473+
// entries is a combination of this setting and RaftMaxSizePerMsg. The
474+
// current default settings provide for up to 1 MB of raft log to be sent
475+
// without acknowledgement. With an average entry size of 1 KB that
476+
// translates to ~1024 commands that might be executed in the handling of a
477+
// single raft.Ready operation.
478+
RaftMaxInflightMsgs int
424479
}
425480

426481
// SetDefaults initializes unset fields.
@@ -434,6 +489,27 @@ func (cfg *RaftConfig) SetDefaults() {
434489
if cfg.RangeLeaseRaftElectionTimeoutMultiplier == 0 {
435490
cfg.RangeLeaseRaftElectionTimeoutMultiplier = defaultRangeLeaseRaftElectionTimeoutMultiplier
436491
}
492+
if cfg.RaftLogTruncationThreshold == 0 {
493+
cfg.RaftLogTruncationThreshold = defaultRaftLogTruncationThreshold
494+
}
495+
if cfg.RaftProposalQuota == 0 {
496+
// By default, set this to a fraction of RaftLogTruncationThreshold. See the comment
497+
// on the field for the tradeoffs of setting this higher or lower.
498+
cfg.RaftProposalQuota = cfg.RaftLogTruncationThreshold / 4
499+
}
500+
if cfg.RaftMaxUncommittedEntriesSize == 0 {
501+
// By default, set this to twice the RaftProposalQuota. The logic here
502+
// is that the quotaPool should be responsible for throttling proposals
503+
// in all cases except for unbounded Raft re-proposals because it queues
504+
// efficiently instead of dropping proposals on the floor indiscriminately.
505+
cfg.RaftMaxUncommittedEntriesSize = uint64(2 * cfg.RaftProposalQuota)
506+
}
507+
if cfg.RaftMaxSizePerMsg == 0 {
508+
cfg.RaftMaxSizePerMsg = uint64(defaultRaftMaxSizePerMsg)
509+
}
510+
if cfg.RaftMaxInflightMsgs == 0 {
511+
cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
512+
}
437513
}
438514

439515
// RaftElectionTimeout returns the raft election timeout, as computed from the

pkg/storage/client_raft_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -1155,6 +1155,8 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
11551155
sc.RaftTickInterval = 10 * time.Millisecond
11561156
// Don't timeout raft leader. We don't want leadership moving.
11571157
sc.RaftElectionTimeoutTicks = 1000000
1158+
// Reduce the max uncommitted entry size.
1159+
sc.RaftMaxUncommittedEntriesSize = 64 << 10 // 64 KB
11581160
// Disable leader transfers during leaseholder changes so that we
11591161
// can easily create leader-not-leaseholder scenarios.
11601162
sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
@@ -1233,7 +1235,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
12331235
// While a majority of nodes are down, write some data.
12341236
putRes := make(chan *roachpb.Error)
12351237
go func() {
1236-
putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
1238+
putArgs := putArgs([]byte("b"), make([]byte, sc.RaftMaxUncommittedEntriesSize/8))
12371239
_, err := client.SendWrapped(context.Background(), propNode, putArgs)
12381240
putRes <- err
12391241
}()
@@ -1254,11 +1256,10 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
12541256
}
12551257

12561258
// Check raft log size.
1257-
const logSizeLimit = 64 << 10 // 64 KB
12581259
curlogSize := leaderRepl.GetRaftLogSize()
12591260
logSize := curlogSize - initLogSize
12601261
logSizeStr := humanizeutil.IBytes(logSize)
1261-
if logSize > logSizeLimit {
1262+
if uint64(logSize) > sc.RaftMaxUncommittedEntriesSize {
12621263
t.Fatalf("raft log size grew to %s", logSizeStr)
12631264
}
12641265
t.Logf("raft log size grew to %s", logSizeStr)

pkg/storage/raft_log_queue.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/cockroachdb/cockroach/pkg/gossip"
2727
"github.com/cockroachdb/cockroach/pkg/internal/client"
2828
"github.com/cockroachdb/cockroach/pkg/roachpb"
29-
"github.com/cockroachdb/cockroach/pkg/util/envutil"
3029
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3130
"github.com/cockroachdb/cockroach/pkg/util/log"
3231
)
@@ -49,9 +48,6 @@ const (
4948
raftLogQueueConcurrency = 4
5049
)
5150

52-
// raftLogMaxSize limits the maximum size of the Raft log.
53-
var raftLogMaxSize = envutil.EnvOrDefaultInt64("COCKROACH_RAFT_LOG_MAX_SIZE", 4<<20 /* 4 MB */)
54-
5551
// raftLogQueue manages a queue of replicas slated to have their raft logs
5652
// truncated by removing unneeded entries.
5753
type raftLogQueue struct {
@@ -118,8 +114,8 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, int
118114
if targetSize > *r.mu.zone.RangeMaxBytes {
119115
targetSize = *r.mu.zone.RangeMaxBytes
120116
}
121-
if targetSize > raftLogMaxSize {
122-
targetSize = raftLogMaxSize
117+
if targetSize > r.store.cfg.RaftLogTruncationThreshold {
118+
targetSize = r.store.cfg.RaftLogTruncationThreshold
123119
}
124120
firstIndex, err := r.raftFirstIndexLocked()
125121
pendingSnapshotIndex := r.mu.pendingSnapshotIndex

pkg/storage/raft_transport.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"go.etcd.io/etcd/raft/raftpb"
3030
"google.golang.org/grpc"
3131

32+
"github.com/cockroachdb/cockroach/pkg/base"
3233
"github.com/cockroachdb/cockroach/pkg/roachpb"
3334
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
3435
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -615,6 +616,7 @@ func (t *RaftTransport) startProcessNewQueue(
615616
// for closing the OutgoingSnapshot.
616617
func (t *RaftTransport) SendSnapshot(
617618
ctx context.Context,
619+
raftCfg *base.RaftConfig,
618620
storePool *StorePool,
619621
header SnapshotRequest_Header,
620622
snap *OutgoingSnapshot,
@@ -640,5 +642,5 @@ func (t *RaftTransport) SendSnapshot(
640642
log.Warningf(ctx, "failed to close snapshot stream: %s", err)
641643
}
642644
}()
643-
return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent)
645+
return sendSnapshot(ctx, raftCfg, t.st, stream, storePool, header, snap, newBatch, sent)
644646
}

0 commit comments

Comments
 (0)