From bddfc6244e54ee3e7a7bfeda08f7b99c572d109a Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 28 Dec 2022 12:40:16 +0800 Subject: [PATCH 1/4] ddl: add more foreign key test case (#40052) close pingcap/tidb#40189 --- ddl/fktest/foreign_key_test.go | 110 ++++++++++++++++++ ddl/metadatalocktest/mdl_test.go | 41 +++++++ executor/fktest/BUILD.bazel | 1 + executor/fktest/foreign_key_test.go | 90 ++++++++++++++ .../addindextest/add_index_test.go | 26 +++++ .../realtikvtest/pessimistictest/BUILD.bazel | 1 + .../pessimistictest/pessimistic_test.go | 61 ++++++++++ tests/realtikvtest/testkit.go | 7 +- 8 files changed, 336 insertions(+), 1 deletion(-) diff --git a/ddl/fktest/foreign_key_test.go b/ddl/fktest/foreign_key_test.go index ba466a8cef07b..df461fa048e5c 100644 --- a/ddl/fktest/foreign_key_test.go +++ b/ddl/fktest/foreign_key_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "sync" "testing" "time" @@ -1700,3 +1701,112 @@ func TestForeignKeyWithCacheTable(t *testing.T) { tk.MustExec("alter table t2 nocache;") tk.MustExec("drop table t1,t2;") } + +func TestForeignKeyAndConcurrentDDL(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@foreign_key_checks=1;") + tk.MustExec("use test") + // Test foreign key refer cache table. + tk.MustExec("create table t1 (a int, b int, c int, index(a), index(b), index(c));") + tk.MustExec("create table t2 (a int, b int, c int, index(a), index(b), index(c));") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("set @@foreign_key_checks=1;") + tk2.MustExec("use test") + passCases := []struct { + prepare []string + ddl1 string + ddl2 string + }{ + { + ddl1: "alter table t2 add constraint fk_1 foreign key (a) references t1(a)", + ddl2: "alter table t2 add constraint fk_2 foreign key (b) references t1(b)", + }, + { + ddl1: "alter table t2 drop foreign key fk_1", + ddl2: "alter table t2 drop foreign key fk_2", + }, + { + prepare: []string{ + "alter table t2 drop index a", + }, + ddl1: "alter table t2 add index(a)", + ddl2: "alter table t2 add constraint fk_1 foreign key (a) references t1(a)", + }, + { + ddl1: "alter table t2 drop index c", + ddl2: "alter table t2 add constraint fk_2 foreign key (b) references t1(b)", + }, + } + for _, ca := range passCases { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + tk.MustExec(ca.ddl1) + }() + go func() { + defer wg.Done() + tk2.MustExec(ca.ddl2) + }() + wg.Wait() + } + errorCases := []struct { + prepare []string + ddl1 string + err1 string + ddl2 string + err2 string + }{ + { + ddl1: "alter table t2 add constraint fk foreign key (a) references t1(a)", + err1: "[ddl:1826]Duplicate foreign key constraint name 'fk'", + ddl2: "alter table t2 add constraint fk foreign key (b) references t1(b)", + err2: "[ddl:1826]Duplicate foreign key constraint name 'fk'", + }, + { + prepare: []string{ + "alter table t2 add constraint fk_1 foreign key (a) references t1(a)", + }, + ddl1: "alter table t2 drop foreign key fk_1", + err1: "[schema:1091]Can't DROP 'fk_1'; check that column/key exists", + ddl2: "alter table t2 drop foreign key fk_1", + err2: "[schema:1091]Can't DROP 'fk_1'; check that column/key exists", + }, + { + ddl1: "alter table t2 drop index a", + err1: "[ddl:1553]Cannot drop index 'a': needed in a foreign key constraint", + ddl2: "alter table t2 add constraint fk_1 foreign key (a) references t1(a)", + err2: "[ddl:-1]Failed to add the foreign key constraint. 
Missing index for 'fk_1' foreign key columns in the table 't2'", + }, + } + tk.MustExec("drop table t1,t2") + tk.MustExec("create table t1 (a int, b int, c int, index(a), index(b), index(c));") + tk.MustExec("create table t2 (a int, b int, c int, index(a), index(b), index(c));") + for i, ca := range errorCases { + for _, sql := range ca.prepare { + tk.MustExec(sql) + } + var wg sync.WaitGroup + var err1, err2 error + wg.Add(2) + go func() { + defer wg.Done() + err1 = tk.ExecToErr(ca.ddl1) + }() + go func() { + defer wg.Done() + err2 = tk2.ExecToErr(ca.ddl2) + }() + wg.Wait() + if (err1 == nil && err2 == nil) || (err1 != nil && err2 != nil) { + require.Failf(t, "both ddl1 and ddl2 execute success, but expect 1 error", fmt.Sprintf("idx: %v, err1: %v, err2: %v", i, err1, err2)) + } + if err1 != nil { + require.Equal(t, ca.err1, err1.Error()) + } + if err2 != nil { + require.Equal(t, ca.err2, err2.Error()) + } + } +} diff --git a/ddl/metadatalocktest/mdl_test.go b/ddl/metadatalocktest/mdl_test.go index 64bdf77d55707..fd307968cad73 100644 --- a/ddl/metadatalocktest/mdl_test.go +++ b/ddl/metadatalocktest/mdl_test.go @@ -257,6 +257,47 @@ func TestMDLBasicBatchPointGet(t *testing.T) { require.Less(t, ts1, ts2) } +func TestMDLAddForeignKey(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + sv := server.CreateMockServer(t, store) + + sv.SetDomain(dom) + dom.InfoSyncer().SetSessionManager(sv) + defer sv.Close() + + conn1 := server.CreateMockConn(t, sv) + tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session) + conn2 := server.CreateMockConn(t, sv) + tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session) + tk.MustExec("use test") + tk.MustExec("set global tidb_enable_metadata_lock=1") + tk.MustExec("create table t1(id int key);") + tk.MustExec("create table t2(id int key);") + + tk.MustExec("begin") + tk.MustExec("insert into t2 values(1);") + + var wg sync.WaitGroup + var ddlErr error + wg.Add(1) + var ts2 time.Time + go func() { + defer wg.Done() + ddlErr = tkDDL.ExecToErr("alter table test.t2 add foreign key (id) references t1(id)") + ts2 = time.Now() + }() + + time.Sleep(2 * time.Second) + + ts1 := time.Now() + tk.MustExec("commit") + + wg.Wait() + require.Error(t, ddlErr) + require.Equal(t, "[ddl:1452]Cannot add or update a child row: a foreign key constraint fails (`test`.`t2`, CONSTRAINT `fk_1` FOREIGN KEY (`id`) REFERENCES `t1` (`id`))", ddlErr.Error()) + require.Less(t, ts1, ts2) +} + func TestMDLRRUpdateSchema(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) sv := server.CreateMockServer(t, store) diff --git a/executor/fktest/BUILD.bazel b/executor/fktest/BUILD.bazel index f245bba152c59..2c9f00dfa0624 100644 --- a/executor/fktest/BUILD.bazel +++ b/executor/fktest/BUILD.bazel @@ -15,6 +15,7 @@ go_test( "//infoschema", "//kv", "//meta/autoid", + "//parser", "//parser/ast", "//parser/auth", "//parser/format", diff --git a/executor/fktest/foreign_key_test.go b/executor/fktest/foreign_key_test.go index 8d6442f39fad4..1dc92d6954a2e 100644 --- a/executor/fktest/foreign_key_test.go +++ b/executor/fktest/foreign_key_test.go @@ -21,6 +21,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/format" @@ -2643,3 +2645,91 @@ func 
TestForeignKeyOnReplaceInto(t *testing.T) { tk.MustExec("replace into t1 values (1, 'new-boss', null)") tk.MustQuery("select id from t1 order by id").Check(testkit.Rows("1")) } + +func TestForeignKeyLargeTxnErr(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + tk.MustExec("create table t1 (id int auto_increment key, pid int, name varchar(200), index(pid));") + tk.MustExec("insert into t1 (name) values ('abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz1234567890');") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t1 (name) select name from t1;") + } + tk.MustQuery("select count(*) from t1").Check(testkit.Rows("256")) + tk.MustExec("update t1 set pid=1 where id>1") + tk.MustExec("alter table t1 add foreign key (pid) references t1 (id) on update cascade") + originLimit := atomic.LoadUint64(&kv.TxnTotalSizeLimit) + defer func() { + atomic.StoreUint64(&kv.TxnTotalSizeLimit, originLimit) + }() + // Set the limitation to a small value, make it easier to reach the limitation. + atomic.StoreUint64(&kv.TxnTotalSizeLimit, 10240) + tk.MustQuery("select sum(id) from t1").Check(testkit.Rows("32896")) + // foreign key cascade behaviour will cause ErrTxnTooLarge. + tk.MustGetDBError("update t1 set id=id+100000 where id=1", kv.ErrTxnTooLarge) + tk.MustQuery("select sum(id) from t1").Check(testkit.Rows("32896")) + tk.MustGetDBError("update t1 set id=id+100000 where id=1", kv.ErrTxnTooLarge) + tk.MustQuery("select id,pid from t1 where id<3 order by id").Check(testkit.Rows("1 ", "2 1")) + tk.MustExec("set @@foreign_key_checks=0") + tk.MustExec("update t1 set id=id+100000 where id=1") + tk.MustQuery("select id,pid from t1 where id<3 or pid is null order by id").Check(testkit.Rows("2 1", "100001 ")) +} + +func TestForeignKeyAndLockView(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (id int key)") + tk.MustExec("create table t2 (id int key, foreign key (id) references t1(id) ON DELETE CASCADE ON UPDATE CASCADE)") + tk.MustExec("insert into t1 values (1)") + tk.MustExec("insert into t2 values (1)") + tk.MustExec("begin pessimistic") + tk.MustExec("set @@foreign_key_checks=0") + tk.MustExec("update t2 set id=2") + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("set @@foreign_key_checks=1") + tk2.MustExec("use test") + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + tk2.MustExec("begin pessimistic") + tk2.MustExec("update t1 set id=2 where id=1") + tk2.MustExec("commit") + }() + time.Sleep(time.Millisecond * 200) + _, digest := parser.NormalizeDigest("update t1 set id=2 where id=1") + tk.MustQuery("select CURRENT_SQL_DIGEST from information_schema.tidb_trx where state='LockWaiting' and db='test'").Check(testkit.Rows(digest.String())) + tk.MustGetErrMsg("update t1 set id=2", "[executor:1213]Deadlock found when trying to get lock; try restarting transaction") + wg.Wait() +} + +func TestForeignKeyAndMemoryTracker(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + tk.MustExec("create table t1 (id int auto_increment key, pid int, name varchar(200), index(pid));") + tk.MustExec("insert into t1 (name) values 
('abcdefghijklmnopqrstuvwxyz1234567890abcdefghijklmnopqrstuvwxyz');") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t1 (name) select name from t1;") + } + tk.MustQuery("select count(*) from t1").Check(testkit.Rows("256")) + tk.MustExec("update t1 set pid=1 where id>1") + tk.MustExec("alter table t1 add foreign key (pid) references t1 (id) on update cascade") + tk.MustQuery("select sum(id) from t1").Check(testkit.Rows("32896")) + defer tk.MustExec("SET GLOBAL tidb_mem_oom_action = DEFAULT") + tk.MustExec("SET GLOBAL tidb_mem_oom_action='CANCEL'") + tk.MustExec("set @@tidb_mem_quota_query=40960;") + // foreign key cascade behaviour will exceed memory quota. + err := tk.ExecToErr("update t1 set id=id+100000 where id=1") + require.Error(t, err) + require.Contains(t, err.Error(), "Out Of Memory Quota!") + tk.MustQuery("select id,pid from t1 where id = 1").Check(testkit.Rows("1 ")) + tk.MustExec("set @@foreign_key_checks=0") + // After disable foreign_key_checks, following DML will execute successful. + tk.MustExec("update t1 set id=id+100000 where id=1") + tk.MustQuery("select id,pid from t1 where id<3 or pid is null order by id").Check(testkit.Rows("2 1", "100001 ")) +} diff --git a/tests/realtikvtest/addindextest/add_index_test.go b/tests/realtikvtest/addindextest/add_index_test.go index 7dd4919570594..1c1403f66a922 100644 --- a/tests/realtikvtest/addindextest/add_index_test.go +++ b/tests/realtikvtest/addindextest/add_index_test.go @@ -100,3 +100,29 @@ func TestCreateMultiColsIndex(t *testing.T) { ctx := initTest(t) testTwoColsFrame(ctx, coliIDs, coljIDs, addIndexMultiCols) } + +func TestAddForeignKeyWithAutoCreateIndex(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists fk_index;") + tk.MustExec("create database fk_index;") + tk.MustExec("use fk_index;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=1;`) + tk.MustExec("create table employee (id bigint auto_increment key, pid bigint)") + tk.MustExec("insert into employee (id) values (1),(2),(3),(4),(5),(6),(7),(8)") + for i := 0; i < 14; i++ { + tk.MustExec("insert into employee (pid) select pid from employee") + } + tk.MustExec("update employee set pid=id-1 where id>1") + tk.MustQuery("select count(*) from employee").Check(testkit.Rows("131072")) + tk.MustExec("alter table employee add foreign key fk_1(pid) references employee(id)") + tk.MustExec("alter table employee drop foreign key fk_1") + tk.MustExec("alter table employee drop index fk_1") + tk.MustExec("update employee set pid=0 where id=1") + tk.MustGetErrMsg("alter table employee add foreign key fk_1(pid) references employee(id)", + "[ddl:1452]Cannot add or update a child row: a foreign key constraint fails (`fk_index`.`employee`, CONSTRAINT `fk_1` FOREIGN KEY (`pid`) REFERENCES `employee` (`id`))") + tk.MustExec("update employee set pid=null where id=1") + tk.MustExec("insert into employee (pid) select pid from employee") + tk.MustExec("update employee set pid=id-1 where id>1 and pid is null") + tk.MustExec("alter table employee add foreign key fk_1(pid) references employee(id)") +} diff --git a/tests/realtikvtest/pessimistictest/BUILD.bazel b/tests/realtikvtest/pessimistictest/BUILD.bazel index 97890c8b8b70b..67a01e83cf386 100644 --- a/tests/realtikvtest/pessimistictest/BUILD.bazel +++ b/tests/realtikvtest/pessimistictest/BUILD.bazel @@ -18,6 +18,7 @@ go_test( "//parser/model", "//parser/mysql", "//parser/terror", + "//planner/core", "//session", "//sessionctx/variable", 
"//sessiontxn", diff --git a/tests/realtikvtest/pessimistictest/pessimistic_test.go b/tests/realtikvtest/pessimistictest/pessimistic_test.go index ae7545e0e91f6..a70b31f0a87b8 100644 --- a/tests/realtikvtest/pessimistictest/pessimistic_test.go +++ b/tests/realtikvtest/pessimistictest/pessimistic_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" @@ -2816,6 +2817,66 @@ func TestAsyncCommitCalTSFail(t *testing.T) { tk2.MustExec("commit") } +func TestAsyncCommitAndForeignKey(t *testing.T) { + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.AsyncCommit.SafeWindow = time.Second + conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 + }) + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := createAsyncCommitTestKit(t, store) + tk.MustExec("drop table if exists t_parent, t_child") + tk.MustExec("create table t_parent (id int primary key)") + tk.MustExec("create table t_child (id int primary key, pid int, foreign key (pid) references t_parent(id) on delete cascade on update cascade)") + tk.MustExec("insert into t_parent values (1),(2),(3),(4)") + tk.MustExec("insert into t_child values (1,1),(2,2),(3,3)") + tk.MustExec("set tidb_enable_1pc = true") + tk.MustExec("begin pessimistic") + tk.MustExec("delete from t_parent where id in (1,4)") + tk.MustExec("update t_parent set id=22 where id=2") + tk.MustExec("commit") + tk.MustQuery("select * from t_parent order by id").Check(testkit.Rows("3", "22")) + tk.MustQuery("select * from t_child order by id").Check(testkit.Rows("2 22", "3 3")) +} + +func TestTransactionIsolationAndForeignKey(t *testing.T) { + if !*realtikvtest.WithRealTiKV { + t.Skip("The test only support test with tikv.") + } + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk2.MustExec("use test") + tk.MustExec("drop table if exists t1,t2") + tk.MustExec("create table t1 (id int primary key)") + tk.MustExec("create table t2 (id int primary key, pid int, foreign key (pid) references t1(id) on delete cascade on update cascade)") + tk.MustExec("insert into t1 values (1)") + tk.MustExec("set tx_isolation = 'READ-COMMITTED'") + tk.MustExec("begin pessimistic") + tk.MustExec("insert into t2 values (1,1)") + tk.MustGetDBError("insert into t2 values (2,2)", plannercore.ErrNoReferencedRow2) + tk2.MustExec("insert into t1 values (2)") + tk.MustQuery("select * from t1").Check(testkit.Rows("1", "2")) + tk.MustExec("insert into t2 values (2,2)") + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + tk2.MustExec("delete from t1 where id=2") + }() + time.Sleep(time.Millisecond * 10) + tk.MustExec("commit") + wg.Wait() + tk.MustQuery("select * from t1").Check(testkit.Rows("1")) + tk.MustQuery("select * from t2").Check(testkit.Rows("1 1")) + tk2.MustExec("delete from t1 where id=1") + tk.MustQuery("select * from t1").Check(testkit.Rows()) + tk.MustQuery("select * from t2").Check(testkit.Rows()) + tk.MustExec("admin check table t1") + tk.MustExec("admin check table t2") +} + func TestChangeLockToPut(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) diff --git a/tests/realtikvtest/testkit.go b/tests/realtikvtest/testkit.go index b3ae5f3c6a2ac..4b8a749e65c9d 
100644 --- a/tests/realtikvtest/testkit.go +++ b/tests/realtikvtest/testkit.go @@ -19,6 +19,7 @@ package realtikvtest import ( "flag" "fmt" + "strings" "sync/atomic" "testing" "time" @@ -110,8 +111,12 @@ func CreateMockStoreAndDomainAndSetup(t *testing.T, opts ...mockstore.MockTiKVSt tk.MustExec(fmt.Sprintf("set global innodb_lock_wait_timeout = %d", variable.DefInnodbLockWaitTimeout)) tk.MustExec("use test") rs := tk.MustQuery("show tables") + tables := []string{} for _, row := range rs.Rows() { - tk.MustExec(fmt.Sprintf("drop table %s", row[0])) + tables = append(tables, fmt.Sprintf("`%v`", row[0])) + } + if len(tables) > 0 { + tk.MustExec(fmt.Sprintf("drop table %s", strings.Join(tables, ","))) } } else { store, err = mockstore.NewMockStore(opts...) From 11f5c1791ec723723380feea11b080cb5dfefd5c Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 27 Dec 2022 22:02:16 -0700 Subject: [PATCH 2/4] *: add support for -initialize-sql-file on first bootstrap (#35625) close pingcap/tidb#35624 --- config/config.go | 3 +++ session/bootstrap.go | 37 ++++++++++++++++++++++++++ session/bootstrap_test.go | 56 +++++++++++++++++++++++++++++++++++++++ session/session.go | 10 +++++-- tidb-server/main.go | 18 ++++++++++--- 5 files changed, 119 insertions(+), 5 deletions(-) diff --git a/config/config.go b/config/config.go index 2352a17acf0d1..bc25b8c9b9ec3 100644 --- a/config/config.go +++ b/config/config.go @@ -261,6 +261,9 @@ type Config struct { // EnableGlobalKill indicates whether to enable global kill. TrxSummary TrxSummary `toml:"transaction-summary" json:"transaction-summary"` EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"` + // InitializeSQLFile is a file that will be executed after first bootstrap only. + // It can be used to set GLOBAL system variable values + InitializeSQLFile string `toml:"initialize-sql-file" json:"initialize-sql-file"` // The following items are deprecated. We need to keep them here temporarily // to support the upgrade process. They can be removed in future. diff --git a/session/bootstrap.go b/session/bootstrap.go index f76f8b4bb1d36..94484eefd9438 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -23,6 +23,7 @@ import ( "encoding/hex" "flag" "fmt" + "io/ioutil" osuser "os/user" "runtime/debug" "strconv" @@ -526,6 +527,7 @@ func bootstrap(s Session) { if dom.DDL().OwnerManager().IsOwner() { doDDLWorks(s) doDMLWorks(s) + runBootstrapSQLFile = true logutil.BgLogger().Info("bootstrap successful", zap.Duration("take time", time.Since(startTime))) return @@ -746,6 +748,9 @@ var currentBootstrapVersion int64 = version109 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it. var internalSQLTimeout = owner.ManagerSessionTTL + 15 +// whether to run the sql file in bootstrap. +var runBootstrapSQLFile = false + var ( bootstrapVersion = []func(Session, int64){ upgradeToVer2, @@ -2321,6 +2326,38 @@ func doDDLWorks(s Session) { mustExecute(s, CreateTTLTableStatus) } +// doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap. +// It is useful for setting the initial value of GLOBAL variables. 
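+// For example (illustrative only, taken from the bootstrap test below), a file
+// passed via -initialize-sql-file might contain:
+//
+//	CREATE DATABASE initsqlfiletest;
+//	SET GLOBAL tidb_enable_noop_variables = OFF;
+//
+// Statements run in order. An error from an individual statement is only
+// logged as a warning and does not abort bootstrap, while an unreadable or
+// unparsable file is fatal.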
+func doBootstrapSQLFile(s Session) { + sqlFile := config.GetGlobalConfig().InitializeSQLFile + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) + if sqlFile == "" { + return + } + logutil.BgLogger().Info("executing -initialize-sql-file", zap.String("file", sqlFile)) + b, err := ioutil.ReadFile(sqlFile) //nolint:gosec + if err != nil { + logutil.BgLogger().Fatal("unable to read InitializeSQLFile", zap.Error(err)) + } + stmts, err := s.Parse(ctx, string(b)) + if err != nil { + logutil.BgLogger().Fatal("unable to parse InitializeSQLFile", zap.Error(err)) + } + for _, stmt := range stmts { + rs, err := s.ExecuteStmt(ctx, stmt) + if err != nil { + logutil.BgLogger().Warn("InitializeSQLFile error", zap.Error(err)) + } + if rs != nil { + // I don't believe we need to drain the result-set in bootstrap mode + // but if required we can do this here in future. + if err := rs.Close(); err != nil { + logutil.BgLogger().Fatal("unable to close result", zap.Error(err)) + } + } + } +} + // inTestSuite checks if we are bootstrapping in the context of tests. // There are some historical differences in behavior between tests and non-tests. func inTestSuite() bool { diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index 81e4703d72217..a010daf32b14c 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -17,12 +17,14 @@ package session import ( "context" "fmt" + "os" "strconv" "strings" "testing" "time" "github.com/pingcap/tidb/bindinfo" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/auth" @@ -1043,6 +1045,60 @@ func TestUpgradeToVer85(t *testing.T) { mustExec(t, se, "delete from mysql.bind_info where default_db = 'test'") } +func TestInitializeSQLFile(t *testing.T) { + // We create an initialize-sql-file and then bootstrap the server with it. + // The observed behavior should be that tidb_enable_noop_variables is now + // disabled, and the feature works as expected. 
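+ // The init file lives in the OS temp dir; it is closed and removed again by
+ // the deferred cleanup below.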
+ initializeSQLFile, err := os.CreateTemp("", "init.sql") + require.NoError(t, err) + defer func() { + path := initializeSQLFile.Name() + err = initializeSQLFile.Close() + require.NoError(t, err) + err = os.Remove(path) + require.NoError(t, err) + }() + // Implicitly test multi-line init files + _, err = initializeSQLFile.WriteString( + "CREATE DATABASE initsqlfiletest;\n" + + "SET GLOBAL tidb_enable_noop_variables = OFF;\n") + require.NoError(t, err) + + // Create a mock store + // Set the config parameter for initialize sql file + store, err := mockstore.NewMockStore() + require.NoError(t, err) + config.GetGlobalConfig().InitializeSQLFile = initializeSQLFile.Name() + defer func() { + require.NoError(t, store.Close()) + config.GetGlobalConfig().InitializeSQLFile = "" + }() + + // Bootstrap with the InitializeSQLFile config option + dom, err := BootstrapSession(store) + require.NoError(t, err) + defer dom.Close() + se := createSessionAndSetID(t, store) + ctx := context.Background() + r, err := exec(se, `SHOW VARIABLES LIKE 'query_cache_type'`) + require.NoError(t, err) + req := r.NewChunk(nil) + err = r.Next(ctx, req) + require.NoError(t, err) + require.Equal(t, 0, req.NumRows()) // not shown in noopvariables mode + require.NoError(t, r.Close()) + + r, err = exec(se, `SHOW VARIABLES LIKE 'tidb_enable_noop_variables'`) + require.NoError(t, err) + req = r.NewChunk(nil) + err = r.Next(ctx, req) + require.NoError(t, err) + require.Equal(t, 1, req.NumRows()) + row := req.GetRow(0) + require.Equal(t, []byte("OFF"), row.GetBytes(1)) + require.NoError(t, r.Close()) +} + func TestTiDBEnablePagingVariable(t *testing.T) { store, dom := createStoreAndBootstrap(t) se := createSessionAndSetID(t, store) diff --git a/session/session.go b/session/session.go index d358d761560e2..e58d534197582 100644 --- a/session/session.go +++ b/session/session.go @@ -3297,7 +3297,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) - ses, err := createSessions(store, 9) + ses, err := createSessions(store, 10) if err != nil { return nil, err } @@ -3397,7 +3397,13 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { // setup historical stats worker dom.SetupHistoricalStatsWorker(ses[8]) dom.StartHistoricalStatsWorker() - + if runBootstrapSQLFile { + pm := &privileges.UserPrivileges{ + Handle: dom.PrivilegeHandle(), + } + privilege.BindPrivilegeManager(ses[9], pm) + doBootstrapSQLFile(ses[9]) + } // A sub context for update table stats, and other contexts for concurrent stats loading. cnt := 1 + concurrency syncStatsCtxs, err := createSessions(store, cnt) diff --git a/tidb-server/main.go b/tidb-server/main.go index 0e42c059e6c05..97ad1917105ca 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -122,6 +122,7 @@ const ( nmInitializeSecure = "initialize-secure" nmInitializeInsecure = "initialize-insecure" + nmInitializeSQLFile = "initialize-sql-file" nmDisconnectOnExpiredPassword = "disconnect-on-expired-password" ) @@ -166,9 +167,10 @@ var ( proxyProtocolNetworks = flag.String(nmProxyProtocolNetworks, "", "proxy protocol networks allowed IP or *, empty mean disable proxy protocol support") proxyProtocolHeaderTimeout = flag.Uint(nmProxyProtocolHeaderTimeout, 5, "proxy protocol header read timeout, unit is second. 
(Deprecated: as proxy protocol using lazy mode, header read timeout no longer used)") - // Security + // Bootstrap and security initializeSecure = flagBoolean(nmInitializeSecure, false, "bootstrap tidb-server in secure mode") initializeInsecure = flagBoolean(nmInitializeInsecure, true, "bootstrap tidb-server in insecure mode") + initializeSQLFile = flag.String(nmInitializeSQLFile, "", "SQL file to execute on first bootstrap") disconnectOnExpiredPassword = flagBoolean(nmDisconnectOnExpiredPassword, true, "the server disconnects the client when the password is expired") ) @@ -531,7 +533,7 @@ func overrideConfig(cfg *config.Config) { // Sanity check: can't specify both options if actualFlags[nmInitializeSecure] && actualFlags[nmInitializeInsecure] { - err = fmt.Errorf("the options --initialize-insecure and --initialize-secure are mutually exclusive") + err = fmt.Errorf("the options -initialize-insecure and -initialize-secure are mutually exclusive") terror.MustNil(err) } // The option --initialize-secure=true ensures that a secure bootstrap is used. @@ -550,9 +552,19 @@ func overrideConfig(cfg *config.Config) { // which is not supported on windows. Only the insecure bootstrap // method is supported. if runtime.GOOS == "windows" && cfg.Security.SecureBootstrap { - err = fmt.Errorf("the option --initialize-secure is not supported on Windows") + err = fmt.Errorf("the option -initialize-secure is not supported on Windows") terror.MustNil(err) } + // Initialize SQL File is used to run a set of SQL statements after first bootstrap. + // It is important in the use case that you want to set GLOBAL variables, which + // are persisted to the cluster and not read from a config file. + if actualFlags[nmInitializeSQLFile] { + if _, err := os.Stat(*initializeSQLFile); err != nil { + err = fmt.Errorf("can not access -initialize-sql-file %s", *initializeSQLFile) + terror.MustNil(err) + } + cfg.InitializeSQLFile = *initializeSQLFile + } } func setVersions() { From b268c65710b2eedd5bec3f3c222ed488f5aa1772 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Wed, 28 Dec 2022 16:00:17 +0800 Subject: [PATCH 3/4] *: fix PointGet will return an stale value when `tidb_enable_plan_replayer_capture` is set (#40197) close pingcap/tidb#40194 --- executor/adapter.go | 19 ++++++++++++++----- executor/compiler.go | 5 +++++ executor/point_get_test.go | 17 +++++++++++++++++ planner/core/plan_cache_utils.go | 11 +++++++---- session/session.go | 4 ---- sessiontxn/interface.go | 2 ++ sessiontxn/isolation/optimistic.go | 8 +++++++- 7 files changed, 52 insertions(+), 14 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 5195ee4d1dae5..c087a50e5f5f0 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "math" "runtime/trace" "strconv" "strings" @@ -295,8 +296,12 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) { } a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh + var pointExecutor *PointGetExecutor + useMaxTS := startTs == math.MaxUint64 + // try to reuse point get executor - if a.PsStmt.Executor != nil { + // We should only use the cached the executor when the startTS is MaxUint64 + if a.PsStmt.Executor != nil && useMaxTS { exec, ok := a.PsStmt.Executor.(*PointGetExecutor) if !ok { logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path") @@ -306,17 +311,21 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) { pointGetPlan := 
a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan) exec.Init(pointGetPlan) a.PsStmt.Executor = exec + pointExecutor = exec } } - if a.PsStmt.Executor == nil { + + if pointExecutor == nil { b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti) - newExecutor := b.build(a.Plan) + pointExecutor = b.build(a.Plan).(*PointGetExecutor) if b.err != nil { return nil, b.err } - a.PsStmt.Executor = newExecutor + + if useMaxTS { + a.PsStmt.Executor = pointExecutor + } } - pointExecutor := a.PsStmt.Executor.(*PointGetExecutor) if err = pointExecutor.Open(ctx); err != nil { terror.Call(pointExecutor.Close) diff --git a/executor/compiler.go b/executor/compiler.go index e000a22ba633e..821561899f4e7 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -157,6 +157,11 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } } } + + if err = sessiontxn.OptimizeWithPlanAndThenWarmUp(c.Ctx, stmt.Plan); err != nil { + return nil, err + } + if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL { startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS() if err != nil { diff --git a/executor/point_get_test.go b/executor/point_get_test.go index c615c3a75cb1a..8f13675457481 100644 --- a/executor/point_get_test.go +++ b/executor/point_get_test.go @@ -825,3 +825,20 @@ func TestPointGetIssue25167(t *testing.T) { tk.MustExec("insert into t values (1)") tk.MustQuery("select * from t as of timestamp @a where a = 1").Check(testkit.Rows()) } + +func TestPointGetIssue40194(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1(id int primary key, v int)") + tk.MustExec("insert into t1 values(1, 10)") + tk.MustExec("prepare s from 'select * from t1 where id=1'") + tk.MustExec("set @@tidb_enable_plan_replayer_capture=1") + tk.MustQuery("execute s").Check(testkit.Rows("1 10")) + tk.MustQuery("execute s").Check(testkit.Rows("1 10")) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("update t1 set v=v+1") + tk.MustQuery("execute s").Check(testkit.Rows("1 11")) +} diff --git a/planner/core/plan_cache_utils.go b/planner/core/plan_cache_utils.go index 8dc867316207d..6408d269ef799 100644 --- a/planner/core/plan_cache_utils.go +++ b/planner/core/plan_cache_utils.go @@ -405,10 +405,13 @@ func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.Ta // PlanCacheStmt store prepared ast from PrepareExec and other related fields type PlanCacheStmt struct { - PreparedAst *ast.Prepared - StmtDB string // which DB the statement will be processed over - VisitInfos []visitInfo - ColumnInfos interface{} + PreparedAst *ast.Prepared + StmtDB string // which DB the statement will be processed over + VisitInfos []visitInfo + ColumnInfos interface{} + // Executor is only used for point get scene. + // Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it. + // If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here. Executor interface{} NormalizedSQL string NormalizedPlan string diff --git a/session/session.go b/session/session.go index e58d534197582..528f647978240 100644 --- a/session/session.go +++ b/session/session.go @@ -2180,10 +2180,6 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex // Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt). 
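+ // Note: sessiontxn.OptimizeWithPlanAndThenWarmUp is now invoked inside
+ // compiler.Compile (see executor/compiler.go), so it no longer runs here.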
compiler := executor.Compiler{Ctx: s} stmt, err := compiler.Compile(ctx, stmtNode) - if err == nil { - err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, stmt.Plan) - } - if err != nil { s.rollbackOnError(ctx) diff --git a/sessiontxn/interface.go b/sessiontxn/interface.go index 85d9217a90c0f..d91f1c291b588 100644 --- a/sessiontxn/interface.go +++ b/sessiontxn/interface.go @@ -160,8 +160,10 @@ type TxnManager interface { // GetReadReplicaScope returns the read replica scope GetReadReplicaScope() string // GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update) + // Calling this method will activate the txn implicitly if current read is not stale/historical read GetStmtReadTS() (uint64, error) // GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update + // Calling this method will activate the txn implicitly if current read is not stale/historical read GetStmtForUpdateTS() (uint64, error) // GetContextProvider returns the current TxnContextProvider GetContextProvider() TxnContextProvider diff --git a/sessiontxn/isolation/optimistic.go b/sessiontxn/isolation/optimistic.go index 3c60eba09331b..9a1f8d58aabbd 100644 --- a/sessiontxn/isolation/optimistic.go +++ b/sessiontxn/isolation/optimistic.go @@ -114,6 +114,12 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{}) return nil } + if p.txn != nil { + // `p.txn != nil` means the txn has already been activated, we should not optimize the startTS because the startTS + // has already been used. + return nil + } + realPlan, ok := plan.(plannercore.Plan) if !ok { return nil @@ -141,7 +147,7 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{}) zap.Uint64("conn", sessVars.ConnectionID), zap.String("text", sessVars.StmtCtx.OriginalSQL), ) - return nil + return err } p.optimizeWithMaxTS = true From f9af75f2ae3c1a16e2d3d8f34708c56d54a192d1 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Wed, 28 Dec 2022 16:42:16 +0800 Subject: [PATCH 4/4] planner: support converting `json_member_of` to IndexMerge to access MVIndex (#40175) ref pingcap/tidb#40191 --- planner/core/BUILD.bazel | 1 + planner/core/find_best_task.go | 6 + planner/core/indexmerge_path.go | 168 +++++++++++++++++- planner/core/indexmerge_path_test.go | 53 ++++++ planner/core/logical_plan_builder.go | 3 + .../core/testdata/index_merge_suite_in.json | 10 ++ .../core/testdata/index_merge_suite_out.json | 53 ++++++ 7 files changed, 292 insertions(+), 2 deletions(-) create mode 100644 planner/core/indexmerge_path_test.go diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 1a7ec1196f0f1..b7f37923547be 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -171,6 +171,7 @@ go_test( "flat_plan_test.go", "fragment_test.go", "indexmerge_intersection_test.go", + "indexmerge_path_test.go", "indexmerge_test.go", "integration_partition_test.go", "integration_test.go", diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 639bc15dbdc98..f14e343610b5d 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -1419,6 +1419,12 @@ func (ds *DataSource) addSelection4PlanCache(task *rootTask, stats *property.Sta // convertToIndexScan converts the DataSource to index scan with idx. 
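+// MVIndex paths are rejected here (invalidTask is returned) because an MVIndex
+// can currently be accessed only through IndexMerge.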
func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) { + if candidate.path.Index.MVIndex { + // MVIndex is special since different index rows may return the same _row_id and this can break some assumptions of IndexReader. + // Currently only support using IndexMerge to access MVIndex instead of IndexReader. + // TODO: make IndexReader support accessing MVIndex directly. + return invalidTask, nil + } if !candidate.path.IsSingleScan { // If it's parent requires single read task, return max cost. if prop.TaskTp == property.CopSingleReadTaskType { diff --git a/planner/core/indexmerge_path.go b/planner/core/indexmerge_path.go index 51f950bf7d61a..f0ecf02a00231 100644 --- a/planner/core/indexmerge_path.go +++ b/planner/core/indexmerge_path.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/planner/util" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/ranger" "go.uber.org/zap" @@ -63,7 +64,19 @@ func (ds *DataSource) generateIndexMergePath() error { _, remaining := expression.PushDownExprs(stmtCtx, indexMergeConds, ds.ctx.GetClient(), kv.UnSpecified) stmtCtx.SetWarnings(warnings) stmtCtx.SetExtraWarnings(extraWarnings) - if len(remaining) != 0 { + + remainingExpr := 0 + for _, expr := range remaining { + // Handle these 3 functions specially since they can be used to access MVIndex. + if sf, ok := expr.(*expression.ScalarFunction); ok { + if sf.FuncName.L == ast.JSONMemberOf || sf.FuncName.L == ast.JSONOverlaps || + sf.FuncName.L == ast.JSONContains { + continue + } + } + remainingExpr++ + } + if remainingExpr > 0 { needConsiderIndexMerge = false } } @@ -435,8 +448,16 @@ func (ds *DataSource) generateAndPruneIndexMergePath(indexMergeConds []expressio if indexMergeAndPath != nil { ds.possibleAccessPaths = append(ds.possibleAccessPaths, indexMergeAndPath) } + // 3. Generate possible IndexMerge paths for MVIndex. + mvIndexMergePath, err := ds.generateIndexMergeJSONMVIndexPath(regularPathCount, indexMergeConds) + if err != nil { + return err + } + if mvIndexMergePath != nil { + ds.possibleAccessPaths = append(ds.possibleAccessPaths, mvIndexMergePath...) + } - // 3. If needed, append a warning if no IndexMerge is generated. + // 4. If needed, append a warning if no IndexMerge is generated. // If without hints, it means that `enableIndexMerge` is true if len(ds.indexMergeHints) == 0 { @@ -467,3 +488,146 @@ func (ds *DataSource) generateAndPruneIndexMergePath(indexMergeConds []expressio } return nil } + +// generateIndexMergeJSONMVIndexPath generates paths for (json_member_of / json_overlaps / json_contains) on multi-valued index. +/* + 1. select * from t where 1 member of (a) + IndexMerge(AND) + IndexRangeScan(a, [1,1]) + TableRowIdScan(t) + 2. select * from t where json_contains(a, '[1, 2, 3]') + IndexMerge(AND) + IndexRangeScan(a, [1,1]) + IndexRangeScan(a, [2,2]) + IndexRangeScan(a, [3,3]) + TableRowIdScan(t) + 3. 
select * from t where json_overlap(a, '[1, 2, 3]') + IndexMerge(OR) + IndexRangeScan(a, [1,1]) + IndexRangeScan(a, [2,2]) + IndexRangeScan(a, [3,3]) + TableRowIdScan(t) +*/ +func (ds *DataSource) generateIndexMergeJSONMVIndexPath(normalPathCnt int, filters []expression.Expression) (mvIndexPaths []*util.AccessPath, err error) { + for idx := 0; idx < normalPathCnt; idx++ { + if ds.possibleAccessPaths[idx].IsTablePath() || ds.possibleAccessPaths[idx].Index == nil || !ds.possibleAccessPaths[idx].Index.MVIndex { + continue // not a MVIndex path + } + if !ds.isSpecifiedInIndexMergeHints(ds.possibleAccessPaths[idx].Index.Name.L) { + continue // for safety, only consider using MVIndex when there is a `use_index_merge` hint now. + // TODO: remove this limitation + } + + // Step 1. Extract the underlying JSON column from MVIndex Info. + mvIndex := ds.possibleAccessPaths[idx].Index + if len(mvIndex.Columns) != 1 { + // only support single-column MVIndex now: idx((cast(a->'$.zip' as signed array))) + // TODO: support composite MVIndex idx((x, cast(a->'$.zip' as int array), z)) + continue + } + mvVirColOffset := mvIndex.Columns[0].Offset + mvVirColMeta := ds.table.Meta().Cols()[mvVirColOffset] + + var virCol *expression.Column + for _, ce := range ds.TblCols { + if ce.ID == mvVirColMeta.ID { + virCol = ce.Clone().(*expression.Column) + virCol.RetType = ce.GetType().ArrayType() // use the underlying type directly: JSON-ARRAY(INT) --> INT + break + } + } + // unwrap the outside cast: cast(json_extract(test.t.a, $.zip), JSON) --> json_extract(test.t.a, $.zip) + targetJSONPath, ok := unwrapJSONCast(virCol.VirtualExpr) + if !ok { + continue + } + + // Step 2. Iterate all filters and generate corresponding IndexMerge paths. + for filterIdx, filter := range filters { + // Step 2.1. Extract jsonPath and vals from json_member / json_overlaps / json_contains functions. + sf, ok := filter.(*expression.ScalarFunction) + if !ok { + continue + } + + var jsonPath expression.Expression + var vals []expression.Expression + switch sf.FuncName.L { + case ast.JSONMemberOf: // (1 member of a->'$.zip') + jsonPath = sf.GetArgs()[1] + v, ok := unwrapJSONCast(sf.GetArgs()[0]) // cast(1 as json) --> 1 + if !ok { + continue + } + vals = append(vals, v) + case ast.JSONOverlaps: // (json_overlaps(a->'$.zip', '[1, 2, 3]') + continue // TODO: support json_overlaps + case ast.JSONContains: // (json_contains(a->'$.zip', '[1, 2, 3]') + continue // TODO: support json_contains + default: + continue + } + + // Step 2.2. Check some limitations. + if jsonPath == nil || len(vals) == 0 { + continue + } + if !jsonPath.Equal(ds.ctx, targetJSONPath) { + continue // not on the same JSON col + } + // only support INT now + // TODO: support more types + if jsonPath.GetType().EvalType() == types.ETInt { + continue + } + allInt := true + // TODO: support using IndexLookUp to handle single-value cases. + for _, v := range vals { + if v.GetType().EvalType() != types.ETInt { + allInt = false + } + } + if !allInt { + continue + } + + // Step 2.3. Generate a IndexMerge Path of this filter on the current MVIndex. 
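+ // One partial path is built per extracted value, and the paths are then
+ // combined by buildIndexMergeOrPath below. json_member_of extracts a single
+ // value, so its IndexMerge contains exactly one IndexRangeScan partial plan
+ // (AND and OR coincide for a single partial path).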
+ var partialPaths []*util.AccessPath + for _, v := range vals { + partialPath := &util.AccessPath{Index: mvIndex} + partialPath.Ranges = ranger.FullRange() + // TODO: get the actual column length of this virtual column + partialPath.IdxCols, partialPath.IdxColLens = []*expression.Column{virCol}, []int{types.UnspecifiedLength} + partialPath.FullIdxCols, partialPath.FullIdxColLens = []*expression.Column{virCol}, []int{types.UnspecifiedLength} + + // calculate the path range with the condition `a->'$.zip' = 1`. + eq, err := expression.NewFunction(ds.ctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), virCol, v) + if err != nil { + return nil, err + } + if err = ds.detachCondAndBuildRangeForPath(partialPath, []expression.Expression{eq}); err != nil { + return nil, err + } + + partialPaths = append(partialPaths, partialPath) + } + indexMergePath := ds.buildIndexMergeOrPath(filters, partialPaths, filterIdx) + mvIndexPaths = append(mvIndexPaths, indexMergePath) + } + } + return +} + +func unwrapJSONCast(expr expression.Expression) (expression.Expression, bool) { + if expr == nil { + return nil, false + } + sf, ok := expr.(*expression.ScalarFunction) + if !ok { + return nil, false + } + if sf == nil || sf.FuncName.L != ast.Cast || sf.GetType().EvalType() != types.ETJson { + return nil, false + } + return sf.GetArgs()[0], true +} diff --git a/planner/core/indexmerge_path_test.go b/planner/core/indexmerge_path_test.go new file mode 100644 index 0000000000000..fcb0d27903c64 --- /dev/null +++ b/planner/core/indexmerge_path_test.go @@ -0,0 +1,53 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
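+
+// Tests in this file cover IndexMerge path generation for `member of`
+// filters over multi-valued indexes; the expected plans are recorded in
+// testdata/index_merge_suite_out.json.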
+ +package core_test + +import ( + "testing" + + "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/testkit/testdata" +) + +func TestIndexMergeJSONMemberOf(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create table t( +a int, j0 json, j1 json, +index j0_0((cast(j0->'$.path0' as signed array))), +index j0_1((cast(j0->'$.path1' as signed array))), +index j1((cast(j1 as signed array))))`) + + var input []string + var output []struct { + SQL string + Plan []string + } + planSuiteData := core.GetIndexMergeSuiteData() + planSuiteData.LoadTestCases(t, &input, &output) + + for i, query := range input { + testdata.OnRecord(func() { + output[i].SQL = query + }) + result := tk.MustQuery("explain format = 'brief' " + query) + testdata.OnRecord(func() { + output[i].Plan = testdata.ConvertRowsToStrings(result.Rows()) + }) + result.Check(testkit.Rows(output[i].Plan...)) + } +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 6bc8a677bc6de..f60c1f617e467 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4673,7 +4673,10 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as if i < len(columns) { if columns[i].IsGenerated() && !columns[i].GeneratedStored { var err error + originVal := b.allowBuildCastArray + b.allowBuildCastArray = true expr, _, err = b.rewrite(ctx, columns[i].GeneratedExpr, ds, nil, true) + b.allowBuildCastArray = originVal if err != nil { return nil, err } diff --git a/planner/core/testdata/index_merge_suite_in.json b/planner/core/testdata/index_merge_suite_in.json index db7ebacdb29c7..2841de33bae0c 100644 --- a/planner/core/testdata/index_merge_suite_in.json +++ b/planner/core/testdata/index_merge_suite_in.json @@ -1,4 +1,14 @@ [ + { + "name": "TestIndexMergeJSONMemberOf", + "cases": [ + "select /*+ use_index_merge(t, j0_0) */ * from t where (1 member of (j0->'$.path0'))", + "select /*+ use_index_merge(t, j0_1) */ * from t where (1 member of (j0->'$.path1')) and a<10", + "select /*+ use_index_merge(t, j0_1) */ * from t where (1 member of (j0->'$.XXX')) and a<10", + "select /*+ use_index_merge(t, j0_1) */ * from t where (1 member of (j0->'$.path1')) and (2 member of (j1)) and a<10", + "select /*+ use_index_merge(t, j1) */ * from t where (1 member of (j0->'$.path1')) and (2 member of (j1)) and a<10" + ] + }, { "name": "TestIndexMergePathGeneration", "cases": [ diff --git a/planner/core/testdata/index_merge_suite_out.json b/planner/core/testdata/index_merge_suite_out.json index 3d67e5e372251..31427fbf4c7e0 100644 --- a/planner/core/testdata/index_merge_suite_out.json +++ b/planner/core/testdata/index_merge_suite_out.json @@ -1,4 +1,57 @@ [ + { + "Name": "TestIndexMergeJSONMemberOf", + "Cases": [ + { + "SQL": "select /*+ use_index_merge(t, j0_0) */ * from t where (1 member of (j0->'$.path0'))", + "Plan": [ + "Selection 0.00 root json_memberof(cast(1, json BINARY), json_extract(test.t.j0, \"$.path0\"))", + "└─IndexMerge 0.00 root type: union", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:j0_0(cast(json_extract(`j0`, _utf8mb4'$.path0') as signed array)) range:[1,1], keep order:false, stats:pseudo", + " └─TableRowIDScan(Probe) 0.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "select /*+ use_index_merge(t, j0_1) */ * from t where (1 member of (j0->'$.path1')) and a<10", + "Plan": [ + "Selection 0.00 
root json_memberof(cast(1, json BINARY), json_extract(test.t.j0, \"$.path1\"))", + "└─IndexMerge 0.00 root type: union", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:j0_1(cast(json_extract(`j0`, _utf8mb4'$.path1') as signed array)) range:[1,1], keep order:false, stats:pseudo", + " └─Selection(Probe) 0.00 cop[tikv] lt(test.t.a, 10)", + " └─TableRowIDScan 0.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "select /*+ use_index_merge(t, j0_1) */ * from t where (1 member of (j0->'$.XXX')) and a<10", + "Plan": [ + "Selection 2658.67 root json_memberof(cast(1, json BINARY), json_extract(test.t.j0, \"$.XXX\"))", + "└─TableReader 3323.33 root data:Selection", + " └─Selection 3323.33 cop[tikv] lt(test.t.a, 10)", + " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "select /*+ use_index_merge(t, j0_1) */ * from t where (1 member of (j0->'$.path1')) and (2 member of (j1)) and a<10", + "Plan": [ + "Selection 0.00 root json_memberof(cast(1, json BINARY), json_extract(test.t.j0, \"$.path1\")), json_memberof(cast(2, json BINARY), test.t.j1)", + "└─IndexMerge 0.00 root type: union", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:j0_1(cast(json_extract(`j0`, _utf8mb4'$.path1') as signed array)) range:[1,1], keep order:false, stats:pseudo", + " └─Selection(Probe) 0.00 cop[tikv] lt(test.t.a, 10)", + " └─TableRowIDScan 0.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "select /*+ use_index_merge(t, j1) */ * from t where (1 member of (j0->'$.path1')) and (2 member of (j1)) and a<10", + "Plan": [ + "Selection 0.00 root json_memberof(cast(1, json BINARY), json_extract(test.t.j0, \"$.path1\")), json_memberof(cast(2, json BINARY), test.t.j1)", + "└─IndexMerge 0.00 root type: union", + " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:j1(cast(`j1` as signed array)) range:[2,2], keep order:false, stats:pseudo", + " └─Selection(Probe) 0.00 cop[tikv] lt(test.t.a, 10)", + " └─TableRowIDScan 0.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + } + ] + }, { "Name": "TestIndexMergePathGeneration", "Cases": [