Skip to content

Commit

Permalink
Merge branch 'release-4.0' into release-4.0-3f7a573caacb
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Mar 17, 2021
2 parents d093a5f + f055086 commit f966699
Show file tree
Hide file tree
Showing 103 changed files with 2,193 additions and 1,193 deletions.
4 changes: 4 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
component/executor:
- distsql/*
- executor/*
- !executor/brie*
- util/chunk/*
- util/disk/*
- util/execdetails/*
Expand Down Expand Up @@ -31,3 +32,6 @@ component/DDL:

component/config:
- config/*

sig/migrate:
- executor/brie*
36 changes: 36 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,28 @@ func (s *testSuite) TestCapturePlanBaselineIgnoreTiFlash(c *C) {
c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t`")
}

func (s *testSuite) TestSPMHitInfo(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")

c.Assert(tk.HasPlan("SELECT * from t1,t2 where t1.id = t2.id", "HashJoin"), IsTrue)
c.Assert(tk.HasPlan("SELECT /*+ TIDB_SMJ(t1, t2) */ * from t1,t2 where t1.id = t2.id", "MergeJoin"), IsTrue)

tk.MustExec("SELECT * from t1,t2 where t1.id = t2.id")
tk.MustQuery(`select @@last_plan_from_binding;`).Check(testkit.Rows("0"))
tk.MustExec("create global binding for SELECT * from t1,t2 where t1.id = t2.id using SELECT /*+ TIDB_SMJ(t1, t2) */ * from t1,t2 where t1.id = t2.id")

c.Assert(tk.HasPlan("SELECT * from t1,t2 where t1.id = t2.id", "MergeJoin"), IsTrue)
tk.MustExec("SELECT * from t1,t2 where t1.id = t2.id")
tk.MustQuery(`select @@last_plan_from_binding;`).Check(testkit.Rows("1"))
tk.MustExec("drop global binding for SELECT * from t1,t2 where t1.id = t2.id")
}

func (s *testSuite) TestNotEvolvePlanForReadStorageHint(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
Expand Down Expand Up @@ -1843,3 +1865,17 @@ func (s *testSuite) TestCaptureWithZeroSlowLogThreshold(c *C) {
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][0], Equals, "select * from test . t")
}

func (s *testSuite) TestSPMWithoutUseDatabase(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk1 := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
s.cleanBindingEnv(tk1)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, key(a))")
tk.MustExec("create global binding for select * from t using select * from t force index(a)")

err := tk1.ExecToErr("select * from t")
c.Assert(err, ErrorMatches, "*No database selected")
}
144 changes: 68 additions & 76 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/parser/format"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -123,17 +122,21 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
func (h *BindHandle) Update(fullLoad bool) (err error) {
h.bindInfo.Lock()
lastUpdateTime := h.bindInfo.lastUpdateTime
updateTime := lastUpdateTime.String()
if fullLoad {
updateTime = "0000-00-00 00:00:00"
}

sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source from mysql.bind_info"
if !fullLoad {
sql += " where update_time > \"" + lastUpdateTime.String() + "\""
exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time`, updateTime)
if err != nil {
return err
}
// We need to apply the updates by order, wrong apply order of same original sql may cause inconsistent state.
sql += " order by update_time"

// No need to acquire the session context lock for ExecRestrictedSQL, it
// No need to acquire the session context lock for ExecRestrictedStmt, it
// uses another background session.
rows, _, err := h.sctx.Context.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
rows, _, err := exec.ExecRestrictedStmt(context.Background(), stmt)
if err != nil {
h.bindInfo.Unlock()
return err
Expand Down Expand Up @@ -215,7 +218,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
return err
}
// Binding recreation should physically delete previous bindings.
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, ""))
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %?`, record.OriginalSQL)
if err != nil {
return err
}
Expand All @@ -227,7 +230,17 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +302,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
return err
}
if duplicateBinding != nil {
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding.BindSQL))
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %? AND bind_sql = %?`, record.OriginalSQL, duplicateBinding.BindSQL)
if err != nil {
return err
}
Expand All @@ -305,7 +318,17 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?, %?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -349,17 +372,19 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return
return err
}

updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()

bindSQL := ""
if binding != nil {
bindSQL = binding.BindSQL
if binding == nil {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
deleted, updateTs, originalSQL, updateTs)
} else {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %?`,
deleted, updateTs, originalSQL, updateTs, binding.BindSQL)
}

