Skip to content

Commit

Permalink
Merge branch 'master' into no-rename-partition-col
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss authored Dec 28, 2022
2 parents 837c4f3 + f9af75f commit f34f99c
Show file tree
Hide file tree
Showing 26 changed files with 799 additions and 22 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ type Config struct {
// EnableGlobalKill indicates whether to enable global kill.
TrxSummary TrxSummary `toml:"transaction-summary" json:"transaction-summary"`
EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"`
// InitializeSQLFile is a file that will be executed after first bootstrap only.
// It can be used to set GLOBAL system variable values
InitializeSQLFile string `toml:"initialize-sql-file" json:"initialize-sql-file"`

// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.
Expand Down
110 changes: 110 additions & 0 deletions ddl/fktest/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1700,3 +1701,112 @@ func TestForeignKeyWithCacheTable(t *testing.T) {
tk.MustExec("alter table t2 nocache;")
tk.MustExec("drop table t1,t2;")
}

func TestForeignKeyAndConcurrentDDL(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")
// Test foreign key refer cache table.
tk.MustExec("create table t1 (a int, b int, c int, index(a), index(b), index(c));")
tk.MustExec("create table t2 (a int, b int, c int, index(a), index(b), index(c));")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("set @@foreign_key_checks=1;")
tk2.MustExec("use test")
passCases := []struct {
prepare []string
ddl1 string
ddl2 string
}{
{
ddl1: "alter table t2 add constraint fk_1 foreign key (a) references t1(a)",
ddl2: "alter table t2 add constraint fk_2 foreign key (b) references t1(b)",
},
{
ddl1: "alter table t2 drop foreign key fk_1",
ddl2: "alter table t2 drop foreign key fk_2",
},
{
prepare: []string{
"alter table t2 drop index a",
},
ddl1: "alter table t2 add index(a)",
ddl2: "alter table t2 add constraint fk_1 foreign key (a) references t1(a)",
},
{
ddl1: "alter table t2 drop index c",
ddl2: "alter table t2 add constraint fk_2 foreign key (b) references t1(b)",
},
}
for _, ca := range passCases {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
tk.MustExec(ca.ddl1)
}()
go func() {
defer wg.Done()
tk2.MustExec(ca.ddl2)
}()
wg.Wait()
}
errorCases := []struct {
prepare []string
ddl1 string
err1 string
ddl2 string
err2 string
}{
{
ddl1: "alter table t2 add constraint fk foreign key (a) references t1(a)",
err1: "[ddl:1826]Duplicate foreign key constraint name 'fk'",
ddl2: "alter table t2 add constraint fk foreign key (b) references t1(b)",
err2: "[ddl:1826]Duplicate foreign key constraint name 'fk'",
},
{
prepare: []string{
"alter table t2 add constraint fk_1 foreign key (a) references t1(a)",
},
ddl1: "alter table t2 drop foreign key fk_1",
err1: "[schema:1091]Can't DROP 'fk_1'; check that column/key exists",
ddl2: "alter table t2 drop foreign key fk_1",
err2: "[schema:1091]Can't DROP 'fk_1'; check that column/key exists",
},
{
ddl1: "alter table t2 drop index a",
err1: "[ddl:1553]Cannot drop index 'a': needed in a foreign key constraint",
ddl2: "alter table t2 add constraint fk_1 foreign key (a) references t1(a)",
err2: "[ddl:-1]Failed to add the foreign key constraint. Missing index for 'fk_1' foreign key columns in the table 't2'",
},
}
tk.MustExec("drop table t1,t2")
tk.MustExec("create table t1 (a int, b int, c int, index(a), index(b), index(c));")
tk.MustExec("create table t2 (a int, b int, c int, index(a), index(b), index(c));")
for i, ca := range errorCases {
for _, sql := range ca.prepare {
tk.MustExec(sql)
}
var wg sync.WaitGroup
var err1, err2 error
wg.Add(2)
go func() {
defer wg.Done()
err1 = tk.ExecToErr(ca.ddl1)
}()
go func() {
defer wg.Done()
err2 = tk2.ExecToErr(ca.ddl2)
}()
wg.Wait()
if (err1 == nil && err2 == nil) || (err1 != nil && err2 != nil) {
require.Failf(t, "both ddl1 and ddl2 execute success, but expect 1 error", fmt.Sprintf("idx: %v, err1: %v, err2: %v", i, err1, err2))
}
if err1 != nil {
require.Equal(t, ca.err1, err1.Error())
}
if err2 != nil {
require.Equal(t, ca.err2, err2.Error())
}
}
}
41 changes: 41 additions & 0 deletions ddl/metadatalocktest/mdl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,47 @@ func TestMDLBasicBatchPointGet(t *testing.T) {
require.Less(t, ts1, ts2)
}

