Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: refactor race-build file cache reference tracking #4362

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ type fileCacheHandle struct {
// This struct is only populated in race builds.
raceMu struct {
sync.Mutex
// iters maps sstable.Iterator objects to the stack trace recorded at
// creation time.
iters map[any][]byte
// nextRefID is the next ID to allocate for a new reference.
nextRefID uint64
// openRefs maps reference IDs to the stack trace recorded at creation
// time. It's used to track which call paths leaked open references to
// files.
openRefs map[uint64][]byte
}
}

Expand All @@ -134,7 +137,7 @@ func (c *FileCache) newHandle(
t.readerOpts = readerOpts
t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
if invariants.RaceEnabled {
t.raceMu.iters = make(map[any][]byte)
t.raceMu.openRefs = make(map[uint64][]byte)
}
return t
}
Expand All @@ -151,7 +154,7 @@ func (h *fileCacheHandle) Close() error {
err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
} else {
var buf bytes.Buffer
for _, stack := range h.raceMu.iters {
for _, stack := range h.raceMu.openRefs {
fmt.Fprintf(&buf, "%s\n", stack)
}
err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
Expand Down Expand Up @@ -348,12 +351,7 @@ func NewFileCache(numShards int, size int) *FileCache {
v := vRef.Value()
handle := key.handle
v.readerProvider.init(c, key)
v.closeHook = func(iterator any) {
if invariants.RaceEnabled {
handle.raceMu.Lock()
delete(handle.raceMu.iters, iterator)
handle.raceMu.Unlock()
}
v.closeHook = func() {
// closeHook is called when an iterator is closed; the initialization of
// an iterator with this value will happen after a FindOrCreate() call
// with returns the same vRef.
Expand Down Expand Up @@ -572,17 +570,35 @@ func (h *fileCacheHandle) newPointIter(
if err != nil {
return nil, err
}
// NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
// care to avoid introducing an allocation here by adding a closure.
iter.SetCloseHook(v.closeHook)
handle.iterCount.Add(1)
// NB: closeHook (v.closeHook) takes responsibility for calling
// unrefValue(v) here. Take care to avoid introducing an allocation here by
// adding a closure.
closeHook := h.addReference(v)
iter.SetCloseHook(closeHook)
return iter, nil
}

func (h *fileCacheHandle) addReference(v *fileCacheValue) (closeHook func()) {
h.iterCount.Add(1)
closeHook = v.closeHook
if invariants.RaceEnabled {
stack := debug.Stack()
h.raceMu.Lock()
h.raceMu.iters[iter] = stack
refID := h.raceMu.nextRefID
h.raceMu.openRefs[refID] = stack
h.raceMu.nextRefID++
h.raceMu.Unlock()
// In race builds, this closeHook closure will force an allocation.
// Race builds are already unperformant (and allocate a stack trace), so
// we don't care.
closeHook = func() {
v.closeHook()
h.raceMu.Lock()
defer h.raceMu.Unlock()
delete(h.raceMu.openRefs, refID)
}
}
return iter, nil
return closeHook
}

// newRangeDelIter is an internal helper that constructs an iterator over a
Expand Down Expand Up @@ -736,7 +752,7 @@ func (h *fileCacheHandle) getTableProperties(file *tableMetadata) (*sstable.Prop
}

type fileCacheValue struct {
closeHook func(i any)
closeHook func()
reader io.Closer // *sstable.Reader
isShared bool

Expand Down
8 changes: 3 additions & 5 deletions sstable/reader_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ type Iterator interface {
NextPrefix(succKey []byte) *base.InternalKV

// SetCloseHook sets a function that will be called when the iterator is
// closed. This is used by the file cache to release the reference count on
// the open sstable.Reader when the iterator is closed. The closures takes
// the iterator as a parameter to enable invariant-build tracking of leaked
// iterators.
SetCloseHook(fn func(any))
// closed. This is used by the file cache to release the reference count on
// the open sstable.Reader when the iterator is closed.
SetCloseHook(func())
}

// Iterator positioning optimizations and singleLevelIterator and
Expand Down
9 changes: 4 additions & 5 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
vbRH objstorage.ReadHandle
vbRHPrealloc objstorageprovider.PreallocatedReadHandle
err error
closeHook func(i any)
closeHook func()

readBlockEnv block.ReadEnv

Expand Down Expand Up @@ -1515,9 +1515,8 @@ func (i *singleLevelIterator[I, PI, D, PD]) Error() error {

// SetCloseHook sets a function that will be called when the iterator is closed.
// This is used by the file cache to release the reference count on the open
// sstable.Reader when the iterator is closed. The closures takes the iterator
// as a parameter to enable invariant-build tracking of leaked iterators.
func (i *singleLevelIterator[I, PI, D, PD]) SetCloseHook(fn func(i any)) {
// sstable.Reader when the iterator is closed.
func (i *singleLevelIterator[I, PI, D, PD]) SetCloseHook(fn func()) {
i.closeHook = fn
}

Expand Down Expand Up @@ -1546,7 +1545,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) closeInternal() error {
}

if i.closeHook != nil {
i.closeHook(i)
i.closeHook()
}
var err error
err = firstError(err, PD(&i.data).Close())
Expand Down
2 changes: 1 addition & 1 deletion sstable/reader_iter_two_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ func (i *twoLevelIterator[I, PI, D, PD]) SetContext(ctx context.Context) {
i.secondLevel.SetContext(ctx)
}

func (i *twoLevelIterator[I, PI, D, PD]) SetCloseHook(fn func(i any)) {
func (i *twoLevelIterator[I, PI, D, PD]) SetCloseHook(fn func()) {
i.secondLevel.SetCloseHook(fn)
}

Expand Down