Skip to content

Commit

Permalink
Group metalayer consumer assignments by stream
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
Co-authored-by: Maurice van Veen <[email protected]>
  • Loading branch information
neilalexander and MauriceVanVeen committed Nov 1, 2024
1 parent eda3396 commit 8558f13
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 13 deletions.
59 changes: 46 additions & 13 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,9 +1135,9 @@ func (js *jetStream) isMetaRecovering() bool {
// During recovery track any stream and consumer delete and update operations.
type recoveryUpdates struct {
removeStreams map[string]*streamAssignment
removeConsumers map[string]*consumerAssignment
removeConsumers map[string]map[string]*consumerAssignment
updateStreams map[string]*streamAssignment
updateConsumers map[string]*consumerAssignment
updateConsumers map[string]map[string]*consumerAssignment
}

// Called after recovery of the cluster on startup to check for any orphans.
Expand Down Expand Up @@ -1342,9 +1342,9 @@ func (js *jetStream) monitorCluster() {

ru := &recoveryUpdates{
removeStreams: make(map[string]*streamAssignment),
removeConsumers: make(map[string]*consumerAssignment),
removeConsumers: make(map[string]map[string]*consumerAssignment),
updateStreams: make(map[string]*streamAssignment),
updateConsumers: make(map[string]*consumerAssignment),
updateConsumers: make(map[string]map[string]*consumerAssignment),
}

// Make sure to cancel any pending checkForOrphans calls if the
Expand All @@ -1371,8 +1371,10 @@ func (js *jetStream) monitorCluster() {
// Signals we have replayed all of our metadata.
js.clearMetaRecovering()
// Process any removes that are still valid after recovery.
for _, ca := range ru.removeConsumers {
js.processConsumerRemoval(ca)
for _, cas := range ru.removeConsumers {
for _, ca := range cas {
js.processConsumerRemoval(ca)
}
}
for _, sa := range ru.removeStreams {
js.processStreamRemoval(sa)
Expand All @@ -1382,8 +1384,10 @@ func (js *jetStream) monitorCluster() {
js.processUpdateStreamAssignment(sa)
}
// Now consumers.
for _, ca := range ru.updateConsumers {
js.processConsumerAssignment(ca)
for _, cas := range ru.updateConsumers {
for _, ca := range cas {
js.processConsumerAssignment(ca)
}
}
// Clear.
ru = nil
Expand Down Expand Up @@ -1630,6 +1634,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
if isRecovering {
key := sa.recoveryKey()
ru.removeStreams[key] = sa
delete(ru.updateConsumers, key)
delete(ru.updateStreams, key)
} else {
js.processStreamRemoval(sa)
Expand Down Expand Up @@ -1665,7 +1670,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
js.setConsumerAssignmentRecovering(ca)
if isRecovering {
key := ca.recoveryKey()
ru.removeConsumers[key] = ca
skey := ca.streamRecoveryKey()
if _, ok := ru.removeConsumers[skey]; !ok {
ru.removeConsumers[skey] = map[string]*consumerAssignment{}
}
ru.removeConsumers[skey][key] = ca
delete(ru.updateConsumers, key)
} else {
js.processConsumerRemoval(ca)
Expand All @@ -1675,8 +1684,12 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
js.setConsumerAssignmentRecovering(ca)
if isRecovering {
key := ca.recoveryKey()
skey := ca.streamRecoveryKey()
delete(ru.removeConsumers, key)
ru.updateConsumers[key] = ca
if _, ok := ru.updateConsumers[skey]; !ok {
ru.updateConsumers[skey] = map[string]*consumerAssignment{}
}
ru.updateConsumers[skey][key] = ca
} else {
js.processConsumerAssignment(ca)
}
Expand Down Expand Up @@ -1889,6 +1902,13 @@ func (sa *streamAssignment) recoveryKey() string {
return sa.Client.serviceAccount() + ksep + sa.Config.Name
}

func (ca *consumerAssignment) streamRecoveryKey() string {
if ca == nil {
return _EMPTY_
}
return ca.Client.serviceAccount() + ksep + ca.Stream
}

func (ca *consumerAssignment) recoveryKey() string {
if ca == nil {
return _EMPTY_
Expand Down Expand Up @@ -1939,6 +1959,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
key := sa.recoveryKey()
ru.removeStreams[key] = sa
delete(ru.updateStreams, key)
delete(ru.updateConsumers, key)
} else {
js.processStreamRemoval(sa)
didRemoveStream = true
Expand All @@ -1952,8 +1973,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
key := ca.recoveryKey()
skey := ca.streamRecoveryKey()
delete(ru.removeConsumers, key)
ru.updateConsumers[key] = ca
if _, ok := ru.updateConsumers[skey]; !ok {
ru.updateConsumers[skey] = map[string]*consumerAssignment{}
}
ru.updateConsumers[skey][key] = ca
} else {
js.processConsumerAssignment(ca)
}
Expand All @@ -1966,8 +1991,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
key := ca.recoveryKey()
skey := ca.streamRecoveryKey()
delete(ru.removeConsumers, key)
ru.updateConsumers[key] = ca
if _, ok := ru.updateConsumers[skey]; !ok {
ru.updateConsumers[skey] = map[string]*consumerAssignment{}
}
ru.updateConsumers[skey][key] = ca
} else {
js.processConsumerAssignment(ca)
}
Expand All @@ -1980,7 +2009,11 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
key := ca.recoveryKey()
ru.removeConsumers[key] = ca
skey := ca.streamRecoveryKey()
if _, ok := ru.removeConsumers[skey]; !ok {
ru.removeConsumers[skey] = map[string]*consumerAssignment{}
}
ru.removeConsumers[skey][key] = ca
delete(ru.updateConsumers, key)
} else {
js.processConsumerRemoval(ca)
Expand Down
44 changes: 44 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6500,6 +6500,50 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
}
}

func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

js := c.leader().getJetStream()

create := []*Entry{
{EntryNormal, encodeAddStreamAssignment(&streamAssignment{
Config: &StreamConfig{Name: "TEST", Storage: FileStorage},
})},
{EntryNormal, encodeAddConsumerAssignment(&consumerAssignment{
Stream: "TEST",
Config: &ConsumerConfig{Name: "consumer"},
})},
}

delete := []*Entry{
{EntryNormal, encodeDeleteStreamAssignment(&streamAssignment{
Config: &StreamConfig{Name: "TEST", Storage: FileStorage},
})},
}

// Need to be recovering so that we accumulate recoveryUpdates.
js.setMetaRecovering()
ru := &recoveryUpdates{
removeStreams: make(map[string]*streamAssignment),
removeConsumers: make(map[string]map[string]*consumerAssignment),
updateStreams: make(map[string]*streamAssignment),
updateConsumers: make(map[string]map[string]*consumerAssignment),
}

// Push recovery entries that create the stream & consumer.
_, _, _, err := js.applyMetaEntries(create, ru)
require_NoError(t, err)
require_Len(t, len(ru.updateConsumers), 1)

// Now push another recovery entry that deletes the stream. The
// entry that creates the consumer should now be gone.
_, _, _, err = js.applyMetaEntries(delete, ru)
require_NoError(t, err)
require_Len(t, len(ru.removeStreams), 1)
require_Len(t, len(ru.removeConsumers), 0)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down

0 comments on commit 8558f13

Please sign in to comment.