func TestMDLAddForeignKey(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t1(id int key);")
tk.MustExec("create table t2(id int key);")

tk.MustExec("begin")
tk.MustExec("insert into t2 values(1);")

var wg sync.WaitGroup
var ddlErr error
wg.Add(1)
var ts2 time.Time
go func() {
defer wg.Done()
ddlErr = tkDDL.ExecToErr("alter table test.t2 add foreign key (id) references t1(id)")
ts2 = time.Now()
}()

time.Sleep(2 * time.Second)

ts1 := time.Now()
tk.MustExec("commit")

wg.Wait()
require.Error(t, ddlErr)
require.Equal(t, "[ddl:1452]Cannot add or update a child row: a foreign key constraint fails (`test`.`t2`, CONSTRAINT `fk_1` FOREIGN KEY (`id`) REFERENCES `t1` (`id`))", ddlErr.Error())
require.Less(t, ts1, ts2)
}

func TestMDLRRUpdateSchema(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
Expand Down
19 changes: 14 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"runtime/trace"
"strconv"
"strings"
Expand Down Expand Up @@ -295,8 +296,12 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh

var pointExecutor *PointGetExecutor
useMaxTS := startTs == math.MaxUint64

// try to reuse point get executor
if a.PsStmt.Executor != nil {
// We should only use the cached the executor when the startTS is MaxUint64
if a.PsStmt.Executor != nil && useMaxTS {
exec, ok := a.PsStmt.Executor.(*PointGetExecutor)
if !ok {
logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path")
Expand All @@ -306,17 +311,21 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
pointExecutor = exec
}
}
if a.PsStmt.Executor == nil {

if pointExecutor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
newExecutor := b.build(a.Plan)
pointExecutor = b.build(a.Plan).(*PointGetExecutor)
if b.err != nil {
return nil, b.err
}
a.PsStmt.Executor = newExecutor

if useMaxTS {
a.PsStmt.Executor = pointExecutor
}
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
Expand Down
5 changes: 5 additions & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
}

if err = sessiontxn.OptimizeWithPlanAndThenWarmUp(c.Ctx, stmt.Plan); err != nil {
return nil, err
}

if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions executor/fktest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_test(
"//infoschema",
"//kv",
"//meta/autoid",
"//parser",
"//parser/ast",
"//parser/auth",
"//parser/format",
Expand Down
90 changes: 90 additions & 0 deletions executor/fktest/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/format"
Expand Down Expand Up @@ -2643,3 +2645,91 @@ func TestForeignKeyOnReplaceInto(t *testing.T) {
tk.MustExec("replace into t1 values (1, 'new-boss', null)")
tk.MustQuery("select id from t1 order by id").Check(testkit.Rows("1"))
}

func TestForeignKeyLargeTxnErr(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@foreign_key_checks=1")
tk.MustExec("use test")
tk.MustExec("create table t1 (id int auto_increment key, pid int, name varchar(200), index(pid));")
tk.MustExec("insert into t1 (name) values ('abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz1234567890');")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t1 (name) select name from t1;")
}
tk.MustQuery("select count(*) from t1").Check(testkit.Rows("256"))
tk.MustExec("update t1 set pid=1 where id>1")
tk.MustExec("alter table t1 add foreign key (pid) references t1 (id) on update cascade")
originLimit := atomic.LoadUint64(&kv.TxnTotalSizeLimit)
defer func() {
atomic.StoreUint64(&kv.TxnTotalSizeLimit, originLimit)
}()
// Set the limitation to a small value, make it easier to reach the limitation.
atomic.StoreUint64(&kv.TxnTotalSizeLimit, 10240)
tk.MustQuery("select sum(id) from t1").Check(testkit.Rows("32896"))
// foreign key cascade behaviour will cause ErrTxnTooLarge.
tk.MustGetDBError("update t1 set id=id+100000 where id=1", kv.ErrTxnTooLarge)
tk.MustQuery("select sum(id) from t1").Check(testkit.Rows("32896"))
tk.MustGetDBError("update t1 set id=id+100000 where id=1", kv.ErrTxnTooLarge)
tk.MustQuery("select id,pid from t1 where id<3 order by id").Check(testkit.Rows("1 <nil>", "2 1"))
tk.MustExec("set @@foreign_key_checks=0")
tk.MustExec("update t1 set id=id+100000 where id=1")
tk.MustQuery("select id,pid from t1 where id<3 or pid is null order by id").Check(testkit.Rows("2 1", "100001 <nil>"))
}

func TestForeignKeyAndLockView(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int key)")
tk.MustExec("create table t2 (id int key, foreign key (id) references t1(id) ON DELETE CASCADE ON UPDATE CASCADE)")
tk.MustExec("insert into t1 values (1)")
tk.MustExec("insert into t2 values (1)")
tk.MustExec("begin pessimistic")
tk.MustExec("set @@foreign_key_checks=0")
tk.MustExec("update t2 set id=2")

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("set @@foreign_key_checks=1")
tk2.MustExec("use test")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
tk2.MustExec("begin pessimistic")
tk2.MustExec("update t1 set id=2 where id=1")
tk2.MustExec("commit")
}()
time.Sleep(time.Millisecond * 200)
_, digest := parser.NormalizeDigest("update t1 set id=2 where id=1")
tk.MustQuery("select CURRENT_SQL_DIGEST from information_schema.tidb_trx where state='LockWaiting' and db='test'").Check(testkit.Rows(digest.String()))
tk.MustGetErrMsg("update t1 set id=2", "[executor:1213]Deadlock found when trying to get lock; try restarting transaction")
wg.Wait()
}

