diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index e90f5f7b8b..6f9913b1f1 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -4709,7 +4709,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
 
 	// Since these could wait on the Raft group lock, don't do so under the JS lock.
 	ourID := meta.ID()
-	groupLeader := meta.GroupLeader()
+	groupLeaderless := meta.Leaderless()
 	groupCreated := meta.Created()
 
 	js.mu.RLock()
@@ -4727,7 +4727,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
 	// Also capture if we think there is no meta leader.
 	var isLeaderLess bool
 	if !isLeader {
-		isLeaderLess = groupLeader == _EMPTY_ && time.Since(groupCreated) > lostQuorumIntervalDefault
+		isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
 	}
 	js.mu.RUnlock()
 
@@ -4814,7 +4814,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
 			return
 		}
 		// If we are a member and we have a group leader or we had a previous leader consider bailing out.
-		if node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader() {
+		if !node.Leaderless() || node.HadPreviousLeader() {
 			if leaderNotPartOfGroup {
 				resp.Error = NewJSConsumerOfflineError()
 				s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
@@ -4998,6 +4998,8 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
 	}
 
 	nca := *ca
+	ncfg := *ca.Config
+	nca.Config = &ncfg
 	js.mu.RUnlock()
 	pauseUTC := req.PauseUntil.UTC()
 	if !pauseUTC.IsZero() {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index ef904e258b..efbb5c2a9d 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -825,7 +825,7 @@ func (js *jetStream) isLeaderless() bool {
 
 	// If we don't have a leader.
 	// Make sure we have been running for enough time.
-	if meta.GroupLeader() == _EMPTY_ && time.Since(meta.Created()) > lostQuorumIntervalDefault {
+	if meta.Leaderless() && time.Since(meta.Created()) > lostQuorumIntervalDefault {
 		return true
 	}
 	return false
@@ -857,7 +857,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
 	node := rg.node
 	js.mu.RUnlock()
 	// If we don't have a leader.
-	if node.GroupLeader() == _EMPTY_ {
+	if node.Leaderless() {
 		// Threshold for jetstream startup.
 		const startupThreshold = 10 * time.Second
 
@@ -1073,7 +1073,7 @@ func (js *jetStream) checkForOrphans() {
 
 	// We only want to cleanup any orphans if we know we are current with the meta-leader.
 	meta := cc.meta
-	if meta == nil || meta.GroupLeader() == _EMPTY_ {
+	if meta == nil || meta.Leaderless() {
 		js.mu.Unlock()
 		s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
 		return
@@ -1375,7 +1375,7 @@ func (js *jetStream) monitorCluster() {
 			// If we have a current leader or had one in the past we can cancel this here since the metaleader
 			// will be in charge of all peer state changes.
 			// For cold boot only.
-			if n.GroupLeader() != _EMPTY_ || n.HadPreviousLeader() {
+			if !n.Leaderless() || n.HadPreviousLeader() {
				lt.Stop()
 				continue
 			}
@@ -2568,7 +2568,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			// Always cancel if this was running.
 			stopDirectMonitoring()
 
-		} else if n.GroupLeader() != noLeader {
+		} else if !n.Leaderless() {
 			js.setStreamAssignmentRecovering(sa)
 		}
 
@@ -4089,7 +4089,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
 	js.mu.RLock()
 	s := js.srv
 	node := sa.Group.node
-	hadLeader := node == nil || node.GroupLeader() != noLeader
+	hadLeader := node == nil || !node.Leaderless()
 	offline := s.allPeersOffline(sa.Group)
 	var isMetaLeader bool
 	if cc := js.cluster; cc != nil {
@@ -4436,7 +4436,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 		if isConfigUpdate = !reflect.DeepEqual(&cfg, ca.Config); isConfigUpdate {
 			// Call into update, ignore consumer exists error here since this means an old deliver subject is bound
 			// which can happen on restart etc.
-			if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() {
+			// JS lock needed as this can mutate the consumer assignments and race with updateInactivityThreshold.
+			js.mu.Lock()
+			err := o.updateConfig(ca.Config)
+			js.mu.Unlock()
+			if err != nil && err != NewJSConsumerNameExistError() {
 				// This is essentially an update that has failed. Respond back to metaleader if we are not recovering.
 				js.mu.RLock()
 				if !js.metaRecovering {
@@ -8545,13 +8549,13 @@ RETRY:
 	// the semaphore.
 	releaseSyncOutSem()
 
-	if n.GroupLeader() == _EMPTY_ {
+	if n.Leaderless() {
 		// Prevent us from spinning if we've installed a snapshot from a leader but there's no leader online.
 		// We wait a bit to check if a leader has come online in the meantime, if so we can continue.
 		var canContinue bool
 		if numRetries == 0 {
 			time.Sleep(startInterval)
-			canContinue = n.GroupLeader() != _EMPTY_
+			canContinue = !n.Leaderless()
 		}
 		if !canContinue {
 			return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name())
diff --git a/server/monitor.go b/server/monitor.go
index df14a22c7c..1b19bb959b 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -3901,7 +3901,7 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
 			Applied:       n.applied,
 			CatchingUp:    n.catchup != nil,
 			Leader:        n.leader,
-			EverHadLeader: n.pleader,
+			EverHadLeader: n.pleader.Load(),
 			Term:          n.term,
 			Vote:          n.vote,
 			PTerm:         n.pterm,
diff --git a/server/raft.go b/server/raft.go
index 1c472fcaa1..9fec016920 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -52,6 +52,7 @@
 	Current() bool
 	Healthy() bool
 	Term() uint64
+	Leaderless() bool
 	GroupLeader() string
 	HadPreviousLeader() bool
 	StepDown(preferred ...string) error
@@ -180,9 +181,10 @@ type raft struct {
 	c  *client    // Internal client for subscriptions
 	js *jetStream // JetStream, if running, to see if we are out of resources
 
-	dflag    bool // Debug flag
-	pleader  bool // Has the group ever had a leader?
-	observer bool // The node is observing, i.e. not participating in voting
+	dflag     bool        // Debug flag
+	hasleader atomic.Bool // Is there a group leader right now?
+	pleader   atomic.Bool // Has the group ever had a leader?
+	observer  bool        // The node is observing, i.e. not participating in voting
 
 	extSt extensionState // Extension state
 
@@ -935,7 +937,7 @@ func (n *raft) AdjustBootClusterSize(csz int) error {
 	n.Lock()
 	defer n.Unlock()
 
-	if n.leader != noLeader || n.pleader {
+	if n.leader != noLeader || n.pleader.Load() {
 		return errAdjustBootCluster
 	}
 	// Same floor as bootstrap.
@@ -1497,9 +1499,7 @@ func (n *raft) Healthy() bool {
 
 // HadPreviousLeader indicates if this group ever had a leader.
 func (n *raft) HadPreviousLeader() bool {
-	n.RLock()
-	defer n.RUnlock()
-	return n.pleader
+	return n.pleader.Load()
 }
 
 // GroupLeader returns the current leader of the group.
@@ -1512,6 +1512,17 @@ func (n *raft) GroupLeader() string {
 	return n.leader
 }
 
+// Leaderless is a lockless way of finding out if the group has a
+// leader or not. Use instead of GroupLeader in hot paths.
+func (n *raft) Leaderless() bool {
+	if n == nil {
+		return true
+	}
+	// Negated because we want the default state of hasleader to be
+	// false until the first updateLeader() call.
+	return !n.hasleader.Load()
+}
+
 // Guess the best next leader. Stepdown will check more thoroughly.
 // Lock should be held.
 func (n *raft) selectNextLeader() string {
@@ -3263,8 +3274,9 @@ func (n *raft) resetWAL() {
 
 // Lock should be held
 func (n *raft) updateLeader(newLeader string) {
 	n.leader = newLeader
-	if !n.pleader && newLeader != noLeader {
-		n.pleader = true
+	n.hasleader.Store(newLeader != _EMPTY_)
+	if !n.pleader.Load() && newLeader != noLeader {
+		n.pleader.Store(true)
 	}
 }
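
Reviewer note (commentary, not part of the patch): the recurring change above swaps GroupLeader() == _EMPTY_ checks, which acquire the raft node's lock, for the new lock-free Leaderless() accessor. The pattern is to mirror a mutex-guarded field (leader) into an atomic (hasleader) that is only ever written while the lock is held, so hot paths can poll leader presence without lock contention. Below is a minimal standalone sketch of that pattern, assuming only the standard library; the group type and its method names are illustrative stand-ins, not the server's actual raft types.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// group mirrors a mutex-guarded leader field into an atomic.Bool so
// that hot-path readers can avoid taking the lock. Illustrative only.
type group struct {
	mu        sync.RWMutex
	leader    string      // full leader identity, guarded by mu
	hasleader atomic.Bool // mirror of (leader != ""), lock-free to read
}

// updateLeader mutates the guarded field and keeps the atomic mirror in
// sync. As with raft.updateLeader in the patch, the caller holds the lock.
func (g *group) updateLeader(newLeader string) {
	g.leader = newLeader
	g.hasleader.Store(newLeader != "")
}

// Leaderless reports whether the group currently has no leader, without
// locking. A nil receiver reports true, matching the patch's behavior.
func (g *group) Leaderless() bool {
	if g == nil {
		return true
	}
	// The zero value of atomic.Bool is false, so a fresh group is leaderless.
	return !g.hasleader.Load()
}

// GroupLeader still takes the lock; use it when the leader's identity,
// not just its presence, is needed.
func (g *group) GroupLeader() string {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return g.leader
}

func main() {
	var g group
	fmt.Println(g.Leaderless()) // true: no leader yet

	g.mu.Lock()
	g.updateLeader("S1")
	g.mu.Unlock()

	fmt.Println(g.Leaderless(), g.GroupLeader()) // false S1
}

Because hasleader is only written under the node lock, a concurrent Leaderless() call may briefly observe a stale value. That is acceptable for the call sites touched by this diff, which treat the result as a heuristic ("does the group appear to have a leader yet?") rather than a linearizable view of group state.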