Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#44801
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
dsdashun authored and ti-chi-bot committed Jun 29, 2023
1 parent 666df2d commit 7dfe19f
Show file tree
Hide file tree
Showing 8 changed files with 1,147 additions and 0 deletions.
59 changes: 59 additions & 0 deletions br/pkg/lightning/backend/tidb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "tidb",
srcs = ["tidb.go"],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/tidb",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/errormanager",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
"//br/pkg/redact",
"//br/pkg/utils",
"//br/pkg/version",
"//parser/model",
"//parser/mysql",
"//sessionctx",
"//table",
"//types",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

go_test(
name = "tidb_test",
timeout = "short",
srcs = ["tidb_test.go"],
flaky = True,
shard_count = 12,
deps = [
":tidb",
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/errormanager",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
"//parser/charset",
"//parser/model",
"//parser/mysql",
"//table",
"//table/tables",
"//types",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
],
)
174 changes: 174 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,180 @@ type tidbEncoder struct {
// directly check the total column count, so we fall back to only check that
// the there are enough columns.
columnCnt int
<<<<<<< HEAD
=======
// data file path
path string
logger log.Logger
}

type encodingBuilder struct{}

// NewEncodingBuilder creates an EncodingBuilder with TiDB backend implementation.
func NewEncodingBuilder() encode.EncodingBuilder {
return new(encodingBuilder)
}

// NewEncoder creates a KV encoder.
// It implements the `backend.EncodingBuilder` interface.
func (*encodingBuilder) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
se := kv.NewSessionCtx(&config.SessionOptions, log.FromContext(ctx))
if config.SQLMode.HasStrictMode() {
se.GetSessionVars().SkipUTF8Check = false
se.GetSessionVars().SkipASCIICheck = false
}

return &tidbEncoder{
mode: config.SQLMode,
tbl: config.Table,
se: se,
path: config.Path,
logger: config.Logger,
}, nil
}

// MakeEmptyRows creates an empty KV rows.
// It implements the `backend.EncodingBuilder` interface.
func (*encodingBuilder) MakeEmptyRows() encode.Rows {
return tidbRows(nil)
}

type targetInfoGetter struct {
db *sql.DB
}

// NewTargetInfoGetter creates an TargetInfoGetter with TiDB backend implementation.
func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter {
return &targetInfoGetter{
db: db,
}
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `backend.TargetInfoGetter` interface.
// TODO: refactor
func (b *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
var err error
results := []*model.TableInfo{}
logger := log.FromContext(ctx)
s := common.SQLWithRetry{
DB: b.db,
Logger: logger,
}

err = s.Transact(ctx, "fetch table columns", func(c context.Context, tx *sql.Tx) error {
var versionStr string
if versionStr, err = version.FetchVersion(ctx, tx); err != nil {
return err
}
serverInfo := version.ParseServerInfo(versionStr)

rows, e := tx.Query(`
SELECT table_name, column_name, column_type, generation_expression, extra
FROM information_schema.columns
WHERE table_schema = ?
ORDER BY table_name, ordinal_position;
`, schemaName)
if e != nil {
return e
}
defer rows.Close()

var (
curTableName string
curColOffset int
curTable *model.TableInfo
)
tables := []*model.TableInfo{}
for rows.Next() {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
return e
}
if tableName != curTableName {
curTable = &model.TableInfo{
Name: model.NewCIStr(tableName),
State: model.StatePublic,
PKIsHandle: true,
}
tables = append(tables, curTable)
curTableName = tableName
curColOffset = 0
}

// see: https://github.com/pingcap/parser/blob/3b2fb4b41d73710bc6c4e1f4e8679d8be6a4863e/types/field_type.go#L185-L191
var flag uint
if strings.HasSuffix(columnType, "unsigned") {
flag |= mysql.UnsignedFlag
}
if strings.Contains(columnExtra, "auto_increment") {
flag |= mysql.AutoIncrementFlag
}

ft := types.FieldType{}
ft.SetFlag(flag)
curTable.Columns = append(curTable.Columns, &model.ColumnInfo{
Name: model.NewCIStr(columnName),
Offset: curColOffset,
State: model.StatePublic,
FieldType: ft,
GeneratedExprString: generationExpr,
})
curColOffset++
}
if err := rows.Err(); err != nil {
return err
}
// shard_row_id/auto random is only available after tidb v4.0.0
// `show table next_row_id` is also not available before tidb v4.0.0
if serverInfo.ServerType != version.ServerTypeTiDB || serverInfo.ServerVersion.Major < 4 {
results = tables
return nil
}

failpoint.Inject(
"FetchRemoteTableModels_BeforeFetchTableAutoIDInfos",
func() {
fmt.Println("failpoint: FetchRemoteTableModels_BeforeFetchTableAutoIDInfos")
},
)

// init auto id column for each table
for _, tbl := range tables {
tblName := common.UniqueTable(schemaName, tbl.Name.O)
autoIDInfos, err := FetchTableAutoIDInfos(ctx, tx, tblName)
if err != nil {
logger.Warn("fetch table auto ID infos error. Ignore this table and continue.", zap.String("table_name", tblName), zap.Error(err))
continue
}
for _, info := range autoIDInfos {
for _, col := range tbl.Columns {
if col.Name.O == info.Column {
switch info.Type {
case "AUTO_INCREMENT":
col.AddFlag(mysql.AutoIncrementFlag)
case "AUTO_RANDOM":
col.AddFlag(mysql.PriKeyFlag)
tbl.PKIsHandle = true
// set a stub here, since we don't really need the real value
tbl.AutoRandomBits = 1
}
}
}
}
results = append(results, tbl)
}
return nil
})
return results, err
}

