From 5fd3d17f9ecbfc709ddd214208c6707dd197966e Mon Sep 17 00:00:00 2001 From: Lonng Date: Mon, 1 Apr 2019 18:18:12 +0800 Subject: [PATCH] *: Add sql-mode config for drainer (#511) If this is setted , will use the same sql-mode to parse DDL statment, and set the same sql-mode at downstream when db-type is mysql. If this is not setted, it will not set any sql-mode. (cherry picked from commit 06e378fb258891dc377f74dc94e508eea74b0c28) --- cmd/drainer/drainer.toml | 5 +++++ drainer/config.go | 13 ++++++++++++- drainer/config_test.go | 25 +++++++++++++++++++++++++ drainer/executor/executor.go | 4 ++-- drainer/executor/mysql.go | 4 ++-- drainer/syncer.go | 13 +++++++------ drainer/translator/flash.go | 12 +++++++++--- drainer/translator/kafka.go | 3 ++- drainer/translator/mysql.go | 9 +++++++-- drainer/translator/pb.go | 10 +++++++--- drainer/translator/translator.go | 3 ++- drainer/translator/translator_test.go | 3 ++- drainer/util.go | 4 ++-- pkg/sql/sql.go | 16 +++++++++++++--- 14 files changed, 97 insertions(+), 27 deletions(-) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 294ebb92d..c51f4edb7 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -24,6 +24,11 @@ pd-urls = "http://127.0.0.1:2379" # syncer Configuration. [syncer] +# Assume the upstream sql-mode. +# If this is setted , will use the same sql-mode to parse DDL statment, and set the same sql-mode at downstream when db-type is mysql. +# If this is not setted, it will not set any sql-mode. +# sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION" + # disable sync these schema ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" diff --git a/drainer/config.go b/drainer/config.go index 7ef1c7130..3389669f5 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -15,6 +15,8 @@ import ( "github.com/BurntSushi/toml" "github.com/ngaut/log" "github.com/pingcap/errors" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-binlog/drainer/executor" "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb-binlog/pkg/flags" @@ -42,6 +44,8 @@ var ( // SyncerConfig is the Syncer's configuration. type SyncerConfig struct { + StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` + SQLMode mysql.SQLMode `toml:"-" json:"-"` IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` TxnBatch int `toml:"txn-batch" json:"txn-batch"` WorkerCount int `toml:"worker-count" json:"worker-count"` @@ -81,7 +85,7 @@ func NewConfig() *Config { cfg := &Config{ EtcdTimeout: defaultEtcdTimeout, - SyncerCfg: new(SyncerConfig), + SyncerCfg: &SyncerConfig{}, } cfg.FlagSet = flag.NewFlagSet("drainer", flag.ContinueOnError) fs := cfg.FlagSet @@ -156,6 +160,13 @@ func (cfg *Config) Parse(args []string) error { return errors.Trace(err) } + if cfg.SyncerCfg.StrSQLMode != nil { + cfg.SyncerCfg.SQLMode, err = mysql.GetSQLMode(*cfg.SyncerCfg.StrSQLMode) + if err != nil { + return errors.Annotate(err, "invalid config: `sql-mode` must be a valid SQL_MODE") + } + } + cfg.tls, err = cfg.Security.ToTLSConfig() if err != nil { return errors.Errorf("tls config %+v error %v", cfg.Security, err) diff --git a/drainer/config_test.go b/drainer/config_test.go index d4fc1d318..ea7834ec1 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -4,6 +4,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/parser/mysql" ) // Hook up gocheck into the "go test" runner. @@ -30,4 +31,28 @@ func (t *testDrainerSuite) TestConfig(c *C) { c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1) c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql") c.Assert(cfg.SyncerCfg.To.Host, Equals, "127.0.0.1") + var strSQLMode *string + c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, strSQLMode) + c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.SQLMode(0)) +} + +func (t *testDrainerSuite) TestValidate(c *C) { + cfg := NewConfig() + + cfg.ListenAddr = "http://123:9091" + err := cfg.validate() + c.Assert(err, ErrorMatches, ".*ListenAddr.*") + cfg.ListenAddr = "http://192.168.10.12:9091" + + cfg.EtcdURLs = "127.0.0.1:2379,127.0.0.1:2380" + err = cfg.validate() + c.Assert(err, ErrorMatches, ".*EtcdURLs.*") + + cfg.EtcdURLs = "http://127.0.0.1,http://192.168.12.12" + err = cfg.validate() + c.Assert(err, ErrorMatches, ".*EtcdURLs.*") + + cfg.EtcdURLs = "http://127.0.0.1:2379,http://192.168.12.12:2379" + err = cfg.validate() + c.Assert(err, IsNil) } diff --git a/drainer/executor/executor.go b/drainer/executor/executor.go index 986636909..7f5162402 100644 --- a/drainer/executor/executor.go +++ b/drainer/executor/executor.go @@ -15,10 +15,10 @@ type Executor interface { } // New returns the an Executor instance by given name -func New(name string, cfg *DBConfig) (Executor, error) { +func New(name string, cfg *DBConfig, sqlMode *string) (Executor, error) { switch name { case "mysql", "tidb": - return newMysql(cfg) + return newMysql(cfg, sqlMode) case "pb": return newPB(cfg) case "flash": diff --git a/drainer/executor/mysql.go b/drainer/executor/mysql.go index e8316b5d0..31e269984 100644 --- a/drainer/executor/mysql.go +++ b/drainer/executor/mysql.go @@ -17,8 +17,8 @@ type mysqlExecutor struct { *baseError } -func newMysql(cfg *DBConfig) (Executor, error) { - db, err := pkgsql.OpenDB("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password) +func newMysql(cfg *DBConfig, sqlMode *string) (Executor, error) { + db, err := pkgsql.OpenDBWithSQLMode("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password, sqlMode) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/syncer.go b/drainer/syncer.go index cb86336eb..bce2dcbc9 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -12,14 +12,15 @@ import ( "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/store/tikv/oracle" + pb "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tidb-binlog/drainer/checkpoint" "github.com/pingcap/tidb-binlog/drainer/executor" "github.com/pingcap/tidb-binlog/drainer/translator" "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb-binlog/pkg/loader" pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" - "github.com/pingcap/tidb/store/tikv/oracle" - pb "github.com/pingcap/tipb/go-binlog" ) var ( @@ -130,13 +131,13 @@ func (s *Syncer) checkWait(job *job) bool { func (s *Syncer) enableSafeModeInitializationPhase() { // set safeMode to true and useInsert to flase at the first, and will use the config after 5 minutes. - s.translator.SetConfig(true) + s.translator.SetConfig(true, s.cfg.SQLMode) go func() { ctx, cancel := context.WithCancel(s.ctx) defer func() { cancel() - s.translator.SetConfig(s.cfg.SafeMode) + s.translator.SetConfig(s.cfg.SafeMode, s.cfg.SQLMode) }() select { @@ -418,7 +419,7 @@ func (s *Syncer) run(jobs []*model.Job) error { return errors.Trace(err) } - s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount) + s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount, s.cfg.StrSQLMode) if err != nil { return errors.Trace(err) } @@ -428,7 +429,7 @@ func (s *Syncer) run(jobs []*model.Job) error { return errors.Trace(err) } - s.translator.SetConfig(s.cfg.SafeMode) + s.translator.SetConfig(s.cfg.SafeMode, s.cfg.SQLMode) go s.enableSafeModeInitializationPhase() for i := 0; i < s.cfg.WorkerCount; i++ { diff --git a/drainer/translator/flash.go b/drainer/translator/flash.go index 74aa54a8a..59c6073a7 100644 --- a/drainer/translator/flash.go +++ b/drainer/translator/flash.go @@ -11,6 +11,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/dml" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/pingcap/tidb/mysql" @@ -19,14 +20,17 @@ import ( ) // flashTranslator translates TiDB binlog to flash sqls -type flashTranslator struct{} +type flashTranslator struct { + sqlMode parsermysql.SQLMode +} func init() { Register("flash", &flashTranslator{}) } // Config set the configuration -func (f *flashTranslator) SetConfig(bool) { +func (f *flashTranslator) SetConfig(_ bool, sqlMode parsermysql.SQLMode) { + f.sqlMode = sqlMode } func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) { @@ -191,7 +195,9 @@ func (f *flashTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, r func (f *flashTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) { schema = strings.ToLower(schema) - stmt, err := parser.New().ParseOneStmt(sql, "", "") + ddlParser := parser.New() + ddlParser.SetSQLMode(f.sqlMode) + stmt, err := ddlParser.ParseOneStmt(sql, "", "") if err != nil { return "", errors.Trace(err) } diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go index cd8d6da2a..c8ad879dd 100644 --- a/drainer/translator/kafka.go +++ b/drainer/translator/kafka.go @@ -9,6 +9,7 @@ import ( "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/util" obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog" "github.com/pingcap/tidb/mysql" @@ -24,7 +25,7 @@ func init() { Register("kafka", &kafkaTranslator{}) } -func (p *kafkaTranslator) SetConfig(bool) { +func (p *kafkaTranslator) SetConfig(bool, parsermysql.SQLMode) { // do nothing } diff --git a/drainer/translator/mysql.go b/drainer/translator/mysql.go index 854ec0128..948eb9dd7 100644 --- a/drainer/translator/mysql.go +++ b/drainer/translator/mysql.go @@ -13,6 +13,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/dml" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/pingcap/tidb/mysql" @@ -27,6 +28,7 @@ const implicitColID = -1 type mysqlTranslator struct { // safeMode is a mode for translate sql, will translate update to delete and replace, and translate insert to replace. safeMode int32 + sqlMode parsermysql.SQLMode } func init() { @@ -34,12 +36,13 @@ func init() { Register("tidb", &mysqlTranslator{}) } -func (m *mysqlTranslator) SetConfig(safeMode bool) { +func (m *mysqlTranslator) SetConfig(safeMode bool, sqlMode parsermysql.SQLMode) { if safeMode { atomic.StoreInt32(&m.safeMode, 1) } else { atomic.StoreInt32(&m.safeMode, 0) } + m.sqlMode = sqlMode } func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) { @@ -272,7 +275,9 @@ func (m *mysqlTranslator) genDeleteSQL(schema string, table *model.TableInfo, co } func (m *mysqlTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) { - stmt, err := parser.New().ParseOneStmt(sql, "", "") + ddlParser := parser.New() + ddlParser.SetSQLMode(m.sqlMode) + stmt, err := ddlParser.ParseOneStmt(sql, "", "") if err != nil { return "", errors.Trace(err) } diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go index 528149306..3f7932dde 100644 --- a/drainer/translator/pb.go +++ b/drainer/translator/pb.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/util" pb "github.com/pingcap/tidb-binlog/proto/binlog" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -19,14 +20,15 @@ import ( // pbTranslator translates TiDB binlog to self-description protobuf type pbTranslator struct { + sqlMode mysql.SQLMode } func init() { Register("pb", &pbTranslator{}) } -func (p *pbTranslator) SetConfig(bool) { - // do nothing +func (p *pbTranslator) SetConfig(_ bool, sqlMode mysql.SQLMode) { + p.sqlMode = sqlMode } func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) { @@ -182,7 +184,9 @@ func (p *pbTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, rows } func (p *pbTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) { - stmt, err := parser.New().ParseOneStmt(sql, "", "") + ddlParser := parser.New() + ddlParser.SetSQLMode(p.sqlMode) + stmt, err := ddlParser.ParseOneStmt(sql, "", "") if err != nil { return "", errors.Trace(err) } diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go index f4fc95825..efa07c5a3 100644 --- a/drainer/translator/translator.go +++ b/drainer/translator/translator.go @@ -6,6 +6,7 @@ import ( "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/table" @@ -35,7 +36,7 @@ var providers = make(map[string]SQLTranslator) // SQLTranslator is the interface for translating TiDB binlog to target sqls type SQLTranslator interface { // Config set the configuration - SetConfig(safeMode bool) + SetConfig(safeMode bool, sqlMode parsermysql.SQLMode) // GenInsertSQLs generates the insert sqls GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) diff --git a/drainer/translator/translator_test.go b/drainer/translator/translator_test.go index ab32d0521..ec9645f4c 100644 --- a/drainer/translator/translator_test.go +++ b/drainer/translator/translator_test.go @@ -9,6 +9,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/parser/model" + parsermysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" @@ -57,7 +58,7 @@ func (t *testTranslatorSuite) TestTranslater(c *C) { } func testGenInsertSQLs(c *C, s SQLTranslator, safeMode bool) { - s.SetConfig(safeMode) + s.SetConfig(safeMode, parsermysql.ModeStrictTransTables|parsermysql.ModeNoEngineSubstitution) schema := "t" tables := []*model.TableInfo{testGenTable("normal"), testGenTable("hasPK"), testGenTable("hasID")} exceptedKeys := []int{3, 2, 1} diff --git a/drainer/util.go b/drainer/util.go index cf4b9308d..b657e85ae 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -133,10 +133,10 @@ func closeExecutors(executors ...executor.Executor) { } } -func createExecutors(destDBType string, cfg *executor.DBConfig, count int) ([]executor.Executor, error) { +func createExecutors(destDBType string, cfg *executor.DBConfig, count int, sqlMODE *string) ([]executor.Executor, error) { executors := make([]executor.Executor, 0, count) for i := 0; i < count; i++ { - executor, err := executor.New(destDBType, cfg) + executor, err := executor.New(destDBType, cfg, sqlMODE) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sql/sql.go b/pkg/sql/sql.go index a7c2dd96c..68674f8f2 100644 --- a/pkg/sql/sql.go +++ b/pkg/sql/sql.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "net" + "net/url" "strconv" "strings" "time" @@ -116,17 +117,26 @@ func ExecuteTxnWithHistogram(db *sql.DB, sqls []string, args [][]interface{}, hi return nil } -// OpenDB creates an instance of sql.DB. -func OpenDB(proto string, host string, port int, username string, password string) (*sql.DB, error) { +// OpenDBWithSQLMode creates an instance of sql.DB. +func OpenDBWithSQLMode(proto string, host string, port int, username string, password string, sqlMode *string) (*sql.DB, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&multiStatements=true", username, password, host, port) + if sqlMode != nil { + // same as "set sql_mode = ''" + dbDSN += "&sql_mode='" + url.QueryEscape(*sqlMode) + "'" + } db, err := sql.Open(proto, dbDSN) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Annotatef(err, "dsn: %s", dbDSN) } return db, nil } +// OpenDB creates an instance of sql.DB. +func OpenDB(proto string, host string, port int, username string, password string) (*sql.DB, error) { + return OpenDBWithSQLMode(proto, host, port, username, password, nil) +} + // IgnoreDDLError checks the error can be ignored or not. func IgnoreDDLError(err error) bool { errCode, ok := GetSQLErrCode(err)