Skip to content

Commit

Permalink
fix: peer tree infinite loop
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Apr 21, 2022
1 parent 5efaf23 commit 96b65d2
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 33 deletions.
36 changes: 20 additions & 16 deletions scheduler/resource/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ type Peer struct {
// BlockPeers is the set of bad peer IDs
BlockPeers set.SafeSet

// NeedBackToSource indicates whether the peer needs to download from the source
NeedBackToSource *atomic.Bool

// CreateAt is peer create time
CreateAt *atomic.Time

Expand All @@ -165,22 +168,23 @@ type Peer struct {
// New Peer instance
func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
p := &Peer{
ID: id,
BizTag: DefaultBizTag,
Pieces: &bitset.BitSet{},
pieceCosts: []int64{},
Stream: &atomic.Value{},
Task: task,
Host: host,
Parent: &atomic.Value{},
Children: &sync.Map{},
ChildCount: atomic.NewInt32(0),
StealPeers: set.NewSafeSet(),
BlockPeers: set.NewSafeSet(),
CreateAt: atomic.NewTime(time.Now()),
UpdateAt: atomic.NewTime(time.Now()),
mu: &sync.RWMutex{},
Log: logger.WithTaskAndPeerID(task.ID, id),
ID: id,
BizTag: DefaultBizTag,
Pieces: &bitset.BitSet{},
pieceCosts: []int64{},
Stream: &atomic.Value{},
Task: task,
Host: host,
Parent: &atomic.Value{},
Children: &sync.Map{},
ChildCount: atomic.NewInt32(0),
StealPeers: set.NewSafeSet(),
BlockPeers: set.NewSafeSet(),
NeedBackToSource: atomic.NewBool(false),
CreateAt: atomic.NewTime(time.Now()),
UpdateAt: atomic.NewTime(time.Now()),
mu: &sync.RWMutex{},
Log: logger.WithTaskAndPeerID(task.ID, id),
}

// Initialize state machine
Expand Down
9 changes: 8 additions & 1 deletion scheduler/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo

// If the scheduling exceeds the RetryBackSourceLimit, the latest cdn peer state is PeerStateFailed,
// or the peer is marked as needing back-to-source, the peer will download the task back-to-source
if (n >= s.config.RetryBackSourceLimit || peer.Task.IsCDNFailed()) &&
if (n >= s.config.RetryBackSourceLimit || peer.Task.IsCDNFailed() || peer.NeedBackToSource.Load()) &&
peer.Task.CanBackToSource() {
stream, ok := peer.LoadStream()
if !ok {
Expand Down Expand Up @@ -240,6 +240,13 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
return true
}

_, ok = parent.LoadParent()
isBackToSource := peer.Task.BackToSourcePeers.Contains(parent)
if !ok && !parent.Host.IsCDN && !isBackToSource {
peer.Log.Debugf("parent download state is %t %t %t", ok, parent.Host.IsCDN, isBackToSource)
return true
}

if blocklist.Contains(parent.ID) {
peer.Log.Debugf("parent %s is not selected because it is in blocklist", parent.ID)
return true
Expand Down
74 changes: 73 additions & 1 deletion scheduler/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,25 @@ func TestScheduler_ScheduleParent(t *testing.T) {
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "peer need back-to-source and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreStream(stream)

mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "cdn peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, cdnPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
Expand Down Expand Up @@ -651,6 +670,7 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.BackToSourcePeers.Add(mockPeer)
peer.Task.StorePeer(mockPeer)
mockPeer.Pieces.Set(0)
peer.StoreStream(stream)
Expand Down Expand Up @@ -678,6 +698,8 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
stealPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer)
peer.Task.StorePeer(stealPeer)
peer.Task.BackToSourcePeers.Add(mockPeer)
peer.Task.BackToSourcePeers.Add(stealPeer)
mockPeer.Pieces.Set(0)
peer.StoreStream(stream)
gomock.InOrder(
Expand Down Expand Up @@ -807,13 +829,61 @@ func TestScheduler_FindParent(t *testing.T) {
},
},
{
name: "find parent",
name: "find back-to-source parent",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeers[0].FSM.SetState(resource.PeerStateRunning)
mockPeers[1].FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeers[0])
peer.Task.StorePeer(mockPeers[1])
peer.Task.BackToSourcePeers.Add(mockPeers[0])
peer.Task.BackToSourcePeers.Add(mockPeers[1])
mockPeers[0].Pieces.Set(0)
mockPeers[1].Pieces.Set(0)
mockPeers[1].Pieces.Set(1)
mockPeers[1].Pieces.Set(2)

md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1)
},
expect: func(t *testing.T, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
assert := assert.New(t)
assert.True(ok)
assert.Equal(mockPeers[1].ID, parent.ID)
},
},
{
name: "find cdn parent",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeers[0].FSM.SetState(resource.PeerStateRunning)
mockPeers[1].FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeers[0])
peer.Task.StorePeer(mockPeers[1])
mockPeers[0].Host.IsCDN = true
mockPeers[1].Host.IsCDN = true
mockPeers[0].Pieces.Set(0)
mockPeers[1].Pieces.Set(0)
mockPeers[1].Pieces.Set(1)
mockPeers[1].Pieces.Set(2)

md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1)
},
expect: func(t *testing.T, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
assert := assert.New(t)
assert.True(ok)
assert.Equal(mockPeers[1].ID, parent.ID)
},
},
{
name: "find parent with ancestor",
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeers[0].FSM.SetState(resource.PeerStateRunning)
mockPeers[1].FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeers[0])
peer.Task.StorePeer(mockPeers[1])
mockPeers[0].StoreParent(mockPeers[3])
mockPeers[1].StoreParent(mockPeers[3])
mockPeers[0].Pieces.Set(0)
mockPeers[1].Pieces.Set(0)
mockPeers[1].Pieces.Set(1)
Expand All @@ -835,6 +905,8 @@ func TestScheduler_FindParent(t *testing.T) {
mockPeers[1].FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeers[0])
peer.Task.StorePeer(mockPeers[1])
peer.Task.BackToSourcePeers.Add(mockPeers[0])
peer.Task.BackToSourcePeers.Add(mockPeers[1])
mockPeers[0].Pieces.Set(0)
mockPeers[1].Pieces.Set(0)
mockPeers[1].Pieces.Set(1)
Expand Down
15 changes: 10 additions & 5 deletions scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func New(
// RegisterPeerTask registers peer and triggers CDN download task
func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*rpcscheduler.RegisterResult, error) {
// Register task and trigger cdn download task
task, err := s.registerTask(ctx, req)
task, needBackToSource, err := s.registerTask(ctx, req)
if err != nil {
msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
logger.Error(msg)
Expand All @@ -87,6 +87,10 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
peer := s.registerPeer(ctx, req.PeerId, task, host, req.UrlMeta.Tag)
peer.Log.Infof("register peer task request: %#v %#v %#v", req, req.UrlMeta, req.HostLoad)

// When the peer registers for the first time and
// no CDN is available, it will download back-to-source.
peer.NeedBackToSource.Store(needBackToSource)

// Task has been successful
if task.FSM.Is(resource.TaskStateSucceeded) {
peer.Log.Info("tasks can be reused")
Expand Down Expand Up @@ -501,25 +505,26 @@ func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) e
}

// registerTask creates a new task or reuses a previous task
func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, error) {
func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, bool, error) {
task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, resource.TaskTypeNormal, req.UrlMeta, resource.WithBackToSourceLimit(int32(s.config.Scheduler.BackSourceCount)))
task, loaded := s.resource.TaskManager().LoadOrStore(task)
if loaded && !task.FSM.Is(resource.TaskStateFailed) {
task.Log.Infof("task state is %s", task.FSM.Current())
return task, nil
return task, false, nil
}

// Trigger task
if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
return nil, err
return nil, false, err
}

// Start trigger cdn task
if s.config.CDN.Enable {
go s.triggerCDNTask(ctx, task)
return task, false, nil
}

return task, nil
return task, true, nil
}

// registerHost creates a new host or reuses a previous host
Expand Down
Loading

0 comments on commit 96b65d2

Please sign in to comment.