Skip to content

Commit

Permalink
pkg/planner: set proj.AvoidColumnEvaluator in postOptimize (#55333) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 8, 2024
1 parent 276bea7 commit 0736f59
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 31 deletions.
5 changes: 2 additions & 3 deletions planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,8 @@ func (*ImplProjection) OnImplement(expr *memo.GroupExpr, reqProp *property.Physi
return nil, nil
}
proj := plannercore.PhysicalProjection{
Exprs: logicProj.Exprs,
CalculateNoDelay: logicProj.CalculateNoDelay,
AvoidColumnEvaluator: logicProj.AvoidColumnEvaluator,
Exprs: logicProj.Exprs,
CalculateNoDelay: logicProj.CalculateNoDelay,
}.Init(logicProj.SCtx(), logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt), logicProj.SelectBlockOffset(), childProp)
proj.SetSchema(logicProp.Schema)
return []memo.Implementation{impl.NewProjectionImpl(proj)}, nil
Expand Down
10 changes: 4 additions & 6 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,9 +1153,8 @@ func (p *LogicalJoin) constructInnerProj(proj *LogicalProjection, child Physical
return child
}
physicalProj := PhysicalProjection{
Exprs: proj.Exprs,
CalculateNoDelay: proj.CalculateNoDelay,
AvoidColumnEvaluator: proj.AvoidColumnEvaluator,
Exprs: proj.Exprs,
CalculateNoDelay: proj.CalculateNoDelay,
}.Init(proj.ctx, proj.stats, proj.blockOffset, nil)
physicalProj.SetChildren(child)
return physicalProj
Expand Down Expand Up @@ -2606,9 +2605,8 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty
ret := make([]PhysicalPlan, 0, len(newProps))
for _, newProp := range newProps {
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.CalculateNoDelay,
AvoidColumnEvaluator: p.AvoidColumnEvaluator,
Exprs: p.Exprs,
CalculateNoDelay: p.CalculateNoDelay,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, newProp)
proj.SetSchema(p.schema)
ret = append(ret, proj)
Expand Down
4 changes: 2 additions & 2 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1834,7 +1834,7 @@ func (b *PlanBuilder) buildProjection4Union(_ context.Context, u *LogicalUnionAl
}
}
b.optFlag |= flagEliminateProjection
proj := LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(b.ctx, b.getSelectOffset())
proj := LogicalProjection{Exprs: exprs}.Init(b.ctx, b.getSelectOffset())
proj.SetSchema(u.schema.Clone())
// reset the schema type to make the "not null" flag right.
for i, expr := range exprs {
Expand Down Expand Up @@ -7690,7 +7690,7 @@ func (b *PlanBuilder) buildProjection4CTEUnion(_ context.Context, seed LogicalPl
}
}
b.optFlag |= flagEliminateProjection
proj := LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(b.ctx, b.getSelectOffset())
proj := LogicalProjection{Exprs: exprs}.Init(b.ctx, b.getSelectOffset())
proj.SetSchema(resSchema)
proj.SetChildren(recur)
return proj, nil
Expand Down
7 changes: 0 additions & 7 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,13 +567,6 @@ type LogicalProjection struct {
// Currently it is "true" only when the current sql query is a "DO" statement.
// See "https://dev.mysql.com/doc/refman/5.7/en/do.html" for more detail.
CalculateNoDelay bool

// AvoidColumnEvaluator is a temporary variable which is ONLY used to avoid
// building columnEvaluator for the expressions of Projection which is
// built by buildProjection4Union.
// This can be removed after column pool being supported.
// Related issue: TiDB#8141(https://github.com/pingcap/tidb/issues/8141)
AvoidColumnEvaluator bool
}

