From 736ee1db93351c8776108d781e059cce3036cf55 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Wed, 5 Mar 2025 11:07:31 -0500 Subject: [PATCH] db: refactor race-build file cache reference tracking Previously the file cache used a pointer to the sstable iterator to maintain the stack traces of the origination of open references during race builds. This was a bit ambiguous. With the introduction of blob files, references will also be maintained by non-sstable iterators (eg, a cached blob file reader within an iterator's blob value fetcher). This commit refactors this reference tracking to allocate a new closure in race builds. The additional allocation is expected to be significant in (already slow) race builds. --- file_cache.go | 52 ++++++++++++++++++++----------- sstable/reader_iter.go | 8 ++--- sstable/reader_iter_single_lvl.go | 9 +++--- sstable/reader_iter_two_lvl.go | 2 +- 4 files changed, 42 insertions(+), 29 deletions(-) diff --git a/file_cache.go b/file_cache.go index 65db546bb0..2c1c14961e 100644 --- a/file_cache.go +++ b/file_cache.go @@ -107,9 +107,12 @@ type fileCacheHandle struct { // This struct is only populated in race builds. raceMu struct { sync.Mutex - // iters maps sstable.Iterator objects to the stack trace recorded at - // creation time. - iters map[any][]byte + // nextRefID is the next ID to allocate for a new reference. + nextRefID uint64 + // openRefs maps reference IDs to the stack trace recorded at creation + // time. It's used to track which call paths leaked open references to + // files. + openRefs map[uint64][]byte } } @@ -134,7 +137,7 @@ func (c *FileCache) newHandle( t.readerOpts = readerOpts t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{} if invariants.RaceEnabled { - t.raceMu.iters = make(map[any][]byte) + t.raceMu.openRefs = make(map[uint64][]byte) } return t } @@ -151,7 +154,7 @@ func (h *fileCacheHandle) Close() error { err = errors.Errorf("leaked iterators: %d", errors.Safe(v)) } else { var buf bytes.Buffer - for _, stack := range h.raceMu.iters { + for _, stack := range h.raceMu.openRefs { fmt.Fprintf(&buf, "%s\n", stack) } err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String()) @@ -348,12 +351,7 @@ func NewFileCache(numShards int, size int) *FileCache { v := vRef.Value() handle := key.handle v.readerProvider.init(c, key) - v.closeHook = func(iterator any) { - if invariants.RaceEnabled { - handle.raceMu.Lock() - delete(handle.raceMu.iters, iterator) - handle.raceMu.Unlock() - } + v.closeHook = func() { // closeHook is called when an iterator is closed; the initialization of // an iterator with this value will happen after a FindOrCreate() call // with returns the same vRef. @@ -572,17 +570,35 @@ func (h *fileCacheHandle) newPointIter( if err != nil { return nil, err } - // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take - // care to avoid introducing an allocation here by adding a closure. - iter.SetCloseHook(v.closeHook) - handle.iterCount.Add(1) + // NB: closeHook (v.closeHook) takes responsibility for calling + // unrefValue(v) here. Take care to avoid introducing an allocation here by + // adding a closure. + closeHook := h.addReference(v) + iter.SetCloseHook(closeHook) + return iter, nil +} + +func (h *fileCacheHandle) addReference(v *fileCacheValue) (closeHook func()) { + h.iterCount.Add(1) + closeHook = v.closeHook if invariants.RaceEnabled { stack := debug.Stack() h.raceMu.Lock() - h.raceMu.iters[iter] = stack + refID := h.raceMu.nextRefID + h.raceMu.openRefs[refID] = stack + h.raceMu.nextRefID++ h.raceMu.Unlock() + // In race builds, this closeHook closure will force an allocation. + // Race builds are already unperformant (and allocate a stack trace), so + // we don't care. + closeHook = func() { + v.closeHook() + h.raceMu.Lock() + defer h.raceMu.Unlock() + delete(h.raceMu.openRefs, refID) + } } - return iter, nil + return closeHook } // newRangeDelIter is an internal helper that constructs an iterator over a @@ -736,7 +752,7 @@ func (h *fileCacheHandle) getTableProperties(file *tableMetadata) (*sstable.Prop } type fileCacheValue struct { - closeHook func(i any) + closeHook func() reader io.Closer // *sstable.Reader isShared bool diff --git a/sstable/reader_iter.go b/sstable/reader_iter.go index e479c38f85..e0a8e8bbf9 100644 --- a/sstable/reader_iter.go +++ b/sstable/reader_iter.go @@ -50,11 +50,9 @@ type Iterator interface { NextPrefix(succKey []byte) *base.InternalKV // SetCloseHook sets a function that will be called when the iterator is - // closed. This is used by the file cache to release the reference count on - // the open sstable.Reader when the iterator is closed. The closures takes - // the iterator as a parameter to enable invariant-build tracking of leaked - // iterators. - SetCloseHook(fn func(any)) + // closed. This is used by the file cache to release the reference count on + // the open sstable.Reader when the iterator is closed. + SetCloseHook(func()) } // Iterator positioning optimizations and singleLevelIterator and diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index e50556bec4..978a6ed2ef 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -71,7 +71,7 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte vbRH objstorage.ReadHandle vbRHPrealloc objstorageprovider.PreallocatedReadHandle err error - closeHook func(i any) + closeHook func() readBlockEnv block.ReadEnv @@ -1515,9 +1515,8 @@ func (i *singleLevelIterator[I, PI, D, PD]) Error() error { // SetCloseHook sets a function that will be called when the iterator is closed. // This is used by the file cache to release the reference count on the open -// sstable.Reader when the iterator is closed. The closures takes the iterator -// as a parameter to enable invariant-build tracking of leaked iterators. -func (i *singleLevelIterator[I, PI, D, PD]) SetCloseHook(fn func(i any)) { +// sstable.Reader when the iterator is closed. +func (i *singleLevelIterator[I, PI, D, PD]) SetCloseHook(fn func()) { i.closeHook = fn } @@ -1546,7 +1545,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) closeInternal() error { } if i.closeHook != nil { - i.closeHook(i) + i.closeHook() } var err error err = firstError(err, PD(&i.data).Close()) diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 48e6db7df8..590acf8420 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -1015,7 +1015,7 @@ func (i *twoLevelIterator[I, PI, D, PD]) SetContext(ctx context.Context) { i.secondLevel.SetContext(ctx) } -func (i *twoLevelIterator[I, PI, D, PD]) SetCloseHook(fn func(i any)) { +func (i *twoLevelIterator[I, PI, D, PD]) SetCloseHook(fn func()) { i.secondLevel.SetCloseHook(fn) }