dm: fix validator deadlock and enhance retry (#9522) (#9544)
close #9257
ti-chi-bot authored Aug 13, 2023
1 parent 0a53977 commit 10ec2c2
Showing 10 changed files with 153 additions and 42 deletions.
2 changes: 1 addition & 1 deletion dm/pkg/retry/errors.go
@@ -46,7 +46,7 @@ var (
UnsupportedDMLMsgs = []string{
"Error 1062: Duplicate",
"Error 1406: Data too long for column",
"Error 1366",
"Error 1366", // Incorrect %s value: '%s' for column '%s' at row %d
"Error 8025: entry too large",
}

4 changes: 2 additions & 2 deletions dm/pkg/retry/strategy.go
@@ -77,7 +77,7 @@ type Strategy interface {
// FiniteRetryStrategy will retry `RetryCount` times when failed to operate DB.
type FiniteRetryStrategy struct{}

// Apply for FiniteRetryStrategy, it wait `FirstRetryDuration` before it starts first retry, and then rest of retries wait time depends on BackoffStrategy.
// Apply for FiniteRetryStrategy: it waits `FirstRetryDuration` before the first retry, and the wait time of the remaining retries depends on the BackoffStrategy.
func (*FiniteRetryStrategy) Apply(ctx *tcontext.Context, params Params, operateFn OperateFunc,
) (ret interface{}, i int, err error) {
for ; i < params.RetryCount; i++ {
@@ -91,7 +91,7 @@ func (*FiniteRetryStrategy) Apply(ctx *tcontext.Context, params Params, operateF
duration = time.Duration(i+1) * params.FirstRetryDuration
default:
}
log.L().Warn("retry stratey takes effect", zap.Error(err), zap.Int("retry_times", i), zap.Int("retry_count", params.RetryCount))
log.L().Warn("retry strategy takes effect", zap.Error(err), zap.Int("retry_times", i), zap.Int("retry_count", params.RetryCount))

select {
case <-ctx.Context().Done():
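The hunk above only fixes a typo in the warning log, but it also shows how the retry loop picks its wait time: the branch visible here waits (i+1) * FirstRetryDuration before the next attempt. The following standalone sketch just prints that linear schedule; the concrete values are made up for illustration and are not DM's real Params.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed example values; DM's real Params come from the task configuration.
	firstRetryDuration := 2 * time.Second
	retryCount := 5

	for i := 0; i < retryCount; i++ {
		// Linear back-off as in the branch shown above: 2s, 4s, 6s, 8s, 10s.
		wait := time.Duration(i+1) * firstRetryDuration
		fmt.Printf("retry %d would wait %v\n", i+1, wait)
	}
}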
57 changes: 36 additions & 21 deletions dm/syncer/data_validator.go
@@ -155,16 +155,21 @@ func (vs *tableValidateStatus) stopped(msg string) {
vs.message = msg
}

// DataValidator
// DataValidator is used to continuously validate incremental data migrated to the downstream by DM.
// The validator can be started when the subtask has a syncer unit and the validation mode is not none;
// it's terminated when the subtask is terminated.
// The stage of the validator is independent of the subtask: pausing/resuming the subtask doesn't affect it.
//
// The validator can be in the Running or Stopped stage:
// - Running when it's started together with the subtask or started later on the fly.
// - Stopped when "validation stop" is executed.
//
// For each subtask, before it's closed/killed, only one DataValidator object is created;
// "dmctl validation stop/start" calls Stop and Start on that same object.
type DataValidator struct {
// used to sync Stop and Start operations.
sync.RWMutex

cfg *config.SubTaskConfig
syncer *Syncer
// whether validator starts together with subtask
@@ -198,7 +203,7 @@ type DataValidator struct {

// fields in this field block are guarded by stateMutex
stateMutex sync.RWMutex
stage pb.Stage
stage pb.Stage // only Running or Stopped is allowed for validator
flushedLoc *binlog.Location
result pb.ProcessResult
tableStatus map[string]*tableValidateStatus
@@ -351,7 +356,7 @@ func (v *DataValidator) Start(expect pb.Stage) {
}

if err := v.initialize(); err != nil {
v.fillResult(err, false)
v.fillResult(err)
return
}

@@ -425,16 +430,11 @@ func (v *DataValidator) printStatusRoutine() {
}
}

func (v *DataValidator) fillResult(err error, needLock bool) {
if needLock {
v.Lock()
defer v.Unlock()
}

func (v *DataValidator) fillResult(err error) {
// when we meet a non-retryable error, we'll call stopInner and v.ctx is cancelled;
// don't set IsCanceled in this case
isCanceled := false
if len(v.result.Errors) == 0 {
if v.getResultErrCnt() == 0 {
select {
case <-v.ctx.Done():
isCanceled = true
@@ -454,13 +454,25 @@ func (v *DataValidator) fillResult(err error, needLock bool) {

func (v *DataValidator) errorProcessRoutine() {
defer v.errProcessWg.Done()
for err := range v.errChan {
v.fillResult(err, true)

if errors.Cause(err) != context.Canceled {
v.stopInner()
var (
stopped bool
wg sync.WaitGroup
)

for err := range v.errChan {
v.fillResult(err)

if errors.Cause(err) != context.Canceled && !stopped {
stopped = true
wg.Add(1)
go func() {
defer wg.Done()
v.stopInner()
}()
}
}
wg.Wait()
}

func (v *DataValidator) waitSyncerSynced(currLoc binlog.Location) error {
@@ -724,10 +736,10 @@ func (v *DataValidator) Stop() {

func (v *DataValidator) stopInner() {
v.Lock()
defer v.Unlock()
v.L.Info("stopping")
if v.Stage() != pb.Stage_Running {
v.L.Warn("not started")
v.Unlock()
return
}

@@ -736,13 +748,10 @@ func (v *DataValidator) stopInner() {
v.fromDB.Close()
v.toDB.Close()

// release the lock so that the error routine can process errors
// wait until all errors are recorded
v.Unlock()
v.wg.Wait()
close(v.errChan) // close error chan after all possible sender goroutines stopped
v.Lock() // lock and modify the stage
defer v.Unlock()
// we want to record all errors, so we need to wait for all error sender goroutines to stop
// before closing this error chan.
close(v.errChan)

v.setStage(pb.Stage_Stopped)
v.L.Info("stopped")
@@ -1105,6 +1114,12 @@ func (v *DataValidator) addResultError(err *pb.ProcessError, cancelled bool) {
v.result.IsCanceled = cancelled
}

func (v *DataValidator) getResultErrCnt() int {
v.stateMutex.Lock()
defer v.stateMutex.Unlock()
return len(v.result.Errors)
}

func (v *DataValidator) resetResult() {
v.stateMutex.Lock()
defer v.stateMutex.Unlock()
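The data_validator.go changes above are the heart of the deadlock fix. Previously, errorProcessRoutine called stopInner synchronously when it received a non-cancelled error; stopInner waits on v.wg for the worker goroutines, but those workers can themselves be blocked sending further errors on errChan, which only errorProcessRoutine drains, so neither side could make progress. The rewritten routine keeps draining errChan and launches stopInner at most once in a separate goroutine; fillResult also drops the Stop/Start lock and reads the error count through the stateMutex-guarded getResultErrCnt instead. Below is a minimal, self-contained Go sketch of that drain-while-stopping pattern; the names are hypothetical stand-ins, not the DM types.

package main

import (
	"fmt"
	"sync"
)

// drainAndStop mirrors the fixed errorProcessRoutine: it never stops reading
// errCh (so blocked senders can always finish), and it triggers stop() at most
// once, asynchronously, so waiting for the senders cannot block the drain loop.
func drainAndStop(errCh <-chan error, stop func()) {
	var (
		stopped bool
		wg      sync.WaitGroup
	)
	for err := range errCh {
		fmt.Println("recorded error:", err) // stands in for fillResult
		if !stopped {
			stopped = true
			wg.Add(1)
			go func() {
				defer wg.Done()
				stop() // may wait for the senders and then close errCh
			}()
		}
	}
	wg.Wait() // make sure stop() has finished before returning
}

func main() {
	errCh := make(chan error)
	var senders sync.WaitGroup

	// A few workers that each report an error, like the validator workers.
	for i := 0; i < 3; i++ {
		senders.Add(1)
		go func(i int) {
			defer senders.Done()
			errCh <- fmt.Errorf("worker %d failed", i)
		}(i)
	}

	stop := func() {
		senders.Wait() // analogous to v.wg.Wait() in stopInner
		close(errCh)   // close only after all senders have finished
	}
	drainAndStop(errCh, stop)
	fmt.Println("stopped cleanly, no deadlock")
}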
47 changes: 44 additions & 3 deletions dm/syncer/data_validator_test.go
@@ -163,12 +163,12 @@ func TestValidatorFillResult(t *testing.T) {
validator.persistHelper.schemaInitialized.Store(true)
validator.Start(pb.Stage_Running)
defer validator.Stop() // in case assert failed before Stop
validator.fillResult(errors.New("test error"), false)
validator.fillResult(errors.New("test error"))
require.Len(t, validator.result.Errors, 1)
validator.fillResult(errors.New("test error"), true)
validator.fillResult(errors.New("test error"))
require.Len(t, validator.result.Errors, 2)
validator.Stop()
validator.fillResult(validator.ctx.Err(), true)
validator.fillResult(validator.ctx.Err())
require.Len(t, validator.result.Errors, 2)
}

@@ -798,3 +798,44 @@ func TestValidatorMarkReachedSyncerRoutine(t *testing.T) {
validator.wg.Wait()
require.True(t, validator.markErrorStarted.Load())
}

func TestValidatorErrorProcessRoutineDeadlock(t *testing.T) {
cfg := genSubtaskConfig(t)
syncerObj := NewSyncer(cfg, nil, nil)
validator := NewContinuousDataValidator(cfg, syncerObj, false)
validator.ctx, validator.cancel = context.WithCancel(context.Background())
validator.errChan = make(chan error)
validator.setStage(pb.Stage_Running)
validator.streamerController = binlogstream.NewStreamerController4Test(nil, nil)
validator.fromDB = &conn.BaseDB{}
validator.toDB = &conn.BaseDB{}
require.Equal(t, pb.Stage_Running, validator.Stage())

finishedCh := make(chan struct{})
// simulate a worker
validator.wg.Add(1)
go func() {
defer validator.wg.Done()
validator.sendError(errors.New("test error 1"))
validator.sendError(errors.New("test error 2"))
validator.sendError(errors.New("test error 3"))
}()

validator.errProcessWg.Add(1)
go func() {
defer func() {
finishedCh <- struct{}{}
}()
validator.errorProcessRoutine()
}()

select {
case <-finishedCh:
validator.errProcessWg.Wait()
require.Equal(t, pb.Stage_Stopped, validator.Stage())
// all errors gathered
require.Len(t, validator.getResult().Errors, 3)
case <-time.After(time.Second * 5):
t.Fatal("deadlock")
}
}
13 changes: 2 additions & 11 deletions dm/syncer/validate_worker.go
@@ -16,7 +16,6 @@ package syncer
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"math"
"strconv"
@@ -25,7 +24,6 @@ import (
"time"

"github.com/docker/go-units"
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/model"
@@ -39,6 +37,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/dm/unit"
"github.com/pingcap/tiflow/pkg/sqlmodel"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -519,15 +518,7 @@ }
}

func isRetryableDBError(err error) bool {
err = errors.Cause(err)
if dbutil.IsRetryableError(err) {
return true
}
switch err {
case driver.ErrBadConn, mysql.ErrInvalidConn:
return true
}
return false
return unit.IsResumableDBError(err)
}

func scanRow(rows *sql.Rows) ([]*sql.NullString, error) {
1 change: 1 addition & 0 deletions dm/syncer/validate_worker_test.go
@@ -530,6 +530,7 @@ func TestValidatorIsRetryableDBError(t *testing.T) {
require.True(t, isRetryableDBError(gmysql.ErrInvalidConn))
require.True(t, isRetryableDBError(driver.ErrBadConn))
require.True(t, isRetryableDBError(errors.Annotate(driver.ErrBadConn, "test")))
require.True(t, isRetryableDBError(errors.New("Error 9005: Region is unavailable")))
}

func TestValidatorRowCountAndSize(t *testing.T) {
3 changes: 2 additions & 1 deletion dm/syncer/validator_checkpoint.go
@@ -226,7 +226,8 @@ func (c *validatorPersistHelper) execQueriesWithRetry(tctx *tcontext.Context, qu
if (c.cfg.SourceID == "mysql-replica-01" && i == 3) ||
(c.cfg.SourceID == "mysql-replica-02" && i == 4) {
triggeredFailOnPersistForIntegrationTest = true
failpoint.Return(nil, errors.New("ValidatorFailOnPersist"))
// "Error 1406" is non-resumable error, so we can't retry it
failpoint.Return(nil, errors.New("ValidatorFailOnPersist Error 1366"))
}
}
})
6 changes: 3 additions & 3 deletions dm/syncer/validator_checkpoint_test.go
@@ -138,10 +138,10 @@ func TestValidatorCheckpointPersist(t *testing.T) {
dbMock.ExpectExec("INSERT INTO .*_validator_pending_change.*VALUES \\(\\?, \\?, \\?, \\?, \\?, \\?\\)").
WillReturnResult(driver.ResultNoRows)
dbMock.ExpectExec("INSERT INTO .*_validator_checkpoint.*ON DUPLICATE.*").
WillReturnError(errors.New("failed on persist checkpoint"))
WillReturnError(errors.New("Error 1366 failed on persist checkpoint"))
require.Nil(t, validator.flushedLoc)
err2 := validator.persistCheckpointAndData(*validator.location)
require.EqualError(t, err2, "failed on persist checkpoint")
require.EqualError(t, err2, "Error 1366 failed on persist checkpoint")
require.Equal(t, int64(100), validator.persistHelper.revision)
require.Len(t, validator.workers[0].errorRows, 1)
require.Nil(t, validator.flushedLoc)
@@ -157,7 +157,7 @@ func TestValidatorCheckpointPersist(t *testing.T) {
dbMock.ExpectExec("INSERT INTO .*_validator_checkpoint.*ON DUPLICATE.*").
WillReturnResult(driver.ResultNoRows)
dbMock.ExpectExec("DELETE FROM .*_validator_pending_change.*WHERE source = \\? and revision != \\?").
WillReturnError(errors.New("failed on delete pending change"))
WillReturnError(errors.New("Error 1366 failed on delete pending change"))
err2 = validator.persistCheckpointAndData(*validator.location)
require.NoError(t, err2)
require.Equal(t, int64(101), validator.persistHelper.revision)
31 changes: 31 additions & 0 deletions dm/unit/unit.go
@@ -18,9 +18,11 @@ import (
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

@@ -107,3 +109,32 @@ }
}
return strings.Join(serrs, ", ")
}

// IsResumableDBError checks whether the error is a resumable DB error.
// This is a simplified version of IsResumableError.
// We use a blacklist to filter out errors which cannot be resumed;
// all other errors are resumable.
func IsResumableDBError(err error) bool {
if err == nil {
return true
}

err = errors.Cause(err)
if err == context.Canceled {
return false
}

// not elegant code, because TiDB doesn't expose some of these errors
errStr := strings.ToLower(err.Error())
for _, msg := range retry.UnsupportedDDLMsgs {
if strings.Contains(errStr, strings.ToLower(msg)) {
return false
}
}
for _, msg := range retry.UnsupportedDMLMsgs {
if strings.Contains(errStr, strings.ToLower(msg)) {
return false
}
}
return true
}
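For context, a hedged sketch of how a caller could gate retries on the new helper follows; the retry loop is hypothetical and only illustrates the blacklist semantics, it is not DM's actual worker code. Messages on the UnsupportedDDLMsgs/UnsupportedDMLMsgs lists, such as anything containing "Error 1366", fail immediately, while other DB errors, including "Error 9005: Region is unavailable", stay retryable.

package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/tiflow/dm/unit"
)

// doWithRetry is a hypothetical helper: it retries op only while the returned
// error is still considered resumable by unit.IsResumableDBError.
func doWithRetry(op func() error, maxRetries int) error {
	var err error
	for i := 0; i <= maxRetries; i++ {
		if err = op(); err == nil {
			return nil
		}
		if !unit.IsResumableDBError(err) {
			return err // e.g. anything containing "Error 1366" is never retried
		}
	}
	return err
}

func main() {
	err := doWithRetry(func() error {
		// Not on the blacklists, so it stays retryable.
		return errors.New("Error 9005: Region is unavailable")
	}, 2)
	fmt.Println(err)
}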
31 changes: 31 additions & 0 deletions dm/unit/unit_test.go
@@ -15,12 +15,16 @@ package unit

import (
"context"
"database/sql/driver"
"testing"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/check"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/stretchr/testify/require"
)

func TestSuite(t *testing.T) {
@@ -56,3 +60,30 @@ func (t *testUnitSuite) TestJoinProcessErrors(c *check.C) {
c.Assert(JoinProcessErrors(errs), check.Equals,
`ErrCode:10001 ErrClass:"database" ErrScope:"not-set" ErrLevel:"high" Message:"database driver error" Workaround:"Please check the database connection and the database config in configuration file." , ErrCode:36014 ErrClass:"sync-unit" ErrScope:"internal" ErrLevel:"high" Message:"only support ROW format binlog, unexpected DML statement found in query event" `)
}

func TestIsResumableDBError(t *testing.T) {
testCases := []struct {
err error
resumable bool
}{
// only new DM errors are checked
{&tmysql.SQLError{Code: 1105, Message: "unsupported modify column length 20 is less than origin 40", State: tmysql.DefaultMySQLState}, false},
{&tmysql.SQLError{Code: 1105, Message: "unsupported drop integer primary key", State: tmysql.DefaultMySQLState}, false},
{terror.ErrDBExecuteFailed.Generate("file test.t3.sql: execute statement failed: USE `test_abc`;: context canceled"), true},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1105, Message: "unsupported modify column length 20 is less than origin 40", State: tmysql.DefaultMySQLState}, "alter table t modify col varchar(20)"), false},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1105, Message: "unsupported drop integer primary key", State: tmysql.DefaultMySQLState}, "alter table t drop column id"), false},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1067, Message: "Invalid default value for 'ct'", State: tmysql.DefaultMySQLState}, "CREATE TABLE `tbl` (`c1` int(11) NOT NULL,`ct` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',PRIMARY KEY (`c1`)) ENGINE=InnoDB DEFAULT CHARSET=latin1"), false},
{terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false},
{terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false},
// others
{nil, true},
{errors.New("unknown error"), true},
{driver.ErrBadConn, true},
{mysql.ErrInvalidConn, true},
{context.Canceled, false},
}

for i, tc := range testCases {
require.Equal(t, tc.resumable, IsResumableDBError(tc.err), "case %d", i)
}
}
