diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go index 6c0daaf81..9d9659462 100644 --- a/drainer/translator/translator.go +++ b/drainer/translator/translator.go @@ -17,6 +17,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" + "go.uber.org/zap" ) var sqlMode mysql.SQLMode @@ -63,7 +65,15 @@ func insertRowToDatums(table *model.TableInfo, row []byte) (pk types.Datum, datu for _, col := range table.Columns { if IsPKHandleColumn(table, col) { - datums[col.ID] = pk + // If pk is handle, the datums TiDB write will always be Int64 type. + // https://github.com/pingcap/tidb/blob/cd10bca6660937beb5d6de11d49ec50e149fe083/table/tables/tables.go#L721 + // + // create table pk(id BIGINT UNSIGNED); + // insert into pk(id) values(18446744073709551615) + // + // Will get -1 here, note: uint64(int64(-1)) = 18446744073709551615 + // so we change it to uint64 if the column type is unsigned + datums[col.ID] = fixType(pk, col) } } @@ -168,3 +178,14 @@ func newUpdateDecoder(table *model.TableInfo) updateDecoder { func (ud updateDecoder) decode(b []byte, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) { return DecodeOldAndNewRow(b, ud.colsTypes, loc) } + +func fixType(data types.Datum, col *model.ColumnInfo) types.Datum { + if mysql.HasUnsignedFlag(col.Flag) { + switch oldV := data.GetValue().(type) { + case int64: + log.Debug("convert int64 type to uint64", zap.Int64("value", oldV)) + return types.NewDatum(uint64(oldV)) + } + } + return data +} diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index 9dfddf404..90ff1110a 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -16,6 +16,7 @@ package dailytest import ( "database/sql" "fmt" + "math" "math/rand" "strings" "time" @@ -142,6 +143,8 @@ func (tr *testRunner) execSQLs(sqls []string) { func RunCase(src *sql.DB, dst *sql.DB, schema string) { tr := &testRunner{src: src, dst: dst, schema: schema} + runPKcases(tr) + tr.execSQLs(case1) tr.execSQLs(case1Clean) @@ -375,3 +378,88 @@ func updatePKUK(db *sql.DB, opNum int) error { _, err = db.Exec("DROP TABLE pkuk") return errors.Trace(err) } + +// create a table with one column id with different type +// test the case whether it is primary key too, this can +// also help test when the column is handle or not. +func runPKcases(tr *testRunner) { + cases := []struct { + Tp string + Value interface{} + Update interface{} + }{ + { + Tp: "BIGINT UNSIGNED", + Value: uint64(math.MaxUint64), + Update: uint64(math.MaxUint64) - 1, + }, + { + Tp: "BIGINT SIGNED", + Value: int64(math.MaxInt64), + Update: int64(math.MaxInt64) - 1, + }, + { + Tp: "INT UNSIGNED", + Value: uint32(math.MaxUint32), + Update: uint32(math.MaxUint32) - 1, + }, + { + Tp: "INT SIGNED", + Value: int32(math.MaxInt32), + Update: int32(math.MaxInt32) - 1, + }, + { + Tp: "SMALLINT UNSIGNED", + Value: uint16(math.MaxUint16), + Update: uint16(math.MaxUint16) - 1, + }, + { + Tp: "SMALLINT SIGNED", + Value: int16(math.MaxInt16), + Update: int16(math.MaxInt16) - 1, + }, + { + Tp: "TINYINT UNSIGNED", + Value: uint8(math.MaxUint8), + Update: uint8(math.MaxUint8) - 1, + }, + { + Tp: "TINYINT SIGNED", + Value: int8(math.MaxInt8), + Update: int8(math.MaxInt8) - 1, + }, + } + + for _, c := range cases { + for _, ispk := range []string{"", "PRIMARY KEY"} { + + tr.run(func(src *sql.DB) { + sql := fmt.Sprintf("CREATE TABLE pk(id %s %s)", c.Tp, ispk) + mustExec(src, sql) + + sql = "INSERT INTO pk(id) values( ? )" + mustExec(src, sql, c.Value) + + if len(ispk) == 0 { + // insert a null value + mustExec(src, sql, nil) + } + + sql = "UPDATE pk set id = ? where id = ?" + mustExec(src, sql, c.Update, c.Value) + + sql = "DELETE from pk where id = ?" + mustExec(src, sql, c.Update) + }) + + tr.execSQLs([]string{"DROP TABLE pk"}) + } + } +} + +func mustExec(db *sql.DB, sql string, args ...interface{}) { + _, err := db.Exec(sql, args...) + if err != nil { + log.S().Fatalf("exec failed, sql: %s args: %v, err: %+v", sql, args, err) + } +}