From a8a59e61aa8cf446d38597d3d36ed4a8225ce9a2 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Mon, 2 Nov 2020 21:50:09 +0800 Subject: [PATCH] shardddl: add infoOpLock for pessmist shardddl (#1257) (#1263) Signed-off-by: ti-srebot Co-authored-by: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> --- dm/master/shardddl/pessimist.go | 47 ++++++++++++++++++++++++++++++--- tests/_utils/test_prepare | 2 +- tests/shardddl3/run.sh | 13 +++++++-- 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index 306ad0e470..d47eb69e44 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/shardddl/pessimism" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/failpoint" ) var ( @@ -52,6 +53,8 @@ type Pessimist struct { // taskSources used to get all sources relative to the given task. taskSources func(task string) []string + + infoOpMu sync.Mutex } // NewPessimist creates a new Pessimist instance. @@ -464,17 +467,18 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I return } p.logger.Info("receive a shard DDL info", zap.Stringer("info", info)) + + p.infoOpMu.Lock() lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task)) if err != nil { - // if the lock become synced, and `done` for `exec`/`skip` operation received, - // but the `done` operations have not been deleted, - // then the DM-worker should not put any new DDL info until the old operation has been deleted. p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err)) // currently, only DDL mismatch will cause error metrics.ReportDDLError(info.Task, metrics.InfoErrSyncLock) + p.infoOpMu.Unlock() continue } else if !synced { p.logger.Info("the shard DDL lock has not synced", zap.String("lock", lockID), zap.Int("remain", remain)) + p.infoOpMu.Unlock() continue } p.logger.Info("the shard DDL lock has synced", zap.String("lock", lockID)) @@ -483,8 +487,8 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I if err != nil { p.logger.Error("fail to handle the shard DDL lock", zap.String("lock", lockID), log.ShortError(err)) metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock) - continue } + p.infoOpMu.Unlock() } } } @@ -505,14 +509,17 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis continue } + p.infoOpMu.Lock() lock := p.lk.FindLock(op.ID) if lock == nil { p.logger.Warn("no lock for the shard DDL lock operation exist", zap.Stringer("operation", op)) + p.infoOpMu.Unlock() continue } else if synced, _ := lock.IsSynced(); !synced { // this should not happen in normal case. p.logger.Warn("the lock for the shard DDL lock operation has not synced", zap.Stringer("operation", op)) metrics.ReportDDLError(op.Task, metrics.OpErrLockUnSynced) + p.infoOpMu.Unlock() continue } @@ -527,6 +534,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock) } p.logger.Info("the lock info for the shard DDL lock operation has been cleared", zap.Stringer("operation", op)) + p.infoOpMu.Unlock() continue } @@ -534,6 +542,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis // still need to wait for more `done` from other non-owner dm-worker instances. if op.Source != lock.Owner { p.logger.Info("the shard DDL lock operation of a non-owner has done", zap.Stringer("operation", op), zap.String("owner", lock.Owner)) + p.infoOpMu.Unlock() continue } @@ -544,6 +553,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis p.logger.Error("fail to put skip shard DDL lock operations for non-owner", zap.String("lock", lock.ID), log.ShortError(err)) metrics.ReportDDLError(op.Task, metrics.OpErrPutNonOwnerOp) } + p.infoOpMu.Unlock() } } } @@ -631,6 +641,35 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error { if err != nil { return err } + + failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) { + t := val.(int) + log.L().Info("wait new ddl info putted into etcd", + zap.String("failpoint", "SleepWhenRemoveLock"), + zap.Int("max wait second", t)) + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + timer := time.NewTimer(time.Duration(t) * time.Second) + defer timer.Stop() + OUTER: + for { + select { + case <-timer.C: + log.L().Info("failed to wait new DDL info", zap.Int("wait second", t)) + break OUTER + case <-ticker.C: + // manually check etcd + infos, _, err := pessimism.GetAllInfo(p.cli) + if err == nil { + if _, ok := infos[lock.Task]; ok { + log.L().Info("found new DDL info") + break OUTER + } + } + } + } + }) p.lk.RemoveLock(lock.ID) metrics.ReportDDLPending(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone) return nil diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index 6f2ddb7981..579a04d9ec 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -197,7 +197,7 @@ function check_log_contain_with_retry() { log2=$3 fi rc=0 - for ((k=1;k<11;k++)); do + for ((k=1;k<31;k++)); do if [[ ! -f $log1 ]]; then sleep 2 echo "check log contain failed $k-th time (file not exist), retry later" diff --git a/tests/shardddl3/run.sh b/tests/shardddl3/run.sh index 5821dc99de..5bc8c7efd9 100644 --- a/tests/shardddl3/run.sh +++ b/tests/shardddl3/run.sh @@ -686,7 +686,11 @@ function DM_RemoveLock_CASE() { check_log_contain_with_retry "wait new ddl info putted into etcd" $WORK_DIR/master/log/dm-master.log run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;" - check_log_contain_with_retry "fail to delete shard DDL infos and lock operations" $WORK_DIR/master/log/dm-master.log + if [[ "$1" = "pessimistic" ]]; then + check_log_contain_with_retry "found new DDL info" $WORK_DIR/master/log/dm-master.log + else + check_log_contain_with_retry "fail to delete shard DDL infos and lock operations" $WORK_DIR/master/log/dm-master.log + fi run_sql_source1 "alter table ${shardddl1}.${tb1} change a a bigint default 10;" run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;" @@ -700,13 +704,18 @@ function DM_RemoveLock_CASE() { function DM_RemoveLock() { ps aux | grep dm-master |awk '{print $2}'|xargs kill || true check_port_offline $MASTER_PORT1 20 - export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(10)" + export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(30)" run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "list-member -w" \ "bound" 2 + run_case RemoveLock "double-source-pessimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \ + run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \ + run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \ + "clean_table" "pessimistic" run_case RemoveLock "double-source-optimistic" \ "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \ run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \