Commit d4e6fb8
storage: re-enqueue Raft groups on paginated application

Fixes cockroachdb#31330.

This change re-enqueues Raft groups for processing immediately if they still have more to do after a Raft ready iteration. This comes up in practice when a Range has sufficient load to force pagination during Raft entry application. See cockroachdb#31330 for a discussion of the symptoms this can cause.

Release note (bug fix): Fix a bug where Raft followers could fall behind leaders in entry application, causing stalls during splits.
1 parent: de83c9e

File tree

2 files changed: +100 -0 lines

pkg/storage/replica.go (+8)

@@ -4464,6 +4464,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	const expl = "during advance"
 	if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
 		raftGroup.Advance(rd)
+
+		// If the Raft group still has more to process then we immediately
+		// re-enqueue it for another round of processing. This is possible if
+		// the group's committed entries were paginated due to size limitations
+		// and we didn't apply all of them in this pass.
+		if raftGroup.HasReady() {
+			r.store.enqueueRaftUpdateCheck(r.RangeID)
+		}
 		return true, nil
 	}); err != nil {
 		return stats, expl, errors.Wrap(err, expl)
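For context, the fix slots into the standard etcd/raft ready loop: drain a Ready, persist and apply what it carries, call Advance, and only then check whether pagination left work behind. Below is a minimal sketch of that loop, assuming the coreos-era etcd/raft import path CockroachDB used at the time; the scheduler interface, processRaftReady function, and rangeID parameter are hypothetical stand-ins for the store's Raft scheduler and the enqueueRaftUpdateCheck call in the diff above.

package raftloop

import "github.com/coreos/etcd/raft"

// scheduler is a hypothetical stand-in for the store's Raft scheduler,
// whose real counterpart backs enqueueRaftUpdateCheck.
type scheduler interface {
	enqueue(rangeID int64)
}

// processRaftReady sketches one ready iteration for a single Raft group.
func processRaftReady(rn *raft.RawNode, rangeID int64, sched scheduler) {
	if !rn.HasReady() {
		return
	}
	rd := rn.Ready()
	// ... persist rd.HardState and rd.Entries, send rd.Messages, and apply
	// rd.CommittedEntries here. With a small size limit, CommittedEntries
	// may hold only a prefix of everything that has committed.
	rn.Advance(rd)
	// The commit's change: after Advance, paginated committed entries may
	// still be pending. Re-enqueue the group so they are applied on the next
	// scheduler pass instead of waiting for a tick or an incoming message.
	if rn.HasReady() {
		sched.enqueue(rangeID)
	}
}

Without that final HasReady check, a group whose application was paginated would go quiet until the next tick, which the regression test below makes rare by stretching the tick interval to five seconds.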

pkg/storage/replica_test.go (+92)

@@ -9626,6 +9626,98 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
 	close(blockRaftApplication)
 }
 
+// TestApplyPaginatedCommittedEntries tests that a Raft group's committed
+// entries are quickly applied, even if their application is paginated due to
+// the RaftMaxSizePerMsg configuration. This is a regression test for #31330.
+func TestApplyPaginatedCommittedEntries(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	tc := testContext{}
+	tsc := TestStoreConfig(nil)
+
+	// Drop the RaftMaxSizePerMsg so that even small Raft entries have their
+	// application paginated.
+	// TODO(nvanbenschoten): Switch this to using the new MaxCommittedSizePerReady
+	// configuration once #31511 is addressed.
+	tsc.RaftMaxSizePerMsg = 128
+	// Slow down the tick interval dramatically so that Raft groups can't rely
+	// on ticks to trigger Raft ready iterations.
+	tsc.RaftTickInterval = 5 * time.Second
+
+	var filterActive int32
+	blockRaftApplication := make(chan struct{})
+	blockingRaftApplication := make(chan struct{}, 1)
+	tsc.TestingKnobs.TestingApplyFilter =
+		func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
+			if atomic.LoadInt32(&filterActive) == 1 {
+				select {
+				case blockingRaftApplication <- struct{}{}:
+				default:
+				}
+				<-blockRaftApplication
+			}
+			return nil
+		}
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	tc.StartWithStoreConfig(t, stopper, tsc)
+	repl := tc.repl
+
+	// Block command application, then propose a command to Raft.
+	var ba roachpb.BatchRequest
+	key := roachpb.Key("a")
+	put := putArgs(key, []byte("val"))
+	ba.Add(&put)
+	ba.Timestamp = tc.Clock().Now()
+
+	atomic.StoreInt32(&filterActive, 1)
+	exLease, _ := repl.GetLease()
+	_, _, _, pErr := repl.propose(ctx, exLease, ba, nil /* endCmds */, &allSpans)
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	// Once that command is stuck applying, propose a number of large commands.
+	// This will allow them to all build up without any being applied so that
+	// their application will require pagination.
+	<-blockingRaftApplication
+	var ch chan proposalResult
+	for i := 0; i < 50; i++ {
+		var ba2 roachpb.BatchRequest
+		key := roachpb.Key("a")
+		put := putArgs(key, make([]byte, 2*tsc.RaftMaxSizePerMsg))
+		ba2.Add(&put)
+		ba2.Timestamp = tc.Clock().Now()
+
+		var pErr *roachpb.Error
+		ch, _, _, pErr = repl.propose(ctx, exLease, ba2, nil /* endCmds */, &allSpans)
+		if pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+
+	// Stop blocking Raft application. All of the proposals should quickly
+	// commit and apply, even if their application is paginated due to the
+	// small RaftMaxSizePerMsg.
+	close(blockRaftApplication)
+	const maxWait = 10 * time.Second
+	select {
+	case propRes := <-ch:
+		if propRes.Err != nil {
+			t.Fatalf("unexpected proposal result error: %v", propRes.Err)
+		}
+		if propRes.Reply == nil || len(propRes.Reply.Responses) != 1 {
+			t.Fatalf("expected proposal result with 1 response, found: %v", propRes.Reply)
+		}
+	case <-time.After(maxWait):
+		// If we don't re-enqueue Raft groups for another round of processing
+		// when their committed entries are paginated and not all immediately
+		// applied, this test will take more than three minutes to finish.
+		t.Fatalf("stall detected, proposal did not finish within %s", maxWait)
+	}
+}
+
 func TestSplitMsgApps(t *testing.T) {
 	defer leaktest.AfterTest(t)()
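The pagination the test depends on originates in etcd/raft: the store's RaftMaxSizePerMsg setting is plumbed into raft.Config.MaxSizePerMsg, which at the time capped both per-message entry bytes and the committed entries returned per Ready (the MaxCommittedSizePerReady knob referenced in the TODO separates those two limits in later etcd/raft versions). A minimal standalone sketch of a config with the same 128-byte cap; the surrounding values are illustrative, not CockroachDB's defaults:

package raftcfg

import "github.com/coreos/etcd/raft"

func exampleConfig() *raft.Config {
	return &raft.Config{
		ID:            1,
		ElectionTick:  10,
		HeartbeatTick: 1,
		Storage:       raft.NewMemoryStorage(),
		// A 128-byte cap means each of the test's ~256-byte proposals is
		// handed back in a Ready of its own, so applying all 50 takes on
		// the order of 50 ready iterations.
		MaxSizePerMsg:   128,
		MaxInflightMsgs: 256,
	}
}

That arithmetic is also where the timeout comment's "three minutes" comes from: without re-enqueueing, each leftover pagination round waits for the next tick, so draining roughly 50 rounds at a 5-second tick interval takes on the order of 50 × 5s ≈ 250s, while the fixed code finishes them in back-to-back scheduler passes well inside the 10-second window.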
