diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index fea4d3524c22d..6321c72eeb21c 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1770,9 +1770,23 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() { preferredBuildIndex = 1 } - } else if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin || p.JoinType == LeftOuterJoin { + } else if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin { preferredBuildIndex = 1 } + if p.JoinType == LeftOuterJoin || p.JoinType == RightOuterJoin { + // TiFlash does not requires that the build side must be the inner table for outer join + // so we can choose the build side based on the row count, except that + // 1. it is a broadcast join(for broadcast join, it make sense to use the broadcast side as the build side) + // 2. or session variable MPPOuterJoinFixedBuildSide is set to true + // 3. or there are otherConditions for this join + if useBCJ || p.ctx.GetSessionVars().MPPOuterJoinFixedBuildSide || len(p.OtherConditions) > 0 { + if p.JoinType == LeftOuterJoin { + preferredBuildIndex = 1 + } + } else if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() { + preferredBuildIndex = 1 + } + } baseJoin.InnerChildIdx = preferredBuildIndex childrenProps := make([]*property.PhysicalProperty, 2) if useBCJ { diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 39613616dcaa5..43e08b735a6a8 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -462,6 +462,138 @@ func (s *testIntegrationSerialSuite) TestMPPJoin(c *C) { } } +func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForBroadcastJoin(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists a") + tk.MustExec("create table a(id int, value int)") + tk.MustExec("insert into a values(1,2),(2,3)") + tk.MustExec("analyze table a") + tk.MustExec("drop table if exists b") + tk.MustExec("create table b(id int, value int)") + tk.MustExec("insert into b values(1,2),(2,3),(3,4)") + tk.MustExec("analyze table b") + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 0") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 10000") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 10000") + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + +func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists a") + tk.MustExec("create table a(id int, value int)") + tk.MustExec("insert into a values(1,2),(2,3)") + tk.MustExec("analyze table a") + tk.MustExec("drop table if exists b") + tk.MustExec("create table b(id int, value int)") + tk.MustExec("insert into b values(1,2),(2,3),(3,4)") + tk.MustExec("analyze table b") + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 1") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0") + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + +func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForShuffleJoin(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists a") + tk.MustExec("create table a(id int, value int)") + tk.MustExec("insert into a values(1,2),(2,3)") + tk.MustExec("analyze table a") + tk.MustExec("drop table if exists b") + tk.MustExec("create table b(id int, value int)") + tk.MustExec("insert into b values(1,2),(2,3),(3,4)") + tk.MustExec("analyze table b") + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 0") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0") + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + func (s *testIntegrationSerialSuite) TestMPPShuffledJoin(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index f9a7a9130f742..1b1c8d9cdb969 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -34,6 +34,27 @@ "explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)" ] }, + { + "name": "TestMPPOuterJoinBuildSideForBroadcastJoin", + "cases": [ + "explain format = 'brief' select count(*) from a left join b on a.id = b.id", + "explain format = 'brief' select count(*) from b right join a on a.id = b.id" + ] + }, + { + "name": "TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide", + "cases": [ + "explain format = 'brief' select count(*) from a left join b on a.id = b.id", + "explain format = 'brief' select count(*) from b right join a on a.id = b.id" + ] + }, + { + "name": "TestMPPOuterJoinBuildSideForShuffleJoin", + "cases": [ + "explain format = 'brief' select count(*) from a left join b on a.id = b.id", + "explain format = 'brief' select count(*) from b right join a on a.id = b.id" + ] + }, { "name": "TestMPPShuffledJoin", "cases": [ diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index 673315a7d9f2d..6cebbdb11980f 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -288,6 +288,113 @@ } ] }, + { + "Name": "TestMPPOuterJoinBuildSideForBroadcastJoin", + "Cases": [ + { + "SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id", + "Plan": [ + "StreamAgg 1.00 root funcs:count(1)->Column#7", + "└─TableReader 2.00 root data:ExchangeSender", + " └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]", + " ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ", + " │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))", + " │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false", + " └─TableFullScan(Probe) 2.00 cop[tiflash] table:a keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id", + "Plan": [ + "StreamAgg 1.00 root funcs:count(1)->Column#7", + "└─TableReader 2.00 root data:ExchangeSender", + " └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]", + " ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ", + " │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))", + " │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false", + " └─TableFullScan(Probe) 2.00 cop[tiflash] table:a keep order:false" + ] + } + ] + }, + { + "Name": "TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide", + "Cases": [ + { + "SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id", + "Plan": [ + "StreamAgg 1.00 root funcs:count(1)->Column#7", + "└─TableReader 2.00 root data:ExchangeSender", + " └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]", + " ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ", + " │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id", + " │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))", + " │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false", + " └─ExchangeReceiver(Probe) 2.00 cop[tiflash] ", + " └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id", + " └─TableFullScan 2.00 cop[tiflash] table:a keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id", + "Plan": [ + "StreamAgg 1.00 root funcs:count(1)->Column#7", + "└─TableReader 2.00 root data:ExchangeSender", + " └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]", + " ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ", + " │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id", + " │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))", + " │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false", + " └─ExchangeReceiver(Probe) 2.00 cop[tiflash] ", + " └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id", + " └─TableFullScan 2.00 cop[tiflash] table:a keep order:false" + ] + } + ] + }, + { + "Name": "TestMPPOuterJoinBuildSideForShuffleJoin", + "Cases": [ + { + "SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id", + "Plan": [ + "StreamAgg 1.00 root funcs:count(1)->Column#7", + "└─TableReader 2.00 root data:ExchangeSender", + " └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]", + " ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ", + " │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id", + " │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false", + " └─ExchangeReceiver(Probe) 3.00 cop[tiflash] ", + " └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id", + " └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))", + " └─TableFullScan 3.00 cop[tiflash] table:b keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id", + "Plan": [ + "StreamAgg 1.00 root funcs:count(1)->Column#7", + "└─TableReader 2.00 root data:ExchangeSender", + " └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]", + " ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ", + " │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id", + " │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false", + " └─ExchangeReceiver(Probe) 3.00 cop[tiflash] ", + " └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id", + " └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))", + " └─TableFullScan 3.00 cop[tiflash] table:b keep order:false" + ] + } + ] + }, { "Name": "TestMPPShuffledJoin", "Cases": [ diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 82546fd553d65..363072187158f 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -482,6 +482,18 @@ type SessionVars struct { // AllowBCJ means allow broadcast join. AllowBCJ bool +<<<<<<< HEAD +======= + + // AllowCartesianBCJ means allow broadcast CARTESIAN join, 0 means not allow, 1 means allow broadcast CARTESIAN join + // but the table size should under the broadcast threshold, 2 means allow broadcast CARTESIAN join even if the table + // size exceeds the broadcast threshold + AllowCartesianBCJ int + + // MPPOuterJoinFixedBuildSide means in MPP plan, always use right(left) table as build side for left(right) out join + MPPOuterJoinFixedBuildSide bool + +>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130) // AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash. AllowDistinctAggPushDown bool @@ -953,6 +965,11 @@ func NewSessionVars() *SessionVars { StmtCtx: new(stmtctx.StatementContext), AllowAggPushDown: false, AllowBCJ: false, +<<<<<<< HEAD +======= + AllowCartesianBCJ: DefOptCartesianBCJ, + MPPOuterJoinFixedBuildSide: DefOptMPPOuterJoinFixedBuildSide, +>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130) BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize, BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize, OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 29a073936645c..c3feb24424b80 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -591,9 +591,29 @@ var defaultSysVars = []*SysVar{ } return normalizedValue, nil }}, +<<<<<<< HEAD {Scope: ScopeSession, Name: TiDBOptDistinctAggPushDown, Value: BoolToOnOff(config.GetGlobalConfig().Performance.DistinctAggPushDown), Type: TypeBool}, {Scope: ScopeSession, Name: TiDBOptWriteRowID, Value: BoolToOnOff(DefOptWriteRowID)}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, Value: strconv.Itoa(DefBuildStatsConcurrency)}, +======= + {Scope: ScopeSession, Name: TiDBOptDistinctAggPushDown, Value: BoolToOnOff(config.GetGlobalConfig().Performance.DistinctAggPushDown), skipInit: true, Type: TypeBool, SetSession: func(s *SessionVars, val string) error { + s.AllowDistinctAggPushDown = TiDBOptOn(val) + return nil + }}, + {Scope: ScopeSession, Name: TiDBOptWriteRowID, Value: BoolToOnOff(DefOptWriteRowID), skipInit: true, SetSession: func(s *SessionVars, val string) error { + s.AllowWriteRowID = TiDBOptOn(val) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, skipInit: true, Value: strconv.Itoa(DefBuildStatsConcurrency)}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2, SetSession: func(s *SessionVars, val string) error { + s.AllowCartesianBCJ = tidbOptInt(val, DefOptCartesianBCJ) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptMPPOuterJoinFixedBuildSide, Value: BoolToOnOff(DefOptMPPOuterJoinFixedBuildSide), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { + s.MPPOuterJoinFixedBuildSide = TiDBOptOn(val) + return nil + }}, +>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130) {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeRatio, Value: strconv.FormatFloat(DefAutoAnalyzeRatio, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 144d09a52d929..a6ee750dd87ae 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -48,6 +48,15 @@ const ( TiDBOptAggPushDown = "tidb_opt_agg_push_down" TiDBOptBCJ = "tidb_opt_broadcast_join" +<<<<<<< HEAD +======= + + // TiDBOptCartesianBCJ is used to disable/enable broadcast cartesian join in MPP mode + TiDBOptCartesianBCJ = "tidb_opt_broadcast_cartesian_join" + + TiDBOptMPPOuterJoinFixedBuildSide = "tidb_opt_mpp_outer_join_fixed_build_side" + +>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130) // tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash. TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down" @@ -566,6 +575,11 @@ const ( DefSkipASCIICheck = false DefOptAggPushDown = false DefOptBCJ = false +<<<<<<< HEAD +======= + DefOptCartesianBCJ = 1 + DefOptMPPOuterJoinFixedBuildSide = true +>>>>>>> c59b3bcea... planner: Mpp outer join build side (#25130) DefOptWriteRowID = false DefOptCorrelationThreshold = 0.9 DefOptCorrelationExpFactor = 1