Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colbuilder: fall back to row-by-row processor wrapping for many renders #85822

Merged
merged 2 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/colexec/colbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/col/coldata",
"//pkg/col/coldataext",
"//pkg/col/typeconv",
"//pkg/settings",
"//pkg/sql/catalog/descpb",
"//pkg/sql/colconv",
"//pkg/sql/colexec",
Expand Down
75 changes: 71 additions & 4 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldataext"
"github.com/cockroachdb/cockroach/pkg/col/typeconv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/colconv"
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
Expand Down Expand Up @@ -1495,7 +1496,7 @@ func NewColOperator(
Op: result.Root,
ColumnTypes: result.ColumnTypes,
}
err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables)
err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables, args.Spec.EstimatedRowCount)
if err != nil {
err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, err)
} else {
Expand Down Expand Up @@ -1647,6 +1648,46 @@ func (r opResult) wrapPostProcessSpec(
)
}

// renderExprCountVisitor is a tree.Visitor that tallies, across the render
// expressions it walks, how many projection operators would need to be
// planned.
type renderExprCountVisitor struct {
	// renderCount is the running total. It is incremented by VisitPre and
	// keeps accumulating when the same visitor walks several expressions.
	renderCount int64
}

var _ tree.Visitor = (*renderExprCountVisitor)(nil)

// VisitPre implements the tree.Visitor interface. Every expression node other
// than an IndexedVar bumps renderCount and has its children visited.
func (r *renderExprCountVisitor) VisitPre(expr tree.Expr) (recurse bool, newExpr tree.Expr) {
	switch expr.(type) {
	case *tree.IndexedVar:
		// An IndexedVar is handled by referring to an existing vector by its
		// index, so no projection operator is planned for it: it is neither
		// counted nor recursed into.
		return false, expr
	default:
		r.renderCount++
		return true, expr
	}
}

// VisitPost implements the tree.Visitor interface. It returns expr unchanged;
// all of the counting is done in VisitPre.
func (r *renderExprCountVisitor) VisitPost(expr tree.Expr) tree.Expr {
return expr
}

// renderWrappingRowCountThreshold is the cluster setting holding the largest
// estimated row count for which planPostProcessSpec considers falling back to
// wrapping a row-by-row processor to handle render expressions. An estimate of
// zero is treated as "no estimate" and never triggers the fallback. The
// default is 128.
var renderWrappingRowCountThreshold = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.distsql.vectorize_render_wrapping.max_row_count",
"determines the maximum number of estimated rows that flow through the render "+
"expressions up to which we handle those renders by wrapping a row-by-row processor",
128,
settings.NonNegativeInt,
)

// renderWrappingRenderCountThreshold is the cluster setting holding the
// smallest render count (as tallied by renderExprCountVisitor) at which
// planPostProcessSpec falls back to wrapping a row-by-row processor. It is
// only consulted once the estimated row count check has passed. The default
// is 16.
var renderWrappingRenderCountThreshold = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.distsql.vectorize_render_wrapping.min_render_count",
"determines the minimum number of render expressions for which we fall "+
"back to handling renders by wrapping a row-by-row processor",
16,
settings.NonNegativeInt,
)

