Skip to content

Commit

Permalink
colexec: remove a couple of now-stale TODOs
Browse files Browse the repository at this point in the history
This commit removes a stale TODO about investigating why the
materializers could not be released in all cases when they are created
to wrap a row-by-row processor into the vectorized flow. The root cause
was addressed in cockroachdb#88973 (the problem was that we could call
`ConsumerClosed` on an already `Release`d object), so it is now safe to
always release the materializers. For the same reason we no longer need
to perform a deep copy of the closers when creating the materializer.

Additionally, this commit removes a temporary allocation for a slice of
releasables by directly modifying the main "tracking" slice.

Release note: None
  • Loading branch information
yuzefovich committed Oct 10, 2022
1 parent 9e7e704 commit d4d5d99
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 36 deletions.
34 changes: 8 additions & 26 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ func checkNumIn(inputs []colexecargs.OpWithMetaInfo, numIn int) error {

// wrapRowSources, given input Operators, integrates toWrap into a columnar
// execution flow and returns toWrap's output as an Operator.
// - materializerSafeToRelease indicates whether the materializers created in
// order to row-sourcify the inputs are safe to be released on the flow cleanup.
func wrapRowSources(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand All @@ -72,11 +70,10 @@ func wrapRowSources(
monitorRegistry *colexecargs.MonitorRegistry,
processorID int32,
newToWrap func([]execinfra.RowSource) (execinfra.RowSource, error),
materializerSafeToRelease bool,
factory coldata.ColumnFactory,
) (*colexec.Columnarizer, []execreleasable.Releasable, error) {
releasables *[]execreleasable.Releasable,
) (*colexec.Columnarizer, error) {
var toWrapInputs []execinfra.RowSource
var releasables []execreleasable.Releasable
for i := range inputs {
// Optimization: if the input is a Columnarizer, its input is
// necessarily a execinfra.RowSource, so remove the unnecessary
Expand Down Expand Up @@ -105,20 +102,18 @@ func wrapRowSources(
inputs[i].MetadataSources = nil
inputs[i].ToClose = nil
toWrapInputs = append(toWrapInputs, toWrapInput)
if materializerSafeToRelease {
releasables = append(releasables, toWrapInput)
}
*releasables = append(*releasables, toWrapInput)
}
}

toWrap, err := newToWrap(toWrapInputs)
if err != nil {
return nil, releasables, err
return nil, err
}

proc, isProcessor := toWrap.(execinfra.Processor)
if !isProcessor {
return nil, nil, errors.AssertionFailedf("unexpectedly %T is not an execinfra.Processor", toWrap)
return nil, errors.AssertionFailedf("unexpectedly %T is not an execinfra.Processor", toWrap)
}
batchAllocator := colmem.NewAllocator(ctx, monitorRegistry.NewStreamingMemAccount(flowCtx), factory)
metadataAllocator := colmem.NewAllocator(ctx, monitorRegistry.NewStreamingMemAccount(flowCtx), factory)
Expand All @@ -128,7 +123,7 @@ func wrapRowSources(
} else {
c = colexec.NewBufferingColumnarizer(batchAllocator, metadataAllocator, flowCtx, processorID, toWrap)
}
return c, releasables, nil
return c, nil
}

type opResult struct {
Expand Down Expand Up @@ -535,19 +530,7 @@ func (r opResult) createAndWrapRowSource(
// natively since it is more interesting.
return causeToWrap
}
// Note that the materializers aren't safe to release in all cases since in
// some cases they could be released before being closed. Namely, this would
// occur if we have a subquery with LocalPlanNode core and a materializer is
// added in order to wrap that core - what will happen is that all
// releasables are put back into their pools upon the subquery's flow
// cleanup, yet the subquery planNode tree isn't closed yet since its
// closure is done when the main planNode tree is being closed.
// TODO(yuzefovich): currently there are some other cases as well, figure
// those out. I believe all those cases can occur **only** if we have
// LocalPlanNode cores which is the case when we have non-empty
// LocalProcessors.
materializerSafeToRelease := len(args.LocalProcessors) == 0
c, releasables, err := wrapRowSources(
c, err := wrapRowSources(
ctx,
flowCtx,
inputs,
Expand Down Expand Up @@ -581,8 +564,8 @@ func (r opResult) createAndWrapRowSource(
}
return rs, nil
},
materializerSafeToRelease,
factory,
&r.Releasables,
)
if err != nil {
return err
Expand All @@ -595,7 +578,6 @@ func (r opResult) createAndWrapRowSource(
takeOverMetaInfo(&r.OpWithMetaInfo, inputs)
r.MetadataSources = append(r.MetadataSources, r.Root.(colexecop.MetadataSource))
r.ToClose = append(r.ToClose, r.Root.(colexecop.Closer))
r.Releasables = append(r.Releasables, releasables...)
r.Releasables = append(r.Releasables, c)
return nil
}
Expand Down
11 changes: 1 addition & 10 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,7 @@ func newMaterializerInternal(
typs: typs,
converter: colconv.NewAllVecToDatumConverter(len(typs)),
row: make(rowenc.EncDatumRow, len(typs)),
// We have to perform a deep copy of closers because the input object
// might be released before the materializer is closed.
// TODO(yuzefovich): improve this. It will require untangling of
// planTop.close and the row sources pointed to by the plan via
// rowSourceToPlanNode wrappers.
closers: append(m.closers[:0], input.ToClose...),
closers: input.ToClose,
}
m.drainHelper.allocator = allocator
m.drainHelper.statsCollectors = input.StatsCollectors
Expand Down Expand Up @@ -364,14 +359,10 @@ func (m *Materializer) ConsumerClosed() {
func (m *Materializer) Release() {
m.ProcessorBaseNoHelper.Reset()
m.converter.Release()
for i := range m.closers {
m.closers[i] = nil
}
*m = Materializer{
// We're keeping the reference to the same ProcessorBaseNoHelper since
// it allows us to reuse some of the slices.
ProcessorBaseNoHelper: m.ProcessorBaseNoHelper,
closers: m.closers[:0],
}
materializerPool.Put(m)
}

0 comments on commit d4d5d99

Please sign in to comment.