From 647516cb95ca325755a242e9723930a2044cf278 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 8 Aug 2023 18:20:02 +0800 Subject: [PATCH] mysql (ticdc): only set write source when downstream is supported (#9175) (#9236) close pingcap/tiflow#9180 --- cdc/sinkv2/eventsink/txn/mysql/mysql.go | 11 ++++++++--- cdc/sinkv2/eventsink/txn/mysql/mysql_test.go | 4 ++++ pkg/applier/redo_test.go | 5 ++++- pkg/sink/mysql/config.go | 6 +++++- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go index ff16c529100..c2a99d5efb9 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go @@ -103,6 +103,11 @@ func NewMySQLBackends( return nil, err } + cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db) + if err != nil { + return nil, err + } + db.SetMaxIdleConns(cfg.WorkerCount) db.SetMaxOpenConns(cfg.WorkerCount) @@ -712,7 +717,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare } } - // we set write source for each txn, + // we try to set write source for each txn, // so we can use it to trace the data source if err = s.setWriteSource(pctx, tx); err != nil { err := logDMLTxnErr( @@ -805,8 +810,8 @@ func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) { // setWriteSource sets write source for the transaction. func (s *mysqlBackend) setWriteSource(ctx context.Context, txn *sql.Tx) error { - // we only set write source when donwstream is TiDB - if !s.cfg.IsTiDB { + // we only set write source when donwstream is TiDB and write source is existed. + if !s.cfg.IsWriteSourceExisted { return nil } // downstream is TiDB, set system variables. diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go index 0f8670528ee..ac4cac5a10e 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go @@ -83,6 +83,10 @@ func newTestMockDB(t *testing.T) (db *sql.DB, mock sqlmock.Sqlmock) { Number: 1305, Message: "FUNCTION test.tidb_version does not exist", }) + mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) require.Nil(t, err) return } diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 94e997c19e3..0c3959fe741 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -236,7 +236,10 @@ func getMockDB(t *testing.T) *sql.DB { Number: 1305, Message: "FUNCTION test.tidb_version does not exist", }) - + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("create table checkpoint(id int)").WillReturnResult(sqlmock.NewResult(1, 1)) diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index e7fd7493cc2..c3d0e03dbef 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -96,7 +96,11 @@ type Config struct { ForceReplicate bool EnableOldValue bool - IsTiDB bool // IsTiDB is true if the downstream is TiDB + IsTiDB bool // IsTiDB is true if the downstream is TiDB + // IsBDRModeSupported is true if the downstream is TiDB and write source is existed. + // write source exists when the downstream is TiDB and version is greater than or equal to v6.5.0. + IsWriteSourceExisted bool + SourceID uint64 BatchDMLEnable bool MultiStmtEnable bool