// planPostProcessSpec plans the post processing stage specified in post on top
// of r.Op.
func (r *postProcessResult) planPostProcessSpec(
Expand All @@ -1656,16 +1697,42 @@ func (r *postProcessResult) planPostProcessSpec(
post *execinfrapb.PostProcessSpec,
factory coldata.ColumnFactory,
releasables *[]execreleasable.Releasable,
estimatedRowCount uint64,
) error {
if post.Projection {
r.Op, r.ColumnTypes = addProjection(r.Op, r.ColumnTypes, post.OutputColumns)
} else if post.RenderExprs != nil {
var renderedCols []uint32
for _, renderExpr := range post.RenderExprs {
expr, err := args.ExprHelper.ProcessExpr(renderExpr, flowCtx.EvalCtx, r.ColumnTypes)
// Deserialize expressions upfront.
exprs := make([]tree.TypedExpr, len(post.RenderExprs))
var err error
for i := range exprs {
exprs[i], err = args.ExprHelper.ProcessExpr(post.RenderExprs[i], flowCtx.EvalCtx, r.ColumnTypes)
if err != nil {
return err
}
}
// If we have an estimated row count and it doesn't exceed the wrapping
// row count threshold, we might need to fall back to wrapping a
// row-by-row processor to handle the render expressions (for better
// performance).
if estimatedRowCount != 0 &&
estimatedRowCount <= uint64(renderWrappingRowCountThreshold.Get(&flowCtx.Cfg.Settings.SV)) {
renderCountThreshold := renderWrappingRenderCountThreshold.Get(&flowCtx.Cfg.Settings.SV)
// Walk over all expressions and estimate how many projection
// operators will need to be created.
var v renderExprCountVisitor
for _, expr := range exprs {
tree.WalkExpr(&v, expr)
if v.renderCount >= renderCountThreshold {
return errors.Newf(
"falling back to wrapping a row-by-row processor for at least "+
"%d renders, estimated row count = %d", v.renderCount, estimatedRowCount,
)
}
}
}
var renderedCols []uint32
for _, expr := range exprs {
var outputIdx int
r.Op, outputIdx, r.ColumnTypes, err = planProjectionOperators(
ctx, flowCtx.EvalCtx, expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, releasables,
Expand Down
41 changes: 41 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package colbuilder

import (
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -159,3 +161,42 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
}
require.Equal(t, numRows, rowIdx)
}

// BenchmarkRenderPlanning measures how long it takes to run a query that
// contains many render expressions. With a small number of rows to read, the
// overhead of allocating the initial vectors for the projection operators
// dominates.
func BenchmarkRenderPlanning(b *testing.B) {
	defer leaktest.AfterTest(b)()
	defer log.Scope(b).Close(b)

	ctx := context.Background()
	s, db, _ := serverutils.StartServer(b, base.TestServerArgs{SQLMemoryPoolSize: 10 << 30})
	defer s.Stopper().Stop(ctx)

	// Every row stores this same JSON document; the rendered expression
	// below drills into its nested objects.
	const jsonValue = `'{"string": "string", "int": 123, "null": null, "nested": {"string": "string", "int": 123, "null": null, "nested": {"string": "string", "int": 123, "null": null}}}'`

	rowCounts := []int{1, 1 << 3, 1 << 6, 1 << 9}
	renderCounts := []int{1, 1 << 4, 1 << 8, 1 << 12}

	runner := sqlutils.MakeSQLRunner(db)
	for _, numRows := range rowCounts {
		// Rebuild the table from scratch for each row count.
		runner.Exec(b, "DROP TABLE IF EXISTS bench")
		runner.Exec(b, "CREATE TABLE bench (id INT PRIMARY KEY, state JSONB)")
		populate := fmt.Sprintf(`INSERT INTO bench SELECT i, %s FROM generate_series(1, %d) AS g(i)`, jsonValue, numRows)
		runner.Exec(b, populate)
		// NOTE(review): presumably this refreshes table statistics so that
		// an estimated row count is available at planning time - confirm.
		runner.Exec(b, "ANALYZE bench")

		for _, numRenders := range renderCounts {
			// Project the same JSON path numRenders times, each under a
			// distinct alias.
			renders := make([]string, numRenders)
			for i := range renders {
				renders[i] = fmt.Sprintf("state->'nested'->>'nested' AS test%d", i+1)
			}
			query := "SELECT " + strings.Join(renders, ", ") + " FROM bench"

			name := fmt.Sprintf("rows=%d/renders=%d", numRows, numRenders)
			b.Run(name, func(b *testing.B) {
				for n := 0; n < b.N; n++ {
					runner.Exec(b, query)
				}
			})
		}
	}
}
78 changes: 78 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/vectorize_wrapping
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# LogicTest: local

statement ok
CREATE TABLE t85632 (k INT PRIMARY KEY);

statement ok
ALTER TABLE t85632 INJECT STATISTICS '[
{
"avg_size": 1,
"columns": [
"k"
],
"created_at": "2022-08-09 09:00:00.00000",
"distinct_count": 1000,
"name": "__auto__",
"null_count": 0,
"row_count": 1000
}
]'

# Use experimental_always vectorize mode so that we error out when trying to
# wrap a row-by-row processor.
statement ok
SET vectorize=experimental_always;

# The estimated row count exceeds the max wrapping row count and the number of
# renders is smaller than the min render count, so we use native projection
# operators.
query T
EXPLAIN (VEC) SELECT k + k + k + k FROM t85632
----
└ Node 1
└ *colexecproj.projPlusInt64Int64Op
└ *colexecproj.projPlusInt64Int64Op
└ *colexecproj.projPlusInt64Int64Op
└ *colfetcher.ColBatchScan

statement ok
SET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.min_render_count = 3;

# The estimated row count still exceeds the max wrapping row count.
query T
EXPLAIN (VEC) SELECT k + k + k + k FROM t85632
----
└ Node 1
└ *colexecproj.projPlusInt64Int64Op
└ *colexecproj.projPlusInt64Int64Op
└ *colexecproj.projPlusInt64Int64Op
└ *colfetcher.ColBatchScan

statement ok
SET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.max_row_count = 1000;

# Now both wrapping conditions are satisfied.
query error falling back to wrapping a row-by-row processor
EXPLAIN (VEC) SELECT k + k + k + k FROM t85632

statement ok
RESET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.min_render_count;

# The render count isn't sufficient for wrapping to kick in.
query T
EXPLAIN (VEC) SELECT k + k + k + k FROM t85632
----
└ Node 1
└ *colexecproj.projPlusInt64Int64Op
└ *colexecproj.projPlusInt64Int64Op
└ *colexecproj.projPlusInt64Int64Op
└ *colfetcher.ColBatchScan

statement ok
RESET CLUSTER SETTING sql.distsql.vectorize_render_wrapping.max_row_count;

statement ok
RESET vectorize
7 changes: 7 additions & 0 deletions pkg/sql/opt/exec/execbuilder/tests/local/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.