// ExtractFD implements the logical plan interface, extracting the FD from bottom up.
Expand Down
17 changes: 17 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func postOptimize(ctx context.Context, sctx sessionctx.Context, plan PhysicalPla
plan = InjectExtraProjection(plan)
mergeContinuousSelections(plan)
plan = eliminateUnionScanAndLock(sctx, plan)
plan = avoidColumnEvaluatorForProjBelowUnion(plan)
plan = enableParallelApply(sctx, plan)
handleFineGrainedShuffle(ctx, sctx, plan)
propagateProbeParents(plan, nil)
Expand Down Expand Up @@ -1177,6 +1178,22 @@ func physicalOptimize(logic LogicalPlan, planCounter *PlanCounterTp) (plan Physi
return t.plan(), cost, err
}

// avoidColumnEvaluatorForProjBelowUnion sets AvoidColumnEvaluator to false for the projection operator which is a child of Union operator.
func avoidColumnEvaluatorForProjBelowUnion(p PhysicalPlan) PhysicalPlan {
iteratePhysicalPlan(p, func(p PhysicalPlan) bool {
x, ok := p.(*PhysicalUnionAll)
if ok {
for _, child := range x.Children() {
if proj, ok := child.(*PhysicalProjection); ok {
proj.AvoidColumnEvaluator = true
}
}
}
return true
})
return p
}

// eliminateUnionScanAndLock set lock property for PointGet and BatchPointGet and eliminates UnionScan and Lock.
func eliminateUnionScanAndLock(sctx sessionctx.Context, p PhysicalPlan) PhysicalPlan {
var pointGet *PointGetPlan
Expand Down
73 changes: 73 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,3 +582,76 @@ func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) {
require.Equal(t, 1, len(correlated))
require.Equal(t, "test.t2.company_no", correlated[0].String())
}

func TestAvoidColumnEvaluatorForProjBelowUnion(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

getPhysicalPlan := func(sql string) core.Plan {
tk.MustExec(sql)
info := tk.Session().ShowProcess()
require.NotNil(t, info)
p, ok := info.Plan.(core.Plan)
require.True(t, ok)
return p
}

var findProjBelowUnion func(p core.Plan) (projsBelowUnion, normalProjs []*core.PhysicalProjection)
findProjBelowUnion = func(p core.Plan) (projsBelowUnion, normalProjs []*core.PhysicalProjection) {
if p == nil {
return projsBelowUnion, normalProjs
}
switch v := p.(type) {
case *core.PhysicalUnionAll:
for _, child := range v.Children() {
if proj, ok := child.(*core.PhysicalProjection); ok {
projsBelowUnion = append(projsBelowUnion, proj)
}
}
default:
for _, child := range p.(core.PhysicalPlan).Children() {
if proj, ok := child.(*core.PhysicalProjection); ok {
normalProjs = append(normalProjs, proj)
}
subProjsBelowUnion, subNormalProjs := findProjBelowUnion(child)
projsBelowUnion = append(projsBelowUnion, subProjsBelowUnion...)
normalProjs = append(normalProjs, subNormalProjs...)
}
}
return projsBelowUnion, normalProjs
}

checkResult := func(sql string) {
p := getPhysicalPlan(sql)
projsBelowUnion, normalProjs := findProjBelowUnion(p)
if proj, ok := p.(*core.PhysicalProjection); ok {
normalProjs = append(normalProjs, proj)
}
require.NotEmpty(t, projsBelowUnion)
for _, proj := range projsBelowUnion {
require.True(t, proj.AvoidColumnEvaluator)
}
for _, proj := range normalProjs {
require.False(t, proj.AvoidColumnEvaluator)
}
}

// Test setup
tk.MustExec("use test")
tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1 (cc1 int, cc2 text);`)
tk.MustExec(`insert into t1 values (1, 'aaaa'), (2, 'bbbb'), (3, 'cccc');`)
tk.MustExec(`create table t2 (cc1 int, cc2 text, primary key(cc1));`)
tk.MustExec(`insert into t2 values (2, '2');`)
tk.MustExec(`set tidb_executor_concurrency = 1;`)
tk.MustExec(`set tidb_window_concurrency = 100;`)

testCases := []string{
`select * from (SELECT DISTINCT cc2 as a, cc2 as b, cc1 as c FROM t2 UNION ALL SELECT count(1) over (partition by cc1), cc2, cc1 FROM t1) order by a, b, c;`,
`select a+1, b+1 from (select cc1 as a, cc2 as b from t1 union select cc2, cc1 from t1) tmp`,
}

for _, sql := range testCases {
checkResult(sql)
}
}
8 changes: 6 additions & 2 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,8 +1030,12 @@ func (ts *PhysicalTableScan) MemoryUsage() (sum int64) {
type PhysicalProjection struct {
physicalSchemaProducer

Exprs []expression.Expression
CalculateNoDelay bool
Exprs []expression.Expression
CalculateNoDelay bool

// AvoidColumnEvaluator is ONLY used to avoid building columnEvaluator
// for the expressions of Projection which is child of Union operator.
// Related issue: TiDB#8141(https://github.com/pingcap/tidb/issues/8141)
AvoidColumnEvaluator bool
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (p *LogicalUnionAll) PruneColumns(parentUsedCols []*expression.Column, opt
for j, col := range schema.Columns {
exprs[j] = col
}
proj := LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(p.ctx, p.blockOffset)
proj := LogicalProjection{Exprs: exprs}.Init(p.ctx, p.blockOffset)
proj.SetSchema(schema)

proj.SetChildren(child)
Expand Down
15 changes: 5 additions & 10 deletions planner/core/rule_inject_extra_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ func InjectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes
child := aggPlan.Children()[0]
prop := aggPlan.GetChildReqProps(0).CloneEssentialFields()
proj := PhysicalProjection{
Exprs: projExprs,
AvoidColumnEvaluator: false,
Exprs: projExprs,
}.Init(aggPlan.SCtx(), child.statsInfo().ScaleByExpectCnt(prop.ExpectedCnt), aggPlan.SelectBlockOffset(), prop)
proj.SetSchema(expression.NewSchema(projSchemaCols...))
proj.SetChildren(child)
Expand Down Expand Up @@ -221,8 +220,7 @@ func InjectProjBelowSort(p PhysicalPlan, orderByItems []*util.ByItems) PhysicalP
topProjExprs = append(topProjExprs, col)
}
topProj := PhysicalProjection{
Exprs: topProjExprs,
AvoidColumnEvaluator: false,
Exprs: topProjExprs,
}.Init(p.SCtx(), p.statsInfo(), p.SelectBlockOffset(), nil)
topProj.SetSchema(p.Schema().Clone())
topProj.SetChildren(p)
Expand Down Expand Up @@ -254,8 +252,7 @@ func InjectProjBelowSort(p PhysicalPlan, orderByItems []*util.ByItems) PhysicalP

childProp := p.GetChildReqProps(0).CloneEssentialFields()
bottomProj := PhysicalProjection{
Exprs: bottomProjExprs,
AvoidColumnEvaluator: false,
Exprs: bottomProjExprs,
}.Init(p.SCtx(), childPlan.statsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), p.SelectBlockOffset(), childProp)
bottomProj.SetSchema(expression.NewSchema(bottomProjSchemaCols...))
bottomProj.SetChildren(childPlan)
Expand Down Expand Up @@ -304,8 +301,7 @@ func TurnNominalSortIntoProj(p PhysicalPlan, onlyColumn bool, orderByItems []*ut

childProp := p.GetChildReqProps(0).CloneEssentialFields()
bottomProj := PhysicalProjection{
Exprs: bottomProjExprs,
AvoidColumnEvaluator: false,
Exprs: bottomProjExprs,
}.Init(p.SCtx(), childPlan.statsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), p.SelectBlockOffset(), childProp)
bottomProj.SetSchema(expression.NewSchema(bottomProjSchemaCols...))
bottomProj.SetChildren(childPlan)
Expand All @@ -317,8 +313,7 @@ func TurnNominalSortIntoProj(p PhysicalPlan, onlyColumn bool, orderByItems []*ut
topProjExprs = append(topProjExprs, col)
}
topProj := PhysicalProjection{
Exprs: topProjExprs,
AvoidColumnEvaluator: false,
Exprs: topProjExprs,
}.Init(p.SCtx(), childPlan.statsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), p.SelectBlockOffset(), childProp)
topProj.SetSchema(childPlan.Schema().Clone())
topProj.SetChildren(bottomProj)
Expand Down

0 comments on commit 0736f59

Please sign in to comment.