Skip to content

Commit

Permalink
planner: allow requesting MPP task for simple selection or projection…
Browse files Browse the repository at this point in the history
… operator (pingcap#37422)

close pingcap#35875
  • Loading branch information
fixdb authored Aug 30, 2022
1 parent a800785 commit 42b6883
Show file tree
Hide file tree
Showing 14 changed files with 266 additions and 221 deletions.
7 changes: 4 additions & 3 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,16 +872,17 @@ func TestNotEvolvePlanForReadStorageHint(t *testing.T) {
}

// Make sure the best plan of the SQL is use TiKV index.
tk.MustExec("set @@session.tidb_executor_concurrency = 4;")
tk.MustExec("set @@session.tidb_executor_concurrency = 4; set @@tidb_allow_mpp=0;")
rows := tk.MustQuery("explain select * from t where a >= 11 and b >= 11").Rows()
require.Equal(t, "cop[tikv]", fmt.Sprintf("%v", rows[len(rows)-1][2]))
tk.MustExec("set @@tidb_allow_mpp=1")

tk.MustExec("create global binding for select * from t where a >= 1 and b >= 1 using select /*+ read_from_storage(tiflash[t]) */ * from t where a >= 1 and b >= 1")
tk.MustExec("set @@tidb_evolve_plan_baselines=1")

// Even if index of TiKV has lower cost, it chooses TiFlash.
rows = tk.MustQuery("explain select * from t where a >= 11 and b >= 11").Rows()
require.Equal(t, "cop[tiflash]", fmt.Sprintf("%v", rows[len(rows)-1][2]))
require.Equal(t, "mpp[tiflash]", fmt.Sprintf("%v", rows[len(rows)-1][2]))

tk.MustExec("admin flush bindings")
rows = tk.MustQuery("show global bindings").Rows()
Expand Down Expand Up @@ -920,7 +921,7 @@ func TestBindingWithIsolationRead(t *testing.T) {
// Even if we build a binding use index for SQL, but after we set the isolation read for TiFlash, it choose TiFlash instead of index of TiKV.
tk.MustExec("set @@tidb_isolation_read_engines = \"tiflash\"")
rows = tk.MustQuery("explain select * from t where a >= 11 and b >= 11").Rows()
require.Equal(t, "cop[tiflash]", rows[len(rows)-1][2])
require.Equal(t, "mpp[tiflash]", rows[len(rows)-1][2])
}

func TestReCreateBindAfterEvolvePlan(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion bindinfo/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestCapturePlanBaselineIgnoreTiFlash(t *testing.T) {
}
// Here the plan is the TiFlash plan.
rows := tk.MustQuery("explain select * from t").Rows()
require.Equal(t, "cop[tiflash]", fmt.Sprintf("%v", rows[len(rows)-1][2]))
require.Equal(t, "mpp[tiflash]", fmt.Sprintf("%v", rows[len(rows)-1][2]))

tk.MustQuery("show global bindings").Check(testkit.Rows())
tk.MustExec("admin capture bindings")
Expand Down
3 changes: 3 additions & 0 deletions executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,9 @@ func TestPrepareStmtAfterIsolationReadChange(t *testing.T) {
require.Equal(t, "cop[tikv]", rows[len(rows)-1][2])

tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'")
// allowing mpp will generate mpp[tiflash] plan, the test framework will time out due to
// "retry for TiFlash peer with region missing", so disable mpp mode to use cop mode instead.
tk.MustExec("set @@session.tidb_allow_mpp=0")
tk.MustExec("execute stmt")
tkProcess = tk.Session().ShowProcess()
ps = []*util.ProcessInfo{tkProcess}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2045,8 +2045,8 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty
return nil, true, nil
}
newProps := []*property.PhysicalProperty{newProp}
// generate a mpp task candidate if enforced mpp
if newProp.TaskTp != property.MppTaskType && p.SCtx().GetSessionVars().IsMPPEnforced() && p.canPushToCop(kv.TiFlash) &&
// generate a mpp task candidate if mpp mode is allowed
if newProp.TaskTp != property.MppTaskType && p.SCtx().GetSessionVars().IsMPPAllowed() && p.canPushToCop(kv.TiFlash) &&
expression.CanExprsPushDown(p.SCtx().GetSessionVars().StmtCtx, p.Exprs, p.SCtx().GetClient(), kv.TiFlash) {
mppProp := newProp.CloneEssentialFields()
mppProp.TaskTp = property.MppTaskType
Expand Down
5 changes: 3 additions & 2 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6346,8 +6346,9 @@ func TestIssue31202(t *testing.T) {
tbl.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

tk.MustQuery("explain format = 'brief' select * from t31202;").Check(testkit.Rows(
"TableReader 10000.00 root data:TableFullScan",
"└─TableFullScan 10000.00 cop[tiflash] table:t31202 keep order:false, stats:pseudo"))
"TableReader 10000.00 root data:ExchangeSender",
"└─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough",
" └─TableFullScan 10000.00 mpp[tiflash] table:t31202 keep order:false, stats:pseudo"))

tk.MustQuery("explain format = 'brief' select * from t31202 use index (primary);").Check(testkit.Rows(
"TableReader 10000.00 root data:TableFullScan",
Expand Down
2 changes: 1 addition & 1 deletion planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,7 @@ func TestMPPSinglePartitionType(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists employee")
tk.MustExec("create table employee(empid int, deptid int, salary decimal(10,2))")
tk.MustExec("set tidb_enforce_mpp=1")
tk.MustExec("set tidb_enforce_mpp=0")

is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
Expand Down
4 changes: 2 additions & 2 deletions planner/core/plan_cost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,12 +1024,12 @@ func TestScanOnSmallTable(t *testing.T) {
}

rs := tk.MustQuery("explain select * from t").Rows()
useTiKVScan := false
useTiKVScan := true
for _, r := range rs {
op := r[0].(string)
task := r[2].(string)
if strings.Contains(op, "Scan") && strings.Contains(task, "tikv") {
useTiKVScan = true
useTiKVScan = false
}
}
require.True(t, useTiKVScan)
Expand Down
4 changes: 2 additions & 2 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,14 +559,14 @@ func TestExplainFormatHintRecoverableForTiFlashReplica(t *testing.T) {
}

rows := tk.MustQuery("explain select * from t").Rows()
require.Equal(t, rows[len(rows)-1][2], "cop[tiflash]")
require.Equal(t, rows[len(rows)-1][2], "mpp[tiflash]")

rows = tk.MustQuery("explain format='hint' select * from t").Rows()
require.Equal(t, rows[0][0], "read_from_storage(@`sel_1` tiflash[`test`.`t`])")

hints := tk.MustQuery("explain format='hint' select * from t;").Rows()[0][0]
rows = tk.MustQuery(fmt.Sprintf("explain select /*+ %s */ * from t", hints)).Rows()
require.Equal(t, rows[len(rows)-1][2], "cop[tiflash]")
require.Equal(t, rows[len(rows)-1][2], "mpp[tiflash]")
}

func TestNthPlanHint(t *testing.T) {
Expand Down
14 changes: 8 additions & 6 deletions planner/core/testdata/analyze_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -286,19 +286,21 @@
"Name": "TestTiFlashCostModel",
"Cases": [
[
"TableReader_7 10000.00 root data:TableFullScan_6",
"└─TableFullScan_6 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"
"TableReader_11 10000.00 root data:ExchangeSender_10",
"└─ExchangeSender_10 10000.00 mpp[tiflash] ExchangeType: PassThrough",
" └─TableFullScan_9 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
],
[
"TableReader_5 10000.00 root data:TableFullScan_4",
"└─TableFullScan_4 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
"TableReader_6 10000.00 root data:TableFullScan_5",
"└─TableFullScan_5 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
[
"Batch_Point_Get_5 2.00 root table:t handle:[1 2], keep order:false, desc:false"
],
[
"TableReader_6 2.00 root data:TableRangeScan_5",
"└─TableRangeScan_5 2.00 cop[tiflash] table:t range:[1,1], [2,2], keep order:false, stats:pseudo"
"TableReader_10 2.00 root data:ExchangeSender_9",
"└─ExchangeSender_9 2.00 mpp[tiflash] ExchangeType: PassThrough",
" └─TableRangeScan_8 2.00 mpp[tiflash] table:t range:[1,1], [2,2], keep order:false, stats:pseudo"
]
]
},
Expand Down
60 changes: 30 additions & 30 deletions planner/core/testdata/enforce_mpp_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,32 +72,32 @@
{
"SQL": "explain format='verbose' select count(*) from t where a=1",
"Plan": [
"StreamAgg_30 1.00 35.88 root funcs:count(Column#7)->Column#4",
"└─IndexReader_31 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 465.00 cop[tikv] funcs:count(1)->Column#7",
" └─IndexRangeScan_29 10.00 435.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"
"StreamAgg_31 1.00 35.88 root funcs:count(Column#7)->Column#4",
"└─IndexReader_32 1.00 32.88 root index:StreamAgg_11",
" └─StreamAgg_11 1.00 465.00 cop[tikv] funcs:count(1)->Column#7",
" └─IndexRangeScan_30 10.00 435.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"
],
"Warn": null
},
{
"SQL": "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1",
"Plan": [
"StreamAgg_18 1.00 35.88 root funcs:count(Column#6)->Column#4",
"└─IndexReader_19 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 465.00 cop[tikv] funcs:count(1)->Column#6",
" └─IndexRangeScan_17 10.00 435.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"
"StreamAgg_19 1.00 35.88 root funcs:count(Column#6)->Column#4",
"└─IndexReader_20 1.00 32.88 root index:StreamAgg_11",
" └─StreamAgg_11 1.00 465.00 cop[tikv] funcs:count(1)->Column#6",
" └─IndexRangeScan_18 10.00 435.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"
],
"Warn": null
},
{
"SQL": "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1",
"Plan": [
"HashAgg_21 1.00 11910.73 root funcs:count(Column#6)->Column#4",
"└─TableReader_23 1.00 11877.13 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 285030.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285030.00 mpp[tiflash] funcs:count(1)->Column#6",
" └─Selection_20 10.00 285000.00 mpp[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 255000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
"HashAgg_22 1.00 11910.73 root funcs:count(Column#6)->Column#4",
"└─TableReader_24 1.00 11877.13 root data:ExchangeSender_23",
" └─ExchangeSender_23 1.00 285030.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg_10 1.00 285030.00 mpp[tiflash] funcs:count(1)->Column#6",
" └─Selection_21 10.00 285000.00 mpp[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_20 10000.00 255000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
],
"Warn": null
},
Expand All @@ -109,34 +109,34 @@
{
"SQL": "explain format='verbose' select count(*) from t where a=1",
"Plan": [
"HashAgg_24 1.00 33.89 root funcs:count(Column#6)->Column#4",
"└─TableReader_26 1.00 0.29 root data:ExchangeSender_25",
" └─ExchangeSender_25 1.00 285030.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285030.00 mpp[tiflash] funcs:count(1)->Column#6",
" └─Selection_23 10.00 285000.00 mpp[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_22 10000.00 255000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
"HashAgg_25 1.00 33.89 root funcs:count(Column#6)->Column#4",
"└─TableReader_27 1.00 0.29 root data:ExchangeSender_26",
" └─ExchangeSender_26 1.00 285030.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg_10 1.00 285030.00 mpp[tiflash] funcs:count(1)->Column#6",
" └─Selection_24 10.00 285000.00 mpp[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_23 10000.00 255000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
],
"Warn": null
},
{
"SQL": "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1",
"Plan": [
"StreamAgg_18 1.00 35.88 root funcs:count(Column#6)->Column#4",
"└─IndexReader_19 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 465.00 cop[tikv] funcs:count(1)->Column#6",
" └─IndexRangeScan_17 10.00 435.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"
"StreamAgg_19 1.00 35.88 root funcs:count(Column#6)->Column#4",
"└─IndexReader_20 1.00 32.88 root index:StreamAgg_11",
" └─StreamAgg_11 1.00 465.00 cop[tikv] funcs:count(1)->Column#6",
" └─IndexRangeScan_18 10.00 435.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"
],
"Warn": null
},
{
"SQL": "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1",
"Plan": [
"HashAgg_21 1.00 33.89 root funcs:count(Column#6)->Column#4",
"└─TableReader_23 1.00 0.29 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 285030.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285030.00 mpp[tiflash] funcs:count(1)->Column#6",
" └─Selection_20 10.00 285000.00 mpp[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 255000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
"HashAgg_22 1.00 33.89 root funcs:count(Column#6)->Column#4",
"└─TableReader_24 1.00 0.29 root data:ExchangeSender_23",
" └─ExchangeSender_23 1.00 285030.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg_10 1.00 285030.00 mpp[tiflash] funcs:count(1)->Column#6",
" └─Selection_21 10.00 285000.00 mpp[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_20 10000.00 255000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
],
"Warn": null
},
Expand Down
Loading

0 comments on commit 42b6883

Please sign in to comment.