Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner,executor: enable plan cache for partition table #19124

Merged
merged 12 commits into from
Sep 8, 2020
10 changes: 9 additions & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,15 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
SchemaVersion: e.is.SchemaMetaVersion(),
}

prepared.UseCache = plannercore.PreparedPlanCacheEnabled() && plannercore.Cacheable(stmt, e.is)
if !plannercore.PreparedPlanCacheEnabled() {
prepared.UseCache = false
} else {
if tryOldPartitionImplementation(e.ctx) {
prepared.UseCache = plannercore.Cacheable(stmt, e.is)
} else {
prepared.UseCache = plannercore.Cacheable(stmt, nil)
}
}

// We try to build the real statement of preparedStmt.
for i := range prepared.Params {
Expand Down
8 changes: 5 additions & 3 deletions planner/core/cacheable_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,11 @@ func (checker *cacheableChecker) Enter(in ast.Node) (out ast.Node, skipChildren
return in, true
}
case *ast.TableName:
if checker.isPartitionTable(node) {
checker.cacheable = false
return in, true
if checker.schema != nil {
if checker.isPartitionTable(node) {
checker.cacheable = false
return in, true
}
}
}
return in, false
Expand Down
91 changes: 5 additions & 86 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand All @@ -40,7 +39,6 @@ import (
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/math"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/texttree"
"go.uber.org/zap"
Expand Down Expand Up @@ -441,15 +439,6 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
if ts.Table.Partition != nil && ts.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromTable(e.ctx, ts, sc, pkCol)
if err != nil {
return err
}
if pID != -1 {
ts.physicalTableID = pID
}
}
} else {
ts.Ranges = ranger.FullIntRange(false)
}
Expand All @@ -460,32 +449,12 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
if is.Table.Partition != nil && is.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromIndex(e.ctx, is, sc)
if err != nil {
return err
}
if pID != -1 {
is.physicalTableID = pID
}
}
case *PhysicalIndexLookUpReader:
is := x.IndexPlans[0].(*PhysicalIndexScan)
is.Ranges, err = e.buildRangeForIndexScan(sctx, is)
if err != nil {
return err
}
if is.Table.Partition != nil && is.Table.Partition.Type == model.PartitionTypeHash {
pID, err := rebuildNewTableIDFromIndex(e.ctx, is, sc)
if err != nil {
return err
}
if pID != -1 {
is.physicalTableID = pID
tblScan := x.TablePlans[0].(*PhysicalTableScan)
tblScan.physicalTableID = pID
}
}
case *PointGetPlan:
// if access condition is not nil, which means it's a point get generated by cbo.
if x.AccessConditions != nil {
Expand Down Expand Up @@ -513,36 +482,25 @@ func (e *Execute) rebuildRange(p Plan) error {
}
}
}
// The code should never run here as long as we're not using point get for partition table.
// And if we change the logic one day, here work as defensive programming to cache the error.
if x.PartitionInfo != nil {
return errors.New("point get for partition table can not use plan cache")
}
if x.HandleParam != nil {
var iv int64
iv, err = x.HandleParam.Datum.ToInt64(sc)
if err != nil {
return err
}
x.Handle = kv.IntHandle(iv)
if x.PartitionInfo != nil {
if x.TblInfo.Partition.Type != model.PartitionTypeHash {
return errors.New("range partition table can not use plan cache")
}
num := x.TblInfo.Partition.Num
pos := math.Abs(iv) % int64(num)
x.PartitionInfo = &x.TblInfo.Partition.Definitions[pos]
}
return nil
}
for i, param := range x.IndexValueParams {
if param != nil {
x.IndexValues[i] = param.Datum
}
}
if x.PartitionInfo != nil {
if x.TblInfo.Partition.Type != model.PartitionTypeHash {
return errors.New("range partition table can not use plan cache")
}
val := x.IndexValues[x.partitionColumnPos].GetInt64()
partitionID := val % int64(x.TblInfo.Partition.Num)
x.PartitionInfo = &x.TblInfo.Partition.Definitions[partitionID]
}
return nil
case *BatchPointGetPlan:
// if access condition is not nil, which means it's a point get generated by cbo.
Expand Down Expand Up @@ -1292,42 +1250,3 @@ func locateHashPartition(ctx sessionctx.Context, expr expression.Expression, pi
}
return int(ret % int64(pi.Num)), nil
}

