From d3361c72beb6c35ec4ee8fbba4d6bab968c83765 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 3 Jul 2019 13:56:27 +0800 Subject: [PATCH] [release2.1] reparo: add unit test && add safe mode (#662) * reparo/syncer: add unit test (#540) * update log and add safe mode config (#652) --- go.mod | 1 + go.sum | 2 + reparo/config.go | 15 +++- reparo/reparo.go | 2 +- reparo/syncer/memory_test.go | 22 ++++++ reparo/syncer/mysql.go | 20 ++++-- reparo/syncer/mysql_test.go | 100 ++++++++++++++++++++++++++ reparo/syncer/print.go | 2 +- reparo/syncer/print_test.go | 110 ++++++++++++++++++++++++++++ reparo/syncer/syncer.go | 4 +- reparo/syncer/syncer_test.go | 41 +++++++++++ reparo/syncer/translate_test.go | 123 ++++++++++++++++++++++++++++++-- reparo/syncer/util.go | 1 - reparo/syncer/util_test.go | 66 +++++++++++++++++ 14 files changed, 495 insertions(+), 14 deletions(-) create mode 100644 reparo/syncer/memory_test.go create mode 100644 reparo/syncer/mysql_test.go create mode 100644 reparo/syncer/print_test.go create mode 100644 reparo/syncer/syncer_test.go create mode 100644 reparo/syncer/util_test.go diff --git a/go.mod b/go.mod index ac0444bad..762514063 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/gorilla/websocket v1.4.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20180820150422-93bf4626fba7 // indirect + github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d github.com/kr/pretty v0.1.0 // indirect github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 // indirect github.com/ngaut/log v0.0.0-20160810023011-cec23d3e10b0 diff --git a/go.sum b/go.sum index 1e519562a..e12d50461 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d h1:cVtBfNW5XTHiKQe7jDaDBSh/EVM4XLPutLAGboIXuM0= +github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/reparo/config.go b/reparo/config.go index efad1fd2e..115c3a0b3 100644 --- a/reparo/config.go +++ b/reparo/config.go @@ -1,6 +1,7 @@ package reparo import ( + "encoding/json" "flag" "fmt" "os" @@ -23,7 +24,7 @@ const ( // Config is the main configuration for the retore tool. type Config struct { - *flag.FlagSet + *flag.FlagSet `toml:"-" json:"-"` Dir string `toml:"data-dir" json:"data-dir"` StartDatetime string `toml:"start-datetime" json:"start-datetime"` StopDatetime string `toml:"stop-datetime" json:"stop-datetime"` @@ -43,6 +44,8 @@ type Config struct { LogRotate string `toml:"log-rotate" json:"log-rotate"` LogLevel string `toml:"log-level" json:"log-level"` + SafeMode bool `toml:"safe-mode" json:"safe-mode"` + configFile string printVersion bool } @@ -67,9 +70,19 @@ func NewConfig() *Config { fs.StringVar(&c.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal") fs.StringVar(&c.configFile, "config", "", "[REQUIRED] path to configuration file") fs.BoolVar(&c.printVersion, "V", false, "print reparo version info") + fs.BoolVar(&c.SafeMode, "safe-mode", false, "enable safe mode to support reentrant") return c } +func (c *Config) String() string { + cfgBytes, err := json.Marshal(c) + if err != nil { + log.Errorf("marshal config failed %v", err) + } + + return string(cfgBytes) +} + // Parse parses keys/values from command line flags and toml configuration file. func (c *Config) Parse(args []string) (err error) { // Parse first to get config file diff --git a/reparo/reparo.go b/reparo/reparo.go index 0566c0a2c..02372ea0d 100644 --- a/reparo/reparo.go +++ b/reparo/reparo.go @@ -23,7 +23,7 @@ type Reparo struct { func New(cfg *Config) (*Reparo, error) { log.Infof("cfg %+v", cfg) - syncer, err := syncer.New(cfg.DestType, cfg.DestDB) + syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.SafeMode) if err != nil { return nil, errors.Trace(err) } diff --git a/reparo/syncer/memory_test.go b/reparo/syncer/memory_test.go new file mode 100644 index 000000000..08cb687ba --- /dev/null +++ b/reparo/syncer/memory_test.go @@ -0,0 +1,22 @@ +package syncer + +import ( + "github.com/pingcap/check" +) + +type testMemorySuite struct{} + +var _ = check.Suite(&testMemorySuite{}) + +func (s *testMemorySuite) TestMemorySyncer(c *check.C) { + syncer, err := newMemSyncer() + c.Assert(err, check.IsNil) + + syncTest(c, Syncer(syncer)) + + binlog := syncer.GetBinlogs() + c.Assert(binlog, check.HasLen, 2) + + err = syncer.Close() + c.Assert(err, check.IsNil) +} diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index d8b570dd2..b4fda8767 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -29,19 +29,31 @@ type mysqlSyncer struct { loaderErr error } -var _ Syncer = &mysqlSyncer{} +var ( + _ Syncer = &mysqlSyncer{} + defaultWorkerCount = 16 + defaultBatchSize = 20 +) + +// should be only used for unit test to create mock db +var createDB = loader.CreateDB -func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { - db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) +func newMysqlSyncer(cfg *DBConfig, safemode bool) (*mysqlSyncer, error) { + db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) if err != nil { return nil, errors.Trace(err) } - loader, err := loader.NewLoader(db, loader.WorkerCount(16), loader.BatchSize(20)) + return newMysqlSyncerFromSQLDB(db, safemode) +} + +func newMysqlSyncerFromSQLDB(db *sql.DB, safemode bool) (*mysqlSyncer, error) { + loader, err := loader.NewLoader(db, loader.WorkerCount(defaultWorkerCount), loader.BatchSize(defaultBatchSize)) if err != nil { return nil, errors.Annotate(err, "new loader failed") } + loader.SetSafeMode(safemode) syncer := &mysqlSyncer{db: db, loader: loader} syncer.runLoader() diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go new file mode 100644 index 000000000..c9379b3e1 --- /dev/null +++ b/reparo/syncer/mysql_test.go @@ -0,0 +1,100 @@ +package syncer + +import ( + "database/sql" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/check" + pb "github.com/pingcap/tidb-binlog/proto/binlog" +) + +type testMysqlSuite struct{} + +var _ = check.Suite(&testMysqlSuite{}) + +func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { + s.testMysqlSyncer(c, true) + s.testMysqlSyncer(c, false) +} + +func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) { + var ( + mock sqlmock.Sqlmock + ) + originWorkerCount := defaultWorkerCount + defaultWorkerCount = 1 + defer func() { + defaultWorkerCount = originWorkerCount + }() + + oldCreateDB := createDB + createDB = func(string, string, string, int) (db *sql.DB, err error) { + db, mock, err = sqlmock.New() + return + } + defer func() { + createDB = oldCreateDB + }() + + syncer, err := newMysqlSyncer(&DBConfig{}, safemode) + c.Assert(err, check.IsNil) + + mock.ExpectBegin() + mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + + mock.ExpectQuery("show columns from `test`.`t1`").WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}).AddRow("a", "int", "YES", "", "NULL", "").AddRow("b", "varchar(24)", "YES", "", "NULL", "").AddRow("c", "varchar(24)", "YES", "", "NULL", "")) + + rows := sqlmock.NewRows([]string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name", "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"}) + mock.ExpectQuery("show index from `test`.`t1`").WillReturnRows(rows) + + mock.ExpectBegin() + insertPattern := "INSERT INTO" + if safemode { + insertPattern = "REPLACE INTO" + } + mock.ExpectExec(insertPattern).WithArgs(1, "test", nil).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("DELETE FROM").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) + if safemode { + mock.ExpectExec("DELETE FROM").WithArgs().WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(insertPattern).WithArgs(nil, nil, "abc").WillReturnResult(sqlmock.NewResult(0, 1)) + } else { + mock.ExpectExec("UPDATE").WithArgs("abc", "test").WillReturnResult(sqlmock.NewResult(0, 1)) + } + mock.ExpectCommit() + + syncTest(c, Syncer(syncer)) + + err = syncer.Close() + c.Assert(err, check.IsNil) +} + +func syncTest(c *check.C, syncer Syncer) { + ddlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), + } + dmlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DML, + DmlData: &pb.DMLData{ + Events: generateDMLEvents(c), + }, + } + + binlogs := make([]*pb.Binlog, 0, 2) + err := syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) + c.Assert(err, check.IsNil) + + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) + c.Assert(err, check.IsNil) + + time.Sleep(100 * time.Millisecond) + c.Assert(binlogs, check.HasLen, 2) +} diff --git a/reparo/syncer/print.go b/reparo/syncer/print.go index fab38a813..0c562ac68 100644 --- a/reparo/syncer/print.go +++ b/reparo/syncer/print.go @@ -106,6 +106,6 @@ func printInsertOrDeleteEvent(row [][]byte) { } tp := col.Tp[0] - fmt.Printf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp)) + fmt.Printf("%s(%s): %s\n", col.Name, col.MysqlType, formatValueToString(val, tp)) } } diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go new file mode 100644 index 000000000..9485f0f03 --- /dev/null +++ b/reparo/syncer/print_test.go @@ -0,0 +1,110 @@ +package syncer + +import ( + "strings" + + capturer "github.com/kami-zh/go-capturer" + "github.com/pingcap/check" + pb "github.com/pingcap/tidb-binlog/proto/binlog" +) + +type testPrintSuite struct{} + +var _ = check.Suite(&testPrintSuite{}) + +func (s *testPrintSuite) TestPrintSyncer(c *check.C) { + syncer, err := newPrintSyncer() + c.Assert(err, check.IsNil) + + out := capturer.CaptureStdout(func() { + syncTest(c, Syncer(syncer)) + }) + + c.Assert(out, check.Equals, + "DDL query: create database test;\n"+ + "schema: test; table: t1; type: Insert\n"+ + "a(int): 1\n"+ + "b(varchar): test\n"+ + "schema: test; table: t1; type: Delete\n"+ + "a(int): 1\n"+ + "b(varchar): test\n"+ + "schema: test; table: t1; type: Update\n"+ + "c(varchar): test => abc\n") + + err = syncer.Close() + c.Assert(err, check.IsNil) +} + +func (s *testPrintSuite) TestPrintEventHeader(c *check.C) { + schema := "test" + table := "t1" + event := &pb.Event{ + Tp: pb.EventType_Insert, + SchemaName: &schema, + TableName: &table, + } + + out := capturer.CaptureStdout(func() { + printEventHeader(event) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 1) + c.Assert(lines[0], check.Matches, ".*schema: test; table: t1; type: Insert.*") +} + +func (s *testPrintSuite) TestPrintDDL(c *check.C) { + ddlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), + } + + out := capturer.CaptureStdout(func() { + printDDL(ddlBinlog) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 1) + c.Assert(lines[0], check.Matches, ".*DDL query: create database test;.*") +} + +func (s *testPrintSuite) TestPrintRow(c *check.C) { + cols := generateColumns(c) + + insertEvent := &pb.Event{ + Tp: pb.EventType_Insert, + Row: [][]byte{cols[0], cols[1]}, + } + + out := capturer.CaptureStdout(func() { + printEvent(insertEvent) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 3) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Insert") + c.Assert(lines[1], check.Equals, "a(int): 1") + c.Assert(lines[2], check.Equals, "b(varchar): test") + + deleteEvent := &pb.Event{ + Tp: pb.EventType_Delete, + Row: [][]byte{cols[0], cols[1]}, + } + out = capturer.CaptureStdout(func() { + printEvent(deleteEvent) + }) + lines = strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 3) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Delete") + c.Assert(lines[1], check.Equals, "a(int): 1") + c.Assert(lines[2], check.Equals, "b(varchar): test") + + updateEvent := &pb.Event{ + Tp: pb.EventType_Update, + Row: [][]byte{cols[2]}, + } + out = capturer.CaptureStdout(func() { + printEvent(updateEvent) + }) + lines = strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 2) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Update") + c.Assert(lines[1], check.Equals, "c(varchar): test => abc") +} diff --git a/reparo/syncer/syncer.go b/reparo/syncer/syncer.go index fe4c23d80..3a2872695 100644 --- a/reparo/syncer/syncer.go +++ b/reparo/syncer/syncer.go @@ -16,10 +16,10 @@ type Syncer interface { } // New creates a new executor based on the name. -func New(name string, cfg *DBConfig) (Syncer, error) { +func New(name string, cfg *DBConfig, safemode bool) (Syncer, error) { switch name { case "mysql": - return newMysqlSyncer(cfg) + return newMysqlSyncer(cfg, safemode) case "print": return newPrintSyncer() case "memory": diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go new file mode 100644 index 000000000..7b2a4ce5a --- /dev/null +++ b/reparo/syncer/syncer_test.go @@ -0,0 +1,41 @@ +package syncer + +import ( + "reflect" + + "github.com/pingcap/check" +) + +type testSyncerSuite struct{} + +var _ = check.Suite(&testSyncerSuite{}) + +func (s *testSyncerSuite) TestNewSyncer(c *check.C) { + cfg := new(DBConfig) + + testCases := []struct { + typeStr string + tp reflect.Type + checker check.Checker + }{ + { + "print", + reflect.TypeOf(new(printSyncer)), + check.Equals, + }, { + "memory", + reflect.TypeOf(new(MemSyncer)), + check.Equals, + }, { + "print", + reflect.TypeOf(new(MemSyncer)), + check.Not(check.Equals), + }, + } + + for _, testCase := range testCases { + syncer, err := New(testCase.typeStr, cfg, false) + c.Assert(err, check.IsNil) + c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp) + } +} diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index b72f5b60a..6e53f18f7 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -6,6 +6,8 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/util/codec" ) func Test(t *testing.T) { check.TestingT(t) } @@ -26,20 +28,133 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { Table: "table1", }, }, - // TODO add dml test { Tp: pb.BinlogType_DML, DmlData: &pb.DMLData{ - Events: []pb.Event{}, + Events: generateDMLEvents(c), }, }: { - // DMLs: []*loader.DML{}, + DMLs: []*loader.DML{ + { + Database: "test", + Table: "t1", + Tp: loader.InsertDMLType, + Values: map[string]interface{}{ + "a": int64(1), + "b": "test", + }, + }, { + Database: "test", + Table: "t1", + Tp: loader.DeleteDMLType, + Values: map[string]interface{}{ + "a": int64(1), + "b": "test", + }, + }, { + Database: "test", + Table: "t1", + Tp: loader.UpdateDMLType, + Values: map[string]interface{}{ + "c": "abc", + }, + OldValues: map[string]interface{}{ + "c": "test", + }, + }, + }, }, } for binlog, txn := range tests { getTxn, err := pbBinlogToTxn(binlog) c.Assert(err, check.IsNil) - c.Assert(getTxn, check.DeepEquals, txn) + c.Assert(getTxn.DDL, check.DeepEquals, txn.DDL) + c.Assert(getTxn.DMLs, check.DeepEquals, txn.DMLs) + } +} + +func (s *testTranslateSuite) TestGenColsAndArgs(c *check.C) { + cols, args, err := genColsAndArgs(generateColumns(c)) + c.Assert(err, check.IsNil) + c.Assert(cols, check.DeepEquals, []string{"a", "b", "c"}) + c.Assert(args, check.DeepEquals, []interface{}{int64(1), "test", "test"}) +} + +// generateDMLEvents generates three DML Events for test. +func generateDMLEvents(c *check.C) []pb.Event { + schema := "test" + table := "t1" + cols := generateColumns(c) + + return []pb.Event{ + { + Tp: pb.EventType_Insert, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[0], cols[1]}, + }, { + Tp: pb.EventType_Delete, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[0], cols[1]}, + }, { + Tp: pb.EventType_Update, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[2]}, + }, } } + +// generateColumns generates three columns for test, the last one used for update. +func generateColumns(c *check.C) [][]byte { + allColBytes := make([][]byte, 0, 3) + + cols := []*pb.Column{ + { + Name: "a", + Tp: []byte{mysql.TypeInt24}, + MysqlType: "int", + Value: encodeIntValue(1), + }, { + Name: "b", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + }, { + Name: "c", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + ChangedValue: encodeBytesValue([]byte("abc")), + }, + } + + for _, col := range cols { + colBytes, err := col.Marshal() + if err != nil { + c.Fatal(err) + } + + allColBytes = append(allColBytes, colBytes) + } + + return allColBytes +} + +func encodeIntValue(value int64) []byte { + b := make([]byte, 0, 5) + // 3 means intFlag + b = append(b, 3) + b = codec.EncodeInt(b, value) + return b +} + +func encodeBytesValue(value []byte) []byte { + b := make([]byte, 0, 5) + // 1 means bytesFlag + b = append(b, 1) + b = codec.EncodeBytes(b, value) + return b +} diff --git a/reparo/syncer/util.go b/reparo/syncer/util.go index ad9dae422..3d31e7b38 100644 --- a/reparo/syncer/util.go +++ b/reparo/syncer/util.go @@ -20,7 +20,6 @@ func formatValueToString(data types.Datum, tp byte) string { } } -// TODO: test it. func formatValue(value types.Datum, tp byte) types.Datum { if value.GetValue() == nil { return value diff --git a/reparo/syncer/util_test.go b/reparo/syncer/util_test.go new file mode 100644 index 000000000..f5f1591f9 --- /dev/null +++ b/reparo/syncer/util_test.go @@ -0,0 +1,66 @@ +package syncer + +import ( + "time" + + "github.com/pingcap/check" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/types" +) + +type testUtilSuite struct{} + +var _ = check.Suite(&testUtilSuite{}) + +func (s *testUtilSuite) TestFormatValue(c *check.C) { + datetime, err := time.Parse("20060102150405", "20190415121212") + c.Assert(err, check.IsNil) + + testCases := []struct { + value interface{} + tp byte + expectStr string + }{ + { + value: 1, + tp: mysql.TypeInt24, + expectStr: "1", + }, + { + value: 1.11, + tp: mysql.TypeFloat, + expectStr: "1.11", + }, + { + value: 1.11, + tp: mysql.TypeDouble, + expectStr: "1.11", + }, + { + value: "a", + tp: mysql.TypeVarchar, + expectStr: "a", + }, + { + value: "a", + tp: mysql.TypeString, + expectStr: "a", + }, + { + value: datetime, + tp: mysql.TypeDatetime, + expectStr: "2019-04-15 12:12:12 +0000 UTC", + }, + { + value: time.Duration(time.Second), + tp: mysql.TypeDuration, + expectStr: "1s", + }, + } + + for _, testCase := range testCases { + datum := types.NewDatum(testCase.value) + str := formatValueToString(datum, testCase.tp) + c.Assert(str, check.Equals, testCase.expectStr) + } +}