Skip to content

Commit

Permalink
Refactor task manager (#219)
Browse files Browse the repository at this point in the history
* fix: get task return values

Signed-off-by: Gaius <[email protected]>

* feat: task manager

Signed-off-by: Gaius <[email protected]>

* chore: add TODO

Signed-off-by: Gaius <[email protected]>

* feat: task manager test file

Signed-off-by: Gaius <[email protected]>

* test: task manager

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jun 28, 2023
1 parent a89e915 commit 3b6ec39
Show file tree
Hide file tree
Showing 5 changed files with 366 additions and 37 deletions.
1 change: 1 addition & 0 deletions scheduler/manager/cdn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (cm *CDNManager) doCallback(task *types.Task, err *dferrors.DfError) {
if err != nil {
time.Sleep(time.Second * 5)
cm.taskManager.Delete(task.TaskId)
cm.taskManager.PeerTask.DeleteTask(task)
}
})
}
Expand Down
71 changes: 50 additions & 21 deletions scheduler/manager/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package manager

import (
"fmt"
"runtime/debug"
"sync"
"time"
Expand All @@ -36,6 +37,7 @@ type TaskManager struct {

func newTaskManager(cfg *config.Config, hostManager *HostManager) *TaskManager {
delay := time.Hour * 48
// TODO(Gaius) TaskDelay use the time.Duration
if cfg.GC.TaskDelay > 0 {
delay = time.Duration(cfg.GC.TaskDelay) * time.Millisecond
}
Expand All @@ -53,49 +55,76 @@ func newTaskManager(cfg *config.Config, hostManager *HostManager) *TaskManager {
return tm
}

func (m *TaskManager) Add(task *types.Task) (*types.Task, bool) {
func (m *TaskManager) Set(k string, task *types.Task) *types.Task {
m.lock.Lock()
defer m.lock.Unlock()
v, ok := m.data[task.TaskId]
if ok {
return v, false
}

copyTask := types.CopyTask(task)
return m.set(k, task)
}

m.data[task.TaskId] = copyTask
return copyTask, true
func (m *TaskManager) set(k string, task *types.Task) *types.Task {
copyTask := types.CopyTask(task)
m.data[k] = copyTask
return copyTask
}

func (m *TaskManager) Delete(taskId string) {
func (m *TaskManager) Add(k string, task *types.Task) error {
m.lock.Lock()
defer m.lock.Unlock()
t, _ := m.data[taskId]
if t != nil {
logger.Infof("Task [%s] Statistic: %+v ", t.TaskId, t.Statistic.GetStatistic())
m.PeerTask.DeleteTask(t)

if _, found := m.get(k); found {
return fmt.Errorf("Task %s already exists", k)
}
delete(m.data, taskId)
return
m.set(k, task)
return nil
}

func (m *TaskManager) Get(taskId string) (h *types.Task, ok bool) {
func (m *TaskManager) Get(k string) (*types.Task, bool) {
m.lock.RLock()
defer m.lock.RUnlock()
h, ok = m.data[taskId]

item, found := m.get(k)
if !found {
return nil, false
}
return item, true
}

func (m *TaskManager) get(k string) (*types.Task, bool) {
item, found := m.data[k]
return item, found
}

func (m *TaskManager) Delete(k string) {
m.lock.Lock()
defer m.lock.Unlock()

m.delete(k)
return
}

func (m *TaskManager) Touch(taskId string) {
func (m *TaskManager) delete(k string) {
if _, found := m.data[k]; found {
delete(m.data, k)
return
}
}

func (m *TaskManager) Touch(k string) {
m.lock.Lock()
defer m.lock.Unlock()
t, _ := m.data[taskId]
if t != nil {

m.touch(k)
return
}

func (m *TaskManager) touch(k string) {
if t, ok := m.data[k]; ok {
t.LastActive = time.Now()
}
return
}

// TODO(Gaius) Use client GC manager
func (m *TaskManager) gcWorkingLoop() {
for {
func() {
Expand Down
Loading

0 comments on commit 3b6ec39

Please sign in to comment.