Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm(engine): declarative command should consider time #7640

Merged
merged 10 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions engine/executor/dm/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,15 @@ func (w *dmWorker) tryUpdateStatus(ctx context.Context) error {
// workerStatus gets worker status.
func (w *dmWorker) workerStatus(ctx context.Context) frameModel.WorkerStatus {
var (
stage = w.getStage()
code frameModel.WorkerState
taskStatus = &runtime.TaskStatus{Unit: w.workerType, Task: w.taskID, Stage: stage, CfgModRevision: w.cfgModRevision}
stage = w.getStage()
code frameModel.WorkerState
taskStatus = &runtime.TaskStatus{
Unit: w.workerType,
Task: w.taskID,
Stage: stage,
StageUpdatedTime: time.Now(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this assign tasks no effect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to left this one, remove the write to StageUpdatedTime at jobmaster side

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only update task status when task stage changed, so the StageUpdatedTime may not change for a long time, does this meet user expectations

CfgModRevision: w.cfgModRevision,
}
finalStatus any
)
if stage == metadata.StageFinished {
Expand Down
40 changes: 24 additions & 16 deletions engine/jobmaster/dm/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,22 @@ func TestQueryStatusAPI(t *testing.T) {
"task2": {
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Unit: frameModel.WorkerDMDump,
Task: "task2",
Stage: metadata.StageFinished,
CfgModRevision: 3,
Unit: frameModel.WorkerDMDump,
Task: "task2",
Stage: metadata.StageFinished,
CfgModRevision: 3,
StageUpdatedTime: loadTime,
},
Status: dumpStatusBytes,
Duration: dumpDuration,
},
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Unit: frameModel.WorkerDMLoad,
Task: "task2",
Stage: metadata.StageFinished,
CfgModRevision: 3,
Unit: frameModel.WorkerDMLoad,
Task: "task2",
Stage: metadata.StageFinished,
CfgModRevision: 3,
StageUpdatedTime: syncTime,
},
Status: loadStatusBytes,
Duration: loadDuration,
Expand All @@ -153,20 +155,22 @@ func TestQueryStatusAPI(t *testing.T) {
"task7": {
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Unit: frameModel.WorkerDMDump,
Task: "task7",
Stage: metadata.StageFinished,
CfgModRevision: 4,
Unit: frameModel.WorkerDMDump,
Task: "task7",
Stage: metadata.StageFinished,
CfgModRevision: 4,
StageUpdatedTime: loadTime,
},
Status: dumpStatusBytes,
Duration: dumpDuration,
},
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Unit: frameModel.WorkerDMLoad,
Task: "task7",
Stage: metadata.StageFinished,
CfgModRevision: 4,
Unit: frameModel.WorkerDMLoad,
Task: "task7",
Stage: metadata.StageFinished,
CfgModRevision: 4,
StageUpdatedTime: syncTime,
},
Status: loadStatusBytes,
Duration: loadDuration,
Expand Down Expand Up @@ -369,6 +373,7 @@ func TestQueryStatusAPI(t *testing.T) {
"Task": "task2",
"Stage": "Finished",
"CfgModRevision": 3,
"StageUpdatedTime": "2022-11-04T19:47:57.43382274+08:00",
"Result": null,
"Status": {
"totalTables": 10,
Expand All @@ -386,6 +391,7 @@ func TestQueryStatusAPI(t *testing.T) {
"Task": "task2",
"Stage": "Finished",
"CfgModRevision": 3,
"StageUpdatedTime": "2022-11-04T20:47:57.43382274+08:00",
"Result": null,
"Status": {
"finishedBytes": 4,
Expand All @@ -404,6 +410,7 @@ func TestQueryStatusAPI(t *testing.T) {
"Task": "task7",
"Stage": "Finished",
"CfgModRevision": 4,
"StageUpdatedTime": "2022-11-04T19:47:57.43382274+08:00",
"Result": null,
"Status": {
"totalTables": 10,
Expand All @@ -421,6 +428,7 @@ func TestQueryStatusAPI(t *testing.T) {
"Task": "task7",
"Stage": "Finished",
"CfgModRevision": 4,
"StageUpdatedTime": "2022-11-04T20:47:57.43382274+08:00",
"Result": null,
"Status": {
"finishedBytes": 4,
Expand Down
11 changes: 9 additions & 2 deletions engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ func (j dmJobMasterFactory) DeserializeConfig(configBytes []byte) (registry.Work
}

// NewWorkerImpl implements WorkerFactory.NewWorkerImpl
func (j dmJobMasterFactory) NewWorkerImpl(dCtx *dcontext.Context, workerID frameModel.WorkerID, masterID frameModel.MasterID, conf framework.WorkerConfig) (framework.WorkerImpl, error) {
func (j dmJobMasterFactory) NewWorkerImpl(
dCtx *dcontext.Context,
workerID frameModel.WorkerID,
masterID frameModel.MasterID,
conf framework.WorkerConfig,
) (framework.WorkerImpl, error) {
log.L().Info("new dm jobmaster", zap.String(logutil.ConstFieldJobKey, workerID))
jm := &JobMaster{
initJobCfg: conf.(*config.JobCfg),
Expand Down Expand Up @@ -183,6 +188,7 @@ func (jm *JobMaster) OnWorkerOnline(worker framework.WorkerHandle) error {
return jm.handleOnlineStatus(worker)
}

// handleOnlineStatus is used by OnWorkerOnline and OnWorkerStatusUpdated.
func (jm *JobMaster) handleOnlineStatus(worker framework.WorkerHandle) error {
var taskStatus runtime.TaskStatus
if err := json.Unmarshal(worker.Status().ExtBytes, &taskStatus); err != nil {
Expand Down Expand Up @@ -225,7 +231,8 @@ func (jm *JobMaster) onWorkerFinished(finishedTaskStatus runtime.FinishedTaskSta

unitStateStore := jm.metadata.UnitStateStore()
err := unitStateStore.ReadModifyWrite(context.TODO(), func(state *metadata.UnitState) error {
finishedTaskStatus.Duration = time.Since(state.CurrentUnitStatus[taskStatus.Task].CreatedTime)
finishedTaskStatus.StageUpdatedTime = time.Now()
finishedTaskStatus.Duration = finishedTaskStatus.StageUpdatedTime.Sub(state.CurrentUnitStatus[taskStatus.Task].CreatedTime)
for i, status := range state.FinishedUnitStatus[taskStatus.Task] {
// when the unit is restarted by update-cfg or something, overwrite the old status and truncate
if status.Unit == taskStatus.Unit {
Expand Down
16 changes: 10 additions & 6 deletions engine/jobmaster/dm/metadata/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/pingcap/tiflow/engine/jobmaster/dm/bootstrap"
"github.com/pingcap/tiflow/engine/jobmaster/dm/config"
Expand Down Expand Up @@ -118,15 +119,17 @@ func NewJob(jobCfg *config.JobCfg) *Job {
// Task is the minimum working unit of a job.
// A job may contain multiple upstream and it will be converted into multiple tasks.
type Task struct {
Cfg *config.TaskCfg
Stage TaskStage
Cfg *config.TaskCfg
Stage TaskStage
StageUpdatedTime time.Time
}

// NewTask creates a new Task instance
func NewTask(taskCfg *config.TaskCfg) *Task {
return &Task{
Cfg: taskCfg,
Stage: StageRunning, // TODO: support set stage when create task.
Cfg: taskCfg,
Stage: StageRunning, // TODO: support set stage when create task.
StageUpdatedTime: time.Now(),
}
}

Expand Down Expand Up @@ -181,11 +184,12 @@ func (jobStore *JobStore) UpdateStages(ctx context.Context, taskIDs []string, st
}
}
for _, taskID := range taskIDs {
if _, ok := job.Tasks[taskID]; !ok {
t, ok := job.Tasks[taskID]
if !ok {
return errors.Errorf("task %s not found", taskID)
}
t := job.Tasks[taskID]
t.Stage = stage
t.StageUpdatedTime = time.Now()
}

return jobStore.Put(ctx, job)
Expand Down
9 changes: 5 additions & 4 deletions engine/jobmaster/dm/metadata/unit_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func NewUnitStateStore(kvClient metaModel.KVClient) *UnitStateStore {

// TaskStatus defines the running task status.
type TaskStatus struct {
Unit frameModel.WorkerType
Task string
Stage TaskStage
CfgModRevision uint64
Unit frameModel.WorkerType
Task string
Stage TaskStage
CfgModRevision uint64
StageUpdatedTime time.Time
}

// FinishedTaskStatus wraps the TaskStatus with FinishedStatus.
Expand Down
16 changes: 12 additions & 4 deletions engine/jobmaster/dm/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (tm *TaskManager) checkAndOperateTasks(ctx context.Context, job *metadata.J
continue
}

op := genOp(runningTask.Stage, persistentTask.Stage)
op := genOp(runningTask.Stage, runningTask.StageUpdatedTime, persistentTask.Stage, persistentTask.StageUpdatedTime)
if op == dmpkg.None {
tm.logger.Debug(
"task status will not be changed",
Expand Down Expand Up @@ -224,11 +224,19 @@ func (tm *TaskManager) GetTaskStatus(taskID string) (runtime.TaskStatus, bool) {
return value.(runtime.TaskStatus), true
}

func genOp(runtimeStage, expectedStage metadata.TaskStage) dmpkg.OperateType {
func genOp(
runningStage metadata.TaskStage,
runningStageUpdatedTime time.Time,
expectedStage metadata.TaskStage,
expectedStageUpdatedTime time.Time,
) dmpkg.OperateType {
if runningStageUpdatedTime.After(expectedStageUpdatedTime) {
return dmpkg.None
}
switch {
case expectedStage == metadata.StagePaused && (runtimeStage == metadata.StageRunning || runtimeStage == metadata.StageError):
case expectedStage == metadata.StagePaused && (runningStage == metadata.StageRunning || runningStage == metadata.StageError):
return dmpkg.Pause
case expectedStage == metadata.StageRunning && runtimeStage == metadata.StagePaused:
case expectedStage == metadata.StageRunning && (runningStage == metadata.StagePaused || runningStage == metadata.StageError):
return dmpkg.Resume
// TODO: support update
default:
Expand Down
113 changes: 101 additions & 12 deletions engine/jobmaster/dm/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (t *testDMJobmasterSuite) TestUpdateTaskStatus() {
}
taskManager.UpdateTaskStatus(loadStatus1)
taskStatusMap = taskManager.TaskStatus()
// copy undetermined time.Now
loadStatus1.StageUpdatedTime = taskStatusMap[jobCfg.Upstreams[0].SourceID].StageUpdatedTime
require.Len(t.T(), taskStatusMap, 2)
require.Contains(t.T(), taskStatusMap, jobCfg.Upstreams[0].SourceID)
require.Contains(t.T(), taskStatusMap, jobCfg.Upstreams[1].SourceID)
Expand Down Expand Up @@ -229,15 +231,91 @@ func (t *testDMJobmasterSuite) TestClearTaskStatus() {
}

func (t *testDMJobmasterSuite) TestGenOp() {
require.Equal(t.T(), genOp(metadata.StagePaused, metadata.StagePaused), dmpkg.None)
require.Equal(t.T(), genOp(metadata.StageRunning, metadata.StageRunning), dmpkg.None)
require.Equal(t.T(), genOp(metadata.StageRunning, metadata.StagePaused), dmpkg.Pause)
require.Equal(t.T(), genOp(metadata.StageError, metadata.StagePaused), dmpkg.Pause)
require.Equal(t.T(), genOp(metadata.StageFinished, metadata.StageRunning), dmpkg.None)
require.Equal(t.T(), genOp(metadata.StagePaused, metadata.StageRunning), dmpkg.Resume)
earlierTime := time.Now()
laterTime := earlierTime.Add(time.Second)
cases := []struct {
runningStage metadata.TaskStage
runningStageUpdatedTime time.Time
expectedStage metadata.TaskStage
expectedStageUpdatedTime time.Time
op dmpkg.OperateType
}{
{
runningStage: metadata.StagePaused,
runningStageUpdatedTime: earlierTime,
expectedStage: metadata.StagePaused,
expectedStageUpdatedTime: laterTime,
op: dmpkg.None,
},
{
runningStage: metadata.StageRunning,
runningStageUpdatedTime: earlierTime,
expectedStage: metadata.StageRunning,
expectedStageUpdatedTime: laterTime,
op: dmpkg.None,
},
{
runningStage: metadata.StageRunning,
runningStageUpdatedTime: earlierTime,
expectedStage: metadata.StagePaused,
expectedStageUpdatedTime: laterTime,
op: dmpkg.Pause,
},
{
runningStage: metadata.StageError,
runningStageUpdatedTime: earlierTime,
expectedStage: metadata.StagePaused,
expectedStageUpdatedTime: laterTime,
op: dmpkg.Pause,
},
{
runningStage: metadata.StageFinished,
runningStageUpdatedTime: earlierTime,
expectedStage: metadata.StageRunning,
expectedStageUpdatedTime: laterTime,
op: dmpkg.None,
},
{
runningStage: metadata.StagePaused,
runningStageUpdatedTime: earlierTime,
expectedStage: metadata.StageRunning,
expectedStageUpdatedTime: laterTime,
op: dmpkg.Resume,
},
{
runningStage: metadata.StageError,
runningStageUpdatedTime: earlierTime,
expectedStage: metadata.StageRunning,
expectedStageUpdatedTime: laterTime,
op: dmpkg.Resume,
},
// any combinations with runningStageUpdatedTime later than expectedStageUpdatedTime is no-op
{
runningStage: metadata.StageError,
runningStageUpdatedTime: laterTime,
expectedStage: metadata.StageRunning,
expectedStageUpdatedTime: earlierTime,
op: dmpkg.None,
},
{
runningStage: metadata.StageRunning,
runningStageUpdatedTime: laterTime,
expectedStage: metadata.StagePaused,
expectedStageUpdatedTime: earlierTime,
op: dmpkg.None,
},
}

for _, c := range cases {
op := genOp(c.runningStage, c.runningStageUpdatedTime, c.expectedStage, c.expectedStageUpdatedTime)
require.Equal(t.T(), c.op, op)
}
}

func (t *testDMJobmasterSuite) TestCheckAndOperateTasks() {
now := time.Now()
oldTime := now.Add(-time.Second)
newTime := now.Add(time.Second)
jobCfg := &config.JobCfg{}
require.NoError(t.T(), jobCfg.DecodeFile(jobTemplatePath))
job := metadata.NewJob(jobCfg)
Expand All @@ -247,23 +325,34 @@ func (t *testDMJobmasterSuite) TestCheckAndOperateTasks() {
require.EqualError(t.T(), taskManager.checkAndOperateTasks(context.Background(), job), "get task running status failed")

dumpStatus1 := runtime.TaskStatus{
Unit: frameModel.WorkerDMDump,
Task: jobCfg.Upstreams[0].SourceID,
Stage: metadata.StageRunning,
Unit: frameModel.WorkerDMDump,
Task: jobCfg.Upstreams[0].SourceID,
Stage: metadata.StageRunning,
StageUpdatedTime: now,
}
dumpStatus2 := runtime.TaskStatus{
Unit: frameModel.WorkerDMDump,
Task: jobCfg.Upstreams[1].SourceID,
Stage: metadata.StageRunning,
Unit: frameModel.WorkerDMDump,
Task: jobCfg.Upstreams[1].SourceID,
Stage: metadata.StageRunning,
StageUpdatedTime: now,
}
taskManager.UpdateTaskStatus(dumpStatus1)
taskManager.UpdateTaskStatus(dumpStatus2)
job.Tasks[jobCfg.Upstreams[0].SourceID].StageUpdatedTime = newTime
job.Tasks[jobCfg.Upstreams[1].SourceID].StageUpdatedTime = newTime
require.NoError(t.T(), taskManager.checkAndOperateTasks(context.Background(), job))

dumpStatus2.Stage = metadata.StagePaused
taskManager.UpdateTaskStatus(dumpStatus2)
e := errors.New("operate task failed")
mockAgent.On("SendMessage").Return(e).Once()
// old expected stage time will not trigger SendMessage
job.Tasks[jobCfg.Upstreams[0].SourceID].StageUpdatedTime = oldTime
job.Tasks[jobCfg.Upstreams[1].SourceID].StageUpdatedTime = oldTime
require.NoError(t.T(), taskManager.checkAndOperateTasks(context.Background(), job))

job.Tasks[jobCfg.Upstreams[0].SourceID].StageUpdatedTime = newTime
job.Tasks[jobCfg.Upstreams[1].SourceID].StageUpdatedTime = newTime
require.EqualError(t.T(), taskManager.checkAndOperateTasks(context.Background(), job), e.Error())
}

Expand Down
Loading