_, err = exec.ExecuteInternal(context.TODO(), h.logicalDeleteBindInfoSQL(originalSQL, db, updateTs, bindSQL))
deleteRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows())
return err
}
Expand Down Expand Up @@ -575,49 +600,13 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {
return nil
}

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
sql := fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s`,
expression.Quote(normdOrigSQL),
)
if bindSQL == "" {
return sql
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindSQL))
}

func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Binding) string {
return fmt.Sprintf(`INSERT INTO mysql.bind_info VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)`,
expression.Quote(orignalSQL),
expression.Quote(info.BindSQL),
expression.Quote(db),
expression.Quote(info.Status),
expression.Quote(info.CreateTime.String()),
expression.Quote(info.UpdateTime.String()),
expression.Quote(info.Charset),
expression.Quote(info.Collation),
expression.Quote(info.Source),
)
}

// LockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
func (h *BindHandle) LockBindInfoSQL() string {
return fmt.Sprintf("UPDATE mysql.bind_info SET source=%s WHERE original_sql=%s",
expression.Quote(Builtin),
expression.Quote(BuiltinPseudoSQL4BindLock))
}

func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQL string) string {
updateTsStr := updateTs.String()
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and update_time<%s`,
expression.Quote(deleted),
expression.Quote(updateTsStr),
expression.Quote(originalSQL),
expression.Quote(updateTsStr))
if bindingSQL == "" {
return sql
sql, err := sqlexec.EscapeSQL("UPDATE mysql.bind_info SET source= %? WHERE original_sql= %?", Builtin, BuiltinPseudoSQL4BindLock)
if err != nil {
return ""
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindingSQL))
return sql
}

// CaptureBaselines is used to automatically capture plan baselines.
Expand Down Expand Up @@ -661,16 +650,14 @@ func (h *BindHandle) CaptureBaselines() {
func getHintsForSQL(sctx sessionctx.Context, sql string) (string, error) {
oriVals := sctx.GetSessionVars().UsePlanBaselines
sctx.GetSessionVars().UsePlanBaselines = false
recordSets, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), fmt.Sprintf("explain format='hint' %s", sql))
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), fmt.Sprintf("explain format='hint' %s", sql))
sctx.GetSessionVars().UsePlanBaselines = oriVals
if len(recordSets) > 0 {
defer terror.Log(recordSets[0].Close())
}
if err != nil {
return "", err
}
chk := recordSets[0].NewChunk()
err = recordSets[0].Next(context.TODO(), chk)
defer terror.Call(rs.Close)
chk := rs.NewChunk()
err = rs.Next(context.TODO(), chk)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -766,9 +753,17 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
}

func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')",
variable.TiDBEvolvePlanTaskMaxTime, variable.TiDBEvolvePlanTaskStartTime, variable.TiDBEvolvePlanTaskEndTime)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(
context.TODO(),
"SELECT variable_name, variable_value FROM mysql.global_variables WHERE variable_name IN (%?, %?, %?)",
variable.TiDBEvolvePlanTaskMaxTime,
variable.TiDBEvolvePlanTaskStartTime,
variable.TiDBEvolvePlanTaskEndTime,
)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
Expand Down Expand Up @@ -839,7 +834,7 @@ func (h *BindHandle) getOnePendingVerifyJob() (string, string, Binding) {
func (h *BindHandle) getRunningDuration(sctx sessionctx.Context, db, sql string, maxTime time.Duration) (time.Duration, error) {
ctx := context.TODO()
if db != "" {
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, fmt.Sprintf("use `%s`", db))
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "use %n", db)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -873,23 +868,20 @@ func runSQL(ctx context.Context, sctx sessionctx.Context, sql string, resultChan
resultChan <- fmt.Errorf("run sql panicked: %v", string(buf))
}
}()
recordSets, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
if err != nil {
if len(recordSets) > 0 {
terror.Call(recordSets[0].Close)
}
terror.Call(rs.Close)
resultChan <- err
return
}
recordSet := recordSets[0]
chk := recordSets[0].NewChunk()
chk := rs.NewChunk()
for {
err = recordSet.Next(ctx, chk)
err = rs.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
break
}
}
terror.Call(recordSets[0].Close)
terror.Call(rs.Close)
resultChan <- err
}

Expand Down
20 changes: 15 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package ddl

import (
"context"
"fmt"
"math/bits"
"strings"
Expand Down Expand Up @@ -538,16 +539,25 @@ func checkAndApplyNewAutoRandomBits(job *model.Job, t *meta.Meta, tblInfo *model
// checkForNullValue ensure there are no null values of the column of this table.
// `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql.
func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, newCol model.CIStr, oldCols ...*model.ColumnInfo) error {
colsStr := ""
var buf strings.Builder
buf.WriteString("select 1 from %n.%n where ")
paramsList := make([]interface{}, 0, 2+len(oldCols))
paramsList = append(paramsList, schema.L, table.L)
for i, col := range oldCols {
if i == 0 {
colsStr += "`" + col.Name.L + "` is null"
buf.WriteString("%n is null")
paramsList = append(paramsList, col.Name.L)
} else {
colsStr += " or `" + col.Name.L + "` is null"
buf.WriteString(" or %n is null")
paramsList = append(paramsList, col.Name.L)
}
}
sql := fmt.Sprintf("select 1 from `%s`.`%s` where %s limit 1;", schema.L, table.L, colsStr)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
buf.WriteString(" limit 1")
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), buf.String(), paramsList...)
if err != nil {
return errors.Trace(err)
}
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit f966699

Please sign in to comment.