Skip to content

Commit

Permalink
cherry pick pingcap#25130 to release-5.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
windtalker authored and ti-srebot committed Jun 4, 2021
1 parent 0ecd6bd commit 7d777df
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 1 deletion.
16 changes: 15 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,9 +1770,23 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() {
preferredBuildIndex = 1
}
} else if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin || p.JoinType == LeftOuterJoin {
} else if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin {
preferredBuildIndex = 1
}
if p.JoinType == LeftOuterJoin || p.JoinType == RightOuterJoin {
// TiFlash does not requires that the build side must be the inner table for outer join
// so we can choose the build side based on the row count, except that
// 1. it is a broadcast join(for broadcast join, it make sense to use the broadcast side as the build side)
// 2. or session variable MPPOuterJoinFixedBuildSide is set to true
// 3. or there are otherConditions for this join
if useBCJ || p.ctx.GetSessionVars().MPPOuterJoinFixedBuildSide || len(p.OtherConditions) > 0 {
if p.JoinType == LeftOuterJoin {
preferredBuildIndex = 1
}
} else if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() {
preferredBuildIndex = 1
}
}
baseJoin.InnerChildIdx = preferredBuildIndex
childrenProps := make([]*property.PhysicalProperty, 2)
if useBCJ {
Expand Down
132 changes: 132 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,138 @@ func (s *testIntegrationSerialSuite) TestMPPJoin(c *C) {
}
}

