NRG: Lockless Leaderless and HadPreviousLeader (#6438)
This removes lock contention around JS API requests that need to know whether the meta group is leaderless, by tracking that state atomically instead of reading it under the Raft group lock.

Also fixes a race condition in inactivity thresholds/consumer pause that this change appears to have uncovered.

Signed-off-by: Neil Twigg <[email protected]>
derekcollison authored Jan 31, 2025
2 parents 4060845 + 039ab65 commit 587a4a4
Showing 4 changed files with 40 additions and 22 deletions.
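The heart of the change is in server/raft.go below: the leader's identity stays behind the raft lock, but whether a leader exists at all is mirrored into an atomic flag that hot paths can read without locking. A minimal, self-contained sketch of that pattern (the `group` type and its fields are illustrative stand-ins, not the actual server types):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// group is an illustrative stand-in for a raft node. The leader's identity
// still lives behind the mutex, but whether any leader exists is mirrored
// into an atomic flag so hot paths can check it lock-free.
type group struct {
	mu        sync.RWMutex
	leader    string      // who the leader is (rarely needed)
	hasLeader atomic.Bool // whether any leader exists (checked constantly)
}

// setLeader updates both views under the lock, keeping the flag in sync.
func (g *group) setLeader(name string) {
	g.mu.Lock()
	g.leader = name
	g.hasLeader.Store(name != "")
	g.mu.Unlock()
}

// Leaderless never touches the mutex, so callers checking it cannot
// contend with work that holds the group lock.
func (g *group) Leaderless() bool {
	return !g.hasLeader.Load()
}

func main() {
	var g group
	fmt.Println(g.Leaderless()) // true: the zero value means "no leader yet"
	g.setLeader("S1")
	fmt.Println(g.Leaderless()) // false
}
```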
8 changes: 5 additions & 3 deletions server/jetstream_api.go
@@ -4709,7 +4709,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,

// Since these could wait on the Raft group lock, don't do so under the JS lock.
ourID := meta.ID()
-groupLeader := meta.GroupLeader()
+groupLeaderless := meta.Leaderless()
groupCreated := meta.Created()

js.mu.RLock()
@@ -4727,7 +4727,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
// Also capture if we think there is no meta leader.
var isLeaderLess bool
if !isLeader {
-isLeaderLess = groupLeader == _EMPTY_ && time.Since(groupCreated) > lostQuorumIntervalDefault
+isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
}
js.mu.RUnlock()

@@ -4814,7 +4814,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
return
}
// If we are a member and we have a group leader or we had a previous leader consider bailing out.
-if node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader() {
+if !node.Leaderless() || node.HadPreviousLeader() {
if leaderNotPartOfGroup {
resp.Error = NewJSConsumerOfflineError()
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
@@ -4998,6 +4998,8 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
}

nca := *ca
+ncfg := *ca.Config
+nca.Config = &ncfg
js.mu.RUnlock()
pauseUTC := req.PauseUntil.UTC()
if !pauseUTC.IsZero() {
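The jsConsumerPauseRequest hunk above is part of the race fix mentioned in the commit message: copying the assignment by value (`nca := *ca`) still leaves `nca.Config` pointing at the same config as the original, so writes through the copy would presumably be visible to — and race with — other readers of the shared config. The patch therefore also copies the config value before releasing the lock. A small sketch of why the shallow copy alone is not enough (the types here are made up for illustration):

```go
package main

import "fmt"

// Illustrative stand-ins, not the real server types.
type config struct{ PauseUntil string }
type assignment struct{ Config *config }

func main() {
	orig := &assignment{Config: &config{}}

	// Shallow copy: the struct is copied, but Config still points at the
	// very same config as the original, so writes through it are shared.
	shallow := *orig
	shallow.Config.PauseUntil = "2025-02-01T00:00:00Z"
	fmt.Println(orig.Config.PauseUntil != "") // true: the original was mutated

	// As in the patch: also copy the pointed-to config and repoint the
	// new assignment at that private copy before mutating it.
	orig.Config.PauseUntil = ""
	deep := *orig
	cfg := *orig.Config
	deep.Config = &cfg
	deep.Config.PauseUntil = "2025-02-01T00:00:00Z"
	fmt.Println(orig.Config.PauseUntil == "") // true: the original is untouched
}
```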
22 changes: 13 additions & 9 deletions server/jetstream_cluster.go
@@ -825,7 +825,7 @@ func (js *jetStream) isLeaderless() bool {

// If we don't have a leader.
// Make sure we have been running for enough time.
-if meta.GroupLeader() == _EMPTY_ && time.Since(meta.Created()) > lostQuorumIntervalDefault {
+if meta.Leaderless() && time.Since(meta.Created()) > lostQuorumIntervalDefault {
return true
}
return false
@@ -857,7 +857,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
node := rg.node
js.mu.RUnlock()
// If we don't have a leader.
-if node.GroupLeader() == _EMPTY_ {
+if node.Leaderless() {
// Threshold for jetstream startup.
const startupThreshold = 10 * time.Second

@@ -1073,7 +1073,7 @@ func (js *jetStream) checkForOrphans() {

// We only want to cleanup any orphans if we know we are current with the meta-leader.
meta := cc.meta
-if meta == nil || meta.GroupLeader() == _EMPTY_ {
+if meta == nil || meta.Leaderless() {
js.mu.Unlock()
s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
return
@@ -1375,7 +1375,7 @@ func (js *jetStream) monitorCluster() {
// If we have a current leader or had one in the past we can cancel this here since the metaleader
// will be in charge of all peer state changes.
// For cold boot only.
-if n.GroupLeader() != _EMPTY_ || n.HadPreviousLeader() {
+if !n.Leaderless() || n.HadPreviousLeader() {
lt.Stop()
continue
}
@@ -2568,7 +2568,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Always cancel if this was running.
stopDirectMonitoring()

-} else if n.GroupLeader() != noLeader {
+} else if !n.Leaderless() {
js.setStreamAssignmentRecovering(sa)
}

@@ -4089,7 +4089,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
js.mu.RLock()
s := js.srv
node := sa.Group.node
-hadLeader := node == nil || node.GroupLeader() != noLeader
+hadLeader := node == nil || !node.Leaderless()
offline := s.allPeersOffline(sa.Group)
var isMetaLeader bool
if cc := js.cluster; cc != nil {
@@ -4436,7 +4436,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
if isConfigUpdate = !reflect.DeepEqual(&cfg, ca.Config); isConfigUpdate {
// Call into update, ignore consumer exists error here since this means an old deliver subject is bound
// which can happen on restart etc.
-if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() {
+// JS lock needed as this can mutate the consumer assignments and race with updateInactivityThreshold.
+js.mu.Lock()
+err := o.updateConfig(ca.Config)
+js.mu.Unlock()
+if err != nil && err != NewJSConsumerNameExistError() {
// This is essentially an update that has failed. Respond back to metaleader if we are not recovering.
js.mu.RLock()
if !js.metaRecovering {
@@ -8545,13 +8549,13 @@ RETRY:
// the semaphore.
releaseSyncOutSem()

-if n.GroupLeader() == _EMPTY_ {
+if n.Leaderless() {
// Prevent us from spinning if we've installed a snapshot from a leader but there's no leader online.
// We wait a bit to check if a leader has come online in the meantime, if so we can continue.
var canContinue bool
if numRetries == 0 {
time.Sleep(startInterval)
-canContinue = n.GroupLeader() != _EMPTY_
+canContinue = !n.Leaderless()
}
if !canContinue {
return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name())
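In the catchup path at the end of this file, the lockless check also gains a small grace period: if the node installed a snapshot from a leader that has since gone away, it sleeps once and rechecks for a leader before aborting, rather than spinning through retries. A hedged sketch of that wait-and-recheck shape (the function name, retry counter, and delay are placeholders, not the server's actual values):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNoLeader = errors.New("catchup aborted, no leader")

// catchupGate decides whether a catchup attempt may proceed. On the first
// attempt it waits briefly for a leader to (re)appear before giving up.
func catchupGate(leaderless func() bool, numRetries int, wait time.Duration) error {
	if !leaderless() {
		return nil
	}
	if numRetries == 0 {
		time.Sleep(wait)
		if !leaderless() {
			return nil
		}
	}
	return errNoLeader
}

func main() {
	// Simulate a leader coming online 20ms after the check starts.
	start := time.Now()
	leaderless := func() bool { return time.Since(start) < 20*time.Millisecond }

	fmt.Println(catchupGate(leaderless, 0, 50*time.Millisecond)) // <nil>: leader appeared during the grace period
}
```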
2 changes: 1 addition & 1 deletion server/monitor.go
@@ -3901,7 +3901,7 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
Applied: n.applied,
CatchingUp: n.catchup != nil,
Leader: n.leader,
-EverHadLeader: n.pleader,
+EverHadLeader: n.pleader.Load(),
Term: n.term,
Vote: n.vote,
PTerm: n.pterm,
30 changes: 21 additions & 9 deletions server/raft.go
@@ -52,6 +52,7 @@ type RaftNode interface {
Current() bool
Healthy() bool
Term() uint64
+Leaderless() bool
GroupLeader() string
HadPreviousLeader() bool
StepDown(preferred ...string) error
@@ -180,9 +181,10 @@ type raft struct {
c *client // Internal client for subscriptions
js *jetStream // JetStream, if running, to see if we are out of resources

-dflag bool // Debug flag
-pleader bool // Has the group ever had a leader?
-observer bool // The node is observing, i.e. not participating in voting
+dflag bool // Debug flag
+hasleader atomic.Bool // Is there a group leader right now?
+pleader atomic.Bool // Has the group ever had a leader?
+observer bool // The node is observing, i.e. not participating in voting

extSt extensionState // Extension state

@@ -935,7 +937,7 @@ func (n *raft) AdjustBootClusterSize(csz int) error {
n.Lock()
defer n.Unlock()

-if n.leader != noLeader || n.pleader {
+if n.leader != noLeader || n.pleader.Load() {
return errAdjustBootCluster
}
// Same floor as bootstrap.
@@ -1497,9 +1499,7 @@ func (n *raft) Healthy() bool {

// HadPreviousLeader indicates if this group ever had a leader.
func (n *raft) HadPreviousLeader() bool {
-n.RLock()
-defer n.RUnlock()
-return n.pleader
+return n.pleader.Load()
}

// GroupLeader returns the current leader of the group.
@@ -1512,6 +1512,17 @@ func (n *raft) GroupLeader() string {
return n.leader
}

+// Leaderless is a lockless way of finding out if the group has a
+// leader or not. Use instead of GroupLeader in hot paths.
+func (n *raft) Leaderless() bool {
+if n == nil {
+return true
+}
+// Negated because we want the default state of hasLeader to be
+// false until the first setLeader() call.
+return !n.hasleader.Load()
+}

// Guess the best next leader. Stepdown will check more thoroughly.
// Lock should be held.
func (n *raft) selectNextLeader() string {
@@ -3263,8 +3274,9 @@ func (n *raft) resetWAL() {
// Lock should be held
func (n *raft) updateLeader(newLeader string) {
n.leader = newLeader
-if !n.pleader && newLeader != noLeader {
-n.pleader = true
+n.hasleader.Store(newLeader != _EMPTY_)
+if !n.pleader.Load() && newLeader != noLeader {
+n.pleader.Store(true)
}
}

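Two details of the new raft.go code are worth calling out: `atomic.Bool` starts out false, so `Leaderless` reports true both for a nil node and for a group that has never elected anyone, and `HadPreviousLeader` (surfaced as `EverHadLeader` in the Raftz monitoring output) can now be read without taking the raft lock at all. A small sketch of those semantics, again with illustrative types rather than the real ones:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node mimics only the two flags relevant here.
type node struct {
	hasLeader atomic.Bool // is there a leader right now?
	pleader   atomic.Bool // has there ever been one?
}

// Leaderless treats a nil node as leaderless; the zero value of the atomic
// flag already encodes "no leader yet", so no extra initialisation is needed.
func (n *node) Leaderless() bool {
	if n == nil {
		return true
	}
	return !n.hasLeader.Load()
}

func (n *node) HadPreviousLeader() bool { return n.pleader.Load() }

// updateLeader keeps both flags in step with the current leader string.
func (n *node) updateLeader(leader string) {
	n.hasLeader.Store(leader != "")
	if leader != "" {
		n.pleader.Store(true)
	}
}

func main() {
	var missing *node
	fmt.Println(missing.Leaderless()) // true: a nil node counts as leaderless

	n := &node{}
	fmt.Println(n.Leaderless(), n.HadPreviousLeader()) // true false

	n.updateLeader("S2")
	n.updateLeader("") // leader lost again
	fmt.Println(n.Leaderless(), n.HadPreviousLeader()) // true true
}
```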
