From 89e8fe305450a88e11fa839a08b3b240df9cc469 Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Mon, 21 Nov 2016 16:56:15 -0500
Subject: [PATCH] storage: tune raft.Config.{MaxSizePerMsg,MaxInflightMsgs}

The previous settings (MaxSizePerMsg: 1 MB, MaxInflightMsgs: 256)
allowed up to 256 MB of Raft log entries to be in flight to a follower,
resulting in a single Replica.handleRaftReady call processing thousands
or tens of thousands of commands.

Log the number of commands processed when Replica.handleRaftReady takes
too long.

Fixes #10917
---
 pkg/storage/replica.go | 44 ++++++++++++++++++++++++++----------------
 pkg/storage/store.go   | 38 ++++++++++++++++++++++++------------
 2 files changed, 53 insertions(+), 29 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 4a26d07e740c..cef97a26a0d2 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -2267,11 +2267,15 @@ func (r *Replica) maybeAbandonSnapshot(ctx context.Context) {
 	}
 }
 
+type handleRaftReadyStats struct {
+	processed int
+}
+
 // handleRaftReady processes a raft.Ready containing entries and messages that
 // are ready to read, be saved to stable storage, committed or sent to other
 // peers. It takes a non-empty IncomingSnapshot to indicate that it is
 // about to process a snapshot.
-func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error {
+func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) (handleRaftReadyStats, error) {
 	r.raftMu.Lock()
 	defer r.raftMu.Unlock()
 	return r.handleRaftReadyRaftMuLocked(inSnap)
@@ -2279,7 +2283,11 @@ func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error {
 
 // handleRaftReadyRaftMuLocked is the same as handleRaftReady but requires
 // that the replica's raftMu be held.
-func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
+func (r *Replica) handleRaftReadyRaftMuLocked(
+	inSnap IncomingSnapshot,
+) (handleRaftReadyStats, error) {
+	var stats handleRaftReadyStats
+
 	ctx := r.AnnotateCtx(context.TODO())
 	var hasReady bool
 	var rd raft.Ready
@@ -2296,11 +2304,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 	})
 	r.mu.Unlock()
 	if err != nil {
-		return err
+		return stats, err
 	}
 
 	if !hasReady {
-		return nil
+		return stats, nil
 	}
 
 	logRaftReady(ctx, rd)
@@ -2326,7 +2334,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 	if !raft.IsEmptySnap(rd.Snapshot) {
 		snapUUID, err := uuid.FromBytes(rd.Snapshot.Data)
 		if err != nil {
-			return errors.Wrap(err, "invalid snapshot id")
+			return stats, errors.Wrap(err, "invalid snapshot id")
 		}
 		if inSnap.SnapUUID == (uuid.UUID{}) {
 			log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
@@ -2336,7 +2344,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 		}
 
 		if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState); err != nil {
-			return err
+			return stats, err
 		}
 
 		// handleRaftReady is called under the processRaftMu lock, so it is
@@ -2353,11 +2361,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 			}
 			return nil
 		}(); err != nil {
-			return err
+			return stats, err
 		}
 
 		if lastIndex, err = loadLastIndex(ctx, r.store.Engine(), r.RangeID); err != nil {
-			return err
+			return stats, err
 		}
 		// We refresh pending commands after applying a snapshot because this
 		// replica may have been temporarily partitioned from the Raft group and
@@ -2381,16 +2389,16 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 		// last index.
 		var err error
 		if lastIndex, raftLogSize, err = r.append(ctx, writer, lastIndex, raftLogSize, rd.Entries); err != nil {
-			return err
+			return stats, err
 		}
 	}
 	if !raft.IsEmptyHardState(rd.HardState) {
 		if err := setHardState(ctx, writer, r.RangeID, rd.HardState); err != nil {
-			return err
+			return stats, err
 		}
 	}
 	if err := batch.Commit(); err != nil {
-		return err
+		return stats, err
 	}
 
 	// Update protected state (last index, raft log size and raft leader
@@ -2433,26 +2441,27 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 				var encodedCommand []byte
 				commandID, encodedCommand = DecodeRaftCommand(e.Data)
 				if err := command.Unmarshal(encodedCommand); err != nil {
-					return err
+					return stats, err
 				}
 			}
 
 			// Discard errors from processRaftCommand. The error has been sent
 			// to the client that originated it, where it will be handled.
 			_ = r.processRaftCommand(ctx, commandID, e.Index, command)
+			stats.processed++
 
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			if err := cc.Unmarshal(e.Data); err != nil {
-				return err
+				return stats, err
 			}
 			var ccCtx ConfChangeContext
 			if err := ccCtx.Unmarshal(cc.Context); err != nil {
-				return err
+				return stats, err
 			}
 			var command storagebase.RaftCommand
 			if err := command.Unmarshal(ccCtx.Payload); err != nil {
-				return err
+				return stats, err
 			}
 			if pErr := r.processRaftCommand(
 				ctx, storagebase.CmdIDKey(ccCtx.CommandID), e.Index, command,
@@ -2460,11 +2469,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 				// If processRaftCommand failed, tell raft that the config change was aborted.
 				cc = raftpb.ConfChange{}
 			}
+			stats.processed++
 			if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
 				raftGroup.ApplyConfChange(cc)
 				return true, nil
 			}); err != nil {
-				return err
+				return stats, err
 			}
 		default:
 			log.Fatalf(ctx, "unexpected Raft entry: %v", e)
@@ -2479,7 +2489,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 	// TODO(bdarnell): need to check replica id and not Advance if it
 	// has changed. Or do we need more locking to guarantee that replica
 	// ID cannot change during handleRaftReady?
-	return r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
+	return stats, r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
 		raftGroup.Advance(rd)
 		return true, nil
 	})
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 3ab35729e0cf..9552aa43def4 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -150,6 +150,11 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig {
 	}
 }
 
+var (
+	raftMaxSizePerMsg   = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
+	raftMaxInflightMsgs = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 4)
+)
+
 func newRaftConfig(
 	strg raft.Storage, id uint64, appliedIndex uint64, storeCfg StoreConfig, logger raft.Logger,
 ) *raft.Config {
@@ -168,9 +173,17 @@ func newRaftConfig(
 		PreVote:     enablePreVote,
 		CheckQuorum: !enablePreVote,
 
-		// TODO(bdarnell): make these configurable; evaluate defaults.
-		MaxSizePerMsg:   1024 * 1024,
-		MaxInflightMsgs: 256,
+		// MaxSizePerMsg controls the maximum aggregate byte size of Raft log
+		// entries the leader will send to a follower in a single MsgApp.
+		MaxSizePerMsg: uint64(raftMaxSizePerMsg),
+		// MaxInflightMsgs controls how many "inflight" messages Raft will send
+		// to a follower without hearing a response. The total amount of
+		// unacknowledged Raft log data is therefore bounded by MaxInflightMsgs
+		// times MaxSizePerMsg.
+		// The current settings allow up to 64 KB of raft log to be sent without
+		// acknowledgement. With an average entry size of 1 KB, that is ~64
+		// commands executed in the handling of a single raft.Ready operation.
+		MaxInflightMsgs: raftMaxInflightMsgs,
 	}
 }
 
@@ -2961,7 +2974,7 @@ func (s *Store) processRaftRequest(
 			removePlaceholder = false
 		} else {
 			// Force the replica to deal with this snapshot right now.
-			if err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
+			if _, err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
 				// mimic the behavior in processRaft.
 				panic(err)
 			}
@@ -3209,19 +3222,20 @@ func (s *Store) processReady(rangeID roachpb.RangeID) {
 	s.mu.Unlock()
 
 	if ok {
-		if err := r.handleRaftReady(IncomingSnapshot{}); err != nil {
+		stats, err := r.handleRaftReady(IncomingSnapshot{})
+		if err != nil {
 			panic(err) // TODO(bdarnell)
 		}
 		elapsed := timeutil.Since(start)
 		s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds())
-		// If Raft processing took longer than 10x the raft tick interval something
-		// bad is going on. Such long processing time means we'll have starved
-		// local replicas of ticks and remote replicas will likely start
-		// campaigning.
-		var warnDuration = 10 * s.cfg.RaftTickInterval
-		if elapsed >= warnDuration {
+		// Warn if Raft processing took too long. We use the same threshold as
+		// for warnings about excessive raft mutex hold times. Long processing
+		// time means we'll have starved local replicas of ticks and remote
+		// replicas will likely start campaigning.
+		if elapsed >= defaultReplicaRaftMuWarnThreshold {
 			ctx := r.AnnotateCtx(context.TODO())
-			log.Warningf(ctx, "handle raft ready: %.1fs", elapsed.Seconds())
+			log.Warningf(ctx, "handle raft ready: %.1fs [processed=%d]",
+				elapsed.Seconds(), stats.processed)
 		}
 		if !r.IsInitialized() {
 			// Only an uninitialized replica can have a placeholder since, by
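
For reference, the figures in the new comment follow directly from the two
knobs: 16 KB * 4 = 64 KB of unacknowledged raft log, which at the comment's
assumed 1 KB average entry size is ~64 commands per raft.Ready, versus
1 MB * 256 = 256 MB under the old settings. A minimal standalone Go sketch of
that arithmetic (illustrative only; not part of the patch):

package main

import "fmt"

func main() {
	// New defaults from this patch.
	const maxSizePerMsg = 16 * 1024 // bytes of entries per MsgApp
	const maxInflightMsgs = 4       // unacknowledged MsgApps per follower

	// Upper bound on unacknowledged raft log sent to a single follower.
	budget := maxSizePerMsg * maxInflightMsgs
	fmt.Printf("inflight budget: %d KB\n", budget/1024) // 64 KB

	// With the comment's assumed 1 KB average entry size, roughly how many
	// commands a single Replica.handleRaftReady call might have to apply.
	const avgEntrySize = 1024
	fmt.Printf("~%d commands per raft.Ready\n", budget/avgEntrySize) // ~64

	// The previous settings, for comparison.
	old := (1024 * 1024) * 256
	fmt.Printf("old inflight budget: %d MB\n", old/(1024*1024)) // 256 MB
}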
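
Because the new values are read via envutil.EnvOrDefaultInt, both knobs can be
overridden per process through the COCKROACH_RAFT_MAX_SIZE_PER_MSG and
COCKROACH_RAFT_MAX_INFLIGHT_MSGS environment variables, set before node
startup. The standalone sketch below loosely mirrors that lookup using only
the standard library; envOrDefaultInt is a hypothetical stand-in, and the real
helper's error handling may differ:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// envOrDefaultInt loosely mirrors envutil.EnvOrDefaultInt: read an integer
// from the environment, falling back to the given default. (Stand-in only;
// the real helper's handling of malformed values may differ.)
func envOrDefaultInt(name string, def int) int {
	if s, ok := os.LookupEnv(name); ok {
		if v, err := strconv.Atoi(s); err == nil {
			return v
		}
	}
	return def
}

func main() {
	maxSize := envOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
	maxInflight := envOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 4)
	fmt.Printf("MaxSizePerMsg=%d MaxInflightMsgs=%d\n", maxSize, maxInflight)
}

The defaults deliberately bias toward small in-flight batches so that a single
Replica.handleRaftReady call stays short; raising them moves back toward the
old behavior, with more raft log in flight per follower and correspondingly
larger handleRaftReady batches.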