Skip to content

Commit

Permalink
Fix install stream snapshots on graceful shutdown (#5809)
Browse files Browse the repository at this point in the history
Upon graceful shutdown, stream snapshots aren't installed:
stream.go
```go
} else {
	// Always attempt snapshot on clean exit.
	n.InstallSnapshot(mset.stateSnapshotLocked())
	n.Stop()
}
```

But raft.go exits immediately since it's in Closed state
```go
if n.State() == Closed {
        return errNodeClosed
}
```

This turns out to be because of calling either of `close(mset.mqch)` or
`close(mset.qch)`. Which then calls the following upon leaving
`monitorStream`:
```go
	// Make sure to stop the raft group on exit to prevent accidental memory bloat.
	// This should be below the checkInMonitor call though to avoid stopping it out
	// from underneath the one that is running since it will be the same raft node.
	defer n.Stop()
```

This PR proposes to skip running `n.Stop()` from `monitorStream` when we
know we're closing / have called `mset.stop()`. Which will already take
care of calling either `n.Stop()` or `n.Delete()` so there's no need in
stopping from `monitorStream` in that case.

TLDR; this ensures stream snapshots are installed upon shutdown.

Signed-off-by: Maurice van Veen <[email protected]>

---------

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen authored Aug 20, 2024
1 parent 89b042d commit 276e289
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
12 changes: 7 additions & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2261,7 +2261,12 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
// This should be below the checkInMonitor call though to avoid stopping it out
// from underneath the one that is running since it will be the same raft node.
defer n.Stop()
defer func() {
// We might be closing during shutdown, don't pre-emptively stop here since we'll still want to install snapshots.
if !mset.closed.Load() {
n.Stop()
}
}()

qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()

Expand Down Expand Up @@ -3133,13 +3138,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
}

if !isRecovering && !mset.IsLeader() {
if isRecovering || !mset.IsLeader() {
if err := mset.processSnapshot(ss); err != nil {
return err
}
} else if isRecovering {
// On recovery, reset CLFS/FAILED.
mset.setCLFS(ss.Failed)
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
Expand Down
7 changes: 4 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5340,6 +5340,10 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {

// Kick monitor and collect consumers first.
mset.mu.Lock()

// Mark closed.
mset.closed.Store(true)

// Signal to the monitor loop.
// Can't use qch here.
if mset.mqch != nil {
Expand Down Expand Up @@ -5400,9 +5404,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.sendDeleteAdvisoryLocked()
}

// Mark closed.
mset.closed.Store(true)

// Quit channel, do this after sending the delete advisory
if mset.qch != nil {
close(mset.qch)
Expand Down

0 comments on commit 276e289

Please sign in to comment.