diff --git a/scheduler/config/config.go b/scheduler/config/config.go index 36243057e4e..a57b49bf7bd 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -108,13 +108,12 @@ func NewDefaultSchedulerConfig() *SchedulerConfig { AScheduler: "", BScheduler: "", WorkerNum: runtime.GOMAXPROCS(0), - Monitor: false, AccessWindow: 3 * time.Minute, CandidateParentCount: 10, Scheduler: "basic", CDNLoad: 100, ClientLoad: 10, - OpenMonitor: true, + OpenMonitor: false, GC: NewDefaultGCConfig(), } } @@ -186,7 +185,6 @@ type SchedulerConfig struct { AScheduler string `yaml:"ascheduler" mapstructure:"ascheduler"` BScheduler string `yaml:"bscheduler" mapstructure:"bscheduler"` WorkerNum int `yaml:"workerNum" mapstructure:"workerNum"` - Monitor bool `yaml:"monitor" mapstructure:"monitor"` // AccessWindow should less than CDN task expireTime AccessWindow time.Duration `yaml:"accessWindow" mapstructure:"accessWindow"` CandidateParentCount int `yaml:"candidateParentCount" mapstructure:"candidateParentCount"` diff --git a/scheduler/core/events.go b/scheduler/core/events.go index d49bc1a4b93..979abc1c350 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -247,19 +247,21 @@ var _ event = peerDownloadFailEvent{} func (e peerDownloadFailEvent) apply(s *state) { e.peer.SetStatus(types.PeerStatusFail) removePeerFromCurrentTree(e.peer, s) - for _, child := range e.peer.GetChildren() { + e.peer.GetChildren().Range(func(key, value interface{}) bool { + child := (value).(*types.Peer) parent, candidates, hasParent := s.sched.ScheduleParent(child) - if child.PacketChan == nil { - logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID) - continue - } if !hasParent { logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("peerDownloadFailEvent: there is no available parent,reschedule it in one second") s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) - return + return true + } + if child.PacketChan == nil { + logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID) + return true } child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) - } + return true + }) s.peerManager.Delete(e.peer.PeerID) } @@ -318,19 +320,21 @@ func constructFailPeerPacket(peer *types.Peer, errCode base.Code) *schedulerRPC. func handlePeerLeave(peer *types.Peer, s *state) { peer.MarkLeave() removePeerFromCurrentTree(peer, s) - for _, child := range peer.GetChildren() { + peer.GetChildren().Range(func(key, value interface{}) bool { + child := value.(*types.Peer) parent, candidates, hasParent := s.sched.ScheduleParent(child) if !hasParent { logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("handlePeerLeave: there is no available parent,reschedule it in one second") s.waitScheduleParentPeerQueue.AddAfter(child, time.Second) - continue + return true } if child.PacketChan == nil { logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID) - continue + return true } child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) - } + return true + }) s.peerManager.Delete(peer.PeerID) } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index 7f540a6c807..2d982128cf1 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -103,12 +103,14 @@ func (m *monitor) printDebugInfo() string { if len(path) >= 1 { msgs = append(msgs, node.PeerID+" || "+strings.Join(nPath, "-")) } - for _, child := range node.GetChildren() { + node.GetChildren().Range(func(key, value interface{}) bool { + child := (value).(*types.Peer) if child == nil { - continue + return true } printTree(child, nPath) - } + return true + }) } for _, root := range roots { diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index a2f8d3f37be..32d1b2dc5d3 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -21,6 +21,7 @@ import ( "time" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" + "go.uber.org/atomic" ) type PeerStatus uint8 @@ -63,13 +64,13 @@ type Peer struct { // createTime CreateTime time.Time // finishedNum specifies downloaded finished piece number - finishedNum int32 + finishedNum atomic.Int32 lastAccessTime time.Time parent *Peer - children map[string]*Peer + children sync.Map status PeerStatus costHistory []int - leave bool + leave atomic.Bool } func NewPeer(peerID string, task *Task, host *PeerHost) *Peer { @@ -79,7 +80,6 @@ func NewPeer(peerID string, task *Task, host *PeerHost) *Peer { Host: host, CreateTime: time.Now(), lastAccessTime: time.Now(), - children: make(map[string]*Peer), status: PeerStatusWaiting, } } @@ -89,9 +89,11 @@ func (peer *Peer) GetWholeTreeNode() int { peer.lock.RLock() defer peer.lock.RUnlock() count := 1 - for _, peerNode := range peer.children { + peer.children.Range(func(key, value interface{}) bool { + peerNode := value.(*Peer) count += peerNode.GetWholeTreeNode() - } + return true + }) return count } @@ -105,24 +107,20 @@ func (peer *Peer) Touch() { peer.lock.Lock() defer peer.lock.Unlock() peer.lastAccessTime = time.Now() - if peer.status == PeerStatusZombie && !peer.leave { + if peer.status == PeerStatusZombie && !peer.leave.Load() { peer.status = PeerStatusRunning } peer.Task.Touch() } func (peer *Peer) associateChild(child *Peer) { - peer.lock.Lock() - defer peer.lock.Unlock() - peer.children[child.PeerID] = child + peer.children.Store(child.PeerID, child) peer.Host.IncUploadLoad() peer.Task.peers.Update(peer) } func (peer *Peer) disassociateChild(child *Peer) { - peer.lock.Lock() - defer peer.lock.Unlock() - delete(peer.children, child.PeerID) + peer.children.Delete(child.PeerID) peer.Host.DecUploadLoad() peer.Task.peers.Update(peer) } @@ -162,8 +160,8 @@ func (peer *Peer) GetCost() int { func (peer *Peer) AddPieceInfo(finishedCount int32, cost int) { peer.lock.Lock() defer peer.lock.Unlock() - if finishedCount > peer.finishedNum { - peer.finishedNum = finishedCount + if finishedCount > peer.finishedNum.Load() { + peer.finishedNum.Store(finishedCount) peer.costHistory = append(peer.costHistory, cost) if len(peer.costHistory) > 20 { peer.costHistory = peer.costHistory[len(peer.costHistory)-20:] @@ -221,11 +219,11 @@ func (peer *Peer) IsWaiting() bool { if peer.parent == nil { return false } - return peer.finishedNum >= peer.parent.finishedNum + return peer.finishedNum.Load() >= peer.parent.finishedNum.Load() } func (peer *Peer) GetSortKeys() (key1, key2 int) { - key1 = int(peer.finishedNum) + key1 = int(peer.finishedNum.Load()) key2 = peer.getFreeLoad() return } @@ -238,13 +236,11 @@ func (peer *Peer) getFreeLoad() int { } func (peer *Peer) GetFinishNum() int32 { - peer.lock.RLock() - defer peer.lock.RUnlock() - return peer.finishedNum + return peer.finishedNum.Load() } func GetDiffPieceNum(src *Peer, dst *Peer) int32 { - diff := src.finishedNum - dst.finishedNum + diff := src.finishedNum.Load() - dst.finishedNum.Load() if diff > 0 { return diff } @@ -257,10 +253,10 @@ func (peer *Peer) GetParent() *Peer { return peer.parent } -func (peer *Peer) GetChildren() map[string]*Peer { +func (peer *Peer) GetChildren() *sync.Map { peer.lock.RLock() defer peer.lock.RUnlock() - return peer.children + return &peer.children } func (peer *Peer) SetStatus(status PeerStatus) { @@ -287,12 +283,6 @@ func (peer *Peer) IsSuccess() bool { return peer.status == PeerStatusSuccess } -func (peer *Peer) IncFinishNum() { - peer.lock.Lock() - defer peer.lock.Unlock() - peer.finishedNum++ -} - func (peer *Peer) IsDone() bool { return peer.status == PeerStatusSuccess || peer.status == PeerStatusFail } @@ -302,9 +292,7 @@ func (peer *Peer) IsBad() bool { } func (peer *Peer) GetFinishedNum() int32 { - peer.lock.RLock() - defer peer.lock.RUnlock() - return peer.finishedNum + return peer.finishedNum.Load() } func (peer *Peer) GetStatus() PeerStatus { @@ -314,13 +302,9 @@ func (peer *Peer) GetStatus() PeerStatus { } func (peer *Peer) MarkLeave() { - peer.lock.Lock() - defer peer.lock.Unlock() - peer.leave = true + peer.leave.Store(true) } func (peer *Peer) IsLeave() bool { - peer.lock.RLock() - defer peer.lock.RUnlock() - return peer.leave + return peer.leave.Load() }