Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: peer empty parent #724

Merged
merged 15 commits into from
Oct 13, 2021
1 change: 1 addition & 0 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (p *preheat) CreatePreheat(hostnames []string, json types.CreatePreheatRequ
return nil, errors.New("unknow preheat type")
}

logger.Infof("preheat file count: %d queues: %v", len(files), queues)
return p.createGroupJob(files, queues)
}

Expand Down
1 change: 1 addition & 0 deletions scheduler/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ func handleCDNSeedTaskFail(task *supervisor.Task) {
func removePeerFromCurrentTree(peer *supervisor.Peer, s *state) {
parent, ok := peer.GetParent()
peer.ReplaceParent(nil)

// parent frees up upload resources
if ok {
children := s.sched.ScheduleChildren(parent, sets.NewString(peer.ID))
Expand Down
13 changes: 7 additions & 6 deletions scheduler/supervisor/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *cdn) receivePiece(ctx context.Context, task *Task, stream *client.Piece
span.SetAttributes(config.AttributePeerDownloadSuccess.Bool(true))
return cdnPeer, nil
}
return cdnPeer, errors.Errorf("cdn stream receive EOF but task status is %s", task.GetStatus())
return nil, errors.Errorf("cdn stream receive EOF but task status is %s", task.GetStatus())
}

span.RecordError(err)
Expand All @@ -137,21 +137,21 @@ func (c *cdn) receivePiece(ctx context.Context, task *Task, stream *client.Piece
if recvErr, ok := err.(*dferrors.DfError); ok {
switch recvErr.Code {
case dfcodes.CdnTaskRegistryFail:
return cdnPeer, errors.Wrapf(ErrCDNRegisterFail, "receive piece")
return nil, errors.Wrapf(ErrCDNRegisterFail, "receive piece")
case dfcodes.CdnTaskDownloadFail:
return cdnPeer, errors.Wrapf(ErrCDNDownloadFail, "receive piece")
return nil, errors.Wrapf(ErrCDNDownloadFail, "receive piece")
default:
return cdnPeer, errors.Wrapf(ErrCDNUnknown, "recive piece")
return nil, errors.Wrapf(ErrCDNUnknown, "recive piece")
}
}
return cdnPeer, errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err)
return nil, errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err)
}

if piece != nil {
logger.Infof("task %s add piece %v", task.ID, piece)
if !initialized {
cdnPeer, err = c.initCDNPeer(ctx, task, piece)
if err != nil || cdnPeer == nil {
if err != nil {
return nil, err
}

Expand All @@ -161,6 +161,7 @@ func (c *cdn) receivePiece(ctx context.Context, task *Task, stream *client.Piece
}
initialized = true
}

span.AddEvent(config.EventCDNPieceReceived, trace.WithAttributes(config.AttributePieceReceived.String(piece.String())))
cdnPeer.Touch()
if piece.Done {
Expand Down
22 changes: 12 additions & 10 deletions scheduler/supervisor/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,6 @@ func (peer *Peer) GetChildren() *sync.Map {
}

func (peer *Peer) SetParent(parent *Peer) {
if parent == nil {
return
}

peer.parent.Store(parent)
}

Expand All @@ -342,7 +338,12 @@ func (peer *Peer) GetParent() (*Peer, bool) {
return nil, false
}

return parent.(*Peer), true
p, ok := parent.(*Peer)
if p == nil || !ok {
return nil, false
}

return p, true
}

func (peer *Peer) Touch() {
Expand Down Expand Up @@ -461,10 +462,6 @@ func (peer *Peer) BindNewConn(stream scheduler.Scheduler_ReportPieceResultServer
}

func (peer *Peer) setConn(conn *Channel) {
if conn == nil {
return
}

peer.conn.Store(conn)
}

Expand All @@ -474,7 +471,12 @@ func (peer *Peer) getConn() (*Channel, bool) {
return nil, false
}

return conn.(*Channel), true
c, ok := conn.(*Channel)
if c == nil || !ok {
return nil, false
}

return c, true
}

func (peer *Peer) IsConnected() bool {
Expand Down