diff --git a/pkg/sql/colexec/colbuilder/BUILD.bazel b/pkg/sql/colexec/colbuilder/BUILD.bazel index 3206277b95a4..3e915e25dbf5 100644 --- a/pkg/sql/colexec/colbuilder/BUILD.bazel +++ b/pkg/sql/colexec/colbuilder/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/col/coldata", "//pkg/col/coldataext", "//pkg/col/typeconv", + "//pkg/settings", "//pkg/sql/catalog/descpb", "//pkg/sql/colconv", "//pkg/sql/colexec", diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 23233989c752..5f44473dba6a 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/colconv" "github.com/cockroachdb/cockroach/pkg/sql/colexec" @@ -1495,7 +1496,7 @@ func NewColOperator( Op: result.Root, ColumnTypes: result.ColumnTypes, } - err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables) + err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables, args.Spec.EstimatedRowCount) if err != nil { err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, err) } else { @@ -1647,6 +1648,46 @@ func (r opResult) wrapPostProcessSpec( ) } +// renderExprCountVisitor counts how many projection operators need to be +// planned across render expressions. +type renderExprCountVisitor struct { + renderCount int64 +} + +var _ tree.Visitor = &renderExprCountVisitor{} + +func (r *renderExprCountVisitor) VisitPre(expr tree.Expr) (recurse bool, newExpr tree.Expr) { + if _, ok := expr.(*tree.IndexedVar); ok { + // IndexedVars don't get a projection operator (we just refer to the + // vector by index), so they don't contribute to the render count. + return false, expr + } + r.renderCount++ + return true, expr +} + +func (r *renderExprCountVisitor) VisitPost(expr tree.Expr) tree.Expr { + return expr +} + +var renderWrappingRowCountThreshold = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.distsql.vectorize_render_wrapping.max_row_count", + "determines the maximum number of estimated rows that flow through the render "+ + "expressions up to which we handle those renders by wrapping a row-by-row processor", + 128, + settings.NonNegativeInt, +) + +var renderWrappingRenderCountThreshold = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.distsql.vectorize_render_wrapping.min_render_count", + "determines the minimum number of render expressions for which we fall "+ + "back to handling renders by wrapping a row-by-row processor", + 16, + settings.NonNegativeInt, +) + // planPostProcessSpec plans the post processing stage specified in post on top // of r.Op. func (r *postProcessResult) planPostProcessSpec( @@ -1656,16 +1697,42 @@ func (r *postProcessResult) planPostProcessSpec( post *execinfrapb.PostProcessSpec, factory coldata.ColumnFactory, releasables *[]execreleasable.Releasable, + estimatedRowCount uint64, ) error { if post.Projection { r.Op, r.ColumnTypes = addProjection(r.Op, r.ColumnTypes, post.OutputColumns) } else if post.RenderExprs != nil { - var renderedCols []uint32 - for _, renderExpr := range post.RenderExprs { - expr, err := args.ExprHelper.ProcessExpr(renderExpr, flowCtx.EvalCtx, r.ColumnTypes) + // Deserialize expressions upfront. + exprs := make([]tree.TypedExpr, len(post.RenderExprs)) + var err error + for i := range exprs { + exprs[i], err = args.ExprHelper.ProcessExpr(post.RenderExprs[i], flowCtx.EvalCtx, r.ColumnTypes) if err != nil { return err } + } + // If we have an estimated row count and it doesn't exceed the wrapping + // row count threshold, we might need to fall back to wrapping a + // row-by-row processor to handle the render expressions (for better + // performance). + if estimatedRowCount != 0 && + estimatedRowCount <= uint64(renderWrappingRowCountThreshold.Get(&flowCtx.Cfg.Settings.SV)) { + renderCountThreshold := renderWrappingRenderCountThreshold.Get(&flowCtx.Cfg.Settings.SV) + // Walk over all expressions and estimate how many projection + // operators will need to be created. + var v renderExprCountVisitor + for _, expr := range exprs { + tree.WalkExpr(&v, expr) + if v.renderCount >= renderCountThreshold { + return errors.Newf( + "falling back to wrapping a row-by-row processor for at least "+ + "%d renders, estimated row count = %d", v.renderCount, estimatedRowCount, + ) + } + } + } + var renderedCols []uint32 + for _, expr := range exprs { var outputIdx int r.Op, outputIdx, r.ColumnTypes, err = planProjectionOperators( ctx, flowCtx.EvalCtx, expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, releasables, diff --git a/pkg/sql/opt/exec/execbuilder/testdata/vectorize_wrapping b/pkg/sql/opt/exec/execbuilder/testdata/vectorize_wrapping new file mode 100644 index 000000000000..29db1e965821 --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/vectorize_wrapping @@ -0,0 +1,78 @@ +# LogicTest: local + +statement ok +CREATE TABLE t85632 (k INT PRIMARY KEY); + +statement ok +ALTER TABLE t85632 INJECT STATISTICS '[ + { + "avg_size": 1, + "columns": [ + "k" + ], + "created_at": "2022-08-09 09:00:00.00000", + "distinct_count": 1000, + "name": "__auto__", + "null_count": 0, + "row_count": 1000 + } + ]' + +# Use experimental_always vectorize mode so that we error out when trying to +# wrap a row-by-row processor. +statement ok +SET vectorize=experimental_always; + +# Both the estimated row count exceeds the max wrapping row count and number of +# render is smaller that the min render count, so we use native projection +# operators. +query T +EXPLAIN (VEC) SELECT k + k + k + k FROM t85632 +---- +│ +└ Node 1 + └ *colexecproj.projPlusInt64Int64Op + └ *colexecproj.projPlusInt64Int64Op + └ *colexecproj.projPlusInt64Int64Op + └ *colfetcher.ColBatchScan + +statement ok +SET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.min_render_count = 3; + +# The estimated row count still exceeds the max wrapping row count. +query T +EXPLAIN (VEC) SELECT k + k + k + k FROM t85632 +---- +│ +└ Node 1 + └ *colexecproj.projPlusInt64Int64Op + └ *colexecproj.projPlusInt64Int64Op + └ *colexecproj.projPlusInt64Int64Op + └ *colfetcher.ColBatchScan + +statement ok +SET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.max_row_count = 1000; + +# Now both wrapping conditions are satisfied. +query error falling back to wrapping a row-by-row processor +EXPLAIN (VEC) SELECT k + k + k + k FROM t85632 + +statement ok +RESET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.min_render_count; + +# The render count isn't sufficient for wrapping to kick in. +query T +EXPLAIN (VEC) SELECT k + k + k + k FROM t85632 +---- +│ +└ Node 1 + └ *colexecproj.projPlusInt64Int64Op + └ *colexecproj.projPlusInt64Int64Op + └ *colexecproj.projPlusInt64Int64Op + └ *colfetcher.ColBatchScan + +statement ok +RESET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.max_row_count; + +statement ok +RESET vectorize diff --git a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go index 159af9953155..26aeca7622d4 100644 --- a/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go +++ b/pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go @@ -578,6 +578,13 @@ func TestExecBuild_vectorize_overloads( runExecBuildLogicTest(t, "vectorize_overloads") } +func TestExecBuild_vectorize_wrapping( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runExecBuildLogicTest(t, "vectorize_wrapping") +} + func TestExecBuild_virtual( t *testing.T, ) {