From e91c93938ffc4a168aec488f42e3f6a912145e04 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 14:07:53 +0800 Subject: [PATCH 01/31] add test for formatvalue --- reparo/syncer/util_test.go | 80 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 reparo/syncer/util_test.go diff --git a/reparo/syncer/util_test.go b/reparo/syncer/util_test.go new file mode 100644 index 000000000..e00460582 --- /dev/null +++ b/reparo/syncer/util_test.go @@ -0,0 +1,80 @@ +package syncer + +import ( + "time" + + "github.com/pingcap/check" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/types" +) + +type testUtilSuite struct{} + +var _ = check.Suite(&testUtilSuite{}) + +func (s *testUtilSuite) TestFormatValue(c *check.C) { + datetime, err := time.Parse("20060102150405", "20190415121212") + c.Assert(err, check.IsNil) + + testCases := []struct { + value interface{} + tp byte + changeTp byte + expectStr string + }{ + { + value: 1, + tp: mysql.TypeInt24, + changeTp: mysql.TypeFloat, + expectStr: "1", + }, + { + value: 1.11, + tp: mysql.TypeFloat, + changeTp: mysql.TypeDouble, + expectStr: "1.11", + }, + { + value: 1.11, + tp: mysql.TypeDouble, + changeTp: mysql.TypeFloat, + expectStr: "1.11", + }, + { + value: "a", + tp: mysql.TypeVarchar, + changeTp: mysql.TypeString, + expectStr: "a", + }, + { + value: "a", + tp: mysql.TypeString, + changeTp: mysql.TypeVarchar, + expectStr: "a", + }, + { + value: datetime, + tp: mysql.TypeDatetime, + changeTp: mysql.TypeDatetime, + expectStr: "2019-04-15 12:12:12 +0000 UTC", + }, + { + value: time.Duration(time.Second), + tp: mysql.TypeDuration, + changeTp: mysql.TypeDuration, + expectStr: "1s", + }, + } + + for _, testCase := range testCases { + datum := types.NewDatum(testCase.value) + str := formatValueToString(types.NewDatum(testCase.value), testCase.tp) + c.Assert(str, check.Equals, testCase.expectStr) + + newDatum := formatValue(datum, testCase.changeTp) + newStr := formatValueToString(newDatum, testCase.changeTp) + c.Assert(newStr, check.Equals, testCase.expectStr) + } + + +} \ No newline at end of file From f2e8739d744254289d62da43ef5bb7bb7bf16994 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 14:22:25 +0800 Subject: [PATCH 02/31] add test for New syncer --- reparo/syncer/syncer_test.go | 34 ++++++++++++++++++++++++++++++++++ reparo/syncer/util_test.go | 2 -- 2 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 reparo/syncer/syncer_test.go diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go new file mode 100644 index 000000000..03d6b4299 --- /dev/null +++ b/reparo/syncer/syncer_test.go @@ -0,0 +1,34 @@ +package syncer + +import ( + + "github.com/pingcap/check" +) + +type testSyncerSuite struct{} + +var _ = check.Suite(&testSyncerSuite{}) + +func (s *testSyncerSuite) TestNewSyncer(c *check.C) { + cfg := new(DBConfig) + + syncer, err := New("mysql", cfg) + c.Assert(err, check.IsNil) + _, ok := syncer.(*mysqlSyncer) + c.Assert(ok, check.Equals, true) + + syncer, err = New("print", cfg) + c.Assert(err, check.IsNil) + _, ok = syncer.(*printSyncer) + c.Assert(ok, check.Equals, true) + + syncer, err = New("memory", cfg) + c.Assert(err, check.IsNil) + _, ok = syncer.(*MemSyncer) + c.Assert(ok, check.Equals, true) + + syncer, err = New("mysql", cfg) + c.Assert(err, check.IsNil) + _, ok = syncer.(*MemSyncer) + c.Assert(ok, check.Equals, false) +} \ No newline at end of file diff --git a/reparo/syncer/util_test.go b/reparo/syncer/util_test.go index e00460582..c0702f875 100644 --- a/reparo/syncer/util_test.go +++ b/reparo/syncer/util_test.go @@ -75,6 +75,4 @@ func (s *testUtilSuite) TestFormatValue(c *check.C) { newStr := formatValueToString(newDatum, testCase.changeTp) c.Assert(newStr, check.Equals, testCase.expectStr) } - - } \ No newline at end of file From cd8b5acd9caa874a0ecccc91b6c21031abc51712 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 15:03:21 +0800 Subject: [PATCH 03/31] refine some function to let it easy to add unit test --- reparo/syncer/print.go | 101 ++++++++++++++++++++---------------- reparo/syncer/print_test.go | 47 +++++++++++++++++ 2 files changed, 104 insertions(+), 44 deletions(-) create mode 100644 reparo/syncer/print_test.go diff --git a/reparo/syncer/print.go b/reparo/syncer/print.go index fab38a813..01009b129 100644 --- a/reparo/syncer/print.go +++ b/reparo/syncer/print.go @@ -18,20 +18,23 @@ func newPrintSyncer() (*printSyncer, error) { } func (p *printSyncer) Sync(pbBinlog *pb.Binlog, cb func(binlog *pb.Binlog)) error { + var info string switch pbBinlog.Tp { case pb.BinlogType_DDL: - printDDL(pbBinlog) - cb(pbBinlog) + info = getDDLStr(pbBinlog) case pb.BinlogType_DML: for _, event := range pbBinlog.GetDmlData().GetEvents() { - printEvent(&event) + header := getEventHeaderStr(&event) + info = header + getEventDataStr(&event) } - cb(pbBinlog) default: return errors.Errorf("unknown type: %v", pbBinlog.Tp) } + fmt.Print(info) + cb(pbBinlog) + return nil } @@ -39,73 +42,83 @@ func (p *printSyncer) Close() error { return nil } -func printEvent(event *pb.Event) { - printHeader(event) - +func getEventDataStr(event *pb.Event) string { switch event.GetTp() { case pb.EventType_Insert: - printInsertOrDeleteEvent(event.Row) + return getInsertOrDeleteEventStr(event.Row) case pb.EventType_Update: - printUpdateEvent(event.Row) + return getUpdateEventStr(event.Row) case pb.EventType_Delete: - printInsertOrDeleteEvent(event.Row) + return getInsertOrDeleteEventStr(event.Row) } + + return "" } -func printHeader(event *pb.Event) { - printEventHeader(event) +func getDDLStr(binlog *pb.Binlog) string { + return fmt.Sprintf("DDL query: %s\n", binlog.DdlQuery) } -func printDDL(binlog *pb.Binlog) { - fmt.Printf("DDL query: %s\n", binlog.DdlQuery) +func getEventHeaderStr(event *pb.Event) string { + return fmt.Sprintf("schema: %s; table: %s; type: %s\n", event.GetSchemaName(), event.GetTableName(), event.GetTp()) } -func printEventHeader(event *pb.Event) { - fmt.Printf("schema: %s; table: %s; type: %s\n", event.GetSchemaName(), event.GetTableName(), event.GetTp()) +func getUpdateEventStr(rows [][]byte) string { + var eventStr string + for _, row := range rows { + eventStr += getUpdateRowStr(row) + } + + return eventStr } -func printUpdateEvent(row [][]byte) { - for _, c := range row { - col := &pb.Column{} - err := col.Unmarshal(c) - if err != nil { - log.Errorf("unmarshal error %v", err) - return - } +func getUpdateRowStr(row []byte) string { + col := &pb.Column{} + err := col.Unmarshal(row) + if err != nil { + log.Errorf("unmarshal error %v", err) + return "" + } - _, val, err := codec.DecodeOne(col.Value) - if err != nil { - log.Errorf("decode row error %v", err) - return - } + _, val, err := codec.DecodeOne(col.Value) + if err != nil { + log.Errorf("decode row error %v", err) + return "" + } - _, changedVal, err := codec.DecodeOne(col.ChangedValue) - if err != nil { - log.Errorf("decode row error %v", err) - return - } + _, changedVal, err := codec.DecodeOne(col.ChangedValue) + if err != nil { + log.Errorf("decode row error %v", err) + return "" + } - tp := col.Tp[0] - fmt.Printf("%s(%s): %s => %s\n", col.Name, col.MysqlType, formatValueToString(val, tp), formatValueToString(changedVal, tp)) + tp := col.Tp[0] + return fmt.Sprintf("%s(%s): %s => %s\n", col.Name, col.MysqlType, formatValueToString(val, tp), formatValueToString(changedVal, tp)) +} + +func getInsertOrDeleteEventStr(rows [][]byte) string { + var eventStr string + for _, row := range rows { + eventStr += getInsertOrDeleteRowStr(row) } + + return eventStr } -func printInsertOrDeleteEvent(row [][]byte) { - for _, c := range row { - col := &pb.Column{} - err := col.Unmarshal(c) +func getInsertOrDeleteRowStr(row []byte) string { + col := &pb.Column{} + err := col.Unmarshal(row) if err != nil { log.Errorf("unmarshal error %v", err) - return + return "" } _, val, err := codec.DecodeOne(col.Value) if err != nil { log.Errorf("decode row error %v", err) - return + return "" } tp := col.Tp[0] - fmt.Printf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp)) - } + return fmt.Sprintf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp)) } diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go new file mode 100644 index 000000000..0a8119905 --- /dev/null +++ b/reparo/syncer/print_test.go @@ -0,0 +1,47 @@ +package syncer + +import ( + + "github.com/pingcap/check" + pb "github.com/pingcap/tidb-binlog/proto/binlog" +) + +type testPrintSuite struct{} + +var _ = check.Suite(&testPrintSuite{}) + +func (s *testPrintSuite) TestPrintSyncerStr(c *check.C) { + syncer, err := newPrintSyncer() + c.Assert(err, check.IsNil) + + ddlBinlog := &pb.Binlog { + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), + } + dmlBinlog := &pb.Binlog { + Tp: pb.BinlogType_DML, + } + + err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog){}) + c.Assert(err, check.IsNil) + + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog){}) + c.Assert(err, check.IsNil) + + err = syncer.Close() + c.Assert(err, check.IsNil) + + ddlStr := getDDLStr(ddlBinlog) + c.Assert(ddlStr, check.Equals, "DDL query: create database test;\n") + + schema := "test" + table := "t1" + event := &pb.Event { + Tp: pb.EventType_Insert, + SchemaName: &schema, + TableName: &table, + } + + eventHeaderStr := getEventHeaderStr(event) + c.Assert(eventHeaderStr, check.Equals, "schema: test; table: t1; type: Insert\n") +} From 52311869a4cd2e56d16287a36939b4ac40759e5e Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 16:35:39 +0800 Subject: [PATCH 04/31] add unit test for decode --- reparo/syncer/print_test.go | 92 +++++++++++++++++++++++++++++++++---- 1 file changed, 83 insertions(+), 9 deletions(-) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 0a8119905..fc3942058 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -1,9 +1,11 @@ package syncer import ( - "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" + "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" ) type testPrintSuite struct{} @@ -14,18 +16,18 @@ func (s *testPrintSuite) TestPrintSyncerStr(c *check.C) { syncer, err := newPrintSyncer() c.Assert(err, check.IsNil) - ddlBinlog := &pb.Binlog { + ddlBinlog := &pb.Binlog{ Tp: pb.BinlogType_DDL, DdlQuery: []byte("create database test;"), } - dmlBinlog := &pb.Binlog { - Tp: pb.BinlogType_DML, + dmlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DML, } - err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog){}) + err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {}) c.Assert(err, check.IsNil) - err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog){}) + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {}) c.Assert(err, check.IsNil) err = syncer.Close() @@ -36,12 +38,84 @@ func (s *testPrintSuite) TestPrintSyncerStr(c *check.C) { schema := "test" table := "t1" - event := &pb.Event { - Tp: pb.EventType_Insert, + event := &pb.Event{ + Tp: pb.EventType_Insert, SchemaName: &schema, - TableName: &table, + TableName: &table, } eventHeaderStr := getEventHeaderStr(event) c.Assert(eventHeaderStr, check.Equals, "schema: test; table: t1; type: Insert\n") + + /* + userIDCol := &model.ColumnInfo{ + ID: 1, + Name: model.NewCIStr("ID"), + Offset: 0, + FieldType: types.FieldType{ + Tp: mysql.TypeLong, + Flag: mysql.BinaryFlag, + Flen: 11, + Decimal: -1, + Charset: "binary", + Collate: "binary", + }, + State: model.StatePublic, + } + */ + + /* + str := new(types.Datum) + *str = types.NewStringDatum("a") + var raw []byte + raw = append(raw, bytesFlag) + raw = EncodeBytes(raw, str.GetBytes()) + */ + + str := new(types.Datum) + *str = types.NewStringDatum("a") + c.Log(str.GetBytes()) + _, val, err := codec.DecodeOne(encodeIntValue(1)) + c.Log(val) + c.Assert(err, check.IsNil) + //c.Assert(err, check.NotNil) + + col1 := &pb.Column{ + Name: "a", + Tp: []byte{mysql.TypeInt24}, + MysqlType: "int", + Value: encodeIntValue(1), + } + col2 := &pb.Column{ + Name: "b", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + } + + colBytes, err := col1.Marshal() + c.Assert(err, check.IsNil) + colStr := getInsertOrDeleteColStr(colBytes) + c.Assert(colStr, check.Equals, "a(int): 1 \n") + + colBytes, err = col2.Marshal() + c.Assert(err, check.IsNil) + colStr = getInsertOrDeleteColStr(colBytes) + c.Assert(colStr, check.Equals, "b(varchar): test \n") +} + +func encodeIntValue(value int64) []byte { + b := make([]byte, 0, 5) + // 3 means intFlag + b = append(b, 3) + b = codec.EncodeInt(b, value) + return b } + +func encodeBytesValue(value []byte) []byte { + b := make([]byte, 0, 5) + // 1 means bytesFlag + b = append(b, 1) + b = codec.EncodeBytes(b, value) + return b +} \ No newline at end of file From cf74baf2c521956be656e3d4e213f9ea24850687 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 17:16:51 +0800 Subject: [PATCH 05/31] refine test --- reparo/syncer/print.go | 48 ++++++++--------- reparo/syncer/print_test.go | 95 ++++++++++++++++++--------------- reparo/syncer/syncer_test.go | 5 +- reparo/syncer/translate_test.go | 2 +- reparo/syncer/util.go | 1 + reparo/syncer/util_test.go | 50 ++++++++--------- 6 files changed, 106 insertions(+), 95 deletions(-) diff --git a/reparo/syncer/print.go b/reparo/syncer/print.go index 01009b129..8ee6ecc0d 100644 --- a/reparo/syncer/print.go +++ b/reparo/syncer/print.go @@ -45,11 +45,11 @@ func (p *printSyncer) Close() error { func getEventDataStr(event *pb.Event) string { switch event.GetTp() { case pb.EventType_Insert: - return getInsertOrDeleteEventStr(event.Row) + return getInsertOrDeleteRowStr(event.Row) case pb.EventType_Update: - return getUpdateEventStr(event.Row) + return getUpdateRowStr(event.Row) case pb.EventType_Delete: - return getInsertOrDeleteEventStr(event.Row) + return getInsertOrDeleteRowStr(event.Row) } return "" @@ -63,18 +63,18 @@ func getEventHeaderStr(event *pb.Event) string { return fmt.Sprintf("schema: %s; table: %s; type: %s\n", event.GetSchemaName(), event.GetTableName(), event.GetTp()) } -func getUpdateEventStr(rows [][]byte) string { +func getUpdateRowStr(row [][]byte) string { var eventStr string - for _, row := range rows { - eventStr += getUpdateRowStr(row) + for _, col := range row { + eventStr += getUpdateColumnStr(col) } return eventStr } -func getUpdateRowStr(row []byte) string { +func getUpdateColumnStr(column []byte) string { col := &pb.Column{} - err := col.Unmarshal(row) + err := col.Unmarshal(column) if err != nil { log.Errorf("unmarshal error %v", err) return "" @@ -96,29 +96,29 @@ func getUpdateRowStr(row []byte) string { return fmt.Sprintf("%s(%s): %s => %s\n", col.Name, col.MysqlType, formatValueToString(val, tp), formatValueToString(changedVal, tp)) } -func getInsertOrDeleteEventStr(rows [][]byte) string { +func getInsertOrDeleteRowStr(row [][]byte) string { var eventStr string - for _, row := range rows { - eventStr += getInsertOrDeleteRowStr(row) + for _, col := range row { + eventStr += getInsertOrDeleteColumnStr(col) } return eventStr } -func getInsertOrDeleteRowStr(row []byte) string { +func getInsertOrDeleteColumnStr(column []byte) string { col := &pb.Column{} - err := col.Unmarshal(row) - if err != nil { - log.Errorf("unmarshal error %v", err) - return "" - } + err := col.Unmarshal(column) + if err != nil { + log.Errorf("unmarshal error %v", err) + return "" + } - _, val, err := codec.DecodeOne(col.Value) - if err != nil { - log.Errorf("decode row error %v", err) - return "" - } + _, val, err := codec.DecodeOne(col.Value) + if err != nil { + log.Errorf("decode row error %v", err) + return "" + } - tp := col.Tp[0] - return fmt.Sprintf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp)) + tp := col.Tp[0] + return fmt.Sprintf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp)) } diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index fc3942058..0c560ec2d 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -4,7 +4,7 @@ import ( "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" "github.com/pingcap/tidb/mysql" - "github.com/pingcap/tidb/types" + //"github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" ) @@ -12,7 +12,7 @@ type testPrintSuite struct{} var _ = check.Suite(&testPrintSuite{}) -func (s *testPrintSuite) TestPrintSyncerStr(c *check.C) { +func (s *testPrintSuite) TestPrintSyncer(c *check.C) { syncer, err := newPrintSyncer() c.Assert(err, check.IsNil) @@ -32,10 +32,9 @@ func (s *testPrintSuite) TestPrintSyncerStr(c *check.C) { err = syncer.Close() c.Assert(err, check.IsNil) +} - ddlStr := getDDLStr(ddlBinlog) - c.Assert(ddlStr, check.Equals, "DDL query: create database test;\n") - +func (s *testPrintSuite) TestPrintEventHeader(c *check.C) { schema := "test" table := "t1" event := &pb.Event{ @@ -46,40 +45,18 @@ func (s *testPrintSuite) TestPrintSyncerStr(c *check.C) { eventHeaderStr := getEventHeaderStr(event) c.Assert(eventHeaderStr, check.Equals, "schema: test; table: t1; type: Insert\n") +} - /* - userIDCol := &model.ColumnInfo{ - ID: 1, - Name: model.NewCIStr("ID"), - Offset: 0, - FieldType: types.FieldType{ - Tp: mysql.TypeLong, - Flag: mysql.BinaryFlag, - Flen: 11, - Decimal: -1, - Charset: "binary", - Collate: "binary", - }, - State: model.StatePublic, +func (s *testPrintSuite) TestPrintDDL(c *check.C) { + ddlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), } - */ - - /* - str := new(types.Datum) - *str = types.NewStringDatum("a") - var raw []byte - raw = append(raw, bytesFlag) - raw = EncodeBytes(raw, str.GetBytes()) - */ - - str := new(types.Datum) - *str = types.NewStringDatum("a") - c.Log(str.GetBytes()) - _, val, err := codec.DecodeOne(encodeIntValue(1)) - c.Log(val) - c.Assert(err, check.IsNil) - //c.Assert(err, check.NotNil) + ddlStr := getDDLStr(ddlBinlog) + c.Assert(ddlStr, check.Equals, "DDL query: create database test;\n") +} +func (s *testPrintSuite) TestPrintRow(c *check.C) { col1 := &pb.Column{ Name: "a", Tp: []byte{mysql.TypeInt24}, @@ -92,16 +69,50 @@ func (s *testPrintSuite) TestPrintSyncerStr(c *check.C) { MysqlType: "varchar", Value: encodeBytesValue([]byte("test")), } + col3 := &pb.Column{ + Name: "c", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + ChangedValue: encodeBytesValue([]byte("abc")), + } + + col1Bytes, err := col1.Marshal() + c.Assert(err, check.IsNil) + col1Str := getInsertOrDeleteColumnStr(col1Bytes) + c.Assert(col1Str, check.Equals, "a(int): 1 \n") - colBytes, err := col1.Marshal() + col2Bytes, err := col2.Marshal() c.Assert(err, check.IsNil) - colStr := getInsertOrDeleteColStr(colBytes) - c.Assert(colStr, check.Equals, "a(int): 1 \n") + col2Str := getInsertOrDeleteColumnStr(col2Bytes) + c.Assert(col2Str, check.Equals, "b(varchar): test \n") - colBytes, err = col2.Marshal() + col3Bytes, err := col3.Marshal() c.Assert(err, check.IsNil) - colStr = getInsertOrDeleteColStr(colBytes) - c.Assert(colStr, check.Equals, "b(varchar): test \n") + col3Str := getUpdateColumnStr(col3Bytes) + c.Assert(col3Str, check.Equals, "c(varchar): test => abc\n") + + + insertEvent := &pb.Event { + Tp: pb.EventType_Insert, + Row: [][]byte{col1Bytes, col2Bytes}, + } + eventStr := getEventDataStr(insertEvent) + c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") + + deleteEvent := &pb.Event { + Tp: pb.EventType_Delete, + Row: [][]byte{col1Bytes, col2Bytes}, + } + eventStr = getEventDataStr(deleteEvent) + c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") + + updateEvent := &pb.Event { + Tp: pb.EventType_Delete, + Row: [][]byte{col3Bytes, col3Bytes}, + } + eventStr = getEventDataStr(updateEvent) + c.Assert(eventStr, check.Equals, "c(varchar): test \nc(varchar): test \n") } func encodeIntValue(value int64) []byte { diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go index 03d6b4299..69c02c2b7 100644 --- a/reparo/syncer/syncer_test.go +++ b/reparo/syncer/syncer_test.go @@ -1,7 +1,6 @@ package syncer import ( - "github.com/pingcap/check" ) @@ -11,7 +10,7 @@ var _ = check.Suite(&testSyncerSuite{}) func (s *testSyncerSuite) TestNewSyncer(c *check.C) { cfg := new(DBConfig) - + syncer, err := New("mysql", cfg) c.Assert(err, check.IsNil) _, ok := syncer.(*mysqlSyncer) @@ -31,4 +30,4 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) { c.Assert(err, check.IsNil) _, ok = syncer.(*MemSyncer) c.Assert(ok, check.Equals, false) -} \ No newline at end of file +} diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index b1e7a6c29..56f7032c9 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -31,7 +31,7 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { Events: []pb.Event{}, }, }: &loader.Txn{ - // DMLs: []*loader.DML{}, + // DMLs: []*loader.DML{}, }, } diff --git a/reparo/syncer/util.go b/reparo/syncer/util.go index ad9dae422..d52449fd0 100644 --- a/reparo/syncer/util.go +++ b/reparo/syncer/util.go @@ -16,6 +16,7 @@ func formatValueToString(data types.Datum, tp byte) string { } fallthrough default: + fmt.Println(tp) return fmt.Sprintf("%v", val) } } diff --git a/reparo/syncer/util_test.go b/reparo/syncer/util_test.go index c0702f875..3af2c26e8 100644 --- a/reparo/syncer/util_test.go +++ b/reparo/syncer/util_test.go @@ -17,62 +17,62 @@ func (s *testUtilSuite) TestFormatValue(c *check.C) { c.Assert(err, check.IsNil) testCases := []struct { - value interface{} - tp byte - changeTp byte + value interface{} + tp byte + changeTp byte expectStr string }{ { - value: 1, - tp: mysql.TypeInt24, - changeTp: mysql.TypeFloat, + value: 1, + tp: mysql.TypeInt24, + changeTp: mysql.TypeFloat, expectStr: "1", }, { - value: 1.11, - tp: mysql.TypeFloat, - changeTp: mysql.TypeDouble, + value: 1.11, + tp: mysql.TypeFloat, + changeTp: mysql.TypeDouble, expectStr: "1.11", }, { - value: 1.11, - tp: mysql.TypeDouble, - changeTp: mysql.TypeFloat, + value: 1.11, + tp: mysql.TypeDouble, + changeTp: mysql.TypeFloat, expectStr: "1.11", }, { - value: "a", - tp: mysql.TypeVarchar, + value: "a", + tp: mysql.TypeVarchar, changeTp: mysql.TypeString, expectStr: "a", }, { - value: "a", - tp: mysql.TypeString, - changeTp: mysql.TypeVarchar, + value: "a", + tp: mysql.TypeString, + changeTp: mysql.TypeVarchar, expectStr: "a", }, { - value: datetime, - tp: mysql.TypeDatetime, - changeTp: mysql.TypeDatetime, + value: datetime, + tp: mysql.TypeDatetime, + changeTp: mysql.TypeDatetime, expectStr: "2019-04-15 12:12:12 +0000 UTC", }, { - value: time.Duration(time.Second), - tp: mysql.TypeDuration, + value: time.Duration(time.Second), + tp: mysql.TypeDuration, changeTp: mysql.TypeDuration, expectStr: "1s", }, } - + for _, testCase := range testCases { datum := types.NewDatum(testCase.value) - str := formatValueToString(types.NewDatum(testCase.value), testCase.tp) + str := formatValueToString(datum, testCase.tp) c.Assert(str, check.Equals, testCase.expectStr) newDatum := formatValue(datum, testCase.changeTp) newStr := formatValueToString(newDatum, testCase.changeTp) c.Assert(newStr, check.Equals, testCase.expectStr) } -} \ No newline at end of file +} From b262cfcb00f66d2c2686c8c6885ec6ed3afe3b92 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 17:22:20 +0800 Subject: [PATCH 06/31] add test for memory syncer --- reparo/syncer/memory_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 reparo/syncer/memory_test.go diff --git a/reparo/syncer/memory_test.go b/reparo/syncer/memory_test.go new file mode 100644 index 000000000..4ce833c33 --- /dev/null +++ b/reparo/syncer/memory_test.go @@ -0,0 +1,35 @@ +package syncer + +import ( + "github.com/pingcap/check" + pb "github.com/pingcap/tidb-binlog/proto/binlog" +) + +type testMemorySuite struct{} + +var _ = check.Suite(&testMemorySuite{}) + +func (s *testMemorySuite) TestMemorySyncer(c *check.C) { + syncer, err := newMemSyncer() + c.Assert(err, check.IsNil) + + ddlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), + } + dmlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DML, + } + + err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {}) + c.Assert(err, check.IsNil) + + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {}) + c.Assert(err, check.IsNil) + + err = syncer.Close() + c.Assert(err, check.IsNil) + + binlog := syncer.GetBinlogs() + c.Assert(binlog, check.DeepEquals, []*pb.Binlog{ddlBinlog, dmlBinlog}) +} \ No newline at end of file From 7dd35db6e605f4fec6b7c6b45d2a7729e251ecaf Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 20:14:05 +0800 Subject: [PATCH 07/31] minor update --- reparo/config_test.go | 4 +- reparo/syncer/print_test.go | 62 +++------------ reparo/syncer/translate.go | 4 +- reparo/syncer/translate_test.go | 131 +++++++++++++++++++++++++++++++- 4 files changed, 142 insertions(+), 59 deletions(-) diff --git a/reparo/config_test.go b/reparo/config_test.go index 3b1d0e809..7bbf8e19e 100644 --- a/reparo/config_test.go +++ b/reparo/config_test.go @@ -27,8 +27,8 @@ func (s *testConfigSuite) TestTSORangeParsing(c *check.C) { err := config.Parse([]string{ "-data-dir=/tmp/data", - "-start-datetime=2019-01-01 15:07:00", - "-stop-datetime=2019-02-01 15:07:00", + "-start-datetime=2019-01-01 15:07:00", + "-stop-datetime=2019-02-01 15:07:00", }) c.Assert(err, check.IsNil) c.Assert(config.StartTSO, check.Not(check.Equals), 0) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 0c560ec2d..d01b1d929 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -3,9 +3,9 @@ package syncer import ( "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" - "github.com/pingcap/tidb/mysql" + //"github.com/pingcap/tidb/mysql" //"github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/codec" + //"github.com/pingcap/tidb/util/codec" ) type testPrintSuite struct{} @@ -57,76 +57,36 @@ func (s *testPrintSuite) TestPrintDDL(c *check.C) { } func (s *testPrintSuite) TestPrintRow(c *check.C) { - col1 := &pb.Column{ - Name: "a", - Tp: []byte{mysql.TypeInt24}, - MysqlType: "int", - Value: encodeIntValue(1), - } - col2 := &pb.Column{ - Name: "b", - Tp: []byte{mysql.TypeVarchar}, - MysqlType: "varchar", - Value: encodeBytesValue([]byte("test")), - } - col3 := &pb.Column{ - Name: "c", - Tp: []byte{mysql.TypeVarchar}, - MysqlType: "varchar", - Value: encodeBytesValue([]byte("test")), - ChangedValue: encodeBytesValue([]byte("abc")), - } + cols := generateColumns() - col1Bytes, err := col1.Marshal() - c.Assert(err, check.IsNil) - col1Str := getInsertOrDeleteColumnStr(col1Bytes) + col1Str := getInsertOrDeleteColumnStr(cols[0]) c.Assert(col1Str, check.Equals, "a(int): 1 \n") - col2Bytes, err := col2.Marshal() - c.Assert(err, check.IsNil) - col2Str := getInsertOrDeleteColumnStr(col2Bytes) + col2Str := getInsertOrDeleteColumnStr(cols[1]) c.Assert(col2Str, check.Equals, "b(varchar): test \n") - col3Bytes, err := col3.Marshal() - c.Assert(err, check.IsNil) - col3Str := getUpdateColumnStr(col3Bytes) + col3Str := getUpdateColumnStr(cols[2]) c.Assert(col3Str, check.Equals, "c(varchar): test => abc\n") - insertEvent := &pb.Event { Tp: pb.EventType_Insert, - Row: [][]byte{col1Bytes, col2Bytes}, + Row: [][]byte{cols[0], cols[1]}, } eventStr := getEventDataStr(insertEvent) c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") deleteEvent := &pb.Event { Tp: pb.EventType_Delete, - Row: [][]byte{col1Bytes, col2Bytes}, + Row: [][]byte{cols[0], cols[1]}, } eventStr = getEventDataStr(deleteEvent) c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") updateEvent := &pb.Event { - Tp: pb.EventType_Delete, - Row: [][]byte{col3Bytes, col3Bytes}, + Tp: pb.EventType_Update, + Row: [][]byte{cols[2]}, } eventStr = getEventDataStr(updateEvent) - c.Assert(eventStr, check.Equals, "c(varchar): test \nc(varchar): test \n") -} - -func encodeIntValue(value int64) []byte { - b := make([]byte, 0, 5) - // 3 means intFlag - b = append(b, 3) - b = codec.EncodeInt(b, value) - return b + c.Assert(eventStr, check.Equals, "c(varchar): test => abc\n") } -func encodeBytesValue(value []byte) []byte { - b := make([]byte, 0, 5) - // 1 means bytesFlag - b = append(b, 1) - b = codec.EncodeBytes(b, value) - return b -} \ No newline at end of file diff --git a/reparo/syncer/translate.go b/reparo/syncer/translate.go index 65681609e..5603c7e8e 100644 --- a/reparo/syncer/translate.go +++ b/reparo/syncer/translate.go @@ -48,11 +48,11 @@ func pbBinlogToTxn(binlog *pb.Binlog) (txn *loader.Txn, err error) { return nil, errors.Trace(err) } - _, newDatum, err := codec.DecodeOne(col.Value) + _, oldDatum, err := codec.DecodeOne(col.Value) if err != nil { return nil, errors.Trace(err) } - _, oldDatum, err := codec.DecodeOne(col.ChangedValue) + _, newDatum, err := codec.DecodeOne(col.ChangedValue) if err != nil { return nil, errors.Trace(err) } diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index 56f7032c9..b886adf90 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -6,6 +6,9 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" + "github.com/pingcap/tidb/mysql" + //"github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" ) func Test(t *testing.T) { check.TestingT(t) } @@ -24,20 +27,140 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { SQL: "use db1; create table table1(id int)", }, }, - // TODO add dml test &pb.Binlog{ Tp: pb.BinlogType_DML, DmlData: &pb.DMLData{ - Events: []pb.Event{}, + Events: generateDMLEvents(), }, }: &loader.Txn{ - // DMLs: []*loader.DML{}, + DMLs: []*loader.DML{ + { + Database: "test", + Table: "t1", + Tp: loader.InsertDMLType, + Values: map[string]interface{}{ + "a": int64(1), + "b": "test", + }, + }, { + Database: "test", + Table: "t1", + Tp: loader.DeleteDMLType, + Values: map[string]interface{}{ + "a": int64(1), + "b": "test", + }, + }, { + Database: "test", + Table: "t1", + Tp: loader.UpdateDMLType, + Values: map[string]interface{}{ + "c": "abc", + }, + OldValues: map[string]interface{}{ + "c": "test", + }, + }, + }, }, } for binlog, txn := range tests { getTxn, err := pbBinlogToTxn(binlog) c.Assert(err, check.IsNil) - c.Assert(getTxn, check.DeepEquals, txn) + //c.Assert(getTxn, check.DeepEquals, txn) + c.Assert(getTxn.DDL, check.DeepEquals, txn.DDL) + for i, dml := range getTxn.DMLs { + c.Assert(dml.Database, check.Equals, txn.DMLs[i].Database) + c.Assert(dml.Table, check.Equals, txn.DMLs[i].Table) + c.Assert(dml.Tp, check.Equals, txn.DMLs[i].Tp) + c.Assert(dml.Values, check.DeepEquals, txn.DMLs[i].Values) + c.Assert(dml.OldValues, check.DeepEquals, txn.DMLs[i].OldValues) + } + } +} + +func (s *testTranslateSuite) TestGenColsAndArgs(c *check.C) { + cols, args, err := genColsAndArgs(generateColumns()) + c.Assert(err, check.IsNil) + c.Assert(cols, check.DeepEquals, []string{"a", "b", "c"}) + c.Assert(args, check.DeepEquals, []interface {}{int64(1), "test", "test"}) +} + +// generateDMLEvents generates three DML Events for test. +func generateDMLEvents() []pb.Event { + schema := "test" + table := "t1" + cols := generateColumns() + + return []pb.Event { + { + Tp: pb.EventType_Insert, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[0], cols[1]}, + },{ + Tp: pb.EventType_Delete, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[0], cols[1]}, + },{ + Tp: pb.EventType_Update, + SchemaName: &schema, + TableName: &table, + Row: [][]byte{cols[2]}, + }, } } + +// generateColumns generates three columns for test, the last one used for update. +func generateColumns() [][]byte { + colsBytes := make([][]byte, 0, 3) + + cols := []*pb.Column { + { + Name: "a", + Tp: []byte{mysql.TypeInt24}, + MysqlType: "int", + Value: encodeIntValue(1), + }, { + Name: "b", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + }, { + Name: "c", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + ChangedValue: encodeBytesValue([]byte("abc")), + }, + } + + for _, col := range cols { + colBytes, err := col.Marshal() + if err != nil { + panic(err) + } + + colsBytes = append(colsBytes, colBytes) + } + + return colsBytes +} + +func encodeIntValue(value int64) []byte { + b := make([]byte, 0, 5) + // 3 means intFlag + b = append(b, 3) + b = codec.EncodeInt(b, value) + return b +} + +func encodeBytesValue(value []byte) []byte { + b := make([]byte, 0, 5) + // 1 means bytesFlag + b = append(b, 1) + b = codec.EncodeBytes(b, value) + return b +} \ No newline at end of file From a54c49a2324e12446e1a03b5345f03d90403892b Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 20:35:24 +0800 Subject: [PATCH 08/31] minor update --- reparo/syncer/mysql.go | 1 + reparo/syncer/print_test.go | 9 ++++++--- reparo/syncer/syncer_test.go | 11 +++-------- reparo/syncer/translate_test.go | 2 -- reparo/syncer/util.go | 1 - 5 files changed, 10 insertions(+), 14 deletions(-) diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index d8b570dd2..a40b2c89c 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -34,6 +34,7 @@ var _ Syncer = &mysqlSyncer{} func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) if err != nil { + log.Infof("create db failed %v", err) return nil, errors.Trace(err) } diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index d01b1d929..0c46d5108 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -3,9 +3,6 @@ package syncer import ( "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" - //"github.com/pingcap/tidb/mysql" - //"github.com/pingcap/tidb/types" - //"github.com/pingcap/tidb/util/codec" ) type testPrintSuite struct{} @@ -74,6 +71,8 @@ func (s *testPrintSuite) TestPrintRow(c *check.C) { } eventStr := getEventDataStr(insertEvent) c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") + rowStr := getInsertOrDeleteRowStr(insertEvent.Row) + c.Assert(rowStr, check.Equals, "a(int): 1 \nb(varchar): test \n") deleteEvent := &pb.Event { Tp: pb.EventType_Delete, @@ -81,6 +80,8 @@ func (s *testPrintSuite) TestPrintRow(c *check.C) { } eventStr = getEventDataStr(deleteEvent) c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") + rowStr = getInsertOrDeleteRowStr(deleteEvent.Row) + c.Assert(rowStr, check.Equals, "a(int): 1 \nb(varchar): test \n") updateEvent := &pb.Event { Tp: pb.EventType_Update, @@ -88,5 +89,7 @@ func (s *testPrintSuite) TestPrintRow(c *check.C) { } eventStr = getEventDataStr(updateEvent) c.Assert(eventStr, check.Equals, "c(varchar): test => abc\n") + rowStr = getUpdateRowStr(updateEvent.Row) + c.Assert(rowStr, check.Equals, "c(varchar): test => abc\n") } diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go index 69c02c2b7..839ef2afc 100644 --- a/reparo/syncer/syncer_test.go +++ b/reparo/syncer/syncer_test.go @@ -11,14 +11,9 @@ var _ = check.Suite(&testSyncerSuite{}) func (s *testSyncerSuite) TestNewSyncer(c *check.C) { cfg := new(DBConfig) - syncer, err := New("mysql", cfg) + syncer, err := New("print", cfg) c.Assert(err, check.IsNil) - _, ok := syncer.(*mysqlSyncer) - c.Assert(ok, check.Equals, true) - - syncer, err = New("print", cfg) - c.Assert(err, check.IsNil) - _, ok = syncer.(*printSyncer) + _, ok := syncer.(*printSyncer) c.Assert(ok, check.Equals, true) syncer, err = New("memory", cfg) @@ -26,7 +21,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) { _, ok = syncer.(*MemSyncer) c.Assert(ok, check.Equals, true) - syncer, err = New("mysql", cfg) + syncer, err = New("print", cfg) c.Assert(err, check.IsNil) _, ok = syncer.(*MemSyncer) c.Assert(ok, check.Equals, false) diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index b886adf90..51946a109 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -7,7 +7,6 @@ import ( "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" "github.com/pingcap/tidb/mysql" - //"github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" ) @@ -68,7 +67,6 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { for binlog, txn := range tests { getTxn, err := pbBinlogToTxn(binlog) c.Assert(err, check.IsNil) - //c.Assert(getTxn, check.DeepEquals, txn) c.Assert(getTxn.DDL, check.DeepEquals, txn.DDL) for i, dml := range getTxn.DMLs { c.Assert(dml.Database, check.Equals, txn.DMLs[i].Database) diff --git a/reparo/syncer/util.go b/reparo/syncer/util.go index d52449fd0..a39337905 100644 --- a/reparo/syncer/util.go +++ b/reparo/syncer/util.go @@ -21,7 +21,6 @@ func formatValueToString(data types.Datum, tp byte) string { } } -// TODO: test it. func formatValue(value types.Datum, tp byte) types.Datum { if value.GetValue() == nil { return value From 04ab7bc3f39aa0756777a9c7ffdc74f48ec1c169 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 20:54:51 +0800 Subject: [PATCH 09/31] update reparo integration test --- tests/reparo/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/reparo/run.sh b/tests/reparo/run.sh index 3d2882a9a..65a04869f 100755 --- a/tests/reparo/run.sh +++ b/tests/reparo/run.sh @@ -20,7 +20,7 @@ run_sql "CREATE TABLE \`reparo_test\`.\`test\`(\`id\` int, \`name\` varchar(10), run_sql "INSERT INTO \`reparo_test\`.\`test\` VALUES(1, 'a', 'a'), (2, 'b', 'b')" run_sql "INSERT INTO \`reparo_test\`.\`test\` VALUES(3, 'c', 'c'), (4, 'd', 'c')" run_sql "UPDATE \`reparo_test\`.\`test\` SET \`name\` = 'bb' where \`id\` = 2" -run_sql "DELETE FROM \`reparo_test\`.\`test\` WHERE \`name\` = 'bb'" +run_sql "DELETE FROM \`reparo_test\`.\`test\` WHERE \`id\` = '1'" run_sql "INSERT INTO \`reparo_test\`.\`test\` VALUES(5, 'e', 'e')" sleep 5 From e78bc8c5ccac99862fad6374cfb40d358bf26e68 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 21:04:38 +0800 Subject: [PATCH 10/31] add mysql syncer test --- reparo/syncer/mysql.go | 8 +------- reparo/syncer/mysql_test.go | 21 +++++++++++++++++++++ reparo/syncer/syncer.go | 10 +++++++++- 3 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 reparo/syncer/mysql_test.go diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index a40b2c89c..57c6335b9 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -31,13 +31,7 @@ type mysqlSyncer struct { var _ Syncer = &mysqlSyncer{} -func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { - db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) - if err != nil { - log.Infof("create db failed %v", err) - return nil, errors.Trace(err) - } - +func newMysqlSyncer(db *sql.DB) (*mysqlSyncer, error) { loader, err := loader.NewLoader(db, loader.WorkerCount(16), loader.BatchSize(20)) if err != nil { return nil, errors.Annotate(err, "new loader failed") diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go new file mode 100644 index 000000000..8ac23bdd5 --- /dev/null +++ b/reparo/syncer/mysql_test.go @@ -0,0 +1,21 @@ +package syncer + +import ( + "github.com/pingcap/check" + sqlmock "github.com/DATA-DOG/go-sqlmock" +) + +type testMysqlSuite struct{} + +var _ = check.Suite(&testMysqlSuite{}) + +func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { + db, _, err := sqlmock.New() + c.Assert(err, check.IsNil) + + syncer, err := newMysqlSyncer(db) + c.Assert(err, check.IsNil) + + err = syncer.Close() + c.Assert(err, check.IsNil) +} \ No newline at end of file diff --git a/reparo/syncer/syncer.go b/reparo/syncer/syncer.go index fe4c23d80..60a0678ee 100644 --- a/reparo/syncer/syncer.go +++ b/reparo/syncer/syncer.go @@ -3,6 +3,9 @@ package syncer import ( "fmt" + "github.com/ngaut/log" + "github.com/pingcap/errors" + "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" ) @@ -19,7 +22,12 @@ type Syncer interface { func New(name string, cfg *DBConfig) (Syncer, error) { switch name { case "mysql": - return newMysqlSyncer(cfg) + db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) + if err != nil { + log.Infof("create db failed %v", err) + return nil, errors.Trace(err) + } + return newMysqlSyncer(db) case "print": return newPrintSyncer() case "memory": From 96452eba63da9dadc16c80920b9873dd7d1ff18a Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 16 Apr 2019 21:12:52 +0800 Subject: [PATCH 11/31] minor update --- reparo/syncer/syncer.go | 2 -- reparo/syncer/util.go | 1 - 2 files changed, 3 deletions(-) diff --git a/reparo/syncer/syncer.go b/reparo/syncer/syncer.go index 60a0678ee..54487623e 100644 --- a/reparo/syncer/syncer.go +++ b/reparo/syncer/syncer.go @@ -3,7 +3,6 @@ package syncer import ( "fmt" - "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" @@ -24,7 +23,6 @@ func New(name string, cfg *DBConfig) (Syncer, error) { case "mysql": db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) if err != nil { - log.Infof("create db failed %v", err) return nil, errors.Trace(err) } return newMysqlSyncer(db) diff --git a/reparo/syncer/util.go b/reparo/syncer/util.go index a39337905..3d31e7b38 100644 --- a/reparo/syncer/util.go +++ b/reparo/syncer/util.go @@ -16,7 +16,6 @@ func formatValueToString(data types.Datum, tp byte) string { } fallthrough default: - fmt.Println(tp) return fmt.Sprintf("%v", val) } } From 768f8f14cbc2f3d0b4cb9dab77c17b568ff100dc Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 17 Apr 2019 13:11:03 +0800 Subject: [PATCH 12/31] minor fix --- reparo/syncer/print.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reparo/syncer/print.go b/reparo/syncer/print.go index 8ee6ecc0d..3e47939e6 100644 --- a/reparo/syncer/print.go +++ b/reparo/syncer/print.go @@ -25,7 +25,7 @@ func (p *printSyncer) Sync(pbBinlog *pb.Binlog, cb func(binlog *pb.Binlog)) erro case pb.BinlogType_DML: for _, event := range pbBinlog.GetDmlData().GetEvents() { header := getEventHeaderStr(&event) - info = header + getEventDataStr(&event) + info += header + getEventDataStr(&event) } default: return errors.Errorf("unknown type: %v", pbBinlog.Tp) From 8f875b5c7edfd5942466a06f770d697fe71a667b Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 18 Apr 2019 16:23:44 +0800 Subject: [PATCH 13/31] update val name --- reparo/syncer/translate_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index bb7b0972c..e520f313e 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -107,7 +107,7 @@ func generateDMLEvents() []pb.Event { // generateColumns generates three columns for test, the last one used for update. func generateColumns() [][]byte { - colsBytes := make([][]byte, 0, 3) + allColBytes := make([][]byte, 0, 3) cols := []*pb.Column { { @@ -135,10 +135,10 @@ func generateColumns() [][]byte { panic(err) } - colsBytes = append(colsBytes, colBytes) + allColBytes = append(allColBytes, colBytes) } - return colsBytes + return allColBytes } func encodeIntValue(value int64) []byte { From 31a9ace9db49f45436a3ad4cbf461f3c37818462 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 18 Apr 2019 16:42:51 +0800 Subject: [PATCH 14/31] address comment --- reparo/syncer/mysql.go | 11 ++++++++++- reparo/syncer/mysql_test.go | 2 +- reparo/syncer/syncer.go | 8 +------- reparo/syncer/util_test.go | 12 ------------ 4 files changed, 12 insertions(+), 21 deletions(-) diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index 57c6335b9..f035be883 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -31,7 +31,16 @@ type mysqlSyncer struct { var _ Syncer = &mysqlSyncer{} -func newMysqlSyncer(db *sql.DB) (*mysqlSyncer, error) { +func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { + db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) + if err != nil { + return nil, errors.Trace(err) + } + + return newMysqlSyncerFromSQLDB(db) +} + +func newMysqlSyncerFromSQLDB(db *sql.DB) (*mysqlSyncer, error) { loader, err := loader.NewLoader(db, loader.WorkerCount(16), loader.BatchSize(20)) if err != nil { return nil, errors.Annotate(err, "new loader failed") diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 8ac23bdd5..faac54ed1 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -13,7 +13,7 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { db, _, err := sqlmock.New() c.Assert(err, check.IsNil) - syncer, err := newMysqlSyncer(db) + syncer, err := newMysqlSyncerFromSQLDB(db) c.Assert(err, check.IsNil) err = syncer.Close() diff --git a/reparo/syncer/syncer.go b/reparo/syncer/syncer.go index 54487623e..fe4c23d80 100644 --- a/reparo/syncer/syncer.go +++ b/reparo/syncer/syncer.go @@ -3,8 +3,6 @@ package syncer import ( "fmt" - "github.com/pingcap/errors" - "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" ) @@ -21,11 +19,7 @@ type Syncer interface { func New(name string, cfg *DBConfig) (Syncer, error) { switch name { case "mysql": - db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) - if err != nil { - return nil, errors.Trace(err) - } - return newMysqlSyncer(db) + return newMysqlSyncer(cfg) case "print": return newPrintSyncer() case "memory": diff --git a/reparo/syncer/util_test.go b/reparo/syncer/util_test.go index 3af2c26e8..f5f1591f9 100644 --- a/reparo/syncer/util_test.go +++ b/reparo/syncer/util_test.go @@ -19,49 +19,41 @@ func (s *testUtilSuite) TestFormatValue(c *check.C) { testCases := []struct { value interface{} tp byte - changeTp byte expectStr string }{ { value: 1, tp: mysql.TypeInt24, - changeTp: mysql.TypeFloat, expectStr: "1", }, { value: 1.11, tp: mysql.TypeFloat, - changeTp: mysql.TypeDouble, expectStr: "1.11", }, { value: 1.11, tp: mysql.TypeDouble, - changeTp: mysql.TypeFloat, expectStr: "1.11", }, { value: "a", tp: mysql.TypeVarchar, - changeTp: mysql.TypeString, expectStr: "a", }, { value: "a", tp: mysql.TypeString, - changeTp: mysql.TypeVarchar, expectStr: "a", }, { value: datetime, tp: mysql.TypeDatetime, - changeTp: mysql.TypeDatetime, expectStr: "2019-04-15 12:12:12 +0000 UTC", }, { value: time.Duration(time.Second), tp: mysql.TypeDuration, - changeTp: mysql.TypeDuration, expectStr: "1s", }, } @@ -70,9 +62,5 @@ func (s *testUtilSuite) TestFormatValue(c *check.C) { datum := types.NewDatum(testCase.value) str := formatValueToString(datum, testCase.tp) c.Assert(str, check.Equals, testCase.expectStr) - - newDatum := formatValue(datum, testCase.changeTp) - newStr := formatValueToString(newDatum, testCase.changeTp) - c.Assert(newStr, check.Equals, testCase.expectStr) } } From d2eb5ad43c91e4576ac417fa8542f9e2eceee9a3 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Fri, 19 Apr 2019 13:08:51 +0800 Subject: [PATCH 15/31] use strings.Builder --- reparo/syncer/memory_test.go | 2 +- reparo/syncer/mysql.go | 6 ++-- reparo/syncer/mysql_test.go | 4 +-- reparo/syncer/print.go | 23 +++++++------- reparo/syncer/print_test.go | 11 +++---- reparo/syncer/translate_test.go | 56 ++++++++++++++++----------------- 6 files changed, 51 insertions(+), 51 deletions(-) diff --git a/reparo/syncer/memory_test.go b/reparo/syncer/memory_test.go index 4ce833c33..8a719c0c4 100644 --- a/reparo/syncer/memory_test.go +++ b/reparo/syncer/memory_test.go @@ -32,4 +32,4 @@ func (s *testMemorySuite) TestMemorySyncer(c *check.C) { binlog := syncer.GetBinlogs() c.Assert(binlog, check.DeepEquals, []*pb.Binlog{ddlBinlog, dmlBinlog}) -} \ No newline at end of file +} diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index f035be883..7a7ffd442 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -32,9 +32,9 @@ type mysqlSyncer struct { var _ Syncer = &mysqlSyncer{} func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { - db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) - if err != nil { - return nil, errors.Trace(err) + db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) + if err != nil { + return nil, errors.Trace(err) } return newMysqlSyncerFromSQLDB(db) diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index faac54ed1..97903ed4a 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -1,8 +1,8 @@ package syncer import ( - "github.com/pingcap/check" sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/check" ) type testMysqlSuite struct{} @@ -18,4 +18,4 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { err = syncer.Close() c.Assert(err, check.IsNil) -} \ No newline at end of file +} diff --git a/reparo/syncer/print.go b/reparo/syncer/print.go index 3e47939e6..82c01bf8d 100644 --- a/reparo/syncer/print.go +++ b/reparo/syncer/print.go @@ -2,6 +2,7 @@ package syncer import ( "fmt" + "strings" "github.com/ngaut/log" "github.com/pingcap/errors" @@ -18,21 +19,21 @@ func newPrintSyncer() (*printSyncer, error) { } func (p *printSyncer) Sync(pbBinlog *pb.Binlog, cb func(binlog *pb.Binlog)) error { - var info string + var info strings.Builder switch pbBinlog.Tp { case pb.BinlogType_DDL: - info = getDDLStr(pbBinlog) + info.WriteString(getDDLStr(pbBinlog)) case pb.BinlogType_DML: for _, event := range pbBinlog.GetDmlData().GetEvents() { - header := getEventHeaderStr(&event) - info += header + getEventDataStr(&event) + info.WriteString(getEventHeaderStr(&event)) + info.WriteString(getEventDataStr(&event)) } default: return errors.Errorf("unknown type: %v", pbBinlog.Tp) } - fmt.Print(info) + fmt.Print(info.String()) cb(pbBinlog) return nil @@ -64,12 +65,12 @@ func getEventHeaderStr(event *pb.Event) string { } func getUpdateRowStr(row [][]byte) string { - var eventStr string + var rowStr strings.Builder for _, col := range row { - eventStr += getUpdateColumnStr(col) + rowStr.WriteString(getUpdateColumnStr(col)) } - return eventStr + return rowStr.String() } func getUpdateColumnStr(column []byte) string { @@ -97,12 +98,12 @@ func getUpdateColumnStr(column []byte) string { } func getInsertOrDeleteRowStr(row [][]byte) string { - var eventStr string + var rowStr strings.Builder for _, col := range row { - eventStr += getInsertOrDeleteColumnStr(col) + rowStr.WriteString(getInsertOrDeleteColumnStr(col)) } - return eventStr + return rowStr.String() } func getInsertOrDeleteColumnStr(column []byte) string { diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 0c46d5108..7c4fa887b 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -65,7 +65,7 @@ func (s *testPrintSuite) TestPrintRow(c *check.C) { col3Str := getUpdateColumnStr(cols[2]) c.Assert(col3Str, check.Equals, "c(varchar): test => abc\n") - insertEvent := &pb.Event { + insertEvent := &pb.Event{ Tp: pb.EventType_Insert, Row: [][]byte{cols[0], cols[1]}, } @@ -74,22 +74,21 @@ func (s *testPrintSuite) TestPrintRow(c *check.C) { rowStr := getInsertOrDeleteRowStr(insertEvent.Row) c.Assert(rowStr, check.Equals, "a(int): 1 \nb(varchar): test \n") - deleteEvent := &pb.Event { + deleteEvent := &pb.Event{ Tp: pb.EventType_Delete, - Row: [][]byte{cols[0], cols[1]}, + Row: [][]byte{cols[0], cols[1]}, } eventStr = getEventDataStr(deleteEvent) c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") rowStr = getInsertOrDeleteRowStr(deleteEvent.Row) c.Assert(rowStr, check.Equals, "a(int): 1 \nb(varchar): test \n") - updateEvent := &pb.Event { + updateEvent := &pb.Event{ Tp: pb.EventType_Update, - Row: [][]byte{cols[2]}, + Row: [][]byte{cols[2]}, } eventStr = getEventDataStr(updateEvent) c.Assert(eventStr, check.Equals, "c(varchar): test => abc\n") rowStr = getUpdateRowStr(updateEvent.Row) c.Assert(rowStr, check.Equals, "c(varchar): test => abc\n") } - diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index e520f313e..dda01204f 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -18,26 +18,26 @@ var _ = check.Suite(&testTranslateSuite{}) func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { tests := map[*pb.Binlog]*loader.Txn{ - &pb.Binlog{ + { Tp: pb.BinlogType_DDL, DdlQuery: []byte("use db1; create table table1(id int)"), - }: &loader.Txn{ + }: { DDL: &loader.DDL{ SQL: "use db1; create table table1(id int)", }, }, - &pb.Binlog{ + { Tp: pb.BinlogType_DML, DmlData: &pb.DMLData{ Events: generateDMLEvents(), }, - }: &loader.Txn{ + }: { DMLs: []*loader.DML{ { Database: "test", Table: "t1", Tp: loader.InsertDMLType, - Values: map[string]interface{}{ + Values: map[string]interface{}{ "a": int64(1), "b": "test", }, @@ -45,7 +45,7 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { Database: "test", Table: "t1", Tp: loader.DeleteDMLType, - Values: map[string]interface{}{ + Values: map[string]interface{}{ "a": int64(1), "b": "test", }, @@ -53,10 +53,10 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { Database: "test", Table: "t1", Tp: loader.UpdateDMLType, - Values: map[string]interface{}{ + Values: map[string]interface{}{ "c": "abc", }, - OldValues: map[string]interface{}{ + OldValues: map[string]interface{}{ "c": "test", }, }, @@ -76,7 +76,7 @@ func (s *testTranslateSuite) TestGenColsAndArgs(c *check.C) { cols, args, err := genColsAndArgs(generateColumns()) c.Assert(err, check.IsNil) c.Assert(cols, check.DeepEquals, []string{"a", "b", "c"}) - c.Assert(args, check.DeepEquals, []interface {}{int64(1), "test", "test"}) + c.Assert(args, check.DeepEquals, []interface{}{int64(1), "test", "test"}) } // generateDMLEvents generates three DML Events for test. @@ -85,22 +85,22 @@ func generateDMLEvents() []pb.Event { table := "t1" cols := generateColumns() - return []pb.Event { + return []pb.Event{ { - Tp: pb.EventType_Insert, + Tp: pb.EventType_Insert, SchemaName: &schema, - TableName: &table, - Row: [][]byte{cols[0], cols[1]}, - },{ - Tp: pb.EventType_Delete, + TableName: &table, + Row: [][]byte{cols[0], cols[1]}, + }, { + Tp: pb.EventType_Delete, SchemaName: &schema, - TableName: &table, - Row: [][]byte{cols[0], cols[1]}, - },{ - Tp: pb.EventType_Update, + TableName: &table, + Row: [][]byte{cols[0], cols[1]}, + }, { + Tp: pb.EventType_Update, SchemaName: &schema, - TableName: &table, - Row: [][]byte{cols[2]}, + TableName: &table, + Row: [][]byte{cols[2]}, }, } } @@ -108,8 +108,8 @@ func generateDMLEvents() []pb.Event { // generateColumns generates three columns for test, the last one used for update. func generateColumns() [][]byte { allColBytes := make([][]byte, 0, 3) - - cols := []*pb.Column { + + cols := []*pb.Column{ { Name: "a", Tp: []byte{mysql.TypeInt24}, @@ -121,10 +121,10 @@ func generateColumns() [][]byte { MysqlType: "varchar", Value: encodeBytesValue([]byte("test")), }, { - Name: "c", - Tp: []byte{mysql.TypeVarchar}, - MysqlType: "varchar", - Value: encodeBytesValue([]byte("test")), + Name: "c", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), ChangedValue: encodeBytesValue([]byte("abc")), }, } @@ -155,4 +155,4 @@ func encodeBytesValue(value []byte) []byte { b = append(b, 1) b = codec.EncodeBytes(b, value) return b -} \ No newline at end of file +} From f185d461026ad10f3323ebc2807dfc521b7b40a8 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 22 Apr 2019 13:08:39 +0800 Subject: [PATCH 16/31] add test --- pkg/util/util.go | 2 +- reparo/syncer/mysql_test.go | 21 ++++++++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index 8b739de15..b1cc532f8 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -121,7 +121,7 @@ func RetryOnError(retryCount int, sleepTime time.Duration, errStr string, fn fun break } - log.Errorf("%s: %v", errStr, err) + log.Errorf("%s: %v", errStr, errors.ErrorStack(err)) time.Sleep(sleepTime) } diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 97903ed4a..22b053523 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -3,6 +3,7 @@ package syncer import ( sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/check" + pb "github.com/pingcap/tidb-binlog/proto/binlog" ) type testMysqlSuite struct{} @@ -10,12 +11,30 @@ type testMysqlSuite struct{} var _ = check.Suite(&testMysqlSuite{}) func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { - db, _, err := sqlmock.New() + db, mock, err := sqlmock.New() c.Assert(err, check.IsNil) syncer, err := newMysqlSyncerFromSQLDB(db) c.Assert(err, check.IsNil) + ddlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), + } + dmlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DML, + } + + mock.ExpectBegin() + mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + + err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {}) + c.Assert(err, check.IsNil) + + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {}) + c.Assert(err, check.IsNil) + err = syncer.Close() c.Assert(err, check.IsNil) } From 7b1af3f02e3e6270b3c6203717b7733be13b9741 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 22 Apr 2019 13:09:34 +0800 Subject: [PATCH 17/31] revert --- pkg/util/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index b1cc532f8..8b739de15 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -121,7 +121,7 @@ func RetryOnError(retryCount int, sleepTime time.Duration, errStr string, fn fun break } - log.Errorf("%s: %v", errStr, errors.ErrorStack(err)) + log.Errorf("%s: %v", errStr, err) time.Sleep(sleepTime) } From 4cc5317fc2e0700d72058794a0e33495bcf8eba1 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 24 Apr 2019 12:12:51 +0800 Subject: [PATCH 18/31] add test for callback --- reparo/syncer/mysql_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 22b053523..99d8cb502 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -1,6 +1,8 @@ package syncer import ( + "time" + sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" @@ -27,10 +29,16 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { mock.ExpectBegin() mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0)) - mock.ExpectCommit() + mock.ExpectCommit() - err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {}) + binlogs := make([]*pb.Binlog, 0, 1) + err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) c.Assert(err, check.IsNil) + time.Sleep(100 * time.Millisecond) + c.Assert(binlogs, check.HasLen, 1) err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {}) c.Assert(err, check.IsNil) From f2d8613df039ffa0c9deacc6e978c44b727179e4 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 24 Apr 2019 15:45:57 +0800 Subject: [PATCH 19/31] address comment --- reparo/syncer/print_test.go | 2 +- reparo/syncer/syncer_test.go | 6 +++--- reparo/syncer/translate_test.go | 12 ++++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 7c4fa887b..61d55619c 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -54,7 +54,7 @@ func (s *testPrintSuite) TestPrintDDL(c *check.C) { } func (s *testPrintSuite) TestPrintRow(c *check.C) { - cols := generateColumns() + cols := generateColumns(c) col1Str := getInsertOrDeleteColumnStr(cols[0]) c.Assert(col1Str, check.Equals, "a(int): 1 \n") diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go index 839ef2afc..b70f91a33 100644 --- a/reparo/syncer/syncer_test.go +++ b/reparo/syncer/syncer_test.go @@ -14,15 +14,15 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) { syncer, err := New("print", cfg) c.Assert(err, check.IsNil) _, ok := syncer.(*printSyncer) - c.Assert(ok, check.Equals, true) + c.Assert(ok, check.IsTrue) syncer, err = New("memory", cfg) c.Assert(err, check.IsNil) _, ok = syncer.(*MemSyncer) - c.Assert(ok, check.Equals, true) + c.Assert(ok, check.IsTrue) syncer, err = New("print", cfg) c.Assert(err, check.IsNil) _, ok = syncer.(*MemSyncer) - c.Assert(ok, check.Equals, false) + c.Assert(ok, check.IsFalse) } diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index dda01204f..e05eca092 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -29,7 +29,7 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { { Tp: pb.BinlogType_DML, DmlData: &pb.DMLData{ - Events: generateDMLEvents(), + Events: generateDMLEvents(c), }, }: { DMLs: []*loader.DML{ @@ -73,17 +73,17 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { } func (s *testTranslateSuite) TestGenColsAndArgs(c *check.C) { - cols, args, err := genColsAndArgs(generateColumns()) + cols, args, err := genColsAndArgs(generateColumns(c)) c.Assert(err, check.IsNil) c.Assert(cols, check.DeepEquals, []string{"a", "b", "c"}) c.Assert(args, check.DeepEquals, []interface{}{int64(1), "test", "test"}) } // generateDMLEvents generates three DML Events for test. -func generateDMLEvents() []pb.Event { +func generateDMLEvents(c *check.C) []pb.Event { schema := "test" table := "t1" - cols := generateColumns() + cols := generateColumns(c) return []pb.Event{ { @@ -106,7 +106,7 @@ func generateDMLEvents() []pb.Event { } // generateColumns generates three columns for test, the last one used for update. -func generateColumns() [][]byte { +func generateColumns(c *check.C) [][]byte { allColBytes := make([][]byte, 0, 3) cols := []*pb.Column{ @@ -132,7 +132,7 @@ func generateColumns() [][]byte { for _, col := range cols { colBytes, err := col.Marshal() if err != nil { - panic(err) + c.Fatal(err) } allColBytes = append(allColBytes, colBytes) From b7071ed20943d5b790969686ee56ca3e6599918a Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 24 Apr 2019 16:41:46 +0800 Subject: [PATCH 20/31] revert change in print, and capture stdout for test --- go.mod | 1 + reparo/syncer/print.go | 120 ++++++++++++++++-------------------- reparo/syncer/print_test.go | 67 ++++++++++++-------- 3 files changed, 95 insertions(+), 93 deletions(-) diff --git a/go.mod b/go.mod index 38b101af6..d020257f2 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/gorilla/websocket v1.4.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20180820150422-93bf4626fba7 // indirect + github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d github.com/kr/pretty v0.1.0 // indirect github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 // indirect github.com/ngaut/log v0.0.0-20160810023011-cec23d3e10b0 diff --git a/reparo/syncer/print.go b/reparo/syncer/print.go index 82c01bf8d..49999b8cb 100644 --- a/reparo/syncer/print.go +++ b/reparo/syncer/print.go @@ -2,7 +2,6 @@ package syncer import ( "fmt" - "strings" "github.com/ngaut/log" "github.com/pingcap/errors" @@ -19,23 +18,20 @@ func newPrintSyncer() (*printSyncer, error) { } func (p *printSyncer) Sync(pbBinlog *pb.Binlog, cb func(binlog *pb.Binlog)) error { - var info strings.Builder switch pbBinlog.Tp { case pb.BinlogType_DDL: - info.WriteString(getDDLStr(pbBinlog)) + printDDL(pbBinlog) + cb(pbBinlog) case pb.BinlogType_DML: for _, event := range pbBinlog.GetDmlData().GetEvents() { - info.WriteString(getEventHeaderStr(&event)) - info.WriteString(getEventDataStr(&event)) + printEvent(&event) } + cb(pbBinlog) default: return errors.Errorf("unknown type: %v", pbBinlog.Tp) } - fmt.Print(info.String()) - cb(pbBinlog) - return nil } @@ -43,83 +39,73 @@ func (p *printSyncer) Close() error { return nil } -func getEventDataStr(event *pb.Event) string { +func printEvent(event *pb.Event) { + printHeader(event) + switch event.GetTp() { case pb.EventType_Insert: - return getInsertOrDeleteRowStr(event.Row) + printInsertOrDeleteEvent(event.Row) case pb.EventType_Update: - return getUpdateRowStr(event.Row) + printUpdateEvent(event.Row) case pb.EventType_Delete: - return getInsertOrDeleteRowStr(event.Row) + printInsertOrDeleteEvent(event.Row) } - - return "" } -func getDDLStr(binlog *pb.Binlog) string { - return fmt.Sprintf("DDL query: %s\n", binlog.DdlQuery) +func printHeader(event *pb.Event) { + printEventHeader(event) } -func getEventHeaderStr(event *pb.Event) string { - return fmt.Sprintf("schema: %s; table: %s; type: %s\n", event.GetSchemaName(), event.GetTableName(), event.GetTp()) +func printDDL(binlog *pb.Binlog) { + fmt.Printf("DDL query: %s\n", binlog.DdlQuery) } -func getUpdateRowStr(row [][]byte) string { - var rowStr strings.Builder - for _, col := range row { - rowStr.WriteString(getUpdateColumnStr(col)) - } - - return rowStr.String() +func printEventHeader(event *pb.Event) { + fmt.Printf("schema: %s; table: %s; type: %s\n", event.GetSchemaName(), event.GetTableName(), event.GetTp()) } -func getUpdateColumnStr(column []byte) string { - col := &pb.Column{} - err := col.Unmarshal(column) - if err != nil { - log.Errorf("unmarshal error %v", err) - return "" - } - - _, val, err := codec.DecodeOne(col.Value) - if err != nil { - log.Errorf("decode row error %v", err) - return "" - } +func printUpdateEvent(row [][]byte) { + for _, c := range row { + col := &pb.Column{} + err := col.Unmarshal(c) + if err != nil { + log.Errorf("unmarshal error %v", err) + return + } - _, changedVal, err := codec.DecodeOne(col.ChangedValue) - if err != nil { - log.Errorf("decode row error %v", err) - return "" - } + _, val, err := codec.DecodeOne(col.Value) + if err != nil { + log.Errorf("decode row error %v", err) + return + } - tp := col.Tp[0] - return fmt.Sprintf("%s(%s): %s => %s\n", col.Name, col.MysqlType, formatValueToString(val, tp), formatValueToString(changedVal, tp)) -} + _, changedVal, err := codec.DecodeOne(col.ChangedValue) + if err != nil { + log.Errorf("decode row error %v", err) + return + } -func getInsertOrDeleteRowStr(row [][]byte) string { - var rowStr strings.Builder - for _, col := range row { - rowStr.WriteString(getInsertOrDeleteColumnStr(col)) + tp := col.Tp[0] + fmt.Printf("%s(%s): %s => %s\n", col.Name, col.MysqlType, formatValueToString(val, tp), formatValueToString(changedVal, tp)) } - - return rowStr.String() } -func getInsertOrDeleteColumnStr(column []byte) string { - col := &pb.Column{} - err := col.Unmarshal(column) - if err != nil { - log.Errorf("unmarshal error %v", err) - return "" - } +func printInsertOrDeleteEvent(row [][]byte) { + for _, c := range row { + col := &pb.Column{} + err := col.Unmarshal(c) + if err != nil { + log.Errorf("unmarshal error %v", err) + return + } - _, val, err := codec.DecodeOne(col.Value) - if err != nil { - log.Errorf("decode row error %v", err) - return "" - } + _, val, err := codec.DecodeOne(col.Value) + if err != nil { + log.Errorf("decode row error %v", err) + return + } - tp := col.Tp[0] - return fmt.Sprintf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp)) -} + tp := col.Tp[0] + fmt.Printf("%s(%s): %s\n", col.Name, col.MysqlType, formatValueToString(val, tp)) + } +} \ No newline at end of file diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 61d55619c..a49c9540c 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -1,8 +1,11 @@ package syncer import ( + "strings" + "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" + "github.com/kami-zh/go-capturer" ) type testPrintSuite struct{} @@ -40,8 +43,12 @@ func (s *testPrintSuite) TestPrintEventHeader(c *check.C) { TableName: &table, } - eventHeaderStr := getEventHeaderStr(event) - c.Assert(eventHeaderStr, check.Equals, "schema: test; table: t1; type: Insert\n") + out := capturer.CaptureStdout(func() { + printEventHeader(event) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 1) + c.Assert(lines[0], check.Matches, ".*schema: test; table: t1; type: Insert.*") } func (s *testPrintSuite) TestPrintDDL(c *check.C) { @@ -49,46 +56,54 @@ func (s *testPrintSuite) TestPrintDDL(c *check.C) { Tp: pb.BinlogType_DDL, DdlQuery: []byte("create database test;"), } - ddlStr := getDDLStr(ddlBinlog) - c.Assert(ddlStr, check.Equals, "DDL query: create database test;\n") + + out := capturer.CaptureStdout(func() { + printDDL(ddlBinlog) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 1) + c.Assert(lines[0], check.Matches, ".*DDL query: create database test;.*") } func (s *testPrintSuite) TestPrintRow(c *check.C) { cols := generateColumns(c) - col1Str := getInsertOrDeleteColumnStr(cols[0]) - c.Assert(col1Str, check.Equals, "a(int): 1 \n") - - col2Str := getInsertOrDeleteColumnStr(cols[1]) - c.Assert(col2Str, check.Equals, "b(varchar): test \n") - - col3Str := getUpdateColumnStr(cols[2]) - c.Assert(col3Str, check.Equals, "c(varchar): test => abc\n") - insertEvent := &pb.Event{ Tp: pb.EventType_Insert, Row: [][]byte{cols[0], cols[1]}, } - eventStr := getEventDataStr(insertEvent) - c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") - rowStr := getInsertOrDeleteRowStr(insertEvent.Row) - c.Assert(rowStr, check.Equals, "a(int): 1 \nb(varchar): test \n") + + out := capturer.CaptureStdout(func() { + printEvent(insertEvent) + }) + lines := strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 3) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Insert") + c.Assert(lines[1], check.Equals, "a(int): 1") + c.Assert(lines[2], check.Equals, "b(varchar): test") deleteEvent := &pb.Event{ Tp: pb.EventType_Delete, Row: [][]byte{cols[0], cols[1]}, } - eventStr = getEventDataStr(deleteEvent) - c.Assert(eventStr, check.Equals, "a(int): 1 \nb(varchar): test \n") - rowStr = getInsertOrDeleteRowStr(deleteEvent.Row) - c.Assert(rowStr, check.Equals, "a(int): 1 \nb(varchar): test \n") + out = capturer.CaptureStdout(func() { + printEvent(deleteEvent) + }) + lines = strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 3) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Delete") + c.Assert(lines[1], check.Equals, "a(int): 1") + c.Assert(lines[2], check.Equals, "b(varchar): test") updateEvent := &pb.Event{ Tp: pb.EventType_Update, Row: [][]byte{cols[2]}, } - eventStr = getEventDataStr(updateEvent) - c.Assert(eventStr, check.Equals, "c(varchar): test => abc\n") - rowStr = getUpdateRowStr(updateEvent.Row) - c.Assert(rowStr, check.Equals, "c(varchar): test => abc\n") -} + out = capturer.CaptureStdout(func() { + printEvent(updateEvent) + }) + lines = strings.Split(strings.TrimSpace(out), "\n") + c.Assert(lines, check.HasLen, 2) + c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Update") + c.Assert(lines[1], check.Equals, "c(varchar): test => abc") +} \ No newline at end of file From 6655dc3b26a4306357597d5354495bdcf32ebc09 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 24 Apr 2019 16:43:55 +0800 Subject: [PATCH 21/31] format code --- reparo/syncer/print.go | 2 +- reparo/syncer/print_test.go | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/reparo/syncer/print.go b/reparo/syncer/print.go index 49999b8cb..0c562ac68 100644 --- a/reparo/syncer/print.go +++ b/reparo/syncer/print.go @@ -108,4 +108,4 @@ func printInsertOrDeleteEvent(row [][]byte) { tp := col.Tp[0] fmt.Printf("%s(%s): %s\n", col.Name, col.MysqlType, formatValueToString(val, tp)) } -} \ No newline at end of file +} diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index a49c9540c..a07b4fa1c 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -5,7 +5,6 @@ import ( "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" - "github.com/kami-zh/go-capturer" ) type testPrintSuite struct{} @@ -48,7 +47,7 @@ func (s *testPrintSuite) TestPrintEventHeader(c *check.C) { }) lines := strings.Split(strings.TrimSpace(out), "\n") c.Assert(lines, check.HasLen, 1) - c.Assert(lines[0], check.Matches, ".*schema: test; table: t1; type: Insert.*") + c.Assert(lines[0], check.Matches, ".*schema: test; table: t1; type: Insert.*") } func (s *testPrintSuite) TestPrintDDL(c *check.C) { @@ -106,4 +105,4 @@ func (s *testPrintSuite) TestPrintRow(c *check.C) { c.Assert(lines, check.HasLen, 2) c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Update") c.Assert(lines[1], check.Equals, "c(varchar): test => abc") -} \ No newline at end of file +} From 28eda63801b2225569fe3ca1ae5a311b95039fb5 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 24 Apr 2019 16:50:55 +0800 Subject: [PATCH 22/31] fix test --- reparo/syncer/print_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index a07b4fa1c..2fa58ee6e 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -3,6 +3,7 @@ package syncer import ( "strings" + capturer "github.com/kami-zh/go-capturer" "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" ) From dbc2642a1dd5b38e89165db9b249914afb9db719 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 25 Apr 2019 10:57:36 +0800 Subject: [PATCH 23/31] add test --- reparo/syncer/memory_test.go | 19 +++++++++++++++++-- reparo/syncer/mysql_test.go | 20 +++++++++++++++++--- reparo/syncer/print_test.go | 18 ++++++++++++++++-- 3 files changed, 50 insertions(+), 7 deletions(-) diff --git a/reparo/syncer/memory_test.go b/reparo/syncer/memory_test.go index 8a719c0c4..a176de421 100644 --- a/reparo/syncer/memory_test.go +++ b/reparo/syncer/memory_test.go @@ -1,6 +1,8 @@ package syncer import ( + "time" + "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" ) @@ -19,14 +21,27 @@ func (s *testMemorySuite) TestMemorySyncer(c *check.C) { } dmlBinlog := &pb.Binlog{ Tp: pb.BinlogType_DML, + DmlData: &pb.DMLData{ + Events: generateDMLEvents(c)[0:1], + }, } - err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {}) + binlogs := make([]*pb.Binlog, 0, 1) + err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) c.Assert(err, check.IsNil) - err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {}) + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) c.Assert(err, check.IsNil) + time.Sleep(100 * time.Millisecond) + c.Assert(binlogs, check.HasLen, 2) + err = syncer.Close() c.Assert(err, check.IsNil) diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 99d8cb502..0e4f60d07 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -25,24 +25,38 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { } dmlBinlog := &pb.Binlog{ Tp: pb.BinlogType_DML, + DmlData: &pb.DMLData{ + Events: generateDMLEvents(c)[0:1], + }, } mock.ExpectBegin() mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectCommit() + mock.ExpectQuery("SELECT column_name, extra FROM information_schema.columns").WithArgs("test", "t1").WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("a", "").AddRow("b", "")) + mock.ExpectQuery("SELECT non_unique, index_name, seq_in_index, column_name FROM information_schema.statistics").WithArgs("test", "t1").WillReturnRows(sqlmock.NewRows([]string{"non_unique", "index_name", "seq_in_index", "column_name"})) + + mock.ExpectBegin() + mock.ExpectExec("INSERT INTO").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + binlogs := make([]*pb.Binlog, 0, 1) err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { c.Log(binlog) binlogs = append(binlogs, binlog) }) c.Assert(err, check.IsNil) - time.Sleep(100 * time.Millisecond) - c.Assert(binlogs, check.HasLen, 1) - err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {}) + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) c.Assert(err, check.IsNil) + time.Sleep(100 * time.Millisecond) + c.Assert(binlogs, check.HasLen, 2) + err = syncer.Close() c.Assert(err, check.IsNil) } diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 2fa58ee6e..9aa41d266 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -2,6 +2,7 @@ package syncer import ( "strings" + "time" capturer "github.com/kami-zh/go-capturer" "github.com/pingcap/check" @@ -22,14 +23,27 @@ func (s *testPrintSuite) TestPrintSyncer(c *check.C) { } dmlBinlog := &pb.Binlog{ Tp: pb.BinlogType_DML, + DmlData: &pb.DMLData{ + Events: generateDMLEvents(c)[0:1], + }, } - err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {}) + binlogs := make([]*pb.Binlog, 0, 1) + err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) c.Assert(err, check.IsNil) - err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {}) + err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) { + c.Log(binlog) + binlogs = append(binlogs, binlog) + }) c.Assert(err, check.IsNil) + time.Sleep(100 * time.Millisecond) + c.Assert(binlogs, check.HasLen, 2) + err = syncer.Close() c.Assert(err, check.IsNil) } From d54b885fd1bd7dea8e94c7dced25b237efc45ae9 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 25 Apr 2019 11:35:37 +0800 Subject: [PATCH 24/31] minor update --- reparo/syncer/memory_test.go | 2 +- reparo/syncer/mysql_test.go | 2 +- reparo/syncer/print_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reparo/syncer/memory_test.go b/reparo/syncer/memory_test.go index a176de421..a3ac319f5 100644 --- a/reparo/syncer/memory_test.go +++ b/reparo/syncer/memory_test.go @@ -26,7 +26,7 @@ func (s *testMemorySuite) TestMemorySyncer(c *check.C) { }, } - binlogs := make([]*pb.Binlog, 0, 1) + binlogs := make([]*pb.Binlog, 0, 2) err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { c.Log(binlog) binlogs = append(binlogs, binlog) diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 0e4f60d07..18448f62b 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -41,7 +41,7 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { mock.ExpectExec("INSERT INTO").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() - binlogs := make([]*pb.Binlog, 0, 1) + binlogs := make([]*pb.Binlog, 0, 2) err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { c.Log(binlog) binlogs = append(binlogs, binlog) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 9aa41d266..25da59b19 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -28,7 +28,7 @@ func (s *testPrintSuite) TestPrintSyncer(c *check.C) { }, } - binlogs := make([]*pb.Binlog, 0, 1) + binlogs := make([]*pb.Binlog, 0, 2) err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { c.Log(binlog) binlogs = append(binlogs, binlog) From 8b3bfd249c5e7970df8a4742fb6e427462e9ed58 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Sun, 28 Apr 2019 15:15:39 +0800 Subject: [PATCH 25/31] fix check --- go.sum | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.sum b/go.sum index 0c8e0b65e..1eedbc28c 100644 --- a/go.sum +++ b/go.sum @@ -90,6 +90,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d h1:cVtBfNW5XTHiKQe7jDaDBSh/EVM4XLPutLAGboIXuM0= +github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= From dd02aed12105902026cb1cd9d2ed3ac44121ed87 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Sun, 28 Apr 2019 19:42:39 +0800 Subject: [PATCH 26/31] format code --- reparo/syncer/mysql_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 18448f62b..1ce80294a 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -35,7 +35,11 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { mock.ExpectCommit() mock.ExpectQuery("SELECT column_name, extra FROM information_schema.columns").WithArgs("test", "t1").WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("a", "").AddRow("b", "")) - mock.ExpectQuery("SELECT non_unique, index_name, seq_in_index, column_name FROM information_schema.statistics").WithArgs("test", "t1").WillReturnRows(sqlmock.NewRows([]string{"non_unique", "index_name", "seq_in_index", "column_name"})) + + rows := sqlmock.NewRows([]string{"non_unique", "index_name", "seq_in_index", "column_name"}) + mock.ExpectQuery("SELECT non_unique, index_name, seq_in_index, column_name FROM information_schema.statistics"). + WithArgs("test", "t1"). + WillReturnRows(rows) mock.ExpectBegin() mock.ExpectExec("INSERT INTO").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) From 0bd0c15875ab0d3e3fae24e0603aec0b8f1bcafb Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 29 Apr 2019 11:17:30 +0800 Subject: [PATCH 27/31] address comemnt --- reparo/syncer/memory_test.go | 2 +- reparo/syncer/mysql.go | 8 ++++++-- reparo/syncer/mysql_test.go | 10 +++++++++- reparo/syncer/print_test.go | 2 +- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/reparo/syncer/memory_test.go b/reparo/syncer/memory_test.go index a3ac319f5..0e2973b09 100644 --- a/reparo/syncer/memory_test.go +++ b/reparo/syncer/memory_test.go @@ -22,7 +22,7 @@ func (s *testMemorySuite) TestMemorySyncer(c *check.C) { dmlBinlog := &pb.Binlog{ Tp: pb.BinlogType_DML, DmlData: &pb.DMLData{ - Events: generateDMLEvents(c)[0:1], + Events: generateDMLEvents(c), }, } diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index 7d50b56cc..46b5a6d67 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -42,7 +42,11 @@ type mysqlSyncer struct { loaderErr error } -var _ Syncer = &mysqlSyncer{} +var ( + _ Syncer = &mysqlSyncer{} + defaultWorkerCount = 16 + defaultBatchSize = 20 +) func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) @@ -54,7 +58,7 @@ func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { } func newMysqlSyncerFromSQLDB(db *sql.DB) (*mysqlSyncer, error) { - loader, err := loader.NewLoader(db, loader.WorkerCount(16), loader.BatchSize(20)) + loader, err := loader.NewLoader(db, loader.WorkerCount(defaultWorkerCount), loader.BatchSize(defaultBatchSize)) if err != nil { return nil, errors.Annotate(err, "new loader failed") } diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 1ce80294a..378c45932 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -13,6 +13,12 @@ type testMysqlSuite struct{} var _ = check.Suite(&testMysqlSuite{}) func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { + originWorkerCount := defaultWorkerCount + defaultWorkerCount = 1 + defer func() { + defaultWorkerCount = originWorkerCount + }() + db, mock, err := sqlmock.New() c.Assert(err, check.IsNil) @@ -26,7 +32,7 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { dmlBinlog := &pb.Binlog{ Tp: pb.BinlogType_DML, DmlData: &pb.DMLData{ - Events: generateDMLEvents(c)[0:1], + Events: generateDMLEvents(c), }, } @@ -43,6 +49,8 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { mock.ExpectBegin() mock.ExpectExec("INSERT INTO").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("DELETE FROM").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("UPDATE").WithArgs("abc").WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() binlogs := make([]*pb.Binlog, 0, 2) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 25da59b19..aac2268c1 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -24,7 +24,7 @@ func (s *testPrintSuite) TestPrintSyncer(c *check.C) { dmlBinlog := &pb.Binlog{ Tp: pb.BinlogType_DML, DmlData: &pb.DMLData{ - Events: generateDMLEvents(c)[0:1], + Events: generateDMLEvents(c), }, } From c274caaffcc26927d9153e1c412dee71290eb1a0 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 29 Apr 2019 11:39:22 +0800 Subject: [PATCH 28/31] address comment --- reparo/syncer/syncer_test.go | 39 ++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go index b70f91a33..f6610b79f 100644 --- a/reparo/syncer/syncer_test.go +++ b/reparo/syncer/syncer_test.go @@ -1,6 +1,8 @@ package syncer import ( + "reflect" + "github.com/pingcap/check" ) @@ -11,18 +13,29 @@ var _ = check.Suite(&testSyncerSuite{}) func (s *testSyncerSuite) TestNewSyncer(c *check.C) { cfg := new(DBConfig) - syncer, err := New("print", cfg) - c.Assert(err, check.IsNil) - _, ok := syncer.(*printSyncer) - c.Assert(ok, check.IsTrue) - - syncer, err = New("memory", cfg) - c.Assert(err, check.IsNil) - _, ok = syncer.(*MemSyncer) - c.Assert(ok, check.IsTrue) + testCases := []struct { + typeStr string + tp reflect.Type + checker check.Checker + }{ + { + "print", + reflect.TypeOf(new(printSyncer)), + check.Equals, + }, { + "memory", + reflect.TypeOf(new(MemSyncer)), + check.Equals, + }, { + "print", + reflect.TypeOf(new(MemSyncer)), + check.Not(check.Equals), + }, + } - syncer, err = New("print", cfg) - c.Assert(err, check.IsNil) - _, ok = syncer.(*MemSyncer) - c.Assert(ok, check.IsFalse) + for _, testCase := range testCases { + syncer, err := New(testCase.typeStr, cfg) + c.Assert(err, check.IsNil) + c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp) + } } From 984c7c0b732e5a2deb404dccbec1c9bc15d25d07 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 29 Apr 2019 11:42:00 +0800 Subject: [PATCH 29/31] format code --- reparo/syncer/mysql.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index 46b5a6d67..ad87fb131 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -43,9 +43,9 @@ type mysqlSyncer struct { } var ( - _ Syncer = &mysqlSyncer{} - defaultWorkerCount = 16 - defaultBatchSize = 20 + _ Syncer = &mysqlSyncer{} + defaultWorkerCount = 16 + defaultBatchSize = 20 ) func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) { From 201a7cbed48ecaf55fca6c7f7a139f54cb191f8f Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 29 Apr 2019 11:56:03 +0800 Subject: [PATCH 30/31] add syncTest function --- reparo/syncer/memory_test.go | 34 +++------------------------------- reparo/syncer/mysql_test.go | 34 +++++++++++++++++++--------------- reparo/syncer/print_test.go | 28 +--------------------------- 3 files changed, 23 insertions(+), 73 deletions(-) diff --git a/reparo/syncer/memory_test.go b/reparo/syncer/memory_test.go index 0e2973b09..08cb687ba 100644 --- a/reparo/syncer/memory_test.go +++ b/reparo/syncer/memory_test.go @@ -1,10 +1,7 @@ package syncer import ( - "time" - "github.com/pingcap/check" - pb "github.com/pingcap/tidb-binlog/proto/binlog" ) type testMemorySuite struct{} @@ -15,36 +12,11 @@ func (s *testMemorySuite) TestMemorySyncer(c *check.C) { syncer, err := newMemSyncer() c.Assert(err, check.IsNil) - ddlBinlog := &pb.Binlog{ - Tp: pb.BinlogType_DDL, - DdlQuery: []byte("create database test;"), - } - dmlBinlog := &pb.Binlog{ - Tp: pb.BinlogType_DML, - DmlData: &pb.DMLData{ - Events: generateDMLEvents(c), - }, - } - - binlogs := make([]*pb.Binlog, 0, 2) - err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { - c.Log(binlog) - binlogs = append(binlogs, binlog) - }) - c.Assert(err, check.IsNil) - - err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) { - c.Log(binlog) - binlogs = append(binlogs, binlog) - }) - c.Assert(err, check.IsNil) + syncTest(c, Syncer(syncer)) - time.Sleep(100 * time.Millisecond) - c.Assert(binlogs, check.HasLen, 2) + binlog := syncer.GetBinlogs() + c.Assert(binlog, check.HasLen, 2) err = syncer.Close() c.Assert(err, check.IsNil) - - binlog := syncer.GetBinlogs() - c.Assert(binlog, check.DeepEquals, []*pb.Binlog{ddlBinlog, dmlBinlog}) } diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 378c45932..c149b7c4b 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -25,17 +25,6 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { syncer, err := newMysqlSyncerFromSQLDB(db) c.Assert(err, check.IsNil) - ddlBinlog := &pb.Binlog{ - Tp: pb.BinlogType_DDL, - DdlQuery: []byte("create database test;"), - } - dmlBinlog := &pb.Binlog{ - Tp: pb.BinlogType_DML, - DmlData: &pb.DMLData{ - Events: generateDMLEvents(c), - }, - } - mock.ExpectBegin() mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectCommit() @@ -53,8 +42,26 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { mock.ExpectExec("UPDATE").WithArgs("abc").WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() + syncTest(c, Syncer(syncer)) + + err = syncer.Close() + c.Assert(err, check.IsNil) +} + +func syncTest(c *check.C, syncer Syncer) { + ddlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DDL, + DdlQuery: []byte("create database test;"), + } + dmlBinlog := &pb.Binlog{ + Tp: pb.BinlogType_DML, + DmlData: &pb.DMLData{ + Events: generateDMLEvents(c), + }, + } + binlogs := make([]*pb.Binlog, 0, 2) - err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { + err := syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { c.Log(binlog) binlogs = append(binlogs, binlog) }) @@ -68,7 +75,4 @@ func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) { time.Sleep(100 * time.Millisecond) c.Assert(binlogs, check.HasLen, 2) - - err = syncer.Close() - c.Assert(err, check.IsNil) } diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index aac2268c1..7795e6a2e 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -2,7 +2,6 @@ package syncer import ( "strings" - "time" capturer "github.com/kami-zh/go-capturer" "github.com/pingcap/check" @@ -17,32 +16,7 @@ func (s *testPrintSuite) TestPrintSyncer(c *check.C) { syncer, err := newPrintSyncer() c.Assert(err, check.IsNil) - ddlBinlog := &pb.Binlog{ - Tp: pb.BinlogType_DDL, - DdlQuery: []byte("create database test;"), - } - dmlBinlog := &pb.Binlog{ - Tp: pb.BinlogType_DML, - DmlData: &pb.DMLData{ - Events: generateDMLEvents(c), - }, - } - - binlogs := make([]*pb.Binlog, 0, 2) - err = syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) { - c.Log(binlog) - binlogs = append(binlogs, binlog) - }) - c.Assert(err, check.IsNil) - - err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) { - c.Log(binlog) - binlogs = append(binlogs, binlog) - }) - c.Assert(err, check.IsNil) - - time.Sleep(100 * time.Millisecond) - c.Assert(binlogs, check.HasLen, 2) + syncTest(c, Syncer(syncer)) err = syncer.Close() c.Assert(err, check.IsNil) From 65bf64a4c3dfedad030f64530fbde01c106ce976 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 29 Apr 2019 12:52:41 +0800 Subject: [PATCH 31/31] handle out put of printsyncer for test --- reparo/syncer/print_test.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 7795e6a2e..9485f0f03 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -16,7 +16,20 @@ func (s *testPrintSuite) TestPrintSyncer(c *check.C) { syncer, err := newPrintSyncer() c.Assert(err, check.IsNil) - syncTest(c, Syncer(syncer)) + out := capturer.CaptureStdout(func() { + syncTest(c, Syncer(syncer)) + }) + + c.Assert(out, check.Equals, + "DDL query: create database test;\n"+ + "schema: test; table: t1; type: Insert\n"+ + "a(int): 1\n"+ + "b(varchar): test\n"+ + "schema: test; table: t1; type: Delete\n"+ + "a(int): 1\n"+ + "b(varchar): test\n"+ + "schema: test; table: t1; type: Update\n"+ + "c(varchar): test => abc\n") err = syncer.Close() c.Assert(err, check.IsNil)