disttask: remove the option of running subtasks concurrently #45566

Merged · 3 commits · Jul 27, 2023
disttask/framework/framework_rollback_test.go (1 addition, 1 deletion)

```diff
@@ -110,7 +110,7 @@ func RegisterRollbackTaskMeta(v *atomic.Int64) {
 	scheduler.RegisterTaskType(proto.TaskTypeExample)
 	scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ int64, _ []byte, _ int64) (scheduler.Scheduler, error) {
 		return &rollbackScheduler{v: v}, nil
-	}, scheduler.WithConcurrentSubtask())
+	})
 	scheduler.RegisterSubtaskExectorConstructor(proto.TaskTypeExample, proto.StepOne, func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) {
 		return &rollbackSubtaskExecutor{v: v}, nil
 	})
```
disttask/framework/framework_test.go (1 addition, 1 deletion)

```diff
@@ -119,7 +119,7 @@ func RegisterTaskMeta(v *atomic.Int64) {
 	scheduler.RegisterTaskType(proto.TaskTypeExample)
 	scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ int64, _ []byte, _ int64) (scheduler.Scheduler, error) {
 		return &testScheduler{}, nil
-	}, scheduler.WithConcurrentSubtask())
+	})
 	scheduler.RegisterSubtaskExectorConstructor(proto.TaskTypeExample, proto.StepOne, func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) {
 		return &testSubtaskExecutor{v: v}, nil
 	})
```
disttask/framework/scheduler/register.go (8 deletions)

```diff
@@ -36,7 +36,6 @@ func WithPoolSize(poolSize int32) TaskTypeOption {
 }
 
 type schedulerRegisterOptions struct {
-	ConcurrentSubtask bool
 }
 
 // Constructor is the constructor of Scheduler.
@@ -45,13 +44,6 @@ type Constructor func(taskID int64, taskMeta []byte, step int64) (Scheduler, err
 // RegisterOption is the register option of Scheduler.
 type RegisterOption func(opts *schedulerRegisterOptions)
 
-// WithConcurrentSubtask is the option of Scheduler to run subtasks concurrently.
-func WithConcurrentSubtask() RegisterOption {
-	return func(opts *schedulerRegisterOptions) {
-		opts.ConcurrentSubtask = true
-	}
-}
-
 // SubtaskExecutorConstructor is the constructor of SubtaskExecutor.
 type SubtaskExecutorConstructor func(minimalTask proto.MinimalTask, step int64) (SubtaskExecutor, error)
 
```
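For context, registration uses a functional-options pattern: RegisterSchedulerConstructor accepts zero or more RegisterOption values, each of which mutates a schedulerRegisterOptions struct before the constructor is stored. The sketch below is a simplified, runnable reconstruction of that pattern, not the TiDB source; `register` and `withConcurrentSubtask` are stand-ins that mirror the deleted option. It shows why removing the option only shortens call sites while the registration signature stays the same.

```go
package main

import "fmt"

// schedulerRegisterOptions mirrors the (now empty) options struct; before
// this PR its only field was ConcurrentSubtask.
type schedulerRegisterOptions struct {
	concurrentSubtask bool
}

// RegisterOption mutates the options struct; callers pass zero or more.
type RegisterOption func(opts *schedulerRegisterOptions)

// withConcurrentSubtask mirrors the deleted WithConcurrentSubtask option.
func withConcurrentSubtask() RegisterOption {
	return func(opts *schedulerRegisterOptions) { opts.concurrentSubtask = true }
}

// register stands in for RegisterSchedulerConstructor: fold all options
// into the struct, then store the constructor keyed by task type and step.
func register(taskType string, opts ...RegisterOption) {
	o := schedulerRegisterOptions{}
	for _, opt := range opts {
		opt(&o)
	}
	fmt.Printf("registered %q, concurrentSubtask=%v\n", taskType, o.concurrentSubtask)
}

func main() {
	register("example", withConcurrentSubtask()) // old call shape, removed by this PR
	register("example")                          // the only call shape that remains
}
```

Since RegisterOption itself is kept, future options can be added without touching call sites that pass none.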
disttask/framework/scheduler/scheduler.go (5 additions, 34 deletions)

```diff
@@ -40,7 +40,6 @@ type InternalSchedulerImpl struct {
 	taskTable TaskTable
 	pool      Pool
 	wg        sync.WaitGroup
-	subtaskWg sync.WaitGroup
 	logCtx    context.Context
 
 	mu struct {
@@ -80,7 +79,6 @@ func (*InternalSchedulerImpl) Start() {
 // Stop stops the scheduler.
 func (s *InternalSchedulerImpl) Stop() {
 	s.cancel()
-	s.subtaskWg.Wait()
 	s.wg.Wait()
 }
 
@@ -144,11 +142,6 @@ func (s *InternalSchedulerImpl) run(ctx context.Context, task *proto.Task) error
 		return s.getError()
 	}
 
-	concurrentSubtask := false
-	key := getKey(task.Type, task.Step)
-	if opts, ok := schedulerOptions[key]; ok && opts.ConcurrentSubtask {
-		concurrentSubtask = true
-	}
 	for {
 		// check if any error occurs
 		if err := s.getError(); err != nil {
@@ -167,13 +160,8 @@
 		if err := s.getError(); err != nil {
 			break
 		}
-		s.subtaskWg.Add(1)
 		s.runSubtask(runCtx, scheduler, subtask, task.Step, minimalTaskCh)
-		if !concurrentSubtask {
-			s.subtaskWg.Wait()
-		}
 	}
-	s.subtaskWg.Wait()
 	return s.getError()
 }
 
```
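To see what the deleted lines did, here is a runnable toy model of the old dual-mode loop; it is a sketch with invented names (`dispatch`, the sleeps), not the TiDB code. With concurrent mode on, several subtasks could be in flight and the trailing Wait drained them; with it off, the loop blocked after every dispatch, which is exactly the behavior this PR keeps as the only mode. The runSubtask hunks of the same file follow below.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// dispatch models the old run() loop. Each subtask is handed off to a
// goroutine (standing in for the scheduler's worker pool) and tracked by
// subtaskWg, just as the removed code did.
func dispatch(n int, concurrent bool) {
	var subtaskWg sync.WaitGroup
	for id := 1; id <= n; id++ {
		subtaskWg.Add(1)
		go func(id int) {
			defer subtaskWg.Done()
			// Later subtasks sleep less, so concurrent mode finishes them
			// out of order while sequential mode stays strictly ordered.
			time.Sleep(time.Duration(n-id) * 10 * time.Millisecond)
			fmt.Printf("subtask %d finished (concurrent=%v)\n", id, concurrent)
		}(id)
		if !concurrent {
			subtaskWg.Wait() // sequential mode: block until this subtask is done
		}
	}
	subtaskWg.Wait() // concurrent mode: drain subtasks still in flight
}

func main() {
	dispatch(3, true)  // the optional behavior this PR removes
	dispatch(3, false) // the behavior that remains: one subtask at a time
}
```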

disttask/framework/scheduler/scheduler.go (continued)

```diff
@@ -187,36 +175,17 @@ func (s *InternalSchedulerImpl) runSubtask(ctx context.Context, scheduler Schedu
 			s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError())
 		}
 		s.markErrorHandled()
-		s.subtaskWg.Done()
 		return
 	}
 	logutil.Logger(s.logCtx).Info("split subTask", zap.Int("cnt", len(minimalTasks)), zap.Int64("subtask-id", subtask.ID))
 
-	// fast path for ADD INDEX.
-	// ADD INDEX is a special case now, no minimal tasks will be generated.
-	// run it synchronously now.
-	if len(minimalTasks) == 0 {
-		s.onSubtaskFinished(ctx, scheduler, subtask)
-		s.subtaskWg.Done()
-		return
-	}
-
-	var mu sync.Mutex
-	var cnt int
+	var minimalTaskWg sync.WaitGroup
 	for _, minimalTask := range minimalTasks {
+		minimalTaskWg.Add(1)
 		j := minimalTask
 		minimalTaskCh <- func() {
-			defer func() {
-				mu.Lock()
-				defer mu.Unlock()
-				cnt++
-				// last minimal task should mark subtask as finished
-				if cnt == len(minimalTasks) {
-					s.onSubtaskFinished(ctx, scheduler, subtask)
-					s.subtaskWg.Done()
-				}
-			}()
 			s.runMinimalTask(ctx, j, subtask.Type, step)
+			minimalTaskWg.Done()
 		}
 	}
 	failpoint.Inject("waitUntilError", func() {
@@ -227,6 +196,8 @@ func (s *InternalSchedulerImpl) runSubtask(ctx context.Context, scheduler Schedu
 			time.Sleep(500 * time.Millisecond)
 		}
 	})
+	minimalTaskWg.Wait()
+	s.onSubtaskFinished(ctx, scheduler, subtask)
 }
 
 func (s *InternalSchedulerImpl) onSubtaskFinished(ctx context.Context, scheduler Scheduler, subtask *proto.Subtask) {
```
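The runSubtask rewrite swaps completion-detection strategies: the old code counted finished minimal tasks under a mutex and let the last finisher mark the subtask done, while the new code has the dispatching goroutine wait on a sync.WaitGroup and mark completion itself. Below is a self-contained toy comparison (invented names, not the TiDB code); note that the counter version never fires its callback when n == 0, which is why the old code needed the ADD INDEX fast path that this PR also deletes.

```go
package main

import (
	"fmt"
	"sync"
)

func runMinimalTask(id int) { fmt.Println("minimal task", id, "done") }

// finishByCounter models the removed approach: count completions under a
// mutex and let the last minimal task fire onFinished. With n == 0 the
// callback never fires, hence the old len(minimalTasks) == 0 fast path.
func finishByCounter(n int, onFinished func()) {
	var mu sync.Mutex
	cnt := 0
	for i := 0; i < n; i++ {
		go func(id int) {
			runMinimalTask(id)
			mu.Lock()
			cnt++
			last := cnt == n
			mu.Unlock()
			if last {
				onFinished()
			}
		}(i)
	}
}

// finishByWaitGroup models the new approach: the dispatcher waits for all
// minimal tasks, then fires onFinished exactly once, even when n == 0.
func finishByWaitGroup(n int, onFinished func()) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			runMinimalTask(id)
		}(i)
	}
	wg.Wait()
	onFinished()
}

func main() {
	finishByWaitGroup(3, func() { fmt.Println("subtask marked finished") })
	finishByWaitGroup(0, func() { fmt.Println("empty subtask still finishes") })
}
```

Blocking in the dispatcher is safe here because runSubtask is now called synchronously from the run() loop, so waiting does not stall any other subtask.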
disttask/framework/scheduler/scheduler_test.go (3 additions, 4 deletions)

```diff
@@ -127,10 +127,10 @@ func TestSchedulerRun(t *testing.T) {
 	err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency})
 	require.NoError(t, err)
 
-	// 9. run subtask concurrently
+	// 9. run subtask one by one
 	RegisterSchedulerConstructor(tp, proto.StepOne, func(_ int64, task []byte, step int64) (Scheduler, error) {
 		return mockScheduler, nil
-	}, WithConcurrentSubtask())
+	})
 	mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
 	mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
 	mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp}, nil).Once()
@@ -161,7 +161,6 @@ func TestSchedulerRun(t *testing.T) {
 	mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
 	mockSubtaskExecutor.On("Run", mock.Anything).Return(context.Canceled).Once()
 	mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateCanceled).Return(nil).Once()
-	mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(nil, nil).Once()
 	mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
 
 	var wg sync.WaitGroup
@@ -175,7 +174,7 @@
 	runCancel()
 	wg.Wait()
 
-	// 11. run subtask concurrently, on error, we should wait all minimal task finished before call CleanupSubtaskExecEnv
+	// 11. run subtask one by one, on error, we should wait all minimal task finished before call CleanupSubtaskExecEnv
 	syncCh := make(chan struct{})
 	lastMinimalTaskFinishTime, cleanupTime := time.Time{}, time.Time{}
 	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/waitUntilError", `return(true)`))
```
disttask/importinto/scheduler.go (1 deletion)

```diff
@@ -239,7 +239,6 @@ func init() {
 				logger: logger,
 			}, nil
 		},
-		scheduler.WithConcurrentSubtask(),
 	)
 	scheduler.RegisterSchedulerConstructor(proto.ImportInto, StepPostProcess,
 		func(taskID int64, bs []byte, step int64) (scheduler.Scheduler, error) {
```