From 3b6ec39a1b9ae03a267fc0b7008fa5aeab2d29ed Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 13 May 2021 11:06:19 +0800 Subject: [PATCH] Refactor task manager (#219) * fix: get task return values Signed-off-by: Gaius * feat: task manager Signed-off-by: Gaius * chore: add TODO Signed-off-by: Gaius * feat: task manager test file Signed-off-by: Gaius * test: task manager Signed-off-by: Gaius --- scheduler/manager/cdn_manager.go | 1 + scheduler/manager/task_manager.go | 71 ++++-- scheduler/manager/task_manager_test.go | 297 +++++++++++++++++++++++++ scheduler/server/scheduler_server.go | 9 +- scheduler/service/service.go | 25 ++- 5 files changed, 366 insertions(+), 37 deletions(-) create mode 100644 scheduler/manager/task_manager_test.go diff --git a/scheduler/manager/cdn_manager.go b/scheduler/manager/cdn_manager.go index bf6c766d8d4..59d9f808820 100644 --- a/scheduler/manager/cdn_manager.go +++ b/scheduler/manager/cdn_manager.go @@ -149,6 +149,7 @@ func (cm *CDNManager) doCallback(task *types.Task, err *dferrors.DfError) { if err != nil { time.Sleep(time.Second * 5) cm.taskManager.Delete(task.TaskId) + cm.taskManager.PeerTask.DeleteTask(task) } }) } diff --git a/scheduler/manager/task_manager.go b/scheduler/manager/task_manager.go index aa2889d3dc1..52d7bb36db3 100644 --- a/scheduler/manager/task_manager.go +++ b/scheduler/manager/task_manager.go @@ -17,6 +17,7 @@ package manager import ( + "fmt" "runtime/debug" "sync" "time" @@ -36,6 +37,7 @@ type TaskManager struct { func newTaskManager(cfg *config.Config, hostManager *HostManager) *TaskManager { delay := time.Hour * 48 + // TODO(Gaius) TaskDelay use the time.Duration if cfg.GC.TaskDelay > 0 { delay = time.Duration(cfg.GC.TaskDelay) * time.Millisecond } @@ -53,49 +55,76 @@ func newTaskManager(cfg *config.Config, hostManager *HostManager) *TaskManager { return tm } -func (m *TaskManager) Add(task *types.Task) (*types.Task, bool) { +func (m *TaskManager) Set(k string, task *types.Task) *types.Task { m.lock.Lock() defer m.lock.Unlock() - v, ok := m.data[task.TaskId] - if ok { - return v, false - } - copyTask := types.CopyTask(task) + return m.set(k, task) +} - m.data[task.TaskId] = copyTask - return copyTask, true +func (m *TaskManager) set(k string, task *types.Task) *types.Task { + copyTask := types.CopyTask(task) + m.data[k] = copyTask + return copyTask } -func (m *TaskManager) Delete(taskId string) { +func (m *TaskManager) Add(k string, task *types.Task) error { m.lock.Lock() defer m.lock.Unlock() - t, _ := m.data[taskId] - if t != nil { - logger.Infof("Task [%s] Statistic: %+v ", t.TaskId, t.Statistic.GetStatistic()) - m.PeerTask.DeleteTask(t) + + if _, found := m.get(k); found { + return fmt.Errorf("Task %s already exists", k) } - delete(m.data, taskId) - return + m.set(k, task) + return nil } -func (m *TaskManager) Get(taskId string) (h *types.Task, ok bool) { +func (m *TaskManager) Get(k string) (*types.Task, bool) { m.lock.RLock() defer m.lock.RUnlock() - h, ok = m.data[taskId] + + item, found := m.get(k) + if !found { + return nil, false + } + return item, true +} + +func (m *TaskManager) get(k string) (*types.Task, bool) { + item, found := m.data[k] + return item, found +} + +func (m *TaskManager) Delete(k string) { + m.lock.Lock() + defer m.lock.Unlock() + + m.delete(k) return } -func (m *TaskManager) Touch(taskId string) { +func (m *TaskManager) delete(k string) { + if _, found := m.data[k]; found { + delete(m.data, k) + return + } +} + +func (m *TaskManager) Touch(k string) { m.lock.Lock() defer m.lock.Unlock() - t, _ := m.data[taskId] - if t != nil { + + m.touch(k) + return +} + +func (m *TaskManager) touch(k string) { + if t, ok := m.data[k]; ok { t.LastActive = time.Now() } - return } +// TODO(Gaius) Use client GC manager func (m *TaskManager) gcWorkingLoop() { for { func() { diff --git a/scheduler/manager/task_manager_test.go b/scheduler/manager/task_manager_test.go new file mode 100644 index 00000000000..e6a01ebd96a --- /dev/null +++ b/scheduler/manager/task_manager_test.go @@ -0,0 +1,297 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package manager + +import ( + "sync" + "testing" + + "d7y.io/dragonfly/v2/scheduler/types" + "github.com/stretchr/testify/assert" +) + +func TestTaskManager_Set(t *testing.T) { + tests := []struct { + name string + taskManager *TaskManager + task *types.Task + key string + expect func(t *testing.T, d interface{}) + }{ + { + name: "set foo task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "foo", + task: &types.Task{ + TaskId: "bar", + }, + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.NotEmpty(d) + }, + }, + { + name: "set empty task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "foo", + task: &types.Task{}, + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.NotEmpty(d) + }, + }, + { + name: "set empty key", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "", + task: &types.Task{ + TaskId: "bar", + }, + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.NotEmpty(d) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.taskManager.Set(tc.key, tc.task) + tc.expect(t, tc.taskManager.data[tc.key]) + }) + } +} + +func TestTaskManager_Add(t *testing.T) { + tests := []struct { + name string + taskManager *TaskManager + task *types.Task + key string + expect func(t *testing.T, d interface{}, err error) + }{ + { + name: "add foo task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "foo", + task: &types.Task{ + TaskId: "bar", + }, + expect: func(t *testing.T, d interface{}, err error) { + assert := assert.New(t) + assert.NotEmpty(d) + }, + }, + { + name: "add empty task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "foo", + task: &types.Task{}, + expect: func(t *testing.T, d interface{}, err error) { + assert := assert.New(t) + assert.NotEmpty(d) + }, + }, + { + name: "add empty key", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "", + task: &types.Task{ + TaskId: "bar", + }, + expect: func(t *testing.T, d interface{}, err error) { + assert := assert.New(t) + assert.NotEmpty(d) + }, + }, + { + name: "key already exists", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: map[string]*types.Task{"foo": nil}, + }, + key: "foo", + task: &types.Task{ + TaskId: "bar", + }, + expect: func(t *testing.T, d interface{}, err error) { + assert := assert.New(t) + assert.EqualError(err, "Task foo already exists") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.taskManager.Add(tc.key, tc.task) + tc.expect(t, tc.taskManager.data[tc.key], err) + }) + } +} + +func TestTaskManager_Get(t *testing.T) { + tests := []struct { + name string + taskManager *TaskManager + key string + expect func(t *testing.T, task *types.Task, found bool) + }{ + { + name: "get existing task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: map[string]*types.Task{"foo": &types.Task{ + TaskId: "bar", + }}, + }, + key: "foo", + expect: func(t *testing.T, task *types.Task, found bool) { + assert := assert.New(t) + assert.Equal(true, found) + assert.Equal("bar", task.TaskId) + }, + }, + { + name: "get non-existent task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "foo", + expect: func(t *testing.T, task *types.Task, found bool) { + assert := assert.New(t) + assert.Equal(false, found) + assert.Nil(task) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + task, found := tc.taskManager.Get(tc.key) + tc.expect(t, task, found) + }) + } +} + +func TestTaskManager_Delete(t *testing.T) { + tests := []struct { + name string + taskManager *TaskManager + task *types.Task + key string + expect func(t *testing.T, d interface{}) + }{ + { + name: "delete existing task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: map[string]*types.Task{"foo": nil}, + }, + key: "foo", + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.Equal(d, false) + }, + }, + { + name: "delete non-existent task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "foo", + expect: func(t *testing.T, d interface{}) { + assert := assert.New(t) + assert.Equal(d, false) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.taskManager.Delete(tc.key) + _, ok := tc.taskManager.Get(tc.key) + tc.expect(t, ok) + }) + } +} + +func TestTaskManager_Touch(t *testing.T) { + tests := []struct { + name string + taskManager *TaskManager + task *types.Task + key string + expect func(t *testing.T, task *types.Task, found bool) + }{ + { + name: "touch existing task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: map[string]*types.Task{"foo": &types.Task{ + TaskId: "bar", + }}, + }, + key: "foo", + expect: func(t *testing.T, task *types.Task, found bool) { + assert := assert.New(t) + assert.Equal(found, true) + assert.NotEmpty(task.LastActive) + }, + }, + { + name: "touch non-existent task", + taskManager: &TaskManager{ + lock: new(sync.RWMutex), + data: make(map[string]*types.Task), + }, + key: "foo", + expect: func(t *testing.T, task *types.Task, found bool) { + assert := assert.New(t) + assert.Equal(found, false) + assert.Nil(task) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.taskManager.Touch(tc.key) + task, found := tc.taskManager.Get(tc.key) + tc.expect(t, task, found) + }) + } +} diff --git a/scheduler/server/scheduler_server.go b/scheduler/server/scheduler_server.go index b53958dcc32..700ed3712e2 100644 --- a/scheduler/server/scheduler_server.go +++ b/scheduler/server/scheduler_server.go @@ -98,16 +98,15 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul // get or create task var isCdn = false pkg.TaskId = s.service.GenerateTaskID(request.Url, request.Filter, request.UrlMata, request.BizId, request.PeerId) - task, _ := s.service.GetTask(pkg.TaskId) - if task == nil { - task = &types.Task{ + task, ok := s.service.GetTask(pkg.TaskId) + if !ok { + task, err = s.service.AddTask(&types.Task{ TaskId: pkg.TaskId, Url: request.Url, Filter: request.Filter, BizId: request.BizId, UrlMata: request.UrlMata, - } - task, err = s.service.AddTask(task) + }) if err != nil { dferror, _ := err.(*dferrors.DfError) if dferror != nil && dferror.Code == dfcodes.SchedNeedBackSource { diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 02126944d8c..10403dd3dbb 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -55,21 +55,24 @@ func (s *SchedulerService) GenerateTaskID(url string, filter string, meta *base. return idgen.TaskID(url, filter, meta, bizID) } -func (s *SchedulerService) GetTask(taskID string) (task *types.Task, err error) { - task, _ = s.TaskManager.Get(taskID) - if task == nil { - err = errors.New("peer task not exited: " + taskID) - } - return +func (s *SchedulerService) GetTask(taskID string) (*types.Task, bool) { + return s.TaskManager.Get(taskID) } -func (s *SchedulerService) AddTask(task *types.Task) (ret *types.Task, err error) { - ret, added := s.TaskManager.Add(task) - if added { - err = s.CDNManager.TriggerTask(ret, s.TaskManager.PeerTask.CDNCallback) +func (s *SchedulerService) AddTask(task *types.Task) (*types.Task, error) { + // Task already exists + if ret, ok := s.TaskManager.Get(task.TaskId); ok { + s.TaskManager.PeerTask.AddTask(ret) + return ret, nil + } + + // Task does not exist + ret := s.TaskManager.Set(task.TaskId, task) + if err := s.CDNManager.TriggerTask(ret, s.TaskManager.PeerTask.CDNCallback); err != nil { + return nil, err } s.TaskManager.PeerTask.AddTask(ret) - return + return ret, nil } func (s *SchedulerService) ScheduleParent(task *types.PeerTask) (primary *types.PeerTask,