Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Do not load blocks unnecessarily when checking if we can skip ahead in LoadNextMsg(). #5819

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 35 additions & 50 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,13 @@ func (mb *msgBlock) convertToEncrypted() error {
return nil
}

// Return the mb's index.
func (mb *msgBlock) getIndex() uint32 {
mb.mu.RLock()
defer mb.mu.RUnlock()
return mb.index
}

// Rebuild the state of the blk based on what we have on disk in the N.blk file.
// We will return any lost data, and we will return any delete tombstones we encountered.
func (mb *msgBlock) rebuildState() (*LostStreamData, []uint64, error) {
Expand Down Expand Up @@ -2594,14 +2601,14 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
return ss
}

// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
// Will return -1 if no matches at all.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
if filter == _EMPTY_ {
filter = fwcs
wc = true
// This is used to see if we can selectively jump start blocks based on filter subject and a starting block index.
// Will return -1 and ErrStoreEOF if no matches at all or no more from where we are.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool, bi int) (int, error) {
// If we match everything, just move to next blk.
if filter == _EMPTY_ || filter == fwcs {
return bi + 1, nil
}

// Move through psim to gather start and stop bounds.
start, stop := uint32(math.MaxUint32), uint32(0)
if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
Expand All @@ -2615,51 +2622,26 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
start, stop = psi.fblk, psi.lblk
}
// Nothing found.
// Nothing was found.
if start == uint32(math.MaxUint32) {
return -1, -1
}
// Here we need to translate this to index into fs.blks properly.
mb := fs.bim[start]
if mb == nil {
// psim fblk can be lazy.
i := start + 1
for ; i <= stop; i++ {
mb = fs.bim[i]
if mb == nil {
continue
}
if _, f, _ := mb.filteredPending(filter, wc, 0); f > 0 {
break
}
}
// Update fblk since fblk was outdated.
if !wc {
if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
psi.fblk = i
}
} else {
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
}
})
}
return -1, ErrStoreEOF
}
// Still nothing.
if mb == nil {
return -1, -1
// Can not be nil so ok to inline dereference.
mbi := fs.blks[bi].getIndex()
// All matching msgs are behind us.
// Less than AND equal is important because we were called because we missed searching bi.
if stop <= mbi {
return -1, ErrStoreEOF
}
// Grab first index.
fi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))

// Grab last if applicable.
var li int
if mb = fs.bim[stop]; mb != nil {
li, _ = fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
// If start is > index return dereference of fs.blks index.
if start > mbi {
if mb := fs.bim[start]; mb != nil {
ni, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
return ni, nil
}
}

return fi, li
// Otherwise just bump to the next one.
return bi + 1, nil
}

// Optimized way for getting all num pending matching a filter subject.
Expand Down Expand Up @@ -6548,10 +6530,13 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// We should not do this at all if we are already on the last block.
// Also if we are a wildcard do not check if large subject space.
const wcMaxSizeToCheck = 64 * 1024

// if len(blks) - 1 > N

if i == bi && i < len(fs.blks)-1 && (!wc || fs.psim.Size() < wcMaxSizeToCheck) {
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
nbi, err := fs.checkSkipFirstBlock(filter, wc, bi)
// Nothing available.
if nbi < 0 || lbi <= bi {
if err == ErrStoreEOF {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
Expand Down
87 changes: 65 additions & 22 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7349,28 +7349,6 @@ func TestFileStoreCheckSkipFirstBlockBug(t *testing.T) {
require_NoError(t, err)
}

// https://github.com/nats-io/nats-server/issues/5705
func TestFileStoreCheckSkipFirstBlockEmptyFilter(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")
// Create 4 blocks, each block holds 2 msgs
for i := 0; i < 4; i++ {
fs.StoreMsg("foo.22.bar", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 4)

nbi, lbi := fs.checkSkipFirstBlock(_EMPTY_, false)
require_Equal(t, nbi, 0)
require_Equal(t, lbi, 3)
}

// https://github.com/nats-io/nats-server/issues/5702
func TestFileStoreTombstoneRbytes(t *testing.T) {
fs, err := newFileStore(
Expand Down Expand Up @@ -7437,6 +7415,71 @@ func TestFileStoreMsgBlockShouldCompact(t *testing.T) {
require_False(t, shouldCompact)
}

func TestFileStoreCheckSkipFirstBlockNotLoadOldBlocks(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")

fs.StoreMsg("foo.BB.bar", nil, msg)
fs.StoreMsg("foo.AA.bar", nil, msg)
for i := 0; i < 6; i++ {
fs.StoreMsg("foo.BB.bar", nil, msg)
}
fs.StoreMsg("foo.AA.bar", nil, msg) // Sequence 9
fs.StoreMsg("foo.AA.bar", nil, msg) // Sequence 10

for i := 0; i < 4; i++ {
fs.StoreMsg("foo.BB.bar", nil, msg)
}

// Should have created 7 blocks.
// BB AA | BB BB | BB BB | BB BB | AA AA | BB BB | BB BB
require_Equal(t, fs.numMsgBlocks(), 7)

fs.RemoveMsg(1)
fs.RemoveMsg(2)

// First block should be gone now.
// -- -- | BB BB | BB BB | BB BB | AA AA | BB BB | BB BB
require_Equal(t, fs.numMsgBlocks(), 6)

// Remove all blk cache and fss.
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.Lock()
mb.fss, mb.cache = nil, nil
mb.mu.Unlock()
}
fs.mu.RUnlock()

// But this means that the psim still points fblk to block 1.
// So when we try to load AA from near the end (last AA sequence), it will not find anything and will then
// check if we can skip ahead, but in the process reload blocks 2, 3, 4 amd 5..
// This can trigger for an up to date consumer near the end of the stream that gets a new pull request that will pop it out of msgWait
// and it will call LoadNextMsg() like we do here with starting sequence of 11.
_, _, err = fs.LoadNextMsg("foo.AA.bar", false, 11, nil)
require_Error(t, err, ErrStoreEOF)

// Now make sure we did not load fss and cache.
var loaded int
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
if mb.cache != nil || mb.fss != nil {
loaded++
}
mb.mu.RUnlock()
}
fs.mu.RUnlock()
// We will load last block for starting seq 9, but no others should have loaded.
require_Equal(t, loaded, 1)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down