[FIXED] Reserved resources accounting after cluster reset
Signed-off-by: Maurice van Veen <[email protected]>
MauriceVanVeen committed Feb 5, 2025
1 parent 29fbf46 commit 58cafce
Showing 3 changed files with 73 additions and 1 deletion.
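In short: resetClusteredState stops and restarts a stream in place after an error, and before this commit the restart path went back through addStreamWithAssignment and reserved MaxBytes a second time without the original reservation ever being released. The fix flags the assignment as resetting so the restart skips the duplicate reservation. Here is a minimal, self-contained sketch of that guard, assuming nothing beyond what the diff shows; streamAssignment and jetStream below are simplified stand-ins that model only the fields involved, not the real server types:

```go
package main

import "fmt"

// Simplified stand-in: only the field relevant to this fix is modeled.
type streamAssignment struct {
	resetting bool // set by resetClusteredState when the stream restarts in place
}

// Simplified stand-in for the JetStream accounting state.
type jetStream struct {
	reservedStore int64 // bytes reserved against the store limit
}

func (js *jetStream) reserveStreamResources(maxBytes int64) {
	js.reservedStore += maxBytes
}

// addStream sketches the guard added to addStreamWithAssignment: when the
// assignment is flagged as resetting, the reservation from the original
// create is still in place, so we skip reserving again and clear the flag.
func (js *jetStream) addStream(sa *streamAssignment, maxBytes int64) {
	reserveResources := true
	if sa != nil && sa.resetting {
		reserveResources, sa.resetting = false, false
	}
	if reserveResources {
		js.reserveStreamResources(maxBytes)
	}
}

func main() {
	js := &jetStream{}
	sa := &streamAssignment{}
	maxBytes := int64(1024 * 1024 * 1024)

	// Initial create reserves MaxBytes once.
	js.addStream(sa, maxBytes)

	// A cluster reset stops and restarts the stream in place. Without the
	// flag, the restart would reserve MaxBytes a second time.
	sa.resetting = true
	js.addStream(sa, maxBytes)

	fmt.Println(js.reservedStore == maxBytes) // true: no double accounting
}
```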
server/jetstream_cluster.go (4 additions, 0 deletions)
@@ -143,6 +143,7 @@ type streamAssignment struct {
 	responded   bool
 	recovering  bool
 	reassigning bool // i.e. due to placement issues, lack of resources, etc.
+	resetting   bool // i.e. there was an error, and we're stopping and starting the stream
 	err         error
 }

@@ -2932,6 +2933,9 @@ func (mset *stream) resetClusteredState(err error) bool {
 	}

 	s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
+	// Mark stream assignment as resetting, so we don't double-account reserved resources.
+	// But only if we're not also releasing the resources as part of the delete.
+	sa.resetting = !shouldDelete
 	// Now wipe groups from assignments.
 	sa.Group.node = nil
 	var consumers []*consumerAssignment
server/jetstream_cluster_4_test.go (58 additions, 0 deletions)
@@ -4224,6 +4224,64 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
 	}
 }

+func TestJetStreamClusterReservedResourcesAccountingAfterClusterReset(t *testing.T) {
+	for _, clusterResetErr := range []error{errLastSeqMismatch, errFirstSequenceMismatch} {
+		t.Run(clusterResetErr.Error(), func(t *testing.T) {
+			c := createJetStreamClusterExplicit(t, "R3S", 3)
+			defer c.shutdown()
+
+			nc, js := jsClientConnect(t, c.randomServer())
+			defer nc.Close()
+
+			maxBytes := int64(1024 * 1024 * 1024)
+			_, err := js.AddStream(&nats.StreamConfig{
+				Name:     "TEST",
+				Subjects: []string{"foo"},
+				Replicas: 3,
+				MaxBytes: maxBytes,
+			})
+			require_NoError(t, err)
+
+			sl := c.streamLeader(globalAccountName, "TEST")
+
+			mem, store, err := sl.JetStreamReservedResources()
+			require_NoError(t, err)
+			require_Equal(t, mem, 0)
+			require_Equal(t, store, maxBytes)
+
+			acc, err := sl.lookupAccount(globalAccountName)
+			require_NoError(t, err)
+			mset, err := acc.lookupStream("TEST")
+			require_NoError(t, err)
+
+			sjs := sl.getJetStream()
+			rn := mset.raftNode()
+			sa := mset.streamAssignment()
+			sjs.mu.RLock()
+			saGroupNode := sa.Group.node
+			sjs.mu.RUnlock()
+			require_NotNil(t, sa)
+			require_Equal(t, rn, saGroupNode)
+
+			require_True(t, mset.resetClusteredState(clusterResetErr))
+
+			checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+				sjs.mu.RLock()
+				defer sjs.mu.RUnlock()
+				if sa.Group.node == nil || sa.Group.node == saGroupNode {
+					return errors.New("waiting for reset to complete")
+				}
+				return nil
+			})
+
+			mem, store, err = sl.JetStreamReservedResources()
+			require_NoError(t, err)
+			require_Equal(t, mem, 0)
+			require_Equal(t, store, maxBytes)
+		})
+	}
+}
+
 func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
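The test runs once per reset trigger (errLastSeqMismatch and errFirstSequenceMismatch). The checkFor loop waits until sa.Group.node points at a fresh Raft group, i.e. the restart has actually happened, before re-checking that the reserved store total still equals MaxBytes instead of having doubled.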
server/stream.go (11 additions, 1 deletion)
@@ -757,15 +757,25 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileStoreConfig,
 	}

 	// Set our stream assignment if in clustered mode.
+	reserveResources := true
 	if sa != nil {
 		mset.setStreamAssignment(sa)
+
+		// If the stream is resetting we must not double-account resources, they were already accounted for.
+		js.mu.Lock()
+		if sa.resetting {
+			reserveResources, sa.resetting = false, false
+		}
+		js.mu.Unlock()
 	}

 	// Setup our internal send go routine.
 	mset.setupSendCapabilities()

 	// Reserve resources if MaxBytes present.
-	mset.js.reserveStreamResources(&mset.cfg)
+	if reserveResources {
+		mset.js.reserveStreamResources(&mset.cfg)
+	}

 	// Call directly to set leader if not in clustered mode.
 	// This can be called though before we actually setup clustering, so check both.
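Design note: the resetting flag is read and cleared in a single critical section under js.mu, so each reset suppresses exactly one re-reservation; and since resetClusteredState sets the flag only when shouldDelete is false, a reset that deletes the stream releases its resources instead of carrying the reservation forward.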
