Skip to content

Commit

Permalink
db: refactor race-build file cache reference tracking
Browse files Browse the repository at this point in the history
Previously the file cache used a pointer to the sstable iterator to maintain
the stack traces of the origination of open references during race builds. This
was a bit ambiguous. With the introduction of blob files, references will also
be maintained by non-sstable iterators (eg, a cached blob file reader within an
iterator's blob value fetcher).

This commit refactors this reference tracking to allocate a new closure in race
builds. The additional allocation is expected to be significant in (already
slow) race builds.
  • Loading branch information
jbowens committed Mar 5, 2025
1 parent 5b12eaf commit e99057c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 29 deletions.
52 changes: 34 additions & 18 deletions file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ type fileCacheHandle struct {
// This struct is only populated in race builds.
raceMu struct {
sync.Mutex
// iters maps sstable.Iterator objects to the stack trace recorded at
// creation time.
iters map[any][]byte
// nextRefID is the next ID to allocate for a new reference.
nextRefID uint64
// openRefs maps reference IDs to the stack trace recorded at creation
// time. It's used to track which call paths leaked open references to
// files.
openRefs map[uint64][]byte
}
}

Expand All @@ -134,7 +137,7 @@ func (c *FileCache) newHandle(
t.readerOpts = readerOpts
t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
if invariants.RaceEnabled {
t.raceMu.iters = make(map[any][]byte)
t.raceMu.openRefs = make(map[uint64][]byte)
}
return t
}
Expand All @@ -151,7 +154,7 @@ func (h *fileCacheHandle) Close() error {
err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
} else {
var buf bytes.Buffer
for _, stack := range h.raceMu.iters {
for _, stack := range h.raceMu.openRefs {
fmt.Fprintf(&buf, "%s\n", stack)
}
err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
Expand Down Expand Up @@ -348,12 +351,7 @@ func NewFileCache(numShards int, size int) *FileCache {
v := vRef.Value()
handle := key.handle
v.readerProvider.init(c, key)
v.closeHook = func(iterator any) {
if invariants.RaceEnabled {
handle.raceMu.Lock()
delete(handle.raceMu.iters, iterator)
handle.raceMu.Unlock()
}
v.closeHook = func() {
// closeHook is called when an iterator is closed; the initialization of
// an iterator with this value will happen after a FindOrCreate() call
// with returns the same vRef.
Expand Down Expand Up @@ -572,17 +570,35 @@ func (h *fileCacheHandle) newPointIter(
if err != nil {
return nil, err
}
// NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
// care to avoid introducing an allocation here by adding a closure.
iter.SetCloseHook(v.closeHook)
handle.iterCount.Add(1)
// NB: closeHook (v.closeHook) takes responsibility for calling
// unrefValue(v) here. Take care to avoid introducing an allocation here by
// adding a closure.
closeHook := h.addReference(v)
iter.SetCloseHook(closeHook)
return iter, nil
}

func (h *fileCacheHandle) addReference(v *fileCacheValue) (closeHook func()) {
h.iterCount.Add(1)
closeHook = v.closeHook
if invariants.RaceEnabled {
stack := debug.Stack()
h.raceMu.Lock()
h.raceMu.iters[iter] = stack
refID := h.raceMu.nextRefID
h.raceMu.openRefs[refID] = stack
h.raceMu.nextRefID++
h.raceMu.Unlock()
// In race builds, this closeHook closure will force an allocation.
// Race builds are already unperformant (and allocate a stack trace), so
// we don't care.
closeHook = func() {
v.closeHook()
h.raceMu.Lock()
defer h.raceMu.Unlock()
delete(h.raceMu.openRefs, refID)
}
}
return iter, nil
return closeHook
}

// newRangeDelIter is an internal helper that constructs an iterator over a
Expand Down Expand Up @@ -736,7 +752,7 @@ func (h *fileCacheHandle) getTableProperties(file *tableMetadata) (*sstable.Prop
}

type fileCacheValue struct {
closeHook func(i any)
closeHook func()
reader io.Closer // *sstable.Reader
isShared bool

Expand Down
8 changes: 3 additions & 5 deletions sstable/reader_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ type Iterator interface {
NextPrefix(succKey []byte) *base.InternalKV

// SetCloseHook sets a function that will be called when the iterator is
// closed. This is used by the file cache to release the reference count on
// the open sstable.Reader when the iterator is closed. The closures takes
// the iterator as a parameter to enable invariant-build tracking of leaked
// iterators.
SetCloseHook(fn func(any))
// closed. This is used by the file cache to release the reference count on
// the open sstable.Reader when the iterator is closed.
SetCloseHook(func())
}

// Iterator positioning optimizations and singleLevelIterator and
Expand Down
9 changes: 4 additions & 5 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
vbRH objstorage.ReadHandle
vbRHPrealloc objstorageprovider.PreallocatedReadHandle
err error
closeHook func(i any)
closeHook func()

readBlockEnv block.ReadEnv

Expand Down Expand Up @@ -1515,9 +1515,8 @@ func (i *singleLevelIterator[I, PI, D, PD]) Error() error {

// SetCloseHook sets a function that will be called when the iterator is closed.
// This is used by the file cache to release the reference count on the open
// sstable.Reader when the iterator is closed. The closures takes the iterator
// as a parameter to enable invariant-build tracking of leaked iterators.
func (i *singleLevelIterator[I, PI, D, PD]) SetCloseHook(fn func(i any)) {
// sstable.Reader when the iterator is closed.
func (i *singleLevelIterator[I, PI, D, PD]) SetCloseHook(fn func()) {
i.closeHook = fn
}

Expand Down Expand Up @@ -1546,7 +1545,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) closeInternal() error {
}

if i.closeHook != nil {
i.closeHook(i)
i.closeHook()
}
var err error
err = firstError(err, PD(&i.data).Close())
Expand Down
2 changes: 1 addition & 1 deletion sstable/reader_iter_two_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ func (i *twoLevelIterator[I, PI, D, PD]) SetContext(ctx context.Context) {
i.secondLevel.SetContext(ctx)
}

func (i *twoLevelIterator[I, PI, D, PD]) SetCloseHook(fn func(i any)) {
func (i *twoLevelIterator[I, PI, D, PD]) SetCloseHook(fn func()) {
i.secondLevel.SetCloseHook(fn)
}

Expand Down

0 comments on commit e99057c

Please sign in to comment.