From 1874f90837bb68dd3cc77ea3fffa2d5d2c0b008e Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Sun, 5 Jan 2020 20:13:09 +0800 Subject: [PATCH 1/5] Add unique key info in slave proto (#858) (#862) --- drainer/translator/kafka.go | 36 +++++++++++ drainer/translator/kafka_test.go | 106 +++++++++++++++++++++++++++++++ go.mod | 6 +- go.sum | 22 ++----- 4 files changed, 150 insertions(+), 20 deletions(-) diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go index b14e429ff..3127b43bb 100644 --- a/drainer/translator/kafka.go +++ b/drainer/translator/kafka.go @@ -100,6 +100,42 @@ func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table) } table.ColumnInfo = columnInfos + // If PKIsHandle, tableInfo.Indices *will not* contains the primary key + // so we add it here. + // If !PKIsHandle tableInfo.Indices *will* contains the primary key + if tableInfo.PKIsHandle { + pkName := tableInfo.GetPkName() + key := &obinlog.Key{ + Name: proto.String("PRIMARY"), + ColumnNames: []string{pkName.O}, + } + table.UniqueKeys = append(table.UniqueKeys, key) + } + + for _, index := range tableInfo.Indices { + if !index.Unique && !index.Primary { + continue + } + + // just a protective check + if tableInfo.PKIsHandle && index.Name.O == "PRIMARY" { + log.Warn("PKIsHandle and also contains PRIMARY index TableInfo.Indices") + continue + } + + key := new(obinlog.Key) + table.UniqueKeys = append(table.UniqueKeys, key) + + key.Name = proto.String(index.Name.O) + + names := make([]string, len(index.Columns)) + for i, col := range index.Columns { + names[i] = col.Name.O + } + + key.ColumnNames = names + } + return } diff --git a/drainer/translator/kafka_test.go b/drainer/translator/kafka_test.go index 7c635c244..d7663e2bc 100644 --- a/drainer/translator/kafka_test.go +++ b/drainer/translator/kafka_test.go @@ -18,6 +18,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/check" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" 
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog" "github.com/pingcap/tidb/types" ) @@ -117,3 +119,107 @@ func checkColumn(c *check.C, info *obinlog.ColumnInfo, col *obinlog.Column, datu c.Assert(colV, check.Equals, datumV) } + +func (t *testKafkaSuite) TestGenTable(c *check.C) { + schema := "test" + table := "test" + + // a table test.test(c1, c2, c3) with: + // primary key: (c1) + // unique key: (c2, c3) + // non-unique key: (c3) + info := &model.TableInfo{ + Name: model.NewCIStr(table), + Columns: []*model.ColumnInfo{ + { + Name: model.NewCIStr("c1"), + FieldType: types.FieldType{ + Flag: mysql.PriKeyFlag, + Tp: mysql.TypeLong, + }, + }, + { + Name: model.NewCIStr("c2"), + FieldType: types.FieldType{ + Tp: mysql.TypeLong, + }, + }, + { + Name: model.NewCIStr("c3"), + FieldType: types.FieldType{ + Tp: mysql.TypeLong, + }, + }, + }, + Indices: []*model.IndexInfo{ + { + Name: model.NewCIStr("PRIMARY"), + Primary: true, + Unique: true, + Columns: []*model.IndexColumn{ + { + Offset: 0, + Name: model.NewCIStr("c1"), + }, + }, + }, + { + Name: model.NewCIStr("idx1"), + Unique: true, + Columns: []*model.IndexColumn{ + { + Offset: 1, + Name: model.NewCIStr("c2"), + }, + { + Offset: 2, + Name: model.NewCIStr("c3"), + }, + }, + }, + { + Name: model.NewCIStr("idx2"), + Unique: false, + Columns: []*model.IndexColumn{ + { + Offset: 1, + Name: model.NewCIStr("c3"), + }, + }, + }, + }, + } + + expectTable := &obinlog.Table{ + SchemaName: proto.String(schema), + TableName: proto.String(table), + ColumnInfo: []*obinlog.ColumnInfo{ + { + Name: "c1", + IsPrimaryKey: true, + MysqlType: "int", + }, + { + Name: "c2", + MysqlType: "int", + }, + { + Name: "c3", + MysqlType: "int", + }, + }, + UniqueKeys: []*obinlog.Key{ + { + Name: proto.String("PRIMARY"), + ColumnNames: []string{"c1"}, + }, + { + Name: proto.String("idx1"), + ColumnNames: []string{"c2", "c3"}, + }, + }, + } + + getTable := genTable(schema, info) + c.Assert(expectTable, check.DeepEquals, 
getTable) +} diff --git a/go.mod b/go.mod index 4eff1ca4b..039ae72a3 100644 --- a/go.mod +++ b/go.mod @@ -19,17 +19,17 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd - github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 + github.com/pingcap/parser v0.0.0-20200103153514-95649ba8a872 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 github.com/pingcap/tidb v1.1.0-beta.0.20191120070053-5a7ecfeb94fd - github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible + github.com/pingcap/tidb-tools v3.1.0-beta.0.20191227034743-57985f125c52+incompatible github.com/pingcap/tipb v0.0.0-20191120045257-1b9900292ab6 github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 - github.com/siddontang/go-mysql v0.0.0-20190618002340-dbe0224ac097 // indirect + github.com/sirupsen/logrus v1.4.1 // indirect github.com/soheilhy/cmux v0.1.4 github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect diff --git a/go.sum b/go.sum index 14efb01d8..41ffc5bee 100644 --- a/go.sum +++ b/go.sum @@ -208,20 +208,17 @@ github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 h1:CHOC95Ct4abJ9Ed github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= -github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod 
h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= -github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942 h1:JAPbnAxPryeAO50UO89/9MDYJK38Ts7mykTDqgUS14k= github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= -github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 h1:/BJjVyJlNKWMMrgPsbzk5Y9VPJWwHKYttj3oWxnFQ9U= -github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= +github.com/pingcap/parser v0.0.0-20200103153514-95649ba8a872 h1:PCY0XeNh/+6bGEHI6yWqwRXVDxvhpw2T6eE/umI5iXw= +github.com/pingcap/parser v0.0.0-20200103153514-95649ba8a872/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd/go.mod h1:I7TEby5BHTYIxgHszfsOJSBsk8b2Qt8QrSIgdv5n5QQ= github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bcJxpuSrEH4H7/nlf5YdmpS+dU9lNIt8= github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI= github.com/pingcap/tidb v1.1.0-beta.0.20191120070053-5a7ecfeb94fd h1:A7ZhUliNg9NA2+S9pMu7vC1R2ZmRZkpZocX50Mk/YjQ= github.com/pingcap/tidb v1.1.0-beta.0.20191120070053-5a7ecfeb94fd/go.mod h1:+SjmTI6LG3FvK4lIVDsOSyW2lWA4qtCSLKgwnN6FKxI= github.com/pingcap/tidb-tools v3.0.6-0.20191119150227-ff0a3c6e5763+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible h1:nndliEgwxjmupxW5ZOXHe4b7xQPXCaCm61Bi0dlQPpM= -github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tidb-tools v3.1.0-beta.0.20191227034743-57985f125c52+incompatible h1:MfCru+JGzAww5YbKCCeobjCaCj5/jT01RTxiMEMqILw= +github.com/pingcap/tidb-tools 
v3.1.0-beta.0.20191227034743-57985f125c52+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20191120045257-1b9900292ab6 h1:HPgqtaqIFIZXTvQNiZoJ9Y79AXz3pmDpYFL28KraTKE= github.com/pingcap/tipb v0.0.0-20191120045257-1b9900292ab6/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -254,20 +251,13 @@ github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qq github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be h1:9prAjluC9cN0nht+csYXr3isly4OvAEfch9+VVFnn4U= github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs= github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/vfsgen v0.0.0-20181020040650-a97a25d856ca/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 
h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= -github.com/siddontang/go-mysql v0.0.0-20190618002340-dbe0224ac097 h1:7krIUFu42TgBwT1wLQwpgR0Jcd/AkjgCGKSjhVWedaI= -github.com/siddontang/go-mysql v0.0.0-20190618002340-dbe0224ac097/go.mod h1:Bl4lryU44qtIXEXNbP0k0pD646Nkw/qHn21wfZVGJx4= github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= @@ -375,15 +365,13 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190909082730-f460065e899a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c h1:S/FtSvpNLtFBgjTqcKsRpsa6aVsI6iztaz1bQd9BJwE= golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= From c97e501d5054ed63c325c02b581a7c1a661cbd42 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Tue, 7 Jan 2020 21:09:07 +0800 Subject: [PATCH 2/5] Add encrypt password config support (#857) (#868) * Add encrypt password support also add a command to binlogctl, usage example: ./binlogctl -cmd encrypt -text aaa --- binlogctl/config.go | 7 +- binlogctl/encrypt.go | 18 ++++ cmd/binlogctl/main.go | 6 ++ cmd/drainer/drainer.toml | 4 + drainer/config.go | 20 ++++- drainer/config_test.go | 30 +++++++ drainer/sync/util.go | 22 +++-- pkg/encrypt/encrypt.go | 159 ++++++++++++++++++++++++++++++++++++ pkg/encrypt/encrypt_test.go | 92 +++++++++++++++++++++ 9 files changed, 347 insertions(+), 11 deletions(-) create mode 100644 binlogctl/encrypt.go create mode 100644 pkg/encrypt/encrypt.go create mode 100644 pkg/encrypt/encrypt_test.go diff --git a/binlogctl/config.go b/binlogctl/config.go index 6fae7a53e..413a0a4be 100644 --- a/binlogctl/config.go +++ b/binlogctl/config.go @@ -58,6 +58,9 @@ const ( // OfflineDrainer is comamnd used for offlien drainer. OfflineDrainer = "offline-drainer" + + // Encrypt is command used for encrypt password. 
+ Encrypt = "encrypt" ) // Config holds the configuration of drainer @@ -74,6 +77,7 @@ type Config struct { SSLKey string `toml:"ssl-key" json:"ssl-key"` State string `toml:"state" json:"state"` ShowOfflineNodes bool `toml:"state" json:"show-offline-nodes"` + Text string `toml:"text" json:"text"` tls *tls.Config printVersion bool } @@ -83,7 +87,7 @@ func NewConfig() *Config { cfg := &Config{} cfg.FlagSet = flag.NewFlagSet("binlogctl", flag.ContinueOnError) - cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\"") + cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\", \"encrypt\"") cfg.FlagSet.StringVar(&cfg.NodeID, "node-id", "", "id of node, use to update some node with operation update-pump, update-drainer, pause-pump, pause-drainer, offline-pump and offline-drainer") cfg.FlagSet.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "meta directory path") cfg.FlagSet.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints") @@ -93,6 +97,7 @@ func NewConfig() *Config { cfg.FlagSet.StringVar(&cfg.TimeZone, "time-zone", "", "set time zone if you want save time info in savepoint file, for example `Asia/Shanghai` for CST time, `Local` for local time") cfg.FlagSet.StringVar(&cfg.State, "state", "", "set node's state, can set to online, pausing, paused, closing or offline.") cfg.FlagSet.BoolVar(&cfg.ShowOfflineNodes, "show-offline-nodes", false, "include offline nodes when querying pumps/drainers") + cfg.FlagSet.StringVar(&cfg.Text, "text", "", "text to be encrypt when using encrypt command") cfg.FlagSet.BoolVar(&cfg.printVersion, "V", false, "prints version and exit") return cfg diff --git a/binlogctl/encrypt.go 
b/binlogctl/encrypt.go new file mode 100644 index 000000000..5dc41a730 --- /dev/null +++ b/binlogctl/encrypt.go @@ -0,0 +1,18 @@ +package binlogctl + +import ( + "github.com/pingcap/log" + "github.com/pingcap/tidb-binlog/pkg/encrypt" + "go.uber.org/zap" +) + +// EncryptHandler logs the encrypted text on success or returns an error. +func EncryptHandler(text string) error { + enc, err := encrypt.Encrypt(text) + if err != nil { + return err + } + + log.Info("encrypt text", zap.String("encrypted", string(enc))) + return nil +} diff --git a/cmd/binlogctl/main.go b/cmd/binlogctl/main.go index eb9e104f4..7f56473fd 100644 --- a/cmd/binlogctl/main.go +++ b/cmd/binlogctl/main.go @@ -60,6 +60,12 @@ func main() { err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close) case ctl.OfflineDrainer: err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close) + case ctl.Encrypt: + if len(cfg.Text) == 0 { + err = errors.New("need to specify the text to be encrypt") + } else { + err = ctl.EncryptHandler(cfg.Text) + } default: err = errors.NotSupportedf("cmd %s", cfg.Command) } diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index fef6c17e3..6dceb50ed 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -81,6 +81,8 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" host = "127.0.0.1" user = "root" password = "" +# if encrypted_password is not empty, password will be ignored. +encrypted_password = "" port = 3306 [syncer.to.checkpoint] @@ -93,6 +95,8 @@ port = 3306 # schema = "tidb_binlog" # host = "127.0.0.1" # user = "root" +# if encrypted_password is not empty, password will be ignored. 
+# encrypted_password = "" # password = "" # port = 3306 diff --git a/drainer/config.go b/drainer/config.go index a4c8d3132..7955d3316 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -31,6 +31,7 @@ import ( "go.uber.org/zap" dsync "github.com/pingcap/tidb-binlog/drainer/sync" + "github.com/pingcap/tidb-binlog/pkg/encrypt" "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb-binlog/pkg/flags" "github.com/pingcap/tidb-binlog/pkg/security" @@ -374,11 +375,28 @@ func (cfg *Config) adjustConfig() error { } cfg.SyncerCfg.To.User = user } - if len(cfg.SyncerCfg.To.Password) == 0 { + + if len(cfg.SyncerCfg.To.EncryptedPassword) > 0 { + decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.EncryptedPassword) + if err != nil { + return errors.Annotate(err, "failed to decrypt password in `to.encrypted_password`") + } + + cfg.SyncerCfg.To.Password = decrypt + } else if len(cfg.SyncerCfg.To.Password) == 0 { cfg.SyncerCfg.To.Password = os.Getenv("MYSQL_PSWD") } } + if len(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) > 0 { + decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) + if err != nil { + return errors.Annotate(err, "failed to decrypt password in `checkpoint.encrypted_password`") + } + + cfg.SyncerCfg.To.Checkpoint.Password = decrypt + } + cfg.SyncerCfg.adjustWorkCount() cfg.SyncerCfg.adjustDoDBAndTable() diff --git a/drainer/config_test.go b/drainer/config_test.go index 7e087e178..588108ec2 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -21,9 +21,11 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/pingcap/check" . 
"github.com/pingcap/check" "github.com/pingcap/parser/mysql" dsync "github.com/pingcap/tidb-binlog/drainer/sync" + "github.com/pingcap/tidb-binlog/pkg/encrypt" "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb-binlog/pkg/util" pkgzk "github.com/pingcap/tidb-binlog/pkg/zk" @@ -151,6 +153,7 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) { c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249)) c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr) + // test EncryptedPassword cfg = NewConfig() cfg.ListenAddr = "0.0.0.0:8257" cfg.AdvertiseAddr = "192.168.15.12:8257" @@ -158,6 +161,33 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) { c.Assert(err, IsNil) c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257") c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257") + + cfg = NewConfig() + encrypted, err := encrypt.Encrypt("origin") + c.Assert(err, IsNil) + + cfg.SyncerCfg.To = &dsync.DBConfig{ + EncryptedPassword: string(encrypted), + Checkpoint: dsync.CheckpointConfig{ + EncryptedPassword: string(encrypted), + }, + } + err = cfg.adjustConfig() + c.Assert(err, IsNil) + c.Assert(cfg.SyncerCfg.To.Password, check.Equals, "origin") + c.Assert(cfg.SyncerCfg.To.Checkpoint.Password, check.Equals, "origin") + + // test false positive + cfg.SyncerCfg.To = &dsync.DBConfig{ + EncryptedPassword: "what ever" + string(encrypted), + Checkpoint: dsync.CheckpointConfig{ + EncryptedPassword: "what ever" + string(encrypted), + }, + } + + c.Logf("to.password: %v", cfg.SyncerCfg.To.Password) + err = cfg.adjustConfig() + c.Assert(err, NotNil) } func (t *testDrainerSuite) TestConfigParsingFileWithInvalidOptions(c *C) { diff --git a/drainer/sync/util.go b/drainer/sync/util.go index cfa922e19..f58e0d048 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -20,14 +20,16 @@ import ( // DBConfig is the DB configuration. 
type DBConfig struct { - Host string `toml:"host" json:"host"` - User string `toml:"user" json:"user"` - Password string `toml:"password" json:"password"` - Port int `toml:"port" json:"port"` - Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` - BinlogFileDir string `toml:"dir" json:"dir"` - TimeLimit string `toml:"time-limit" json:"time-limit"` - SizeLimit string `toml:"size-limit" json:"size-limit"` + Host string `toml:"host" json:"host"` + User string `toml:"user" json:"user"` + Password string `toml:"password" json:"password"` + // if EncryptedPassword is not empty, Password will be ignored. + EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` + Port int `toml:"port" json:"port"` + Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` + BinlogFileDir string `toml:"dir" json:"dir"` + TimeLimit string `toml:"time-limit" json:"time-limit"` + SizeLimit string `toml:"size-limit" json:"size-limit"` ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"` KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` @@ -45,7 +47,9 @@ type CheckpointConfig struct { Host string `toml:"host" json:"host"` User string `toml:"user" json:"user"` Password string `toml:"password" json:"password"` - Port int `toml:"port" json:"port"` + // if EncryptedPassword is not empty, Password will be ignored. + EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` + Port int `toml:"port" json:"port"` } type baseError struct { diff --git a/pkg/encrypt/encrypt.go b/pkg/encrypt/encrypt.go new file mode 100644 index 000000000..690a07942 --- /dev/null +++ b/pkg/encrypt/encrypt.go @@ -0,0 +1,159 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package encrypt + +import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/base64" + "encoding/hex" + "os" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/log" +) + +var ( + defaultSecretKey, _ = hex.DecodeString("a529b7665997f043a30ac8fadcb51d6aa032c226ab5b7750530b12b8c1a16a48") + secretKey []byte + ivSep = []byte("@") // ciphertext format: iv + ivSep + encrypted-plaintext +) + +var initSecretKeyOnce sync.Once +var initSecretKeyErr error + +func initSecretKey() error { + hexKey := os.Getenv("BINLOG_SECRET_KEY") + if len(hexKey) == 0 { + log.Warn("use the default secret key to encrypt") + secretKey = defaultSecretKey + return nil + } + + key, err := hex.DecodeString(hexKey) + if err != nil { + return errors.Trace(err) + } + + return SetSecretKey(key) +} + +// SetSecretKey sets the secret key which is used to encrypt +func SetSecretKey(key []byte) error { + switch len(key) { + case 16, 24, 32: + break + default: + return errors.Errorf("secretKey not valid: %v", key) + } + secretKey = key + return nil +} + +// Encrypt tries to encrypt plaintext to base64 encoded ciphertext +func Encrypt(plaintext string) (string, error) { + ciphertext, err := encrypt([]byte(plaintext)) + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(ciphertext), nil +} + +// Decrypt tries to decrypt base64 encoded ciphertext to plaintext +func Decrypt(ciphertextB64 string) (string, error) { + ciphertext, err := base64.StdEncoding.DecodeString(ciphertextB64) + if err != nil { + return "", errors.Annotatef(err, "base 64 failed to decode: %s", ciphertextB64) + } + + plaintext, err := 
decrypt(ciphertext) + if err != nil { + return "", errors.Trace(err) + } + return string(plaintext), nil +} + +// encrypt encrypts plaintext to ciphertext +func encrypt(plaintext []byte) ([]byte, error) { + initSecretKeyOnce.Do(func() { + initSecretKeyErr = initSecretKey() + }) + if initSecretKeyErr != nil { + return nil, initSecretKeyErr + } + + block, err := aes.NewCipher(secretKey) + if err != nil { + return nil, errors.Trace(err) + } + + iv, err := genIV(block.BlockSize()) + if err != nil { + return nil, err + } + + ciphertext := make([]byte, 0, len(iv)+len(ivSep)+len(plaintext)) + ciphertext = append(ciphertext, iv...) + ciphertext = append(ciphertext, ivSep...) + ciphertext = append(ciphertext, plaintext...) // will be overwritten by XORKeyStream + + stream := cipher.NewCFBEncrypter(block, iv) + stream.XORKeyStream(ciphertext[len(iv)+len(ivSep):], plaintext) + + return ciphertext, nil +} + +// decrypt decrypts ciphertext to plaintext +func decrypt(ciphertext []byte) ([]byte, error) { + initSecretKeyOnce.Do(func() { + initSecretKeyErr = initSecretKey() + }) + if initSecretKeyErr != nil { + return nil, initSecretKeyErr + } + + block, err := aes.NewCipher(secretKey) + if err != nil { + return nil, err + } + + if len(ciphertext) < block.BlockSize()+len(ivSep) { + // return nil, terror.ErrCiphertextLenNotValid.Generate(block.BlockSize()+len(ivSep), len(ciphertext)) + return nil, errors.Errorf("ciphertext not valid") + } + + if !bytes.Equal(ciphertext[block.BlockSize():block.BlockSize()+len(ivSep)], ivSep) { + // return nil, terror.ErrCiphertextContextNotValid.Generate() + return nil, errors.Errorf("ciphertext not valid") + } + + iv := ciphertext[:block.BlockSize()] + ciphertext = ciphertext[block.BlockSize()+len(ivSep):] + plaintext := make([]byte, len(ciphertext)) + + stream := cipher.NewCFBDecrypter(block, iv) + stream.XORKeyStream(plaintext, ciphertext) + + return plaintext, nil +} + +func genIV(n int) ([]byte, error) { + b := make([]byte, n) + _, err := 
rand.Read(b) + return b, errors.Trace(err) +} diff --git a/pkg/encrypt/encrypt_test.go b/pkg/encrypt/encrypt_test.go new file mode 100644 index 000000000..9889a5acf --- /dev/null +++ b/pkg/encrypt/encrypt_test.go @@ -0,0 +1,92 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package encrypt + +import ( + "crypto/aes" + "crypto/rand" + "testing" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testEncryptSuite{}) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testEncryptSuite struct { +} + +func (t *testEncryptSuite) TestSetSecretKey(c *C) { + // 16 bytes + b16 := make([]byte, 16) + _, err := rand.Read(b16) + c.Assert(err, IsNil) + + err = SetSecretKey(b16) + c.Assert(err, IsNil) + + // 20 bytes + b20 := make([]byte, 20) + _, err = rand.Read(b20) + c.Assert(err, IsNil) + + err = SetSecretKey(b20) + c.Assert(err, NotNil) +} + +func removeChar(input []byte, c byte) []byte { + i := 0 + for _, x := range input { + if x != c { + input[i] = x + i++ + } + } + return input[:i] +} + +func (t *testEncryptSuite) TestEncrypt(c *C) { + plaintext := []byte("a plain text") + + // encrypt + ciphertext, err := encrypt(plaintext) + c.Assert(err, IsNil) + + // decrypt + plaintext2, err := decrypt(ciphertext) + c.Assert(err, IsNil) + c.Assert(plaintext2, DeepEquals, plaintext) + + // invalid length + _, err = decrypt(ciphertext[:len(ciphertext)-len(plaintext)-1]) + c.Assert(err, NotNil) + + // invalid content + _, err = decrypt(removeChar(ciphertext, ivSep[0])) + c.Assert(err, NotNil) + + // a special 
case, we construct a ciphertext that can be decrypted but the + // plaintext is not what we want. This is because currently encrypt mechanism + // doesn't keep enough information to decide whether the new ciphertext is valid + block, err := aes.NewCipher(secretKey) + c.Assert(err, IsNil) + blockSize := block.BlockSize() + c.Assert(len(ciphertext), Greater, blockSize+2) + plaintext3, err := decrypt(append(ciphertext[1:blockSize+1], append([]byte{ivSep[0]}, ciphertext[blockSize+2:]...)...)) + c.Assert(err, IsNil) + c.Assert(plaintext3, Not(DeepEquals), plaintext) +} From e9cce0dcdba88fb05100e0c04fa860cc5091ba03 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 9 Jan 2020 14:39:44 +0800 Subject: [PATCH 3/5] metrics: add grafana and alertmanager scripts (#872) --- metrics/alertmanager/binlog.rules.yml | 84 ++ metrics/grafana/binlog.json | 2002 +++++++++++++++++++++++++ 2 files changed, 2086 insertions(+) create mode 100644 metrics/alertmanager/binlog.rules.yml create mode 100644 metrics/grafana/binlog.json diff --git a/metrics/alertmanager/binlog.rules.yml b/metrics/alertmanager/binlog.rules.yml new file mode 100644 index 000000000..552c06724 --- /dev/null +++ b/metrics/alertmanager/binlog.rules.yml @@ -0,0 +1,84 @@ +groups: +- name: alert.rules + rules: + - alert: binlog_pump_storage_error_count + expr: changes(binlog_pump_storage_error_count[1m]) > 0 + labels: + env: ENV_LABELS_ENV + level: emergency + expr: changes(binlog_pump_storage_error_count[1m]) > 0 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: binlog pump storage write some binlogs failed + + - alert: binlog_drainer_checkpoint_high_delay + expr: (time() - binlog_drainer_checkpoint_tso / 1000) > 3600 + for: 1m + labels: + env: ENV_LABELS_ENV + level: critical + expr: (time() - binlog_drainer_checkpoint_tso / 1000) > 3600 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance 
}}, values: {{ $value }}' + value: '{{ $value }}' + summary: binlog drainer checkpoint delay more than 1 hour + + - alert: binlog_pump_write_binlog_rpc_duration_seconds_bucket + expr: histogram_quantile(0.9, rate(binlog_pump_rpc_duration_seconds_bucket{method="WriteBinlog"}[5m])) > 1 + for: 1m + labels: + env: ENV_LABELS_ENV + level: warning + expr: histogram_quantile(0.9, rate(binlog_pump_rpc_duration_seconds_bucket{method="WriteBinlog"}[5m])) + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: binlog pump write binlog RPC latency is too high + + - alert: binlog_pump_storage_write_binlog_duration_time_bucket + expr: histogram_quantile(0.9, rate(binlog_pump_storage_write_binlog_duration_time_bucket{type="batch"}[5m])) > 1 + for: 1m + labels: + env: ENV_LABELS_ENV + level: warning + expr: histogram_quantile(0.9, rate(binlog_pump_storage_write_binlog_duration_time_bucket{type="batch"}[5m])) + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: binlog pump write binlog to disk is too slow + + - alert: binlog_pump_storage_available_size_less_than_20G + expr: binlog_pump_storage_storage_size_bytes{type="available"} < 20 * 1024 * 1024 * 1024 + for: 10s + labels: + env: ENV_LABELS_ENV + level: warning + expr: binlog_pump_storage_storage_size_bytes{type="available"} < 20 * 1024 * 1024 * 1024 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: binlog pump storage available size less than 20G + + - alert: binlog_drainer_execute_duration_time_more_than_10s + expr: histogram_quantile(0.9, rate(binlog_drainer_execute_duration_time_bucket[1m])) > 10 + for: 1m + labels: + env: ENV_LABELS_ENV + level: warning + expr: histogram_quantile(0.9, rate(binlog_drainer_txn_duration_time_bucket[1m])) > 10 + 
annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: binlog drainer execute duration time more than 10s + + - alert: binlog_drainer_checkpoint_tso_no_change_for_1m + expr: changes(binlog_drainer_checkpoint_tso[1m]) < 1 + labels: + env: ENV_LABELS_ENV + level: warning + expr: changes(binlog_drainer_checkpoint_tso[1m]) < 1 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: binlog drainer checkpoint tso no change for 1m diff --git a/metrics/grafana/binlog.json b/metrics/grafana/binlog.json new file mode 100644 index 000000000..6f7f7da0d --- /dev/null +++ b/metrics/grafana/binlog.json @@ -0,0 +1,2002 @@ +{ + "__inputs": [ + { + "name": "DS_TEST-CLUSTER", + "label": "test-cluster", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "6.1.6" + }, + { + "type": "panel", + "id": "graph", + "name": "Graph", + "version": "" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "singlestat", + "name": "Singlestat", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "${DS_TEST-CLUSTER}", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": null, + "iteration": 1564734492545, + "links": [], + "panels": [ + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 73, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { +
"h": 7, + "w": 12, + "x": 0, + "y": 1 + }, + "hideTimeOverride": false, + "id": 68, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideZero": false, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_pump_storage_storage_size_bytes", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Storage Size", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "decbytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 1 + }, + "id": 63, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": false, + "hideZero": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": 
"binlog_pump_storage_gc_ts", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : gc_tso", + "refId": "A" + }, + { + "expr": "binlog_pump_storage_max_commit_ts", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : max_commit_tso", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Metadata", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "dateTimeAsIso", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 7, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(binlog_pump_rpc_duration_seconds_count{method=\"WriteBinlog\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} :: {{label}}", + "metric": "binlog_cistern_rpc_duration_seconds_bucket", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Write Binlog QPS by Instance", + "tooltip": { + "msResolution": false, + "shared": true, + 
"sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "ops", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 3, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_pump_rpc_duration_seconds_bucket{method=\"WriteBinlog\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{method}}:99", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, rate(binlog_pump_rpc_duration_seconds_bucket{method=\"WriteBinlog\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{method}} : 95", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Write Binlog Latency", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + 
"min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 44, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_pump_storage_write_binlog_size_bucket[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}} : 99", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.95, rate(binlog_pump_storage_write_binlog_size_bucket[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}} : 95", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Storage Write Binlog Size", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "Bps", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + 
"x": 12, + "y": 15 + }, + "id": 66, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_pump_storage_write_binlog_duration_time_bucket[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}}:99", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, rate(binlog_pump_storage_write_binlog_duration_time_bucket[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}}:95", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Storage Write Binlog Latency", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 22 + }, + "id": 48, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + 
"pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(binlog_pump_storage_error_count[1m])", + "format": "time_series", + "instant": false, + "intervalFactor": 2, + "legendFormat": "{{instance}}:{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Pump Storage Error By Type", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 22 + }, + "id": 67, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_pump_storage_query_tikv_count", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Query Tikv", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + 
}, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "title": "pump", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 74, + "panels": [ + { + "cacheTimeout": null, + "colorBackground": false, + "colorValue": false, + "colors": [ + "#299c46", + "rgba(237, 129, 40, 0.89)", + "#d44a3a" + ], + "datasource": "${DS_TEST-CLUSTER}", + "format": "dateTimeAsIso", + "gauge": { + "maxValue": null, + "minValue": 0, + "show": false, + "thresholdLabels": false, + "thresholdMarkers": true + }, + "gridPos": { + "h": 7, + "w": 8, + "x": 0, + "y": 23 + }, + "hideTimeOverride": false, + "id": 70, + "interval": null, + "links": [], + "mappingType": 1, + "mappingTypes": [ + { + "name": "value to text", + "value": 1 + }, + { + "name": "range to text", + "value": 2 + } + ], + "maxDataPoints": 100, + "nullPointMode": "connected", + "nullText": null, + "postfix": "", + "postfixFontSize": "50%", + "prefix": "", + "prefixFontSize": "50%", + "rangeMaps": [ + { + "from": "null", + "text": "N/A", + "to": "null" + } + ], + "repeat": null, + "sparkline": { + "fillColor": "rgba(31, 118, 189, 0.18)", + "full": false, + "lineColor": "rgb(31, 120, 193)", + "show": false + }, + "tableColumn": "__name__", + "targets": [ + { + "expr": "binlog_drainer_checkpoint_tso{instance = \"$drainer_instance\"}", + "format": "time_series", + "instant": true, + "intervalFactor": 2, + "legendFormat": "checkpoint tso", + "refId": "A" + } + ], + "thresholds": "", + "timeFrom": null, + "timeShift": null, + "title": "Checkpoint TSO", + "transparent": false, + "type": "singlestat", + "valueFontSize": "80%", + "valueMaps": [ + { + "op": "=", + "text": "N/A", + "value": "null" + } + ], + "valueName": "current" + }, + { + "aliasColors": {}, + "bars": 
false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 16, + "x": 8, + "y": 23 + }, + "id": 69, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": false, + "hideZero": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_drainer_pump_position{instance = \"$drainer_instance\"}", + "format": "time_series", + "hide": false, + "instant": false, + "intervalFactor": 2, + "legendFormat": "{{nodeID}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Pump Handle TSO", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": false, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "dateTimeAsIso", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 30 + }, + "id": 62, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + 
"pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(binlog_drainer_read_binlog_size_count{instance = \"$drainer_instance\"}[1m])) by (nodeID)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{nodeID}}", + "metric": "binlog_drainer_event", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Pull Binlog QPS by Pump NodeID", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 30 + }, + "id": 53, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.95, rate(binlog_drainer_binlog_reach_duration_time_bucket{instance = \"$drainer_instance\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{nodeID}}", + "metric": "binlog_drainer_event", + "refId": "A", + "step": 2 + } 
+ ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "95% Binlog Reach Duration By Pump", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 37 + }, + "id": 58, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_drainer_error_count{instance = \"$drainer_instance\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "metric": "binlog_drainer_position", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Error By Type", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + 
"logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 37 + }, + "id": 6, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "irate(binlog_drainer_event{instance = \"$drainer_instance\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "metric": "binlog_drainer_event", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Drainer Event", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "ops", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 44 + }, + "id": 15, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, 
+ "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_drainer_execute_duration_time_bucket{instance = \"$drainer_instance\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{job}}", + "metric": "binlog_drainer_txn_duration_time_bucket", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "99% Execute Time", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 44 + }, + "id": 71, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_drainer_query_duration_time_bucket{instance = \"$drainer_instance\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "metric": 
"binlog_drainer_txn_duration_time_bucket", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "99% sql query Time", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 51 + }, + "id": 55, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.95, rate(binlog_drainer_read_binlog_size_bucket{instance = \"$drainer_instance\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "pump: {{nodeID}}", + "metric": "binlog_drainer_event", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "95% Binlog Size", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "Bps", + "label": null, + 
"logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 51 + }, + "id": 52, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_drainer_ddl_jobs_total{instance = \"$drainer_instance\"}", + "format": "time_series", + "instant": false, + "intervalFactor": 2, + "legendFormat": "ddl job count", + "metric": "binlog_drainer_position", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "DDL Job Count", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 58 + }, + "id": 72, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": 
true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_drainer_queue_size{instance = \"$drainer_instance\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{name}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "queue size", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "title": "drainer", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 2 + }, + "id": 75, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 59 + }, + "id": 9, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "go_goroutines{job=~\"binlog|pump|drainer\"}", + "format": "time_series", + 
"intervalFactor": 2, + "legendFormat": "{{instance}}", + "metric": "go_goroutines", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Goroutine", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 59 + }, + "id": 39, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "go_memstats_heap_inuse_bytes{job=~\"binlog|pump|drainer\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}}", + "metric": "go_goroutines", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Memory", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "bits", + "label": "", + "logBase": 1, + "max": null, + 
"min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "title": "node", + "type": "row" + } + ], + "refresh": "10s", + "schemaVersion": 18, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "allValue": null, + "current": {}, + "datasource": "${DS_TEST-CLUSTER}", + "definition": "", + "hide": 0, + "includeAll": false, + "label": null, + "multi": false, + "name": "drainer_instance", + "options": [], + "query": "label_values(binlog_drainer_ddl_jobs_total, instance)", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ], + "time_options": [ + "5m", + "15m", + "1h", + "6h", + "12h", + "24h", + "2d", + "7d", + "30d" + ] + }, + "timezone": "browser", + "title": "Test-Cluster-Binlog", + "uid": "RDdDTFvZz", + "version": 2 +} \ No newline at end of file From a50a36fb7f78979b6909200f770bc054dce2b955 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 15 Jan 2020 13:56:07 +0800 Subject: [PATCH 4/5] binlog loop back sync (#884) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add sync mode config (#867) allow when the column number of downstream table mismatch with current schema. For the case bidirectional replication, we will execute the DDL at one side, for add or drop column the column number will mismatch. cluster A <-> cluster B drop column of table t at cluster A some DML of table t at cluster B will miss the column dropped compared to cluster A * binlog loop back sync (#879) In the AA dual activity scenario, there is data write (DML) on both sides. 
The data of the TiDB clusters on both sides needs to be synchronized with each other, but loopback synchronization must be avoided, and DDL is only written on one side. Add three variables in drainer.toml that control whether to sync DDL, whether to set the sync mark, and which sync identification ID is used to avoid loopback synchronization. Add configuration items: loopback-control (true/false) set mark table identification or not and filter txn by mark table; sync-ddl (true/false) sync DDL to downstream DB or not; channel-id (integer) sync identification ID, used to avoid loopback synchronization. Co-authored-by: Nihao123451 <37206498+Nihao123451@users.noreply.github.com> Co-authored-by: freemindLi <59459626+freemindLi@users.noreply.github.com> Co-authored-by: Nihao123451 <37206498+Nihao123451@users.noreply.github.com> --- cmd/drainer/drainer.toml | 17 ++++ drainer/config.go | 6 ++ drainer/loopbacksync/loopbacksync.go | 42 +++++++++ drainer/loopbacksync/loopbacksync_test.go | 27 ++++++ drainer/sync/mysql.go | 52 ++++++++++- drainer/sync/mysql_test.go | 26 ++++++ drainer/sync/syncer_test.go | 2 +- drainer/sync/util.go | 1 + drainer/syncer.go | 58 +++++++++++- drainer/syncer_test.go | 32 +++++++ pkg/loader/executor.go | 36 +++++++- pkg/loader/load.go | 107 ++++++++++++++++++---- pkg/loader/load_test.go | 33 ++++++- pkg/loader/model.go | 7 ++ pkg/loader/model_test.go | 34 ++++++- 15 files changed, 451 insertions(+), 29 deletions(-) create mode 100644 drainer/loopbacksync/loopbacksync.go create mode 100644 drainer/loopbacksync/loopbacksync_test.go diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 6dceb50ed..a1fa9772b 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -38,6 +38,19 @@ compressor = "" # number of binlog events in a transaction batch txn-batch = 20 +# sync ddl to downstream db or not +sync-ddl = true + +# This variable works in dual-a.
If it is false, all upstream data will be synchronized to the downstream, except for the filtered tables. +# If it is true, channel-id must be set as well: an upstream transaction that updates the mark table with a channel ID +# equal to this channel-id will not be synchronized to the downstream. Therefore, in the dual-a scenario, both sides need to set channel-id to the same value. +loopback-control = false + +# channel-id only takes effect when loopback-control is turned on. +# In the dual-a scenario, the channel ID used for downstream-to-upstream synchronization and the one used for +# upstream-to-downstream synchronization need to be set to the same value to avoid loopback synchronization. +channel-id = 1 + # work count to execute binlogs # if the latency between drainer and downstream(mysql or tidb) are too high, you might want to increase this # to get higher throughput by higher concurrent write to the downstream @@ -84,6 +97,10 @@ password = "" # if encrypted_password is not empty, password will be ignored. encrypted_password = "" port = 3306 +# 1: SyncFullColumn, 2: SyncPartialColumn +# when set to SyncPartialColumn, drainer allows the downstream schema +# to have more or fewer columns and relaxes the sql mode by removing STRICT_TRANS_TABLES. +# sync-mode = 1 [syncer.to.checkpoint] # only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved.
diff --git a/drainer/config.go b/drainer/config.go index 7955d3316..b791b9d16 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -66,6 +66,9 @@ type SyncerConfig struct { IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` TxnBatch int `toml:"txn-batch" json:"txn-batch"` + LoopbackControl bool `toml:"loopback-control" json:"loopback-control"` + SyncDDL bool `toml:"sync-ddl" json:"sync-ddl"` + ChannelID int64 `toml:"channel-id" json:"channel-id"` WorkerCount int `toml:"worker-count" json:"worker-count"` To *dsync.DBConfig `toml:"to" json:"to"` DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` @@ -127,6 +130,9 @@ func NewConfig() *Config { fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", -1, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1") fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. 
compression disabled.)") fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") + fs.BoolVar(&cfg.SyncerCfg.LoopbackControl, "loopback-control", false, "set mark or not ") + fs.BoolVar(&cfg.SyncerCfg.SyncDDL, "sync-ddl", true, "sync ddl or not") + fs.Int64Var(&cfg.SyncerCfg.ChannelID, "channel-id", 0, "sync channel id ") fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count") fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml") diff --git a/drainer/loopbacksync/loopbacksync.go b/drainer/loopbacksync/loopbacksync.go new file mode 100644 index 000000000..9960bb180 --- /dev/null +++ b/drainer/loopbacksync/loopbacksync.go @@ -0,0 +1,42 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package loopbacksync + +const ( + //MarkTableName mark table name + MarkTableName = "retl._drainer_repl_mark" + //ChannelID channel id + ChannelID = "channel_id" + //Val val + Val = "val" + //ChannelInfo channel info + ChannelInfo = "channel_info" +) + +//LoopBackSync loopback sync info +type LoopBackSync struct { + ChannelID int64 + LoopbackControl bool + SyncDDL bool +} + +//NewLoopBackSyncInfo return LoopBackSyncInfo objec +func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync { + l := &LoopBackSync{ + ChannelID: ChannelID, + LoopbackControl: LoopbackControl, + SyncDDL: SyncDDL, + } + return l +} diff --git a/drainer/loopbacksync/loopbacksync_test.go b/drainer/loopbacksync/loopbacksync_test.go new file mode 100644 index 000000000..7306da0f3 --- /dev/null +++ b/drainer/loopbacksync/loopbacksync_test.go @@ -0,0 +1,27 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package loopbacksync + +import "testing" + +//TestNewLoopBackSyncInfo test loopBackSyncInfo alloc +func TestNewLoopBackSyncInfo(t *testing.T) { + var ChannelID int64 = 1 + var LoopbackControl = true + var SyncDDL = false + l := NewLoopBackSyncInfo(ChannelID, LoopbackControl, SyncDDL) + if l == nil { + t.Error("alloc loopBackSyncInfo objec failed ") + } +} diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index 10343e3ed..12151fb02 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -15,8 +15,11 @@ package sync import ( "database/sql" + "strings" "sync" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-binlog/drainer/translator" @@ -38,14 +41,14 @@ type MysqlSyncer struct { var createDB = loader.CreateDBWithSQLMode // NewMysqlSyncer returns a instance of MysqlSyncer -func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string) (*MysqlSyncer, error) { +func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, info *loopbacksync.LoopBackSync) (*MysqlSyncer, error) { db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode) if err != nil { return nil, errors.Trace(err) } var opts []loader.Option - opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb")) + opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"), loader.SetloopBackSyncInfo(info)) if queryHistogramVec != nil { opts = append(opts, loader.Metrics(&loader.MetricsGroup{ QueryHistogramVec: queryHistogramVec, @@ -53,6 +56,27 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w })) } 
+ if cfg.SyncMode != 0 { + mode := loader.SyncMode(cfg.SyncMode) + opts = append(opts, loader.SyncModeOption(mode)) + + if mode == loader.SyncPartialColumn { + var oldMode, newMode string + oldMode, newMode, err = relaxSQLMode(db) + if err != nil { + return nil, errors.Trace(err) + } + + if newMode != oldMode { + db.Close() + db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode) + if err != nil { + return nil, errors.Trace(err) + } + } + } + } + loader, err := loader.NewLoader(db, opts...) if err != nil { return nil, errors.Trace(err) @@ -69,6 +93,29 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w return s, nil } +// set newMode as the oldMode query from db by removing "STRICT_TRANS_TABLES". +func relaxSQLMode(db *sql.DB) (oldMode string, newMode string, err error) { + row := db.QueryRow("SELECT @@SESSION.sql_mode;") + err = row.Scan(&oldMode) + if err != nil { + return "", "", errors.Trace(err) + } + + toRemove := "STRICT_TRANS_TABLES" + newMode = oldMode + + if !strings.Contains(oldMode, toRemove) { + return + } + + // concatenated by "," like: mode1,mode2 + newMode = strings.Replace(newMode, toRemove+",", "", -1) + newMode = strings.Replace(newMode, ","+toRemove, "", -1) + newMode = strings.Replace(newMode, toRemove, "", -1) + + return +} + // SetSafeMode make the MysqlSyncer to use safe mode or not func (m *MysqlSyncer) SetSafeMode(mode bool) { m.loader.SetSafeMode(mode) @@ -80,7 +127,6 @@ func (m *MysqlSyncer) Sync(item *Item) error { if err != nil { return errors.Trace(err) } - txn.Metadata = item select { diff --git a/drainer/sync/mysql_test.go b/drainer/sync/mysql_test.go index 5865b6253..c7b2dc252 100644 --- a/drainer/sync/mysql_test.go +++ b/drainer/sync/mysql_test.go @@ -87,3 +87,29 @@ func (s *mysqlSuite) TestMySQLSyncerAvoidBlock(c *check.C) { c.Fatal("mysql syncer hasn't synced item in 1s after some error occurs in loader") } } + +func (s *mysqlSuite) TestRelaxSQLMode(c *check.C) { + tests := 
[]struct { + oldMode string + newMode string + }{ + {"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"}, + {"ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,STRICT_TRANS_TABLES", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"}, + {"STRICT_TRANS_TABLES", ""}, + {"ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"}, + } + + for _, test := range tests { + db, dbMock, err := sqlmock.New() + c.Assert(err, check.IsNil) + + rows := sqlmock.NewRows([]string{"@@SESSION.sql_mode"}). + AddRow(test.oldMode) + dbMock.ExpectQuery("SELECT @@SESSION.sql_mode;").WillReturnRows(rows) + + getOld, getNew, err := relaxSQLMode(db) + c.Assert(err, check.IsNil) + c.Assert(getOld, check.Equals, test.oldMode) + c.Assert(getNew, check.Equals, test.newMode) + } +} diff --git a/drainer/sync/syncer_test.go b/drainer/sync/syncer_test.go index a153eb132..f86805953 100644 --- a/drainer/sync/syncer_test.go +++ b/drainer/sync/syncer_test.go @@ -69,7 +69,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) { createDB = oldCreateDB }() - mysql, err := NewMysqlSyncer(cfg, infoGetter, 1, 1, nil, nil, "mysql") + mysql, err := NewMysqlSyncer(cfg, infoGetter, 1, 1, nil, nil, "mysql", nil) c.Assert(err, check.IsNil) s.syncers = append(s.syncers, mysql) diff --git a/drainer/sync/util.go b/drainer/sync/util.go index f58e0d048..f4a6e2ac2 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -25,6 +25,7 @@ type DBConfig struct { Password string `toml:"password" json:"password"` // if EncryptedPassword is not empty, Password will be ignore. 
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` + SyncMode int `toml:"sync-mode" json:"sync-mode"` Port int `toml:"port" json:"port"` Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` BinlogFileDir string `toml:"dir" json:"dir"` diff --git a/drainer/syncer.go b/drainer/syncer.go index 7bc10b7c5..ab6cb1568 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -14,10 +14,14 @@ package drainer import ( + "reflect" "strings" "sync/atomic" "time" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/tidb-binlog/pkg/loader" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/parser/model" @@ -46,6 +50,8 @@ type Syncer struct { filter *filter.Filter + loopbackSync *loopbacksync.LoopBackSync + // last time we successfully sync binlog item to downstream lastSyncTime time.Time @@ -70,6 +76,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) ( ignoreDBs = strings.Split(cfg.IgnoreSchemas, ",") } syncer.filter = filter.NewFilter(ignoreDBs, cfg.IgnoreTables, cfg.DoDBs, cfg.DoTables) + syncer.loopbackSync = loopbacksync.NewLoopBackSyncInfo(cfg.ChannelID, cfg.LoopbackControl, cfg.SyncDDL) var err error // create schema @@ -78,7 +85,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) ( return nil, errors.Trace(err) } - syncer.dsyncer, err = createDSyncer(cfg, syncer.schema) + syncer.dsyncer, err = createDSyncer(cfg, syncer.schema, syncer.loopbackSync) if err != nil { return nil, errors.Trace(err) } @@ -86,7 +93,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) ( return syncer, nil } -func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err error) { +func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBackSync) (dsyncer dsync.Syncer, err error) { switch cfg.DestDBType { case "kafka": dsyncer, err = dsync.NewKafka(cfg.To, schema) @@ -104,7 
+111,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err return nil, errors.Annotate(err, "fail to create flash dsyncer") } case "mysql", "tidb": - dsyncer, err = dsync.NewMysqlSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType) + dsyncer, err = dsync.NewMysqlSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType, info) if err != nil { return nil, errors.Annotate(err, "fail to create mysql dsyncer") } @@ -348,6 +355,15 @@ ForLoop: err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed") break ForLoop } + var isFilterTransaction = false + var err1 error + if s.loopbackSync != nil && s.loopbackSync.LoopbackControl { + isFilterTransaction, err1 = loopBackStatus(binlog, preWrite, s.schema, s.loopbackSync) + if err1 != nil { + err = errors.Annotate(err1, "analyze transaction failed") + break ForLoop + } + } var ignore bool ignore, err = filterTable(preWrite, s.filter, s.schema) @@ -356,7 +372,7 @@ ForLoop: break ForLoop } - if !ignore { + if !ignore && !isFilterTransaction { s.addDMLEventMetrics(preWrite.GetMutations()) beginTime := time.Now() lastAddComitTS = binlog.GetCommitTs() @@ -374,6 +390,11 @@ ForLoop: // DDL (with version 10, commit ts 100) -> DDL (with version 9, commit ts 101) would never happen s.schema.addJob(b.job) + if !s.cfg.SyncDDL { + log.Info("Syncer skips DDL", zap.String("sql", b.job.Query), zap.Int64("ts", b.GetCommitTs()), zap.Bool("SyncDDL", s.cfg.SyncDDL)) + continue + } + log.Debug("get DDL", zap.Int64("SchemaVersion", b.job.BinlogInfo.SchemaVersion)) lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion @@ -436,6 +457,35 @@ ForLoop: return cerr } +func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool, error) { + for _, dml := range dmls { + tableName := dml.Database + "." 
+ dml.Table + if strings.EqualFold(tableName, loopbacksync.MarkTableName) { + channelID, ok := dml.Values[loopbacksync.ChannelID] + if ok { + channelIDInt64, ok := channelID.(int64) + if !ok { + return false, errors.Errorf("wrong type of channelID: %s", reflect.TypeOf(channelID)) + } + if channelIDInt64 == info.ChannelID { + return true, nil + } + } + } + } + return false, nil +} + +func loopBackStatus(binlog *pb.Binlog, prewriteValue *pb.PrewriteValue, infoGetter translator.TableInfoGetter, info *loopbacksync.LoopBackSync) (bool, error) { + var tableName string + var schemaName string + txn, err := translator.TiBinlogToTxn(infoGetter, schemaName, tableName, binlog, prewriteValue) + if err != nil { + return false, errors.Trace(err) + } + return findLoopBackMark(txn.DMLs, info) +} + // filterTable may drop some table mutation in `PrewriteValue` // Return true if all table mutations are dropped. func filterTable(pv *pb.PrewriteValue, filter *filter.Filter, schema *Schema) (ignore bool, err error) { diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go index 6902d0b3d..7dc71ac1b 100644 --- a/drainer/syncer_test.go +++ b/drainer/syncer_test.go @@ -16,6 +16,9 @@ package drainer import ( "time" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/tidb-binlog/pkg/loader" + "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/model" @@ -58,9 +61,38 @@ func (s *syncerSuite) TestFilterTable(c *check.C) { c.Assert(len(pv.Mutations), check.Equals, 1) } +func (s *syncerSuite) TestFilterMarkDatas(c *check.C) { + var dmls []*loader.DML + dml := loader.DML{ + Database: "retl", + Table: "_drainer_repl_mark", + Tp: 1, + Values: make(map[string]interface{}), + } + dml.Values["channel_id"] = int64(100) + dmls = append(dmls, &dml) + dml1 := loader.DML{ + Database: "retl", + Table: "retl_mark9", + Tp: 1, + Values: make(map[string]interface{}), + } + dml1.Values["status"] = 100 + dmls = append(dmls, &dml1) + loopBackSyncInfo := 
loopbacksync.LoopBackSync{ + ChannelID: 100, + SyncDDL: true, + LoopbackControl: false, + } + status, err := findLoopBackMark(dmls, &loopBackSyncInfo) + c.Assert(status, check.IsTrue) + c.Assert(err, check.IsNil) +} + func (s *syncerSuite) TestNewSyncer(c *check.C) { cfg := &SyncerConfig{ DestDBType: "_intercept", + SyncDDL: true, } cpFile := c.MkDir() + "/checkpoint" diff --git a/pkg/loader/executor.go b/pkg/loader/executor.go index 4e72a5194..ab2d18d6c 100644 --- a/pkg/loader/executor.go +++ b/pkg/loader/executor.go @@ -20,6 +20,8 @@ import ( "strings" "time" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-binlog/pkg/util" @@ -33,6 +35,7 @@ var defaultBatchSize = 128 type executor struct { db *gosql.DB batchSize int + info *loopbacksync.LoopBackSync queryHistogramVec *prometheus.HistogramVec } @@ -50,6 +53,10 @@ func (e *executor) withBatchSize(batchSize int) *executor { return e } +func (e *executor) setSyncInfo(info *loopbacksync.LoopBackSync) { + e.info = info +} + func (e *executor) withQueryHistogramVec(queryHistogramVec *prometheus.HistogramVec) *executor { e.queryHistogramVec = queryHistogramVec return e @@ -100,6 +107,22 @@ func (tx *tx) commit() error { return errors.Trace(err) } +func (e *executor) updateMark(channel string, tx *tx) error { + if e.info == nil { + return nil + } + status := 1 + columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + var args []interface{} + sql := fmt.Sprintf("INSERT INTO %s%s on duplicate key update %s=%s+1;", loopbacksync.MarkTableName, columns, loopbacksync.Val, loopbacksync.Val) + args = append(args, e.info.ChannelID, status, channel) + _, err := tx.autoRollbackExec(sql, args...) 
+ if err != nil { + return errors.Trace(err) + } + return nil +} + // return a wrap of sql.Tx func (e *executor) begin() (*tx, error) { sqlTx, err := e.db.Begin() @@ -107,10 +130,19 @@ func (e *executor) begin() (*tx, error) { return nil, errors.Trace(err) } - return &tx{ + var tx = &tx{ Tx: sqlTx, queryHistogramVec: e.queryHistogramVec, - }, nil + } + + if e.info != nil && e.info.LoopbackControl { + err1 := e.updateMark("", tx) + if err1 != nil { + return nil, errors.Trace(err1) + } + } + + return tx, nil } func (e *executor) bulkDelete(deletes []*DML) error { diff --git a/pkg/loader/load.go b/pkg/loader/load.go index 402fe5766..aaa497941 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -17,10 +17,13 @@ import ( "context" gosql "database/sql" "fmt" + "strings" "sync" "sync/atomic" "time" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-binlog/pkg/util" @@ -69,6 +72,9 @@ type loaderImpl struct { batchSize int workerCount int + syncMode SyncMode + + loopBackSyncInfo *loopbacksync.LoopBackSync input chan *Txn successTxn chan *Txn @@ -99,23 +105,43 @@ type MetricsGroup struct { QueryHistogramVec *prometheus.HistogramVec } +// SyncMode represents the sync mode of DML. +type SyncMode int + +// SyncMode values. +const ( + SyncFullColumn SyncMode = 1 + iota + SyncPartialColumn +) + type options struct { - workerCount int - batchSize int - metrics *MetricsGroup - saveAppliedTS bool + workerCount int + batchSize int + loopBackSyncInfo *loopbacksync.LoopBackSync + metrics *MetricsGroup + saveAppliedTS bool + syncMode SyncMode } var defaultLoaderOptions = options{ - workerCount: 16, - batchSize: 20, - metrics: nil, - saveAppliedTS: false, + workerCount: 16, + batchSize: 20, + loopBackSyncInfo: nil, + metrics: nil, + saveAppliedTS: false, + syncMode: SyncFullColumn, } // A Option sets options such batch size, worker count etc. 
type Option func(*options) +// SyncModeOption set sync mode of loader. +func SyncModeOption(n SyncMode) Option { + return func(o *options) { + o.syncMode = n + } +} + // WorkerCount set worker count of loader func WorkerCount(n int) Option { return func(o *options) { @@ -130,6 +156,13 @@ func BatchSize(n int) Option { } } +//SetloopBackSyncInfo set loop back sync info of loader +func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option { + return func(o *options) { + o.loopBackSyncInfo = loopBackSyncInfo + } +} + // SaveAppliedTS set downstream type, values can be tidb or mysql func SaveAppliedTS(save bool) Option { return func(o *options) { @@ -155,14 +188,15 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) { ctx, cancel := context.WithCancel(context.Background()) s := &loaderImpl{ - db: db, - workerCount: opts.workerCount, - batchSize: opts.batchSize, - metrics: opts.metrics, - input: make(chan *Txn), - successTxn: make(chan *Txn), - merge: true, - saveAppliedTS: opts.saveAppliedTS, + db: db, + workerCount: opts.workerCount, + batchSize: opts.batchSize, + metrics: opts.metrics, + loopBackSyncInfo: opts.loopBackSyncInfo, + input: make(chan *Txn), + successTxn: make(chan *Txn), + merge: true, + saveAppliedTS: opts.saveAppliedTS, ctx: ctx, cancel: cancel, @@ -306,7 +340,6 @@ func isCreateDatabaseDDL(sql string) bool { func (s *loaderImpl) execDDL(ddl *DDL) error { log.Debug("exec ddl", zap.Reflect("ddl", ddl)) - err := util.RetryContext(s.ctx, maxDDLRetryCount, execDDLRetryWait, 1, func(context.Context) error { tx, err := s.db.Begin() if err != nil { @@ -392,6 +425,20 @@ func (s *loaderImpl) singleExec(executor *executor, dmls []*DML) error { return errors.Trace(err) } +func removeOrphanCols(info *tableInfo, dml *DML) { + mp := make(map[string]struct{}, len(info.columns)) + for _, name := range info.columns { + mp[name] = struct{}{} + } + + for name := range dml.Values { + if _, ok := mp[name]; !ok { + delete(dml.Values, name) + 
delete(dml.OldValues, name) + } + } +} + func (s *loaderImpl) execDMLs(dmls []*DML) error { if len(dmls) == 0 { return nil @@ -402,6 +449,9 @@ func (s *loaderImpl) execDMLs(dmls []*DML) error { return errors.Trace(err) } filterGeneratedCols(dml) + if s.syncMode == SyncPartialColumn { + removeOrphanCols(dml.info, dml) + } } batchTables, singleDMLs := s.groupDMLs(dmls) @@ -428,8 +478,30 @@ func (s *loaderImpl) execDMLs(dmls []*DML) error { return errors.Trace(err) } +func (s *loaderImpl) createMarkTable() error { + markTableDataBase := loopbacksync.MarkTableName[:strings.Index(loopbacksync.MarkTableName, ".")] + createDatabaseSQL := fmt.Sprintf("create database IF NOT EXISTS %s;", markTableDataBase) + createDatabase := DDL{SQL: createDatabaseSQL} + if err1 := s.execDDL(&createDatabase); err1 != nil { + log.Error("exec failed", zap.String("sql", createDatabase.SQL), zap.Error(err1)) + return errors.Trace(err1) + } + sql := createMarkTableDDL() + createMarkTableInfo := DDL{Database: markTableDataBase, Table: loopbacksync.MarkTableName, SQL: sql} + if err := s.execDDL(&createMarkTableInfo); err != nil { + log.Error("exec failed", zap.String("sql", createMarkTableInfo.SQL), zap.Error(err)) + return errors.Trace(err) + } + return nil +} + // Run will quit when meet any error, or all the txn are drained func (s *loaderImpl) Run() error { + if s.loopBackSyncInfo != nil && s.loopBackSyncInfo.LoopbackControl { + if err := s.createMarkTable(); err != nil { + return errors.Trace(err) + } + } txnManager := newTxnManager(1024, s.input) defer func() { log.Info("Run()... 
in Loader quit") @@ -538,6 +610,7 @@ func filterGeneratedCols(dml *DML) { func (s *loaderImpl) getExecutor() *executor { e := newExecutor(s.db).withBatchSize(s.batchSize) + e.setSyncInfo(s.loopBackSyncInfo) if s.metrics != nil && s.metrics.QueryHistogramVec != nil { e = e.withQueryHistogramVec(s.metrics.QueryHistogramVec) } diff --git a/pkg/loader/load_test.go b/pkg/loader/load_test.go index d6655cacf..0d9e7a468 100644 --- a/pkg/loader/load_test.go +++ b/pkg/loader/load_test.go @@ -21,7 +21,7 @@ import ( sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/go-sql-driver/mysql" - check "github.com/pingcap/check" + "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -37,6 +37,37 @@ func (cs *LoadSuite) SetUpTest(c *check.C) { func (cs *LoadSuite) TearDownTest(c *check.C) { } +func (cs *LoadSuite) TestRemoveOrphanCols(c *check.C) { + dml := &DML{ + Values: map[string]interface{}{ + "exist1": 11, + "exist2": 22, + "orhpan1": 11, + "orhpan2": 22, + }, + OldValues: map[string]interface{}{ + "exist1": 1, + "exist2": 2, + "orhpan1": 1, + "orhpan2": 2, + }, + } + + info := &tableInfo{ + columns: []string{"exist1", "exist2"}, + } + + removeOrphanCols(info, dml) + c.Assert(dml.Values, check.DeepEquals, map[string]interface{}{ + "exist1": 11, + "exist2": 22, + }) + c.Assert(dml.OldValues, check.DeepEquals, map[string]interface{}{ + "exist1": 1, + "exist2": 2, + }) +} + func (cs *LoadSuite) TestOptions(c *check.C) { var o options WorkerCount(42)(&o) diff --git a/pkg/loader/model.go b/pkg/loader/model.go index 604d42761..07e9859b7 100644 --- a/pkg/loader/model.go +++ b/pkg/loader/model.go @@ -18,6 +18,8 @@ import ( "strconv" "strings" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/log" "go.uber.org/zap" ) @@ -184,6 +186,11 @@ func (dml *DML) updateSQL() (sql string, args []interface{}) { return } +func createMarkTableDDL() string { + sql := fmt.Sprintf("CREATE TABLE If Not Exists %s ( %s 
bigint primary key, %s bigint DEFAULT 0, %s varchar(64));", loopbacksync.MarkTableName, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + return sql +} + func (dml *DML) buildWhere(builder *strings.Builder) (args []interface{}) { wnames, wargs := dml.whereSlice() for i := 0; i < len(wnames); i++ { diff --git a/pkg/loader/model_test.go b/pkg/loader/model_test.go index 57e0222f4..71146a052 100644 --- a/pkg/loader/model_test.go +++ b/pkg/loader/model_test.go @@ -14,9 +14,14 @@ package loader import ( + "fmt" + "regexp" "strings" - check "github.com/pingcap/check" + "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + + "github.com/pingcap/check" ) type dmlSuite struct { @@ -228,3 +233,30 @@ func (s *SQLSuite) TestUpdateSQL(c *check.C) { c.Assert(args[0], check.Equals, "pc") c.Assert(args[1], check.Equals, "pingcap") } + +func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) { + db, mock, err := sqlmock.New() + c.Assert(err, check.IsNil) + defer db.Close() + columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + sql := fmt.Sprintf("INSERT INTO %s%s on duplicate key update %s=%s+1;", loopbacksync.MarkTableName, columns, loopbacksync.Val, loopbacksync.Val) + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta(sql)). 
+ WithArgs(100, 1, "").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + e := newExecutor(db) + tx, err := e.begin() + c.Assert(err, check.IsNil) + info := &loopbacksync.LoopBackSync{ChannelID: 100, LoopbackControl: true, SyncDDL: true} + e.info = info + err1 := e.updateMark("", tx) + c.Assert(err1, check.IsNil) + err2 := tx.commit() + c.Assert(err2, check.IsNil) + c.Assert(mock.ExpectationsWereMet(), check.IsNil) +} +func (s *SQLSuite) TestCreateMarkTable(c *check.C) { + sql := createMarkTableDDL() + sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s ( %s bigint primary key, %s bigint DEFAULT 0, %s varchar(64));", loopbacksync.MarkTableName, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + c.Assert(sql, check.Equals, sql1) +} From 9ed320f3734ff251337f6f84ea7e1ed5e509debb Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Fri, 7 Feb 2020 14:36:33 +0800 Subject: [PATCH 5/5] Fix current DML&DDL corner case for bidirectional replication (#890) --- drainer/sync/mysql.go | 2 +- drainer/sync/syncer.go | 4 +++ drainer/syncer.go | 46 ++++++++++++++++----------- drainer/translator/mysql.go | 9 +++--- drainer/translator/mysql_test.go | 11 ++++--- pkg/loader/executor.go | 54 +++++++++++++++++++++++++++++++- pkg/loader/executor_test.go | 29 +++++++++++++---- pkg/loader/load.go | 18 +++++++++++ pkg/loader/load_test.go | 3 +- pkg/loader/model.go | 24 +++++++++++--- pkg/loader/model_test.go | 18 ++++++----- reparo/syncer/mysql_test.go | 10 +++--- reparo/syncer/print_test.go | 4 +++ reparo/syncer/translate_test.go | 30 +++++++++++------- 14 files changed, 199 insertions(+), 63 deletions(-) diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index 12151fb02..140937560 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -123,7 +123,7 @@ func (m *MysqlSyncer) SetSafeMode(mode bool) { // Sync implements Syncer interface func (m *MysqlSyncer) Sync(item *Item) error { - txn, err := translator.TiBinlogToTxn(m.tableInfoGetter, 
item.Schema, item.Table, item.Binlog, item.PrewriteValue) + txn, err := translator.TiBinlogToTxn(m.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue, item.ShouldSkip) if err != nil { return errors.Trace(err) } diff --git a/drainer/sync/syncer.go b/drainer/sync/syncer.go index 1dad58bda..6b391ce9f 100644 --- a/drainer/sync/syncer.go +++ b/drainer/sync/syncer.go @@ -27,6 +27,10 @@ type Item struct { // the applied TS executed in downstream, only for tidb AppliedTS int64 + // should skip to replicate this item at downstream + // currently only used for signal the syncer to learn that the downstream schema is changed + // when we don't replicate DDL. + ShouldSkip bool } // Syncer sync binlog item to downstream diff --git a/drainer/syncer.go b/drainer/syncer.go index ab6cb1568..ad8aa7505 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -390,11 +390,6 @@ ForLoop: // DDL (with version 10, commit ts 100) -> DDL (with version 9, commit ts 101) would never happen s.schema.addJob(b.job) - if !s.cfg.SyncDDL { - log.Info("Syncer skips DDL", zap.String("sql", b.job.Query), zap.Int64("ts", b.GetCommitTs()), zap.Bool("SyncDDL", s.cfg.SyncDDL)) - continue - } - log.Debug("get DDL", zap.Int64("SchemaVersion", b.job.BinlogInfo.SchemaVersion)) lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion @@ -418,23 +413,38 @@ ForLoop: } if s.filter.SkipSchemaAndTable(schema, table) { - log.Info("skip ddl", zap.String("schema", schema), zap.String("table", table), + log.Info("skip ddl by filter", zap.String("schema", schema), zap.String("table", table), zap.String("sql", sql), zap.Int64("commit ts", commitTS)) - } else if sql != "" { - s.addDDLCount() - beginTime := time.Now() - lastAddComitTS = binlog.GetCommitTs() + continue + } - log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed", - zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs)) + shouldSkip := false - err = 
s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table}) - if err != nil { - err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs) - break ForLoop + if !s.cfg.SyncDDL { + log.Info("skip ddl by SyncDDL setting to false", zap.String("schema", schema), zap.String("table", table), + zap.String("sql", sql), zap.Int64("commit ts", commitTS)) + // A empty sql force it to evict the downstream table info. + if s.cfg.DestDBType == "tidb" || s.cfg.DestDBType == "mysql" { + shouldSkip = true + } else { + continue } - executeHistogram.Observe(time.Since(beginTime).Seconds()) } + + // Add ddl item to downstream. + s.addDDLCount() + beginTime := time.Now() + lastAddComitTS = binlog.GetCommitTs() + + log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed", + zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs)) + + err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table, ShouldSkip: shouldSkip}) + if err != nil { + err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs) + break ForLoop + } + executeHistogram.Observe(time.Since(beginTime).Seconds()) } } @@ -479,7 +489,7 @@ func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool func loopBackStatus(binlog *pb.Binlog, prewriteValue *pb.PrewriteValue, infoGetter translator.TableInfoGetter, info *loopbacksync.LoopBackSync) (bool, error) { var tableName string var schemaName string - txn, err := translator.TiBinlogToTxn(infoGetter, schemaName, tableName, binlog, prewriteValue) + txn, err := translator.TiBinlogToTxn(infoGetter, schemaName, tableName, binlog, prewriteValue, false) if err != nil { return false, errors.Trace(err) } diff --git a/drainer/translator/mysql.go b/drainer/translator/mysql.go index 18bcef025..894cc6bf5 100644 --- a/drainer/translator/mysql.go +++ b/drainer/translator/mysql.go @@ -102,14 +102,15 @@ 
func genMysqlDelete(schema string, table *model.TableInfo, row []byte) (names [] } // TiBinlogToTxn translate the format to loader.Txn -func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBinlog *tipb.Binlog, pv *tipb.PrewriteValue) (txn *loader.Txn, err error) { +func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBinlog *tipb.Binlog, pv *tipb.PrewriteValue, shouldSkip bool) (txn *loader.Txn, err error) { txn = new(loader.Txn) if tiBinlog.DdlJobId > 0 { txn.DDL = &loader.DDL{ - Database: schema, - Table: table, - SQL: string(tiBinlog.GetDdlQuery()), + Database: schema, + Table: table, + SQL: string(tiBinlog.GetDdlQuery()), + ShouldSkip: shouldSkip, } } else { for _, mut := range pv.GetMutations() { diff --git a/drainer/translator/mysql_test.go b/drainer/translator/mysql_test.go index 3a5e330b8..83e564286 100644 --- a/drainer/translator/mysql_test.go +++ b/drainer/translator/mysql_test.go @@ -37,20 +37,21 @@ func (t *testMysqlSuite) TestGenColumnList(c *check.C) { func (t *testMysqlSuite) TestDDL(c *check.C) { t.SetDDL() - txn, err := TiBinlogToTxn(t, t.Schema, t.Table, t.TiBinlog, nil) + txn, err := TiBinlogToTxn(t, t.Schema, t.Table, t.TiBinlog, nil, true) c.Assert(err, check.IsNil) c.Assert(txn, check.DeepEquals, &loader.Txn{ DDL: &loader.DDL{ - Database: t.Schema, - Table: t.Table, - SQL: string(t.TiBinlog.GetDdlQuery()), + Database: t.Schema, + Table: t.Table, + SQL: string(t.TiBinlog.GetDdlQuery()), + ShouldSkip: true, }, }) } func (t *testMysqlSuite) testDML(c *check.C, tp loader.DMLType) { - txn, err := TiBinlogToTxn(t, t.Schema, t.Table, t.TiBinlog, t.PV) + txn, err := TiBinlogToTxn(t, t.Schema, t.Table, t.TiBinlog, t.PV, false) c.Assert(err, check.IsNil) c.Assert(txn.DMLs, check.HasLen, 1) diff --git a/pkg/loader/executor.go b/pkg/loader/executor.go index ab2d18d6c..4058bfece 100644 --- a/pkg/loader/executor.go +++ b/pkg/loader/executor.go @@ -21,9 +21,11 @@ import ( "time" 
"github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/errors" "github.com/pingcap/log" + pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -37,6 +39,7 @@ type executor struct { batchSize int info *loopbacksync.LoopBackSync queryHistogramVec *prometheus.HistogramVec + refreshTableInfo func(schema string, table string) (info *tableInfo, err error) } func newExecutor(db *gosql.DB) *executor { @@ -48,6 +51,11 @@ func newExecutor(db *gosql.DB) *executor { return exe } +func (e *executor) withRefreshTableInfo(fn func(schema string, table string) (info *tableInfo, err error)) *executor { + e.refreshTableInfo = fn + return e +} + func (e *executor) withBatchSize(batchSize int) *executor { e.batchSize = batchSize return e @@ -269,10 +277,54 @@ func (e *executor) splitExecDML(ctx context.Context, dmls []*DML, exec func(dmls return errors.Trace(errg.Wait()) } +func tryRefreshTableErr(err error) bool { + errCode, ok := pkgsql.GetSQLErrCode(err) + if !ok { + return false + } + + switch errCode { + case infoschema.ErrColumnNotExists.Code(): + return true + } + + return false +} + func (e *executor) singleExecRetry(ctx context.Context, allDMLs []*DML, safeMode bool, retryNum int, backoff time.Duration) error { for _, dmls := range splitDMLs(allDMLs, e.batchSize) { err := util.RetryContext(ctx, retryNum, backoff, 1, func(context.Context) error { - return e.singleExec(dmls, safeMode) + execErr := e.singleExec(dmls, safeMode) + if execErr == nil { + return nil + } + + if tryRefreshTableErr(execErr) && e.refreshTableInfo != nil { + log.Info("try refresh table info") + name2info := make(map[string]*tableInfo) + for _, dml := range dmls { + name := dml.TableName() + info, ok := name2info[name] + if !ok { + var err error + info, err = e.refreshTableInfo(dml.Database, dml.Table) + if err != nil { + log.Error("fail to 
refresh table info", zap.Error(err)) + continue + } + + name2info[name] = info + } + + if len(dml.info.columns) != len(info.columns) { + log.Info("columns change", zap.Strings("old", dml.info.columns), + zap.Strings("new", info.columns)) + removeOrphanCols(info, dml) + } + dml.info = info + } + } + return execErr }) if err != nil { return errors.Trace(err) diff --git a/pkg/loader/executor_test.go b/pkg/loader/executor_test.go index e04625874..4358da48b 100644 --- a/pkg/loader/executor_test.go +++ b/pkg/loader/executor_test.go @@ -21,6 +21,8 @@ import ( "sync/atomic" sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/check" . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/prometheus/client_golang/prometheus" @@ -75,6 +77,21 @@ func (s *executorSuite) TestSplitExecDML(c *C) { c.Assert(counter, Equals, int32(3)) } +func (s *executorSuite) TestTryRefreshTableErr(c *C) { + tests := []struct { + err error + res bool + }{ + {&mysql.MySQLError{Number: 1054} /*Unknown column*/, true}, + {errors.New("what ever"), false}, + } + + for _, test := range tests { + get := tryRefreshTableErr(test.err) + c.Assert(get, check.Equals, test.res) + } +} + type singleExecSuite struct { db *sql.DB dbMock sqlmock.Sqlmock @@ -114,12 +131,12 @@ func (s *singleExecSuite) TestInsert(c *C) { columns: []string{"name", "age"}, }, } - insertSQL := "INSERT INTO `unicorn`.`users`(`name`,`age`) VALUES(?,?)" - replaceSQL := "REPLACE INTO `unicorn`.`users`(`name`,`age`) VALUES(?,?)" + insertSQL := "INSERT INTO `unicorn`.`users`(`age`,`name`) VALUES(?,?)" + replaceSQL := "REPLACE INTO `unicorn`.`users`(`age`,`name`) VALUES(?,?)" s.dbMock.ExpectBegin() s.dbMock.ExpectExec(regexp.QuoteMeta(insertSQL)). 
- WithArgs("tester", 2019).WillReturnResult(sqlmock.NewResult(1, 1)) + WithArgs(2019, "tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectCommit() e := newExecutor(s.db) @@ -131,7 +148,7 @@ func (s *singleExecSuite) TestInsert(c *C) { s.dbMock.ExpectBegin() s.dbMock.ExpectExec(regexp.QuoteMeta(replaceSQL)). - WithArgs("tester", 2019).WillReturnResult(sqlmock.NewResult(1, 1)) + WithArgs(2019, "tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectCommit() e = newExecutor(s.db) @@ -180,7 +197,7 @@ func (s *singleExecSuite) TestSafeUpdate(c *C) { s.dbMock.ExpectExec(delSQL). WithArgs("tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectExec(replaceSQL). - WithArgs("tester", 2019).WillReturnError(errors.New("replace")) + WithArgs(2019, "tester").WillReturnError(errors.New("replace")) e = newExecutor(s.db) err = e.singleExec([]*DML{&dml}, true) @@ -193,7 +210,7 @@ func (s *singleExecSuite) TestSafeUpdate(c *C) { s.dbMock.ExpectExec(delSQL). WithArgs("tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectExec(replaceSQL). - WithArgs("tester", 2019).WillReturnResult(sqlmock.NewResult(1, 1)) + WithArgs(2019, "tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectCommit() e = newExecutor(s.db) diff --git a/pkg/loader/load.go b/pkg/loader/load.go index aaa497941..a8493b2e9 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -187,11 +187,13 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) { ctx, cancel := context.WithCancel(context.Background()) + // TODO just save opts in loaderImpl instead of copy every field. 
s := &loaderImpl{ db: db, workerCount: opts.workerCount, batchSize: opts.batchSize, metrics: opts.metrics, + syncMode: opts.syncMode, loopBackSyncInfo: opts.loopBackSyncInfo, input: make(chan *Txn), successTxn: make(chan *Txn), @@ -296,6 +298,10 @@ func (s *loaderImpl) refreshTableInfo(schema string, table string) (info *tableI return } +func (s *loaderImpl) evitTableInfo(schema string, table string) { + s.tableInfos.Delete(quoteSchema(schema, table)) +} + func (s *loaderImpl) getTableInfo(schema string, table string) (info *tableInfo, err error) { v, ok := s.tableInfos.Load(quoteSchema(schema, table)) if ok { @@ -340,6 +346,10 @@ func isCreateDatabaseDDL(sql string) bool { func (s *loaderImpl) execDDL(ddl *DDL) error { log.Debug("exec ddl", zap.Reflect("ddl", ddl)) + if ddl.ShouldSkip { + return nil + } + err := util.RetryContext(s.ctx, maxDDLRetryCount, execDDLRetryWait, 1, func(context.Context) error { tx, err := s.db.Begin() if err != nil { @@ -610,6 +620,9 @@ func filterGeneratedCols(dml *DML) { func (s *loaderImpl) getExecutor() *executor { e := newExecutor(s.db).withBatchSize(s.batchSize) + if s.syncMode == SyncPartialColumn { + e = e.withRefreshTableInfo(s.refreshTableInfo) + } e.setSyncInfo(s.loopBackSyncInfo) if s.metrics != nil && s.metrics.QueryHistogramVec != nil { e = e.withQueryHistogramVec(s.metrics.QueryHistogramVec) @@ -625,6 +638,11 @@ func newBatchManager(s *loaderImpl) *batchManager { fExecDDL: s.execDDL, fDDLSuccessCallback: func(txn *Txn) { s.markSuccess(txn) + if txn.DDL.ShouldSkip { + s.evitTableInfo(txn.DDL.Database, txn.DDL.Table) + return + } + if needRefreshTableInfo(txn.DDL.SQL) { if _, err := s.refreshTableInfo(txn.DDL.Database, txn.DDL.Table); err != nil { log.Error("refresh table info failed", zap.String("database", txn.DDL.Database), zap.String("table", txn.DDL.Table), zap.Error(err)) diff --git a/pkg/loader/load_test.go b/pkg/loader/load_test.go index 0d9e7a468..20fc47343 100644 --- a/pkg/loader/load_test.go +++ 
b/pkg/loader/load_test.go @@ -151,8 +151,9 @@ func (cs *LoadSuite) TestNewClose(c *check.C) { db, _, err := sqlmock.New() c.Assert(err, check.IsNil) - loader, err := NewLoader(db) + loader, err := NewLoader(db, SyncModeOption(SyncPartialColumn)) c.Assert(err, check.IsNil) + c.Assert(loader.(*loaderImpl).syncMode, check.Equals, SyncPartialColumn) loader.Close() } diff --git a/pkg/loader/model.go b/pkg/loader/model.go index 07e9859b7..1db4adfa9 100644 --- a/pkg/loader/model.go +++ b/pkg/loader/model.go @@ -15,6 +15,7 @@ package loader import ( "fmt" + "sort" "strconv" "strings" @@ -53,6 +54,9 @@ type DDL struct { Database string Table string SQL string + // should skip to execute this DDL at downstream and just refresh the downstream table info. + // one case for this usage is for bidirectional replication and only execute DDL at one side. + ShouldSkip bool } // Txn holds transaction info, an DDL or DML sequences @@ -237,7 +241,8 @@ func (dml *DML) whereSlice() (colNames []string, args []interface{}) { } // Fallback to use all columns - return dml.info.columns, dml.whereValues(dml.info.columns) + names := dml.columnNames() + return names, dml.whereValues(names) } func (dml *DML) deleteSQL() (sql string, args []interface{}) { @@ -251,10 +256,21 @@ func (dml *DML) deleteSQL() (sql string, args []interface{}) { return } +func (dml *DML) columnNames() []string { + names := make([]string, 0, len(dml.Values)) + + for name := range dml.Values { + names = append(names, name) + } + + sort.Strings(names) + return names +} + func (dml *DML) replaceSQL() (sql string, args []interface{}) { - info := dml.info - sql = fmt.Sprintf("REPLACE INTO %s(%s) VALUES(%s)", dml.TableName(), buildColumnList(info.columns), holderString(len(info.columns))) - for _, name := range info.columns { + names := dml.columnNames() + sql = fmt.Sprintf("REPLACE INTO %s(%s) VALUES(%s)", dml.TableName(), buildColumnList(names), holderString(len(names))) + for _, name := range names { v := dml.Values[name] 
args = append(args, v) } diff --git a/pkg/loader/model_test.go b/pkg/loader/model_test.go index 71146a052..8a83e5007 100644 --- a/pkg/loader/model_test.go +++ b/pkg/loader/model_test.go @@ -62,6 +62,7 @@ func (d *dmlSuite) testWhere(c *check.C, tp DMLType) { if tp == UpdateDMLType { dml.OldValues = values + dml.Values = values } else { dml.Values = values } @@ -79,12 +80,13 @@ func (d *dmlSuite) testWhere(c *check.C, tp DMLType) { dml = getDML(false, tp) if tp == UpdateDMLType { dml.OldValues = values + dml.Values = values } else { dml.Values = values } names, args = dml.whereSlice() - c.Assert(names, check.DeepEquals, []string{"id", "a1"}) + c.Assert(names, check.DeepEquals, []string{"a1", "id"}) c.Assert(args, check.DeepEquals, []interface{}{1, 1}) builder.Reset() @@ -184,10 +186,10 @@ func (s *SQLSuite) TestInsertSQL(c *check.C) { }, } sql, args := dml.sql() - c.Assert(sql, check.Equals, "INSERT INTO `test`.`hello`(`name`,`age`) VALUES(?,?)") + c.Assert(sql, check.Equals, "INSERT INTO `test`.`hello`(`age`,`name`) VALUES(?,?)") c.Assert(args, check.HasLen, 2) - c.Assert(args[0], check.Equals, "pc") - c.Assert(args[1], check.Equals, 42) + c.Assert(args[0], check.Equals, 42) + c.Assert(args[1], check.Equals, "pc") } func (s *SQLSuite) TestDeleteSQL(c *check.C) { @@ -197,6 +199,7 @@ func (s *SQLSuite) TestDeleteSQL(c *check.C) { Table: "hello", Values: map[string]interface{}{ "name": "pc", + "age": 10, }, info: &tableInfo{ columns: []string{"name", "age"}, @@ -205,9 +208,10 @@ func (s *SQLSuite) TestDeleteSQL(c *check.C) { sql, args := dml.sql() c.Assert( sql, check.Equals, - "DELETE FROM `test`.`hello` WHERE `name` = ? AND `age` IS NULL LIMIT 1") - c.Assert(args, check.HasLen, 1) - c.Assert(args[0], check.Equals, "pc") + "DELETE FROM `test`.`hello` WHERE `age` = ? AND `name` = ? 
LIMIT 1") + c.Assert(args, check.HasLen, 2) + c.Assert(args[0], check.Equals, 10) + c.Assert(args[1], check.Equals, "pc") } func (s *SQLSuite) TestUpdateSQL(c *check.C) { diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 7d9ffa892..963bc8c41 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -51,17 +51,17 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) { if safemode { insertPattern = "REPLACE INTO" } - mock.ExpectExec(insertPattern).WithArgs(1, "test", nil).WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("DELETE FROM").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(insertPattern).WithArgs(1, "test", "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("DELETE FROM").WithArgs(1, "test", "test").WillReturnResult(sqlmock.NewResult(0, 1)) if safemode { mock.ExpectExec("DELETE FROM").WithArgs().WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec(insertPattern).WithArgs(nil, nil, "abc").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(insertPattern).WithArgs(1, "test", "abc").WillReturnResult(sqlmock.NewResult(0, 1)) } else { - mock.ExpectExec("UPDATE").WithArgs("abc", "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("UPDATE").WithArgs(1, "test", "abc", 1, "test", "test").WillReturnResult(sqlmock.NewResult(0, 1)) } mock.ExpectCommit() - syncTest(c, Syncer(syncer)) + syncTest(c, syncer) err = syncer.Close() c.Assert(err, check.IsNil) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 9485f0f03..197081d27 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -25,10 +25,14 @@ func (s *testPrintSuite) TestPrintSyncer(c *check.C) { "schema: test; table: t1; type: Insert\n"+ "a(int): 1\n"+ "b(varchar): test\n"+ + "c(varchar): test\n"+ "schema: test; table: t1; type: Delete\n"+ "a(int): 1\n"+ "b(varchar): test\n"+ + "c(varchar): test\n"+ "schema: test; 
table: t1; type: Update\n"+ + "a(int): 1 => 1\n"+ + "b(varchar): test => test\n"+ "c(varchar): test => abc\n") err = syncer.Close() diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index 6c2aecca8..e3ffb2eb2 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -134,6 +134,7 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { Values: map[string]interface{}{ "a": int64(1), "b": "test", + "c": "test", }, }, { Database: "test", @@ -142,15 +143,20 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { Values: map[string]interface{}{ "a": int64(1), "b": "test", + "c": "test", }, }, { Database: "test", Table: "t1", Tp: loader.UpdateDMLType, Values: map[string]interface{}{ + "a": int64(1), + "b": "test", "c": "abc", }, OldValues: map[string]interface{}{ + "a": int64(1), + "b": "test", "c": "test", }, }, @@ -199,17 +205,17 @@ func generateDMLEvents(c *check.C) []pb.Event { Tp: pb.EventType_Insert, SchemaName: &schema, TableName: &table, - Row: [][]byte{cols[0], cols[1]}, + Row: [][]byte{cols[0], cols[1], cols[2]}, }, { Tp: pb.EventType_Delete, SchemaName: &schema, TableName: &table, - Row: [][]byte{cols[0], cols[1]}, + Row: [][]byte{cols[0], cols[1], cols[2]}, }, { Tp: pb.EventType_Update, SchemaName: &schema, TableName: &table, - Row: [][]byte{cols[2]}, + Row: [][]byte{cols[0], cols[1], cols[2]}, }, } } @@ -220,15 +226,17 @@ func generateColumns(c *check.C) [][]byte { cols := []*pb.Column{ { - Name: "a", - Tp: []byte{mysql.TypeInt24}, - MysqlType: "int", - Value: encodeIntValue(1), + Name: "a", + Tp: []byte{mysql.TypeInt24}, + MysqlType: "int", + Value: encodeIntValue(1), + ChangedValue: encodeIntValue(1), }, { - Name: "b", - Tp: []byte{mysql.TypeVarchar}, - MysqlType: "varchar", - Value: encodeBytesValue([]byte("test")), + Name: "b", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + ChangedValue: encodeBytesValue([]byte("test")), }, { 
Name: "c", Tp: []byte{mysql.TypeVarchar},