Skip to content

Commit

Permalink
[DNM] kv: don't signal raft scheduler when on scheduler goroutine
Browse files Browse the repository at this point in the history
Avoids unnecessary interactions with the Raft scheduler.
  • Loading branch information
nvanbenschoten committed Mar 16, 2023
1 parent 7f3fc56 commit 8949816
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}

r.traceMessageSends(outboundMsgs, "sending messages")
r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers)
r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers, true /* onRaftSched */)

// If the ready struct includes entries that have been committed, these
// entries will be applied to the Replica's replicated state machine down
Expand Down Expand Up @@ -926,7 +926,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}

// Send MsgStorageAppend's responses.
r.sendRaftMessages(ctx, msgStorageAppend.Responses, nil)
r.sendRaftMessages(ctx, msgStorageAppend.Responses, nil, true /* onRaftSched */)
} else {
// TODO(pavelkalinnikov): find a way to move it to storeEntries.
if msgStorageAppend.Commit != 0 && !r.IsInitialized() {
Expand Down Expand Up @@ -1018,7 +1018,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}

// Send MsgStorageApply's responses.
r.sendRaftMessages(ctx, msgStorageApply.Responses, nil)
r.sendRaftMessages(ctx, msgStorageApply.Responses, nil, true /* onRaftSched */)
}
stats.tApplicationEnd = timeutil.Now()
applicationElapsed := stats.tApplicationEnd.Sub(stats.tApplicationBegin).Nanoseconds()
Expand Down Expand Up @@ -1476,11 +1476,14 @@ type replicaSyncCallback Replica

func (r *replicaSyncCallback) OnLogSync(ctx context.Context, msgs []raftpb.Message) {
// Send MsgStorageAppend's responses.
(*Replica)(r).sendRaftMessages(ctx, msgs, nil /* blocked */)
(*Replica)(r).sendRaftMessages(ctx, msgs, nil /* blocked */, false /* onRaftSched */)
}

func (r *Replica) sendRaftMessages(
ctx context.Context, messages []raftpb.Message, blocked map[roachpb.ReplicaID]struct{},
ctx context.Context,
messages []raftpb.Message,
blocked map[roachpb.ReplicaID]struct{},
onRaftSched bool,
) {
var lastAppResp raftpb.Message
for _, message := range messages {
Expand All @@ -1503,7 +1506,7 @@ func (r *Replica) sendRaftMessages(
// replicaSyncCallback.OnLogSync. For other local storage work (log
// application and snapshot application), these messages come from
// Replica.handleRaftReadyRaftMuLocked.
r.sendLocalRaftMsg(message)
r.sendLocalRaftMsg(message, onRaftSched)
default:
_, drop := blocked[roachpb.ReplicaID(message.To)]
if drop {
Expand Down Expand Up @@ -1583,15 +1586,15 @@ func (r *Replica) sendRaftMessages(
}

// sendLocalRaftMsg sends a message to the local raft state machine.
func (r *Replica) sendLocalRaftMsg(msg raftpb.Message) {
func (r *Replica) sendLocalRaftMsg(msg raftpb.Message, onRaftSched bool) {
if msg.To != uint64(r.ReplicaID()) {
panic("incorrect message target")
}
r.localMsgs.Lock()
wasEmpty := len(r.localMsgs.active) == 0
r.localMsgs.active = append(r.localMsgs.active, msg)
r.localMsgs.Unlock()
if wasEmpty {
if wasEmpty && !onRaftSched {
r.store.enqueueRaftUpdateCheck(r.RangeID)
}
}
Expand Down

0 comments on commit 8949816

Please sign in to comment.