func getPhysicalTableIDForPartition(ctx sessionctx.Context, pi *model.PartitionInfo, schema *expression.Schema, names types.NameSlice, val []types.Datum) (int64, error) {
expr, err := expression.ParseSimpleExprsWithNames(ctx, pi.Expr, schema, names)
if err != nil {
return 0, err
}
pos, err := locateHashPartition(ctx, expr[0], pi, val)
if err != nil {
return 0, err
}
pID := pi.Definitions[pos].ID
return pID, nil
}

func rebuildNewTableIDFromIndex(ctx sessionctx.Context, is *PhysicalIndexScan, sc *stmtctx.StatementContext) (int64, error) {
pi := is.Table.Partition
if pi.Type == model.PartitionTypeHash && len(is.Ranges) == 1 && is.Ranges[0].IsPoint(sc) {
schema, names := buildSchemaAndNameFromIndex(is.IdxCols, is.DBName, is.Table, is.Index)
pID, err := getPhysicalTableIDForPartition(ctx, pi, schema, names, is.Ranges[0].LowVal)
if err != nil {
return -1, err
}
return pID, nil
}
return -1, nil
}

func rebuildNewTableIDFromTable(ctx sessionctx.Context, ts *PhysicalTableScan, sc *stmtctx.StatementContext, pkCol *expression.Column) (int64, error) {
pi := ts.Table.Partition
if pi.Type == model.PartitionTypeHash && len(ts.Ranges) == 1 && ts.Ranges[0].IsPoint(sc) {
schema, names := buildSchemaAndNameFromPKCol(pkCol, ts.DBName, ts.Table)
pID, err := getPhysicalTableIDForPartition(ctx, pi, schema, names, ts.Ranges[0].LowVal)
if err != nil {
return -1, err
}
return pID, nil
}
return -1, nil
}
129 changes: 67 additions & 62 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,68 +411,73 @@ func (s *testPrepareSerialSuite) TestPrepareCacheForPartition(c *C) {
c.Assert(err, IsNil)

tk.MustExec("use test")
// Test for PointGet and IndexRead.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the old test case is sufficient @winoros

tk.MustExec("drop table if exists t_index_read")
tk.MustExec("create table t_index_read (id int, k int, c varchar(10), primary key (id, k)) partition by hash(id+k) partitions 10")
tk.MustExec("insert into t_index_read values (1, 2, 'abc'), (3, 4, 'def'), (5, 6, 'xyz')")
tk.MustExec("prepare stmt1 from 'select c from t_index_read where id = ? and k = ?;'")
tk.MustExec("set @id=1, @k=2")
// When executing one statement at the first time, we don't use cache, so we need to execute it at least twice to test the cache.
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("xyz"))
tk.MustExec("prepare stmt2 from 'select c from t_index_read where id = ? and k = ? and 1 = 1;'")
tk.MustExec("set @id=1, @k=2")
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("xyz"))
// Test for TableScan.
tk.MustExec("drop table if exists t_table_read")
tk.MustExec("create table t_table_read (id int, k int, c varchar(10), primary key(id)) partition by hash(id) partitions 10")
tk.MustExec("insert into t_table_read values (1, 2, 'abc'), (3, 4, 'def'), (5, 6, 'xyz')")
tk.MustExec("prepare stmt3 from 'select c from t_index_read where id = ?;'")
tk.MustExec("set @id=1")
// When executing one statement at the first time, we don't use cache, so we need to execute it at least twice to test the cache.
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("prepare stmt4 from 'select c from t_index_read where id = ? and k = ?'")
tk.MustExec("set @id=1, @k=2")
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("xyz"))
// Query on range partition tables should not raise error.
tk.MustExec("create table t_range_index (id int, k int, c varchar(10), primary key(id)) partition by range(id) ( PARTITION p0 VALUES LESS THAN (4), PARTITION p1 VALUES LESS THAN (14),PARTITION p2 VALUES LESS THAN (20) )")
tk.MustExec("insert into t_range_index values (1, 2, 'abc'), (5, 4, 'def'), (13, 6, 'xyz'), (17, 6, 'hij')")
tk.MustExec("prepare stmt5 from 'select c from t_range_index where id = ?'")
tk.MustExec("set @id=1")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("def"))
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("def"))
tk.MustExec("set @id=13")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("set @id=17")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("hij"))