// CheckRequirements performs the check whether the backend satisfies the version requirements.
// It implements the `backend.TargetInfoGetter` interface.
func (*targetInfoGetter) CheckRequirements(ctx context.Context, _ *backend.CheckCtx) error {
log.FromContext(ctx).Info("skipping check requirements for tidb backend")
return nil
>>>>>>> 2f1c891d075 (lightning: refine fetching table structure error handling (#44801))
}

type tidbBackend struct {
Expand Down
48 changes: 48 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,54 @@ func TestFetchRemoteTableModels_4_x_auto_random(t *testing.T) {
}, tableInfos)
}

func TestFetchRemoteTableModelsDropTableHalfway(t *testing.T) {
s := createMysqlSuite(t)
defer s.TearDownTest(t)
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow(`Release Version: v99.0.0`)) // this is a fake version number
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("tbl01", "id", "bigint(20)", "", "auto_increment").
AddRow("tbl01", "val", "varchar(255)", "", "").
AddRow("tbl02", "id", "bigint(20)", "", "auto_increment").
AddRow("tbl02", "val", "varchar(255)", "", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`tbl01` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "tbl01", "id", int64(1), "_TIDB_ROWID").
AddRow("test", "tbl01", "id", int64(1), "AUTO_INCREMENT"))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`tbl02` NEXT_ROW_ID").
WillReturnError(mysql.NewErr(mysql.ErrNoSuchTable, "test", "tbl02"))
s.mockDB.ExpectCommit()

infoGetter := tidb.NewTargetInfoGetter(s.dbHandle)
tableInfos, err := infoGetter.FetchRemoteTableModels(context.Background(), "test")
require.NoError(t, err)
ft := types.FieldType{}
ft.SetFlag(mysql.AutoIncrementFlag)
require.Equal(t, []*model.TableInfo{
{
Name: model.NewCIStr("tbl01"),
State: model.StatePublic,
PKIsHandle: true,
Columns: []*model.ColumnInfo{
{
Name: model.NewCIStr("id"),
Offset: 0,
State: model.StatePublic,
FieldType: ft,
},
{
Name: model.NewCIStr("val"),
Offset: 1,
State: model.StatePublic,
},
},
},
}, tableInfos)
}

func TestWriteRowsErrorNoRetry(t *testing.T) {
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
Expand Down
Loading

0 comments on commit 7dfe19f

Please sign in to comment.