diff --git a/binlogctl/config.go b/binlogctl/config.go
index 6fae7a53e..413a0a4be 100644
--- a/binlogctl/config.go
+++ b/binlogctl/config.go
@@ -58,6 +58,9 @@ const (
 	// OfflineDrainer is comamnd used for offlien drainer.
 	OfflineDrainer = "offline-drainer"
+
+	// Encrypt is the command used to encrypt a password.
+	Encrypt = "encrypt"
 )
 
 // Config holds the configuration of drainer
@@ -74,6 +77,7 @@ type Config struct {
 	SSLKey           string `toml:"ssl-key" json:"ssl-key"`
 	State            string `toml:"state" json:"state"`
 	ShowOfflineNodes bool   `toml:"state" json:"show-offline-nodes"`
+	Text             string `toml:"text" json:"text"`
 	tls              *tls.Config
 	printVersion     bool
 }
@@ -83,7 +87,7 @@ func NewConfig() *Config {
 	cfg := &Config{}
 	cfg.FlagSet = flag.NewFlagSet("binlogctl", flag.ContinueOnError)
 
-	cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\"")
+	cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\", \"encrypt\"")
 	cfg.FlagSet.StringVar(&cfg.NodeID, "node-id", "", "id of node, use to update some node with operation update-pump, update-drainer, pause-pump, pause-drainer, offline-pump and offline-drainer")
 	cfg.FlagSet.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "meta directory path")
 	cfg.FlagSet.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints")
@@ -93,6 +97,7 @@ func NewConfig() *Config {
 	cfg.FlagSet.StringVar(&cfg.TimeZone, "time-zone", "", "set time zone if you want save time info in savepoint file, for example `Asia/Shanghai` for CST time, `Local` for local time")
 	cfg.FlagSet.StringVar(&cfg.State, "state", "", "set node's state, can set to online, pausing, paused, closing or offline.")
 	cfg.FlagSet.BoolVar(&cfg.ShowOfflineNodes, "show-offline-nodes", false, "include offline nodes when querying pumps/drainers")
+	cfg.FlagSet.StringVar(&cfg.Text, "text", "", "text to be encrypted when using the encrypt command")
 	cfg.FlagSet.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
 
 	return cfg
diff --git a/binlogctl/encrypt.go b/binlogctl/encrypt.go
new file mode 100644
index 000000000..5dc41a730
--- /dev/null
+++ b/binlogctl/encrypt.go
@@ -0,0 +1,18 @@
+package binlogctl
+
+import (
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-binlog/pkg/encrypt"
+	"go.uber.org/zap"
+)
+
+// EncryptHandler logs the encrypted text if successful or returns an error.
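+//
+// A usage sketch (the flags are the ones registered in binlogctl/config.go above):
+//
+//	binlogctl -cmd encrypt -text "plain-password"
+//
+// The encrypted value printed in the log can then be used as `encrypted_password`
+// in the drainer configuration instead of the plaintext `password`.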
+func EncryptHandler(text string) error {
+	enc, err := encrypt.Encrypt(text)
+	if err != nil {
+		return err
+	}
+
+	log.Info("encrypt text", zap.String("encrypted", string(enc)))
+	return nil
+}
diff --git a/cmd/binlogctl/main.go b/cmd/binlogctl/main.go
index eb9e104f4..7f56473fd 100644
--- a/cmd/binlogctl/main.go
+++ b/cmd/binlogctl/main.go
@@ -60,6 +60,12 @@ func main() {
 		err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close)
 	case ctl.OfflineDrainer:
 		err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close)
+	case ctl.Encrypt:
+		if len(cfg.Text) == 0 {
+			err = errors.New("need to specify the text to be encrypted")
+		} else {
+			err = ctl.EncryptHandler(cfg.Text)
+		}
 	default:
 		err = errors.NotSupportedf("cmd %s", cfg.Command)
 	}
diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml
index fef6c17e3..a1fa9772b 100644
--- a/cmd/drainer/drainer.toml
+++ b/cmd/drainer/drainer.toml
@@ -38,6 +38,19 @@ compressor = ""
 # number of binlog events in a transaction batch
 txn-batch = 20
 
+# sync ddl to downstream db or not
+sync-ddl = true
+
+# This option works in the dual-A scenario. If it is false, all upstream data is replicated to the downstream, except for the filtered tables.
+# If it is true and channel-id is also set, transactions whose mark table update carries the same channel ID as this drainer are treated
+# as loopback data and are not replicated to the downstream. Therefore, in the dual-A scenario, both sides need to set channel-id to the same value.
+loopback-control = false
+
+# channel-id only takes effect when loopback control is turned on.
+# In the dual-A scenario, the channel ID used when replicating from the downstream to the upstream and the one used when
+# replicating from the upstream to the downstream need to be set to the same value to avoid loopback replication.
+channel-id = 1
+
 # work count to execute binlogs
 # if the latency between drainer and downstream(mysql or tidb) are too high, you might want to increase this
 # to get higher throughput by higher concurrent write to the downstream
@@ -81,7 +94,13 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
 host = "127.0.0.1"
 user = "root"
 password = ""
+# if encrypted_password is not empty, password will be ignored.
+encrypted_password = ""
 port = 3306
+# 1: SyncFullColumn, 2: SyncPartialColumn
+# when set to SyncPartialColumn, drainer allows the downstream schema to have
+# more or fewer columns than the upstream, and relaxes the SQL mode by removing STRICT_TRANS_TABLES.
+# sync-mode = 1
 
 [syncer.to.checkpoint]
 # only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved.
@@ -93,6 +112,8 @@ port = 3306
 # schema = "tidb_binlog"
 # host = "127.0.0.1"
 # user = "root"
+# if encrypted_password is not empty, password will be ignored.
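+# a value for encrypted_password can be generated with: binlogctl -cmd encrypt -text <password>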
+# encrypted_password = ""
 # password = ""
 # port = 3306
 
diff --git a/drainer/config.go b/drainer/config.go
index a4c8d3132..b791b9d16 100644
--- a/drainer/config.go
+++ b/drainer/config.go
@@ -31,6 +31,7 @@ import (
 	"go.uber.org/zap"
 
 	dsync "github.com/pingcap/tidb-binlog/drainer/sync"
+	"github.com/pingcap/tidb-binlog/pkg/encrypt"
 	"github.com/pingcap/tidb-binlog/pkg/filter"
 	"github.com/pingcap/tidb-binlog/pkg/flags"
 	"github.com/pingcap/tidb-binlog/pkg/security"
@@ -65,6 +66,9 @@ type SyncerConfig struct {
 	IgnoreSchemas   string             `toml:"ignore-schemas" json:"ignore-schemas"`
 	IgnoreTables    []filter.TableName `toml:"ignore-table" json:"ignore-table"`
 	TxnBatch        int                `toml:"txn-batch" json:"txn-batch"`
+	LoopbackControl bool               `toml:"loopback-control" json:"loopback-control"`
+	SyncDDL         bool               `toml:"sync-ddl" json:"sync-ddl"`
+	ChannelID       int64              `toml:"channel-id" json:"channel-id"`
 	WorkerCount     int                `toml:"worker-count" json:"worker-count"`
 	To              *dsync.DBConfig    `toml:"to" json:"to"`
 	DoTables        []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
@@ -126,6 +130,9 @@ func NewConfig() *Config {
 	fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", -1, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1")
 	fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)")
 	fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
+	fs.BoolVar(&cfg.SyncerCfg.LoopbackControl, "loopback-control", false, "enable loopback control or not")
+	fs.BoolVar(&cfg.SyncerCfg.SyncDDL, "sync-ddl", true, "sync ddl or not")
+	fs.Int64Var(&cfg.SyncerCfg.ChannelID, "channel-id", 0, "sync channel ID")
 	fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
 	fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
 	fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml")
@@ -374,11 +381,28 @@ func (cfg *Config) adjustConfig() error {
 		}
 		cfg.SyncerCfg.To.User = user
 	}
-	if len(cfg.SyncerCfg.To.Password) == 0 {
+
+	if len(cfg.SyncerCfg.To.EncryptedPassword) > 0 {
+		decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.EncryptedPassword)
+		if err != nil {
+			return errors.Annotate(err, "failed to decrypt password in `to.encrypted_password`")
+		}
+
+		cfg.SyncerCfg.To.Password = decrypt
+	} else if len(cfg.SyncerCfg.To.Password) == 0 {
 		cfg.SyncerCfg.To.Password = os.Getenv("MYSQL_PSWD")
 	}
 	}
 
+	if len(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) > 0 {
+		decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword)
+		if err != nil {
+			return errors.Annotate(err, "failed to decrypt password in `checkpoint.encrypted_password`")
+		}
+
+		cfg.SyncerCfg.To.Checkpoint.Password = decrypt
+	}
+
 	cfg.SyncerCfg.adjustWorkCount()
 	cfg.SyncerCfg.adjustDoDBAndTable()
diff --git a/drainer/config_test.go b/drainer/config_test.go
index 7e087e178..588108ec2 100644
--- a/drainer/config_test.go
+++ b/drainer/config_test.go
@@ -21,9 +21,11 @@ import (
 	"time"
 
 	"github.com/BurntSushi/toml"
+	"github.com/pingcap/check"
 	. "github.com/pingcap/check"
 	"github.com/pingcap/parser/mysql"
 	dsync "github.com/pingcap/tidb-binlog/drainer/sync"
+	"github.com/pingcap/tidb-binlog/pkg/encrypt"
 	"github.com/pingcap/tidb-binlog/pkg/filter"
 	"github.com/pingcap/tidb-binlog/pkg/util"
 	pkgzk "github.com/pingcap/tidb-binlog/pkg/zk"
@@ -151,6 +153,7 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
 	c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249))
 	c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr)
 
+	// test EncryptedPassword
 	cfg = NewConfig()
 	cfg.ListenAddr = "0.0.0.0:8257"
 	cfg.AdvertiseAddr = "192.168.15.12:8257"
@@ -158,6 +161,33 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257")
 	c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257")
+
+	cfg = NewConfig()
+	encrypted, err := encrypt.Encrypt("origin")
+	c.Assert(err, IsNil)
+
+	cfg.SyncerCfg.To = &dsync.DBConfig{
+		EncryptedPassword: string(encrypted),
+		Checkpoint: dsync.CheckpointConfig{
+			EncryptedPassword: string(encrypted),
+		},
+	}
+	err = cfg.adjustConfig()
+	c.Assert(err, IsNil)
+	c.Assert(cfg.SyncerCfg.To.Password, check.Equals, "origin")
+	c.Assert(cfg.SyncerCfg.To.Checkpoint.Password, check.Equals, "origin")
+
+	// test false positive
+	cfg.SyncerCfg.To = &dsync.DBConfig{
+		EncryptedPassword: "what ever" + string(encrypted),
+		Checkpoint: dsync.CheckpointConfig{
+			EncryptedPassword: "what ever" + string(encrypted),
+		},
+	}
+
+	c.Logf("to.password: %v", cfg.SyncerCfg.To.Password)
+	err = cfg.adjustConfig()
+	c.Assert(err, NotNil)
 }
 
 func (t *testDrainerSuite) TestConfigParsingFileWithInvalidOptions(c *C) {
diff --git a/drainer/loopbacksync/loopbacksync.go b/drainer/loopbacksync/loopbacksync.go
new file mode 100644
index 000000000..9960bb180
--- /dev/null
+++ b/drainer/loopbacksync/loopbacksync.go
@@ -0,0 +1,42 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package loopbacksync
+
+const (
+	//MarkTableName mark table name
+	MarkTableName = "retl._drainer_repl_mark"
+	//ChannelID channel id
+	ChannelID = "channel_id"
+	//Val val
+	Val = "val"
+	//ChannelInfo channel info
+	ChannelInfo = "channel_info"
+)
+
+//LoopBackSync loopback sync info
+type LoopBackSync struct {
+	ChannelID       int64
+	LoopbackControl bool
+	SyncDDL         bool
+}
+
+//NewLoopBackSyncInfo returns a LoopBackSync object
+func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync {
+	l := &LoopBackSync{
+		ChannelID:       ChannelID,
+		LoopbackControl: LoopbackControl,
+		SyncDDL:         SyncDDL,
+	}
+	return l
+}
diff --git a/drainer/loopbacksync/loopbacksync_test.go b/drainer/loopbacksync/loopbacksync_test.go
new file mode 100644
index 000000000..7306da0f3
--- /dev/null
+++ b/drainer/loopbacksync/loopbacksync_test.go
@@ -0,0 +1,27 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package loopbacksync
+
+import "testing"
+
+//TestNewLoopBackSyncInfo test loopBackSyncInfo alloc
+func TestNewLoopBackSyncInfo(t *testing.T) {
+	var ChannelID int64 = 1
+	var LoopbackControl = true
+	var SyncDDL = false
+	l := NewLoopBackSyncInfo(ChannelID, LoopbackControl, SyncDDL)
+	if l == nil {
+		t.Error("alloc LoopBackSync object failed")
+	}
+}
diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go
index 10343e3ed..140937560 100644
--- a/drainer/sync/mysql.go
+++ b/drainer/sync/mysql.go
@@ -15,8 +15,11 @@ package sync
 
 import (
 	"database/sql"
+	"strings"
 	"sync"
 
+	"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
+
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb-binlog/drainer/translator"
@@ -38,14 +41,14 @@ type MysqlSyncer struct {
 var createDB = loader.CreateDBWithSQLMode
 
 // NewMysqlSyncer returns a instance of MysqlSyncer
-func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string) (*MysqlSyncer, error) {
+func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, info *loopbacksync.LoopBackSync) (*MysqlSyncer, error) {
 	db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
 	var opts []loader.Option
-	opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"))
+	opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"), loader.SetloopBackSyncInfo(info))
 	if queryHistogramVec != nil {
 		opts = append(opts, loader.Metrics(&loader.MetricsGroup{
 			QueryHistogramVec: queryHistogramVec,
@@ -53,6 +56,27 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w
 		}))
 	}
 
+	if cfg.SyncMode != 0 {
+		mode := loader.SyncMode(cfg.SyncMode)
+		opts = append(opts, loader.SyncModeOption(mode))
+
+		if mode == loader.SyncPartialColumn {
+			var oldMode, newMode string
+			oldMode, newMode, err = relaxSQLMode(db)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+
+			if newMode != oldMode {
+				db.Close()
+				db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode)
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+			}
+		}
+	}
+
 	loader, err := loader.NewLoader(db, opts...)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -69,6 +93,29 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w
 	return s, nil
 }
 
+// relaxSQLMode queries the current sql_mode from db (oldMode) and returns newMode with "STRICT_TRANS_TABLES" removed.
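+// For example (the cases covered by TestRelaxSQLMode in mysql_test.go below):
+//
+//	"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE" -> "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"
+//	"STRICT_TRANS_TABLES"                                    -> ""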
+func relaxSQLMode(db *sql.DB) (oldMode string, newMode string, err error) {
+	row := db.QueryRow("SELECT @@SESSION.sql_mode;")
+	err = row.Scan(&oldMode)
+	if err != nil {
+		return "", "", errors.Trace(err)
+	}
+
+	toRemove := "STRICT_TRANS_TABLES"
+	newMode = oldMode
+
+	if !strings.Contains(oldMode, toRemove) {
+		return
+	}
+
+	// concatenated by "," like: mode1,mode2
+	newMode = strings.Replace(newMode, toRemove+",", "", -1)
+	newMode = strings.Replace(newMode, ","+toRemove, "", -1)
+	newMode = strings.Replace(newMode, toRemove, "", -1)
+
+	return
+}
+
 // SetSafeMode make the MysqlSyncer to use safe mode or not
 func (m *MysqlSyncer) SetSafeMode(mode bool) {
 	m.loader.SetSafeMode(mode)
@@ -76,11 +123,10 @@ func (m *MysqlSyncer) SetSafeMode(mode bool) {
 
 // Sync implements Syncer interface
 func (m *MysqlSyncer) Sync(item *Item) error {
-	txn, err := translator.TiBinlogToTxn(m.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue)
+	txn, err := translator.TiBinlogToTxn(m.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue, item.ShouldSkip)
 	if err != nil {
 		return errors.Trace(err)
 	}
-
 	txn.Metadata = item
 
 	select {
diff --git a/drainer/sync/mysql_test.go b/drainer/sync/mysql_test.go
index 5865b6253..c7b2dc252 100644
--- a/drainer/sync/mysql_test.go
+++ b/drainer/sync/mysql_test.go
@@ -87,3 +87,29 @@ func (s *mysqlSuite) TestMySQLSyncerAvoidBlock(c *check.C) {
 		c.Fatal("mysql syncer hasn't synced item in 1s after some error occurs in loader")
 	}
 }
+
+func (s *mysqlSuite) TestRelaxSQLMode(c *check.C) {
+	tests := []struct {
+		oldMode string
+		newMode string
+	}{
+		{"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
+		{"ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,STRICT_TRANS_TABLES", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
+		{"STRICT_TRANS_TABLES", ""},
+		{"ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE", "ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE"},
+	}
+
+	for _, test := range tests {
+		db, dbMock, err := sqlmock.New()
+		c.Assert(err, check.IsNil)
+
+		rows := sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
+			AddRow(test.oldMode)
+		dbMock.ExpectQuery("SELECT @@SESSION.sql_mode;").WillReturnRows(rows)
+
+		getOld, getNew, err := relaxSQLMode(db)
+		c.Assert(err, check.IsNil)
+		c.Assert(getOld, check.Equals, test.oldMode)
+		c.Assert(getNew, check.Equals, test.newMode)
+	}
+}
diff --git a/drainer/sync/syncer.go b/drainer/sync/syncer.go
index 1dad58bda..6b391ce9f 100644
--- a/drainer/sync/syncer.go
+++ b/drainer/sync/syncer.go
@@ -27,6 +27,10 @@ type Item struct {
 
 	// the applied TS executed in downstream, only for tidb
 	AppliedTS int64
+
+	// ShouldSkip indicates that this item should not be replicated to the downstream.
+	// Currently it is only used to signal the syncer that the downstream schema has
+	// changed when we do not replicate DDL.
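+	// (in this PR the drainer sets it on DDL items when sync-ddl is false and the
+	// downstream is mysql/tidb, so that the loader can evict its cached downstream
+	// table info without executing the DDL)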
+	ShouldSkip bool
 }
 
 // Syncer sync binlog item to downstream
diff --git a/drainer/sync/syncer_test.go b/drainer/sync/syncer_test.go
index a153eb132..f86805953 100644
--- a/drainer/sync/syncer_test.go
+++ b/drainer/sync/syncer_test.go
@@ -69,7 +69,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {
 		createDB = oldCreateDB
 	}()
 
-	mysql, err := NewMysqlSyncer(cfg, infoGetter, 1, 1, nil, nil, "mysql")
+	mysql, err := NewMysqlSyncer(cfg, infoGetter, 1, 1, nil, nil, "mysql", nil)
 	c.Assert(err, check.IsNil)
 	s.syncers = append(s.syncers, mysql)
 
diff --git a/drainer/sync/util.go b/drainer/sync/util.go
index cfa922e19..f4a6e2ac2 100644
--- a/drainer/sync/util.go
+++ b/drainer/sync/util.go
@@ -20,14 +20,17 @@ import (
 
 // DBConfig is the DB configuration.
 type DBConfig struct {
-	Host          string           `toml:"host" json:"host"`
-	User          string           `toml:"user" json:"user"`
-	Password      string           `toml:"password" json:"password"`
-	Port          int              `toml:"port" json:"port"`
-	Checkpoint    CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
-	BinlogFileDir string           `toml:"dir" json:"dir"`
-	TimeLimit     string           `toml:"time-limit" json:"time-limit"`
-	SizeLimit     string           `toml:"size-limit" json:"size-limit"`
+	Host     string `toml:"host" json:"host"`
+	User     string `toml:"user" json:"user"`
+	Password string `toml:"password" json:"password"`
+	// if EncryptedPassword is not empty, Password will be ignored.
+	EncryptedPassword string           `toml:"encrypted_password" json:"encrypted_password"`
+	SyncMode          int              `toml:"sync-mode" json:"sync-mode"`
+	Port              int              `toml:"port" json:"port"`
+	Checkpoint        CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
+	BinlogFileDir     string           `toml:"dir" json:"dir"`
+	TimeLimit         string           `toml:"time-limit" json:"time-limit"`
+	SizeLimit         string           `toml:"size-limit" json:"size-limit"`
 
 	ZKAddrs    string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
 	KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
@@ -45,7 +48,9 @@ type CheckpointConfig struct {
 	Host     string `toml:"host" json:"host"`
 	User     string `toml:"user" json:"user"`
 	Password string `toml:"password" json:"password"`
-	Port     int    `toml:"port" json:"port"`
+	// if EncryptedPassword is not empty, Password will be ignored.
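+	// a value for it can be generated with `binlogctl -cmd encrypt -text <password>`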
+	EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
+	Port              int    `toml:"port" json:"port"`
 }
 
 type baseError struct {
diff --git a/drainer/syncer.go b/drainer/syncer.go
index 7bc10b7c5..ad8aa7505 100644
--- a/drainer/syncer.go
+++ b/drainer/syncer.go
@@ -14,10 +14,14 @@ package drainer
 
 import (
+	"reflect"
 	"strings"
 	"sync/atomic"
 	"time"
 
+	"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
+	"github.com/pingcap/tidb-binlog/pkg/loader"
+
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/parser/model"
@@ -46,6 +50,8 @@ type Syncer struct {
 
 	filter *filter.Filter
 
+	loopbackSync *loopbacksync.LoopBackSync
+
 	// last time we successfully sync binlog item to downstream
 	lastSyncTime time.Time
 
@@ -70,6 +76,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
 		ignoreDBs = strings.Split(cfg.IgnoreSchemas, ",")
 	}
 	syncer.filter = filter.NewFilter(ignoreDBs, cfg.IgnoreTables, cfg.DoDBs, cfg.DoTables)
+	syncer.loopbackSync = loopbacksync.NewLoopBackSyncInfo(cfg.ChannelID, cfg.LoopbackControl, cfg.SyncDDL)
 
 	var err error
 	// create schema
@@ -78,7 +85,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
 		return nil, errors.Trace(err)
 	}
 
-	syncer.dsyncer, err = createDSyncer(cfg, syncer.schema)
+	syncer.dsyncer, err = createDSyncer(cfg, syncer.schema, syncer.loopbackSync)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -86,7 +93,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
 	return syncer, nil
 }
 
-func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err error) {
+func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBackSync) (dsyncer dsync.Syncer, err error) {
 	switch cfg.DestDBType {
 	case "kafka":
 		dsyncer, err = dsync.NewKafka(cfg.To, schema)
@@ -104,7 +111,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err
 			return nil, errors.Annotate(err, "fail to create flash dsyncer")
 		}
 	case "mysql", "tidb":
-		dsyncer, err = dsync.NewMysqlSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType)
+		dsyncer, err = dsync.NewMysqlSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType, info)
 		if err != nil {
 			return nil, errors.Annotate(err, "fail to create mysql dsyncer")
 		}
@@ -348,6 +355,15 @@ ForLoop:
 				err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
 				break ForLoop
 			}
+			var isFilterTransaction = false
+			var err1 error
+			if s.loopbackSync != nil && s.loopbackSync.LoopbackControl {
+				isFilterTransaction, err1 = loopBackStatus(binlog, preWrite, s.schema, s.loopbackSync)
+				if err1 != nil {
+					err = errors.Annotate(err1, "analyze transaction failed")
+					break ForLoop
+				}
+			}
 
 			var ignore bool
 			ignore, err = filterTable(preWrite, s.filter, s.schema)
@@ -356,7 +372,7 @@ ForLoop:
 				break ForLoop
 			}
 
-			if !ignore {
+			if !ignore && !isFilterTransaction {
 				s.addDMLEventMetrics(preWrite.GetMutations())
 				beginTime := time.Now()
 				lastAddComitTS = binlog.GetCommitTs()
@@ -397,23 +413,38 @@ ForLoop:
 			}
 
 			if s.filter.SkipSchemaAndTable(schema, table) {
-				log.Info("skip ddl", zap.String("schema", schema), zap.String("table", table),
+				log.Info("skip ddl by filter", zap.String("schema", schema), zap.String("table", table),
 					zap.String("sql", sql), zap.Int64("commit ts", commitTS))
-			} else if sql != "" {
-				s.addDDLCount()
-				beginTime := time.Now()
-				lastAddComitTS = binlog.GetCommitTs()
+				continue
+			}
 
-				log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed",
-					zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs))
+			shouldSkip := false
 
-				err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table})
-				if err != nil {
-					err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
-					break ForLoop
+			if !s.cfg.SyncDDL {
+				log.Info("skip ddl because SyncDDL is set to false", zap.String("schema", schema), zap.String("table", table),
+					zap.String("sql", sql), zap.Int64("commit ts", commitTS))
+				// An empty sql forces the loader to evict the downstream table info.
+				if s.cfg.DestDBType == "tidb" || s.cfg.DestDBType == "mysql" {
+					shouldSkip = true
+				} else {
+					continue
 				}
-				executeHistogram.Observe(time.Since(beginTime).Seconds())
 			}
+
+			// Add ddl item to downstream.
+			s.addDDLCount()
+			beginTime := time.Now()
+			lastAddComitTS = binlog.GetCommitTs()
+
+			log.Info("add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed",
+				zap.String("sql", sql), zap.Int64("commit ts", binlog.CommitTs))
+
+			err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table, ShouldSkip: shouldSkip})
+			if err != nil {
+				err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
+				break ForLoop
+			}
+			executeHistogram.Observe(time.Since(beginTime).Seconds())
 		}
 	}
 
@@ -436,6 +467,35 @@ ForLoop:
 
 	return cerr
 }
 
+func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool, error) {
+	for _, dml := range dmls {
+		tableName := dml.Database + "." + dml.Table
+		if strings.EqualFold(tableName, loopbacksync.MarkTableName) {
+			channelID, ok := dml.Values[loopbacksync.ChannelID]
+			if ok {
+				channelIDInt64, ok := channelID.(int64)
+				if !ok {
+					return false, errors.Errorf("wrong type of channelID: %s", reflect.TypeOf(channelID))
+				}
+				if channelIDInt64 == info.ChannelID {
+					return true, nil
+				}
+			}
+		}
+	}
+	return false, nil
+}
+
+func loopBackStatus(binlog *pb.Binlog, prewriteValue *pb.PrewriteValue, infoGetter translator.TableInfoGetter, info *loopbacksync.LoopBackSync) (bool, error) {
+	var tableName string
+	var schemaName string
+	txn, err := translator.TiBinlogToTxn(infoGetter, schemaName, tableName, binlog, prewriteValue, false)
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+	return findLoopBackMark(txn.DMLs, info)
+}
+
 // filterTable may drop some table mutation in `PrewriteValue`
 // Return true if all table mutations are dropped.
 func filterTable(pv *pb.PrewriteValue, filter *filter.Filter, schema *Schema) (ignore bool, err error) {
diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go
index 6902d0b3d..7dc71ac1b 100644
--- a/drainer/syncer_test.go
+++ b/drainer/syncer_test.go
@@ -16,6 +16,9 @@ package drainer
 import (
 	"time"
 
+	"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
+	"github.com/pingcap/tidb-binlog/pkg/loader"
+
 	"github.com/pingcap/check"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
@@ -58,9 +61,38 @@ func (s *syncerSuite) TestFilterTable(c *check.C) {
 	c.Assert(len(pv.Mutations), check.Equals, 1)
 }
 
+func (s *syncerSuite) TestFilterMarkDatas(c *check.C) {
+	var dmls []*loader.DML
+	dml := loader.DML{
+		Database: "retl",
+		Table:    "_drainer_repl_mark",
+		Tp:       1,
+		Values:   make(map[string]interface{}),
+	}
+	dml.Values["channel_id"] = int64(100)
+	dmls = append(dmls, &dml)
+	dml1 := loader.DML{
+		Database: "retl",
+		Table:    "retl_mark9",
+		Tp:       1,
+		Values:   make(map[string]interface{}),
+	}
+	dml1.Values["status"] = 100
+	dmls = append(dmls, &dml1)
+	loopBackSyncInfo := loopbacksync.LoopBackSync{
+		ChannelID:       100,
+		SyncDDL:         true,
+		LoopbackControl: false,
+	}
+	status, err := findLoopBackMark(dmls, &loopBackSyncInfo)
+	c.Assert(status, check.IsTrue)
+	c.Assert(err, check.IsNil)
+}
+
 func (s *syncerSuite) TestNewSyncer(c *check.C) {
 	cfg := &SyncerConfig{
 		DestDBType: "_intercept",
+		SyncDDL:    true,
 	}
 
 	cpFile := c.MkDir() + "/checkpoint"
diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go
index b14e429ff..3127b43bb 100644
--- a/drainer/translator/kafka.go
+++ b/drainer/translator/kafka.go
@@ -100,6 +100,42 @@ func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table)
 	}
 	table.ColumnInfo = columnInfos
 
+	// If PKIsHandle, tableInfo.Indices *will not* contain the primary key,
+	// so we add it here.
+	// If !PKIsHandle, tableInfo.Indices *will* contain the primary key.
+	if tableInfo.PKIsHandle {
+		pkName := tableInfo.GetPkName()
+		key := &obinlog.Key{
+			Name:        proto.String("PRIMARY"),
+			ColumnNames: []string{pkName.O},
+		}
+		table.UniqueKeys = append(table.UniqueKeys, key)
+	}
+
+	for _, index := range tableInfo.Indices {
+		if !index.Unique && !index.Primary {
+			continue
+		}
+
+		// just a protective check
+		if tableInfo.PKIsHandle && index.Name.O == "PRIMARY" {
+			log.Warn("PKIsHandle is true but TableInfo.Indices also contains a PRIMARY index")
+			continue
+		}
+
+		key := new(obinlog.Key)
+		table.UniqueKeys = append(table.UniqueKeys, key)
+
+		key.Name = proto.String(index.Name.O)
+
+		names := make([]string, len(index.Columns))
+		for i, col := range index.Columns {
+			names[i] = col.Name.O
+		}
+
+		key.ColumnNames = names
+	}
+
 	return
 }
diff --git a/drainer/translator/kafka_test.go b/drainer/translator/kafka_test.go
index 7c635c244..d7663e2bc 100644
--- a/drainer/translator/kafka_test.go
+++ b/drainer/translator/kafka_test.go
@@ -18,6 +18,8 @@ import (
 
 	"github.com/golang/protobuf/proto"
 	"github.com/pingcap/check"
+	"github.com/pingcap/parser/model"
+	"github.com/pingcap/parser/mysql"
 	obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
 	"github.com/pingcap/tidb/types"
 )
@@ -117,3 +119,107 @@ func checkColumn(c *check.C, info *obinlog.ColumnInfo, col *obinlog.Column, datu
 
 	c.Assert(colV, check.Equals, datumV)
 }
+
+func (t *testKafkaSuite) TestGenTable(c *check.C) {
+	schema := "test"
+	table := "test"
+
+	// a table test.test(c1, c2, c3) with:
+	// primary key: (c1)
+	// unique key: (c2, c3)
+	// non-unique key: (c3)
+	info := &model.TableInfo{
+		Name: model.NewCIStr(table),
+		Columns: []*model.ColumnInfo{
+			{
+				Name: model.NewCIStr("c1"),
+				FieldType: types.FieldType{
+					Flag: mysql.PriKeyFlag,
+					Tp:   mysql.TypeLong,
+				},
+			},
+			{
+				Name: model.NewCIStr("c2"),
+				FieldType: types.FieldType{
+					Tp: mysql.TypeLong,
+				},
+			},
+			{
+				Name: model.NewCIStr("c3"),
+				FieldType: types.FieldType{
+					Tp: mysql.TypeLong,
+				},
+			},
+		},
+		Indices: []*model.IndexInfo{
+			{
+				Name:    model.NewCIStr("PRIMARY"),
+				Primary: true,
+				Unique:  true,
+				Columns: []*model.IndexColumn{
+					{
+						Offset: 0,
+						Name:   model.NewCIStr("c1"),
+					},
+				},
+			},
+			{
+				Name:   model.NewCIStr("idx1"),
+				Unique: true,
+				Columns: []*model.IndexColumn{
+					{
+						Offset: 1,
+						Name:   model.NewCIStr("c2"),
+					},
+					{
+						Offset: 2,
+						Name:   model.NewCIStr("c3"),
+					},
+				},
+			},
+			{
+				Name:   model.NewCIStr("idx2"),
+				Unique: false,
+				Columns: []*model.IndexColumn{
+					{
+						Offset: 1,
+						Name:   model.NewCIStr("c3"),
+					},
+				},
+			},
+		},
+	}
+
+	expectTable := &obinlog.Table{
+		SchemaName: proto.String(schema),
+		TableName:  proto.String(table),
+		ColumnInfo: []*obinlog.ColumnInfo{
+			{
+				Name:         "c1",
+				IsPrimaryKey: true,
+				MysqlType:    "int",
+			},
+			{
+				Name:      "c2",
+				MysqlType: "int",
+			},
+			{
+				Name:      "c3",
+				MysqlType: "int",
+			},
+		},
+		UniqueKeys: []*obinlog.Key{
+			{
+				Name:        proto.String("PRIMARY"),
+				ColumnNames: []string{"c1"},
+			},
+			{
+				Name:        proto.String("idx1"),
+				ColumnNames: []string{"c2", "c3"},
+			},
+		},
+	}
+
+	getTable := genTable(schema, info)
+	c.Assert(expectTable, check.DeepEquals, getTable)
+}
diff --git a/drainer/translator/mysql.go b/drainer/translator/mysql.go
index 18bcef025..894cc6bf5 100644
--- a/drainer/translator/mysql.go
+++ b/drainer/translator/mysql.go
@@ -102,14 +102,15 @@ func genMysqlDelete(schema string, table *model.TableInfo, row []byte) (names []
 }
 
 // TiBinlogToTxn translate the format to loader.Txn
-func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBinlog *tipb.Binlog, pv *tipb.PrewriteValue) (txn *loader.Txn, err error) {
+func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBinlog *tipb.Binlog, pv *tipb.PrewriteValue, shouldSkip bool) (txn *loader.Txn, err error) {
 	txn = new(loader.Txn)
 
 	if tiBinlog.DdlJobId > 0 {
 		txn.DDL = &loader.DDL{
-			Database: schema,
-			Table:    table,
-			SQL:      string(tiBinlog.GetDdlQuery()),
+			Database:   schema,
+			Table:      table,
+			SQL:        string(tiBinlog.GetDdlQuery()),
+			ShouldSkip: shouldSkip,
 		}
 	} else {
 		for _, mut := range pv.GetMutations() {
diff --git a/drainer/translator/mysql_test.go b/drainer/translator/mysql_test.go
index 3a5e330b8..83e564286 100644
--- a/drainer/translator/mysql_test.go
+++ b/drainer/translator/mysql_test.go
@@ -37,20 +37,21 @@ func (t *testMysqlSuite) TestGenColumnList(c *check.C) {
 func (t *testMysqlSuite) TestDDL(c *check.C) {
 	t.SetDDL()
 
-	txn, err := TiBinlogToTxn(t, t.Schema, t.Table, t.TiBinlog, nil)
+	txn, err := TiBinlogToTxn(t, t.Schema, t.Table, t.TiBinlog, nil, true)
 	c.Assert(err, check.IsNil)
 
 	c.Assert(txn, check.DeepEquals, &loader.Txn{
 		DDL: &loader.DDL{
-			Database: t.Schema,
-			Table:    t.Table,
-			SQL:      string(t.TiBinlog.GetDdlQuery()),
+			Database:   t.Schema,
+			Table:      t.Table,
+			SQL:        string(t.TiBinlog.GetDdlQuery()),
+			ShouldSkip: true,
 		},
 	})
 }
 
 func (t *testMysqlSuite) testDML(c *check.C, tp loader.DMLType) {
-	txn, err := TiBinlogToTxn(t, t.Schema, t.Table, t.TiBinlog, t.PV)
+	txn, err := TiBinlogToTxn(t, t.Schema, t.Table, t.TiBinlog, t.PV, false)
 	c.Assert(err, check.IsNil)
 
 	c.Assert(txn.DMLs, check.HasLen, 1)
diff --git a/go.mod b/go.mod
index 4eff1ca4b..039ae72a3 100644
--- a/go.mod
+++ b/go.mod
@@ -19,17 +19,17 @@ require (
 	github.com/pingcap/errors v0.11.4
 	github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
 	github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
-	github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623
+	github.com/pingcap/parser v0.0.0-20200103153514-95649ba8a872
 	github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
 	github.com/pingcap/tidb v1.1.0-beta.0.20191120070053-5a7ecfeb94fd
-	github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible
+	github.com/pingcap/tidb-tools v3.1.0-beta.0.20191227034743-57985f125c52+incompatible
 	github.com/pingcap/tipb v0.0.0-20191120045257-1b9900292ab6
 	github.com/prometheus/client_golang v1.0.0
 	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
 	github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
 	github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be
 	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
-	github.com/siddontang/go-mysql v0.0.0-20190618002340-dbe0224ac097 // indirect
+	github.com/sirupsen/logrus v1.4.1 // indirect
 	github.com/soheilhy/cmux v0.1.4
 	github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
diff --git a/go.sum b/go.sum
index 14efb01d8..41ffc5bee 100644
--- a/go.sum
+++ b/go.sum
@@ -208,20 +208,17 @@ github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 h1:CHOC95Ct4abJ9Ed
 github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
-github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= -github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942 h1:JAPbnAxPryeAO50UO89/9MDYJK38Ts7mykTDqgUS14k= github.com/pingcap/parser v0.0.0-20191118062434-7c5018645942/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= -github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 h1:/BJjVyJlNKWMMrgPsbzk5Y9VPJWwHKYttj3oWxnFQ9U= -github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= +github.com/pingcap/parser v0.0.0-20200103153514-95649ba8a872 h1:PCY0XeNh/+6bGEHI6yWqwRXVDxvhpw2T6eE/umI5iXw= +github.com/pingcap/parser v0.0.0-20200103153514-95649ba8a872/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd/go.mod h1:I7TEby5BHTYIxgHszfsOJSBsk8b2Qt8QrSIgdv5n5QQ= github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bcJxpuSrEH4H7/nlf5YdmpS+dU9lNIt8= github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI= github.com/pingcap/tidb v1.1.0-beta.0.20191120070053-5a7ecfeb94fd h1:A7ZhUliNg9NA2+S9pMu7vC1R2ZmRZkpZocX50Mk/YjQ= github.com/pingcap/tidb v1.1.0-beta.0.20191120070053-5a7ecfeb94fd/go.mod h1:+SjmTI6LG3FvK4lIVDsOSyW2lWA4qtCSLKgwnN6FKxI= github.com/pingcap/tidb-tools v3.0.6-0.20191119150227-ff0a3c6e5763+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible h1:nndliEgwxjmupxW5ZOXHe4b7xQPXCaCm61Bi0dlQPpM= -github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tidb-tools v3.1.0-beta.0.20191227034743-57985f125c52+incompatible h1:MfCru+JGzAww5YbKCCeobjCaCj5/jT01RTxiMEMqILw= +github.com/pingcap/tidb-tools v3.1.0-beta.0.20191227034743-57985f125c52+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20191120045257-1b9900292ab6 h1:HPgqtaqIFIZXTvQNiZoJ9Y79AXz3pmDpYFL28KraTKE= github.com/pingcap/tipb v0.0.0-20191120045257-1b9900292ab6/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -254,20 +251,13 @@ github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qq github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be h1:9prAjluC9cN0nht+csYXr3isly4OvAEfch9+VVFnn4U= github.com/samuel/go-zookeeper v0.0.0-20170815201139-e6b59f6144be/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs= github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shurcooL/httpfs 
v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/vfsgen v0.0.0-20181020040650-a97a25d856ca/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= -github.com/siddontang/go-mysql v0.0.0-20190618002340-dbe0224ac097 h1:7krIUFu42TgBwT1wLQwpgR0Jcd/AkjgCGKSjhVWedaI= -github.com/siddontang/go-mysql v0.0.0-20190618002340-dbe0224ac097/go.mod h1:Bl4lryU44qtIXEXNbP0k0pD646Nkw/qHn21wfZVGJx4= github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= @@ -375,15 +365,13 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190909082730-f460065e899a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c h1:S/FtSvpNLtFBgjTqcKsRpsa6aVsI6iztaz1bQd9BJwE= golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= diff --git a/metrics/alertmanager/binlog.rules.yml b/metrics/alertmanager/binlog.rules.yml new file mode 100644 index 000000000..552c06724 --- /dev/null +++ b/metrics/alertmanager/binlog.rules.yml @@ -0,0 +1,84 @@ +groups: +- name: alert.rules + rules: + - alert: binlog_pump_storage_error_count + expr: changes(binlog_pump_storage_error_count[1m]) > 0 + labels: + env: ENV_LABELS_ENV + level: emergency + expr: changes(binlog_pump_storage_error_count[1m]) > 0 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: binlog pump storage write some binlogs failed + + - alert: 
binlog_drainer_checkpoint_high_delay
+    expr: (time() - binlog_drainer_checkpoint_tso / 1000) > 3600
+    for: 1m
+    labels:
+      env: ENV_LABELS_ENV
+      level: critical
+      expr: (time() - binlog_drainer_checkpoint_tso / 1000) > 3600
+    annotations:
+      description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
+      value: '{{ $value }}'
+      summary: binlog drainer checkpoint delay more than 1 hour
+
+  - alert: binlog_pump_write_binlog_rpc_duration_seconds_bucket
+    expr: histogram_quantile(0.9, rate(binlog_pump_rpc_duration_seconds_bucket{method="WriteBinlog"}[5m])) > 1
+    for: 1m
+    labels:
+      env: ENV_LABELS_ENV
+      level: warning
+      expr: histogram_quantile(0.9, rate(binlog_pump_rpc_duration_seconds_bucket{method="WriteBinlog"}[5m])) > 1
+    annotations:
+      description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
+      value: '{{ $value }}'
+      summary: binlog pump write binlog RPC latency is too high
+
+  - alert: binlog_pump_storage_write_binlog_duration_time_bucket
+    expr: histogram_quantile(0.9, rate(binlog_pump_storage_write_binlog_duration_time_bucket{type="batch"}[5m])) > 1
+    for: 1m
+    labels:
+      env: ENV_LABELS_ENV
+      level: warning
+      expr: histogram_quantile(0.9, rate(binlog_pump_storage_write_binlog_duration_time_bucket{type="batch"}[5m])) > 1
+    annotations:
+      description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
+      value: '{{ $value }}'
+      summary: binlog pump write binlog to disk is too slow
+
+  - alert: binlog_pump_storage_available_size_less_than_20G
+    expr: binlog_pump_storage_storage_size_bytes{type="available"} < 20 * 1024 * 1024 * 1024
+    for: 10s
+    labels:
+      env: ENV_LABELS_ENV
+      level: warning
+      expr: binlog_pump_storage_storage_size_bytes{type="available"} < 20 * 1024 * 1024 * 1024
+    annotations:
+      description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
+      value: '{{ $value }}'
+      summary: binlog pump storage available size less than 20G
+
+  - alert: binlog_drainer_execute_duration_time_more_than_10s
+    expr: histogram_quantile(0.9, rate(binlog_drainer_execute_duration_time_bucket[1m])) > 10
+    for: 1m
+    labels:
+      env: ENV_LABELS_ENV
+      level: warning
+      expr: histogram_quantile(0.9, rate(binlog_drainer_execute_duration_time_bucket[1m])) > 10
+    annotations:
+      description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
+      value: '{{ $value }}'
+      summary: binlog drainer execute_duration_time_more_than_10s
+
+  - alert: binlog_drainer_checkpoint_tso_no_change_for_1m
+    expr: changes(binlog_drainer_checkpoint_tso[1m]) < 1
+    labels:
+      env: ENV_LABELS_ENV
+      level: warning
+      expr: changes(binlog_drainer_checkpoint_tso[1m]) < 1
+    annotations:
+      description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
+      value: '{{ $value }}'
+      summary: binlog drainer checkpoint tso no change for 1m
diff --git a/metrics/grafana/binlog.json b/metrics/grafana/binlog.json
new file mode 100644
index 000000000..6f7f7da0d
--- /dev/null
+++ b/metrics/grafana/binlog.json
@@ -0,0 +1,2002 @@
+{
+  "__inputs": [
+    {
+      "name": "DS_TEST-CLUSTER",
+      "label": "test-cluster",
+      "description": "",
+      "type": "datasource",
+      "pluginId": "prometheus",
+      "pluginName": "Prometheus"
+    }
+  ],
+  "__requires": [
+    {
+      "type": "grafana",
+      "id": "grafana",
+      "name": "Grafana",
+      "version": "6.1.6"
+    },
+    {
+      "type": "panel",
+      "id": "graph",
+      "name": "Graph",
+      "version": ""
+    },
+    {
+      "type": "datasource",
+      "id": "prometheus",
+      "name":
"Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "singlestat", + "name": "Singlestat", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "${DS_TEST-CLUSTER}", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": null, + "iteration": 1564734492545, + "links": [], + "panels": [ + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 73, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 1 + }, + "hideTimeOverride": false, + "id": 68, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideZero": false, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_pump_storage_storage_size_bytes", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Storage Size", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "decbytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 1 + }, + "id": 63, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": false, + "hideZero": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_pump_storage_gc_ts", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : gc_tso", + "refId": "A" + }, + { + "expr": "binlog_pump_storage_max_commit_ts", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : max_commit_tso", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Metadata", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "dateTimeAsIso", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, 
+ "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 7, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(binlog_pump_rpc_duration_seconds_count{method=\"WriteBinlog\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} :: {{label}}", + "metric": "binlog_cistern_rpc_duration_seconds_bucket", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Write Binlog QPS by Instance", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "ops", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 3, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_pump_rpc_duration_seconds_bucket{method=\"WriteBinlog\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{method}}:99", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, rate(binlog_pump_rpc_duration_seconds_bucket{method=\"WriteBinlog\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{method}} : 95", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Write Binlog Latency", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 44, + "legend": { + 
"alignAsTable": true, + "avg": false, + "current": false, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_pump_storage_write_binlog_size_bucket[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}} : 99", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.95, rate(binlog_pump_storage_write_binlog_size_bucket[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}} : 95", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Storage Write Binlog Size", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "Bps", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 15 + }, + "id": 66, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_pump_storage_write_binlog_duration_time_bucket[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}}:99", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, rate(binlog_pump_storage_write_binlog_duration_time_bucket[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} : {{type}}:95", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Storage Write Binlog Latency", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 22 + }, + "id": 48, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 
1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(binlog_pump_storage_error_count[1m])", + "format": "time_series", + "instant": false, + "intervalFactor": 2, + "legendFormat": "{{instance}}:{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Pump Storage Error By Type", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 22 + }, + "id": 67, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_pump_storage_query_tikv_count", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Query Tikv", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "title": "pump", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 74, + "panels": [ + { + "cacheTimeout": null, + "colorBackground": false, + "colorValue": false, + "colors": [ + "#299c46", + "rgba(237, 129, 40, 0.89)", + "#d44a3a" + ], + "datasource": "${DS_TEST-CLUSTER}", + "format": "dateTimeAsIso", + "gauge": { + "maxValue": null, + "minValue": 0, + "show": false, + "thresholdLabels": false, + "thresholdMarkers": true + }, + "gridPos": { + "h": 7, + "w": 8, + "x": 0, + "y": 23 + }, + "hideTimeOverride": false, + "id": 70, + "interval": null, + "links": [], + "mappingType": 1, + "mappingTypes": [ + { + "name": "value to text", + "value": 1 + }, + { + "name": "range to text", + "value": 2 + } + ], + "maxDataPoints": 100, + "nullPointMode": "connected", + "nullText": null, + "postfix": "", + "postfixFontSize": "50%", + "prefix": "", + "prefixFontSize": "50%", + "rangeMaps": [ + { + "from": "null", + "text": "N/A", + "to": "null" + } + ], + "repeat": null, + "sparkline": { + "fillColor": "rgba(31, 118, 189, 0.18)", + "full": false, + "lineColor": "rgb(31, 120, 193)", + "show": false + }, + "tableColumn": "__name__", + "targets": [ + { + 
"expr": "binlog_drainer_checkpoint_tso{instance = \"$drainer_instance\"}", + "format": "time_series", + "instant": true, + "intervalFactor": 2, + "legendFormat": "checkpoint tso", + "refId": "A" + } + ], + "thresholds": "", + "timeFrom": null, + "timeShift": null, + "title": "Checkpoint TSO", + "transparent": false, + "type": "singlestat", + "valueFontSize": "80%", + "valueMaps": [ + { + "op": "=", + "text": "N/A", + "value": "null" + } + ], + "valueName": "current" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 16, + "x": 8, + "y": 23 + }, + "id": 69, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": false, + "hideZero": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_drainer_pump_position{instance = \"$drainer_instance\"}", + "format": "time_series", + "hide": false, + "instant": false, + "intervalFactor": 2, + "legendFormat": "{{nodeID}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Pump Handle TSO", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": false, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "dateTimeAsIso", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 30 + }, + "id": 62, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(binlog_drainer_read_binlog_size_count{instance = \"$drainer_instance\"}[1m])) by (nodeID)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{nodeID}}", + "metric": "binlog_drainer_event", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Pull Binlog QPS by Pump NodeID", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + 
"dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 30 + }, + "id": 53, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.95, rate(binlog_drainer_binlog_reach_duration_time_bucket{instance = \"$drainer_instance\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{nodeID}}", + "metric": "binlog_drainer_event", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "95% Binlog Reach Duration By Pump", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 37 + }, + "id": 58, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_drainer_error_count{instance = \"$drainer_instance\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "metric": "binlog_drainer_position", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Error By Type", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 37 + }, + "id": 6, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + 
"percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "irate(binlog_drainer_event{instance = \"$drainer_instance\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "metric": "binlog_drainer_event", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Drainer Event", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "ops", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 44 + }, + "id": 15, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_drainer_execute_duration_time_bucket{instance = \"$drainer_instance\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{job}}", + "metric": "binlog_drainer_txn_duration_time_bucket", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "99% Execute Time", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 44 + }, + "id": 71, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, rate(binlog_drainer_query_duration_time_bucket{instance = \"$drainer_instance\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}", + "metric": "binlog_drainer_txn_duration_time_bucket", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "99% sql query Time", + 
"tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 51 + }, + "id": 55, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.95, rate(binlog_drainer_read_binlog_size_bucket{instance = \"$drainer_instance\"}[1m]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "pump: {{nodeID}}", + "metric": "binlog_drainer_event", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "95% Binlog Size", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "Bps", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 51 + }, + "id": 52, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_drainer_ddl_jobs_total{instance = \"$drainer_instance\"}", + "format": "time_series", + "instant": false, + "intervalFactor": 2, + "legendFormat": "ddl job count", + "metric": "binlog_drainer_position", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "DDL Job Count", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + 
"bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 58 + }, + "id": 72, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "binlog_drainer_queue_size{instance = \"$drainer_instance\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{name}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "queue size", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "title": "drainer", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 2 + }, + "id": 75, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 59 + }, + "id": 9, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "go_goroutines{job=~\"binlog|pump|drainer\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}}", + "metric": "go_goroutines", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Goroutine", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 59 + }, + "id": 39, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "connected", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + 
"seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "go_memstats_heap_inuse_bytes{job=~\"binlog|pump|drainer\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}}", + "metric": "go_goroutines", + "refId": "A", + "step": 2 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Memory", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "bits", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "title": "node", + "type": "row" + } + ], + "refresh": "10s", + "schemaVersion": 18, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "allValue": null, + "current": {}, + "datasource": "${DS_TEST-CLUSTER}", + "definition": "", + "hide": 0, + "includeAll": false, + "label": null, + "multi": false, + "name": "drainer_instance", + "options": [], + "query": "label_values(binlog_drainer_ddl_jobs_total, instance)", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ], + "time_options": [ + "5m", + "15m", + "1h", + "6h", + "12h", + "24h", + "2d", + "7d", + "30d" + ] + }, + "timezone": "browser", + "title": "Test-Cluster-Binlog", + "uid": "RDdDTFvZz", + "version": 2 +} \ No newline at end of file diff --git a/pkg/encrypt/encrypt.go b/pkg/encrypt/encrypt.go new file mode 100644 index 000000000..690a07942 --- /dev/null +++ b/pkg/encrypt/encrypt.go @@ -0,0 +1,159 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package encrypt + +import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/base64" + "encoding/hex" + "os" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/log" +) + +var ( + defaultSecretKey, _ = hex.DecodeString("a529b7665997f043a30ac8fadcb51d6aa032c226ab5b7750530b12b8c1a16a48") + secretKey []byte + ivSep = []byte("@") // ciphertext format: iv + ivSep + encrypted-plaintext +) + +var initSecretKeyOnce sync.Once +var initSecretKeyErr error + +func initSecretKey() error { + hexKey := os.Getenv("BINLOG_SECRET_KEY") + if len(hexKey) == 0 { + log.Warn("using the default secret key to encrypt") + secretKey = defaultSecretKey + return nil + } + + key, err := hex.DecodeString(hexKey) + if err != nil { + return errors.Trace(err) + } + + return SetSecretKey(key) +} + +// SetSecretKey sets the secret key which is used to encrypt +func SetSecretKey(key []byte) error { + switch len(key) { + case 16, 24, 32: + break + default: + return errors.Errorf("secretKey not valid: %v", key) + } + secretKey = key + return nil +} + +// Encrypt tries to encrypt plaintext to base64 encoded ciphertext +func Encrypt(plaintext string) (string, error) { + ciphertext, err := encrypt([]byte(plaintext)) + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(ciphertext), nil +} + +// Decrypt tries to decrypt base64 encoded ciphertext to plaintext +func Decrypt(ciphertextB64 string) (string, error) { + ciphertext, err := base64.StdEncoding.DecodeString(ciphertextB64) + if err != nil { + return "", errors.Annotatef(err, "failed to decode base64: %s", ciphertextB64) + } + + plaintext, err := decrypt(ciphertext) + if err != nil { + return "", errors.Trace(err) + } + return string(plaintext), nil +} +
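For reference, a minimal usage sketch of the exported API above (the unexported encrypt/decrypt helpers follow below); the 32-byte key shown is the package's built-in default and only stands in for a real deployment key:

```go
package main

import (
	"encoding/hex"
	"fmt"

	"github.com/pingcap/tidb-binlog/pkg/encrypt"
)

func main() {
	// A 16-, 24- or 32-byte key selects AES-128/192/256 respectively.
	key, _ := hex.DecodeString("a529b7665997f043a30ac8fadcb51d6aa032c226ab5b7750530b12b8c1a16a48")
	if err := encrypt.SetSecretKey(key); err != nil {
		panic(err)
	}

	ciphertext, err := encrypt.Encrypt("my-password")
	if err != nil {
		panic(err)
	}
	fmt.Println("encrypted:", ciphertext) // base64(iv + "@" + AES-CFB(plaintext))

	plaintext, err := encrypt.Decrypt(ciphertext)
	if err != nil {
		panic(err)
	}
	fmt.Println("decrypted:", plaintext) // my-password
}
```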
// encrypt encrypts plaintext to ciphertext +func encrypt(plaintext []byte) ([]byte, error) { + initSecretKeyOnce.Do(func() { + initSecretKeyErr = initSecretKey() + }) + if initSecretKeyErr != nil { + return nil, initSecretKeyErr + } + + block, err := aes.NewCipher(secretKey) + if err != nil { + return nil, errors.Trace(err) + } + + iv, err := genIV(block.BlockSize()) + if err != nil { + return nil, err + } + + ciphertext := make([]byte, 0, len(iv)+len(ivSep)+len(plaintext)) + ciphertext = append(ciphertext, iv...) + ciphertext = append(ciphertext, ivSep...) + ciphertext = append(ciphertext, plaintext...) // will be overwritten by XORKeyStream + + stream := cipher.NewCFBEncrypter(block, iv) + stream.XORKeyStream(ciphertext[len(iv)+len(ivSep):], plaintext) + + return ciphertext, nil +} + +// decrypt decrypts ciphertext to plaintext +func decrypt(ciphertext []byte) ([]byte, error) { + initSecretKeyOnce.Do(func() { + initSecretKeyErr = initSecretKey() + }) + if initSecretKeyErr != nil { + return nil, initSecretKeyErr + } + + block, err := aes.NewCipher(secretKey) + if err != nil { + return nil, err + } + + if len(ciphertext) < block.BlockSize()+len(ivSep) { + // return nil, terror.ErrCiphertextLenNotValid.Generate(block.BlockSize()+len(ivSep), len(ciphertext)) + return nil, errors.Errorf("ciphertext not valid") + } + + if !bytes.Equal(ciphertext[block.BlockSize():block.BlockSize()+len(ivSep)], ivSep) { + // return nil, terror.ErrCiphertextContextNotValid.Generate() + return nil, errors.Errorf("ciphertext not valid") + } + + iv := ciphertext[:block.BlockSize()] + ciphertext = ciphertext[block.BlockSize()+len(ivSep):] + plaintext := make([]byte, len(ciphertext)) + + stream := cipher.NewCFBDecrypter(block, iv) + stream.XORKeyStream(plaintext, ciphertext) + + return plaintext, nil +} + +func genIV(n int) ([]byte, error) { + b := make([]byte, n) + _, err := rand.Read(b) + return b, errors.Trace(err) +}
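Since initSecretKey reads the key as a hex string from the BINLOG_SECRET_KEY environment variable and only falls back to the hard-coded default above (with a warning), each deployment should export its own key. A small standard-library sketch for generating one:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

func main() {
	// 32 random bytes become an AES-256 key once hex-decoded by initSecretKey;
	// 16 or 24 bytes would select AES-128/AES-192 instead.
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	// Export the printed value as BINLOG_SECRET_KEY before running the encrypt command.
	fmt.Println(hex.EncodeToString(key))
}
```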
diff --git a/pkg/encrypt/encrypt_test.go b/pkg/encrypt/encrypt_test.go new file mode 100644 index 000000000..9889a5acf --- /dev/null +++ b/pkg/encrypt/encrypt_test.go @@ -0,0 +1,92 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package encrypt + +import ( + "crypto/aes" + "crypto/rand" + "testing" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testEncryptSuite{}) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testEncryptSuite struct { +} + +func (t *testEncryptSuite) TestSetSecretKey(c *C) { + // 16-byte key: valid + b16 := make([]byte, 16) + _, err := rand.Read(b16) + c.Assert(err, IsNil) + + err = SetSecretKey(b16) + c.Assert(err, IsNil) + + // 20-byte key: invalid + b20 := make([]byte, 20) + _, err = rand.Read(b20) + c.Assert(err, IsNil) + + err = SetSecretKey(b20) + c.Assert(err, NotNil) +} + +func removeChar(input []byte, c byte) []byte { + i := 0 + for _, x := range input { + if x != c { + input[i] = x + i++ + } + } + return input[:i] +} + +func (t *testEncryptSuite) TestEncrypt(c *C) { + plaintext := []byte("a plain text") + + // encrypt + ciphertext, err := encrypt(plaintext) + c.Assert(err, IsNil) + + // decrypt + plaintext2, err := decrypt(ciphertext) + c.Assert(err, IsNil) + c.Assert(plaintext2, DeepEquals, plaintext) + + // invalid length + _, err = decrypt(ciphertext[:len(ciphertext)-len(plaintext)-1]) + c.Assert(err, NotNil) + + // invalid content + _, err = decrypt(removeChar(ciphertext, ivSep[0])) + c.Assert(err, NotNil) + + // a special case, we construct a ciphertext that can be decrypted but the + // plaintext is not what we want. This is because the current encryption mechanism + // doesn't keep enough information to decide whether the new ciphertext is valid + block, err := aes.NewCipher(secretKey) + c.Assert(err, IsNil) + blockSize := block.BlockSize() + c.Assert(len(ciphertext), Greater, blockSize+2) + plaintext3, err := decrypt(append(ciphertext[1:blockSize+1], append([]byte{ivSep[0]}, ciphertext[blockSize+2:]...)...)) + c.Assert(err, IsNil) + c.Assert(plaintext3, Not(DeepEquals), plaintext) +} diff --git a/pkg/loader/executor.go b/pkg/loader/executor.go index 4e72a5194..4058bfece 100644 --- a/pkg/loader/executor.go +++ b/pkg/loader/executor.go @@ -20,8 +20,12 @@ import ( "strings" "time" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/errors" "github.com/pingcap/log" + pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -33,7 +37,9 @@ var defaultBatchSize = 128 type executor struct { db *gosql.DB batchSize int + info *loopbacksync.LoopBackSync queryHistogramVec *prometheus.HistogramVec + refreshTableInfo func(schema string, table string) (info *tableInfo, err error) } func newExecutor(db *gosql.DB) *executor { @@ -45,11 +51,20 @@ func newExecutor(db *gosql.DB) *executor { return exe } +func (e *executor) withRefreshTableInfo(fn func(schema string, table string) (info *tableInfo, err error)) *executor { + e.refreshTableInfo = fn + return e +} + func (e *executor) withBatchSize(batchSize int) *executor { + e.batchSize = batchSize + return e +} + +func (e *executor) setSyncInfo(info *loopbacksync.LoopBackSync) { + e.info = info +} + func (e *executor) withQueryHistogramVec(queryHistogramVec *prometheus.HistogramVec) *executor { e.queryHistogramVec = queryHistogramVec return e @@ -100,6 +115,22 @@ func (tx *tx) commit() error { return errors.Trace(err) } +func (e *executor) updateMark(channel string, tx *tx) error { + if e.info == nil { + return nil + } + status := 1 + columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + var args []interface{} + sql := fmt.Sprintf("INSERT INTO %s%s on duplicate key update %s=%s+1;", loopbacksync.MarkTableName, columns, loopbacksync.Val, loopbacksync.Val) + args = append(args, e.info.ChannelID, status, channel) + _, err := tx.autoRollbackExec(sql, args...)
+ if err != nil { + return errors.Trace(err) + } + return nil +} + // return a wrap of sql.Tx func (e *executor) begin() (*tx, error) { sqlTx, err := e.db.Begin() @@ -107,10 +138,19 @@ func (e *executor) begin() (*tx, error) { return nil, errors.Trace(err) } - return &tx{ + var tx = &tx{ Tx: sqlTx, queryHistogramVec: e.queryHistogramVec, - }, nil + } + + if e.info != nil && e.info.LoopbackControl { + err1 := e.updateMark("", tx) + if err1 != nil { + return nil, errors.Trace(err1) + } + } + + return tx, nil } func (e *executor) bulkDelete(deletes []*DML) error { @@ -237,10 +277,54 @@ func (e *executor) splitExecDML(ctx context.Context, dmls []*DML, exec func(dmls return errors.Trace(errg.Wait()) } +func tryRefreshTableErr(err error) bool { + errCode, ok := pkgsql.GetSQLErrCode(err) + if !ok { + return false + } + + switch errCode { + case infoschema.ErrColumnNotExists.Code(): + return true + } + + return false +} + func (e *executor) singleExecRetry(ctx context.Context, allDMLs []*DML, safeMode bool, retryNum int, backoff time.Duration) error { for _, dmls := range splitDMLs(allDMLs, e.batchSize) { err := util.RetryContext(ctx, retryNum, backoff, 1, func(context.Context) error { - return e.singleExec(dmls, safeMode) + execErr := e.singleExec(dmls, safeMode) + if execErr == nil { + return nil + } + + if tryRefreshTableErr(execErr) && e.refreshTableInfo != nil { + log.Info("try refresh table info") + name2info := make(map[string]*tableInfo) + for _, dml := range dmls { + name := dml.TableName() + info, ok := name2info[name] + if !ok { + var err error + info, err = e.refreshTableInfo(dml.Database, dml.Table) + if err != nil { + log.Error("fail to refresh table info", zap.Error(err)) + continue + } + + name2info[name] = info + } + + if len(dml.info.columns) != len(info.columns) { + log.Info("columns change", zap.Strings("old", dml.info.columns), + zap.Strings("new", info.columns)) + removeOrphanCols(info, dml) + } + dml.info = info + } + } + return execErr }) if err != nil { return errors.Trace(err) diff --git a/pkg/loader/executor_test.go b/pkg/loader/executor_test.go index e04625874..4358da48b 100644 --- a/pkg/loader/executor_test.go +++ b/pkg/loader/executor_test.go @@ -21,6 +21,8 @@ import ( "sync/atomic" sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/check" . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/prometheus/client_golang/prometheus" @@ -75,6 +77,21 @@ func (s *executorSuite) TestSplitExecDML(c *C) { c.Assert(counter, Equals, int32(3)) } +func (s *executorSuite) TestTryRefreshTableErr(c *C) { + tests := []struct { + err error + res bool + }{ + {&mysql.MySQLError{Number: 1054} /*Unknown column*/, true}, + {errors.New("what ever"), false}, + } + + for _, test := range tests { + get := tryRefreshTableErr(test.err) + c.Assert(get, check.Equals, test.res) + } +} + type singleExecSuite struct { db *sql.DB dbMock sqlmock.Sqlmock @@ -114,12 +131,12 @@ func (s *singleExecSuite) TestInsert(c *C) { columns: []string{"name", "age"}, }, } - insertSQL := "INSERT INTO `unicorn`.`users`(`name`,`age`) VALUES(?,?)" - replaceSQL := "REPLACE INTO `unicorn`.`users`(`name`,`age`) VALUES(?,?)" + insertSQL := "INSERT INTO `unicorn`.`users`(`age`,`name`) VALUES(?,?)" + replaceSQL := "REPLACE INTO `unicorn`.`users`(`age`,`name`) VALUES(?,?)" s.dbMock.ExpectBegin() s.dbMock.ExpectExec(regexp.QuoteMeta(insertSQL)). 
- WithArgs("tester", 2019).WillReturnResult(sqlmock.NewResult(1, 1)) + WithArgs(2019, "tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectCommit() e := newExecutor(s.db) @@ -131,7 +148,7 @@ s.dbMock.ExpectBegin() s.dbMock.ExpectExec(regexp.QuoteMeta(replaceSQL)). - WithArgs("tester", 2019).WillReturnResult(sqlmock.NewResult(1, 1)) + WithArgs(2019, "tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectCommit() e = newExecutor(s.db) @@ -180,7 +197,7 @@ func (s *singleExecSuite) TestSafeUpdate(c *C) { s.dbMock.ExpectExec(delSQL). WithArgs("tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectExec(replaceSQL). - WithArgs("tester", 2019).WillReturnError(errors.New("replace")) + WithArgs(2019, "tester").WillReturnError(errors.New("replace")) e = newExecutor(s.db) err = e.singleExec([]*DML{&dml}, true) @@ -193,7 +210,7 @@ s.dbMock.ExpectExec(delSQL). WithArgs("tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectExec(replaceSQL). - WithArgs("tester", 2019).WillReturnResult(sqlmock.NewResult(1, 1)) + WithArgs(2019, "tester").WillReturnResult(sqlmock.NewResult(1, 1)) s.dbMock.ExpectCommit() e = newExecutor(s.db) diff --git a/pkg/loader/load.go b/pkg/loader/load.go index 402fe5766..a8493b2e9 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -17,10 +17,13 @@ import ( "context" gosql "database/sql" "fmt" + "strings" "sync" "sync/atomic" "time" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-binlog/pkg/util" @@ -69,6 +72,9 @@ type loaderImpl struct { batchSize int workerCount int + syncMode SyncMode + + loopBackSyncInfo *loopbacksync.LoopBackSync input chan *Txn successTxn chan *Txn @@ -99,23 +105,43 @@ type MetricsGroup struct { QueryHistogramVec *prometheus.HistogramVec } +// SyncMode represents the sync mode of DML. +type SyncMode int + +// SyncMode values. +const ( + SyncFullColumn SyncMode = 1 + iota + SyncPartialColumn +) + type options struct { - workerCount int - batchSize int - metrics *MetricsGroup - saveAppliedTS bool + workerCount int + batchSize int + loopBackSyncInfo *loopbacksync.LoopBackSync + metrics *MetricsGroup + saveAppliedTS bool + syncMode SyncMode } var defaultLoaderOptions = options{ - workerCount: 16, - batchSize: 20, - metrics: nil, - saveAppliedTS: false, + workerCount: 16, + batchSize: 20, + loopBackSyncInfo: nil, + metrics: nil, + saveAppliedTS: false, + syncMode: SyncFullColumn, } // A Option sets options such batch size, worker count etc. type Option func(*options) +// SyncModeOption sets the sync mode of the loader. +func SyncModeOption(n SyncMode) Option { + return func(o *options) { + o.syncMode = n + } +} + // WorkerCount set worker count of loader func WorkerCount(n int) Option { return func(o *options) { @@ -130,6 +156,13 @@ func BatchSize(n int) Option { } } +// SetloopBackSyncInfo sets the loopback sync info of the loader +func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option { + return func(o *options) { + o.loopBackSyncInfo = loopBackSyncInfo + } +} +
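These option constructors are consumed by NewLoader (shown just below). A minimal wiring sketch, assuming an already-opened *sql.DB and a prepared loopbacksync.LoopBackSync value:

```go
package main

import (
	gosql "database/sql"

	"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
	"github.com/pingcap/tidb-binlog/pkg/loader"
)

func newPartialColumnLoader(db *gosql.DB, info *loopbacksync.LoopBackSync) (loader.Loader, error) {
	// SyncPartialColumn tolerates column-count drift between upstream and
	// downstream schemas; SetloopBackSyncInfo enables the mark-table machinery.
	return loader.NewLoader(db,
		loader.WorkerCount(16),
		loader.BatchSize(20),
		loader.SyncModeOption(loader.SyncPartialColumn),
		loader.SetloopBackSyncInfo(info),
	)
}
```

A LoopBackSync value of the shape used in TestUpdateMarkSQL further down, e.g. &loopbacksync.LoopBackSync{ChannelID: 1, LoopbackControl: true, SyncDDL: true}, mirrors the loopback-control and channel-id settings this change introduces.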
// SaveAppliedTS set downstream type, values can be tidb or mysql func SaveAppliedTS(save bool) Option { return func(o *options) { @@ -154,15 +187,18 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) { ctx, cancel := context.WithCancel(context.Background()) + // TODO: just save opts in loaderImpl instead of copying every field. s := &loaderImpl{ - db: db, - workerCount: opts.workerCount, - batchSize: opts.batchSize, - metrics: opts.metrics, - input: make(chan *Txn), - successTxn: make(chan *Txn), - merge: true, - saveAppliedTS: opts.saveAppliedTS, + db: db, + workerCount: opts.workerCount, + batchSize: opts.batchSize, + metrics: opts.metrics, + syncMode: opts.syncMode, + loopBackSyncInfo: opts.loopBackSyncInfo, + input: make(chan *Txn), + successTxn: make(chan *Txn), + merge: true, + saveAppliedTS: opts.saveAppliedTS, ctx: ctx, cancel: cancel, @@ -262,6 +298,10 @@ func (s *loaderImpl) refreshTableInfo(schema string, table string) (info *tableI return } +func (s *loaderImpl) evictTableInfo(schema string, table string) { + s.tableInfos.Delete(quoteSchema(schema, table)) +} + func (s *loaderImpl) getTableInfo(schema string, table string) (info *tableInfo, err error) { v, ok := s.tableInfos.Load(quoteSchema(schema, table)) if ok { @@ -306,6 +346,9 @@ func isCreateDatabaseDDL(sql string) bool { func (s *loaderImpl) execDDL(ddl *DDL) error { log.Debug("exec ddl", zap.Reflect("ddl", ddl)) + if ddl.ShouldSkip { + return nil + } err := util.RetryContext(s.ctx, maxDDLRetryCount, execDDLRetryWait, 1, func(context.Context) error { tx, err := s.db.Begin() @@ -392,6 +435,20 @@ func (s *loaderImpl) singleExec(executor *executor, dmls []*DML) error { return errors.Trace(err) } +func removeOrphanCols(info *tableInfo, dml *DML) { + mp := make(map[string]struct{}, len(info.columns)) + for _, name := range info.columns { + mp[name] = struct{}{} + } + + for name := range dml.Values { + if _, ok := mp[name]; !ok { + delete(dml.Values, name) + delete(dml.OldValues, name) + } + } +} + func (s *loaderImpl) execDMLs(dmls []*DML) error { if len(dmls) == 0 { return nil @@ -402,6 +459,9 @@ func (s *loaderImpl) execDMLs(dmls []*DML) error { return errors.Trace(err) } filterGeneratedCols(dml) + if s.syncMode == SyncPartialColumn { + removeOrphanCols(dml.info, dml) + } } batchTables, singleDMLs := s.groupDMLs(dmls) @@ -428,8 +488,30 @@ func (s *loaderImpl) execDMLs(dmls []*DML) error { return errors.Trace(err) } +func (s *loaderImpl) createMarkTable() error { + markTableDataBase := loopbacksync.MarkTableName[:strings.Index(loopbacksync.MarkTableName, ".")] + createDatabaseSQL := fmt.Sprintf("create database IF NOT EXISTS %s;", markTableDataBase) + createDatabase := DDL{SQL: createDatabaseSQL} + if err1 := s.execDDL(&createDatabase); err1 != nil { + log.Error("exec failed", zap.String("sql", createDatabase.SQL), zap.Error(err1)) + return errors.Trace(err1) + } + sql := createMarkTableDDL() + createMarkTableInfo := DDL{Database: markTableDataBase, Table: loopbacksync.MarkTableName, SQL: sql} + if err := s.execDDL(&createMarkTableInfo); err != nil { + log.Error("exec failed", zap.String("sql", createMarkTableInfo.SQL), zap.Error(err)) + return errors.Trace(err) + } + return nil +} +
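With loopback control enabled, Run (below) first ensures the mark table exists via createMarkTable, and executor.begin then bumps the channel's row through updateMark at the start of every downstream transaction. A runnable sketch of the statements this produces, using hypothetical stand-ins for the loopbacksync constants, which this diff references but does not define:

```go
package main

import (
	"fmt"
	"strings"
)

// Assumed values, for illustration only; the real constants live in
// github.com/pingcap/tidb-binlog/drainer/loopbacksync.
const (
	markTableName = "retl._drainer_repl_mark" // loopbacksync.MarkTableName (assumed)
	channelID     = "channel_id"              // loopbacksync.ChannelID (assumed)
	val           = "val"                     // loopbacksync.Val (assumed)
	channelInfo   = "channel_info"            // loopbacksync.ChannelInfo (assumed)
)

func main() {
	// What createMarkTable issues once at startup:
	markTableDB := markTableName[:strings.Index(markTableName, ".")]
	fmt.Printf("create database IF NOT EXISTS %s;\n", markTableDB)
	fmt.Printf("CREATE TABLE If Not Exists %s ( %s bigint primary key, %s bigint DEFAULT 0, %s varchar(64));\n", markTableName, channelID, val, channelInfo)

	// What updateMark prepends to each downstream transaction:
	columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", channelID, val, channelInfo)
	fmt.Printf("INSERT INTO %s%s on duplicate key update %s=%s+1;\n", markTableName, columns, val, val)
}
```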
// Run will quit when meet any error, or all the txn are drained func (s *loaderImpl) Run() error { + if s.loopBackSyncInfo != nil && s.loopBackSyncInfo.LoopbackControl { + if err := s.createMarkTable(); err != nil { + return errors.Trace(err) + } + } txnManager := newTxnManager(1024, s.input) defer func() { log.Info("Run()... in Loader quit") @@ -538,6 +620,10 @@ func filterGeneratedCols(dml *DML) { func (s *loaderImpl) getExecutor() *executor { e := newExecutor(s.db).withBatchSize(s.batchSize) + if s.syncMode == SyncPartialColumn { + e = e.withRefreshTableInfo(s.refreshTableInfo) + } + e.setSyncInfo(s.loopBackSyncInfo) if s.metrics != nil && s.metrics.QueryHistogramVec != nil { e = e.withQueryHistogramVec(s.metrics.QueryHistogramVec) } @@ -552,6 +638,11 @@ func newBatchManager(s *loaderImpl) *batchManager { fExecDDL: s.execDDL, fDDLSuccessCallback: func(txn *Txn) { s.markSuccess(txn) + if txn.DDL.ShouldSkip { + s.evictTableInfo(txn.DDL.Database, txn.DDL.Table) + return + } + if needRefreshTableInfo(txn.DDL.SQL) { if _, err := s.refreshTableInfo(txn.DDL.Database, txn.DDL.Table); err != nil { log.Error("refresh table info failed", zap.String("database", txn.DDL.Database), zap.String("table", txn.DDL.Table), zap.Error(err)) diff --git a/pkg/loader/load_test.go b/pkg/loader/load_test.go index d6655cacf..20fc47343 100644 --- a/pkg/loader/load_test.go +++ b/pkg/loader/load_test.go @@ -21,7 +21,7 @@ import ( sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/go-sql-driver/mysql" - check "github.com/pingcap/check" + "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -37,6 +37,37 @@ func (cs *LoadSuite) SetUpTest(c *check.C) { func (cs *LoadSuite) TearDownTest(c *check.C) { } +func (cs *LoadSuite) TestRemoveOrphanCols(c *check.C) { + dml := &DML{ + Values: map[string]interface{}{ + "exist1": 11, + "exist2": 22, + "orphan1": 11, + "orphan2": 22, + }, + OldValues: map[string]interface{}{ + "exist1": 1, + "exist2": 2, + "orphan1": 1, + "orphan2": 2, + }, + } + + info := &tableInfo{ + columns: []string{"exist1", "exist2"}, + } + + removeOrphanCols(info, dml) + c.Assert(dml.Values, check.DeepEquals, map[string]interface{}{ + "exist1": 11, + "exist2": 22, + }) + c.Assert(dml.OldValues, check.DeepEquals, map[string]interface{}{ + "exist1": 1, + "exist2": 2, + }) +} + func (cs *LoadSuite) TestOptions(c *check.C) { var o options WorkerCount(42)(&o) @@ -120,8 +151,9 @@ func (cs *LoadSuite) TestNewClose(c *check.C) { db, _, err := sqlmock.New() c.Assert(err, check.IsNil) - loader, err := NewLoader(db) + loader, err := NewLoader(db, SyncModeOption(SyncPartialColumn)) c.Assert(err, check.IsNil) + c.Assert(loader.(*loaderImpl).syncMode, check.Equals, SyncPartialColumn) loader.Close() } diff --git a/pkg/loader/model.go b/pkg/loader/model.go index 604d42761..1db4adfa9 100644 --- a/pkg/loader/model.go +++ b/pkg/loader/model.go @@ -15,9 +15,12 @@ package loader import ( "fmt" + "sort" "strconv" "strings" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + "github.com/pingcap/log" "go.uber.org/zap" ) @@ -51,6 +54,9 @@ type DDL struct { Database string Table string SQL string + // ShouldSkip indicates that this DDL should not be executed at the downstream; only the downstream table info is refreshed. + // One use case is bidirectional replication, where a DDL should be executed on only one side.
+ ShouldSkip bool } // Txn holds transaction info, an DDL or DML sequences @@ -184,6 +190,11 @@ func (dml *DML) updateSQL() (sql string, args []interface{}) { return } +func createMarkTableDDL() string { + sql := fmt.Sprintf("CREATE TABLE If Not Exists %s ( %s bigint primary key, %s bigint DEFAULT 0, %s varchar(64));", loopbacksync.MarkTableName, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + return sql +} + func (dml *DML) buildWhere(builder *strings.Builder) (args []interface{}) { wnames, wargs := dml.whereSlice() for i := 0; i < len(wnames); i++ { @@ -230,7 +241,8 @@ func (dml *DML) whereSlice() (colNames []string, args []interface{}) { } // Fallback to use all columns - return dml.info.columns, dml.whereValues(dml.info.columns) + names := dml.columnNames() + return names, dml.whereValues(names) } func (dml *DML) deleteSQL() (sql string, args []interface{}) { @@ -244,10 +256,21 @@ func (dml *DML) deleteSQL() (sql string, args []interface{}) { return } +func (dml *DML) columnNames() []string { + names := make([]string, 0, len(dml.Values)) + + for name := range dml.Values { + names = append(names, name) + } + + sort.Strings(names) + return names +} + func (dml *DML) replaceSQL() (sql string, args []interface{}) { - info := dml.info - sql = fmt.Sprintf("REPLACE INTO %s(%s) VALUES(%s)", dml.TableName(), buildColumnList(info.columns), holderString(len(info.columns))) - for _, name := range info.columns { + names := dml.columnNames() + sql = fmt.Sprintf("REPLACE INTO %s(%s) VALUES(%s)", dml.TableName(), buildColumnList(names), holderString(len(names))) + for _, name := range names { v := dml.Values[name] args = append(args, v) } diff --git a/pkg/loader/model_test.go b/pkg/loader/model_test.go index 57e0222f4..8a83e5007 100644 --- a/pkg/loader/model_test.go +++ b/pkg/loader/model_test.go @@ -14,9 +14,14 @@ package loader import ( + "fmt" + "regexp" "strings" - check "github.com/pingcap/check" + "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/tidb-binlog/drainer/loopbacksync" + + "github.com/pingcap/check" ) type dmlSuite struct { @@ -57,6 +62,7 @@ func (d *dmlSuite) testWhere(c *check.C, tp DMLType) { if tp == UpdateDMLType { dml.OldValues = values + dml.Values = values } else { dml.Values = values } @@ -74,12 +80,13 @@ func (d *dmlSuite) testWhere(c *check.C, tp DMLType) { dml = getDML(false, tp) if tp == UpdateDMLType { dml.OldValues = values + dml.Values = values } else { dml.Values = values } names, args = dml.whereSlice() - c.Assert(names, check.DeepEquals, []string{"id", "a1"}) + c.Assert(names, check.DeepEquals, []string{"a1", "id"}) c.Assert(args, check.DeepEquals, []interface{}{1, 1}) builder.Reset() @@ -179,10 +186,10 @@ func (s *SQLSuite) TestInsertSQL(c *check.C) { }, } sql, args := dml.sql() - c.Assert(sql, check.Equals, "INSERT INTO `test`.`hello`(`name`,`age`) VALUES(?,?)") + c.Assert(sql, check.Equals, "INSERT INTO `test`.`hello`(`age`,`name`) VALUES(?,?)") c.Assert(args, check.HasLen, 2) - c.Assert(args[0], check.Equals, "pc") - c.Assert(args[1], check.Equals, 42) + c.Assert(args[0], check.Equals, 42) + c.Assert(args[1], check.Equals, "pc") } func (s *SQLSuite) TestDeleteSQL(c *check.C) { @@ -192,6 +199,7 @@ func (s *SQLSuite) TestDeleteSQL(c *check.C) { Table: "hello", Values: map[string]interface{}{ "name": "pc", + "age": 10, }, info: &tableInfo{ columns: []string{"name", "age"}, @@ -200,9 +208,10 @@ func (s *SQLSuite) TestDeleteSQL(c *check.C) { sql, args := dml.sql() c.Assert( sql, check.Equals, - "DELETE FROM `test`.`hello` WHERE `name` = ? 
AND `age` IS NULL LIMIT 1") - c.Assert(args, check.HasLen, 1) - c.Assert(args[0], check.Equals, "pc") + "DELETE FROM `test`.`hello` WHERE `age` = ? AND `name` = ? LIMIT 1") + c.Assert(args, check.HasLen, 2) + c.Assert(args[0], check.Equals, 10) + c.Assert(args[1], check.Equals, "pc") } func (s *SQLSuite) TestUpdateSQL(c *check.C) { @@ -228,3 +237,30 @@ func (s *SQLSuite) TestUpdateSQL(c *check.C) { c.Assert(args[0], check.Equals, "pc") c.Assert(args[1], check.Equals, "pingcap") } + +func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) { + db, mock, err := sqlmock.New() + c.Assert(err, check.IsNil) + defer db.Close() + columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + sql := fmt.Sprintf("INSERT INTO %s%s on duplicate key update %s=%s+1;", loopbacksync.MarkTableName, columns, loopbacksync.Val, loopbacksync.Val) + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta(sql)). + WithArgs(100, 1, "").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + e := newExecutor(db) + tx, err := e.begin() + c.Assert(err, check.IsNil) + info := &loopbacksync.LoopBackSync{ChannelID: 100, LoopbackControl: true, SyncDDL: true} + e.info = info + err1 := e.updateMark("", tx) + c.Assert(err1, check.IsNil) + err2 := tx.commit() + c.Assert(err2, check.IsNil) + c.Assert(mock.ExpectationsWereMet(), check.IsNil) +} +func (s *SQLSuite) TestCreateMarkTable(c *check.C) { + sql := createMarkTableDDL() + sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s ( %s bigint primary key, %s bigint DEFAULT 0, %s varchar(64));", loopbacksync.MarkTableName, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + c.Assert(sql, check.Equals, sql1) +} diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 7d9ffa892..963bc8c41 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -51,17 +51,17 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) { if safemode { insertPattern = "REPLACE INTO" } - mock.ExpectExec(insertPattern).WithArgs(1, "test", nil).WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("DELETE FROM").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(insertPattern).WithArgs(1, "test", "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("DELETE FROM").WithArgs(1, "test", "test").WillReturnResult(sqlmock.NewResult(0, 1)) if safemode { mock.ExpectExec("DELETE FROM").WithArgs().WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec(insertPattern).WithArgs(nil, nil, "abc").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(insertPattern).WithArgs(1, "test", "abc").WillReturnResult(sqlmock.NewResult(0, 1)) } else { - mock.ExpectExec("UPDATE").WithArgs("abc", "test").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("UPDATE").WithArgs(1, "test", "abc", 1, "test", "test").WillReturnResult(sqlmock.NewResult(0, 1)) } mock.ExpectCommit() - syncTest(c, Syncer(syncer)) + syncTest(c, syncer) err = syncer.Close() c.Assert(err, check.IsNil) diff --git a/reparo/syncer/print_test.go b/reparo/syncer/print_test.go index 9485f0f03..197081d27 100644 --- a/reparo/syncer/print_test.go +++ b/reparo/syncer/print_test.go @@ -25,10 +25,14 @@ func (s *testPrintSuite) TestPrintSyncer(c *check.C) { "schema: test; table: t1; type: Insert\n"+ "a(int): 1\n"+ "b(varchar): test\n"+ + "c(varchar): test\n"+ "schema: test; table: t1; type: Delete\n"+ "a(int): 1\n"+ "b(varchar): test\n"+ + "c(varchar): test\n"+ "schema: test; 
table: t1; type: Update\n"+ + "a(int): 1 => 1\n"+ + "b(varchar): test => test\n"+ "c(varchar): test => abc\n") err = syncer.Close() diff --git a/reparo/syncer/translate_test.go b/reparo/syncer/translate_test.go index 6c2aecca8..e3ffb2eb2 100644 --- a/reparo/syncer/translate_test.go +++ b/reparo/syncer/translate_test.go @@ -134,6 +134,7 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { Values: map[string]interface{}{ "a": int64(1), "b": "test", + "c": "test", }, }, { Database: "test", @@ -142,15 +143,20 @@ func (s *testTranslateSuite) TestPBBinlogToTxn(c *check.C) { Values: map[string]interface{}{ "a": int64(1), "b": "test", + "c": "test", }, }, { Database: "test", Table: "t1", Tp: loader.UpdateDMLType, Values: map[string]interface{}{ + "a": int64(1), + "b": "test", "c": "abc", }, OldValues: map[string]interface{}{ + "a": int64(1), + "b": "test", "c": "test", }, }, @@ -199,17 +205,17 @@ func generateDMLEvents(c *check.C) []pb.Event { Tp: pb.EventType_Insert, SchemaName: &schema, TableName: &table, - Row: [][]byte{cols[0], cols[1]}, + Row: [][]byte{cols[0], cols[1], cols[2]}, }, { Tp: pb.EventType_Delete, SchemaName: &schema, TableName: &table, - Row: [][]byte{cols[0], cols[1]}, + Row: [][]byte{cols[0], cols[1], cols[2]}, }, { Tp: pb.EventType_Update, SchemaName: &schema, TableName: &table, - Row: [][]byte{cols[2]}, + Row: [][]byte{cols[0], cols[1], cols[2]}, }, } } @@ -220,15 +226,17 @@ func generateColumns(c *check.C) [][]byte { cols := []*pb.Column{ { - Name: "a", - Tp: []byte{mysql.TypeInt24}, - MysqlType: "int", - Value: encodeIntValue(1), + Name: "a", + Tp: []byte{mysql.TypeInt24}, + MysqlType: "int", + Value: encodeIntValue(1), + ChangedValue: encodeIntValue(1), }, { - Name: "b", - Tp: []byte{mysql.TypeVarchar}, - MysqlType: "varchar", - Value: encodeBytesValue([]byte("test")), + Name: "b", + Tp: []byte{mysql.TypeVarchar}, + MysqlType: "varchar", + Value: encodeBytesValue([]byte("test")), + ChangedValue: encodeBytesValue([]byte("test")), }, { Name: "c", Tp: []byte{mysql.TypeVarchar},