Skip to content

Commit

Permalink
feat: scheduler channel blocking (#756)
Browse files Browse the repository at this point in the history
* fix: scheduler channel blocking

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jun 28, 2023
1 parent 20e894f commit 90653d4
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 24 deletions.
25 changes: 17 additions & 8 deletions scheduler/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (e reScheduleParentEvent) apply(s *state) {

// TODO: if parentPeer is equal to oldParent, do we need to schedule again?
if err := peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)); err != nil {
logger.WithTaskAndPeerID(peer.Task.ID, peer.ID).Warnf("send schedule packet to peer %s failed: %v", peer.ID, err)
sendErrorHandler(err, s, peer)
}
}

Expand All @@ -119,7 +119,7 @@ func (e startReportPieceResultEvent) apply(s *state) {
if parent, ok := e.peer.GetParent(); ok {
e.peer.Log().Warnf("startReportPieceResultEvent: no need schedule parent because peer already had parent %s", parent.ID)
if err := e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, nil)); err != nil {
logger.WithTaskAndPeerID(e.peer.Task.ID, e.peer.ID).Warnf("send schedule packet to peer failed: %v", err)
sendErrorHandler(err, s, e.peer)
}
return
}
Expand All @@ -145,7 +145,7 @@ func (e startReportPieceResultEvent) apply(s *state) {
return
}
if err := e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, candidates)); err != nil {
e.peer.Log().Warnf("send schedule packet failed: %v", err)
sendErrorHandler(err, s, e.peer)
}
}

Expand Down Expand Up @@ -197,7 +197,7 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) {
}
// TODO: if parentPeer is equal to oldParent, do we need to schedule again?
if err := e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parentPeer, candidates)); err != nil {
e.peer.Log().Warnf("send schedule packet to peer %s failed: %v", e.peer.ID, err)
sendErrorHandler(err, s, e.peer)
}
}

Expand Down Expand Up @@ -268,7 +268,7 @@ func (e peerDownloadSuccessEvent) apply(s *state) {
children := s.sched.ScheduleChildren(e.peer, sets.NewString())
for _, child := range children {
if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, e.peer, nil)); err != nil {
e.peer.Log().Warnf("send schedule packet to peer %s failed: %v", child.ID, err)
sendErrorHandler(err, s, child)
}
}
}
Expand Down Expand Up @@ -301,7 +301,7 @@ func (e peerDownloadFailEvent) apply(s *state) {
return true
}
if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)); err != nil {
e.peer.Log().Warnf("send schedule packet to peer %s failed: %v", child.ID, err)
sendErrorHandler(err, s, child)
}
return true
})
Expand Down Expand Up @@ -330,7 +330,7 @@ func (e peerLeaveEvent) apply(s *state) {
return true
}
if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)); err != nil {
e.peer.Log().Warnf("send schedule packet to peer %s failed: %v", child.ID, err)
sendErrorHandler(err, s, child)
}
return true
})
Expand Down Expand Up @@ -403,8 +403,17 @@ func removePeerFromCurrentTree(peer *supervisor.Peer, s *state) {
children := s.sched.ScheduleChildren(parent, sets.NewString(peer.ID))
for _, child := range children {
if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, peer, nil)); err != nil {
peer.Log().Warnf("send schedule packet to peer %s failed: %v", child.ID, err)
sendErrorHandler(err, s, child)
}
}
}
}

// sendErrorHandler reacts to an error returned while sending a schedule
// packet to peer p.
//
// If err is supervisor.ErrChannelBusy, an info message is logged and an rsPeer
// wrapping p is added to s.waitScheduleParentPeerQueue with a 10ms delay.
// Any other error is only logged at error level.
func sendErrorHandler(err error, s *state, p *supervisor.Peer) {
	// Anything other than a busy channel is reported and dropped.
	if err != supervisor.ErrChannelBusy {
		p.Log().Errorf("send schedule packet failed: %v", err)
		return
	}

	// Busy channel: log it and queue the peer again after a short delay.
	p.Log().Info("send schedule packet channel busy")
	s.waitScheduleParentPeerQueue.AddAfter(&rsPeer{peer: p}, 10*time.Millisecond)
}
40 changes: 24 additions & 16 deletions scheduler/supervisor/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
PeerGCID = "peer"
)

// ErrChannelBusy is returned by Channel.Send when the packet cannot be placed
// on the sender channel without blocking.
var ErrChannelBusy = errors.New("channel busy")

type PeerManager interface {
// Add peer
Add(*Peer)
Expand Down Expand Up @@ -513,14 +515,13 @@ func (peer *Peer) Log() *logger.SugaredLoggerOnWith {
}

type Channel struct {
startOnce sync.Once
sender chan *scheduler.PeerPacket
receiver chan *scheduler.PieceResult
stream scheduler.Scheduler_ReportPieceResultServer
closed *atomic.Bool
done chan struct{}
wg sync.WaitGroup
err error
sender chan *scheduler.PeerPacket
receiver chan *scheduler.PieceResult
stream scheduler.Scheduler_ReportPieceResultServer
closed *atomic.Bool
done chan struct{}
wg sync.WaitGroup
err error
}

func newChannel(stream scheduler.Scheduler_ReportPieceResultServer) *Channel {
Expand All @@ -531,16 +532,19 @@ func newChannel(stream scheduler.Scheduler_ReportPieceResultServer) *Channel {
closed: atomic.NewBool(false),
done: make(chan struct{}),
}

c.wg.Add(2)
c.start()
return c
}

func (c *Channel) start() {
c.startOnce.Do(func() {
c.wg.Add(2)
go c.receiveLoop()
go c.sendLoop()
})
startWG := &sync.WaitGroup{}
startWG.Add(2)

go c.receiveLoop(startWG)
go c.sendLoop(startWG)
startWG.Wait()
}

func (c *Channel) Send(packet *scheduler.PeerPacket) error {
Expand All @@ -550,7 +554,7 @@ func (c *Channel) Send(packet *scheduler.PeerPacket) error {
case c.sender <- packet:
return nil
default:
return errors.New("send channel is blocking")
return ErrChannelBusy
}
}

Expand Down Expand Up @@ -583,13 +587,15 @@ func (c *Channel) IsClosed() bool {
return c.closed.Load()
}

func (c *Channel) receiveLoop() {
func (c *Channel) receiveLoop(startWG *sync.WaitGroup) {
defer func() {
close(c.receiver)
c.wg.Done()
c.Close()
}()

startWG.Done()

for {
select {
case <-c.done:
Expand All @@ -608,12 +614,14 @@ func (c *Channel) receiveLoop() {
}
}

func (c *Channel) sendLoop() {
func (c *Channel) sendLoop(startWG *sync.WaitGroup) {
defer func() {
c.wg.Done()
c.Close()
}()

startWG.Done()

for {
select {
case <-c.done:
Expand Down

0 comments on commit 90653d4

Please sign in to comment.