Skip to content

Commit

Permalink
mysql (ticdc): only set write source when downstream is supported (#9175
Browse files Browse the repository at this point in the history
) (#9236)

close #9180
  • Loading branch information
ti-chi-bot authored Aug 8, 2023
1 parent dcf789a commit 647516c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
11 changes: 8 additions & 3 deletions cdc/sinkv2/eventsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func NewMySQLBackends(
return nil, err
}

cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
if err != nil {
return nil, err
}

db.SetMaxIdleConns(cfg.WorkerCount)
db.SetMaxOpenConns(cfg.WorkerCount)

Expand Down Expand Up @@ -712,7 +717,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
}
}

// we set write source for each txn,
// we try to set write source for each txn,
// so we can use it to trace the data source
if err = s.setWriteSource(pctx, tx); err != nil {
err := logDMLTxnErr(
Expand Down Expand Up @@ -805,8 +810,8 @@ func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) {

// setWriteSource sets write source for the transaction.
func (s *mysqlBackend) setWriteSource(ctx context.Context, txn *sql.Tx) error {
// we only set write source when donwstream is TiDB
if !s.cfg.IsTiDB {
// we only set write source when donwstream is TiDB and write source is existed.
if !s.cfg.IsWriteSourceExisted {
return nil
}
// downstream is TiDB, set system variables.
Expand Down
4 changes: 4 additions & 0 deletions cdc/sinkv2/eventsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func newTestMockDB(t *testing.T) (db *sql.DB, mock sqlmock.Sqlmock) {
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})
mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})
require.Nil(t, err)
return
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ func getMockDB(t *testing.T) *sql.DB {
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})

mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("create table checkpoint(id int)").WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down
6 changes: 5 additions & 1 deletion pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ type Config struct {
ForceReplicate bool
EnableOldValue bool

IsTiDB bool // IsTiDB is true if the downstream is TiDB
IsTiDB bool // IsTiDB is true if the downstream is TiDB
// IsBDRModeSupported is true if the downstream is TiDB and write source is existed.
// write source exists when the downstream is TiDB and version is greater than or equal to v6.5.0.
IsWriteSourceExisted bool

SourceID uint64
BatchDMLEnable bool
MultiStmtEnable bool
Expand Down

0 comments on commit 647516c

Please sign in to comment.