From 3edc0d18fa8d79610e4d51c7013afac7049a42d8 Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Mon, 25 Nov 2024 10:19:54 +0100
Subject: [PATCH] NRG: empty snapshots dir if memory WAL

Signed-off-by: Maurice van Veen
---
 server/raft.go              | 12 +++++-----
 server/raft_helpers_test.go | 14 ++++++++----
 server/raft_test.go         | 45 +++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 10 deletions(-)

diff --git a/server/raft.go b/server/raft.go
index edcd423f519..6f885c4cbd7 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -424,20 +424,20 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
 		n.vote = vote
 	}
 
-	// Make sure that the snapshots directory exists.
-	if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
-		return nil, fmt.Errorf("could not create snapshots directory - %v", err)
-	}
-
 	// Can't recover snapshots if memory based since wal will be reset.
 	// We will inherit from the current leader.
 	if _, ok := n.wal.(*memStore); ok {
-		os.Remove(filepath.Join(n.sd, snapshotsDir, "*"))
+		_ = os.RemoveAll(filepath.Join(n.sd, snapshotsDir))
 	} else {
 		// See if we have any snapshots and if so load and process on startup.
 		n.setupLastSnapshot()
 	}
 
+	// Make sure that the snapshots directory exists.
+	if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
+		return nil, fmt.Errorf("could not create snapshots directory - %v", err)
+	}
+
 	truncateAndErr := func(index uint64) {
 		if err := n.wal.Truncate(index); err != nil {
 			n.setWriteErr(err)
diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go
index 837b434a7c8..218258a88db 100644
--- a/server/raft_helpers_test.go
+++ b/server/raft_helpers_test.go
@@ -319,6 +319,15 @@ func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine {
 }
 
 func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
+	t.Helper()
+	n, c := initSingleMemRaftNodeWithCluster(t)
+	cleanup := func() {
+		c.shutdown()
+	}
+	return n, cleanup
+}
+
+func initSingleMemRaftNodeWithCluster(t *testing.T) (*raft, *cluster) {
 	t.Helper()
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	s := c.servers[0] // RunBasicJetStreamServer not available
@@ -332,10 +341,7 @@ func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
 	n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{})
 	require_NoError(t, err)
 
-	cleanup := func() {
-		c.shutdown()
-	}
-	return n, cleanup
+	return n, c
 }
 
 // Encode an AppendEntry.
diff --git a/server/raft_test.go b/server/raft_test.go
index 41a130a4607..05a19bdd3ea 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -1654,3 +1654,48 @@ func TestNRGForwardProposalResponse(t *testing.T) {
 
 	rg.waitOnTotal(t, 123)
 }
+
+func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) {
+	n, c := initSingleMemRaftNodeWithCluster(t)
+	defer c.shutdown()
+
+	// Create a sample entry, the content doesn't matter, just that it's stored.
+	esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
+	entries := []*Entry{newEntry(EntryNormal, esm)}
+
+	nats0 := "S1Nunr6R" // "nats-0"
+
+	// Timeline
+	aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
+	aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
+
+	// Simply receive first message.
+	n.processAppendEntry(aeMsg, n.aesub)
+	require_Equal(t, n.pindex, 1)
+	require_Equal(t, n.commit, 0)
+
+	// Heartbeat moves commit up.
+	n.processAppendEntry(aeHeartbeat, n.aesub)
+	require_Equal(t, n.commit, 1)
+
+	// Manually call back down to applied, and then snapshot.
+	n.Applied(1)
+	err := n.InstallSnapshot(nil)
+	require_NoError(t, err)
+
+	// Stop current node and restart it.
+	n.Stop()
+	n.WaitForStop()
+
+	s := c.servers[0]
+	ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage})
+	require_NoError(t, err)
+	cfg := &RaftConfig{Name: "TEST", Store: n.sd, Log: ms}
+	n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{})
+	require_NoError(t, err)
+
+	// Since the WAL is in-memory, the snapshots dir should've been emptied upon restart.
+	files, err := os.ReadDir(filepath.Join(n.sd, snapshotsDir))
+	require_NoError(t, err)
+	require_Len(t, len(files), 0)
+}