func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForBroadcastJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists a")
tk.MustExec("create table a(id int, value int)")
tk.MustExec("insert into a values(1,2),(2,3)")
tk.MustExec("analyze table a")
tk.MustExec("drop table if exists b")
tk.MustExec("create table b(id int, value int)")
tk.MustExec("insert into b values(1,2),(2,3),(3,4)")
tk.MustExec("analyze table b")
// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 10000")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 10000")
var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists a")
tk.MustExec("create table a(id int, value int)")
tk.MustExec("insert into a values(1,2),(2,3)")
tk.MustExec("analyze table a")
tk.MustExec("drop table if exists b")
tk.MustExec("create table b(id int, value int)")
tk.MustExec("insert into b values(1,2),(2,3),(3,4)")
tk.MustExec("analyze table b")
// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 1")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0")
var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForShuffleJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists a")
tk.MustExec("create table a(id int, value int)")
tk.MustExec("insert into a values(1,2),(2,3)")
tk.MustExec("analyze table a")
tk.MustExec("drop table if exists b")
tk.MustExec("create table b(id int, value int)")
tk.MustExec("insert into b values(1,2),(2,3),(3,4)")
tk.MustExec("analyze table b")
// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0")
var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSerialSuite) TestMPPShuffledJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
21 changes: 21 additions & 0 deletions planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,27 @@
"explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)"
]
},
{
"name": "TestMPPOuterJoinBuildSideForBroadcastJoin",
"cases": [
"explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"explain format = 'brief' select count(*) from b right join a on a.id = b.id"
]
},
{
"name": "TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide",
"cases": [
"explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"explain format = 'brief' select count(*) from b right join a on a.id = b.id"
]
},
{
"name": "TestMPPOuterJoinBuildSideForShuffleJoin",
"cases": [
"explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"explain format = 'brief' select count(*) from b right join a on a.id = b.id"
]
},
{
"name": "TestMPPShuffledJoin",
"cases": [
Expand Down
107 changes: 107 additions & 0 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,113 @@
}
]
},
{
"Name": "TestMPPOuterJoinBuildSideForBroadcastJoin",
"Cases": [
{
"SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]",
" ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ",
" │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false",
" └─TableFullScan(Probe) 2.00 cop[tiflash] table:a keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]",
" ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ",
" │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false",
" └─TableFullScan(Probe) 2.00 cop[tiflash] table:a keep order:false"
]
}
]
},
{
"Name": "TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide",
"Cases": [
{
"SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]",
" ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ",
" │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id",
" │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false",
" └─ExchangeReceiver(Probe) 2.00 cop[tiflash] ",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id",
" └─TableFullScan 2.00 cop[tiflash] table:a keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]",
" ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ",
" │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id",
" │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false",
" └─ExchangeReceiver(Probe) 2.00 cop[tiflash] ",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id",
" └─TableFullScan 2.00 cop[tiflash] table:a keep order:false"
]
}
]
},
{
"Name": "TestMPPOuterJoinBuildSideForShuffleJoin",
"Cases": [
{
"SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id",
" │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false",
" └─ExchangeReceiver(Probe) 3.00 cop[tiflash] ",
" └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id",
" └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" └─TableFullScan 3.00 cop[tiflash] table:b keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id",
" │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false",
" └─ExchangeReceiver(Probe) 3.00 cop[tiflash] ",
" └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id",
" └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" └─TableFullScan 3.00 cop[tiflash] table:b keep order:false"
]
}
]
},
{
"Name": "TestMPPShuffledJoin",
"Cases": [
Expand Down
17 changes: 17 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,18 @@ type SessionVars struct {

// AllowBCJ means allow broadcast join.
AllowBCJ bool
<<<<<<< HEAD
=======

// AllowCartesianBCJ means allow broadcast CARTESIAN join, 0 means not allow, 1 means allow broadcast CARTESIAN join
// but the table size should under the broadcast threshold, 2 means allow broadcast CARTESIAN join even if the table
// size exceeds the broadcast threshold
AllowCartesianBCJ int

// MPPOuterJoinFixedBuildSide means in MPP plan, always use right(left) table as build side for left(right) out join
MPPOuterJoinFixedBuildSide bool

>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130)
// AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash.
AllowDistinctAggPushDown bool

Expand Down Expand Up @@ -953,6 +965,11 @@ func NewSessionVars() *SessionVars {
StmtCtx: new(stmtctx.StatementContext),
AllowAggPushDown: false,
AllowBCJ: false,
<<<<<<< HEAD
=======
AllowCartesianBCJ: DefOptCartesianBCJ,
MPPOuterJoinFixedBuildSide: DefOptMPPOuterJoinFixedBuildSide,
>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130)
BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize,
BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize,
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
Expand Down
20 changes: 20 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,29 @@ var defaultSysVars = []*SysVar{
}
return normalizedValue, nil
}},
<<<<<<< HEAD
{Scope: ScopeSession, Name: TiDBOptDistinctAggPushDown, Value: BoolToOnOff(config.GetGlobalConfig().Performance.DistinctAggPushDown), Type: TypeBool},
{Scope: ScopeSession, Name: TiDBOptWriteRowID, Value: BoolToOnOff(DefOptWriteRowID)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, Value: strconv.Itoa(DefBuildStatsConcurrency)},
=======
{Scope: ScopeSession, Name: TiDBOptDistinctAggPushDown, Value: BoolToOnOff(config.GetGlobalConfig().Performance.DistinctAggPushDown), skipInit: true, Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.AllowDistinctAggPushDown = TiDBOptOn(val)
return nil
}},
{Scope: ScopeSession, Name: TiDBOptWriteRowID, Value: BoolToOnOff(DefOptWriteRowID), skipInit: true, SetSession: func(s *SessionVars, val string) error {
s.AllowWriteRowID = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, skipInit: true, Value: strconv.Itoa(DefBuildStatsConcurrency)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2, SetSession: func(s *SessionVars, val string) error {
s.AllowCartesianBCJ = tidbOptInt(val, DefOptCartesianBCJ)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptMPPOuterJoinFixedBuildSide, Value: BoolToOnOff(DefOptMPPOuterJoinFixedBuildSide), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.MPPOuterJoinFixedBuildSide = TiDBOptOn(val)
return nil
}},
>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130)
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeRatio, Value: strconv.FormatFloat(DefAutoAnalyzeRatio, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime},
Expand Down
14 changes: 14 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ const (
TiDBOptAggPushDown = "tidb_opt_agg_push_down"

TiDBOptBCJ = "tidb_opt_broadcast_join"
<<<<<<< HEAD
=======

// TiDBOptCartesianBCJ is used to disable/enable broadcast cartesian join in MPP mode
TiDBOptCartesianBCJ = "tidb_opt_broadcast_cartesian_join"

TiDBOptMPPOuterJoinFixedBuildSide = "tidb_opt_mpp_outer_join_fixed_build_side"

>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130)
// tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash.
TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down"

Expand Down Expand Up @@ -566,6 +575,11 @@ const (
DefSkipASCIICheck = false
DefOptAggPushDown = false
DefOptBCJ = false
<<<<<<< HEAD
=======
DefOptCartesianBCJ = 1
DefOptMPPOuterJoinFixedBuildSide = true
>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130)
DefOptWriteRowID = false
DefOptCorrelationThreshold = 0.9
DefOptCorrelationExpFactor = 1
Expand Down

0 comments on commit 7d777df

Please sign in to comment.