From 04339d580327c15ae02a8588b0fe9159c0f83b88 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 13 Dec 2024 18:23:49 +0100 Subject: [PATCH 1/6] Revert "[FIXED] ss.Last was not kept up-to-date" This reverts commit 0630752e4870092af94ed9c44e483e305fe70964. --- server/filestore.go | 145 ++++++++++++--------------------------- server/filestore_test.go | 2 +- server/memstore.go | 72 +++++++------------ server/store.go | 2 - server/store_test.go | 39 +++-------- 5 files changed, 79 insertions(+), 181 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index de0dbda7e9c..f50b54abee0 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor fseq = lseq + 1 for _, subj := range subs { ss, _ := mb.fss.Find(stringToBytes(subj)) - if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { - mb.recalculateForSubj(subj, ss) + if ss != nil && ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } if ss == nil || start > ss.Last || ss.First >= fseq { continue @@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(bytesToString(bsubj), ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss) } if sseq <= ss.First { update(ss) @@ -2745,8 +2745,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { mb.lsts = time.Now().UnixNano() mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { subj := string(bsubj) - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } oss := fss[subj] if oss.First == 0 { // New @@ -2936,8 +2936,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return } subj := bytesToString(bsubj) - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3224,8 +3224,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3898,8 +3898,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { info.fblk = i } } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } mb.mu.Unlock() // Re-acquire fs lock @@ -4030,8 +4030,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() ss, ok := mb.fss.Find(stringToBytes(subj)) - if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { - mb.recalculateForSubj(subj, ss) + if ok && ss != nil && ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } mb.mu.Unlock() if ss == nil { @@ -7832,14 +7832,13 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- - // We can lazily calculate the first/last sequence when needed. + // We can lazily calculate the first sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate - ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalculate the first and/or last sequence for this subject in this block. +// Will recalulate the first sequence for this subject in this block. // Will avoid slower path message lookups and scan the cache directly instead. -func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { +func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { // Need to make sure messages are loaded. if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -7847,100 +7846,46 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { } } - startSlot := int(ss.First - mb.cache.fseq) - if startSlot < 0 { - startSlot = 0 - } + // Mark first as updated. + ss.firstNeedsUpdate = false + + startSlot := int(startSeq - mb.cache.fseq) if startSlot >= len(mb.cache.idx) { ss.First = ss.Last return - } - endSlot := int(ss.Last - mb.cache.fseq) - if endSlot < 0 { - endSlot = 0 - } - if endSlot >= len(mb.cache.idx) || startSlot > endSlot { - return + } else if startSlot < 0 { + startSlot = 0 } + fseq := startSeq + 1 + if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { + fseq = mbFseq + } var le = binary.LittleEndian - if ss.firstNeedsUpdate { - // Mark first as updated. - ss.firstNeedsUpdate = false - - fseq := ss.First + 1 - if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { - fseq = mbFseq - } - for slot := startSlot; slot < len(mb.cache.idx); slot++ { - bi := mb.cache.idx[slot] &^ hbit - if bi == dbit { - // delete marker so skip. - continue - } - li := int(bi) - mb.cache.off - if li >= len(mb.cache.buf) { - ss.First = ss.Last - return - } - buf := mb.cache.buf[li:] - hdr := buf[:msgHdrSize] - slen := int(le.Uint16(hdr[20:])) - if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { - seq := le.Uint64(hdr[4:]) - if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { - continue - } - ss.First = seq - if ss.Msgs == 1 { - ss.Last = seq - ss.lastNeedsUpdate = false - return - } - // Skip the start slot ahead, if we need to recalculate last we can stop early. - startSlot = slot - break - } + for slot := startSlot; slot < len(mb.cache.idx); slot++ { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. + continue } - } - if ss.lastNeedsUpdate { - // Mark last as updated. - ss.lastNeedsUpdate = false - - lseq := ss.Last - 1 - if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq { - lseq = mbLseq + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + ss.First = ss.Last + return } - for slot := endSlot; slot >= startSlot; slot-- { - bi := mb.cache.idx[slot] &^ hbit - if bi == dbit { - // delete marker so skip. + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { continue } - li := int(bi) - mb.cache.off - if li >= len(mb.cache.buf) { - // Can't overwrite ss.Last, just skip. - return - } - buf := mb.cache.buf[li:] - hdr := buf[:msgHdrSize] - slen := int(le.Uint16(hdr[20:])) - if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { - seq := le.Uint64(hdr[4:]) - if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) { - continue - } - // Sequence should never be lower, but guard against it nonetheless. - if seq < ss.First { - seq = ss.First - } + ss.First = seq + if ss.Msgs == 1 { ss.Last = seq - if ss.Msgs == 1 { - ss.First = seq - ss.firstNeedsUpdate = false - } - return } + return } } } diff --git a/server/filestore_test.go b/server/filestore_test.go index 458cef7a740..1be968f8b11 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { mb.clearCacheAndOffset() // Now call with start sequence of 1, the old one // This will panic without the fix. - mb.recalculateForSubj("foo", ss) + mb.recalculateFirstForSubj("foo", 1, ss) // Make sure it was update properly. require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } diff --git a/server/memstore.go b/server/memstore.go index 350cfa388e9..c3cb7d0b661 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -143,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int return ErrMaxBytes } // If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room. - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - ms.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subj, ss.First, ss) } sm, ok := ms.msgs[ss.First] if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) { @@ -430,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje var totalSkipped uint64 // We will track start and end sequences as we go. ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate || fss.lastNeedsUpdate { - ms.recalculateForSubj(bytesToString(subj), fss) + if fss.firstNeedsUpdate { + ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) } if sseq <= fss.First { update(fss) @@ -585,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { fss := make(map[string]SimpleState) ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) { subjs := string(subj) - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - ms.recalculateForSubj(subjs, ss) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subjs, ss.First, ss) } oss := fss[subjs] if oss.First == 0 { // New @@ -675,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo var totalSkipped uint64 // We will track start and end sequences as we go. IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate || fss.lastNeedsUpdate { - ms.recalculateForSubj(bytesToString(subj), fss) + if fss.firstNeedsUpdate { + ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) } if sseq <= fss.First { update(fss) @@ -793,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { return } for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs { - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - ms.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subj, ss.First, ss) } if !ms.removeMsg(ss.First, false) { break @@ -1267,8 +1267,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store if !ok { continue } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - ms.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subj, ss.First, ss) } if ss.First < fseq { fseq = ss.First @@ -1362,47 +1362,25 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // We can lazily calculate the first/last sequence when needed. + // We can lazily calculate the first sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate - ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalculate the first and/or last sequence for this subject. +// Will recalculate the first sequence for this subject in this block. // Lock should be held. -func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) { - if ss.firstNeedsUpdate { - tseq := ss.First + 1 - if tseq < ms.state.FirstSeq { - tseq = ms.state.FirstSeq - } - for ; tseq <= ss.Last; tseq++ { - if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { - ss.First = tseq - ss.firstNeedsUpdate = false - if ss.Msgs == 1 { - ss.Last = tseq - ss.lastNeedsUpdate = false - return - } - break - } - } - } - if ss.lastNeedsUpdate { - tseq := ss.Last - 1 - if tseq > ms.state.LastSeq { - tseq = ms.state.LastSeq - } - for ; tseq >= ss.First; tseq-- { - if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { +func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { + tseq := startSeq + 1 + if tseq < ms.state.FirstSeq { + tseq = ms.state.FirstSeq + } + for ; tseq <= ss.Last; tseq++ { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.First = tseq + if ss.Msgs == 1 { ss.Last = tseq - ss.lastNeedsUpdate = false - if ss.Msgs == 1 { - ss.First = tseq - ss.firstNeedsUpdate = false - } - return } + ss.firstNeedsUpdate = false + return } } } diff --git a/server/store.go b/server/store.go index 1c8f7f7ec1f..72e039816e9 100644 --- a/server/store.go +++ b/server/store.go @@ -166,8 +166,6 @@ type SimpleState struct { // Internal usage for when the first needs to be updated before use. firstNeedsUpdate bool - // Internal usage for when the last needs to be updated before use. - lastNeedsUpdate bool } // LostStreamData indicates msgs that have been lost. diff --git a/server/store_test.go b/server/store_test.go index a916ceedb89..19d4c0251f0 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -152,19 +152,6 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss := fs.SubjectsState("foo") return ss["foo"] } - var smp StoreMsg - expectFirstSeq := func(eseq uint64) { - t.Helper() - sm, _, err := fs.LoadNextMsg("foo", false, 0, &smp) - require_NoError(t, err) - require_Equal(t, sm.seq, eseq) - } - expectLastSeq := func(eseq uint64) { - t.Helper() - sm, err := fs.LoadLastMsg("foo", &smp) - require_NoError(t, err) - require_Equal(t, sm.seq, eseq) - } // Publish an initial batch of messages. for i := 0; i < 4; i++ { @@ -176,9 +163,7 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss := getSubjectState() require_Equal(t, ss.Msgs, 4) require_Equal(t, ss.First, 1) - expectFirstSeq(1) require_Equal(t, ss.Last, 4) - expectLastSeq(4) // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. removed, err := fs.RemoveMsg(1) @@ -189,35 +174,29 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss = getSubjectState() require_Equal(t, ss.Msgs, 3) require_Equal(t, ss.First, 2) - expectFirstSeq(2) require_Equal(t, ss.Last, 4) - expectLastSeq(4) - // Remove last message, ss.Last is lazy so will only mark ss.lastNeedsUpdate. + // Remove last message. removed, err = fs.RemoveMsg(4) require_NoError(t, err) require_True(t, removed) - // Will update last, so corrects to 3. + // ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases. ss = getSubjectState() require_Equal(t, ss.Msgs, 2) require_Equal(t, ss.First, 2) - expectFirstSeq(2) - require_Equal(t, ss.Last, 3) - expectLastSeq(3) + require_Equal(t, ss.Last, 4) // Remove first message again. removed, err = fs.RemoveMsg(2) require_NoError(t, err) require_True(t, removed) - // Since we only have one message left, must update ss.First and ensure ss.Last equals. + // Since we only have one message left, must update ss.First and set ss.Last to equal. ss = getSubjectState() require_Equal(t, ss.Msgs, 1) require_Equal(t, ss.First, 3) - expectFirstSeq(3) require_Equal(t, ss.Last, 3) - expectLastSeq(3) // Publish some more messages so we can test another scenario. for i := 0; i < 3; i++ { @@ -229,9 +208,7 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss = getSubjectState() require_Equal(t, ss.Msgs, 4) require_Equal(t, ss.First, 3) - expectFirstSeq(3) require_Equal(t, ss.Last, 7) - expectLastSeq(7) // Remove last sequence, ss.Last is lazy so doesn't get updated. removed, err = fs.RemoveMsg(7) @@ -243,18 +220,18 @@ func TestStoreSubjectStateConsistency(t *testing.T) { require_NoError(t, err) require_True(t, removed) - // Remove (now) first sequence. Both ss.First and ss.Last are lazy and both need to be recalculated later. + // Remove (now) first sequence, but because ss.First is lazy we first need to recalculate + // to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First + // yet again, since ss.Last is lazy and is not correct. removed, err = fs.RemoveMsg(5) require_NoError(t, err) require_True(t, removed) - // ss.First and ss.Last should both be recalculated and equal each other. + // ss.First should equal ss.Last, last should have been updated now. ss = getSubjectState() require_Equal(t, ss.Msgs, 1) require_Equal(t, ss.First, 6) - expectFirstSeq(6) require_Equal(t, ss.Last, 6) - expectLastSeq(6) }, ) } From 553c6701d6229a032367e8306c89c9f041b2e868 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 13 Dec 2024 16:57:16 +0000 Subject: [PATCH 2/6] Update `nats-io/nkeys` to v0.4.9 Signed-off-by: Neil Twigg --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1ee0d5d86fd..ad806354b14 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.5.8 github.com/nats-io/nats.go v1.36.0 - github.com/nats-io/nkeys v0.4.8 + github.com/nats-io/nkeys v0.4.9 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 golang.org/x/crypto v0.31.0 diff --git a/go.sum b/go.sum index 51b9fae1322..ee58f250c66 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= -github.com/nats-io/nkeys v0.4.8 h1:+wee30071y3vCZAYRsnrmIPaOe47A/SkK/UBDPdIV70= -github.com/nats-io/nkeys v0.4.8/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= From 14af79f4e2f44bbf9679f2f371fa1f0a22b2d113 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 13 Dec 2024 11:07:55 -0800 Subject: [PATCH 3/6] Update nats-io/jwt version Signed-off-by: Waldemar Quevedo --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ad806354b14..69dccd5438c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21.0 require ( github.com/klauspost/compress v1.17.11 github.com/minio/highwayhash v1.0.3 - github.com/nats-io/jwt/v2 v2.5.8 + github.com/nats-io/jwt/v2 v2.7.3 github.com/nats-io/nats.go v1.36.0 github.com/nats-io/nkeys v0.4.9 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index ee58f250c66..2406dda71a7 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= -github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= -github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= From 3d2a3b381668d4c0da3f445059716ddff3990486 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 13 Dec 2024 11:47:58 -0800 Subject: [PATCH 4/6] Update go.mod version Signed-off-by: Waldemar Quevedo --- go.mod | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 69dccd5438c..6ec47234550 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/nats-io/nats-server/v2 -go 1.21.0 +go 1.22 + +toolchain go1.22.8 require ( github.com/klauspost/compress v1.17.11 From b606d5b5e586e02cbaeeea3be38ec97af14994d4 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 10 Dec 2024 20:57:51 +0100 Subject: [PATCH 5/6] De-flake RAFT state adder tests Signed-off-by: Maurice van Veen --- server/raft_helpers_test.go | 6 ++++++ server/raft_test.go | 5 +++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 218258a88db..662a6e94603 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -246,6 +246,7 @@ func (a *stateAdder) stop() { a.Lock() defer a.Unlock() a.n.Stop() + a.n.WaitForStop() } // Restart the group @@ -273,6 +274,11 @@ func (a *stateAdder) restart() { panic(err) } + // Must reset in-memory state. + // A real restart would not preserve it, but more importantly we have no way to detect if we + // already applied an entry. So, the sum must only be updated based on append entries or snapshots. + a.sum = 0 + a.n, err = a.s.startRaftNode(globalAccountName, a.cfg, pprofLabels{}) if err != nil { panic(err) diff --git a/server/raft_test.go b/server/raft_test.go index 0f30dec288e..df028fc5485 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -605,8 +605,9 @@ func TestNRGHeartbeatOnLeaderChange(t *testing.T) { leader := rg.leader().(*stateAdder) leader.proposeDelta(22) leader.proposeDelta(-11) - leader.proposeDelta(-11) - rg.waitOnTotal(t, 0) + leader.proposeDelta(-10) + // Must observe forward progress, so each iteration will check +1 total. + rg.waitOnTotal(t, int64(i+1)) leader.stop() leader.restart() rg.waitOnLeader() From 614e55d5e831cbc9a62e7d8789d1dcc34911c0ef Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 13 Dec 2024 21:33:13 +0100 Subject: [PATCH 6/6] Revert "[FIXED] Subject state consistency (#6226)" This reverts commit 60e2982df8e82a06813881bb8a96acfac38e53cc, reversing changes made to f264fb3b4a8e91728869bad4ae83e781479bee73. --- server/filestore.go | 21 +++++++--- server/memstore.go | 22 +++++++---- server/store_test.go | 94 -------------------------------------------- 3 files changed, 30 insertions(+), 107 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index f50b54abee0..c5920587da1 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2616,6 +2616,10 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si // Always reset. ss.First, ss.Last, ss.Msgs = 0, 0, 0 + if filter == _EMPTY_ { + filter = fwcs + } + // We do need to figure out the first and last sequences. wc := subjectHasWildcard(filter) start, stop := uint32(math.MaxUint32), uint32(0) @@ -7832,6 +7836,17 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- + // Only one left. + if ss.Msgs == 1 { + if seq == ss.Last { + ss.Last = ss.First + } else { + ss.First = ss.Last + } + ss.firstNeedsUpdate = false + return + } + // We can lazily calculate the first sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate } @@ -7857,12 +7872,8 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si startSlot = 0 } - fseq := startSeq + 1 - if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { - fseq = mbFseq - } var le = binary.LittleEndian - for slot := startSlot; slot < len(mb.cache.idx); slot++ { + for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ { bi := mb.cache.idx[slot] &^ hbit if bi == dbit { // delete marker so skip. diff --git a/server/memstore.go b/server/memstore.go index c3cb7d0b661..e2ca1cae295 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1009,9 +1009,8 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { if sm := ms.msgs[seq]; sm != nil { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ - ms.removeSeqPerSubject(sm.subj, seq) - // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, seq) + ms.removeSeqPerSubject(sm.subj, seq) } } if purged > ms.state.Msgs { @@ -1099,9 +1098,8 @@ func (ms *memStore) Truncate(seq uint64) error { if sm := ms.msgs[i]; sm != nil { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - ms.removeSeqPerSubject(sm.subj, i) - // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, i) + ms.removeSeqPerSubject(sm.subj, i) } } // Reset last. @@ -1362,8 +1360,17 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // We can lazily calculate the first sequence when needed. - ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + // If we know we only have 1 msg left don't need to search for next first. + if ss.Msgs == 1 { + if seq == ss.Last { + ss.Last = ss.First + } else { + ss.First = ss.Last + } + ss.firstNeedsUpdate = false + } else { + ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + } } // Will recalculate the first sequence for this subject in this block. @@ -1396,6 +1403,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg) + delete(ms.msgs, seq) if ms.state.Msgs > 0 { ms.state.Msgs-- if ss > ms.state.Bytes { @@ -1420,8 +1428,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { // Remove any per subject tracking. ms.removeSeqPerSubject(sm.subj, seq) - // Must delete message after updating per-subject info, to be consistent with file store. - delete(ms.msgs, seq) if ms.scb != nil { // We do not want to hold any locks here. diff --git a/server/store_test.go b/server/store_test.go index 19d4c0251f0..e447017829f 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -142,100 +142,6 @@ func TestStoreDeleteRange(t *testing.T) { require_Equal(t, num, 1) } -func TestStoreSubjectStateConsistency(t *testing.T) { - testAllStoreAllPermutations( - t, false, - StreamConfig{Name: "TEST", Subjects: []string{"foo"}}, - func(t *testing.T, fs StreamStore) { - getSubjectState := func() SimpleState { - t.Helper() - ss := fs.SubjectsState("foo") - return ss["foo"] - } - - // Publish an initial batch of messages. - for i := 0; i < 4; i++ { - _, _, err := fs.StoreMsg("foo", nil, nil) - require_NoError(t, err) - } - - // Expect 4 msgs, with first=1, last=4. - ss := getSubjectState() - require_Equal(t, ss.Msgs, 4) - require_Equal(t, ss.First, 1) - require_Equal(t, ss.Last, 4) - - // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. - removed, err := fs.RemoveMsg(1) - require_NoError(t, err) - require_True(t, removed) - - // Will update first, so corrects to seq 2. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 3) - require_Equal(t, ss.First, 2) - require_Equal(t, ss.Last, 4) - - // Remove last message. - removed, err = fs.RemoveMsg(4) - require_NoError(t, err) - require_True(t, removed) - - // ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 2) - require_Equal(t, ss.First, 2) - require_Equal(t, ss.Last, 4) - - // Remove first message again. - removed, err = fs.RemoveMsg(2) - require_NoError(t, err) - require_True(t, removed) - - // Since we only have one message left, must update ss.First and set ss.Last to equal. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 1) - require_Equal(t, ss.First, 3) - require_Equal(t, ss.Last, 3) - - // Publish some more messages so we can test another scenario. - for i := 0; i < 3; i++ { - _, _, err := fs.StoreMsg("foo", nil, nil) - require_NoError(t, err) - } - - // Just check the state is complete again. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 4) - require_Equal(t, ss.First, 3) - require_Equal(t, ss.Last, 7) - - // Remove last sequence, ss.Last is lazy so doesn't get updated. - removed, err = fs.RemoveMsg(7) - require_NoError(t, err) - require_True(t, removed) - - // Remove first sequence, ss.First is lazy so doesn't get updated. - removed, err = fs.RemoveMsg(3) - require_NoError(t, err) - require_True(t, removed) - - // Remove (now) first sequence, but because ss.First is lazy we first need to recalculate - // to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First - // yet again, since ss.Last is lazy and is not correct. - removed, err = fs.RemoveMsg(5) - require_NoError(t, err) - require_True(t, removed) - - // ss.First should equal ss.Last, last should have been updated now. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 1) - require_Equal(t, ss.First, 6) - require_Equal(t, ss.Last, 6) - }, - ) -} - func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { config := func() StreamConfig { return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0}