NRG: Lockless Leaderless and HadPreviousLeader #6438

Merged 2 commits on Jan 31, 2025
server/jetstream_api.go (8 changes: 5 additions & 3 deletions)
@@ -4709,7 +4709,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,

 	// Since these could wait on the Raft group lock, don't do so under the JS lock.
 	ourID := meta.ID()
-	groupLeader := meta.GroupLeader()
+	groupLeaderless := meta.Leaderless()
 	groupCreated := meta.Created()

 	js.mu.RLock()
@@ -4727,7 +4727,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
 	// Also capture if we think there is no meta leader.
 	var isLeaderLess bool
 	if !isLeader {
-		isLeaderLess = groupLeader == _EMPTY_ && time.Since(groupCreated) > lostQuorumIntervalDefault
+		isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
 	}
 	js.mu.RUnlock()

@@ -4814,7 +4814,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
 		return
 	}
 	// If we are a member and we have a group leader or we had a previous leader consider bailing out.
-	if node.GroupLeader() != _EMPTY_ || node.HadPreviousLeader() {
+	if !node.Leaderless() || node.HadPreviousLeader() {
 		if leaderNotPartOfGroup {
 			resp.Error = NewJSConsumerOfflineError()
 			s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
@@ -4998,6 +4998,8 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
 	}

 	nca := *ca
+	ncfg := *ca.Config
+	nca.Config = &ncfg
 	js.mu.RUnlock()
 	pauseUTC := req.PauseUntil.UTC()
 	if !pauseUTC.IsZero() {
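The two lines added to jsConsumerPauseRequest give the copied assignment its own Config before the pause deadline is written into it, so the original held under js.mu is never mutated through the shared pointer. A minimal, self-contained sketch of that copy pattern, using hypothetical stand-in types rather than the server's real consumerAssignment:

package main

import "fmt"

// Hypothetical stand-ins for ca and ca.Config in the diff above.
type config struct{ PauseUntil string }
type assignment struct{ Config *config }

func main() {
	shared := &assignment{Config: &config{}}

	// A shallow copy alone is not enough: nca.Config would still point at
	// the shared config, so writing through it would mutate shared state.
	nca := *shared
	ncfg := *shared.Config // copy the pointed-to struct as well
	nca.Config = &ncfg     // re-point the copy at its own config

	nca.Config.PauseUntil = "2025-01-31T00:00:00Z"
	fmt.Println(shared.Config.PauseUntil == "") // true: the original is untouched
}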
server/jetstream_cluster.go (22 changes: 13 additions & 9 deletions)
@@ -825,7 +825,7 @@ func (js *jetStream) isLeaderless() bool {

 	// If we don't have a leader.
 	// Make sure we have been running for enough time.
-	if meta.GroupLeader() == _EMPTY_ && time.Since(meta.Created()) > lostQuorumIntervalDefault {
+	if meta.Leaderless() && time.Since(meta.Created()) > lostQuorumIntervalDefault {
 		return true
 	}
 	return false
@@ -857,7 +857,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
 	node := rg.node
 	js.mu.RUnlock()
 	// If we don't have a leader.
-	if node.GroupLeader() == _EMPTY_ {
+	if node.Leaderless() {
 		// Threshold for jetstream startup.
 		const startupThreshold = 10 * time.Second

@@ -1073,7 +1073,7 @@ func (js *jetStream) checkForOrphans() {

 	// We only want to cleanup any orphans if we know we are current with the meta-leader.
 	meta := cc.meta
-	if meta == nil || meta.GroupLeader() == _EMPTY_ {
+	if meta == nil || meta.Leaderless() {
 		js.mu.Unlock()
 		s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
 		return
@@ -1375,7 +1375,7 @@ func (js *jetStream) monitorCluster() {
 			// If we have a current leader or had one in the past we can cancel this here since the metaleader
 			// will be in charge of all peer state changes.
 			// For cold boot only.
-			if n.GroupLeader() != _EMPTY_ || n.HadPreviousLeader() {
+			if !n.Leaderless() || n.HadPreviousLeader() {
 				lt.Stop()
 				continue
 			}
@@ -2568,7 +2568,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		// Always cancel if this was running.
 		stopDirectMonitoring()

-	} else if n.GroupLeader() != noLeader {
+	} else if !n.Leaderless() {
 		js.setStreamAssignmentRecovering(sa)
 	}

@@ -4089,7 +4089,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
 	js.mu.RLock()
 	s := js.srv
 	node := sa.Group.node
-	hadLeader := node == nil || node.GroupLeader() != noLeader
+	hadLeader := node == nil || !node.Leaderless()
 	offline := s.allPeersOffline(sa.Group)
 	var isMetaLeader bool
 	if cc := js.cluster; cc != nil {
@@ -4436,7 +4436,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 	if isConfigUpdate = !reflect.DeepEqual(&cfg, ca.Config); isConfigUpdate {
 		// Call into update, ignore consumer exists error here since this means an old deliver subject is bound
 		// which can happen on restart etc.
-		if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() {
+		// JS lock needed as this can mutate the consumer assignments and race with updateInactivityThreshold.
+		js.mu.Lock()
+		err := o.updateConfig(ca.Config)
+		js.mu.Unlock()
+		if err != nil && err != NewJSConsumerNameExistError() {
 			// This is essentially an update that has failed. Respond back to metaleader if we are not recovering.
 			js.mu.RLock()
 			if !js.metaRecovering {

Review comment on the js.mu.Lock() line above:

Member: Are these supposed to be part of this PR?

Member Author: Something about the change in the locking caused this race condition to hit far more often, to the point that it was taking several attempts of kicking CI.

I've kept it isolated in a second commit though, as that change won't be relevant for a 2.10.x backport.
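To make the race in the exchange above concrete: the consumer config update and the inactivity-threshold path can touch the same assignment state from different goroutines, which is why the call is now wrapped in js.mu. A rough sketch of the shape of that race under the same kind of mutex guard; the types and method names here are illustrative stand-ins, not the server's real API:

package main

import (
	"sync"
	"time"
)

// Illustrative stand-in for shared consumer assignment state.
type sharedState struct {
	mu        sync.Mutex // plays the role of js.mu in the hunk above
	threshold time.Duration
}

// Writer path: roughly the role of o.updateConfig, now done under the lock.
func (s *sharedState) updateConfig(d time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.threshold = d
}

// Reader path: roughly the role of updateInactivityThreshold.
func (s *sharedState) inactivityThreshold() time.Duration {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.threshold
}

func main() {
	s := &sharedState{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); s.updateConfig(5 * time.Minute) }()
	go func() { defer wg.Done(); _ = s.inactivityThreshold() }()
	wg.Wait() // without the mutex, the race detector (-race) would flag these accesses
}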
@@ -8545,13 +8549,13 @@ RETRY:
 	// the semaphore.
 	releaseSyncOutSem()

-	if n.GroupLeader() == _EMPTY_ {
+	if n.Leaderless() {
 		// Prevent us from spinning if we've installed a snapshot from a leader but there's no leader online.
 		// We wait a bit to check if a leader has come online in the meantime, if so we can continue.
 		var canContinue bool
 		if numRetries == 0 {
 			time.Sleep(startInterval)
-			canContinue = n.GroupLeader() != _EMPTY_
+			canContinue = !n.Leaderless()
 		}
 		if !canContinue {
 			return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name())
server/monitor.go (2 changes: 1 addition & 1 deletion)
@@ -3901,7 +3901,7 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
 		Applied: n.applied,
 		CatchingUp: n.catchup != nil,
 		Leader: n.leader,
-		EverHadLeader: n.pleader,
+		EverHadLeader: n.pleader.Load(),
 		Term: n.term,
 		Vote: n.vote,
 		PTerm: n.pterm,
server/raft.go (30 changes: 21 additions & 9 deletions)
@@ -52,6 +52,7 @@ type RaftNode interface {
 	Current() bool
 	Healthy() bool
 	Term() uint64
+	Leaderless() bool
 	GroupLeader() string
 	HadPreviousLeader() bool
 	StepDown(preferred ...string) error
@@ -180,9 +181,10 @@ type raft struct {
 	c  *client    // Internal client for subscriptions
 	js *jetStream // JetStream, if running, to see if we are out of resources

-	dflag    bool // Debug flag
-	pleader  bool // Has the group ever had a leader?
-	observer bool // The node is observing, i.e. not participating in voting
+	dflag     bool        // Debug flag
+	hasleader atomic.Bool // Is there a group leader right now?
+	pleader   atomic.Bool // Has the group ever had a leader?
+	observer  bool        // The node is observing, i.e. not participating in voting

 	extSt extensionState // Extension state

@@ -935,7 +937,7 @@ func (n *raft) AdjustBootClusterSize(csz int) error {
 	n.Lock()
 	defer n.Unlock()

-	if n.leader != noLeader || n.pleader {
+	if n.leader != noLeader || n.pleader.Load() {
 		return errAdjustBootCluster
 	}
 	// Same floor as bootstrap.
@@ -1497,9 +1499,7 @@ func (n *raft) Healthy() bool {

 // HadPreviousLeader indicates if this group ever had a leader.
 func (n *raft) HadPreviousLeader() bool {
-	n.RLock()
-	defer n.RUnlock()
-	return n.pleader
+	return n.pleader.Load()
 }

 // GroupLeader returns the current leader of the group.
@@ -1512,6 +1512,17 @@ func (n *raft) GroupLeader() string {
 	return n.leader
 }

+// Leaderless is a lockless way of finding out if the group has a
+// leader or not. Use instead of GroupLeader in hot paths.
+func (n *raft) Leaderless() bool {
+	if n == nil {
+		return true
+	}
+	// Negated because we want the default state of hasLeader to be
+	// false until the first setLeader() call.
+	return !n.hasleader.Load()
+}
+
 // Guess the best next leader. Stepdown will check more thoroughly.
 // Lock should be held.
 func (n *raft) selectNextLeader() string {
@@ -3263,8 +3274,9 @@ func (n *raft) resetWAL() {
 // Lock should be held
 func (n *raft) updateLeader(newLeader string) {
 	n.leader = newLeader
-	if !n.pleader && newLeader != noLeader {
-		n.pleader = true
+	n.hasleader.Store(newLeader != _EMPTY_)
+	if !n.pleader.Load() && newLeader != noLeader {
+		n.pleader.Store(true)
 	}
 }

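Taken together, the raft.go changes mirror the lock-protected leader field with two atomics, so hot paths can ask "is there a leader?" or "was there ever one?" without contending on the Raft lock. A condensed, self-contained sketch of that pattern, simplified from the diff above and not the full raft struct:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const noLeader = "" // mirrors the noLeader / _EMPTY_ convention in the diff

type node struct {
	sync.RWMutex
	leader    string      // authoritative value, guarded by the lock
	hasleader atomic.Bool // lockless mirror: is there a leader right now?
	pleader   atomic.Bool // lockless mirror: has there ever been a leader?
}

// updateLeader is called with the lock held, as in raft.go above.
func (n *node) updateLeader(newLeader string) {
	n.leader = newLeader
	n.hasleader.Store(newLeader != noLeader)
	if !n.pleader.Load() && newLeader != noLeader {
		n.pleader.Store(true)
	}
}

// Leaderless and HadPreviousLeader read only the atomics, so callers on hot
// paths avoid taking the Raft lock at all.
func (n *node) Leaderless() bool        { return n == nil || !n.hasleader.Load() }
func (n *node) HadPreviousLeader() bool { return n != nil && n.pleader.Load() }

func main() {
	n := &node{}
	fmt.Println(n.Leaderless(), n.HadPreviousLeader()) // true false
	n.Lock()
	n.updateLeader("S1")
	n.Unlock()
	fmt.Println(n.Leaderless(), n.HadPreviousLeader()) // false true
}

Under this scheme GroupLeader() still takes the read lock when the actual leader ID is needed; Leaderless() and HadPreviousLeader() only answer the presence question, which is all the call sites changed in this PR require.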