diff --git a/go.mod b/go.mod index 210200936..97b34d52e 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/gorilla/websocket v1.4.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20180820150422-93bf4626fba7 // indirect + github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d github.com/kr/pretty v0.1.0 // indirect github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 // indirect github.com/pierrec/lz4 v2.0.5+incompatible // indirect diff --git a/go.sum b/go.sum index bc28c787d..1dac798f4 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d h1:cVtBfNW5XTHiKQe7jDaDBSh/EVM4XLPutLAGboIXuM0= +github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/reparo/syncer/memory_test.go b/reparo/syncer/memory_test.go new file mode 100644 index 000000000..08cb687ba --- /dev/null +++ b/reparo/syncer/memory_test.go @@ -0,0 +1,22 @@ +package syncer + +import ( + "github.com/pingcap/check" +) + +type testMemorySuite struct{} + +var _ = check.Suite(&testMemorySuite{}) + +func (s *testMemorySuite) TestMemorySyncer(c *check.C) { + syncer, err := newMemSyncer() + c.Assert(err, check.IsNil) + + syncTest(c, Syncer(syncer)) + + binlog := syncer.GetBinlogs() + c.Assert(binlog, check.HasLen, 2) + + err = syncer.Close() + c.Assert(err, check.IsNil) +} diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index 723ae22e2..d8e93cd8a 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -42,7 +42,11 @@ type mysqlSyncer struct { loaderErr error } -var _ Syncer = &mysqlSyncer{} +var ( + _ Syncer = &mysqlSyncer{} + defaultWorkerCount = 16 + defaultBatchSize = 20 +) func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) @@ -50,7 +54,11 @@ func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { return nil, errors.Trace(err) } - loader, err := loader.NewLoader(db, loader.WorkerCount(16), loader.BatchSize(20)) + return newMysqlSyncerFromSQLDB(db) +} + +func newMysqlSyncerFromSQLDB(db *sql.DB) (*mysqlSyncer, error) { + loader, err := loader.NewLoader(db, loader.WorkerCount(defaultWorkerCount), loader.BatchSize(defaultBatchSize)) if err != nil { return nil, errors.Annotate(err, "new loader failed") } diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go new file mode 100644 index 000000000..c149b7c4b --- /dev/null +++ b/reparo/syncer/mysql_test.go @@ -0,0 +1,78 @@ +package syncer + +import ( + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/check" + pb "github.com/pingcap/tidb-binlog/proto/binlog" +) + +type testMysqlSuite struct{} + +var _ = check.Suite(&testMysqlSuite{}) + +func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { + originWorkerCount := defaultWorkerCount + defaultWorkerCount = 1 + defer func() { + defaultWorkerCount = originWorkerCount + }() + + db, mock, err := sqlmock.New() + c.Assert(err, check.IsNil) + + syncer, err := newMysqlSyncerFromSQLDB(db) + c.Assert(err, check.IsNil) + + mock.ExpectBegin() + mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + + mock.ExpectQuery("SELECT column_name, extra FROM information_schema.columns").WithArgs("test", "t1").WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("a", "").AddRow("b", "")) + + rows := sqlmock.NewRows([]string{"non_unique", "index_name", "seq_in_index", "column_name"}) + mock.ExpectQuery("SELECT non_unique, index_name, seq_in_index, column_name FROM information_schema.statistics"). + WithArgs("test", "t1"). + WillReturnRows(rows) + + mock.ExpectBegin() + mock.ExpectExec("INSERT INTO").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("DELETE FROM").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("UPDATE").WithArgs("abc").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + syncTest(c, Syncer(syncer)) + + err = syncer.Close() + c.Assert(err, check.IsNil) +} + +func syncTest(c *check.C, syncer Syncer) { + ddlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), + } + dmlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DML, + DmlData: &pb.DMLData{ + Events: generateDMLEvents(c), + }, + } + + binlogs := make([]*pb.Binlog, 0, 2) + err := syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) + c.Assert(err, check.IsNil) + + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) + c.Assert(err, check.IsNil) + + time.Sleep(100 * time.Millisecond) + c.Assert(binlogs, check.HasLen, 2) +} diff --git a/reparo/syncer/print.go b/reparo/syncer/print.go index c9dc4c192..d6bb1ca26 100644 --- a/reparo/syncer/print.go +++ b/reparo/syncer/print.go @@ -125,7 +125,7 @@ func printInsertOrDeleteEvent(row [][]byte) error { } tp := col.Tp[0] - fmt.Printf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp)) + fmt.Printf("%s(%s): %s\n", col.Name, col.MysqlType, formatValueToString(val, tp)) } return nil } diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go new file mode 100644 index 000000000..9485f0f03 --- /dev/null +++ b/reparo/syncer/print_test.go @@ -0,0 +1,110 @@ +package syncer + +import ( + "strings" + + capturer "github.com/kami-zh/go-capturer" + "github.com/pingcap/check" + pb "github.com/pingcap/tidb-binlog/proto/binlog" +) + +type testPrintSuite struct{} + +var _ = check.Suite(&testPrintSuite{}) + +func (s *testPrintSuite) TestPrintSyncer(c *check.C) { + syncer, err := newPrintSyncer() + c.Assert(err, check.IsNil) + + out := capturer.CaptureStdout(func() { + syncTest(c, Syncer(syncer)) + }) + + c.Assert(out, check.Equals, + "DDL query: create database test;\n"+ + "schema: test; table: t1; type: Insert\n"+ + "a(int): 1\n"+ + "b(varchar): test\n"+ + "schema: test; table: t1; type: Delete\n"+ + "a(int): 1\n"+ + "b(varchar): test\n"+ + "schema: test; table: t1; type: Update\n"+ + "c(varchar): test => abc\n") + + err = syncer.Close() + c.Assert(err, check.IsNil) +} + +func (s *testPrintSuite) TestPrintEventHeader(c *check.C) { + schema := "test" + table := "t1" + event := &pb.Event{ + Tp: pb.EventType_Insert, + SchemaName: &schema, + TableName: &table, + } + + out := capturer.CaptureStdout(func() { + printEventHeader(event) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 1) + c.Assert(lines[0], check.Matches, ".*schema: test; table: t1; type: Insert.*") +} + +func (s *testPrintSuite) TestPrintDDL(c *check.C) { + ddlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), + } + + out := capturer.CaptureStdout(func() { + printDDL(ddlBinlog) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 1) + c.Assert(lines[0], check.Matches, ".*DDL query: create database test;.*") +} + +func (s *testPrintSuite) TestPrintRow(c *check.C) { + cols := generateColumns(c) + + insertEvent := &pb.Event{ + Tp: pb.EventType_Insert, + Row: [][]byte{cols[0], cols[1]}, + } + + out := capturer.CaptureStdout(func() { + printEvent(insertEvent) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 3) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Insert") + c.Assert(lines[1], check.Equals, "a(int): 1") + c.Assert(lines[2], check.Equals, "b(varchar): test") + + deleteEvent := &pb.Event{ + Tp: pb.EventType_Delete, + Row: [][]byte{cols[0], cols[1]}, + } + out = capturer.CaptureStdout(func() { + printEvent(deleteEvent) + }) + lines = strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 3) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Delete") + c.Assert(lines[1], check.Equals, "a(int): 1") + c.Assert(lines[2], check.Equals, "b(varchar): test") + + updateEvent := &pb.Event{ + Tp: pb.EventType_Update, + Row: [][]byte{cols[2]}, + } + out = capturer.CaptureStdout(func() { + printEvent(updateEvent) + }) + lines = strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 2) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Update") + c.Assert(lines[1], check.Equals, "c(varchar): test => abc") +} diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go new file mode 100644 index 000000000..f6610b79f --- /dev/null +++ b/reparo/syncer/syncer_test.go @@ -0,0 +1,41 @@ +package syncer + +import ( + "reflect" + + "github.com/pingcap/check" +) + +type testSyncerSuite struct{} + +var _ = check.Suite(&testSyncerSuite{}) + +func (s *testSyncerSuite) TestNewSyncer(c *check.C) { + cfg := new(DBConfig) + + testCases := []struct { + typeStr string + tp reflect.Type + checker check.Checker + }{ + { + "print", + reflect.TypeOf(new(printSyncer)), + check.Equals, + }, { + "memory", + reflect.TypeOf(new(MemSyncer)), + check.Equals, + }, { + "print", + reflect.TypeOf(new(MemSyncer)), + check.Not(check.Equals), + }, + } + + for _, testCase := range testCases { + syncer, err := New(testCase.typeStr, cfg) + c.Assert(err, check.IsNil) + c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp) + } +} diff --git a/reparo/syncer/translate.go b/reparo/syncer/translate.go index 823cd9923..52fa76bf3 100644 --- a/reparo/syncer/translate.go +++ b/reparo/syncer/translate.go @@ -62,11 +62,11 @@ func pbBinlogToTxn(binlog *pb.Binlog) (txn *loader.Txn, err error) { return nil, errors.Trace(err) } - _, newDatum, err := codec.DecodeOne(col.Value) + _, oldDatum, err := codec.DecodeOne(col.Value) if err != nil { return nil, errors.Trace(err) } - _, oldDatum, err := codec.DecodeOne(col.ChangedValue) + _, newDatum, err := codec.DecodeOne(col.ChangedValue) if err != nil { return nil, errors.Trace(err) } diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index 6084c5171..c256098d7 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -19,6 +19,8 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/util/codec" ) func Test(t *testing.T) { check.TestingT(t) } @@ -37,18 +39,133 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { SQL: "use db1; create table table1(id int)", }, }, - // TODO add dml test { Tp: pb.BinlogType_DML, DmlData: &pb.DMLData{ - Events: []pb.Event{}, + Events: generateDMLEvents(c), }, - }: {}, + }: { + DMLs: []*loader.DML{ + { + Database: "test", + Table: "t1", + Tp: loader.InsertDMLType, + Values: map[string]interface{}{ + "a": int64(1), + "b": "test", + }, + }, { + Database: "test", + Table: "t1", + Tp: loader.DeleteDMLType, + Values: map[string]interface{}{ + "a": int64(1), + "b": "test", + }, + }, { + Database: "test", + Table: "t1", + Tp: loader.UpdateDMLType, + Values: map[string]interface{}{ + "c": "abc", + }, + OldValues: map[string]interface{}{ + "c": "test", + }, + }, + }, + }, } for binlog, txn := range tests { getTxn, err := pbBinlogToTxn(binlog) c.Assert(err, check.IsNil) - c.Assert(getTxn, check.DeepEquals, txn) + c.Assert(getTxn.DDL, check.DeepEquals, txn.DDL) + c.Assert(getTxn.DMLs, check.DeepEquals, txn.DMLs) } } + +func (s *testTranslateSuite) TestGenColsAndArgs(c *check.C) { + cols, args, err := genColsAndArgs(generateColumns(c)) + c.Assert(err, check.IsNil) + c.Assert(cols, check.DeepEquals, []string{"a", "b", "c"}) + c.Assert(args, check.DeepEquals, []interface{}{int64(1), "test", "test"}) +} + +// generateDMLEvents generates three DML Events for test. +func generateDMLEvents(c *check.C) []pb.Event { + schema := "test" + table := "t1" + cols := generateColumns(c) + + return []pb.Event{ + { + Tp: pb.EventType_Insert, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[0], cols[1]}, + }, { + Tp: pb.EventType_Delete, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[0], cols[1]}, + }, { + Tp: pb.EventType_Update, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[2]}, + }, + } +} + +// generateColumns generates three columns for test, the last one used for update. +func generateColumns(c *check.C) [][]byte { + allColBytes := make([][]byte, 0, 3) + + cols := []*pb.Column{ + { + Name: "a", + Tp: []byte{mysql.TypeInt24}, + MysqlType: "int", + Value: encodeIntValue(1), + }, { + Name: "b", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + }, { + Name: "c", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + ChangedValue: encodeBytesValue([]byte("abc")), + }, + } + + for _, col := range cols { + colBytes, err := col.Marshal() + if err != nil { + c.Fatal(err) + } + + allColBytes = append(allColBytes, colBytes) + } + + return allColBytes +} + +func encodeIntValue(value int64) []byte { + b := make([]byte, 0, 5) + // 3 means intFlag + b = append(b, 3) + b = codec.EncodeInt(b, value) + return b +} + +func encodeBytesValue(value []byte) []byte { + b := make([]byte, 0, 5) + // 1 means bytesFlag + b = append(b, 1) + b = codec.EncodeBytes(b, value) + return b +} diff --git a/reparo/syncer/util.go b/reparo/syncer/util.go index 880064221..78a94adc2 100644 --- a/reparo/syncer/util.go +++ b/reparo/syncer/util.go @@ -33,7 +33,6 @@ func formatValueToString(data types.Datum, tp byte) string { } } -// TODO: test it. func formatValue(value types.Datum, tp byte) types.Datum { if value.GetValue() == nil { return value diff --git a/reparo/syncer/util_test.go b/reparo/syncer/util_test.go new file mode 100644 index 000000000..f5f1591f9 --- /dev/null +++ b/reparo/syncer/util_test.go @@ -0,0 +1,66 @@ +package syncer + +import ( + "time" + + "github.com/pingcap/check" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/types" +) + +type testUtilSuite struct{} + +var _ = check.Suite(&testUtilSuite{}) + +func (s *testUtilSuite) TestFormatValue(c *check.C) { + datetime, err := time.Parse("20060102150405", "20190415121212") + c.Assert(err, check.IsNil) + + testCases := []struct { + value interface{} + tp byte + expectStr string + }{ + { + value: 1, + tp: mysql.TypeInt24, + expectStr: "1", + }, + { + value: 1.11, + tp: mysql.TypeFloat, + expectStr: "1.11", + }, + { + value: 1.11, + tp: mysql.TypeDouble, + expectStr: "1.11", + }, + { + value: "a", + tp: mysql.TypeVarchar, + expectStr: "a", + }, + { + value: "a", + tp: mysql.TypeString, + expectStr: "a", + }, + { + value: datetime, + tp: mysql.TypeDatetime, + expectStr: "2019-04-15 12:12:12 +0000 UTC", + }, + { + value: time.Duration(time.Second), + tp: mysql.TypeDuration, + expectStr: "1s", + }, + } + + for _, testCase := range testCases { + datum := types.NewDatum(testCase.value) + str := formatValueToString(datum, testCase.tp) + c.Assert(str, check.Equals, testCase.expectStr) + } +} diff --git a/tests/reparo/run.sh b/tests/reparo/run.sh index 3d2882a9a..65a04869f 100755 --- a/tests/reparo/run.sh +++ b/tests/reparo/run.sh @@ -20,7 +20,7 @@ run_sql "CREATE TABLE \`reparo_test\`.\`test\`(\`id\` int, \`name\` varchar(10), run_sql "INSERT INTO \`reparo_test\`.\`test\` VALUES(1, 'a', 'a'), (2, 'b', 'b')" run_sql "INSERT INTO \`reparo_test\`.\`test\` VALUES(3, 'c', 'c'), (4, 'd', 'c')" run_sql "UPDATE \`reparo_test\`.\`test\` SET \`name\` = 'bb' where \`id\` = 2" -run_sql "DELETE FROM \`reparo_test\`.\`test\` WHERE \`name\` = 'bb'" +run_sql "DELETE FROM \`reparo_test\`.\`test\` WHERE \`id\` = '1'" run_sql "INSERT INTO \`reparo_test\`.\`test\` VALUES(5, 'e', 'e')" sleep 5