diff --git a/dm/dm/master/scheduler/scheduler.go b/dm/dm/master/scheduler/scheduler.go
index 7b7dcb2557a..519fabcb804 100644
--- a/dm/dm/master/scheduler/scheduler.go
+++ b/dm/dm/master/scheduler/scheduler.go
@@ -2269,16 +2269,56 @@ func (s *Scheduler) getNextLoadTaskTransfer(worker, source string) (string, stri
 	return "", ""
 }
 
-// hasLoadTaskByWorkerAndSource check whether there is a load subtask for the worker and source.
+// hasLoadTaskByWorkerAndSource checks whether there is an existing load subtask for the worker and source.
 func (s *Scheduler) hasLoadTaskByWorkerAndSource(worker, source string) bool {
-	for _, sourceWorkerMap := range s.loadTasks {
-		if workerName, ok := sourceWorkerMap[source]; ok && workerName == worker {
+	for taskName, sourceWorkerMap := range s.loadTasks {
+		// don't consider removed subtasks
+		subtasksV, ok := s.subTaskCfgs.Load(taskName)
+		if !ok {
+			continue
+		}
+		subtasks := subtasksV.(map[string]config.SubTaskConfig)
+		if _, ok2 := subtasks[source]; !ok2 {
+			continue
+		}
+
+		if workerName, ok2 := sourceWorkerMap[source]; ok2 && workerName == worker {
 			return true
 		}
 	}
 	return false
 }
 
+// TryResolveLoadTask checks whether any of the given sources has a load task whose dump files sit on a worker other
+// than the one the source is currently bound to. If so, it triggers a source transfer.
+func (s *Scheduler) TryResolveLoadTask(sources []string) {
+	for _, source := range sources {
+		s.mu.Lock()
+		worker, ok := s.bounds[source]
+		if !ok {
+			s.mu.Unlock()
+			continue
+		}
+		if err := s.tryResolveLoadTask(worker.baseInfo.Name, source); err != nil {
+			s.logger.Error("tryResolveLoadTask failed", zap.Error(err))
+		}
+		s.mu.Unlock()
+	}
+}
+
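+// tryResolveLoadTask transfers the bound worker of originSource when another worker holds its unfinished load task.
+// Both call sites (TryResolveLoadTask and handleLoadTaskDel) hold s.mu when calling it.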
\"Running\"" 4 } +function stop_task_left_load() { + echo "start DM master, workers and sources" + run_dm_master $WORK_DIR/master $MASTER_PORT1 $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1 + + export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/loader/LoadDataSlowDownByTask=return(\"load_task1\")" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + dmctl_start_task_standalone "$cur/conf/dm-task-standalone.yaml" "--remove-meta" + + export GO_FAILPOINTS="" + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # kill worker1, load_task1 will be transferred to worker2, but lack local files + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task1" \ + "different worker in load stage, previous worker: worker1, current worker: worker2" 1 + + # now stop this task without clean meta (left a load_task KV in etcd) + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task load_task1" \ + "\"result\": true" 2 + + dmctl_start_task_standalone "$cur/conf/dm-task2-standalone.yaml" "--remove-meta" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task2" \ + "\"unit\": \"Sync\"" 1 + + # after worker1 goes online, although it has unfinished load_task1, but load_task1 is stopped so should not rebound + export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/loader/LoadDataSlowDownByTask=return(\"load_task1\")" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member --name worker1" \ + "\"source\": \"\"" 1 + + # start-task again, expect the source is auto transferred back + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/dm-task-standalone.yaml" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member --name worker1" \ + "\"source\": \"mysql-replica-01\"" 1 + + # repeat again and check start-task --remove-meta will not cause transfer + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task load_task1" \ + "\"result\": true" 2 + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task2" \ + "\"unit\": \"Sync\"" 1 + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + dmctl_start_task_standalone "$cur/conf/dm-task-standalone.yaml" "--remove-meta" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task1" \ + "\"unit\": \"Sync\"" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member --name worker1" \ + "\"source\": \"\"" 1 + + cleanup_process $* + cleanup_data load_task1 + cleanup_data load_task2 +} + function run() { echo "import prepare data" run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 
+	go s.scheduler.TryResolveLoadTask(sources)
+
 	resp.Result = true
 	if cfg.RemoveMeta {
 		resp.Msg = "`remove-meta` in task config is deprecated, please use `start-task ... --remove-meta` instead"
diff --git a/dm/tests/_utils/test_prepare b/dm/tests/_utils/test_prepare
index 04e3f298fcd..3e581f76d01 100644
--- a/dm/tests/_utils/test_prepare
+++ b/dm/tests/_utils/test_prepare
@@ -59,13 +59,18 @@ function join_string() {
 
 # shortcut for start task on one DM-worker
 function dmctl_start_task_standalone() {
+	if [ $# -ge 2 ]; then
+		remove_meta=$2
+	else
+		remove_meta=""
+	fi
 	if [ $# -ge 1 ]; then
 		task_conf=$1
 	else
 		task_conf="$cur/conf/dm-task.yaml"
 	fi
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-		"start-task $task_conf" \
+		"start-task $task_conf $remove_meta" \
 		"\"result\": true" 2 \
 		"\"source\": \"$SOURCE_ID1\"" 1
 }
diff --git a/dm/tests/load_task/conf/dm-task-standalone.yaml b/dm/tests/load_task/conf/dm-task-standalone.yaml
new file mode 100644
index 00000000000..0d293423e43
--- /dev/null
+++ b/dm/tests/load_task/conf/dm-task-standalone.yaml
@@ -0,0 +1,41 @@
+---
+name: load_task1
+task-mode: all
+is-sharding: false
+meta-schema: "dm_meta"
+heartbeat-update-interval: 1
+heartbeat-report-interval: 1
+
+target-database:
+  host: "127.0.0.1"
+  port: 4000
+  user: "test"
+  password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="
+
+mysql-instances:
+  - source-id: "mysql-replica-01"
+    block-allow-list: "instance"
+    mydumper-config-name: "global"
+    loader-config-name: "global"
+    syncer-config-name: "global"
+
+block-allow-list:
+  instance:
+    do-dbs: ["load_task1"]
+
+mydumpers:
+  global:
+    threads: 4
+    chunk-filesize: 64
+    skip-tz-utc: true
+    extra-args: ""
+
+loaders:
+  global:
+    pool-size: 16
+    dir: "./dumped_data"
+
+syncers:
+  global:
+    worker-count: 16
+    batch: 100
diff --git a/dm/tests/load_task/conf/dm-task2-standalone.yaml b/dm/tests/load_task/conf/dm-task2-standalone.yaml
new file mode 100644
index 00000000000..bc98e4efac3
--- /dev/null
+++ b/dm/tests/load_task/conf/dm-task2-standalone.yaml
@@ -0,0 +1,41 @@
+---
+name: load_task2
+task-mode: all
+is-sharding: false
+meta-schema: "dm_meta"
+heartbeat-update-interval: 1
+heartbeat-report-interval: 1
+
+target-database:
+  host: "127.0.0.1"
+  port: 4000
+  user: "test"
+  password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="
+
+mysql-instances:
+  - source-id: "mysql-replica-01"
+    block-allow-list: "instance"
+    mydumper-config-name: "global"
+    loader-config-name: "global"
+    syncer-config-name: "global"
+
+block-allow-list:
+  instance:
+    do-dbs: ["load_task2"]
+
+mydumpers:
+  global:
+    threads: 4
+    chunk-filesize: 64
+    skip-tz-utc: true
+    extra-args: ""
+
+loaders:
+  global:
+    pool-size: 16
+    dir: "./dumped_data"
+
+syncers:
+  global:
+    worker-count: 16
+    batch: 100
diff --git a/dm/tests/load_task/run.sh b/dm/tests/load_task/run.sh
index 9bd2c791920..24251f6ada6 100755
--- a/dm/tests/load_task/run.sh
+++ b/dm/tests/load_task/run.sh
@@ -170,6 +170,85 @@ function test_transfer_two_sources() {
 		"\"taskStatus\": \"Running\"" 4
 }
 
+function stop_task_left_load() {
+	echo "start DM master, workers and sources"
+	run_dm_master $WORK_DIR/master $MASTER_PORT1 $cur/conf/dm-master.toml
+	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1
+
+	export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/loader/LoadDataSlowDownByTask=return(\"load_task1\")"
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
+	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
+	dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
+
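+	# the LoadDataSlowDownByTask failpoint set above should keep load_task1 in the load stage, so its dump files stay on worker1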
+	dmctl_start_task_standalone "$cur/conf/dm-task-standalone.yaml" "--remove-meta"
+
+	export GO_FAILPOINTS=""
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+	# kill worker1; load_task1 will be transferred to worker2, which lacks the local dump files
+	ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true
+	check_port_offline $WORKER1_PORT 20
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status load_task1" \
+		"different worker in load stage, previous worker: worker1, current worker: worker2" 1
+
+	# now stop this task without cleaning its meta (this leaves a load_task KV in etcd)
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"stop-task load_task1" \
+		"\"result\": true" 2
+
+	dmctl_start_task_standalone "$cur/conf/dm-task2-standalone.yaml" "--remove-meta"
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status load_task2" \
+		"\"unit\": \"Sync\"" 1
+
+	# after worker1 comes back online it still holds unfinished load_task1 files, but the task is stopped, so the source should not be bound back to it
+	export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/loader/LoadDataSlowDownByTask=return(\"load_task1\")"
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"list-member --name worker1" \
+		"\"source\": \"\"" 1
+
+	# start-task again; expect the source to be transferred back automatically
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"start-task $cur/conf/dm-task-standalone.yaml"
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"list-member --name worker1" \
+		"\"source\": \"mysql-replica-01\"" 1
+
+	# repeat the steps and check that start-task --remove-meta does not cause a transfer
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"stop-task load_task1" \
+		"\"result\": true" 2
+	ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true
+	check_port_offline $WORKER1_PORT 20
+
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status load_task2" \
+		"\"unit\": \"Sync\"" 1
+
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+
+	dmctl_start_task_standalone "$cur/conf/dm-task-standalone.yaml" "--remove-meta"
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status load_task1" \
+		"\"unit\": \"Sync\"" 1
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"list-member --name worker1" \
+		"\"source\": \"\"" 1
+
+	cleanup_process $*
+	cleanup_data load_task1
+	cleanup_data load_task2
+}
+
 function run() {
 	echo "import prepare data"
 	run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
@@ -177,6 +256,8 @@ function run() {
 	run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
 	check_contains 'Query OK, 3 rows affected'
 
+	stop_task_left_load
+
 	echo "start DM master, workers and sources"
 	run_dm_master $WORK_DIR/master $MASTER_PORT1 $cur/conf/dm-master.toml
 	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1