Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler concurrent bug fix #513

Merged
merged 18 commits into from
Aug 2, 2021
Merged
8 changes: 7 additions & 1 deletion client/config/peerhost_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package config
import (
"net"

"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"golang.org/x/time/rate"

"d7y.io/dragonfly/v2/client/clientutil"
Expand All @@ -42,7 +43,12 @@ var peerHostConfig = DaemonOption{
GCInterval: clientutil.Duration{Duration: DefaultGCInterval},
KeepStorage: false,
Scheduler: SchedulerOption{
NetAddrs: nil,
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
Addr: "127.0.0.1:8002",
},
},
ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout},
},
Host: HostOption{
Expand Down
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
if getErr != nil {
span.RecordError(getErr)
// fast way to exit retry
if curPeerPacket != pt.peerPacket {
if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId {
pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr,
curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId)
peerPacketChanged = true
Expand All @@ -709,7 +709,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
pt.Errorf("send piece result error: %s, code: %d", peer.PeerId, er)
}
// fast way to exit retry
if curPeerPacket != pt.peerPacket {
if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId {
pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s",
curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId)
peerPacketChanged = true
Expand Down
2 changes: 1 addition & 1 deletion cmd/dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func InitMonitor(verbose bool, pprofPort int, jaeger string) func() {
pprofPort, _ = freeport.GetFreePort()
}

debugAddr := fmt.Sprintf("localhost:%d", pprofPort)
debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, pprofPort)
viewer.SetConfiguration(viewer.WithAddr(debugAddr))

logger.With("pprof", fmt.Sprintf("http://%s/debug/pprof", debugAddr),
Expand Down
7 changes: 5 additions & 2 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
}
if selected {
if client, ok := conn.node2ClientMap.Load(node); ok {
return client.(*candidateClient), nil
return &candidateClient{
node: candidateNode,
Ref: client,
}, nil
}
}
}
Expand All @@ -226,7 +229,7 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
if len(ringNodes) == 0 {
return nil, dferrors.ErrNoCandidateNode
}
candidateNodes := make([]string, 0, 0)
candidateNodes := make([]string, 0)
for _, ringNode := range ringNodes {
candidate := true
for _, exclusiveNode := range exclusiveNodes {
Expand Down
15 changes: 7 additions & 8 deletions pkg/rpc/scheduler/client/peer_packet_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package client
import (
"context"
"io"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
Expand Down Expand Up @@ -56,7 +55,7 @@ func newPeerPacketStream(ctx context.Context, sc *schedulerClient, hashKey strin

pps := &peerPacketStream{
sc: sc,
ctx: ctx,
ctx: context.Background(),
hashKey: hashKey,
ptr: ptr,
opts: opts,
Expand Down Expand Up @@ -126,9 +125,9 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro
if err != nil {
return nil, err
}
timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.RegisterPeerTask(timeCtx, pps.ptr)
//timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//defer cancel()
_, err = client.RegisterPeerTask(pps.ctx, pps.ptr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -198,9 +197,9 @@ func (pps *peerPacketStream) replaceClient(cause error) error {
if err != nil {
return nil, err
}
timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.RegisterPeerTask(timeCtx, pps.ptr)
//timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//defer cancel()
_, err = client.RegisterPeerTask(pps.ctx, pps.ptr)
if err != nil {
return nil, err
}
Expand Down
77 changes: 17 additions & 60 deletions scheduler/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ func (s *state) start() {
s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second)
continue
}
if peer.PacketChan == nil {
logger.Errorf("waitScheduleParentPeerQueue: there is no packet chan associated with peer %s", peer.PeerID)
return
}
peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates)
peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates))
logger.WithTaskAndPeerID(peer.Task.TaskID,
peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because it has scheduled new parent %v", parent)
s.waitScheduleParentPeerQueue.Done(v)
Expand All @@ -104,15 +100,11 @@ func (e startReportPieceResultEvent) apply(s *state) {
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
}
if e.peer.PacketChan == nil {
logger.Errorf("start report piece result: there is no packet chan associated with peer %s", e.peer.PeerID)
return
}
e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parent, candidates)
e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, candidates))
}

func (e startReportPieceResultEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerDownloadPieceSuccessEvent struct {
Expand Down Expand Up @@ -140,17 +132,13 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) {
if oldParent != nil {
candidates = append(candidates, oldParent)
}
if e.peer.PacketChan == nil {
logger.Errorf("peerDownloadPieceSuccessEvent: there is no packet chan with peer %s", e.peer.PeerID)
return
}
// TODO if parentPeer is equal with oldParent, need schedule again ?
e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parentPeer, candidates)
e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parentPeer, candidates))
return
}

func (e peerDownloadPieceSuccessEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerDownloadPieceFailEvent struct {
Expand Down Expand Up @@ -183,15 +171,15 @@ func (e peerDownloadPieceFailEvent) apply(s *state) {
}
}
func (e peerDownloadPieceFailEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerReplaceParentEvent struct {
peer *types.Peer
}

func (e peerReplaceParentEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

func (e peerReplaceParentEvent) apply(s *state) {
Expand Down Expand Up @@ -226,20 +214,13 @@ func (e peerDownloadSuccessEvent) apply(s *state) {
removePeerFromCurrentTree(e.peer, s)
children := s.sched.ScheduleChildren(e.peer)
for _, child := range children {
if child.PacketChan == nil {
logger.Debugf("reportPeerSuccessResult: there is no packet chan with peer %s", e.peer.PeerID)
continue
}
child.PacketChan <- constructSuccessPeerPacket(child, e.peer, nil)
}
if e.peer.PacketChan != nil {
close(e.peer.PacketChan)
e.peer.PacketChan = nil
child.SendSchedulePacket(constructSuccessPeerPacket(child, e.peer, nil))
}
e.peer.UnBindSendChannel()
}

func (e peerDownloadSuccessEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerDownloadFailEvent struct {
Expand All @@ -260,22 +241,14 @@ func (e peerDownloadFailEvent) apply(s *state) {
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return true
}
if child.PacketChan == nil {
logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID)
return true
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates))
return true
})
if e.peer.PacketChan != nil {
close(e.peer.PacketChan)
e.peer.PacketChan = nil
}
s.peerManager.Delete(e.peer.PeerID)
}

func (e peerDownloadFailEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

type peerLeaveEvent struct {
Expand All @@ -289,7 +262,7 @@ func (e peerLeaveEvent) apply(s *state) {
}

func (e peerLeaveEvent) hashKey() string {
return e.peer.PeerID
return e.peer.Task.TaskID
}

func constructSuccessPeerPacket(peer *types.Peer, parent *types.Peer, candidates []*types.Peer) *schedulerRPC.PeerPacket {
Expand Down Expand Up @@ -337,11 +310,7 @@ func handlePeerLeave(peer *types.Peer, s *state) {
s.waitScheduleParentPeerQueue.AddAfter(child, time.Second)
return true
}
if child.PacketChan == nil {
logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID)
return true
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates))
return true
})
s.peerManager.Delete(peer.PeerID)
Expand All @@ -355,22 +324,14 @@ func handleReplaceParent(peer *types.Peer, s *state) {
s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second)
return
}
if peer.PacketChan == nil {
logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID)
return
}
peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates)
peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates))
}

func handleSeedTaskFail(task *types.Task) {
if task.IsFail() {
task.ListPeers().Range(func(data sortedlist.Item) bool {
peer := data.(*types.Peer)
if peer.PacketChan == nil {
logger.Debugf("taskSeedFailEvent: there is no packet chan with peer %s", peer.PeerID)
return true
}
peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.CdnError)
peer.SendSchedulePacket(constructFailPeerPacket(peer, dfcodes.CdnError))
return true
})
}
Expand All @@ -383,11 +344,7 @@ func removePeerFromCurrentTree(peer *types.Peer, s *state) {
if parent != nil {
children := s.sched.ScheduleChildren(parent)
for _, child := range children {
if child.PacketChan == nil {
logger.Debugf("removePeerFromCurrentTree: there is no packet chan with peer %s", peer.PeerID)
continue
}
child.PacketChan <- constructSuccessPeerPacket(child, peer, nil)
child.SendSchedulePacket(constructSuccessPeerPacket(child, peer, nil))
}
}
}
2 changes: 1 addition & 1 deletion scheduler/core/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (m *monitor) printDebugInfo() string {
parentNode = peer.GetParent().PeerID
}

table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)-5], parentNode, peer.GetStatus().String(),
table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)], parentNode, peer.GetStatus().String(),
peer.CreateTime.String(),
strconv.Itoa(int(peer.GetFinishedNum())),
strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())})
Expand Down
3 changes: 3 additions & 0 deletions scheduler/core/scheduler_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task
if task.IsFrozen() {
task.SetStatus(types.TaskStatusRunning)
}
//if s.config.DisableCDN {
// TODO NeedBackSource
//}
go func() {
if err := s.cdnManager.StartSeedTask(ctx, task); err != nil {
if !task.IsSuccess() {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (wg *workerGroup) send(e event) bool {

func (wg *workerGroup) stop() {
close(wg.stopCh)
wg.s.start()
wg.s.stop()
for _, worker := range wg.workerList {
worker.stop()
}
Expand Down
7 changes: 4 additions & 3 deletions scheduler/daemon/cdn/d7y/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *types.Task) error {
}

func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream) error {
var once sync.Once
var initialized bool
var cdnPeer *types.Peer
for {
piece, err := stream.Recv()
Expand All @@ -138,9 +138,10 @@ func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream
return errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err)
}
if piece != nil {
once.Do(func() {
if !initialized {
cdnPeer, err = cm.initCdnPeer(task, piece)
})
initialized = true
}
if err != nil || cdnPeer == nil {
return err
}
Expand Down
6 changes: 1 addition & 5 deletions scheduler/daemon/peer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ func (m *manager) Delete(peerID string) {
if ok {
peer.Host.DeletePeer(peerID)
peer.Task.DeletePeer(peer)
if peer.PacketChan != nil {
close(peer.PacketChan)
logger.Infof("close peer %s stream", peerID)
peer.PacketChan = nil
}
peer.UnBindSendChannel()
m.peerMap.Delete(peerID)
}
return
Expand Down
Loading