diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 2b27b40704767..bb4f563b44ae7 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -13,7 +13,7 @@ import ( // interface is to bulk create table parallelly type BulkCreateTableSession interface { - CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, batchDdlSize uint) error + CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error } // Glue is an abstraction of TiDB function calls used in BR. diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index b2dd9c1fa711b..d8c5cba59badd 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -131,9 +131,9 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) } // CreateTable implements glue.Session. -func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, batchDdlSize uint) error { +func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { d := domain.GetDomain(gs.se).DDL() - log.Info("tidb start create tables", zap.Uint("batchDdlSize", batchDdlSize)) + log.Info("tidb start create tables") var dbName model.CIStr cloneTables := make([]*model.TableInfo, 0, len(tables)) @@ -159,7 +159,8 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo cloneTables = append(cloneTables, table) } gs.se.SetValue(sessionctx.QueryString, queryBuilder.String()) - err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore, true) + err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore) + if err != nil { log.Info("Bulk create table from tidb failure, it possible caused by version mismatch with BR.", zap.String("Error", err.Error())) return err diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 8ec2006ca59b2..9d3dd6ca67c55 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -12,6 +12,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/opentracing/opentracing-go" @@ -81,6 +82,7 @@ type Client struct { restoreStores []uint64 + cipher *backuppb.CipherInfo storage storage.ExternalStorage backend *backuppb.StorageBackend switchModeInterval time.Duration @@ -135,6 +137,10 @@ func (rc *Client) SetRateLimit(rateLimit uint64) { rc.rateLimit = rateLimit } +func (rc *Client) SetCrypter(crypter *backuppb.CipherInfo) { + rc.cipher = crypter +} + // SetStorage set ExternalStorage for client. func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error { var err error @@ -415,7 +421,7 @@ func (rc *Client) createTables( if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID") } else { - err := db.CreateTables(ctx, tables, rc.GetBatchDdlSize()) + err := db.CreateTables(ctx, tables) if err != nil { return nil, errors.Trace(err) } @@ -451,11 +457,12 @@ func (rc *Client) createTable( dom *domain.Domain, table *metautil.Table, newTS uint64, + ddlTables map[UniqueTableName]bool, ) (CreatedTable, error) { if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) } else { - err := db.CreateTable(ctx, table) + err := db.CreateTable(ctx, table, ddlTables) if err != nil { return CreatedTable{}, errors.Trace(err) } @@ -494,6 +501,7 @@ func (rc *Client) GoCreateTables( // Could we have a smaller size of tables? 
log.Info("start create tables") + ddlTables := rc.DDLJobsMap() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("Client.GoCreateTables", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -501,55 +509,63 @@ func (rc *Client) GoCreateTables( } outCh := make(chan CreatedTable, len(tables)) rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter) - err := rc.createTablesInWorkerPool(ctx, dom, tables, dbPool, newTS, outCh) - //cts, err := rc.createTables(ctx, rc.db, dom, tables, newTS) - if err == nil { - defer close(outCh) - // fall back to old create table (sequential create table) - } else if strings.Contains(err.Error(), "[ddl:8204]invalid ddl job") { - log.Info("fall back to the old DDL way to create table.") - createOneTable := func(c context.Context, db *DB, t *metautil.Table) error { - select { - case <-c.Done(): - return c.Err() - default: - } - rt, err := rc.createTable(c, db, dom, t, newTS) - if err != nil { - log.Error("create table failed", - zap.Error(err), - zap.Stringer("db", t.DB.Name), - zap.Stringer("table", t.Info.Name)) - return errors.Trace(err) - } - log.Debug("table created and send to next", - zap.Int("output chan size", len(outCh)), - zap.Stringer("table", t.Info.Name), - zap.Stringer("database", t.DB.Name)) - outCh <- rt - rater.Inc() - rater.L().Info("table created", - zap.Stringer("table", t.Info.Name), - zap.Stringer("database", t.DB.Name)) - return nil - } - go func() { + var err error = nil + if rc.batchDllSize > 0 { + err = rc.createTablesInWorkerPool(ctx, dom, tables, dbPool, newTS, outCh) + + if err == nil { + log.Info("bulk to create tables success.") defer close(outCh) - defer log.Debug("all tables are created") - var err error - if len(dbPool) > 0 { - err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool) - } else { - err = rc.createTablesWithSoleDB(ctx, createOneTable, tables) - } - if err != nil { - errCh <- err - } - }() - } else { - errCh <- err + // fall back to old create table (sequential create table) + } else if strings.Contains(err.Error(), "[ddl:8204]invalid ddl job") { + log.Info("fall back to the old DDL way to create table.") + } else { + log.Error("bulk to create tables failure.") + errCh <- err + return outCh + } + } + + createOneTable := func(c context.Context, db *DB, t *metautil.Table) error { + select { + case <-c.Done(): + return c.Err() + default: + + } + rt, err := rc.createTable(c, db, dom, t, newTS, ddlTables) + if err != nil { + log.Error("create table failed", + zap.Error(err), + zap.Stringer("db", t.DB.Name), + zap.Stringer("table", t.Info.Name)) + return errors.Trace(err) + } + log.Debug("table created and send to next", + zap.Int("output chan size", len(outCh)), + zap.Stringer("table", t.Info.Name), + zap.Stringer("database", t.DB.Name)) + outCh <- rt + rater.Inc() + rater.L().Info("table created", + zap.Stringer("table", t.Info.Name), + zap.Stringer("database", t.DB.Name)) + return nil } + go func() { + defer close(outCh) + defer log.Debug("all tables are created") + var err error + if len(dbPool) > 0 { + err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool) + } else { + err = rc.createTablesWithSoleDB(ctx, createOneTable, tables) + } + if err != nil { + errCh <- err + } + }() return outCh } @@ -585,7 +601,9 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma workers := utils.NewWorkerPool(uint(len(dbPool)), "Create Tables Worker") numOfTables := len(tables) lastSent := 0 - for i 
:= int(rc.batchDllSize); i <= numOfTables; i = i + int(rc.batchDllSize) { + + for i := int(rc.batchDllSize); i < numOfTables+int(rc.batchDllSize); i = i + int(rc.batchDllSize) { + log.Info("create tables", zap.Int("table start", lastSent), zap.Int("table end", i)) if i > numOfTables { i = numOfTables @@ -727,7 +745,7 @@ func (rc *Client) RestoreFiles( zap.Duration("take", time.Since(fileStart))) updateCh.Inc() }() - return rc.fileImporter.Import(ectx, filesReplica, rewriteRules) + return rc.fileImporter.Import(ectx, filesReplica, rewriteRules, rc.cipher) }) } @@ -768,7 +786,7 @@ func (rc *Client) RestoreRaw( rc.workerPool.ApplyOnErrorGroup(eg, func() error { defer updateCh.Inc() - return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule()) + return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher) }) } if err := eg.Wait(); err != nil { @@ -844,6 +862,8 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo gctx, store.GetAddress(), opt, + grpc.WithBlock(), + grpc.FailOnNonTempDialError(true), grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), // we don't need to set keepalive timeout here, because the connection lives // at most 5s. (shorter than minimal value for keepalive time!) @@ -880,17 +900,25 @@ func (rc *Client) GoValidateChecksum( ) <-chan struct{} { log.Info("Start to validate checksum") outCh := make(chan struct{}, 1) + wg := new(sync.WaitGroup) + wg.Add(2) + loadStatCh := make(chan *CreatedTable, 1024) + // run the stat loader + go func() { + defer wg.Done() + rc.updateMetaAndLoadStats(ctx, loadStatCh) + }() workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum") go func() { - wg, ectx := errgroup.WithContext(ctx) + eg, ectx := errgroup.WithContext(ctx) defer func() { - log.Info("all checksum ended") - if err := wg.Wait(); err != nil { + if err := eg.Wait(); err != nil { errCh <- err } - outCh <- struct{}{} - close(outCh) + close(loadStatCh) + wg.Done() }() + for { select { // if we use ectx here, maybe canceled will mask real error. 
@@ -900,14 +928,14 @@ func (rc *Client) GoValidateChecksum( if !ok { return } - workers.ApplyOnErrorGroup(wg, func() error { + + workers.ApplyOnErrorGroup(eg, func() error { start := time.Now() defer func() { elapsed := time.Since(start) - summary.CollectDuration("restore checksum", elapsed) summary.CollectSuccessUnit("table checksum", 1, elapsed) }() - err := rc.execChecksum(ectx, tbl, kvClient, concurrency) + err := rc.execChecksum(ectx, tbl, kvClient, concurrency, loadStatCh) if err != nil { return errors.Trace(err) } @@ -917,10 +945,21 @@ func (rc *Client) GoValidateChecksum( } } }() + go func() { + wg.Wait() + log.Info("all checksum ended") + close(outCh) + }() return outCh } -func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client, concurrency uint) error { +func (rc *Client) execChecksum( + ctx context.Context, + tbl CreatedTable, + kvClient kv.Client, + concurrency uint, + loadStatCh chan<- *CreatedTable, +) error { logger := log.With( zap.String("db", tbl.OldTable.DB.Name.O), zap.String("table", tbl.OldTable.Info.Name.O), @@ -969,16 +1008,49 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k ) return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum") } - if table.Stats != nil { - logger.Info("start loads analyze after validate checksum", - zap.Int64("old id", tbl.OldTable.Info.ID), - zap.Int64("new id", tbl.Table.ID), - ) - if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil { - logger.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err)) + + loadStatCh <- &tbl + return nil +} + +func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) { + for { + select { + case <-ctx.Done(): + return + case tbl, ok := <-input: + if !ok { + return + } + + // Not need to return err when failed because of update analysis-meta + restoreTS, err := rc.GetTS(ctx) + if err != nil { + log.Error("getTS failed", zap.Error(err)) + } else { + err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs) + if err != nil { + log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err)) + } + } + + table := tbl.OldTable + if table.Stats != nil { + log.Info("start loads analyze after validate checksum", + zap.Int64("old id", tbl.OldTable.Info.ID), + zap.Int64("new id", tbl.Table.ID), + ) + start := time.Now() + if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil { + log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err)) + } + log.Info("restore stat done", + zap.String("table", table.Info.Name.L), + zap.String("db", table.DB.Name.L), + zap.Duration("cost", time.Since(start))) + } } } - return nil } const ( @@ -1155,6 +1227,24 @@ func (rc *Client) IsSkipCreateSQL() bool { return rc.noSchema } +// DDLJobsMap returns a map[UniqueTableName]bool about < db table, hasCreate/hasTruncate DDL >. +// if we execute some DDLs before create table. +// we may get two situation that need to rebase auto increment/random id. +// 1. truncate table: truncate will generate new id cache. +// 2. create table/create and rename table: the first create table will lock down the id cache. +// because we cannot create onExistReplace table. +// so the final create DDL with the correct auto increment/random id won't be executed. 
+func (rc *Client) DDLJobsMap() map[UniqueTableName]bool { + m := make(map[UniqueTableName]bool) + for _, job := range rc.ddlJobs { + switch job.Type { + case model.ActionTruncateTable, model.ActionCreateTable, model.ActionRenameTable: + m[UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()}] = true + } + } + return m +} + // PreCheckTableTiFlashReplica checks whether TiFlash replica is less than TiFlash node. func (rc *Client) PreCheckTableTiFlashReplica( ctx context.Context, diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index bae7cace49d53..9935d6608cf51 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "go.uber.org/zap" ) @@ -22,6 +23,11 @@ type DB struct { se glue.Session } +type UniqueTableName struct { + DB string + Table string +} + // NewDB returns a new DB. func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { se, err := g.CreateSession(store) @@ -87,6 +93,28 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } +// UpdateStatsMeta update count and snapshot ts in mysql.stats_meta +func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error { + sysDB := mysql.SystemDB + statsMetaTbl := "stats_meta" + + // set restoreTS to snapshot and version which is used to update stats_meta + err := db.se.ExecuteInternal( + ctx, + "update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?", + sysDB, + statsMetaTbl, + restoreTS, + restoreTS, + count, + tableID, + ) + if err != nil { + log.Error("execute update sql failed", zap.Error(err)) + } + return nil +} + // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { err := db.se.CreateDatabase(ctx, schema) @@ -97,14 +125,16 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { } // CreateTable executes a CREATE TABLE SQL. -func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, batchDdlSize uint) error { +func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table) error { if bse, ok := db.se.(glue.BulkCreateTableSession); ok { - log.Info("session supports bulk create table.", zap.Uint("batchDdlSize", batchDdlSize), zap.Int("table size", len(tables))) + + log.Info("session supports bulk create table.", zap.Int("table size", len(tables))) + m := map[string][]*model.TableInfo{} for _, table := range tables { m[table.DB.Name.L] = append(m[table.DB.Name.L], table.Info) } - if err := bse.CreateTables(ctx, m, batchDdlSize); err != nil { + if err := bse.CreateTables(ctx, m); err != nil { return err } } @@ -169,7 +199,7 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, batchD } // CreateTable executes a CREATE TABLE SQL. 
-func (db *DB) CreateTable(ctx context.Context, table *metautil.Table) error { +func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, ddlTables map[UniqueTableName]bool) error { err := db.se.CreateTable(ctx, table.DB.Name, table.Info) if err != nil { log.Error("create table failed", @@ -179,7 +209,11 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table) error { return errors.Trace(err) } - if table.Info.IsSequence() { + var restoreMetaSQL string + switch { + case table.Info.IsView(): + return nil + case table.Info.IsSequence(): setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);", utils.EncloseName(table.DB.Name.O), utils.EncloseName(table.Info.Name.O)) @@ -220,8 +254,38 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table) error { return errors.Trace(err) } } - restoreMetaSQL := fmt.Sprintf(setValFormat, table.Info.AutoIncID) - if err = db.se.Execute(ctx, restoreMetaSQL); err != nil { + restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID) + err = db.se.Execute(ctx, restoreMetaSQL) + if err != nil { + log.Error("restore meta sql failed", + zap.String("query", restoreMetaSQL), + zap.Stringer("db", table.DB.Name), + zap.Stringer("table", table.Info.Name), + zap.Error(err)) + return errors.Trace(err) + } + // only table exists in ddlJobs during incremental restoration should do alter after creation. + case ddlTables[UniqueTableName{table.DB.Name.String(), table.Info.Name.String()}]: + if utils.NeedAutoID(table.Info) { + restoreMetaSQL = fmt.Sprintf( + "alter table %s.%s auto_increment = %d;", + utils.EncloseName(table.DB.Name.O), + utils.EncloseName(table.Info.Name.O), + table.Info.AutoIncID) + } else if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { + restoreMetaSQL = fmt.Sprintf( + "alter table %s.%s auto_random_base = %d", + utils.EncloseName(table.DB.Name.O), + utils.EncloseName(table.Info.Name.O), + table.Info.AutoRandID) + } else { + log.Info("table exists in incremental ddl jobs, but don't need to be altered", + zap.Stringer("db", table.DB.Name), + zap.Stringer("table", table.Info.Name)) + return nil + } + err = db.se.Execute(ctx, restoreMetaSQL) + if err != nil { log.Error("restore meta sql failed", zap.String("query", restoreMetaSQL), zap.Stringer("db", table.DB.Name), @@ -268,20 +332,15 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [ } } - type namePair struct { - db string - table string - } - for _, table := range tables { tableIDs := make(map[int64]bool) tableIDs[table.Info.ID] = true - tableNames := make(map[namePair]bool) - name := namePair{table.DB.Name.String(), table.Info.Name.String()} + tableNames := make(map[UniqueTableName]bool) + name := UniqueTableName{table.DB.Name.String(), table.Info.Name.String()} tableNames[name] = true for _, job := range allDDLJobs { if job.BinlogInfo.TableInfo != nil { - name := namePair{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()} + name = UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()} if tableIDs[job.TableID] || tableNames[name] { ddlJobs = append(ddlJobs, job) tableIDs[job.TableID] = true diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 0f3347fd1d337..a8879134e72c0 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -47,7 +47,7 @@ const ( defaultPDConcurrency = 1 defaultBatchFlushInterval = 16 * time.Second defaultDDLConcurrency = 16 - defaultFlagDdlBatchSize = 500 + defaultFlagDdlBatchSize = 0 ) // RestoreCommonConfig is the common configuration for all BR 
restore tasks. @@ -83,7 +83,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) { "the threshold of merging small regions (Default 960_000, region split key count)") flags.Uint(FlagPDConcurrency, defaultPDConcurrency, "concurrency pd-relative operations like split & scatter.") - flags.Duration(FlagBatchFlushInterval, defaultFlagDdlBatchSize, + flags.Duration(FlagBatchFlushInterval, defaultBatchFlushInterval, "batch size for ddl to create a batch of tabes") flags.Uint(FlagDdlBatchSize, defaultFlagDdlBatchSize, "concurrency pd-relative operations like split & scatter.") diff --git a/ddl/table_test.go b/ddl/table_test.go index 98b5824c03d8e..44ef965d70673 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -202,12 +202,164 @@ func TestTable(t *testing.T) { ) require.NoError(t, err) + + job = testDropTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckJobDoneT(t, ddl, job, false) + + // for truncate table + tblInfo, err = testTableInfo(ddl, "tt", 3) + require.NoError(t, err) + job = testCreateTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + job = testTruncateTable(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + + // for rename table + dbInfo1, err := testSchemaInfo(ddl, "test_rename_table") + require.NoError(t, err) + testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo1) + job = testRenameTable(t, ctx, ddl, dbInfo1.ID, dbInfo.ID, dbInfo.Name, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + + job = testLockTable(t, ctx, ddl, dbInfo1.ID, tblInfo, model.TableLockWrite) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableLockedTest(t, ddl, dbInfo1, tblInfo, ddl.GetID(), ctx.GetSessionVars().ConnectionID, model.TableLockWrite) + // for alter cache table + job = testAlterCacheTable(t, ctx, ddl, dbInfo1.ID, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableCacheTest(t, ddl, dbInfo1, tblInfo) + // for alter no cache table + job = testAlterNoCacheTable(t, ctx, ddl, dbInfo1.ID, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableNoCacheTest(t, ddl, dbInfo1, tblInfo) + + testDropSchemaT(t, testNewContext(ddl), ddl, dbInfo) + err = ddl.Stop() + require.NoError(t, err) + err = store.Close() + require.NoError(t, err) +} + +func checkTableCacheTest(t *testing.T, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { + tt := meta.NewMeta(txn) + info, err := tt.GetTable(dbInfo.ID, tblInfo.ID) + require.NoError(t, err) + require.NotNil(t, info) + require.NotNil(t, info.TableCacheStatusType) + require.Equal(t, model.TableCacheStatusEnable, info.TableCacheStatusType) + return nil + }) + require.NoError(t, err) +} + +func checkTableNoCacheTest(t *testing.T, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { + tt := meta.NewMeta(txn) + info, err := tt.GetTable(dbInfo.ID, tblInfo.ID) + require.NoError(t, err) + require.NotNil(t, info) + require.Equal(t, model.TableCacheStatusDisable, 
info.TableCacheStatusType) + return nil + }) + require.NoError(t, err) +} + +func testAlterCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job { + job := &model.Job{ + SchemaID: newSchemaID, + TableID: tblInfo.ID, + Type: model.ActionAlterCacheTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{}, + } + err := d.doDDLJob(ctx, job) + require.NoError(t, err) + + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) + return job +} + +func testAlterNoCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job { + + job := &model.Job{ + SchemaID: newSchemaID, + TableID: tblInfo.ID, + Type: model.ActionAlterNoCacheTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{}, + } + err := d.doDDLJob(ctx, job) + require.NoError(t, err) + + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) + return job +} + +func TestCreateTables(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + ddl, err := testNewDDLAndStart( + context.Background(), + WithStore(store), + WithLease(testLease), + ) + require.NoError(t, err) + dbInfo, err := testSchemaInfo(ddl, "test_table") require.NoError(t, err) testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo) ctx := testNewContext(ddl) + infos := []*model.TableInfo{} + genIDs, err := ddl.genGlobalIDs(3) + require.NoError(t, err) + + infos = append(infos, &model.TableInfo{ + ID: genIDs[0], + Name: model.NewCIStr("s1"), + }) + infos = append(infos, &model.TableInfo{ + ID: genIDs[1], + Name: model.NewCIStr("s2"), + }) + infos = append(infos, &model.TableInfo{ + ID: genIDs[2], + Name: model.NewCIStr("s3"), + }) + + job := &model.Job{ + SchemaID: dbInfo.ID, + Type: model.ActionCreateTables, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{infos}, + } + + err = ddl.doDDLJob(ctx, job) + require.NoError(t, err) + + t1 := testGetTableT(t, ddl, dbInfo.ID, genIDs[0]) + require.NotNil(t, t1) + t2 := testGetTableT(t, ddl, dbInfo.ID, genIDs[1]) + require.NotNil(t, t2) + t3 := testGetTableT(t, ddl, dbInfo.ID, genIDs[2]) + require.NotNil(t, t3) +} + +func (s *testTableSuite) TestTable(c *C) { + d := s.d + + ctx := testNewContext(ddl) + tblInfo, err := testTableInfo(ddl, "t", 3) require.NoError(t, err) job := testCreateTableT(t, ctx, ddl, dbInfo, tblInfo) diff --git a/executor/executor_test.go b/executor/executor_test.go index ef4f434a9fb67..edd72e030c38f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -24,8 +24,7 @@ import ( "net" "os" "path/filepath" - "reflect" - "runtime" + "strconv" "strings" "sync" @@ -9505,129 +9504,3 @@ func (s *testSerialSuite) TestIssue30289(c *C) { c.Assert(err.Error(), Matches, "issue30289 build return error") } -// Test invoke Close without invoking Open before for each operators. -func (s *testSerialSuite) TestUnreasonablyClose(c *C) { - defer testleak.AfterTest(c)() - - is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) - se, err := session.CreateSession4Test(s.store) - c.Assert(err, IsNil) - _, err = se.Execute(context.Background(), "use test") - c.Assert(err, IsNil) - // To enable the shuffleExec operator.
- _, err = se.Execute(context.Background(), "set @@tidb_merge_join_concurrency=4") - c.Assert(err, IsNil) - - var opsNeedsCovered = []plannercore.PhysicalPlan{ - &plannercore.PhysicalHashJoin{}, - &plannercore.PhysicalMergeJoin{}, - &plannercore.PhysicalIndexJoin{}, - &plannercore.PhysicalIndexHashJoin{}, - &plannercore.PhysicalTableReader{}, - &plannercore.PhysicalIndexReader{}, - &plannercore.PhysicalIndexLookUpReader{}, - &plannercore.PhysicalIndexMergeReader{}, - &plannercore.PhysicalApply{}, - &plannercore.PhysicalHashAgg{}, - &plannercore.PhysicalStreamAgg{}, - &plannercore.PhysicalLimit{}, - &plannercore.PhysicalSort{}, - &plannercore.PhysicalTopN{}, - &plannercore.PhysicalCTE{}, - &plannercore.PhysicalCTETable{}, - &plannercore.PhysicalMaxOneRow{}, - &plannercore.PhysicalProjection{}, - &plannercore.PhysicalSelection{}, - &plannercore.PhysicalTableDual{}, - &plannercore.PhysicalWindow{}, - &plannercore.PhysicalShuffle{}, - &plannercore.PhysicalUnionAll{}, - } - executorBuilder := executor.NewMockExecutorBuilderForTest(se, is, nil, math.MaxUint64, false, "global") - - var opsNeedsCoveredMask uint64 = 1< t1.a) AS a from t as t1) t", - "select /*+ hash_agg() */ count(f) from t group by a", - "select /*+ stream_agg() */ count(f) from t group by a", - "select * from t order by a, f", - "select * from t order by a, f limit 1", - "select * from t limit 1", - "select (select t1.a from t t1 where t1.a > t2.a) as a from t t2;", - "select a + 1 from t", - "select count(*) a from t having a > 1", - "select * from t where a = 1.1", - "with recursive cte1(c1) as (select 1 union select c1 + 1 from cte1 limit 5 offset 0) select * from cte1", - "select /*+use_index_merge(t, c_d_e, f)*/ * from t where c < 1 or f > 2", - "select sum(f) over (partition by f) from t", - "select /*+ merge_join(t1)*/ * from t t1 join t t2 on t1.d = t2.d", - "select a from t union all select a from t", - } { - comment := Commentf("case:%v sql:%s", i, tc) - c.Assert(err, IsNil, comment) - stmt, err := s.ParseOneStmt(tc, "", "") - c.Assert(err, IsNil, comment) - - err = se.NewTxn(context.Background()) - c.Assert(err, IsNil, comment) - p, _, err := planner.Optimize(context.TODO(), se, stmt, is) - c.Assert(err, IsNil, comment) - // This for loop level traverses the plan tree to get which operators are covered. - for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; { - newChild := make([]plannercore.PhysicalPlan, 0, len(child)) - for _, ch := range child { - found := false - for k, t := range opsNeedsCovered { - if reflect.TypeOf(t) == reflect.TypeOf(ch) { - opsAlreadyCoveredMask |= 1 << k - found = true - break - } - } - c.Assert(found, IsTrue, Commentf("case: %v sql: %s operator %v is not registered in opsNeedsCoveredMask", i, tc, reflect.TypeOf(ch))) - switch x := ch.(type) { - case *plannercore.PhysicalCTE: - newChild = append(newChild, x.RecurPlan) - newChild = append(newChild, x.SeedPlan) - continue - case *plannercore.PhysicalShuffle: - newChild = append(newChild, x.DataSources...) - newChild = append(newChild, x.Tails...) - continue - } - newChild = append(newChild, ch.Children()...) 
- } - child = newChild - } - - e := executorBuilder.Build(p) - - func() { - defer func() { - r := recover() - buf := make([]byte, 4096) - stackSize := runtime.Stack(buf, false) - buf = buf[:stackSize] - c.Assert(r, IsNil, Commentf("case: %v\n sql: %s\n error stack: %v", i, tc, string(buf))) - }() - c.Assert(e.Close(), IsNil, comment) - }() - } - // The following code is used to make sure all the operators registered - // in opsNeedsCoveredMask are covered. - commentBuf := strings.Builder{} - if opsAlreadyCoveredMask != opsNeedsCoveredMask { - for i := range opsNeedsCovered { - if opsAlreadyCoveredMask&(1<= version80 { - return - } - // Check if tidb_analyze_version exists in mysql.GLOBAL_VARIABLES. - // If not, insert "tidb_analyze_version | 1" since this is the old behavior before we introduce this variable. - ctx := context.Background() - rs, err := s.ExecuteInternal(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?;", - mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBAnalyzeVersion) - terror.MustNil(err) - req := rs.NewChunk(nil) - err = rs.Next(ctx, req) - terror.MustNil(err) - if req.NumRows() != 0 { - return - } - - mustExecute(s, "INSERT HIGH_PRIORITY IGNORE INTO %n.%n VALUES (%?, %?);", - mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBAnalyzeVersion, 1) -} func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" diff --git a/session/session_test.go b/session/session_test.go index 7b5febe0d18e0..f2ecee0574f68 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -5884,34 +5884,3 @@ func (s *testSessionSuite) TestSameNameObjectWithLocalTemporaryTable(c *C) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) } -func (s *testSessionSuite) TestWriteOnMultipleCachedTable(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists ct1, ct2") - tk.MustExec("create table ct1 (id int, c int)") - tk.MustExec("create table ct2 (id int, c int)") - tk.MustExec("alter table ct1 cache") - tk.MustExec("alter table ct2 cache") - tk.MustQuery("select * from ct1").Check(testkit.Rows()) - tk.MustQuery("select * from ct2").Check(testkit.Rows()) - - cached := false - for i := 0; i < 50; i++ { - if tk.HasPlan("select * from ct1", "Union") { - if tk.HasPlan("select * from ct2", "Union") { - cached = true - break - } - } - time.Sleep(100 * time.Millisecond) - } - c.Assert(cached, IsTrue) - - tk.MustExec("begin") - tk.MustExec("insert into ct1 values (3, 4)") - tk.MustExec("insert into ct2 values (5, 6)") - tk.MustExec("commit") - - tk.MustQuery("select * from ct1").Check(testkit.Rows("3 4")) - tk.MustQuery("select * from ct2").Check(testkit.Rows("5 6")) -} diff --git a/table/tables/cache.go b/table/tables/cache.go index 7e3eb7c40b9a9..a31f5d0780854 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -183,31 +183,57 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t } // AddRecord implements the AddRecord method for the table.Table interface. -func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) - return c.TableCommon.AddRecord(sctx, r, opts...) 
-} -func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle StateRemote) { - txnCtx := sctx.GetSessionVars().TxnCtx - if txnCtx.CachedTables == nil { - txnCtx.CachedTables = make(map[int64]interface{}) +func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { + txn, err := ctx.Txn(true) + if err != nil { + return nil, err } - if _, ok := txnCtx.CachedTables[tid]; !ok { - txnCtx.CachedTables[tid] = handle + now := txn.StartTS() + start := time.Now() + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) + if err != nil { + return nil, errors.Trace(err) } + ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) + return c.TableCommon.AddRecord(ctx, r, opts...) + } // UpdateRecord implements table.Table func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) + + txn, err := sctx.Txn(true) + if err != nil { + return err + } + now := txn.StartTS() + start := time.Now() + err = c.handle.LockForWrite(ctx, c.Meta().ID, leaseFromTS(now)) + if err != nil { + return errors.Trace(err) + } + sctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) + return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched) } // RemoveRecord implements table.Table RemoveRecord interface. -func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle) - return c.TableCommon.RemoveRecord(sctx, h, r) + +func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { + txn, err := ctx.Txn(true) + if err != nil { + return err + } + now := txn.StartTS() + start := time.Now() + err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now)) + if err != nil { + return errors.Trace(err) + } + ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start) + return c.TableCommon.RemoveRecord(ctx, h, r) + } func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() { diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index aeddd5b972ab2..0138a01ab94e1 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -67,7 +67,9 @@ type StateRemote interface { LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(ctx context.Context, tid int64) (uint64, error) + + LockForWrite(ctx context.Context, tid int64, lease uint64) error + // RenewLease attempt to renew the read / write lock on the table with the specified tableID RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) @@ -132,32 +134,33 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 return succ, err } -// LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease. 
-func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64, error) { + +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint64) error { h.Lock() defer h.Unlock() - var ret uint64 for { - waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid) + waitAndRetry, err := h.lockForWriteOnce(ctx, tid, ts) if err != nil { - return 0, err + return err } if waitAndRetry == 0 { - ret = lease + break } time.Sleep(waitAndRetry) } - return ret, nil + + return nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64) (waitAndRetry time.Duration, ts uint64, err error) { +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts uint64) (waitAndRetry time.Duration, err error) { + err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } - ts = leaseFromTS(now) + // The lease is outdated, so lock is invalid, clear orphan lock of any kind. if now > lease { if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { @@ -218,69 +221,38 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease h.Lock() defer h.Unlock() - switch op { - case RenewReadLease: - return h.renewReadLease(ctx, tid, newLease) - case RenewWriteLease: - return h.renewWriteLease(ctx, tid, newLease) - } - return false, errors.New("wrong renew lease type") -} -func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { var succ bool - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) - if err != nil { - return errors.Trace(err) - } - if now >= oldLease { - // read lock had already expired, fail to renew - return nil - } - if lockType != CachedTableLockRead { - // Not read lock, fail to renew - return nil - } - - if newLease > oldLease { // lease should never decrease! - err = h.updateRow(ctx, tid, "READ", newLease) + if op == RenewReadLease { + err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { + lockType, oldLease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } - } - succ = true - return nil - }) - return succ, err -} + if now >= oldLease { + // read lock had already expired, fail to renew + return nil + } + if lockType != CachedTableLockRead { + // Not read lock, fail to renew + return nil + } -func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { - var succ bool - err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) - if err != nil { - return errors.Trace(err) - } - if now >= oldLease { - // write lock had already expired, fail to renew - return nil - } - if lockType != CachedTableLockWrite { - // Not write lock, fail to renew + if newLease > oldLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "READ", newLease) + if err != nil { + return errors.Trace(err) + } + } + succ = true return nil - } + }) + return succ, err + } + + // TODO: renew for write lease + return false, errors.New("not implement yet") - if newLease > oldLease { // lease should never decrease! 
- err = h.updateRow(ctx, tid, "WRITE", newLease) - if err != nil { - return errors.Trace(err) - } - } - succ = true - return nil - }) - return succ, err } func (h *stateRemoteHandle) beginTxn(ctx context.Context) error { diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index dc4e9272b1830..188598761d95b 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -27,7 +27,21 @@ import ( "github.com/tikv/client-go/v2/oracle" ) -// initRow add a new record into the cached table meta lock table. + +// createMetaLockForCachedTable initializes the cached table meta lock information. +func createMetaLockForCachedTable(h session.Session) error { + createTable := "CREATE TABLE IF NOT EXISTS `mysql`.`table_cache_meta` (" + + "`tid` int(11) NOT NULL DEFAULT 0," + + "`lock_type` enum('NONE','READ', 'INTEND', 'WRITE') NOT NULL DEFAULT 'NONE'," + + "`lease` bigint(20) NOT NULL DEFAULT 0," + + "`oldReadLease` bigint(20) NOT NULL DEFAULT 0," + + "PRIMARY KEY (`tid`))" + _, err := h.ExecuteInternal(context.Background(), createTable) + return err +} + +// initRow adds a new record into the cached table meta lock table. + func initRow(ctx context.Context, exec session.Session, tid int) error { _, err := exec.ExecuteInternal(ctx, "insert ignore into mysql.table_cache_meta values (%?, 'NONE', 0, 0)", tid) return err @@ -43,6 +57,11 @@ func TestStateRemote(t *testing.T) { ctx := context.Background() h := tables.NewStateRemote(se) + err := createMetaLockForCachedTable(se) + require.NoError(t, err) + require.Equal(t, tables.CachedTableLockNone, tables.CachedTableLockType(0)) + + // Check the initial value. require.NoError(t, initRow(ctx, se, 5)) lockType, lease, err := h.Load(ctx, 5) @@ -89,18 +108,21 @@ func TestStateRemote(t *testing.T) { require.Equal(t, lease, leaseVal) // Check write lock. - writeLease, err := h.LockForWrite(ctx, 5) - require.NoError(t, err) + + leaseVal = oracle.GoTimeToTS(physicalTime.Add(700 * time.Millisecond)) + require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) + lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") - require.Equal(t, writeLease, lease) - require.Greater(t, writeLease, leaseVal) + + require.Equal(t, lease, leaseVal) // Lock for write again - writeLease, err = h.LockForWrite(ctx, 5) - require.NoError(t, err) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(800 * time.Millisecond)) + require.NoError(t, h.LockForWrite(ctx, 5, leaseVal)) + lockType, _, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) @@ -116,14 +138,11 @@ func TestStateRemote(t *testing.T) { require.NoError(t, err) require.False(t, succ) - // Renew write lease. - succ, err = h.RenewLease(ctx, 5, writeLease+1, tables.RenewWriteLease) + // But clearing an orphan write lock should succeed.
+ time.Sleep(time.Second) + leaseVal = oracle.GoTimeToTS(physicalTime.Add(2 * time.Second)) + succ, err = h.LockForRead(ctx, 5, leaseVal) require.NoError(t, err) require.True(t, succ) - lockType, lease, err = h.Load(ctx, 5) - require.NoError(t, err) - require.Equal(t, lockType, tables.CachedTableLockWrite) - require.Equal(t, lockType.String(), "WRITE") - require.Equal(t, lease, writeLease+1) } diff --git a/testkit/testkit.go b/testkit/testkit.go index c99791efe369a..e6cb548a566c8 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -238,20 +238,6 @@ func (tk *TestKit) MustUseIndex(sql string, index string, args ...interface{}) b return false } -// MustUseIndex4ExplainFor checks if the result execution plan contains specific index(es). -func (tk *TestKit) MustUseIndex4ExplainFor(result *Result, index string) bool { - for i := range result.rows { - // It depends on whether we enable to collect the execution info. - if strings.Contains(result.rows[i][3], "index:"+index) { - return true - } - if strings.Contains(result.rows[i][4], "index:"+index) { - return true - } - } - return false -} - // CheckExecResult checks the affected rows and the insert id after executing MustExec. func (tk *TestKit) CheckExecResult(affectedRows, insertID int64) { tk.require.Equal(int64(tk.Session().AffectedRows()), affectedRows)
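
Note on the batching change in `createTablesInWorkerPool`: the patch walks the table list in steps of `batchDllSize` and clamps the last batch at `numOfTables` (the loop bound becomes `i < numOfTables+int(rc.batchDllSize)`). The standalone sketch below is not part of the patch; `splitIntoBatches` and its signature are invented purely to show the same index arithmetic in isolation.

```go
package main

import "fmt"

// splitIntoBatches mirrors the index arithmetic used by the batched table
// creation loop: advance in steps of batchSize and clamp the final batch
// so the last (possibly short) range ends exactly at numTables.
func splitIntoBatches(numTables, batchSize int) [][2]int {
	var ranges [][2]int
	lastSent := 0
	for i := batchSize; i < numTables+batchSize; i += batchSize {
		if i > numTables {
			i = numTables // final, possibly short, batch
		}
		ranges = append(ranges, [2]int{lastSent, i})
		lastSent = i
	}
	return ranges
}

func main() {
	// 7 tables with a batch size of 3 -> [0,3) [3,6) [6,7)
	fmt.Println(splitIntoBatches(7, 3))
}
```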
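Note on `DDLJobsMap`: it only marks tables whose incremental DDL history contains a create/truncate/rename job, so restore knows those tables need their auto-increment/auto-random id rebased after creation. Below is a minimal, self-contained sketch of that filtering using simplified stand-ins for `model.Job` and `UniqueTableName`; the types and field names here are illustrative, not the real TiDB definitions.

```go
package main

import "fmt"

// Simplified stand-ins for the real model.Job / UniqueTableName types.
type ActionType int

const (
	ActionCreateTable ActionType = iota
	ActionTruncateTable
	ActionRenameTable
	ActionAddColumn
)

type Job struct {
	Type       ActionType
	SchemaName string
	TableName  string
}

type UniqueTableName struct {
	DB    string
	Table string
}

// ddlJobsMap marks tables whose DDL history contains a create/truncate/rename
// job; such tables must have their auto increment/random id rebased later.
func ddlJobsMap(jobs []Job) map[UniqueTableName]bool {
	m := make(map[UniqueTableName]bool)
	for _, job := range jobs {
		switch job.Type {
		case ActionTruncateTable, ActionCreateTable, ActionRenameTable:
			m[UniqueTableName{job.SchemaName, job.TableName}] = true
		}
	}
	return m
}

func main() {
	jobs := []Job{
		{Type: ActionCreateTable, SchemaName: "test", TableName: "t1"},
		{Type: ActionAddColumn, SchemaName: "test", TableName: "t2"},
		{Type: ActionTruncateTable, SchemaName: "test", TableName: "t3"},
	}
	fmt.Println(ddlJobsMap(jobs)) // t1 and t3 are marked, t2 is not
}
```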
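Note on the reshaped `GoValidateChecksum`: checksum workers now feed validated tables into `loadStatCh`, a separate goroutine consumes that channel to update stats meta and load stats, and the outer channel is closed only after both sides finish (`wg.Add(2)` plus a final `wg.Wait()` goroutine). The following rough, self-contained sketch shows only that coordination pattern; the names are invented and the "workers" are sequential for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// validateAndLoadStats sketches the pipeline shape: producers push validated
// tables into loadStatCh, one loader goroutine drains it, and outCh is closed
// only once both the producers and the loader are done.
func validateAndLoadStats(tables []string) <-chan struct{} {
	outCh := make(chan struct{})
	loadStatCh := make(chan string, 16)

	var wg sync.WaitGroup
	wg.Add(2)

	// stat loader: runs concurrently with checksum validation
	go func() {
		defer wg.Done()
		for tbl := range loadStatCh {
			fmt.Println("load stats for", tbl)
		}
	}()

	// checksum "workers" (sequential here for brevity)
	go func() {
		defer wg.Done()
		defer close(loadStatCh)
		for _, tbl := range tables {
			fmt.Println("checksum ok for", tbl)
			loadStatCh <- tbl
		}
	}()

	// close outCh only after both goroutines finish
	go func() {
		wg.Wait()
		close(outCh)
	}()
	return outCh
}

func main() {
	<-validateAndLoadStats([]string{"t1", "t2"})
	fmt.Println("all checksum and stats work ended")
}
```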