[FIXED] When writing tombstones we could violate max block size. #5973

Merged Oct 8, 2024 (1 commit)
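This PR page shows no description, so here is a brief reading aid before the diff. The core change is that tombstone writes no longer go straight to the last message block (lmb); they go through a new fileStore-level writeTombstone helper that rolls to a fresh block whenever the tombstone would push the current block past the configured BlockSize. Below is a minimal, hypothetical Go sketch of that pattern; the stripped-down types, the 22-byte record size, and the demo in main are illustrative assumptions only, not the real filestore code (which appears in the diff that follows).

```go
package main

import "fmt"

// Assumed size of an empty (tombstone) record for this sketch; the real
// constant lives in server/filestore.go and may differ.
const emptyRecordLen = 22

// msgBlock is a stand-in for the real message block type.
type msgBlock struct {
	buf []byte
}

func (mb *msgBlock) blkSize() uint64 { return uint64(len(mb.buf)) }

// writeTombstone appends a fixed-size placeholder record; the real code
// writes an empty record whose sequence has the tombstone bit set.
func (mb *msgBlock) writeTombstone(seq uint64, ts int64) error {
	mb.buf = append(mb.buf, make([]byte, emptyRecordLen)...)
	return nil
}

// fileStore is a stand-in holding only what the sketch needs.
type fileStore struct {
	blockSize uint64
	lmb       *msgBlock
	blks      []*msgBlock
}

func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
	mb := &msgBlock{}
	fs.blks = append(fs.blks, mb)
	fs.lmb = mb
	return mb, nil
}

// writeTombstone mirrors the pattern introduced by this PR: if the tombstone
// would not fit in the current last block, roll to a new block first, then write.
func (fs *fileStore) writeTombstone(seq uint64, ts int64) error {
	lmb := fs.lmb
	if lmb == nil || lmb.blkSize()+emptyRecordLen > fs.blockSize {
		var err error
		if lmb, err = fs.newMsgBlockForWrite(); err != nil {
			return err
		}
	}
	return lmb.writeTombstone(seq, ts)
}

func main() {
	fs := &fileStore{blockSize: 64}
	for seq := uint64(1); seq <= 10; seq++ {
		_ = fs.writeTombstone(seq, 0)
	}
	// With a 64-byte limit and 22-byte tombstones, no block exceeds the limit.
	for i, mb := range fs.blks {
		fmt.Printf("blk %d: %d bytes\n", i, mb.blkSize())
	}
}
```
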
107 changes: 83 additions & 24 deletions server/filestore.go
@@ -195,6 +195,7 @@ type fileStore struct {
closed bool
fip bool
receivedAny bool
firstMoved bool
}

// Represents a message store block and its data.
@@ -456,8 +457,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Check if our prior state remembers a last sequence past where we can see.
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
if lmb, err := fs.newMsgBlockForWrite(); err == nil {
lmb.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano())
if _, err := fs.newMsgBlockForWrite(); err == nil {
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
return nil, err
}
} else {
return nil, err
}
@@ -2122,7 +2125,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
// Check if we have no messages and blocks left.
if fs.lmb == nil && last.seq != 0 {
if lmb, _ := fs.newMsgBlockForWrite(); lmb != nil {
lmb.writeTombstone(last.seq, last.ts)
fs.writeTombstone(last.seq, last.ts)
}
// Clear any global subject state.
fs.psim, fs.tsl = fs.psim.Empty(), 0
@@ -4078,11 +4081,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// This is for user initiated removes or to hold the first seq
// when the last block is empty.

// If not via limits and not empty and last (empty writes tombstone above if last) write tombstone.
if !viaLimits && !(isEmpty && isLastBlock) {
if lmb := fs.lmb; sm != nil && lmb != nil {
lmb.writeTombstone(sm.seq, sm.ts)
}
// If not via limits and not empty (empty writes tombstone above if last) write tombstone.
if !viaLimits && !isEmpty && sm != nil {
fs.writeTombstone(sm.seq, sm.ts)
}

if cb := fs.scb; cb != nil {
@@ -4122,11 +4123,18 @@ func (mb *msgBlock) shouldCompactSync() bool {
return mb.bytes*2 < mb.rbytes && !mb.noCompact
}

// This will compact and rewrite this block. This version will not process any tombstone cleanup.
// Write lock needs to be held.
func (mb *msgBlock) compact() {
mb.compactWithFloor(0)
}

// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
// if fseq > 0 we will attempt to cleanup stale tombstones.
// Write lock needs to be held.
func (mb *msgBlock) compact() {
func (mb *msgBlock) compactWithFloor(floor uint64) {
wasLoaded := mb.cacheAlreadyLoaded()
if !wasLoaded {
if err := mb.loadMsgsWithLock(); err != nil {
@@ -4168,7 +4176,9 @@ func (mb *msgBlock) compact() {
if seq&tbit != 0 {
seq = seq &^ tbit
// If this entry is for a lower seq than ours then keep around.
if seq < fseq {
// We also check that it is greater than our floor. Floor is zero on normal
// calls to compact.
if seq < fseq && seq >= floor {
nbuf = append(nbuf, buf[index:index+rl]...)
}
} else {
@@ -4185,7 +4195,7 @@ }
}

// Handle compression
if mb.cmp != NoCompression {
if mb.cmp != NoCompression && len(nbuf) > 0 {
cbuf, err := mb.cmp.Compress(nbuf)
if err != nil {
return
@@ -4628,6 +4638,8 @@ func (fs *fileStore) selectNextFirst() {
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
}
// Mark first as moved. Plays into tombstone cleanup for syncBlocks.
fs.firstMoved = true
}

// Lock should be held.
@@ -5172,6 +5184,26 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
return rl, err
}

// For writing tombstones to our lmb. This version will enforce maximum block sizes.
// Lock should be held.
func (fs *fileStore) writeTombstone(seq uint64, ts int64) error {
// Grab our current last message block.
lmb := fs.lmb
var err error

if lmb == nil || lmb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize {
if lmb != nil && fs.fcfg.Compression != NoCompression {
// We've now reached the end of this message block, if we want
// to compress blocks then now's the time to do it.
go lmb.recompressOnDiskIfNeeded()
}
if lmb, err = fs.newMsgBlockForWrite(); err != nil {
return err
}
}
return lmb.writeTombstone(seq, ts)
}

func (mb *msgBlock) recompressOnDiskIfNeeded() error {
alg := mb.fs.fcfg.Compression
mb.mu.Lock()
@@ -5185,7 +5217,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// 1. The block will be compressed already and have a valid metadata
// header, in which case we do nothing.
// 2. The block will be uncompressed, in which case we will compress it
// and then write it back out to disk, reencrypting if necessary.
// and then write it back out to disk, re-encrypting if necessary.
<-dios
origBuf, err := os.ReadFile(origFN)
dios <- struct{}{}
@@ -5298,6 +5330,10 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// compression algorithm is up-to-date, since this will be needed when
// compacting or truncating.
mb.cmp = alg

// Also update rbytes
mb.rbytes = uint64(len(cmpBuf))

return nil
}

@@ -5341,15 +5377,17 @@ func (mb *msgBlock) ensureRawBytesLoaded() error {

// Sync msg and index files as needed. This is called from a timer.
func (fs *fileStore) syncBlocks() {
fs.mu.RLock()
fs.mu.Lock()
// If closed or a snapshot is in progress bail.
if fs.closed || fs.sips > 0 {
fs.mu.RUnlock()
fs.mu.Unlock()
return
}
blks := append([]*msgBlock(nil), fs.blks...)
lmb := fs.lmb
fs.mu.RUnlock()
lmb, firstMoved, firstSeq := fs.lmb, fs.firstMoved, fs.state.FirstSeq
// Clear first moved.
fs.firstMoved = false
fs.mu.Unlock()

var markDirty bool
for _, mb := range blks {
@@ -5364,6 +5402,11 @@
mb.dirtyCloseWithRemove(false)
}

// If our first has moved and we are set to noCompact (which is from tombstones),
// clear so that we might cleanup tombstones.
if firstMoved && mb.noCompact {
mb.noCompact = false
}
// Check if we should compact here as well.
// Do not compact last mb.
var needsCompact bool
@@ -5381,13 +5424,26 @@
mb.mu.Unlock()

// Check if we should compact here.
// Need to hold fs lock in case we reference psim when loading in the mb.
// Need to hold fs lock in case we reference psim when loading in the mb and we may remove this block if truly empty.
if needsCompact {
fs.mu.RLock()
mb.mu.Lock()
mb.compact()
mb.compactWithFloor(firstSeq)
// If this compact removed all raw bytes due to tombstone cleanup, schedule to remove.
shouldRemove := mb.rbytes == 0
mb.mu.Unlock()
fs.mu.RUnlock()

// Check if we should remove. This will not be common, so we will re-take fs write lock here vs changing
// it above which we would prefer to be a readlock such that other lookups can occur while compacting this block.
if shouldRemove {
fs.mu.Lock()
mb.mu.Lock()
fs.removeMsgBlock(mb)
mb.mu.Unlock()
fs.mu.Unlock()
needSync = false
}
}

// Check if we need to sync this block.
@@ -6553,8 +6609,11 @@ func (fs *fileStore) State() StreamState {
state.Deleted = append(state.Deleted, seq)
}
}
cur = atomic.LoadUint64(&mb.last.seq) + 1 // Expected next first.

// Only advance cur if we are increasing. We could have marker blocks with just tombstones.
if last := atomic.LoadUint64(&mb.last.seq); last >= cur {
cur = last + 1 // Expected next first.
}
// Add in deleted.
mb.dmap.Range(func(seq uint64) bool {
if seq < fseq {
mb.dmap.Delete(seq)
@@ -6929,7 +6988,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq > fseq {
fs.lmb.writeTombstone(tomb.seq, tomb.ts)
fs.writeTombstone(tomb.seq, tomb.ts)
}
}

@@ -7026,7 +7085,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
if lseq := atomic.LoadUint64(&lmb.last.seq); lseq > 1 {
// Leave a tombstone so we can remember our starting sequence in case
// full state becomes corrupted.
lmb.writeTombstone(lseq, lmb.last.ts)
fs.writeTombstone(lseq, lmb.last.ts)
}

cb := fs.scb
@@ -7424,7 +7483,7 @@ func (fs *fileStore) Truncate(seq uint64) error {
// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq <= lsm.seq {
fs.lmb.writeTombstone(tomb.seq, tomb.ts)
fs.writeTombstone(tomb.seq, tomb.ts)
}
}

@@ -7499,7 +7558,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
mb.mu.Unlock()
// Write the tombstone to remember since this was last block.
if lmb, _ := fs.newMsgBlockForWrite(); lmb != nil {
lmb.writeTombstone(lseq, lts)
fs.writeTombstone(lseq, lts)
}
mb.mu.Lock()
}
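Before the test file, one more reading aid: the new compactWithFloor (see the compact hunks above) keeps a tombstone only if its sequence is below the block's first sequence (fseq) and at or above the supplied floor, with floor 0 reproducing plain compact(). A tiny, hypothetical sketch of that rule follows; the helper name keepTombstone is invented for illustration and does not exist in the codebase.

```go
package main

import "fmt"

// keepTombstone captures the retention rule compactWithFloor applies in this PR:
// a tombstone for seq survives only if it points below the block's first
// sequence (fseq) and at or above the floor (the stream's first sequence).
func keepTombstone(seq, fseq, floor uint64) bool {
	return seq < fseq && seq >= floor
}

func main() {
	// Tombstone for seq 5 in a block whose first sequence is 10, after the
	// stream's first sequence has advanced to 7: it is dropped.
	fmt.Println(keepTombstone(5, 10, 7)) // false
	// Same tombstone with floor 0 (plain compact): it is kept.
	fmt.Println(keepTombstone(5, 10, 0)) // true
}
```
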
100 changes: 96 additions & 4 deletions server/filestore_test.go
@@ -5581,7 +5581,15 @@ func TestFileStoreFullStateTestUserRemoveWAL(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()

if newState := fs.State(); !reflect.DeepEqual(state, newState) {
newState := fs.State()
// We will properly detect lost data for sequence #2 here.
require_True(t, newState.Lost != nil)
require_Equal(t, len(newState.Lost.Msgs), 1)
require_Equal(t, newState.Lost.Msgs[0], 2)
// Clear for deep equal compare below.
newState.Lost = nil

if !reflect.DeepEqual(state, newState) {
t.Fatalf("Restore state does not match:\n%+v\n%+v",
state, newState)
}
@@ -7284,7 +7292,7 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
removed, err = fs.RemoveMsg(2)
require_NoError(t, err)
require_True(t, removed)
// Make sure 3 blks left
// Make sure 3 blks left.
require_Equal(t, fs.numMsgBlocks(), 3)

psi = fetch("foo.22.baz")
@@ -7549,11 +7557,11 @@ func TestFileStoreSyncCompressOnlyIfDirty(t *testing.T) {
_, err = fs.RemoveMsg(seq)
require_NoError(t, err)
}
// Now make sure we add 4th block so syncBlocks will try to compress.
// Now make sure we add 4/5th block so syncBlocks will try to compact.
for i := 0; i < 6; i++ {
fs.StoreMsg("foo.BB", nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 4)
require_Equal(t, fs.numMsgBlocks(), 5)

// All should have compact set.
fs.mu.Lock()
@@ -7707,6 +7715,90 @@ func TestFileStoreRestoreIndexWithMatchButLeftOverBlocks(t *testing.T) {
require_Equal(t, state.LastSeq, 18)
}

func TestFileStoreRestoreDeleteTombstonesExceedingMaxBlkSize(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 256
fs, err := newFileStoreWithCreated(
fcfg,
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage},
time.Now(),
prf(&fcfg),
nil,
)
require_NoError(t, err)
defer fs.Stop()

n, err := fs.PurgeEx(_EMPTY_, 1_000_000_000, 0)
require_NoError(t, err)
require_Equal(t, n, 0)

msg := []byte("hello")
// 6 msgs per block with blk size 256.
for i := 1; i <= 10_000; i++ {
fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg)
}
// Now delete msgs which will write tombstones.
for seq := uint64(1_000_000_001); seq < 1_000_000_101; seq++ {
removed, err := fs.RemoveMsg(seq)
require_NoError(t, err)
require_True(t, removed)
}

// Check last block and make sure the tombstones did not exceed blk size maximum.
// Check to make sure no blocks exceed blk size.
fs.mu.RLock()
blks := append([]*msgBlock(nil), fs.blks...)
lmb := fs.lmb
fs.mu.RUnlock()

var emptyBlks []*msgBlock
for _, mb := range blks {
mb.mu.RLock()
bytes, rbytes := mb.bytes, mb.rbytes
mb.mu.RUnlock()
require_True(t, bytes < 256)
require_True(t, rbytes < 256)
if bytes == 0 && mb != lmb {
emptyBlks = append(emptyBlks, mb)
}
}
// Check each block such that it signals it can be compacted but if we attempt compact here nothing should change.
for _, mb := range emptyBlks {
mb.mu.Lock()
mb.ensureRawBytesLoaded()
bytes, rbytes, shouldCompact := mb.bytes, mb.rbytes, mb.shouldCompactSync()
// Do the compact and make sure nothing changed.
mb.compact()
nbytes, nrbytes := mb.bytes, mb.rbytes
mb.mu.Unlock()
require_True(t, shouldCompact)
require_Equal(t, bytes, nbytes)
require_Equal(t, rbytes, nrbytes)
}

// Now remove first msg which will invalidate the tombstones since they will be < first sequence.
removed, err := fs.RemoveMsg(1_000_000_000)
require_NoError(t, err)
require_True(t, removed)

// Now simulate a syncBlocks call and make sure it cleans up the tombstones that are no longer relevant.
fs.syncBlocks()
for _, mb := range emptyBlks {
mb.mu.Lock()
mb.ensureRawBytesLoaded()
index, bytes, rbytes := mb.index, mb.bytes, mb.rbytes
mb.mu.Unlock()
require_Equal(t, bytes, 0)
require_Equal(t, rbytes, 0)
// Also make sure we removed these blks all together.
fs.mu.RLock()
imb := fs.bim[index]
fs.mu.RUnlock()
require_True(t, imb == nil)
}
})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////