diff --git a/cmd/reparo/reparo.toml b/cmd/reparo/reparo.toml index 76cbbe84b..bbb87cf7e 100644 --- a/cmd/reparo/reparo.toml +++ b/cmd/reparo/reparo.toml @@ -19,6 +19,14 @@ log-level = "info" # for print, it just prints decoded value. dest-type = "mysql" +# number of binlog events in a transaction batch +txn-batch = 20 + +# work count to execute binlogs +# if the latency between reparo and downstream(mysql or tidb) are too high, you might want to increase this +# to get higher throughput by higher concurrent write to the downstream +worker-count = 16 + ##replicate-do-db priority over replicate-do-table if have same db name ##and we support regular expression , start with '~' declare use regular expression. # diff --git a/reparo/config.go b/reparo/config.go index c87914fac..fc73c6a20 100644 --- a/reparo/config.go +++ b/reparo/config.go @@ -30,6 +30,8 @@ type Config struct { StopDatetime string `toml:"stop-datetime" json:"stop-datetime"` StartTSO int64 `toml:"start-tso" json:"start-tso"` StopTSO int64 `toml:"stop-tso" json:"stop-tso"` + TxnBatch int `toml:"txn-batch" json:"txn-batch"` + WorkerCount int `toml:"worker-count" json:"worker-count"` DestType string `toml:"dest-type" json:"dest-type"` DestDB *syncer.DBConfig `toml:"dest-db" json:"dest-db"` @@ -64,6 +66,8 @@ func NewConfig() *Config { fs.StringVar(&c.StopDatetime, "stop-datetime", "", "recovery end in stop-datetime, empty string means never end.") fs.Int64Var(&c.StartTSO, "start-tso", 0, "similar to start-datetime but in pd-server tso format") fs.Int64Var(&c.StopTSO, "stop-tso", 0, "similar to stop-datetime, but in pd-server tso format") + fs.IntVar(&c.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") + fs.IntVar(&c.WorkerCount, "c", 16, "parallel worker count") fs.StringVar(&c.LogFile, "log-file", "", "log file path") fs.StringVar(&c.LogRotate, "log-rotate", "", "log file rotate type, hour/day") fs.StringVar(&c.DestType, "dest-type", "print", "dest type, values can be [print,mysql]") diff --git a/reparo/reparo.go b/reparo/reparo.go index 02372ea0d..396170a0d 100644 --- a/reparo/reparo.go +++ b/reparo/reparo.go @@ -23,7 +23,7 @@ type Reparo struct { func New(cfg *Config) (*Reparo, error) { log.Infof("cfg %+v", cfg) - syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.SafeMode) + syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode) if err != nil { return nil, errors.Trace(err) } diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index b4fda8767..c60003c28 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -30,25 +30,23 @@ type mysqlSyncer struct { } var ( - _ Syncer = &mysqlSyncer{} - defaultWorkerCount = 16 - defaultBatchSize = 20 + _ Syncer = &mysqlSyncer{} ) // should be only used for unit test to create mock db var createDB = loader.CreateDB -func newMysqlSyncer(cfg *DBConfig, safemode bool) (*mysqlSyncer, error) { +func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) { db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port) if err != nil { return nil, errors.Trace(err) } - return newMysqlSyncerFromSQLDB(db, safemode) + return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode) } -func newMysqlSyncerFromSQLDB(db *sql.DB, safemode bool) (*mysqlSyncer, error) { - loader, err := loader.NewLoader(db, loader.WorkerCount(defaultWorkerCount), loader.BatchSize(defaultBatchSize)) +func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) { + loader, err := loader.NewLoader(db, loader.WorkerCount(worker), loader.BatchSize(batchSize)) if err != nil { return nil, errors.Annotate(err, "new loader failed") } diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index c9379b3e1..cf4634156 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -22,11 +22,6 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) { var ( mock sqlmock.Sqlmock ) - originWorkerCount := defaultWorkerCount - defaultWorkerCount = 1 - defer func() { - defaultWorkerCount = originWorkerCount - }() oldCreateDB := createDB createDB = func(string, string, string, int) (db *sql.DB, err error) { @@ -37,7 +32,7 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) { createDB = oldCreateDB }() - syncer, err := newMysqlSyncer(&DBConfig{}, safemode) + syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode) c.Assert(err, check.IsNil) mock.ExpectBegin() diff --git a/reparo/syncer/syncer.go b/reparo/syncer/syncer.go index 3a2872695..a963ab337 100644 --- a/reparo/syncer/syncer.go +++ b/reparo/syncer/syncer.go @@ -16,10 +16,10 @@ type Syncer interface { } // New creates a new executor based on the name. -func New(name string, cfg *DBConfig, safemode bool) (Syncer, error) { +func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool) (Syncer, error) { switch name { case "mysql": - return newMysqlSyncer(cfg, safemode) + return newMysqlSyncer(cfg, worker, batchSize, safemode) case "print": return newPrintSyncer() case "memory": diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go index 7b2a4ce5a..4ebd8c831 100644 --- a/reparo/syncer/syncer_test.go +++ b/reparo/syncer/syncer_test.go @@ -34,7 +34,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) { } for _, testCase := range testCases { - syncer, err := New(testCase.typeStr, cfg, false) + syncer, err := New(testCase.typeStr, cfg, 16, 20, false) c.Assert(err, check.IsNil) c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp) }