Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.24-RC.2 #6257

Merged
merged 6 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
module github.com/nats-io/nats-server/v2

go 1.21.0
go 1.22

toolchain go1.22.8

require (
github.com/klauspost/compress v1.17.11
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/jwt/v2 v2.7.3
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nkeys v0.4.8
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.31.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.8 h1:+wee30071y3vCZAYRsnrmIPaOe47A/SkK/UBDPdIV70=
github.com/nats-io/nkeys v0.4.8/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
156 changes: 56 additions & 100 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = lseq + 1
for _, subj := range subs {
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
Expand Down Expand Up @@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(bytesToString(bsubj), ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
}
if sseq <= ss.First {
update(ss)
Expand Down Expand Up @@ -2616,6 +2616,10 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
// Always reset.
ss.First, ss.Last, ss.Msgs = 0, 0, 0

if filter == _EMPTY_ {
filter = fwcs
}

// We do need to figure out the first and last sequences.
wc := subjectHasWildcard(filter)
start, stop := uint32(math.MaxUint32), uint32(0)
Expand Down Expand Up @@ -2745,8 +2749,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.lsts = time.Now().UnixNano()
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
subj := string(bsubj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
Expand Down Expand Up @@ -2936,8 +2940,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
Expand Down Expand Up @@ -3224,8 +3228,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
Expand Down Expand Up @@ -3898,8 +3902,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
// Re-acquire fs lock
Expand Down Expand Up @@ -4030,8 +4034,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
if ok && ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
if ss == nil {
Expand Down Expand Up @@ -7832,115 +7836,67 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

ss.Msgs--

// We can lazily calculate the first/last sequence when needed.
// Only one left.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
ss.firstNeedsUpdate = false
return
}

// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalculate the first and/or last sequence for this subject in this block.
// Will recalulate the first sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
}

startSlot := int(ss.First - mb.cache.fseq)
if startSlot < 0 {
startSlot = 0
}
// Mark first as updated.
ss.firstNeedsUpdate = false

startSlot := int(startSeq - mb.cache.fseq)
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
}
endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
}
if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
return
} else if startSlot < 0 {
startSlot = 0
}

var le = binary.LittleEndian
if ss.firstNeedsUpdate {
// Mark first as updated.
ss.firstNeedsUpdate = false

fseq := ss.First + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
ss.lastNeedsUpdate = false
return
}
// Skip the start slot ahead, if we need to recalculate last we can stop early.
startSlot = slot
break
}
for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
}
if ss.lastNeedsUpdate {
// Mark last as updated.
ss.lastNeedsUpdate = false

lseq := ss.Last - 1
if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
lseq = mbLseq
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
for slot := endSlot; slot >= startSlot; slot-- {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
// Can't overwrite ss.Last, just skip.
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
// Sequence should never be lower, but guard against it nonetheless.
if seq < ss.First {
seq = ss.First
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
if ss.Msgs == 1 {
ss.First = seq
ss.firstNeedsUpdate = false
}
return
}
return
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
mb.clearCacheAndOffset()
// Now call with start sequence of 1, the old one
// This will panic without the fix.
mb.recalculateForSubj("foo", ss)
mb.recalculateFirstForSubj("foo", 1, ss)
// Make sure it was update properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}
Expand Down
Loading
Loading