Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: refine fetching table structure error handling (#44801) #45082

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter {
// TODO: refactor
func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
var err error
tables := []*model.TableInfo{}
results := []*model.TableInfo{}
logger := log.FromContext(ctx)
s := common.SQLWithRetry{
DB: b.db,
Logger: log.FromContext(ctx),
Logger: logger,
}

err = s.Transact(ctx, "fetch table columns", func(c context.Context, tx *sql.Tx) error {
Expand All @@ -160,6 +161,7 @@ func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaNam
curColOffset int
curTable *model.TableInfo
)
tables := []*model.TableInfo{}
for rows.Next() {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
Expand Down Expand Up @@ -202,15 +204,24 @@ func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaNam
// shard_row_id/auto random is only available after tidb v4.0.0
// `show table next_row_id` is also not available before tidb v4.0.0
if serverInfo.ServerType != version.ServerTypeTiDB || serverInfo.ServerVersion.Major < 4 {
results = tables
return nil
}

failpoint.Inject(
"FetchRemoteTableModels_BeforeFetchTableAutoIDInfos",
func() {
fmt.Println("failpoint: FetchRemoteTableModels_BeforeFetchTableAutoIDInfos")
},
)

// init auto id column for each table
for _, tbl := range tables {
tblName := common.UniqueTable(schemaName, tbl.Name.O)
autoIDInfos, err := FetchTableAutoIDInfos(ctx, tx, tblName)
if err != nil {
return errors.Trace(err)
logger.Warn("fetch table auto ID infos error. Ignore this table and continue.", zap.String("table_name", tblName), zap.Error(err))
continue
}
for _, info := range autoIDInfos {
for _, col := range tbl.Columns {
Expand All @@ -227,10 +238,11 @@ func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaNam
}
}
}
results = append(results, tbl)
}
return nil
})
return tables, err
return results, err
}

// CheckRequirements performs the check whether the backend satisfies the version requirements.
Expand Down
48 changes: 48 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,54 @@ func TestFetchRemoteTableModels_4_x_auto_random(t *testing.T) {
}, tableInfos)
}

func TestFetchRemoteTableModelsDropTableHalfway(t *testing.T) {
s := createMysqlSuite(t)
defer s.TearDownTest(t)
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow(`Release Version: v99.0.0`)) // this is a fake version number
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("tbl01", "id", "bigint(20)", "", "auto_increment").
AddRow("tbl01", "val", "varchar(255)", "", "").
AddRow("tbl02", "id", "bigint(20)", "", "auto_increment").
AddRow("tbl02", "val", "varchar(255)", "", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`tbl01` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "tbl01", "id", int64(1), "_TIDB_ROWID").
AddRow("test", "tbl01", "id", int64(1), "AUTO_INCREMENT"))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`tbl02` NEXT_ROW_ID").
WillReturnError(mysql.NewErr(mysql.ErrNoSuchTable, "test", "tbl02"))
s.mockDB.ExpectCommit()

infoGetter := tidb.NewTargetInfoGetter(s.dbHandle)
tableInfos, err := infoGetter.FetchRemoteTableModels(context.Background(), "test")
require.NoError(t, err)
ft := types.FieldType{}
ft.SetFlag(mysql.AutoIncrementFlag)
require.Equal(t, []*model.TableInfo{
{
Name: model.NewCIStr("tbl01"),
State: model.StatePublic,
PKIsHandle: true,
Columns: []*model.ColumnInfo{
{
Name: model.NewCIStr("id"),
Offset: 0,
State: model.StatePublic,
FieldType: ft,
},
{
Name: model.NewCIStr("val"),
Offset: 1,
State: model.StatePublic,
},
},
},
}, tableInfos)
}

func TestWriteRowsErrorNoRetry(t *testing.T) {
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/lightning/restore/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,19 @@ func (p *PreRestoreInfoGetterImpl) GetAllTableStructures(ctx context.Context, op

func (p *PreRestoreInfoGetterImpl) getTableStructuresByFileMeta(ctx context.Context, dbSrcFileMeta *mydump.MDDatabaseMeta, getPreInfoCfg *ropts.GetPreInfoConfig) ([]*model.TableInfo, error) {
dbName := dbSrcFileMeta.Name
failpoint.Inject(
"getTableStructuresByFileMeta_BeforeFetchRemoteTableModels",
func(v failpoint.Value) {
fmt.Println("failpoint: getTableStructuresByFileMeta_BeforeFetchRemoteTableModels")
const defaultMilliSeconds int = 5000
sleepMilliSeconds, ok := v.(int)
if !ok || sleepMilliSeconds <= 0 || sleepMilliSeconds > 30000 {
sleepMilliSeconds = defaultMilliSeconds
}
//nolint: errcheck
failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/tidb/FetchRemoteTableModels_BeforeFetchTableAutoIDInfos", fmt.Sprintf("sleep(%d)", sleepMilliSeconds))
},
)
currentTableInfosFromDB, err := p.targetInfoGetter.FetchRemoteTableModels(ctx, dbName)
if err != nil {
if getPreInfoCfg != nil && getPreInfoCfg.IgnoreDBNotExist {
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_drop_other_tables_halfway/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
backend = "tidb"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE tbl01 (
id INT PRIMARY KEY AUTO_INCREMENT,
val VARCHAR(64)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,val
1,aaa
2,bbb
3,ccc
4,ddd
5,eee
28 changes: 28 additions & 0 deletions br/tests/lightning_drop_other_tables_halfway/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/sh
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

run_sql 'DROP DATABASE IF EXISTS lntest'
run_sql 'CREATE DATABASE lntest'
run_sql 'CREATE TABLE lntest.tbl02 (id BIGINT PRIMARY KEY AUTO_INCREMENT, val VARCHAR(255));'

GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/getTableStructuresByFileMeta_BeforeFetchRemoteTableModels=return(6000)" run_lightning &
pid=$!

sleep 4
run_sql 'DROP TABLE lntest.tbl02;' && echo "table dropped"
wait "${pid}"