tk.MustExec("create table t_range_table (id int, k int, c varchar(10)) partition by range(id) ( PARTITION p0 VALUES LESS THAN (4), PARTITION p1 VALUES LESS THAN (14),PARTITION p2 VALUES LESS THAN (20) )")
tk.MustExec("insert into t_range_table values (1, 2, 'abc'), (5, 4, 'def'), (13, 6, 'xyz'), (17, 6, 'hij')")
tk.MustExec("prepare stmt6 from 'select c from t_range_table where id = ?'")
tk.MustExec("set @id=1")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("def"))
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("def"))
tk.MustExec("set @id=13")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("set @id=17")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("hij"))
for _, val := range []string{"1", "null"} {
tk.MustExec("set @try_old_partition_implementation = " + val)
// Test for PointGet and IndexRead.
tk.MustExec("drop table if exists t_index_read")
tk.MustExec("create table t_index_read (id int, k int, c varchar(10), primary key (id, k)) partition by hash(id+k) partitions 10")
tk.MustExec("insert into t_index_read values (1, 2, 'abc'), (3, 4, 'def'), (5, 6, 'xyz')")
tk.MustExec("prepare stmt1 from 'select c from t_index_read where id = ? and k = ?;'")
tk.MustExec("set @id=1, @k=2")
// When executing one statement at the first time, we don't use cache, so we need to execute it at least twice to test the cache.
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt1 using @id, @k").Check(testkit.Rows("xyz"))
tk.MustExec("prepare stmt2 from 'select c from t_index_read where id = ? and k = ? and 1 = 1;'")
tk.MustExec("set @id=1, @k=2")
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt2 using @id, @k").Check(testkit.Rows("xyz"))
// Test for TableScan.
tk.MustExec("drop table if exists t_table_read")
tk.MustExec("create table t_table_read (id int, k int, c varchar(10), primary key(id)) partition by hash(id) partitions 10")
tk.MustExec("insert into t_table_read values (1, 2, 'abc'), (3, 4, 'def'), (5, 6, 'xyz')")
tk.MustExec("prepare stmt3 from 'select c from t_index_read where id = ?;'")
tk.MustExec("set @id=1")
// When executing one statement at the first time, we don't use cache, so we need to execute it at least twice to test the cache.
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt3 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("prepare stmt4 from 'select c from t_index_read where id = ? and k = ?'")
tk.MustExec("set @id=1, @k=2")
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5, @k=6")
tk.MustQuery("execute stmt4 using @id, @k").Check(testkit.Rows("xyz"))
// Query on range partition tables should not raise error.
tk.MustExec("drop table if exists t_range_index")
tk.MustExec("create table t_range_index (id int, k int, c varchar(10), primary key(id)) partition by range(id) ( PARTITION p0 VALUES LESS THAN (4), PARTITION p1 VALUES LESS THAN (14),PARTITION p2 VALUES LESS THAN (20) )")
tk.MustExec("insert into t_range_index values (1, 2, 'abc'), (5, 4, 'def'), (13, 6, 'xyz'), (17, 6, 'hij')")
tk.MustExec("prepare stmt5 from 'select c from t_range_index where id = ?'")
tk.MustExec("set @id=1")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("def"))
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("def"))
tk.MustExec("set @id=13")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("set @id=17")
tk.MustQuery("execute stmt5 using @id").Check(testkit.Rows("hij"))

tk.MustExec("drop table if exists t_range_table")
tk.MustExec("create table t_range_table (id int, k int, c varchar(10)) partition by range(id) ( PARTITION p0 VALUES LESS THAN (4), PARTITION p1 VALUES LESS THAN (14),PARTITION p2 VALUES LESS THAN (20) )")
tk.MustExec("insert into t_range_table values (1, 2, 'abc'), (5, 4, 'def'), (13, 6, 'xyz'), (17, 6, 'hij')")
tk.MustExec("prepare stmt6 from 'select c from t_range_table where id = ?'")
tk.MustExec("set @id=1")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("abc"))
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("abc"))
tk.MustExec("set @id=5")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("def"))
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("def"))
tk.MustExec("set @id=13")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("xyz"))
tk.MustExec("set @id=17")
tk.MustQuery("execute stmt6 using @id").Check(testkit.Rows("hij"))
}
}

func newSession(c *C, store kv.Storage, dbName string) session.Session {
Expand Down