dxf: add tidb_max_dist_task_nodes to specify max node count #58937

Open

wants to merge 20 commits into base: master
Changes from 14 commits
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
@@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(241), session.CurrentBootstrapVersion)
require.Equal(t, int64(242), session.CurrentBootstrapVersion)
}
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_scheduler_test.go
@@ -158,7 +158,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
ext.(*ddl.LitBackfillScheduler).GlobalSort = true
sch.Extension = ext

taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, 1, "", task.Meta)
taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, 1, "", 0, task.Meta)
require.NoError(t, err)
task.ID = taskID
execIDs := []string{":4000"}
33 changes: 33 additions & 0 deletions pkg/ddl/executor.go
@@ -72,6 +72,7 @@ import (
"github.com/pingcap/tidb/pkg/util/stringutil"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/zap"
)
@@ -4931,8 +4932,30 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
return errors.Trace(err)
}

// GetDXFDefaultMaxNodeCntAuto calculates a default max node count for distributed task execution.
func GetDXFDefaultMaxNodeCntAuto(store kv.Storage) int {
tikvStore, ok := store.(tikv.Storage)
if !ok {
logutil.DDLLogger().Warn("not an TiKV or TiFlash store instance", zap.String("type", fmt.Sprintf("%T", store)))
return 0
}
pdClient := tikvStore.GetRegionCache().PDClient()
if pdClient == nil {
logutil.DDLLogger().Warn("pd unavailable when get default max node count")
return 0
}
stores, err := pdClient.GetAllStores(context.Background())
if err != nil {
logutil.DDLLogger().Warn("get all stores failed when get default max node count", zap.Error(err))
return 0
}
return max(3, len(stores)/3)
}

func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) error {
m := NewDDLReorgMeta(sctx)
defaultMaxNodeCnt := 0

setReorgParam := func() {
if sv, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBDDLReorgWorkerCount); ok {
m.SetConcurrency(variable.TidbOptInt(sv, 0))
@@ -4946,6 +4969,15 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro
m.IsDistReorg = vardef.EnableDistTask.Load()
m.IsFastReorg = vardef.EnableFastReorg.Load()
m.TargetScope = vardef.ServiceScope.Load()
if sv, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBMaxDistTaskNodes); ok {
m.MaxNodeCount = variable.TidbOptInt(sv, 0)
if m.MaxNodeCount == -1 { // -1 means calculate automatically
if defaultMaxNodeCnt == 0 {
defaultMaxNodeCnt = GetDXFDefaultMaxNodeCntAuto(sctx.GetStore())
}
m.MaxNodeCount = defaultMaxNodeCnt
}
}
if hasSysDB(job) {
if m.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
@@ -5002,6 +5034,7 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro
zap.Bool("enableDistTask", m.IsDistReorg),
zap.Bool("enableFastReorg", m.IsFastReorg),
zap.String("targetScope", m.TargetScope),
zap.Int("maxNodeCount", m.MaxNodeCount),
zap.Int("concurrency", m.GetConcurrency()),
zap.Int("batchSize", m.GetBatchSize()),
)
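Taken together, the executor.go changes read the new tidb_max_dist_task_nodes variable into the job's ReorgMeta.MaxNodeCount, and a value of -1 resolves to the automatic default max(3, storeCount/3) computed by GetDXFDefaultMaxNodeCntAuto (for example, 12 stores give 4 nodes, while 9 or fewer stores give 3). A minimal sketch of that resolution policy; resolveMaxNodeCount is an illustrative name, not a function added by this PR:

func resolveMaxNodeCount(varValue, storeCount int) int {
	// -1 asks for the automatic default; any other value is used as-is.
	if varValue == -1 {
		return max(3, storeCount/3) // same heuristic as GetDXFDefaultMaxNodeCntAuto
	}
	return varValue
}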
8 changes: 5 additions & 3 deletions pkg/ddl/index.go
@@ -2571,7 +2571,9 @@ func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgIn

g.Go(func() error {
defer close(done)
err := submitAndWaitTask(ctx, taskKey, taskType, concurrency, reorgInfo.ReorgMeta.TargetScope, metaData)
targetScope := reorgInfo.ReorgMeta.TargetScope
maxNodeCnt := reorgInfo.ReorgMeta.MaxNodeCount
err := submitAndWaitTask(ctx, taskKey, taskType, concurrency, targetScope, maxNodeCnt, metaData)
failpoint.InjectCall("pauseAfterDistTaskFinished")
if err := w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
@@ -2728,8 +2730,8 @@ func (w *worker) updateDistTaskRowCount(taskKey string, jobID int64) {
}

// submitAndWaitTask submits a task and wait for it to finish.
func submitAndWaitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, targetScope string, taskMeta []byte) error {
task, err := handle.SubmitTask(ctx, taskKey, taskType, concurrency, targetScope, taskMeta)
func submitAndWaitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, targetScope string, maxNodeCnt int, taskMeta []byte) error {
task, err := handle.SubmitTask(ctx, taskKey, taskType, concurrency, targetScope, maxNodeCnt, taskMeta)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/disttask/example/app_test.go
@@ -52,7 +52,7 @@ func TestExampleApplication(t *testing.T) {
}
bytes, err := json.Marshal(meta)
require.NoError(t, err)
task, err := handle.SubmitTask(ctx, "test", proto.TaskTypeExample, 1, "", bytes)
task, err := handle.SubmitTask(ctx, "test", proto.TaskTypeExample, 1, "", 0, bytes)
require.NoError(t, err)
require.NoError(t, handle.WaitTaskDoneByKey(ctx, task.Key))
}
4 changes: 2 additions & 2 deletions pkg/disttask/framework/handle/handle.go
@@ -55,7 +55,7 @@ func GetCPUCountOfNode(ctx context.Context) (int, error) {
}

// SubmitTask submits a task.
func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, targetScope string, taskMeta []byte) (*proto.Task, error) {
func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, targetScope string, maxNodeCnt int, taskMeta []byte) (*proto.Task, error) {
taskManager, err := storage.GetTaskManager()
if err != nil {
return nil, err
@@ -68,7 +68,7 @@ func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, co
return nil, storage.ErrTaskAlreadyExists
}

taskID, err := taskManager.CreateTask(ctx, taskKey, taskType, concurrency, targetScope, taskMeta)
taskID, err := taskManager.CreateTask(ctx, taskKey, taskType, concurrency, targetScope, maxNodeCnt, taskMeta)
if err != nil {
return nil, err
}
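The updated call sites in this PR's tests pass 0 for the new maxNodeCnt argument, which leaves the node count uncapped (the scheduler and balancer only apply the limit when MaxNodeCount > 0). A capped submission would look roughly like the sketch below; the task key, concurrency, and node limit are illustrative values, not taken from this PR:

	// Submit a task that may run on at most 4 DXF nodes.
	task, err := handle.SubmitTask(ctx, "my-task", proto.TaskTypeExample,
		8 /* concurrency */, "" /* targetScope */, 4 /* maxNodeCnt */, taskMeta)
	if err != nil {
		return err
	}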
10 changes: 5 additions & 5 deletions pkg/disttask/framework/handle/handle_test.go
@@ -49,7 +49,7 @@ func TestHandle(t *testing.T) {
storage.SetTaskManager(mgr)

// no scheduler registered
task, err := handle.SubmitTask(ctx, "1", proto.TaskTypeExample, 2, "", proto.EmptyMeta)
task, err := handle.SubmitTask(ctx, "1", proto.TaskTypeExample, 2, "", 0, proto.EmptyMeta)
require.NoError(t, err)
waitedTaskBase, err := handle.WaitTask(ctx, task.ID, func(task *proto.TaskBase) bool {
return task.IsDone()
@@ -72,23 +72,23 @@

require.NoError(t, handle.CancelTask(ctx, "1"))

task, err = handle.SubmitTask(ctx, "2", proto.TaskTypeExample, 2, "", proto.EmptyMeta)
task, err = handle.SubmitTask(ctx, "2", proto.TaskTypeExample, 2, "", 0, proto.EmptyMeta)
require.NoError(t, err)
require.Equal(t, "2", task.Key)

// submit same task.
task, err = handle.SubmitTask(ctx, "2", proto.TaskTypeExample, 2, "", proto.EmptyMeta)
task, err = handle.SubmitTask(ctx, "2", proto.TaskTypeExample, 2, "", 0, proto.EmptyMeta)
require.Nil(t, task)
require.Error(t, storage.ErrTaskAlreadyExists, err)
// pause and resume task.
require.NoError(t, handle.PauseTask(ctx, "2"))
require.NoError(t, handle.ResumeTask(ctx, "2"))

// submit task with same key
task, err = handle.SubmitTask(ctx, "3", proto.TaskTypeExample, 2, "", proto.EmptyMeta)
task, err = handle.SubmitTask(ctx, "3", proto.TaskTypeExample, 2, "", 0, proto.EmptyMeta)
require.NoError(t, err)
require.NoError(t, mgr.TransferTasks2History(ctx, []*proto.Task{task}))
task, err = handle.SubmitTask(ctx, "3", proto.TaskTypeExample, 2, "", proto.EmptyMeta)
task, err = handle.SubmitTask(ctx, "3", proto.TaskTypeExample, 2, "", 0, proto.EmptyMeta)
require.Nil(t, task)
require.Error(t, storage.ErrTaskAlreadyExists, err)
}
2 changes: 1 addition & 1 deletion pkg/disttask/framework/integrationtests/bench_test.go
@@ -95,7 +95,7 @@ func BenchmarkSchedulerOverhead(b *testing.B) {
for i := 0; i < 4*proto.MaxConcurrentTask; i++ {
taskKey := fmt.Sprintf("task-%03d", i)
taskMeta := make([]byte, *taskMetaSize)
_, err := handle.SubmitTask(c.Ctx, taskKey, proto.TaskTypeExample, 1, "", taskMeta)
_, err := handle.SubmitTask(c.Ctx, taskKey, proto.TaskTypeExample, 1, "", 0, taskMeta)
require.NoError(c.T, err)
}
// task has 2 steps, each step has 1 subtask, wait in serial to reduce WaitTask check overhead.
12 changes: 6 additions & 6 deletions pkg/disttask/framework/integrationtests/modify_test.go
@@ -93,7 +93,7 @@ func TestModifyTaskConcurrency(t *testing.T) {
var theTask *proto.Task
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
once.Do(func() {
task, err := handle.SubmitTask(c.Ctx, "k1", proto.TaskTypeExample, 3, "", []byte("init"))
task, err := handle.SubmitTask(c.Ctx, "k1", proto.TaskTypeExample, 3, "", 0, []byte("init"))
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, task.ID, &proto.ModifyParam{
@@ -144,7 +144,7 @@
<-modifySyncCh
})
})
task, err := handle.SubmitTask(c.Ctx, "k2", proto.TaskTypeExample, 3, "", nil)
task, err := handle.SubmitTask(c.Ctx, "k2", proto.TaskTypeExample, 3, "", 0, nil)
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
// finish StepOne
@@ -193,7 +193,7 @@
}
},
)
task, err := handle.SubmitTask(c.Ctx, "k2-2", proto.TaskTypeExample, 3, "", nil)
task, err := handle.SubmitTask(c.Ctx, "k2-2", proto.TaskTypeExample, 3, "", 0, nil)
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
for i := 0; i < 5; i++ {
@@ -217,7 +217,7 @@
var theTask *proto.Task
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
once.Do(func() {
task, err := handle.SubmitTask(c.Ctx, "k3", proto.TaskTypeExample, 3, "", nil)
task, err := handle.SubmitTask(c.Ctx, "k3", proto.TaskTypeExample, 3, "", 0, nil)
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
found, err := c.TaskMgr.PauseTask(c.Ctx, task.Key)
@@ -263,7 +263,7 @@
var theTask *proto.Task
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
once.Do(func() {
task, err := handle.SubmitTask(c.Ctx, "k4", proto.TaskTypeExample, 3, "", nil)
task, err := handle.SubmitTask(c.Ctx, "k4", proto.TaskTypeExample, 3, "", 0, nil)
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
require.NoError(t, c.TaskMgr.ModifyTaskByID(c.Ctx, task.ID, &proto.ModifyParam{
@@ -317,7 +317,7 @@
var theTask *proto.Task
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeGetSchedulableTasks", func() {
once.Do(func() {
task, err := handle.SubmitTask(c.Ctx, "k5", proto.TaskTypeExample, 3, "", []byte("init"))
task, err := handle.SubmitTask(c.Ctx, "k5", proto.TaskTypeExample, 3, "", 0, []byte("init"))
require.NoError(t, err)
require.Equal(t, 3, task.Concurrency)
require.EqualValues(t, []byte("init"), task.Meta)
@@ -96,7 +96,7 @@ func (c *resourceCtrlCaseContext) init(subtaskCntMap map[int64]map[proto.Step]in
func (c *resourceCtrlCaseContext) runTaskAsync(prefix string, concurrencies []int) {
for i, concurrency := range concurrencies {
taskKey := fmt.Sprintf("%s-%d", prefix, i)
_, err := handle.SubmitTask(c.Ctx, taskKey, proto.TaskTypeExample, concurrency, "", nil)
_, err := handle.SubmitTask(c.Ctx, taskKey, proto.TaskTypeExample, concurrency, "", 0, nil)
require.NoError(c.T, err)
c.taskWG.RunWithLog(func() {
task := testutil.WaitTaskDoneOrPaused(c.Ctx, c.T, taskKey)
1 change: 1 addition & 0 deletions pkg/disttask/framework/planner/plan.go
@@ -32,6 +32,7 @@ type PlanCtx struct {
TaskKey string
TaskType proto.TaskType
ThreadCnt int
MaxNodeCnt int

// PreviousSubtaskMetas is subtask metas of previous steps.
// We can remove this field if we find a better way to pass the result between steps.
1 change: 1 addition & 0 deletions pkg/disttask/framework/planner/planner.go
@@ -46,6 +46,7 @@ func (*Planner) Run(planCtx PlanCtx, plan LogicalPlan) (int64, error) {
planCtx.TaskType,
planCtx.ThreadCnt,
config.GetGlobalConfig().Instance.TiDBServiceScope,
planCtx.MaxNodeCnt,
taskMeta,
)
}
5 changes: 3 additions & 2 deletions pkg/disttask/framework/proto/task.go
@@ -84,8 +84,9 @@ type TaskBase struct {
// contain the tidb_service_scope=TargetScope label.
// To be compatible with previous version, if it's "" or "background", the task try run on nodes of "background" scope,
// if there is no such nodes, will try nodes of "" scope.
TargetScope string
CreateTime time.Time
TargetScope string
CreateTime time.Time
MaxNodeCount int
}

// IsDone checks if the task is done.
3 changes: 3 additions & 0 deletions pkg/disttask/framework/scheduler/balancer.go
Contributor comment:

After this change, some nodes might incorrectly be taken as dead:

  • We have 4 nodes, ordered N1/N2/N3/N4. Task T has 2 subtasks, which are initially scheduled to N3/N4 and are running.
  • When N1/N2 become available, this PR will choose them for T's subtasks, and in L163 of doBalanceSubtasks (the "dead node or not have enough slots, schedule subtasks away" log) N3/N4 will be taken as dead nodes, and their running subtasks will be scheduled away.
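For concreteness, a small sketch of the scenario described above, assuming task T has MaxNodeCount = 2 and its two running subtasks currently sit on N3/N4 (node names are illustrative):

	// Eligible nodes are ordered; the new cap keeps only the first MaxNodeCount of them.
	eligibleNodes := []string{"N1", "N2", "N3", "N4"}
	if task.MaxNodeCount > 0 && len(eligibleNodes) > task.MaxNodeCount {
		eligibleNodes = eligibleNodes[:task.MaxNodeCount] // now only ["N1", "N2"]
	}
	// N3/N4 are no longer eligible, so doBalanceSubtasks treats them like dead
	// nodes and reschedules their running subtasks onto N1/N2.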
@@ -98,6 +98,9 @@ func (b *balancer) balanceSubtasks(ctx context.Context, sch Scheduler, managedNo
if err != nil {
return err
}
if task.MaxNodeCount > 0 && len(eligibleNodes) > task.MaxNodeCount {
eligibleNodes = eligibleNodes[:task.MaxNodeCount]
}
if len(eligibleNodes) == 0 {
return errors.New("no eligible nodes to balance subtasks")
}
41 changes: 40 additions & 1 deletion pkg/disttask/framework/scheduler/balancer_test.go
@@ -30,6 +30,7 @@ import (
type balanceTestCase struct {
subtasks []*proto.SubtaskBase
eligibleNodes []string
maxNodeCount int
initUsedSlots map[string]int
expectedSubtasks []*proto.SubtaskBase
expectedUsedSlots map[string]int
@@ -224,6 +225,44 @@
},
expectedUsedSlots: map[string]int{"tidb2": 16, "tidb3": 16},
},
// balanced, but max node count is limited.
{
subtasks: []*proto.SubtaskBase{
{ID: 1, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStateRunning},
{ID: 2, ExecID: "tidb2", Concurrency: 16, State: proto.SubtaskStateRunning},
{ID: 3, ExecID: "tidb2", Concurrency: 16, State: proto.SubtaskStatePending},
},
maxNodeCount: 1,
eligibleNodes: []string{"tidb1", "tidb2"},
initUsedSlots: map[string]int{"tidb1": 0, "tidb2": 0},
expectedSubtasks: []*proto.SubtaskBase{
{ID: 1, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStateRunning},
{ID: 2, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStateRunning},
{ID: 3, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStatePending},
},
expectedUsedSlots: map[string]int{"tidb1": 16, "tidb2": 0},
},
// scale out, but max node count is limited.
{
subtasks: []*proto.SubtaskBase{
{ID: 1, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStateRunning},
{ID: 2, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStatePending},
{ID: 3, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStatePending},
{ID: 4, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStatePending},
{ID: 5, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStatePending},
Contributor comment:

maybe make them on tidb3 initially, then we are sure tidb3 is ordered first

now tidb1 is already the first in eligibleNodes
},
eligibleNodes: []string{"tidb1", "tidb2", "tidb3"},
maxNodeCount: 2,
initUsedSlots: map[string]int{"tidb1": 0, "tidb2": 0, "tidb3": 0},
expectedSubtasks: []*proto.SubtaskBase{
{ID: 1, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStateRunning},
{ID: 2, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStatePending},
{ID: 3, ExecID: "tidb1", Concurrency: 16, State: proto.SubtaskStatePending},
{ID: 4, ExecID: "tidb2", Concurrency: 16, State: proto.SubtaskStatePending},
{ID: 5, ExecID: "tidb2", Concurrency: 16, State: proto.SubtaskStatePending},
},
expectedUsedSlots: map[string]int{"tidb1": 16, "tidb2": 16, "tidb3": 0},
},
}

ctx := context.Background()
@@ -235,7 +274,7 @@
mockTaskMgr.EXPECT().UpdateSubtasksExecIDs(gomock.Any(), gomock.Any()).Return(nil)
}
mockScheduler := mock.NewMockScheduler(ctrl)
mockScheduler.EXPECT().GetTask().Return(&proto.Task{TaskBase: proto.TaskBase{ID: 1}}).Times(2)
mockScheduler.EXPECT().GetTask().Return(&proto.Task{TaskBase: proto.TaskBase{ID: 1, MaxNodeCount: c.maxNodeCount}}).Times(2)
mockScheduler.EXPECT().GetEligibleInstances(gomock.Any(), gomock.Any()).Return(nil, nil)

slotMgr := newSlotManager()
3 changes: 3 additions & 0 deletions pkg/disttask/framework/scheduler/scheduler.go
@@ -468,6 +468,9 @@ func (s *BaseScheduler) switch2NextStep() error {
if err != nil {
return err
}
if task.MaxNodeCount > 0 && len(eligibleNodes) > task.MaxNodeCount {
eligibleNodes = eligibleNodes[:task.MaxNodeCount]
}

s.logger.Info("eligible instances", zap.Int("num", len(eligibleNodes)))
if len(eligibleNodes) == 0 {
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/scheduler_manager_test.go
@@ -46,7 +46,7 @@ func TestCleanUpRoutine(t *testing.T) {
mockCleanupRoutine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
sch.Start()
defer sch.Stop()
taskID, err := mgr.CreateTask(ctx, "test", proto.TaskTypeExample, 1, "", nil)
taskID, err := mgr.CreateTask(ctx, "test", proto.TaskTypeExample, 1, "", 0, nil)
require.NoError(t, err)

checkTaskRunningCnt := func() []*proto.Task {