Skip to content

Commit

Permalink
WIP on planNodeToRowSource pooling
Browse files Browse the repository at this point in the history
This doesn't work because rowSourceToPlanNode might call ConsumerClosed
on already released planNodeToRowSource.
  • Loading branch information
yuzefovich committed Sep 28, 2022
1 parent e6c639e commit 4af1b13
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 36 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execagg",
"//pkg/sql/execinfra/execopnode",
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/faketreeeval",
Expand Down
8 changes: 3 additions & 5 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3339,17 +3339,15 @@ func (dsp *DistSQLPlanner) wrapPlan(
// expecting is in fact RowsAffected. RowsAffected statements return a single
// row with the number of rows affected by the statement, and are the only
// types of statement where it's valid to invoke a plan's fast path.
wrapper, err := makePlanNodeToRowSource(n,
wrapper := newPlanNodeToRowSource(
n,
runParams{
extendedEvalCtx: &evalCtx,
p: planCtx.planner,
},
useFastPath,
firstNotWrapped,
)
if err != nil {
return nil, err
}
wrapper.firstNotWrapped = firstNotWrapped

localProcIdx := p.AddLocalProcessor(wrapper)
var input []execinfrapb.InputSyncSpec
Expand Down
112 changes: 81 additions & 31 deletions pkg/sql/plan_node_to_row_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ package sql

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -44,33 +46,62 @@ type planNodeToRowSource struct {
row rowenc.EncDatumRow
}

var _ execinfra.LocalProcessor = &planNodeToRowSource{}
var _ execreleasable.Releasable = &planNodeToRowSource{}
var _ execopnode.OpNode = &planNodeToRowSource{}

func makePlanNodeToRowSource(
source planNode, params runParams, fastPath bool,
) (*planNodeToRowSource, error) {
var typs []*types.T
var planNodeToRowSourcePool = sync.Pool{
New: func() interface{} {
return &planNodeToRowSource{}
},
}

func newPlanNodeToRowSource(
source planNode, params runParams, fastPath bool, firstNotWrapped planNode,
) *planNodeToRowSource {
p := planNodeToRowSourcePool.Get().(*planNodeToRowSource)
*p = planNodeToRowSource{
ProcessorBase: p.ProcessorBase,
fastPath: fastPath,
node: source,
params: params,
outputTypes: p.outputTypes,
firstNotWrapped: firstNotWrapped,
row: p.row,
}
if fastPath {
// If our node is a "fast path node", it means that we're set up to
// just return a row count meaning we'll output a single row with a
// single INT column.
typs = []*types.T{types.Int}
if cap(p.outputTypes) >= 1 {
// Make sure to not lose larger slice for future reuse.
p.outputTypes = p.outputTypes[:1]
p.outputTypes[0] = types.Int
} else {
p.outputTypes = []*types.T{types.Int}
}
} else {
typs = getTypesFromResultColumns(planColumns(source))
resultColumns := planColumns(source)
if cap(p.outputTypes) >= len(resultColumns) {
p.outputTypes = p.outputTypes[:len(resultColumns)]
} else {
p.outputTypes = make([]*types.T, len(resultColumns))
}
for i := range resultColumns {
p.outputTypes[i] = resultColumns[i].Typ
}
}
row := make(rowenc.EncDatumRow, len(typs))

return &planNodeToRowSource{
node: source,
params: params,
outputTypes: typs,
row: row,
fastPath: fastPath,
}, nil
if p.row != nil && cap(p.row) >= len(p.outputTypes) {
// In some cases we might have no output columns, so nil row would have
// sufficient width, yet nil row is a special value, so we can only
// reuse the old row if it's non-nil.
p.row = p.row[:len(p.outputTypes)]
} else {
p.row = make(rowenc.EncDatumRow, len(p.outputTypes))
}
return p
}

var _ execinfra.LocalProcessor = &planNodeToRowSource{}

// MustBeStreaming implements the execinfra.Processor interface.
func (p *planNodeToRowSource) MustBeStreaming() bool {
// hookFnNode is special because it might be blocked forever if we decide to
Expand All @@ -90,26 +121,15 @@ func (p *planNodeToRowSource) InitWithOutput(
flowCtx,
// Note that we have already created a copy of the extendedEvalContext
// (which made a copy of the EvalContext) right before calling
// makePlanNodeToRowSource, so we can just use the eval context from the
// newPlanNodeToRowSource, so we can just use the eval context from the
// params.
p.params.EvalContext(),
0, /* processorID */
output,
nil, /* memMonitor */
execinfra.ProcStateOpts{
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
var meta []execinfrapb.ProducerMetadata
if p.InternalClose() {
// Check if we're wrapping a mutation and emit the rows
// written metric if so.
if m, ok := p.node.(mutationPlanNode); ok {
metrics := execinfrapb.GetMetricsMeta()
metrics.RowsWritten = m.rowsWritten()
meta = []execinfrapb.ProducerMetadata{{Metrics: metrics}}
}
}
return meta
},
// Input to drain is added in SetInput.
TrailingMetaCallback: p.trailingMetaCallback,
},
)
}
Expand Down Expand Up @@ -220,6 +240,36 @@ func (p *planNodeToRowSource) forwardMetadata(metadata *execinfrapb.ProducerMeta
p.ProcessorBase.AppendTrailingMeta(*metadata)
}

func (p *planNodeToRowSource) trailingMetaCallback() []execinfrapb.ProducerMetadata {
var meta []execinfrapb.ProducerMetadata
if p.InternalClose() {
// Check if we're wrapping a mutation and emit the rows written metric
// if so.
if m, ok := p.node.(mutationPlanNode); ok {
metrics := execinfrapb.GetMetricsMeta()
metrics.RowsWritten = m.rowsWritten()
meta = []execinfrapb.ProducerMetadata{{Metrics: metrics}}
}
}
return meta
}

// Release releases this planNodeToRowSource back to the pool.
func (p *planNodeToRowSource) Release() {
p.ProcessorBase.Reset()
// Deeply reset the row. Note that we don't bother deeply resetting the
// types slice since the types are small objects.
for i := range p.row {
p.row[i] = rowenc.EncDatum{}
}
*p = planNodeToRowSource{
ProcessorBase: p.ProcessorBase,
outputTypes: p.outputTypes[:0],
row: p.row[:0],
}
planNodeToRowSourcePool.Put(p)
}

// ChildCount is part of the execopnode.OpNode interface.
func (p *planNodeToRowSource) ChildCount(verbose bool) int {
if _, ok := p.input.(execopnode.OpNode); ok {
Expand Down

0 comments on commit 4af1b13

Please sign in to comment.