sql: pool some of the processor allocations #88973

Merged
merged 2 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
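All of the diffs below apply the same allocation-pooling pattern: a package-level sync.Pool hands out previously released processor structs, the constructor resets the struct in place while keeping any embedded state whose allocations are worth reusing, and a Release method zeroes the struct and returns it to the pool. The following is a minimal, self-contained sketch of that pattern with hypothetical names; it is not CockroachDB code.

package main

import (
	"fmt"
	"sync"
)

// proc stands in for a processor; buf stands in for reusable embedded state
// such as a previously allocated slice.
type proc struct {
	buf  []int
	name string
}

var procPool = sync.Pool{
	New: func() interface{} { return &proc{} },
}

// newProc gets a pooled instance and resets it in place, truncating rather
// than reallocating buf so its backing array can be reused.
func newProc(name string) *proc {
	p := procPool.Get().(*proc)
	*p = proc{buf: p.buf[:0], name: name}
	return p
}

// release zeroes the struct (keeping buf's backing array) and returns it to
// the pool.
func (p *proc) release() {
	*p = proc{buf: p.buf[:0]}
	procPool.Put(p)
}

func main() {
	p := newProc("first")
	p.buf = append(p.buf, 1, 2, 3)
	p.release()

	// The pool may hand back the same object, in which case the earlier
	// allocation is reused; sync.Pool makes no guarantee either way.
	q := newProc("second")
	fmt.Println(q.name, cap(q.buf))
}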
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -357,6 +357,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execagg",
"//pkg/sql/execinfra/execopnode",
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/faketreeeval",
4 changes: 4 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -576,6 +576,9 @@ func (r opResult) createAndWrapRowSource(
)
}
r.ColumnTypes = rs.OutputTypes()
if releasable, ok := rs.(execreleasable.Releasable); ok {
r.Releasables = append(r.Releasables, releasable)
}
return rs, nil
},
materializerSafeToRelease,
@@ -593,6 +596,7 @@ func (r opResult) createAndWrapRowSource(
r.MetadataSources = append(r.MetadataSources, r.Root.(colexecop.MetadataSource))
r.ToClose = append(r.ToClose, r.Root.(colexecop.Closer))
r.Releasables = append(r.Releasables, releasables...)
r.Releasables = append(r.Releasables, c)
return nil
}

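The createAndWrapRowSource changes above register any wrapped row source, and the columnarizer built around it, that implements execreleasable.Releasable, so the flow can hand them back to their pools during cleanup. Below is a minimal sketch of that collect-then-release pattern; the interface and types are hypothetical stand-ins, not the actual execreleasable package.

package main

import "fmt"

// releasable mirrors the shape of an interface like execreleasable.Releasable.
type releasable interface {
	Release()
}

// pooledOp stands in for a pooled processor; Release would return it to a pool.
type pooledOp struct{ name string }

func (o *pooledOp) Release() { fmt.Println("released", o.name) }

// plainOp stands in for an operator that is not pooled.
type plainOp struct{}

type flowLike struct {
	releasables []releasable
}

// register keeps the operator only if it opts into pooling.
func (f *flowLike) register(op interface{}) {
	if r, ok := op.(releasable); ok {
		f.releasables = append(f.releasables, r)
	}
}

// cleanup releases everything that was registered.
func (f *flowLike) cleanup() {
	for _, r := range f.releasables {
		r.Release()
	}
	f.releasables = f.releasables[:0]
}

func main() {
	f := &flowLike{}
	f.register(&pooledOp{name: "columnarizer"})
	f.register(&plainOp{}) // skipped: does not implement releasable
	f.cleanup()
}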
56 changes: 38 additions & 18 deletions pkg/sql/colexec/columnarizer.go
@@ -12,6 +12,7 @@ package colexec

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
@@ -20,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -78,8 +80,9 @@ type Columnarizer struct {
removedFromFlow bool
}

var _ colexecop.Operator = &Columnarizer{}
var _ colexecop.DrainableClosableOperator = &Columnarizer{}
var _ colexecop.VectorizedStatsCollector = &Columnarizer{}
var _ execreleasable.Releasable = &Columnarizer{}

// NewBufferingColumnarizer returns a new Columnarizer that will be buffering up
// rows before emitting them as output batches.
@@ -121,6 +124,12 @@ func NewStreamingColumnarizer(
return newColumnarizer(batchAllocator, metadataAllocator, flowCtx, processorID, input, columnarizerStreamingMode)
}

var columnarizerPool = sync.Pool{
New: func() interface{} {
return &Columnarizer{}
},
}

// newColumnarizer returns a new Columnarizer.
func newColumnarizer(
batchAllocator *colmem.Allocator,
@@ -135,10 +144,12 @@ func newColumnarizer(
default:
colexecerror.InternalError(errors.AssertionFailedf("unexpected columnarizerMode %d", mode))
}
c := &Columnarizer{
metadataAllocator: metadataAllocator,
input: input,
mode: mode,
c := columnarizerPool.Get().(*Columnarizer)
*c = Columnarizer{
ProcessorBaseNoHelper: c.ProcessorBaseNoHelper,
metadataAllocator: metadataAllocator,
input: input,
mode: mode,
}
c.ProcessorBaseNoHelper.Init(
nil, /* self */
@@ -153,18 +164,12 @@
processorID,
nil, /* output */
execinfra.ProcStateOpts{
InputsToDrain: []execinfra.RowSource{input},
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
// Close will call InternalClose(). Note that we don't return
// any trailing metadata here because the columnarizers
// propagate it in DrainMeta.
if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
return nil
}},
// We append input to inputs to drain below in order to reuse the same
// underlying slice from the pooled columnarizer.
TrailingMetaCallback: c.trailingMetaCallback,
},
)
c.AddInputToDrain(input)
c.typs = c.input.OutputTypes()
c.helper.Init(batchAllocator, execinfra.GetWorkMemLimit(flowCtx), c.typs)
return c
@@ -256,8 +261,6 @@ func (c *Columnarizer) Next() coldata.Batch {
return c.batch
}

var _ colexecop.DrainableClosableOperator = &Columnarizer{}

// DrainMeta is part of the colexecop.MetadataSource interface.
func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
if c.removedFromFlow {
@@ -301,6 +304,23 @@ func (c *Columnarizer) Close(context.Context) error {
return nil
}

func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata {
// Close will call InternalClose(). Note that we don't return any trailing
// metadata here because the columnarizers propagate it in DrainMeta.
if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil {
// Close never returns an error.
colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close"))
}
return nil
}

// Release releases this Columnarizer back to the pool.
func (c *Columnarizer) Release() {
c.ProcessorBaseNoHelper.Reset()
*c = Columnarizer{ProcessorBaseNoHelper: c.ProcessorBaseNoHelper}
columnarizerPool.Put(c)
}

// ChildCount is part of the execopnode.OpNode interface.
func (c *Columnarizer) ChildCount(verbose bool) int {
if _, ok := c.input.(execopnode.OpNode); ok {
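Note how newColumnarizer above resets the pooled struct with a composite literal that carries the embedded ProcessorBaseNoHelper forward, so slices allocated inside the base, such as the inputs-to-drain list mentioned in the comment, keep their backing arrays across reuses. A small sketch of why that reset idiom preserves the embedded field's allocations; the types are hypothetical and much simpler than the real ProcessorBaseNoHelper.

package main

import "fmt"

// base stands in for an embedded helper like ProcessorBaseNoHelper; inputs is
// the allocation we want to keep across reuses.
type base struct {
	inputs []string
}

func (b *base) reset() {
	// Truncate rather than reallocate so the backing array is kept.
	b.inputs = b.inputs[:0]
}

type columnarizerLike struct {
	base
	mode int
}

func reuse(c *columnarizerLike, mode int) {
	c.base.reset()
	// Zero every field except the embedded base, mirroring
	// *c = Columnarizer{ProcessorBaseNoHelper: c.ProcessorBaseNoHelper, ...}.
	*c = columnarizerLike{base: c.base, mode: mode}
}

func main() {
	c := &columnarizerLike{}
	c.inputs = append(c.inputs, "input-1", "input-2")

	reuse(c, 7)
	// len is back to 0 but the capacity (and backing array) survives.
	fmt.Println(len(c.inputs), cap(c.inputs) >= 2, c.mode)
}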
14 changes: 8 additions & 6 deletions pkg/sql/colexec/materializer.go
@@ -227,12 +227,7 @@ func newMaterializerInternal(
execinfra.ProcStateOpts{
// We append drainHelper to inputs to drain below in order to reuse
// the same underlying slice from the pooled materializer.
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
// Note that we delegate draining all of the metadata sources
// to drainHelper which is added as an input to drain below.
m.close()
return nil
},
TrailingMetaCallback: m.trailingMetaCallback,
},
)
m.AddInputToDrain(&m.drainHelper)
@@ -334,6 +329,13 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
return nil, m.DrainHelper()
}

func (m *Materializer) trailingMetaCallback() []execinfrapb.ProducerMetadata {
// Note that we delegate draining all of the metadata sources to drainHelper
// which is added as an input to drain.
m.close()
return nil
}

func (m *Materializer) close() {
if m.Closed {
return
8 changes: 3 additions & 5 deletions pkg/sql/distsql_physical_planner.go
@@ -3339,17 +3339,15 @@ func (dsp *DistSQLPlanner) wrapPlan(
// expecting is in fact RowsAffected. RowsAffected statements return a single
// row with the number of rows affected by the statement, and are the only
// types of statement where it's valid to invoke a plan's fast path.
wrapper, err := makePlanNodeToRowSource(n,
wrapper := newPlanNodeToRowSource(
n,
runParams{
extendedEvalCtx: &evalCtx,
p: planCtx.planner,
},
useFastPath,
firstNotWrapped,
)
if err != nil {
return nil, err
}
wrapper.firstNotWrapped = firstNotWrapped

localProcIdx := p.AddLocalProcessor(wrapper)
var input []execinfrapb.InputSyncSpec
102 changes: 70 additions & 32 deletions pkg/sql/plan_node_to_row_source.go
@@ -12,9 +12,11 @@ package sql

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -44,33 +46,47 @@ type planNodeToRowSource struct {
row rowenc.EncDatumRow
}

var _ execinfra.LocalProcessor = &planNodeToRowSource{}
var _ execreleasable.Releasable = &planNodeToRowSource{}
var _ execopnode.OpNode = &planNodeToRowSource{}

func makePlanNodeToRowSource(
source planNode, params runParams, fastPath bool,
) (*planNodeToRowSource, error) {
var typs []*types.T
var planNodeToRowSourcePool = sync.Pool{
New: func() interface{} {
return &planNodeToRowSource{}
},
}

func newPlanNodeToRowSource(
source planNode, params runParams, fastPath bool, firstNotWrapped planNode,
) *planNodeToRowSource {
p := planNodeToRowSourcePool.Get().(*planNodeToRowSource)
*p = planNodeToRowSource{
ProcessorBase: p.ProcessorBase,
fastPath: fastPath,
node: source,
params: params,
firstNotWrapped: firstNotWrapped,
row: p.row,
}
if fastPath {
// If our node is a "fast path node", it means that we're set up to
// just return a row count meaning we'll output a single row with a
// single INT column.
typs = []*types.T{types.Int}
p.outputTypes = []*types.T{types.Int}
} else {
typs = getTypesFromResultColumns(planColumns(source))
p.outputTypes = getTypesFromResultColumns(planColumns(source))
}
row := make(rowenc.EncDatumRow, len(typs))

return &planNodeToRowSource{
node: source,
params: params,
outputTypes: typs,
row: row,
fastPath: fastPath,
}, nil
if p.row != nil && cap(p.row) >= len(p.outputTypes) {
// In some cases we might have no output columns, so nil row would have
// sufficient width, yet nil row is a special value, so we can only
// reuse the old row if it's non-nil.
p.row = p.row[:len(p.outputTypes)]
} else {
p.row = make(rowenc.EncDatumRow, len(p.outputTypes))
}
return p
}

var _ execinfra.LocalProcessor = &planNodeToRowSource{}

// MustBeStreaming implements the execinfra.Processor interface.
func (p *planNodeToRowSource) MustBeStreaming() bool {
// hookFnNode is special because it might be blocked forever if we decide to
@@ -90,26 +106,15 @@ func (p *planNodeToRowSource) InitWithOutput(
flowCtx,
// Note that we have already created a copy of the extendedEvalContext
// (which made a copy of the EvalContext) right before calling
// makePlanNodeToRowSource, so we can just use the eval context from the
// newPlanNodeToRowSource, so we can just use the eval context from the
// params.
p.params.EvalContext(),
0, /* processorID */
output,
nil, /* memMonitor */
execinfra.ProcStateOpts{
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
var meta []execinfrapb.ProducerMetadata
if p.InternalClose() {
// Check if we're wrapping a mutation and emit the rows
// written metric if so.
if m, ok := p.node.(mutationPlanNode); ok {
metrics := execinfrapb.GetMetricsMeta()
metrics.RowsWritten = m.rowsWritten()
meta = []execinfrapb.ProducerMetadata{{Metrics: metrics}}
}
}
return meta
},
// Input to drain is added in SetInput.
TrailingMetaCallback: p.trailingMetaCallback,
},
)
}
@@ -126,14 +131,17 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input execinfra.RowS
return nil
}
p.input = input
// Adding the input to drain ensures that the input will be properly closed
// by this planNodeToRowSource. This is important since the
// rowSourceToPlanNode created below is not responsible for that.
p.AddInputToDrain(input)
// Search the plan we're wrapping for firstNotWrapped, which is the planNode
// that DistSQL planning resumed in. Replace that planNode with input,
// wrapped as a planNode.
return walkPlan(ctx, p.node, planObserver{
replaceNode: func(ctx context.Context, nodeName string, plan planNode) (planNode, error) {
if plan == p.firstNotWrapped {
return makeRowSourceToPlanNode(input, p, planColumns(p.firstNotWrapped), p.firstNotWrapped), nil
return newRowSourceToPlanNode(input, p, planColumns(p.firstNotWrapped), p.firstNotWrapped), nil
}
return nil, nil
},
@@ -217,6 +225,36 @@ func (p *planNodeToRowSource) forwardMetadata(metadata *execinfrapb.ProducerMeta
p.ProcessorBase.AppendTrailingMeta(*metadata)
}

func (p *planNodeToRowSource) trailingMetaCallback() []execinfrapb.ProducerMetadata {
var meta []execinfrapb.ProducerMetadata
if p.InternalClose() {
// Check if we're wrapping a mutation and emit the rows written metric
// if so.
if m, ok := p.node.(mutationPlanNode); ok {
metrics := execinfrapb.GetMetricsMeta()
metrics.RowsWritten = m.rowsWritten()
meta = []execinfrapb.ProducerMetadata{{Metrics: metrics}}
}
}
return meta
}

// Release releases this planNodeToRowSource back to the pool.
func (p *planNodeToRowSource) Release() {
p.ProcessorBase.Reset()
// Deeply reset the row.
for i := range p.row {
p.row[i] = rowenc.EncDatum{}
}
// Note that we don't reuse the outputTypes slice because it is exposed to
// the outer physical planning code.
*p = planNodeToRowSource{
ProcessorBase: p.ProcessorBase,
row: p.row[:0],
}
planNodeToRowSourcePool.Put(p)
}

// ChildCount is part of the execopnode.OpNode interface.
func (p *planNodeToRowSource) ChildCount(verbose bool) int {
if _, ok := p.input.(execopnode.OpNode); ok {
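planNodeToRowSource.Release above deep-resets the row before pooling so that the idle object does not keep references to datums from the last query alive, and it deliberately does not reuse outputTypes because that slice is handed to the physical planning code. A minimal sketch of that deep-reset idiom follows; datum is a hypothetical stand-in for rowenc.EncDatum, not the real type.

package main

import (
	"fmt"
	"sync"
)

// datum stands in for rowenc.EncDatum: a value that can pin other allocations
// (here via a pointer) until it is overwritten.
type datum struct {
	payload *[]byte
}

type rowProcLike struct {
	row []datum
}

var rowProcPool = sync.Pool{
	New: func() interface{} { return &rowProcLike{} },
}

func (p *rowProcLike) release() {
	// Zero every element so the pooled object does not retain references to
	// payloads from the previous query, then truncate and pool the slice.
	for i := range p.row {
		p.row[i] = datum{}
	}
	*p = rowProcLike{row: p.row[:0]}
	rowProcPool.Put(p)
}

func main() {
	p := rowProcPool.Get().(*rowProcLike)
	big := make([]byte, 1<<20)
	p.row = append(p.row, datum{payload: &big})

	// After release, the pooled struct no longer references the payload.
	p.release()
	fmt.Println(len(p.row), cap(p.row) >= 1)
}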