From 10ec2c2ee4e1b301b1467346ec7ec71db11db802 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sun, 13 Aug 2023 23:01:58 +0800 Subject: [PATCH] dm: fix validator deadlock and enhance retry (#9522) (#9544) close pingcap/tiflow#9257 --- dm/pkg/retry/errors.go | 2 +- dm/pkg/retry/strategy.go | 4 +- dm/syncer/data_validator.go | 57 ++++++++++++++++---------- dm/syncer/data_validator_test.go | 47 +++++++++++++++++++-- dm/syncer/validate_worker.go | 13 +----- dm/syncer/validate_worker_test.go | 1 + dm/syncer/validator_checkpoint.go | 3 +- dm/syncer/validator_checkpoint_test.go | 6 +-- dm/unit/unit.go | 31 ++++++++++++++ dm/unit/unit_test.go | 31 ++++++++++++++ 10 files changed, 153 insertions(+), 42 deletions(-) diff --git a/dm/pkg/retry/errors.go b/dm/pkg/retry/errors.go index 0cea4bb7619..47d5b0c28c4 100644 --- a/dm/pkg/retry/errors.go +++ b/dm/pkg/retry/errors.go @@ -46,7 +46,7 @@ var ( UnsupportedDMLMsgs = []string{ "Error 1062: Duplicate", "Error 1406: Data too long for column", - "Error 1366", + "Error 1366", // Incorrect %s value: '%s' for column '%s' at row %d "Error 8025: entry too large", } diff --git a/dm/pkg/retry/strategy.go b/dm/pkg/retry/strategy.go index 5b8d923f9e4..26701e72994 100644 --- a/dm/pkg/retry/strategy.go +++ b/dm/pkg/retry/strategy.go @@ -77,7 +77,7 @@ type Strategy interface { // FiniteRetryStrategy will retry `RetryCount` times when failed to operate DB. type FiniteRetryStrategy struct{} -// Apply for FiniteRetryStrategy, it wait `FirstRetryDuration` before it starts first retry, and then rest of retries wait time depends on BackoffStrategy. +// Apply for FiniteRetryStrategy, it waits `FirstRetryDuration` before it starts first retry, and then rest of retries wait time depends on BackoffStrategy. 
func (*FiniteRetryStrategy) Apply(ctx *tcontext.Context, params Params, operateFn OperateFunc, ) (ret interface{}, i int, err error) { for ; i < params.RetryCount; i++ { @@ -91,7 +91,7 @@ func (*FiniteRetryStrategy) Apply(ctx *tcontext.Context, params Params, operateF duration = time.Duration(i+1) * params.FirstRetryDuration default: } - log.L().Warn("retry stratey takes effect", zap.Error(err), zap.Int("retry_times", i), zap.Int("retry_count", params.RetryCount)) + log.L().Warn("retry strategy takes effect", zap.Error(err), zap.Int("retry_times", i), zap.Int("retry_count", params.RetryCount)) select { case <-ctx.Context().Done(): diff --git a/dm/syncer/data_validator.go b/dm/syncer/data_validator.go index b6208bf4f29..69730b2c99a 100644 --- a/dm/syncer/data_validator.go +++ b/dm/syncer/data_validator.go @@ -155,7 +155,7 @@ func (vs *tableValidateStatus) stopped(msg string) { vs.message = msg } -// DataValidator +// DataValidator is used to continuously validate incremental data migrated to downstream by dm. // validator can be start when there's syncer unit in the subtask and validation mode is not none, // it's terminated when the subtask is terminated. // stage of validator is independent of subtask, pause/resume subtask doesn't affect the stage of validator. @@ -163,8 +163,13 @@ func (vs *tableValidateStatus) stopped(msg string) { // validator can be in running or stopped stage // - in running when it's started with subtask or started later on the fly. // - in stopped when validation stop is executed. +// +// for each subtask, before it's closed/killed, only one DataValidator object is created, +// on "dmctl validation stop/start", will call Stop and Start on the same object. type DataValidator struct { + // used to sync Stop and Start operations. 
sync.RWMutex + cfg *config.SubTaskConfig syncer *Syncer // whether validator starts together with subtask @@ -198,7 +203,7 @@ type DataValidator struct { // fields in this field block are guarded by stateMutex stateMutex sync.RWMutex - stage pb.Stage + stage pb.Stage // only Running or Stopped is allowed for validator flushedLoc *binlog.Location result pb.ProcessResult tableStatus map[string]*tableValidateStatus @@ -351,7 +356,7 @@ func (v *DataValidator) Start(expect pb.Stage) { } if err := v.initialize(); err != nil { - v.fillResult(err, false) + v.fillResult(err) return } @@ -425,16 +430,11 @@ func (v *DataValidator) printStatusRoutine() { } } -func (v *DataValidator) fillResult(err error, needLock bool) { - if needLock { - v.Lock() - defer v.Unlock() - } - +func (v *DataValidator) fillResult(err error) { // when met a non-retryable error, we'll call stopInner, then v.ctx is cancelled, // don't set IsCanceled in this case isCanceled := false - if len(v.result.Errors) == 0 { + if v.getResultErrCnt() == 0 { select { case <-v.ctx.Done(): isCanceled = true @@ -454,13 +454,25 @@ func (v *DataValidator) fillResult(err error, needLock bool) { func (v *DataValidator) errorProcessRoutine() { defer v.errProcessWg.Done() - for err := range v.errChan { - v.fillResult(err, true) - if errors.Cause(err) != context.Canceled { - v.stopInner() + var ( + stopped bool + wg sync.WaitGroup + ) + + for err := range v.errChan { + v.fillResult(err) + + if errors.Cause(err) != context.Canceled && !stopped { + stopped = true + wg.Add(1) + go func() { + defer wg.Done() + v.stopInner() + }() } } + wg.Wait() } func (v *DataValidator) waitSyncerSynced(currLoc binlog.Location) error { @@ -724,10 +736,10 @@ func (v *DataValidator) Stop() { func (v *DataValidator) stopInner() { v.Lock() + defer v.Unlock() v.L.Info("stopping") if v.Stage() != pb.Stage_Running { v.L.Warn("not started") - v.Unlock() return } @@ -736,13 +748,10 @@ func (v *DataValidator) stopInner() { v.fromDB.Close() v.toDB.Close() 
- // release the lock so that the error routine can process errors - // wait until all errors are recorded - v.Unlock() v.wg.Wait() - close(v.errChan) // close error chan after all possible sender goroutines stopped - v.Lock() // lock and modify the stage - defer v.Unlock() + // we want to record all errors, so we need to wait all error sender goroutines to stop + // before closing this error chan. + close(v.errChan) v.setStage(pb.Stage_Stopped) v.L.Info("stopped") @@ -1105,6 +1114,12 @@ func (v *DataValidator) addResultError(err *pb.ProcessError, cancelled bool) { v.result.IsCanceled = cancelled } +func (v *DataValidator) getResultErrCnt() int { + v.stateMutex.Lock() + defer v.stateMutex.Unlock() + return len(v.result.Errors) +} + func (v *DataValidator) resetResult() { v.stateMutex.Lock() defer v.stateMutex.Unlock() diff --git a/dm/syncer/data_validator_test.go b/dm/syncer/data_validator_test.go index 64a8dc312ea..43d49fc7479 100644 --- a/dm/syncer/data_validator_test.go +++ b/dm/syncer/data_validator_test.go @@ -163,12 +163,12 @@ func TestValidatorFillResult(t *testing.T) { validator.persistHelper.schemaInitialized.Store(true) validator.Start(pb.Stage_Running) defer validator.Stop() // in case assert failed before Stop - validator.fillResult(errors.New("test error"), false) + validator.fillResult(errors.New("test error")) require.Len(t, validator.result.Errors, 1) - validator.fillResult(errors.New("test error"), true) + validator.fillResult(errors.New("test error")) require.Len(t, validator.result.Errors, 2) validator.Stop() - validator.fillResult(validator.ctx.Err(), true) + validator.fillResult(validator.ctx.Err()) require.Len(t, validator.result.Errors, 2) } @@ -798,3 +798,44 @@ func TestValidatorMarkReachedSyncerRoutine(t *testing.T) { validator.wg.Wait() require.True(t, validator.markErrorStarted.Load()) } + +func TestValidatorErrorProcessRoutineDeadlock(t *testing.T) { + cfg := genSubtaskConfig(t) + syncerObj := NewSyncer(cfg, nil, nil) + validator := 
NewContinuousDataValidator(cfg, syncerObj, false) + validator.ctx, validator.cancel = context.WithCancel(context.Background()) + validator.errChan = make(chan error) + validator.setStage(pb.Stage_Running) + validator.streamerController = binlogstream.NewStreamerController4Test(nil, nil) + validator.fromDB = &conn.BaseDB{} + validator.toDB = &conn.BaseDB{} + require.Equal(t, pb.Stage_Running, validator.Stage()) + + finishedCh := make(chan struct{}) + // simulate a worker + validator.wg.Add(1) + go func() { + defer validator.wg.Done() + validator.sendError(errors.New("test error 1")) + validator.sendError(errors.New("test error 2")) + validator.sendError(errors.New("test error 3")) + }() + + validator.errProcessWg.Add(1) + go func() { + defer func() { + finishedCh <- struct{}{} + }() + validator.errorProcessRoutine() + }() + + select { + case <-finishedCh: + validator.errProcessWg.Wait() + require.Equal(t, pb.Stage_Stopped, validator.Stage()) + // all error gathered + require.Len(t, validator.getResult().Errors, 3) + case <-time.After(time.Second * 5): + t.Fatal("deadlock") + } +} diff --git a/dm/syncer/validate_worker.go b/dm/syncer/validate_worker.go index 7c79d291ed1..099ae04d164 100644 --- a/dm/syncer/validate_worker.go +++ b/dm/syncer/validate_worker.go @@ -16,7 +16,6 @@ package syncer import ( "context" "database/sql" - "database/sql/driver" "fmt" "math" "strconv" @@ -25,7 +24,6 @@ import ( "time" "github.com/docker/go-units" - "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/parser/model" @@ -39,6 +37,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" + "github.com/pingcap/tiflow/dm/unit" "github.com/pingcap/tiflow/pkg/sqlmodel" "go.uber.org/atomic" "go.uber.org/zap" @@ -519,15 +518,7 @@ func isRetryableValidateError(err error) bool { } func isRetryableDBError(err error) bool { - err = errors.Cause(err) - if 
dbutil.IsRetryableError(err) { - return true - } - switch err { - case driver.ErrBadConn, mysql.ErrInvalidConn: - return true - } - return false + return unit.IsResumableDBError(err) } func scanRow(rows *sql.Rows) ([]*sql.NullString, error) { diff --git a/dm/syncer/validate_worker_test.go b/dm/syncer/validate_worker_test.go index 9421afd27c4..745ad16cad8 100644 --- a/dm/syncer/validate_worker_test.go +++ b/dm/syncer/validate_worker_test.go @@ -530,6 +530,7 @@ func TestValidatorIsRetryableDBError(t *testing.T) { require.True(t, isRetryableDBError(gmysql.ErrInvalidConn)) require.True(t, isRetryableDBError(driver.ErrBadConn)) require.True(t, isRetryableDBError(errors.Annotate(driver.ErrBadConn, "test"))) + require.True(t, isRetryableDBError(errors.New("Error 9005: Region is unavailable"))) } func TestValidatorRowCountAndSize(t *testing.T) { diff --git a/dm/syncer/validator_checkpoint.go b/dm/syncer/validator_checkpoint.go index cc1dbe05093..d05731fac17 100644 --- a/dm/syncer/validator_checkpoint.go +++ b/dm/syncer/validator_checkpoint.go @@ -226,7 +226,8 @@ func (c *validatorPersistHelper) execQueriesWithRetry(tctx *tcontext.Context, qu if (c.cfg.SourceID == "mysql-replica-01" && i == 3) || (c.cfg.SourceID == "mysql-replica-02" && i == 4) { triggeredFailOnPersistForIntegrationTest = true - failpoint.Return(nil, errors.New("ValidatorFailOnPersist")) + // "Error 1366" is a non-resumable error, so we can't retry it + failpoint.Return(nil, errors.New("ValidatorFailOnPersist Error 1366")) } } }) diff --git a/dm/syncer/validator_checkpoint_test.go b/dm/syncer/validator_checkpoint_test.go index 4bc76ef6d8d..4e76a0e9bc2 100644 --- a/dm/syncer/validator_checkpoint_test.go +++ b/dm/syncer/validator_checkpoint_test.go @@ -138,10 +138,10 @@ func TestValidatorCheckpointPersist(t *testing.T) { dbMock.ExpectExec("INSERT INTO .*_validator_pending_change.*VALUES \\(\\?, \\?, \\?, \\?, \\?, \\?\\)"). 
WillReturnResult(driver.ResultNoRows) dbMock.ExpectExec("INSERT INTO .*_validator_checkpoint.*ON DUPLICATE.*"). - WillReturnError(errors.New("failed on persist checkpoint")) + WillReturnError(errors.New("Error 1366 failed on persist checkpoint")) require.Nil(t, validator.flushedLoc) err2 := validator.persistCheckpointAndData(*validator.location) - require.EqualError(t, err2, "failed on persist checkpoint") + require.EqualError(t, err2, "Error 1366 failed on persist checkpoint") require.Equal(t, int64(100), validator.persistHelper.revision) require.Len(t, validator.workers[0].errorRows, 1) require.Nil(t, validator.flushedLoc) @@ -157,7 +157,7 @@ func TestValidatorCheckpointPersist(t *testing.T) { dbMock.ExpectExec("INSERT INTO .*_validator_checkpoint.*ON DUPLICATE.*"). WillReturnResult(driver.ResultNoRows) dbMock.ExpectExec("DELETE FROM .*_validator_pending_change.*WHERE source = \\? and revision != \\?"). - WillReturnError(errors.New("failed on delete pending change")) + WillReturnError(errors.New("Error 1366 failed on delete pending change")) err2 = validator.persistCheckpointAndData(*validator.location) require.NoError(t, err2) require.Equal(t, int64(101), validator.persistHelper.revision) diff --git a/dm/unit/unit.go b/dm/unit/unit.go index 85c03f5b248..8ea27b5f14f 100644 --- a/dm/unit/unit.go +++ b/dm/unit/unit.go @@ -18,9 +18,11 @@ import ( "strings" "time" + "github.com/pingcap/errors" "github.com/pingcap/tiflow/dm/config" "github.com/pingcap/tiflow/dm/pb" "github.com/pingcap/tiflow/dm/pkg/binlog" + "github.com/pingcap/tiflow/dm/pkg/retry" "github.com/pingcap/tiflow/dm/pkg/terror" ) @@ -107,3 +109,32 @@ func JoinProcessErrors(errors []*pb.ProcessError) string { } return strings.Join(serrs, ", ") } + +// IsResumableDBError checks whether the error is a resumable DB error. +// this is a simplified version of IsResumableError. +// we use a blacklist to filter out some errors which can not be resumed, +// all other errors are resumable. 
+func IsResumableDBError(err error) bool { + if err == nil { + return true + } + + err = errors.Cause(err) + if err == context.Canceled { + return false + } + + // not elegant code, because TiDB doesn't expose some error + errStr := strings.ToLower(err.Error()) + for _, msg := range retry.UnsupportedDDLMsgs { + if strings.Contains(errStr, strings.ToLower(msg)) { + return false + } + } + for _, msg := range retry.UnsupportedDMLMsgs { + if strings.Contains(errStr, strings.ToLower(msg)) { + return false + } + } + return true +} diff --git a/dm/unit/unit_test.go b/dm/unit/unit_test.go index 607211c7fbe..f7239840b23 100644 --- a/dm/unit/unit_test.go +++ b/dm/unit/unit_test.go @@ -15,12 +15,16 @@ package unit import ( "context" + "database/sql/driver" "testing" + "github.com/go-sql-driver/mysql" "github.com/pingcap/check" "github.com/pingcap/errors" + tmysql "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/dm/pb" "github.com/pingcap/tiflow/dm/pkg/terror" + "github.com/stretchr/testify/require" ) func TestSuite(t *testing.T) { @@ -56,3 +60,30 @@ func (t *testUnitSuite) TestJoinProcessErrors(c *check.C) { c.Assert(JoinProcessErrors(errs), check.Equals, `ErrCode:10001 ErrClass:"database" ErrScope:"not-set" ErrLevel:"high" Message:"database driver error" Workaround:"Please check the database connection and the database config in configuration file." 
, ErrCode:36014 ErrClass:"sync-unit" ErrScope:"internal" ErrLevel:"high" Message:"only support ROW format binlog, unexpected DML statement found in query event" `) } + +func TestIsResumableDBError(t *testing.T) { + testCases := []struct { + err error + resumable bool + }{ + // only DM new error is checked + {&tmysql.SQLError{Code: 1105, Message: "unsupported modify column length 20 is less than origin 40", State: tmysql.DefaultMySQLState}, false}, + {&tmysql.SQLError{Code: 1105, Message: "unsupported drop integer primary key", State: tmysql.DefaultMySQLState}, false}, + {terror.ErrDBExecuteFailed.Generate("file test.t3.sql: execute statement failed: USE `test_abc`;: context canceled"), true}, + {terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1105, Message: "unsupported modify column length 20 is less than origin 40", State: tmysql.DefaultMySQLState}, "alter table t modify col varchar(20)"), false}, + {terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1105, Message: "unsupported drop integer primary key", State: tmysql.DefaultMySQLState}, "alter table t drop column id"), false}, + {terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1067, Message: "Invalid default value for 'ct'", State: tmysql.DefaultMySQLState}, "CREATE TABLE `tbl` (`c1` int(11) NOT NULL,`ct` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',PRIMARY KEY (`c1`)) ENGINE=InnoDB DEFAULT CHARSET=latin1"), false}, + {terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false}, + {terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false}, + // others + {nil, true}, + {errors.New("unknown error"), true}, + {driver.ErrBadConn, true}, + {mysql.ErrInvalidConn, true}, + {context.Canceled, false}, + } + + for i, tc := range testCases { + require.Equal(t, tc.resumable, IsResumableDBError(tc.err), "case %d", i) + } +}