Skip to content

Commit

Permalink
[FIXED] Improve per-subject state performance & keep ss.Last up-to-date (#6235)
Browse files Browse the repository at this point in the history

Improve performance by updating `ss.Last` lazily: it is now set only during
recalculation, i.e. only when it is actually needed.

Also fixes a bug where `ss.Last` was not kept up-to-date when `ss.Msgs > 1`,
by introducing an `ss.lastNeedsUpdate` flag that works just like
`ss.firstNeedsUpdate`.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored and MauriceVanVeen committed Dec 17, 2024
1 parent 19b7cc6 commit 177944c
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 114 deletions.
164 changes: 100 additions & 64 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = lseq + 1
for _, subj := range subs {
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
Expand Down Expand Up @@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(bytesToString(bsubj), ss)
}
if sseq <= ss.First {
update(ss)
Expand Down Expand Up @@ -2745,8 +2745,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.lsts = time.Now().UnixNano()
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
subj := string(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
Expand Down Expand Up @@ -2936,8 +2936,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
if sseq <= ss.First {
t += ss.Msgs
Expand Down Expand Up @@ -3224,8 +3224,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
if sseq <= ss.First {
t += ss.Msgs
Expand Down Expand Up @@ -3898,8 +3898,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
mb.mu.Unlock()
// Re-acquire fs lock
Expand Down Expand Up @@ -4030,8 +4030,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
}
mb.mu.Unlock()
if ss == nil {
Expand Down Expand Up @@ -7387,9 +7387,6 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
// Need to mark the sequence as deleted. Otherwise, recalculating ss.First
// for per-subject info would be able to find it still.
smb.dmap.Insert(mseq)
}
}

Expand Down Expand Up @@ -7835,76 +7832,115 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

ss.Msgs--

// Only one left.
if ss.Msgs == 1 {
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
return
}

// We can lazily calculate the first sequence when needed.
// We can lazily calculate the first/last sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalulate the first sequence for this subject in this block.
// Will recalculate the first and/or last sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
}

// Mark first as updated.
ss.firstNeedsUpdate = false

startSlot := int(startSeq - mb.cache.fseq)
startSlot := int(ss.First - mb.cache.fseq)
if startSlot < 0 {
startSlot = 0
}
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
} else if startSlot < 0 {
startSlot = 0
}

fseq := startSeq + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
}
if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
return
}

var le = binary.LittleEndian
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
if ss.firstNeedsUpdate {
// Mark first as updated.
ss.firstNeedsUpdate = false

fseq := ss.First + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
ss.lastNeedsUpdate = false
return
}
// Skip the start slot ahead, if we need to recalculate last we can stop early.
startSlot = slot
break
}
}
}
if ss.lastNeedsUpdate {
// Mark last as updated.
ss.lastNeedsUpdate = false

lseq := ss.Last - 1
if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
lseq = mbLseq
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
for slot := endSlot; slot >= startSlot; slot-- {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
ss.First = seq
if ss.Msgs == 1 {
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
// Can't overwrite ss.Last, just skip.
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
// Sequence should never be lower, but guard against it nonetheless.
if seq < ss.First {
seq = ss.First
}
ss.Last = seq
if ss.Msgs == 1 {
ss.First = seq
ss.firstNeedsUpdate = false
}
return
}
return
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
mb.clearCacheAndOffset()
// Now call with start sequence of 1, the old one
// This will panic without the fix.
mb.recalculateFirstForSubj("foo", 1, ss)
mb.recalculateForSubj("foo", ss)
// Make sure it was updated properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}
Expand Down
Loading

0 comments on commit 177944c

Please sign in to comment.