Commit 6666008

storage: re-enqueue Raft groups on paginated application
Fixes cockroachdb#31330.

This change re-enqueues Raft groups for processing immediately if they still have more to do after a Raft ready iteration. This comes up in practice when a Range has sufficient load to force Raft application pagination. See cockroachdb#31330 for a discussion of the symptoms this can cause.

Release note (bug fix): Fix a bug where Raft followers could fall behind leaders during entry application, causing stalls during splits.
1 parent: 25081c6 · commit: 6666008

2 files changed: +100 −0

pkg/storage/replica.go

+8 −0

@@ -4444,6 +4444,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	const expl = "during advance"
 	if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
 		raftGroup.Advance(rd)
+
+		// If the Raft group still has more to process then we immediately
+		// re-enqueue it for another round of processing. This is possible if
+		// the group's committed entries were paginated due to size limitations
+		// and we didn't apply all of them in this pass.
+		if raftGroup.HasReady() {
+			r.store.enqueueRaftUpdateCheck(r.RangeID)
+		}
 		return true, nil
 	}); err != nil {
 		return stats, expl, errors.Wrap(err, expl)
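
For context, here is a minimal sketch, not CockroachDB code, of the etcd/raft RawNode loop the fix hinges on. It assumes a single-voter group and the RawNode API of this era (the import path and the two-argument NewRawNode signature vary across raft versions), and it assumes a raft version that paginates CommittedEntries by MaxSizePerMsg, which is exactly the behavior behind cockroachdb#31330: with a small MaxSizePerMsg, one Ready carries only a slice of the committed entries, so HasReady() can be true again immediately after Advance.

package main

import (
	"fmt"

	"go.etcd.io/etcd/raft" // assumption: import path depends on raft vintage
)

func main() {
	storage := raft.NewMemoryStorage()
	cfg := &raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   128, // small, so committed entries are paginated
		MaxInflightMsgs: 256,
	}
	// Single-voter group: proposals commit without any network traffic.
	rn, err := raft.NewRawNode(cfg, []raft.Peer{{ID: 1}})
	if err != nil {
		panic(err)
	}
	if err := rn.Campaign(); err != nil {
		panic(err)
	}
	for i := 0; i < 10; i++ {
		// Each 256-byte proposal alone exceeds MaxSizePerMsg.
		if err := rn.Propose(make([]byte, 256)); err != nil {
			panic(err)
		}
	}

	// Drain Readys until none remain. A caller that handled one Ready and
	// then waited for an external trigger (a tick or an incoming message)
	// would sit on committed-but-unapplied entries; that is the stall this
	// commit fixes by re-enqueueing the Range while HasReady() is true.
	for round := 0; rn.HasReady(); round++ {
		rd := rn.Ready()
		if err := storage.Append(rd.Entries); err != nil {
			panic(err)
		}
		fmt.Printf("round %d: %d committed entries\n", round, len(rd.CommittedEntries))
		// A real caller would apply rd.CommittedEntries here.
		rn.Advance(rd)
	}
}

Because each entry is larger than the 128-byte limit, each Ready surfaces roughly one committed entry and the drain takes about as many rounds as there were proposals; drive each round off a 5-second tick instead of a tight loop and the stall from cockroachdb#31330 appears.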

pkg/storage/replica_test.go

+92 −0

@@ -9581,6 +9581,98 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
 	close(blockRaftApplication)
 }
 
+// TestApplyPaginatedCommittedEntries tests that a Raft group's committed
+// entries are quickly applied, even if their application is paginated due to
+// the RaftMaxSizePerMsg configuration. This is a regression test for #31330.
+func TestApplyPaginatedCommittedEntries(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	tc := testContext{}
+	tsc := TestStoreConfig(nil)
+
+	// Drop the RaftMaxSizePerMsg so that even small Raft entries have their
+	// application paginated.
+	// TODO(nvanbenschoten): Switch this to using the new MaxCommittedSizePerReady
+	// configuration once #31511 is addressed.
+	tsc.RaftMaxSizePerMsg = 128
+	// Slow down the tick interval dramatically so that Raft groups can't rely
+	// on ticks to trigger Raft ready iterations.
+	tsc.RaftTickInterval = 5 * time.Second
+
+	var filterActive int32
+	blockRaftApplication := make(chan struct{})
+	blockingRaftApplication := make(chan struct{}, 1)
+	tsc.TestingKnobs.TestingApplyFilter =
+		func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
+			if atomic.LoadInt32(&filterActive) == 1 {
+				select {
+				case blockingRaftApplication <- struct{}{}:
+				default:
+				}
+				<-blockRaftApplication
+			}
+			return nil
+		}
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	tc.StartWithStoreConfig(t, stopper, tsc)
+	repl := tc.repl
+
+	// Block command application, then propose a command to Raft.
+	var ba roachpb.BatchRequest
+	key := roachpb.Key("a")
+	put := putArgs(key, []byte("val"))
+	ba.Add(&put)
+	ba.Timestamp = tc.Clock().Now()
+
+	atomic.StoreInt32(&filterActive, 1)
+	exLease, _ := repl.GetLease()
+	_, _, _, pErr := repl.propose(ctx, exLease, ba, nil /* endCmds */, &allSpans)
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	// Once that command is stuck applying, propose a number of large commands.
+	// This will allow them to all build up without any being applied so that
+	// their application will require pagination.
+	<-blockingRaftApplication
+	var ch chan proposalResult
+	for i := 0; i < 50; i++ {
+		var ba2 roachpb.BatchRequest
+		key := roachpb.Key("a")
+		put := putArgs(key, make([]byte, 2*tsc.RaftMaxSizePerMsg))
+		ba2.Add(&put)
+		ba2.Timestamp = tc.Clock().Now()
+
+		var pErr *roachpb.Error
+		ch, _, _, pErr = repl.propose(ctx, exLease, ba2, nil /* endCmds */, &allSpans)
+		if pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+
+	// Stop blocking Raft application. All of the proposals should quickly
+	// commit and apply, even if their application is paginated due to the
+	// small RaftMaxSizePerMsg.
+	close(blockRaftApplication)
+	const maxWait = 5 * time.Second
+	select {
+	case propRes := <-ch:
+		if propRes.Err != nil {
+			t.Fatalf("unexpected proposal result error: %v", propRes.Err)
+		}
+		if propRes.Reply == nil || len(propRes.Reply.Responses) != 1 {
+			t.Fatalf("expected proposal result with 1 response, found: %v", propRes.Reply)
+		}
+	case <-time.After(maxWait):
+		// If we don't re-enqueue Raft groups for another round of processing
+		// when their committed entries are paginated and not all immediately
+		// applied, this test will take more than three minutes to finish
+		// (roughly 50 paginated entries at one 5-second tick each).
+		t.Fatalf("stall detected, proposal did not finish within %s", maxWait)
+	}
+}
+
 func TestSplitMsgApps(t *testing.T) {
 	defer leaktest.AfterTest(t)()
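One note on the TODO in the test above: later etcd/raft versions expose a dedicated MaxCommittedSizePerReady field on raft.Config, decoupling committed-entry pagination from the outgoing message-size limit. A hedged sketch against that later API (the import path is again an assumption):

package main

import (
	"fmt"

	"go.etcd.io/etcd/raft" // assumption: a later version with the new field
)

func main() {
	cfg := &raft.Config{
		ID:            1,
		ElectionTick:  10,
		HeartbeatTick: 1,
		Storage:       raft.NewMemoryStorage(),
		// Outgoing MsgApp pagination; can stay large.
		MaxSizePerMsg: 1 << 20,
		// Bytes of CommittedEntries surfaced per Ready; this is the knob
		// the test's TODO wants to use instead of MaxSizePerMsg.
		MaxCommittedSizePerReady: 64 << 10,
		MaxInflightMsgs:          256,
	}
	fmt.Printf("committed bytes per Ready capped at %d\n", cfg.MaxCommittedSizePerReady)
}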