Skip to content

Commit

Permalink
sql: audit implementations of Releasable interface of slices' reuse
Browse files Browse the repository at this point in the history
This commit performed the audit of all slices that are kept by
components implementing `execinfra.Releasable` interface to make sure
that the slices that might be referencing large objects are deeply
reset. (By deep reset I mean all slots are set to `nil` so that the
possibly large objects could be garbage-collected.) This was prompted by
the previous commit which fixed a recent regression, but this commit
seems like a good idea on its own, and it might be worth backporting it
too.

Release note: None
  • Loading branch information
yuzefovich committed May 5, 2021
1 parent 31f5d28 commit 99153b4
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 13 deletions.
11 changes: 10 additions & 1 deletion pkg/sql/colconv/vec_to_datum.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion pkg/sql/colconv/vec_to_datum_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,17 @@ func NewAllVecToDatumConverter(batchWidth int) *VecToDatumConverter {

// Release is part of the execinfra.Releasable interface.
func (c *VecToDatumConverter) Release() {
// Deeply reset the converted vectors so that we don't hold onto the old
// datums.
for _, vec := range c.convertedVecs {
for i := range vec {
//gcassert:bce
vec[i] = nil
}
}
*c = VecToDatumConverter{
convertedVecs: c.convertedVecs[:0],
convertedVecs: c.convertedVecs[:0],
// This slice is of integers, so there is no need to reset it deeply.
vecIdxsToConvert: c.vecIdxsToConvert[:0],
}
vecToDatumConverterPool.Put(c)
Expand Down
18 changes: 12 additions & 6 deletions pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ func (r *NewColOperatorResult) Release() {
for _, releasable := range r.Releasables {
releasable.Release()
}
// Explicitly unset each slot in the slices of meta components in order to
// lose references to the old objects. If we don't do it, we might have a
// memory leak in case the slices aren't appended to for a while (because
// we're slicing them up to 0 below, the references to the old objects would
// be kept "alive" until the spot in the slice is overwritten by a new
// object).
// Explicitly unset each slot in the slices of objects of non-trivial size
// in order to lose references to the old objects. If we don't do it, we
// might have a memory leak in case the slices aren't appended to for a
// while (because we're slicing them up to 0 below, the references to the
// old objects would be kept "alive" until the spot in the slice is
// overwritten by a new object).
for i := range r.StatsCollectors {
r.StatsCollectors[i] = nil
}
Expand All @@ -162,12 +162,18 @@ func (r *NewColOperatorResult) Release() {
for i := range r.ToClose {
r.ToClose[i] = nil
}
for i := range r.Releasables {
r.Releasables[i] = nil
}
*r = NewColOperatorResult{
OpWithMetaInfo: OpWithMetaInfo{
StatsCollectors: r.StatsCollectors[:0],
MetadataSources: r.MetadataSources[:0],
ToClose: r.ToClose[:0],
},
// There is no need to deeply reset the column types and the memory
// monitoring infra slices because these objects are very tiny in the
// grand scheme of things.
ColumnTypes: r.ColumnTypes[:0],
OpMonitors: r.OpMonitors[:0],
OpAccounts: r.OpAccounts[:0],
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func newCTableInfo() *cTableInfo {

// Release implements the execinfra.Releasable interface.
func (c *cTableInfo) Release() {
// Note that all slices are being reused, but there is no need to deeply
// reset them since all of the slices are of Go native types.
c.colIdxMap.ords = c.colIdxMap.ords[:0]
c.colIdxMap.vals = c.colIdxMap.vals[:0]
*c = cTableInfo{
Expand Down Expand Up @@ -1564,6 +1566,8 @@ var cFetcherPool = sync.Pool{
func (rf *cFetcher) Release() {
rf.table.Release()
*rf = cFetcher{
// The types are small objects, so we don't bother deeply resetting this
// slice.
typs: rf.typs[:0],
}
cFetcherPool.Put(rf)
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func initCRowFetcher(
// Release implements the execinfra.Releasable interface.
func (s *ColBatchScan) Release() {
s.rf.Release()
// Deeply reset the spans so that we don't hold onto the keys of the spans.
for i := range s.spans {
s.spans[i] = roachpb.Span{}
}
*s = ColBatchScan{
spans: s.spans[:0],
}
Expand Down
30 changes: 25 additions & 5 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,15 +590,28 @@ func (s *vectorizedFlowCreator) Release() {
for _, r := range s.releasables {
r.Release()
}
// Deeply reset slices that might point to the objects of non-trivial size
// so that the old references don't interfere with the objects being
// garbage-collected.
for i := range s.opChains {
s.opChains[i] = nil
}
for i := range s.releasables {
s.releasables[i] = nil
}
*s = vectorizedFlowCreator{
streamIDToInputOp: s.streamIDToInputOp,
streamIDToSpecIdx: s.streamIDToSpecIdx,
exprHelper: s.exprHelper,
procIdxQueue: s.procIdxQueue[:0],
opChains: s.opChains[:0],
monitors: s.monitors[:0],
accounts: s.accounts[:0],
releasables: s.releasables[:0],
// procIdxQueue is a slice of ints, so it's ok to just slice up to 0 to
// prime it for reuse.
procIdxQueue: s.procIdxQueue[:0],
opChains: s.opChains[:0],
// There is no need to deeply reset the memory monitoring infra slices
// because these objects are very tiny in the grand scheme of things.
monitors: s.monitors[:0],
accounts: s.accounts[:0],
releasables: s.releasables[:0],
}
vectorizedFlowCreatorPool.Put(s)
}
Expand Down Expand Up @@ -1244,6 +1257,13 @@ func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
}

func (r *vectorizedFlowCreatorHelper) Release() {
// Note that processors here can only be of 0 or 1 length, but always of
// 1 capacity (only the root materializer can be appended to this
// slice). Unset the slot so that we don't keep the reference to the old
// materializer.
if len(r.processors) == 1 {
r.processors[0] = nil
}
*r = vectorizedFlowCreatorHelper{
processors: r.processors[:0],
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,13 @@ func (pb *ProcessorBase) MustBeStreaming() bool {
// Reset resets this ProcessorBase, retaining allocated memory in slices.
func (pb *ProcessorBase) Reset() {
pb.Out.Reset()
// Deeply reset the slices so that we don't hold onto the old objects.
for i := range pb.trailingMeta {
pb.trailingMeta[i] = execinfrapb.ProducerMetadata{}
}
for i := range pb.inputsToDrain {
pb.inputsToDrain[i] = nil
}
*pb = ProcessorBase{
Out: pb.Out,
trailingMeta: pb.trailingMeta[:0],
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/rowexec/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ func (tr *tableReader) startScan(ctx context.Context) error {
func (tr *tableReader) Release() {
tr.ProcessorBase.Reset()
tr.fetcher.Reset()
// Deeply reset the spans so that we don't hold onto the keys of the spans.
for i := range tr.spans {
tr.spans[i] = roachpb.Span{}
}
*tr = tableReader{
ProcessorBase: tr.ProcessorBase,
fetcher: tr.fetcher,
Expand Down

0 comments on commit 99153b4

Please sign in to comment.