NRG: Refactor shutdown, update switchState to CAS
Signed-off-by: Neil Twigg <[email protected]>
neilalexander committed Nov 5, 2024
1 parent e4e3dee commit dc65149
Showing 3 changed files with 78 additions and 95 deletions.
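
In summary: the single n.shutdown(shouldDelete bool) path is split up. Stop() and Delete() now both call a small, idempotent shutdown() that atomically swaps the node state to Closed and closes the quit channel exactly once. The actual teardown (unsubscribing the internal client, draining and unregistering the ipQueues, unregistering the node, stopping the WAL) moves to the end of run(), so it is performed by the goroutine that owns those resources once the runner loop exits. Delete() additionally waits on the node's WaitGroup before deleting the WAL and removing the storage directory, Wipe() is dropped from the RaftNode interface, switchState() changes from a plain Store to a CompareAndSwap retry loop so it can never overwrite a concurrent transition to Closed, and callers that need the node fully torn down now follow Stop() with WaitForStop().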
1 change: 1 addition & 0 deletions server/jetstream_cluster_3_test.go
@@ -3914,6 +3914,7 @@ func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) {
node.InstallSnapshot(mset.stateSnapshot())
// Stop the stream
mset.stop(false, false)
node.WaitForStop()

if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 {
t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes)
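The added WaitForStop() call reflects the new split: Stop() only signals shutdown, so a caller that needs the node and its goroutines to be fully gone before checking s.numRaftNodes() has to wait explicitly. A minimal sketch of that pairing, using a hypothetical helper name and assuming the RaftNode interface from raft.go below (not code from this commit):

// stopAndAwait is a hypothetical helper showing the pairing the test now uses.
func stopAndAwait(n RaftNode) {
	n.Stop()        // signal only: the first call swaps the state to Closed and closes quit
	n.WaitForStop() // blocks on the node's WaitGroup until run() has finished its cleanup
}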
153 changes: 59 additions & 94 deletions server/raft.go
@@ -76,7 +76,6 @@ type RaftNode interface {
Stop()
WaitForStop()
Delete()
Wipe()
RecreateInternalSubs() error
}

@@ -400,7 +399,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
if err := n.RecreateInternalSubs(); err != nil {
n.shutdown(false)
n.shutdown()
return nil, err
}

@@ -1719,106 +1718,33 @@ func (n *raft) Created() time.Time {
}

func (n *raft) Stop() {
n.shutdown(false)
n.shutdown()
}

func (n *raft) WaitForStop() {
n.wg.Wait()
}

func (n *raft) Delete() {
n.shutdown(true)
}
n.shutdown()
n.wg.Wait()

func (n *raft) shutdown(shouldDelete bool) {
n.Lock()
defer n.Unlock()

// Returned swap value is the previous state. It looks counter-intuitive
// to do this atomic operation with the lock held, but we have to do so in
// order to make sure that switchState() is not already running. If it is
// then it can potentially update the n.state back to a non-closed state,
// allowing shutdown() to be called again. If that happens then the below
// close(n.quit) will panic from trying to close an already-closed channel.
if n.state.Swap(int32(Closed)) == int32(Closed) {
// If we get called again with shouldDelete, in case we were called first with Stop() cleanup
if shouldDelete {
n.wg.Wait()
if wal := n.wal; wal != nil {
wal.Delete()
}
os.RemoveAll(n.sd)
}
return
}

close(n.quit)
if c := n.c; c != nil {
var subs []*subscription
c.mu.Lock()
for _, sub := range c.subs {
subs = append(subs, sub)
}
c.mu.Unlock()
for _, sub := range subs {
n.unsubscribe(sub)
}
c.closeConnection(InternalClient)
n.c = nil
}

// Wait for goroutines to shut down before trying to clean up
// the log below, otherwise we might end up deleting the store
// from underneath running goroutines.
n.Unlock()
n.wg.Wait()
n.Lock()

s, g, wal := n.s, n.group, n.wal

// Unregistering ipQueues do not prevent them from push/pop
// just will remove them from the central monitoring map
queues := []interface {
unregister()
drain()
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.drain()
q.unregister()
}
sd := n.sd

s.unregisterRaftNode(g)

if wal != nil {
if shouldDelete {
wal.Delete()
} else {
wal.Stop()
}
}

if shouldDelete {
// Delete all our peer state and vote state and any snapshots.
os.RemoveAll(sd)
n.debug("Deleted")
} else {
n.debug("Shutdown")
if wal := n.wal; wal != nil {
wal.Delete()
}
os.RemoveAll(n.sd)
n.debug("Deleted")
}

// Wipe will force an on disk state reset and then call Delete().
// Useful in case we have been stopped before this point.
func (n *raft) Wipe() {
n.RLock()
wal := n.wal
n.RUnlock()
// Delete our underlying storage.
if wal != nil {
wal.Delete()
func (n *raft) shutdown() {
// First call to Stop or Delete should close the quit chan
// to notify the runAs goroutines to stop what they're doing.
if n.state.Swap(int32(Closed)) != int32(Closed) {
close(n.quit)
}
// Now call delete.
n.Delete()
}

const (
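
The new shutdown() above is now just the Swap-guarded close; Delete() waits and removes on-disk state, and run() tears down the runtime resources. A self-contained illustration of why the Swap guard makes repeated or concurrent stops safe; this is a sketch, not nats-server code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const closedState int32 = 1

type node struct {
	state atomic.Int32
	quit  chan struct{}
}

func (n *node) shutdown() {
	// Swap returns the previous value, so exactly one caller observes
	// "not yet closed" and closes the channel; the rest do nothing.
	if n.state.Swap(closedState) != closedState {
		close(n.quit)
	}
}

func main() {
	n := &node{quit: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ { // simulate concurrent Stop()/Delete() calls
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.shutdown()
		}()
	}
	wg.Wait()
	<-n.quit // receives immediately: the channel was closed exactly once
	fmt.Println("shut down cleanly")
}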
@@ -1971,6 +1897,7 @@ func (n *raft) run() {
// Send nil entry to signal the upper layers we are done doing replay/restore.
n.apply.push(nil)

runner:
for s.isRunning() {
switch n.State() {
case Follower:
@@ -1980,9 +1907,46 @@
case Leader:
n.runAsLeader()
case Closed:
return
break runner
}
}

// If we've reached this point then we're shutting down.
n.Lock()
defer n.Unlock()

if c := n.c; c != nil {
var subs []*subscription
c.mu.Lock()
for _, sub := range c.subs {
subs = append(subs, sub)
}
c.mu.Unlock()
for _, sub := range subs {
n.unsubscribe(sub)
}
c.closeConnection(InternalClient)
n.c = nil
}

// Unregistering ipQueues do not prevent them from push/pop
// just will remove them from the central monitoring map
queues := []interface {
unregister()
drain()
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.drain()
q.unregister()
}

n.s.unregisterRaftNode(n.group)

if wal := n.wal; wal != nil {
wal.Stop()
}

n.debug("Shutdown")
}

func (n *raft) debug(format string, args ...any) {
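
The block added to run() above means the teardown (unsubscribing the internal client, draining and unregistering the ipQueues, unregistering the node, stopping the WAL) is performed exactly once, by the goroutine that owns those resources, after the runner loop breaks; assuming the runner goroutine is registered with n.wg, which is what WaitForStop() waits on, callers observe the node as stopped only after that cleanup has completed. A minimal, self-contained sketch of this shape, with hypothetical names rather than nats-server code:

package main

import (
	"fmt"
	"sync"
)

type worker struct {
	quit chan struct{}
	jobs chan func()
	wg   sync.WaitGroup
}

func (w *worker) start() {
	w.wg.Add(1)
	go w.run()
}

func (w *worker) run() {
	defer w.wg.Done()
runner:
	for {
		select {
		case job := <-w.jobs:
			job() // normal work while running
		case <-w.quit:
			break runner
		}
	}
	// Teardown happens here, in the goroutine that owns the resources,
	// exactly once, after the loop has exited.
	fmt.Println("cleaned up")
}

func (w *worker) stop() {
	close(w.quit) // signal only; the real code guards this with a state Swap
	w.wg.Wait()   // returns only once run() has finished its cleanup
}

func main() {
	w := &worker{quit: make(chan struct{}), jobs: make(chan func())}
	w.start()
	w.stop()
}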
@@ -2077,7 +2041,6 @@ func (n *raft) runAsFollower() {
n.processAppendEntries()
case <-n.s.quitCh:
// The server is shutting down.
n.shutdown(false)
return
case <-n.quit:
// The Raft node is shutting down.
@@ -2490,7 +2453,6 @@ func (n *raft) runAsLeader() {
for n.State() == Leader {
select {
case <-n.s.quitCh:
n.shutdown(false)
return
case <-n.quit:
return
@@ -2686,7 +2648,6 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64
for n.Leader() {
select {
case <-n.s.quitCh:
n.shutdown(false)
return
case <-n.quit:
return
@@ -3081,7 +3042,6 @@ func (n *raft) runAsCandidate() {
// Ignore proposals received from before the state change.
n.prop.drain()
case <-n.s.quitCh:
n.shutdown(false)
return
case <-n.quit:
return
@@ -4185,15 +4145,20 @@ func (n *raft) updateLeadChange(isLeader bool) {

// Lock should be held.
func (n *raft) switchState(state RaftState) {
retry:
pstate := n.State()
if pstate == Closed {
return
}

// Set our state. If something else has changed our state
// then retry, this will either be a Stop or Delete call.
if !n.state.CompareAndSwap(int32(pstate), int32(state)) {
goto retry
}

// Reset the election timer.
n.resetElectionTimeout()
// Set our state.
n.state.Store(int32(state))

if pstate == Leader && state != Leader {
n.updateLeadChange(false)
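The switchState() change replaces the unconditional n.state.Store() with a labelled CompareAndSwap retry: a transition only succeeds against the exact state it just observed, so a concurrent Stop() or Delete() that moved the state to Closed can never be overwritten back to a live state, which is what previously allowed shutdown() to be re-entered and panic on close(n.quit). A standalone sketch of the pattern, with hypothetical names, not nats-server code:

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	follower int32 = iota
	leader
	closedState
)

// switchState tries to move st to the requested state; it retries on a lost
// race but refuses to leave Closed, so a concurrent shutdown always wins.
func switchState(st *atomic.Int32, to int32) bool {
	for {
		cur := st.Load()
		if cur == closedState {
			return false // already shut down; never resurrect the node
		}
		if st.CompareAndSwap(cur, to) {
			return true // transitioned from the state we actually observed
		}
		// The state changed between Load and CAS (e.g. a concurrent Stop), so retry.
	}
}

func main() {
	var st atomic.Int32 // zero value is follower
	fmt.Println(switchState(&st, leader))   // true: follower -> leader
	st.Store(closedState)                   // a concurrent shutdown closes the node
	fmt.Println(switchState(&st, follower)) // false: Closed is never overwritten
}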
19 changes: 18 additions & 1 deletion server/raft_test.go
@@ -661,6 +661,9 @@ func TestNRGSystemClientCleanupFromAccount(t *testing.T) {
for _, sm := range rg {
sm.node().Stop()
}
for _, sm := range rg {
sm.node().WaitForStop()
}
}
finish := numClients()
require_Equal(t, start, finish)
@@ -988,7 +991,7 @@ func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) {
require_NoError(t, err)

// Stop the leader so it moves to another one.
n.shutdown(false)
n.shutdown()

// Wait for another leader to be picked
rg.waitOnLeader()
@@ -1650,3 +1653,17 @@ func TestNRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest(t *testing.T)
require_NoError(t, err)
require_True(t, n.catchup == nil)
}

func TestNRGMultipleStopsDontPanic(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

defer func() {
p := recover()
require_True(t, p == nil)
}()

for i := 0; i < 10; i++ {
n.Stop()
}
}
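This new test exercises the Swap guard directly: the first Stop() swaps the state to Closed and closes n.quit, and every subsequent Stop() sees Closed returned from the Swap and does nothing, so repeated stops are harmless no-ops rather than a double-close panic.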
