Skip to content

Commit

Permalink
planner: avoid nil PhysicalProperty when building agg (#55201)
Browse files Browse the repository at this point in the history
close #55169
  • Loading branch information
hawkingrei authored Aug 6, 2024
1 parent 0e43ba0 commit de943d1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 26 deletions.
54 changes: 28 additions & 26 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,13 +814,13 @@ func buildIndexJoinInner2TableScan(
return nil
}
rangeInfo := indexJoinPathRangeInfo(p.SCtx(), outerJoinKeys, indexJoinResult)
innerTask = constructInnerTableScanTask(p, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
innerTask = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
// The index merge join's inner plan is different from index join, so we
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if !wrapper.hasDitryWrite {
innerTask2 = constructInnerTableScanTask(p, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
innerTask2 = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
}
ranges = indexJoinResult.chosenRanges
} else {
Expand Down Expand Up @@ -854,13 +854,13 @@ func buildIndexJoinInner2TableScan(
}
buffer.WriteString("]")
rangeInfo := buffer.String()
innerTask = constructInnerTableScanTask(p, wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
innerTask = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
// The index merge join's inner plan is different from index join, so we
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if !wrapper.hasDitryWrite {
innerTask2 = constructInnerTableScanTask(p, wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
innerTask2 = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
}
}
var (
Expand Down Expand Up @@ -922,7 +922,7 @@ func buildIndexJoinInner2IndexScan(
maxOneRow = ok && (sf.FuncName.L == ast.EQ)
}
}
innerTask := constructInnerIndexScanTask(p, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
innerTask := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
if val.(bool) && !p.SCtx().GetSessionVars().InRestrictedSQL && innerTask != nil {
failpoint.Return(constructIndexHashJoin(p, prop, outerIdx, innerTask, indexJoinResult.chosenRanges, keyOff2IdxOff, indexJoinResult.chosenPath, indexJoinResult.lastColManager))
Expand All @@ -939,7 +939,7 @@ func buildIndexJoinInner2IndexScan(
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if !wrapper.hasDitryWrite {
innerTask2 := constructInnerIndexScanTask(p, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
innerTask2 := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
if innerTask2 != nil {
joins = append(joins, constructIndexMergeJoin(p, prop, outerIdx, innerTask2, indexJoinResult.chosenRanges, keyOff2IdxOff, indexJoinResult.chosenPath, indexJoinResult.lastColManager)...)
}
Expand All @@ -950,6 +950,7 @@ func buildIndexJoinInner2IndexScan(
// constructInnerTableScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
func constructInnerTableScanTask(
p *LogicalJoin,
prop *property.PhysicalProperty,
wrapper *indexJoinInnerChildWrapper,
ranges ranger.Ranges,
_ []*expression.Column,
Expand Down Expand Up @@ -1022,61 +1023,61 @@ func constructInnerTableScanTask(
ts.PlanPartInfo = copTask.physPlanPartInfo
selStats := ts.StatsInfo().Scale(selectivity)
ts.addPushedDownSelection(copTask, selStats)
return constructIndexJoinInnerSideTask(p, copTask, ds, nil, wrapper)
return constructIndexJoinInnerSideTask(p, prop, copTask, ds, nil, wrapper)
}

func constructInnerByZippedChildren(zippedChildren []base.LogicalPlan, child base.PhysicalPlan) base.PhysicalPlan {
func constructInnerByZippedChildren(prop *property.PhysicalProperty, zippedChildren []base.LogicalPlan, child base.PhysicalPlan) base.PhysicalPlan {
for i := len(zippedChildren) - 1; i >= 0; i-- {
switch x := zippedChildren[i].(type) {
case *LogicalUnionScan:
child = constructInnerUnionScan(x, child)
child = constructInnerUnionScan(prop, x, child)
case *logicalop.LogicalProjection:
child = constructInnerProj(x, child)
child = constructInnerProj(prop, x, child)
case *LogicalSelection:
child = constructInnerSel(x, child)
child = constructInnerSel(prop, x, child)
case *LogicalAggregation:
child = constructInnerAgg(x, child)
child = constructInnerAgg(prop, x, child)
}
}
return child
}

func constructInnerAgg(logicalAgg *LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan {
func constructInnerAgg(prop *property.PhysicalProperty, logicalAgg *LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan {
if logicalAgg == nil {
return child
}
physicalHashAgg := NewPhysicalHashAgg(logicalAgg, logicalAgg.StatsInfo(), nil)
physicalHashAgg := NewPhysicalHashAgg(logicalAgg, logicalAgg.StatsInfo(), prop)
physicalHashAgg.SetSchema(logicalAgg.Schema().Clone())
physicalHashAgg.SetChildren(child)
return physicalHashAgg
}

func constructInnerSel(sel *LogicalSelection, child base.PhysicalPlan) base.PhysicalPlan {
func constructInnerSel(prop *property.PhysicalProperty, sel *LogicalSelection, child base.PhysicalPlan) base.PhysicalPlan {
if sel == nil {
return child
}
physicalSel := PhysicalSelection{
Conditions: sel.Conditions,
}.Init(sel.SCtx(), sel.StatsInfo(), sel.QueryBlockOffset(), nil)
}.Init(sel.SCtx(), sel.StatsInfo(), sel.QueryBlockOffset(), prop)
physicalSel.SetChildren(child)
return physicalSel
}

func constructInnerProj(proj *logicalop.LogicalProjection, child base.PhysicalPlan) base.PhysicalPlan {
func constructInnerProj(prop *property.PhysicalProperty, proj *logicalop.LogicalProjection, child base.PhysicalPlan) base.PhysicalPlan {
if proj == nil {
return child
}
physicalProj := PhysicalProjection{
Exprs: proj.Exprs,
CalculateNoDelay: proj.CalculateNoDelay,
AvoidColumnEvaluator: proj.AvoidColumnEvaluator,
}.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), nil)
}.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), prop)
physicalProj.SetChildren(child)
physicalProj.SetSchema(proj.Schema())
return physicalProj
}

func constructInnerUnionScan(us *LogicalUnionScan, reader base.PhysicalPlan) base.PhysicalPlan {
func constructInnerUnionScan(prop *property.PhysicalProperty, us *LogicalUnionScan, reader base.PhysicalPlan) base.PhysicalPlan {
if us == nil {
return reader
}
Expand All @@ -1085,7 +1086,7 @@ func constructInnerUnionScan(us *LogicalUnionScan, reader base.PhysicalPlan) bas
physicalUnionScan := PhysicalUnionScan{
Conditions: us.Conditions,
HandleCols: us.HandleCols,
}.Init(us.SCtx(), reader.StatsInfo(), us.QueryBlockOffset(), nil)
}.Init(us.SCtx(), reader.StatsInfo(), us.QueryBlockOffset(), prop)
physicalUnionScan.SetChildren(reader)
return physicalUnionScan
}
Expand Down Expand Up @@ -1142,6 +1143,7 @@ func getColsNDVLowerBoundFromHistColl(colUIDs []int64, histColl *statistics.Hist
// constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
func constructInnerIndexScanTask(
p *LogicalJoin,
prop *property.PhysicalProperty,
wrapper *indexJoinInnerChildWrapper,
path *util.AccessPath,
ranges ranger.Ranges,
Expand Down Expand Up @@ -1323,7 +1325,7 @@ func constructInnerIndexScanTask(
logutil.BgLogger().Warn("unexpected error happened during addPushedDownSelection function", zap.Error(err))
return nil
}
return constructIndexJoinInnerSideTask(p, cop, ds, path, wrapper)
return constructIndexJoinInnerSideTask(p, prop, cop, ds, path, wrapper)
}

// constructIndexJoinInnerSideTask constructs the inner-side task of the index join from the inner child plan tree.
Expand All @@ -1334,7 +1336,7 @@ func constructInnerIndexScanTask(
// There are two kinds of agg: stream agg and hash agg. Stream agg depends on some conditions, such as the group by cols
//
// Step2: build other inner plan node to task
func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
func constructIndexJoinInnerSideTask(p *LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
var la *LogicalAggregation
var canPushAggToCop bool
if len(wrapper.zippedChildren) > 0 {
Expand All @@ -1351,7 +1353,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat
// If the bottom plan is not aggregation or the aggregation can't be pushed to coprocessor, we will construct a root task directly.
if !canPushAggToCop {
result := dsCopTask.ConvertToRootTask(ds.SCtx()).(*RootTask)
result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren, result.GetPlan()))
result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren, result.GetPlan()))
return result
}

Expand Down Expand Up @@ -1403,7 +1405,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat
streamAgg := basePhysicalAgg{
GroupByItems: newGbyItems,
AggFuncs: newAggFuncs,
}.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), nil)
}.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), prop)
streamAgg.SetSchema(la.Schema().Clone())
// change to keep order for index scan and dsCopTask
if dsCopTask.indexPlan != nil {
Expand All @@ -1422,7 +1424,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat

// Build a hash agg when the stream agg is illegal, e.g. when the order-by property is not matched.
if aggTask == nil {
physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), nil)
physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), prop)
physicalHashAgg.SetSchema(la.Schema().Clone())
aggTask = physicalHashAgg.Attach2Task(dsCopTask)
}
Expand All @@ -1432,7 +1434,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat
if !ok {
return nil
}
result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p))
result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p))
return result
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/issuetest/planner_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,9 @@ func TestIssue54535(t *testing.T) {
" │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo",
" └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11",
" └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo"))
// test for issues/55169
tk.MustExec("create table t1(col_1 int, index idx_1(col_1));")
tk.MustExec("create table t2(col_1 int, col_2 int, index idx_2(col_1));")
tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows())
tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(distinct col_2 order by col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows())
}

0 comments on commit de943d1

Please sign in to comment.