From 2fcf5dc470df3657315a65244ef85a76107c8567 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 22 Jul 2019 18:32:05 +0800 Subject: [PATCH] drainer/: Add ignore-txn-commit-ts to skip some txn (#685) --- cmd/drainer/drainer.toml | 3 +++ drainer/config.go | 27 ++++++++++++++------------- drainer/syncer.go | 16 +++++++++++++++- drainer/syncer_test.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 14 deletions(-) create mode 100644 drainer/syncer_test.go diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 5cde179b1..4948ffd45 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -52,6 +52,9 @@ safe-mode = false # valid values are "mysql", "file", "tidb", "flash", "kafka" db-type = "mysql" +# ignore syncing the txn with specified commit ts to downstream +ignore-txn-commit-ts = [] + # disable sync these schema ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" diff --git a/drainer/config.go b/drainer/config.go index 8ddf13839..fbe1a0467 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -45,19 +45,20 @@ var ( // SyncerConfig is the Syncer's configuration. type SyncerConfig struct { - StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` - SQLMode mysql.SQLMode `toml:"-" json:"-"` - IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` - IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` - TxnBatch int `toml:"txn-batch" json:"txn-batch"` - WorkerCount int `toml:"worker-count" json:"worker-count"` - To *executor.DBConfig `toml:"to" json:"to"` - DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` - DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` - DestDBType string `toml:"db-type" json:"db-type"` - DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"` - SafeMode bool `toml:"safe-mode" json:"safe-mode"` - DisableCausality bool `toml:"disable-detect" json:"disable-detect"` + StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` + SQLMode mysql.SQLMode `toml:"-" json:"-"` + IgnoreTxnCommitTS []int64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"` + IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` + IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` + TxnBatch int `toml:"txn-batch" json:"txn-batch"` + WorkerCount int `toml:"worker-count" json:"worker-count"` + To *executor.DBConfig `toml:"to" json:"to"` + DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` + DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` + DestDBType string `toml:"db-type" json:"db-type"` + DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"` + SafeMode bool `toml:"safe-mode" json:"safe-mode"` + DisableCausality bool `toml:"disable-detect" json:"disable-detect"` } // Config holds the configuration of drainer diff --git a/drainer/syncer.go b/drainer/syncer.go index 31a20ebd7..130b7484d 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -452,6 +452,11 @@ func (s *Syncer) run(jobs []*model.Job) error { commitTS := binlog.GetCommitTs() jobID := binlog.GetDdlJobId() + if isIgnoreTxnCommitTS(s.cfg.IgnoreTxnCommitTS, commitTS) { + log.Warnf("skip txn, binlog: %s", b.binlog.String()) + continue + } + if startTS == commitTS { // generate fake binlog job s.addJob(newFakeJob(commitTS, b.nodeID)) @@ -513,7 +518,7 @@ func (s *Syncer) run(jobs []*model.Job) error { return errors.Trace(err) } - log.Infof("[ddl][start]%s[commit ts]%v", sql, commitTS) + log.Infof("[ddl][start]%s[commit ts]%v, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed", sql, commitTS) var args []interface{} // for kafka, mysql and tidb, we want to know the relate schema and table, get it while args now // in executor @@ -630,6 +635,15 @@ func (s *Syncer) translateSqls(mutations []pb.TableMutation, commitTS int64, nod return nil } +func isIgnoreTxnCommitTS(ignoreTxnCommitTS []int64, ts int64) bool { + for _, ignoreTS := range ignoreTxnCommitTS { + if ignoreTS == ts { + return true + } + } + return false +} + // Add adds binlogItem to the syncer's input channel func (s *Syncer) Add(b *binlogItem) { select { diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go new file mode 100644 index 000000000..05d09e47a --- /dev/null +++ b/drainer/syncer_test.go @@ -0,0 +1,29 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package drainer + +import ( + "github.com/pingcap/check" +) + +type syncerSuite struct{} + +var _ = check.Suite(&syncerSuite{}) + +func (s *syncerSuite) TestIsIgnoreTxnCommitTS(c *check.C) { + c.Assert(isIgnoreTxnCommitTS(nil, 1), check.IsFalse) + c.Assert(isIgnoreTxnCommitTS([]int64{1, 3}, 1), check.IsTrue) + c.Assert(isIgnoreTxnCommitTS([]int64{1, 3}, 2), check.IsFalse) + c.Assert(isIgnoreTxnCommitTS([]int64{1, 3}, 3), check.IsTrue) +}