From 59b6f53a0ff8aed57a3b374349f1484a7cf69e34 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 28 Sep 2022 15:23:54 -0700 Subject: [PATCH 1/2] sql: clarify closing contract around plan node and row source adapters This commit cleans up the `rowSourceToPlanNode` adapter (from the DistSQL processor to the planNode object) in the following manner: - it removes the incorrect call to `ConsumerClosed` of the wrapped input. This call was redundant (because the other side of the adapter `planNodeToRowSource` does the closure too) but was also incorrect since it could access the row source that was put back into the sync.Pool (and maybe even picked up by another query). See issue 88964 for more details. - it removes the checks around non-nil "metadata forwarder". These were needed when the local planNode and DistSQL processor engines were merged into one about four years ago, but nowadays the adapter always gets a non-nil forwarder. Release note: None --- pkg/sql/plan_node_to_row_source.go | 5 ++- pkg/sql/row_source_to_plan_node.go | 51 +++++++++++++++--------------- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index aff19ac2af42..67315dc94089 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -126,6 +126,9 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input execinfra.RowS return nil } p.input = input + // Adding the input to drain ensures that the input will be properly closed + // by this planNodeToRowSource. This is important since the + // rowSourceToPlanNode created below is not responsible for that. p.AddInputToDrain(input) // Search the plan we're wrapping for firstNotWrapped, which is the planNode // that DistSQL planning resumed in. Replace that planNode with input, @@ -133,7 +136,7 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input execinfra.RowS return walkPlan(ctx, p.node, planObserver{ replaceNode: func(ctx context.Context, nodeName string, plan planNode) (planNode, error) { if plan == p.firstNotWrapped { - return makeRowSourceToPlanNode(input, p, planColumns(p.firstNotWrapped), p.firstNotWrapped), nil + return newRowSourceToPlanNode(input, p, planColumns(p.firstNotWrapped), p.firstNotWrapped), nil } return nil, nil }, diff --git a/pkg/sql/row_source_to_plan_node.go b/pkg/sql/row_source_to_plan_node.go index ccf6c724ca9b..944b58688115 100644 --- a/pkg/sql/row_source_to_plan_node.go +++ b/pkg/sql/row_source_to_plan_node.go @@ -12,7 +12,6 @@ package sql import ( "context" - "fmt" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -21,9 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" ) -// rowSourceToPlanNode wraps a RowSource and presents it as a PlanNode. It must -// be constructed with Create(), after which it is a PlanNode and can be treated -// as such. +// rowSourceToPlanNode wraps an execinfra.RowSource and presents it as a +// planNode. It must be constructed via newRowSourceToPlanNode, after which it +// is a planNode and can be treated as such. type rowSourceToPlanNode struct { source execinfra.RowSource forwarder metadataForwarder @@ -42,13 +41,17 @@ type rowSourceToPlanNode struct { var _ planNode = &rowSourceToPlanNode{} -// makeRowSourceToPlanNode creates a new planNode that wraps a RowSource. It -// takes an optional metadataForwarder, which if non-nil is invoked for every +// newRowSourceToPlanNode creates a new planNode that wraps an +// execinfra.RowSource. It takes a metadataForwarder, which is invoked for every // piece of metadata this wrapper receives from the wrapped RowSource. +// // It also takes an optional planNode, which is the planNode that the RowSource // that this rowSourceToPlanNode is wrapping originally replaced. That planNode // will be closed when this one is closed. -func makeRowSourceToPlanNode( +// +// NOTE: it is not guaranteed the ConsumerClosed will be called on the provided +// RowSource by the returned rowSourceToPlanNode. +func newRowSourceToPlanNode( s execinfra.RowSource, forwarder metadataForwarder, planCols colinfo.ResultColumns, @@ -72,23 +75,16 @@ func (r *rowSourceToPlanNode) startExec(params runParams) error { func (r *rowSourceToPlanNode) Next(params runParams) (bool, error) { for { - var p *execinfrapb.ProducerMetadata - r.row, p = r.source.Next() + var meta *execinfrapb.ProducerMetadata + r.row, meta = r.source.Next() - if p != nil { - if p.Err != nil { - return false, p.Err - } - if r.forwarder != nil { - r.forwarder.forwardMetadata(p) - continue + if meta != nil { + // Return errors immediately, all other metadata is "forwarded". + if meta.Err != nil { + return false, meta.Err } - if p.TraceData != nil { - // We drop trace metadata since we have no reasonable way to propagate - // it in local SQL execution. - continue - } - return false, fmt.Errorf("unexpected producer metadata: %+v", p) + r.forwarder.forwardMetadata(meta) + continue } if r.row == nil { @@ -114,11 +110,14 @@ func (r *rowSourceToPlanNode) Values() tree.Datums { } func (r *rowSourceToPlanNode) Close(ctx context.Context) { - if r.source != nil { - r.source.ConsumerClosed() - r.source = nil - } + // Make sure to lose the reference to the source. + // + // Note that we do not call ConsumerClosed on it since it is not the + // responsibility of this rowSourceToPlanNode (the responsibility belongs to + // the corresponding planNodeToRowSource). + r.source = nil if r.originalPlanNode != nil { r.originalPlanNode.Close(ctx) + r.originalPlanNode = nil } } From 0ff595271e564eeb3dc60004b42913577c361ac4 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 26 Sep 2022 19:15:49 -0700 Subject: [PATCH 2/2] sql: pool some of the processor allocations This commit makes it so that we now pool allocations of noop, planNodeToRowSource, and columnarizer processors. Additionally, trailing meta callbacks for these three as well as materializers are changed to not be anonymous functions to further reduce the allocations. Release note: None --- pkg/sql/BUILD.bazel | 1 + pkg/sql/colexec/colbuilder/execplan.go | 4 ++ pkg/sql/colexec/columnarizer.go | 56 ++++++++++----- pkg/sql/colexec/materializer.go | 14 ++-- pkg/sql/distsql_physical_planner.go | 8 +-- pkg/sql/plan_node_to_row_source.go | 97 ++++++++++++++++++-------- pkg/sql/rowexec/noop.go | 24 ++++++- 7 files changed, 142 insertions(+), 62 deletions(-) diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 633b452ca5a8..98eb0391c443 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -357,6 +357,7 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfra/execagg", "//pkg/sql/execinfra/execopnode", + "//pkg/sql/execinfra/execreleasable", "//pkg/sql/execinfrapb", "//pkg/sql/execstats", "//pkg/sql/faketreeeval", diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 578ae77bc24c..dc4c721f28d7 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -576,6 +576,9 @@ func (r opResult) createAndWrapRowSource( ) } r.ColumnTypes = rs.OutputTypes() + if releasable, ok := rs.(execreleasable.Releasable); ok { + r.Releasables = append(r.Releasables, releasable) + } return rs, nil }, materializerSafeToRelease, @@ -593,6 +596,7 @@ func (r opResult) createAndWrapRowSource( r.MetadataSources = append(r.MetadataSources, r.Root.(colexecop.MetadataSource)) r.ToClose = append(r.ToClose, r.Root.(colexecop.Closer)) r.Releasables = append(r.Releasables, releasables...) + r.Releasables = append(r.Releasables, c) return nil } diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index e4730b72e096..d74a5d7d9a5f 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -12,6 +12,7 @@ package colexec import ( "context" + "sync" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" @@ -20,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -78,8 +80,9 @@ type Columnarizer struct { removedFromFlow bool } -var _ colexecop.Operator = &Columnarizer{} +var _ colexecop.DrainableClosableOperator = &Columnarizer{} var _ colexecop.VectorizedStatsCollector = &Columnarizer{} +var _ execreleasable.Releasable = &Columnarizer{} // NewBufferingColumnarizer returns a new Columnarizer that will be buffering up // rows before emitting them as output batches. @@ -121,6 +124,12 @@ func NewStreamingColumnarizer( return newColumnarizer(batchAllocator, metadataAllocator, flowCtx, processorID, input, columnarizerStreamingMode) } +var columnarizerPool = sync.Pool{ + New: func() interface{} { + return &Columnarizer{} + }, +} + // newColumnarizer returns a new Columnarizer. func newColumnarizer( batchAllocator *colmem.Allocator, @@ -135,10 +144,12 @@ func newColumnarizer( default: colexecerror.InternalError(errors.AssertionFailedf("unexpected columnarizerMode %d", mode)) } - c := &Columnarizer{ - metadataAllocator: metadataAllocator, - input: input, - mode: mode, + c := columnarizerPool.Get().(*Columnarizer) + *c = Columnarizer{ + ProcessorBaseNoHelper: c.ProcessorBaseNoHelper, + metadataAllocator: metadataAllocator, + input: input, + mode: mode, } c.ProcessorBaseNoHelper.Init( nil, /* self */ @@ -153,18 +164,12 @@ func newColumnarizer( processorID, nil, /* output */ execinfra.ProcStateOpts{ - InputsToDrain: []execinfra.RowSource{input}, - TrailingMetaCallback: func() []execinfrapb.ProducerMetadata { - // Close will call InternalClose(). Note that we don't return - // any trailing metadata here because the columnarizers - // propagate it in DrainMeta. - if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { - // Close never returns an error. - colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) - } - return nil - }}, + // We append input to inputs to drain below in order to reuse the same + // underlying slice from the pooled columnarizer. + TrailingMetaCallback: c.trailingMetaCallback, + }, ) + c.AddInputToDrain(input) c.typs = c.input.OutputTypes() c.helper.Init(batchAllocator, execinfra.GetWorkMemLimit(flowCtx), c.typs) return c @@ -256,8 +261,6 @@ func (c *Columnarizer) Next() coldata.Batch { return c.batch } -var _ colexecop.DrainableClosableOperator = &Columnarizer{} - // DrainMeta is part of the colexecop.MetadataSource interface. func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { if c.removedFromFlow { @@ -301,6 +304,23 @@ func (c *Columnarizer) Close(context.Context) error { return nil } +func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata { + // Close will call InternalClose(). Note that we don't return any trailing + // metadata here because the columnarizers propagate it in DrainMeta. + if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { + // Close never returns an error. + colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) + } + return nil +} + +// Release releases this Columnarizer back to the pool. +func (c *Columnarizer) Release() { + c.ProcessorBaseNoHelper.Reset() + *c = Columnarizer{ProcessorBaseNoHelper: c.ProcessorBaseNoHelper} + columnarizerPool.Put(c) +} + // ChildCount is part of the execopnode.OpNode interface. func (c *Columnarizer) ChildCount(verbose bool) int { if _, ok := c.input.(execopnode.OpNode); ok { diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index ec41314a60b1..f33e2523bad9 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -227,12 +227,7 @@ func newMaterializerInternal( execinfra.ProcStateOpts{ // We append drainHelper to inputs to drain below in order to reuse // the same underlying slice from the pooled materializer. - TrailingMetaCallback: func() []execinfrapb.ProducerMetadata { - // Note that we delegate draining all of the metadata sources - // to drainHelper which is added as an input to drain below. - m.close() - return nil - }, + TrailingMetaCallback: m.trailingMetaCallback, }, ) m.AddInputToDrain(&m.drainHelper) @@ -334,6 +329,13 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata return nil, m.DrainHelper() } +func (m *Materializer) trailingMetaCallback() []execinfrapb.ProducerMetadata { + // Note that we delegate draining all of the metadata sources to drainHelper + // which is added as an input to drain. + m.close() + return nil +} + func (m *Materializer) close() { if m.Closed { return diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index db9090831190..d9af9a0c12a2 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -3339,17 +3339,15 @@ func (dsp *DistSQLPlanner) wrapPlan( // expecting is in fact RowsAffected. RowsAffected statements return a single // row with the number of rows affected by the statement, and are the only // types of statement where it's valid to invoke a plan's fast path. - wrapper, err := makePlanNodeToRowSource(n, + wrapper := newPlanNodeToRowSource( + n, runParams{ extendedEvalCtx: &evalCtx, p: planCtx.planner, }, useFastPath, + firstNotWrapped, ) - if err != nil { - return nil, err - } - wrapper.firstNotWrapped = firstNotWrapped localProcIdx := p.AddLocalProcessor(wrapper) var input []execinfrapb.InputSyncSpec diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index 67315dc94089..bc0997c5e42b 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -12,9 +12,11 @@ package sql import ( "context" + "sync" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -44,33 +46,47 @@ type planNodeToRowSource struct { row rowenc.EncDatumRow } +var _ execinfra.LocalProcessor = &planNodeToRowSource{} +var _ execreleasable.Releasable = &planNodeToRowSource{} var _ execopnode.OpNode = &planNodeToRowSource{} -func makePlanNodeToRowSource( - source planNode, params runParams, fastPath bool, -) (*planNodeToRowSource, error) { - var typs []*types.T +var planNodeToRowSourcePool = sync.Pool{ + New: func() interface{} { + return &planNodeToRowSource{} + }, +} + +func newPlanNodeToRowSource( + source planNode, params runParams, fastPath bool, firstNotWrapped planNode, +) *planNodeToRowSource { + p := planNodeToRowSourcePool.Get().(*planNodeToRowSource) + *p = planNodeToRowSource{ + ProcessorBase: p.ProcessorBase, + fastPath: fastPath, + node: source, + params: params, + firstNotWrapped: firstNotWrapped, + row: p.row, + } if fastPath { // If our node is a "fast path node", it means that we're set up to // just return a row count meaning we'll output a single row with a // single INT column. - typs = []*types.T{types.Int} + p.outputTypes = []*types.T{types.Int} } else { - typs = getTypesFromResultColumns(planColumns(source)) + p.outputTypes = getTypesFromResultColumns(planColumns(source)) } - row := make(rowenc.EncDatumRow, len(typs)) - - return &planNodeToRowSource{ - node: source, - params: params, - outputTypes: typs, - row: row, - fastPath: fastPath, - }, nil + if p.row != nil && cap(p.row) >= len(p.outputTypes) { + // In some cases we might have no output columns, so nil row would have + // sufficient width, yet nil row is a special value, so we can only + // reuse the old row if it's non-nil. + p.row = p.row[:len(p.outputTypes)] + } else { + p.row = make(rowenc.EncDatumRow, len(p.outputTypes)) + } + return p } -var _ execinfra.LocalProcessor = &planNodeToRowSource{} - // MustBeStreaming implements the execinfra.Processor interface. func (p *planNodeToRowSource) MustBeStreaming() bool { // hookFnNode is special because it might be blocked forever if we decide to @@ -90,26 +106,15 @@ func (p *planNodeToRowSource) InitWithOutput( flowCtx, // Note that we have already created a copy of the extendedEvalContext // (which made a copy of the EvalContext) right before calling - // makePlanNodeToRowSource, so we can just use the eval context from the + // newPlanNodeToRowSource, so we can just use the eval context from the // params. p.params.EvalContext(), 0, /* processorID */ output, nil, /* memMonitor */ execinfra.ProcStateOpts{ - TrailingMetaCallback: func() []execinfrapb.ProducerMetadata { - var meta []execinfrapb.ProducerMetadata - if p.InternalClose() { - // Check if we're wrapping a mutation and emit the rows - // written metric if so. - if m, ok := p.node.(mutationPlanNode); ok { - metrics := execinfrapb.GetMetricsMeta() - metrics.RowsWritten = m.rowsWritten() - meta = []execinfrapb.ProducerMetadata{{Metrics: metrics}} - } - } - return meta - }, + // Input to drain is added in SetInput. + TrailingMetaCallback: p.trailingMetaCallback, }, ) } @@ -220,6 +225,36 @@ func (p *planNodeToRowSource) forwardMetadata(metadata *execinfrapb.ProducerMeta p.ProcessorBase.AppendTrailingMeta(*metadata) } +func (p *planNodeToRowSource) trailingMetaCallback() []execinfrapb.ProducerMetadata { + var meta []execinfrapb.ProducerMetadata + if p.InternalClose() { + // Check if we're wrapping a mutation and emit the rows written metric + // if so. + if m, ok := p.node.(mutationPlanNode); ok { + metrics := execinfrapb.GetMetricsMeta() + metrics.RowsWritten = m.rowsWritten() + meta = []execinfrapb.ProducerMetadata{{Metrics: metrics}} + } + } + return meta +} + +// Release releases this planNodeToRowSource back to the pool. +func (p *planNodeToRowSource) Release() { + p.ProcessorBase.Reset() + // Deeply reset the row. + for i := range p.row { + p.row[i] = rowenc.EncDatum{} + } + // Note that we don't reuse the outputTypes slice because it is exposed to + // the outer physical planning code. + *p = planNodeToRowSource{ + ProcessorBase: p.ProcessorBase, + row: p.row[:0], + } + planNodeToRowSourcePool.Put(p) +} + // ChildCount is part of the execopnode.OpNode interface. func (p *planNodeToRowSource) ChildCount(verbose bool) int { if _, ok := p.input.(execopnode.OpNode); ok { diff --git a/pkg/sql/rowexec/noop.go b/pkg/sql/rowexec/noop.go index 92153b500890..017fceb32713 100644 --- a/pkg/sql/rowexec/noop.go +++ b/pkg/sql/rowexec/noop.go @@ -12,9 +12,11 @@ package rowexec import ( "context" + "sync" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -32,10 +34,17 @@ type noopProcessor struct { var _ execinfra.Processor = &noopProcessor{} var _ execinfra.RowSource = &noopProcessor{} +var _ execreleasable.Releasable = &noopProcessor{} var _ execopnode.OpNode = &noopProcessor{} const noopProcName = "noop" +var noopPool = sync.Pool{ + New: func() interface{} { + return &noopProcessor{} + }, +} + func newNoopProcessor( flowCtx *execinfra.FlowCtx, processorID int32, @@ -43,7 +52,8 @@ func newNoopProcessor( post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, ) (*noopProcessor, error) { - n := &noopProcessor{input: input} + n := noopPool.Get().(*noopProcessor) + n.input = input if err := n.Init( n, post, @@ -52,10 +62,13 @@ func newNoopProcessor( processorID, output, nil, /* memMonitor */ - execinfra.ProcStateOpts{InputsToDrain: []execinfra.RowSource{n.input}}, + // We append input to inputs to drain below in order to reuse the same + // underlying slice from the pooled noopProcessor. + execinfra.ProcStateOpts{}, ); err != nil { return nil, err } + n.AddInputToDrain(n.input) ctx := flowCtx.EvalCtx.Ctx() if execstats.ShouldCollectStats(ctx, flowCtx.CollectStats) { n.input = newInputStatCollector(n.input) @@ -105,6 +118,13 @@ func (n *noopProcessor) execStatsForTrace() *execinfrapb.ComponentStats { } } +// Release releases this noopProcessor back to the pool. +func (n *noopProcessor) Release() { + n.ProcessorBase.Reset() + *n = noopProcessor{ProcessorBase: n.ProcessorBase} + noopPool.Put(n) +} + // ChildCount is part of the execopnode.OpNode interface. func (n *noopProcessor) ChildCount(bool) int { if _, ok := n.input.(execopnode.OpNode); ok {