Commit

Merge branch 'master' into issue23897
wshwsh12 authored Jun 24, 2021
2 parents 83c0d52 + 39c547f commit 7a6a757
Showing 11 changed files with 168 additions and 58 deletions.
46 changes: 28 additions & 18 deletions ddl/ddl_api.go
@@ -1638,12 +1638,15 @@ func checkTableInfoValid(tblInfo *model.TableInfo) error {
return checkInvisibleIndexOnPK(tblInfo)
}

func buildTableInfoWithLike(ident ast.Ident, referTblInfo *model.TableInfo, s *ast.CreateTableStmt) (*model.TableInfo, error) {
func buildTableInfoWithLike(ctx sessionctx.Context, ident ast.Ident, referTblInfo *model.TableInfo, s *ast.CreateTableStmt) (*model.TableInfo, error) {
// Check the referred table is a real table object.
if referTblInfo.IsSequence() || referTblInfo.IsView() {
return nil, ErrWrongObject.GenWithStackByArgs(ident.Schema, referTblInfo.Name, "BASE TABLE")
}
tblInfo := *referTblInfo
if err := setTemporaryType(ctx, &tblInfo, s); err != nil {
return nil, errors.Trace(err)
}
// Check non-public column and adjust column offset.
newColumns := referTblInfo.Cols()
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
@@ -1731,22 +1734,8 @@ func buildTableInfoWithStmt(ctx sessionctx.Context, s *ast.CreateTableStmt, dbCh
if err != nil {
return nil, errors.Trace(err)
}
switch s.TemporaryKeyword {
case ast.TemporaryGlobal:
tbInfo.TempTableType = model.TempTableGlobal
if !ctx.GetSessionVars().EnableGlobalTemporaryTable {
return nil, errors.New("global temporary table is experimental and it is switched off by tidb_enable_global_temporary_table")
}
// "create global temporary table ... on commit preserve rows"
if !s.OnCommitDelete {
return nil, errors.Trace(errUnsupportedOnCommitPreserve)
}
case ast.TemporaryLocal:
// TODO: set "tbInfo.TempTableType = model.TempTableLocal" after local temporary table is supported.
tbInfo.TempTableType = model.TempTableNone
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("local TEMPORARY TABLE is not supported yet, TEMPORARY will be parsed but ignored"))
case ast.TemporaryNone:
tbInfo.TempTableType = model.TempTableNone
if err = setTemporaryType(ctx, tbInfo, s); err != nil {
return nil, errors.Trace(err)
}

if err = setTableAutoRandomBits(ctx, tbInfo, colDefs); err != nil {
@@ -1808,7 +1797,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
// build tableInfo
var tbInfo *model.TableInfo
if s.ReferTable != nil {
tbInfo, err = buildTableInfoWithLike(ident, referTbl.Meta(), s)
tbInfo, err = buildTableInfoWithLike(ctx, ident, referTbl.Meta(), s)
} else {
tbInfo, err = buildTableInfoWithStmt(ctx, s, schema.Charset, schema.Collate)
}
@@ -1828,6 +1817,27 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, onExist, false /*tryRetainID*/)
}

func setTemporaryType(ctx sessionctx.Context, tbInfo *model.TableInfo, s *ast.CreateTableStmt) error {
switch s.TemporaryKeyword {
case ast.TemporaryGlobal:
tbInfo.TempTableType = model.TempTableGlobal
if !ctx.GetSessionVars().EnableGlobalTemporaryTable {
return errors.New("global temporary table is experimental and it is switched off by tidb_enable_global_temporary_table")
}
// "create global temporary table ... on commit preserve rows"
if !s.OnCommitDelete {
return errors.Trace(errUnsupportedOnCommitPreserve)
}
case ast.TemporaryLocal:
// TODO: set "tbInfo.TempTableType = model.TempTableLocal" after local temporary table is supported.
tbInfo.TempTableType = model.TempTableNone
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("local TEMPORARY TABLE is not supported yet, TEMPORARY will be parsed but ignored"))
default:
tbInfo.TempTableType = model.TempTableNone
}
return nil
}

func (d *ddl) CreateTableWithInfo(
ctx sessionctx.Context,
dbName model.CIStr,
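The temporary-table handling extracted into setTemporaryType above is now shared by buildTableInfoWithStmt and buildTableInfoWithLike, so CREATE TABLE ... LIKE goes through the same gate as a plain CREATE TABLE. A minimal sketch of that gate from the SQL side, reusing the testSerialSuite harness from ddl/serial_test.go; the test name is made up here, and it assumes tidb_enable_global_temporary_table is settable per session:

func (s *testSerialSuite) TestGlobalTemporaryTypeGate(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists gt")
	defer tk.MustExec("drop table if exists gt")
	// With the feature switch off, setTemporaryType rejects the statement.
	tk.MustExec("set @@tidb_enable_global_temporary_table = 0")
	_, err := tk.Exec("create global temporary table gt(id int) on commit delete rows")
	c.Assert(err, NotNil)
	// With the switch on, the statement is accepted, and a later
	// "create table ... like gt" is subject to the same temporary-type handling.
	tk.MustExec("set @@tidb_enable_global_temporary_table = 1")
	tk.MustExec("create global temporary table gt(id int) on commit delete rows")
}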
33 changes: 33 additions & 0 deletions ddl/serial_test.go
@@ -613,6 +613,39 @@ func (s *testSerialSuite) TestCreateTableWithLikeAtTemporaryMode(c *C) {
c.Assert(err, IsNil)
tableInfo := table.Meta()
c.Assert(len(tableInfo.ForeignKeys), Equals, 0)

// Issue 25613.
// Test from->normal, to->normal.
tk.MustExec("drop table if exists tb1, tb2")
tk.MustExec("create table tb1(id int);")
tk.MustExec("create table tb2 like tb1")
defer tk.MustExec("drop table if exists tb1, tb2")
tk.MustQuery("show create table tb2;").Check(testkit.Rows("tb2 CREATE TABLE `tb2` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))

// Test from->normal, to->global temporary.
tk.MustExec("drop table if exists tb3, tb4")
tk.MustExec("create table tb3(id int);")
tk.MustExec("create global temporary table tb4 like tb3 on commit delete rows;")
defer tk.MustExec("drop table if exists tb3, tb4")
tk.MustQuery("show create table tb4;").Check(testkit.Rows("tb4 CREATE GLOBAL TEMPORARY TABLE `tb4` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=memory DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ON COMMIT DELETE ROWS"))

// Test from->global temporary, to->normal.
tk.MustExec("drop table if exists tb5, tb6")
tk.MustExec("create global temporary table tb5(id int) on commit delete rows;")
_, err = tk.Exec("create table tb6 like tb5;")
c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("create table like").Error())
defer tk.MustExec("drop table if exists tb5, tb6")

// Test from->global temporary, to->global temporary.
tk.MustExec("drop table if exists tb7, tb8")
tk.MustExec("create global temporary table tb7(id int) on commit delete rows;")
_, err = tk.Exec("create global temporary table tb8 like tb7 on commit delete rows;")
c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("create table like").Error())
defer tk.MustExec("drop table if exists tb7, tb8")
}

// TestCancelAddIndex1 tests canceling ddl job when the add index worker is not started.
2 changes: 1 addition & 1 deletion distsql/distsql.go
@@ -28,7 +28,7 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

// DispatchMPPTasks dispathes all tasks and returns an iterator.
// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks)
if resp == nil {
4 changes: 3 additions & 1 deletion executor/set.go
@@ -159,7 +159,9 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
}
newSnapshotTS := getSnapshotTSByName()
newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS
if newSnapshotIsSet {
// We don't check snapshot with gc safe point for read_ts
// Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor.
if newSnapshotIsSet && name != variable.TiDBTxnReadTS {
err = gcutil.ValidateSnapshot(e.ctx, newSnapshotTS)
if err != nil {
fallbackOldSnapshotTS()
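The split this change draws can be seen from the SQL side: tidb_snapshot is still validated against the GC safe point inside SetExecutor, while the stale-read timestamp behind variable.TiDBTxnReadTS is left for client-go to validate when data is actually read. A rough sketch, assuming a testkit session, that the AS OF TIMESTAMP form below is the one mapped to variable.TiDBTxnReadTS, and that the chosen timestamp lies behind the GC safe point; the helper name is hypothetical:

func snapshotVsReadTS(tk *testkit.TestKit) {
	// Setting tidb_snapshot to an expired timestamp should fail right here,
	// because gcutil.ValidateSnapshot still runs for it.
	_, err := tk.Exec("set @@tidb_snapshot = '2020-01-01 00:00:00'")
	fmt.Println(err)
	// The read-ts path skips that check in SetExecutor; any GC-safe-point
	// violation would surface later, from client-go, when the transaction reads.
	_, err = tk.Exec("set transaction read only as of timestamp '2020-01-01 00:00:00'")
	fmt.Println(err)
}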
10 changes: 10 additions & 0 deletions executor/show_test.go
@@ -149,6 +149,16 @@ func (s *testSuite5) TestShowWarningsForExprPushdown(c *C) {
tk.MustExec("explain select * from show_warnings_expr_pushdown where date_add(value, interval 1 day) = '2020-01-01'")
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(1))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv"))
tk.MustExec("explain select max(date_add(value, interval 1 day)) from show_warnings_expr_pushdown group by a")
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(2))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv", "Warning|1105|Aggregation can not be pushed to tikv because arguments of AggFunc `max` contains unsupported exprs"))
tk.MustExec("explain select max(a) from show_warnings_expr_pushdown group by date_add(value, interval 1 day)")
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(2))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv", "Warning|1105|Aggregation can not be pushed to tikv because groupByItems contain unsupported exprs"))
tk.MustExec("set tidb_opt_distinct_agg_push_down=0")
tk.MustExec("explain select max(distinct a) from show_warnings_expr_pushdown group by value")
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(1))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Aggregation can not be pushed to storage layer in non-mpp mode because it contains agg function with distinct"))
}

func (s *testSuite5) TestShowGrantsPrivilege(c *C) {
6 changes: 6 additions & 0 deletions infoschema/tables.go
@@ -1271,14 +1271,20 @@ var tableTableTiFlashTablesCols = []columnInfo{
{name: "AVG_PACK_ROWS_IN_STABLE", tp: mysql.TypeDouble, size: 64},
{name: "AVG_PACK_SIZE_IN_STABLE", tp: mysql.TypeDouble, size: 64},
{name: "STORAGE_STABLE_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_OLDEST_SNAPSHOT_LIFETIME", tp: mysql.TypeDouble, size: 64},
{name: "STORAGE_STABLE_OLDEST_SNAPSHOT_THREAD_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_STABLE_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_OLDEST_SNAPSHOT_LIFETIME", tp: mysql.TypeDouble, size: 64},
{name: "STORAGE_DELTA_OLDEST_SNAPSHOT_THREAD_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_DELTA_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_SNAPSHOTS", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_OLDEST_SNAPSHOT_LIFETIME", tp: mysql.TypeDouble, size: 64},
{name: "STORAGE_META_OLDEST_SNAPSHOT_THREAD_ID", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_NUM_NORMAL_PAGES", tp: mysql.TypeLonglong, size: 64},
{name: "STORAGE_META_MAX_PAGE_ID", tp: mysql.TypeLonglong, size: 64},
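The new snapshot columns read like the rest of TIFLASH_TABLES; a small sketch, assuming a cluster that actually has TiFlash replicas and a testkit session (the helper name is made up for illustration):

func readTiFlashSnapshotCols(tk *testkit.TestKit) {
	// Each row describes one TiFlash table; the added columns report how long the
	// oldest stable/delta/meta storage snapshot has been held and by which thread.
	tk.MustQuery("select STORAGE_STABLE_OLDEST_SNAPSHOT_LIFETIME," +
		" STORAGE_STABLE_OLDEST_SNAPSHOT_THREAD_ID," +
		" STORAGE_DELTA_OLDEST_SNAPSHOT_LIFETIME," +
		" STORAGE_META_OLDEST_SNAPSHOT_LIFETIME" +
		" from information_schema.tiflash_tables")
}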
15 changes: 13 additions & 2 deletions planner/core/exhaust_physical_plans.go
@@ -19,6 +19,7 @@ import (
"math"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
@@ -2369,17 +2370,24 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P

// TODO: support more operators and distinct later
func (la *LogicalAggregation) checkCanPushDownToMPP() bool {
hasUnsupportedDistinct := false
for _, agg := range la.AggFuncs {
// MPP does not support distinct except count distinct now
if agg.HasDistinct {
if agg.Name != ast.AggFuncCount {
return false
hasUnsupportedDistinct = true
}
}
// MPP does not support AggFuncApproxCountDistinct now
if agg.Name == ast.AggFuncApproxCountDistinct {
return false
hasUnsupportedDistinct = true
}
}
if hasUnsupportedDistinct {
if la.ctx.GetSessionVars().StmtCtx.InExplainStmt {
la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in mpp mode because it contains agg function with distinct"))
}
return false
}
return CheckAggCanPushCop(la.ctx, la.AggFuncs, la.GroupByItems, kv.TiFlash)
}
@@ -2461,6 +2469,9 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
if la.ctx.GetSessionVars().StmtCtx.InExplainStmt {
la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in non-mpp mode because it contains agg function with distinct"))
}
taskTypes = []property.TaskType{property.RootTaskType}
}
} else if !la.aggHints.preferAggToCop {
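Both new warnings only fire under EXPLAIN. The non-mpp variant from getHashAggs is exercised by the new case in executor/show_test.go above; for the mpp variant from checkCanPushDownToMPP, a hedged sketch, assuming a testkit session, a table t that has a TiFlash replica, and mpp enabled:

// approx_count_distinct is one of the aggregates that trips hasUnsupportedDistinct.
tk.MustExec("explain select approx_count_distinct(a) from t group by b")
// Expected to include, among possible other warnings:
// "Aggregation can not be pushed to storage layer in mpp mode because it contains agg function with distinct"
tk.MustQuery("show warnings")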
55 changes: 34 additions & 21 deletions planner/core/task.go
@@ -1401,37 +1401,50 @@ func (sel *PhysicalSelection) attach2Task(tasks ...task) task {
func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression, storeType kv.StoreType) bool {
sc := sctx.GetSessionVars().StmtCtx
client := sctx.GetClient()
ret := true
reason := ""
for _, aggFunc := range aggFuncs {
// if the aggFunc contain VirtualColumn or CorrelatedColumn, it can not be pushed down.
if expression.ContainVirtualColumn(aggFunc.Args) || expression.ContainCorrelatedColumn(aggFunc.Args) {
sctx.GetSessionVars().RaiseWarningWhenMPPEnforced(
"MPP mode may be blocked because expressions of AggFunc `" + aggFunc.Name + "` contain virtual column or correlated column, which is not supported now.")
return false
}
pb := aggregation.AggFuncToPBExpr(sc, client, aggFunc)
if pb == nil {
sctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because AggFunc `" + aggFunc.Name + "` is not supported now.")
return false
reason = "expressions of AggFunc `" + aggFunc.Name + "` contain virtual column or correlated column, which is not supported now"
ret = false
break
}
if !aggregation.CheckAggPushDown(aggFunc, storeType) {
if sc.InExplainStmt {
storageName := storeType.Name()
if storeType == kv.UnSpecified {
storageName = "storage layer"
}
sc.AppendWarning(errors.New("Agg function '" + aggFunc.Name + "' can not be pushed to " + storageName))
}
return false
reason = "AggFunc `" + aggFunc.Name + "` is not supported now"
ret = false
break
}
if !expression.CanExprsPushDown(sc, aggFunc.Args, client, storeType) {
return false
reason = "arguments of AggFunc `" + aggFunc.Name + "` contains unsupported exprs"
ret = false
break
}
pb := aggregation.AggFuncToPBExpr(sc, client, aggFunc)
if pb == nil {
reason = "AggFunc `" + aggFunc.Name + "` can not be converted to pb expr"
ret = false
break
}
}
if expression.ContainVirtualColumn(groupByItems) {
sctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because groupByItems contain virtual column, which is not supported now.")
return false
if ret && expression.ContainVirtualColumn(groupByItems) {
reason = "groupByItems contain virtual columns, which is not supported now"
ret = false
}
if ret && !expression.CanExprsPushDown(sc, groupByItems, client, storeType) {
reason = "groupByItems contain unsupported exprs"
ret = false
}

if !ret && sc.InExplainStmt {
sctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because " + reason)
storageName := storeType.Name()
if storeType == kv.UnSpecified {
storageName = "storage layer"
}
sc.AppendWarning(errors.New("Aggregation can not be pushed to " + storageName + " because " + reason))
}
return expression.CanExprsPushDown(sc, groupByItems, client, storeType)
return ret
}

// AggInfo stores the information of an Aggregation.
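The collected reason also covers cases the new tests do not touch, such as the virtual-column branch. A hedged sketch, assuming a testkit session; whether the generated column still appears as a virtual column by the time this check runs depends on earlier rewriting, so the warning below is illustrative:

tk.MustExec("create table vt(a int, b int as (a + 1) virtual)")
tk.MustExec("explain select sum(b) from vt group by a")
// If the argument still contains the virtual column here, EXPLAIN records a warning
// built from the reason string above, e.g. "... because expressions of AggFunc `sum`
// contain virtual column or correlated column, which is not supported now".
tk.MustQuery("show warnings")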