func TestForeignKeyAndMemoryTracker(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@foreign_key_checks=1")
tk.MustExec("use test")
tk.MustExec("create table t1 (id int auto_increment key, pid int, name varchar(200), index(pid));")
tk.MustExec("insert into t1 (name) values ('abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz');")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t1 (name) select name from t1;")
}
tk.MustQuery("select count(*) from t1").Check(testkit.Rows("256"))
tk.MustExec("update t1 set pid=1 where id>1")
tk.MustExec("alter table t1 add foreign key (pid) references t1 (id) on update cascade")
tk.MustQuery("select sum(id) from t1").Check(testkit.Rows("32896"))
defer tk.MustExec("SET GLOBAL tidb_mem_oom_action = DEFAULT")
tk.MustExec("SET GLOBAL tidb_mem_oom_action='CANCEL'")
tk.MustExec("set @@tidb_mem_quota_query=40960;")
// foreign key cascade behaviour will exceed memory quota.
err := tk.ExecToErr("update t1 set id=id+100000 where id=1")
require.Error(t, err)
require.Contains(t, err.Error(), "Out Of Memory Quota!")
tk.MustQuery("select id,pid from t1 where id = 1").Check(testkit.Rows("1 <nil>"))
tk.MustExec("set @@foreign_key_checks=0")
// After disable foreign_key_checks, following DML will execute successful.
tk.MustExec("update t1 set id=id+100000 where id=1")
tk.MustQuery("select id,pid from t1 where id<3 or pid is null order by id").Check(testkit.Rows("2 1", "100001 <nil>"))
}
17 changes: 17 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,3 +825,20 @@ func TestPointGetIssue25167(t *testing.T) {
tk.MustExec("insert into t values (1)")
tk.MustQuery("select * from t as of timestamp @a where a = 1").Check(testkit.Rows())
}

func TestPointGetIssue40194(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1(id int primary key, v int)")
tk.MustExec("insert into t1 values(1, 10)")
tk.MustExec("prepare s from 'select * from t1 where id=1'")
tk.MustExec("set @@tidb_enable_plan_replayer_capture=1")
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("update t1 set v=v+1")
tk.MustQuery("execute s").Check(testkit.Rows("1 11"))
}
1 change: 1 addition & 0 deletions planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ go_test(
"flat_plan_test.go",
"fragment_test.go",
"indexmerge_intersection_test.go",
"indexmerge_path_test.go",
"indexmerge_test.go",
"integration_partition_test.go",
"integration_test.go",
Expand Down
6 changes: 6 additions & 0 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,12 @@ func (ds *DataSource) addSelection4PlanCache(task *rootTask, stats *property.Sta
// convertToIndexScan converts the DataSource to index scan with idx.
func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty,
candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) {
if candidate.path.Index.MVIndex {
// MVIndex is special since different index rows may return the same _row_id and this can break some assumptions of IndexReader.
// Currently only support using IndexMerge to access MVIndex instead of IndexReader.
// TODO: make IndexReader support accessing MVIndex directly.
return invalidTask, nil
}
if !candidate.path.IsSingleScan {
// If it's parent requires single read task, return max cost.
if prop.TaskTp == property.CopSingleReadTaskType {
Expand Down
Loading

0 comments on commit f34f99c

Please sign in to comment.