From 12af7874fb00cac456ee667c524377e2fceae3a8 Mon Sep 17 00:00:00 2001 From: santong Date: Tue, 27 Jul 2021 22:54:41 +0800 Subject: [PATCH 1/7] fix dead lock Signed-off-by: santong --- scheduler/config/config.go | 2 +- scheduler/types/peer.go | 8 +++++++- scheduler/types/task.go | 2 -- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/scheduler/config/config.go b/scheduler/config/config.go index a57b49bf7bd..4a85f30c2b7 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -113,7 +113,7 @@ func NewDefaultSchedulerConfig() *SchedulerConfig { Scheduler: "basic", CDNLoad: 100, ClientLoad: 10, - OpenMonitor: false, + OpenMonitor: true, GC: NewDefaultGCConfig(), } } diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index 32d1b2dc5d3..b56b1efc85a 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -114,14 +114,18 @@ func (peer *Peer) Touch() { } func (peer *Peer) associateChild(child *Peer) { + peer.lock.Lock() peer.children.Store(child.PeerID, child) peer.Host.IncUploadLoad() + peer.lock.Unlock() peer.Task.peers.Update(peer) } func (peer *Peer) disassociateChild(child *Peer) { + peer.lock.Lock() peer.children.Delete(child.PeerID) peer.Host.DecUploadLoad() + peer.lock.Unlock() peer.Task.peers.Update(peer) } @@ -159,15 +163,17 @@ func (peer *Peer) GetCost() int { func (peer *Peer) AddPieceInfo(finishedCount int32, cost int) { peer.lock.Lock() - defer peer.lock.Unlock() if finishedCount > peer.finishedNum.Load() { peer.finishedNum.Store(finishedCount) peer.costHistory = append(peer.costHistory, cost) if len(peer.costHistory) > 20 { peer.costHistory = peer.costHistory[len(peer.costHistory)-20:] } + peer.lock.Unlock() peer.Task.peers.Update(peer) + return } + peer.lock.Unlock() } func (peer *Peer) GetDepth() int { diff --git a/scheduler/types/task.go b/scheduler/types/task.go index 2c069ed0e32..6e867c295bf 100644 --- a/scheduler/types/task.go +++ b/scheduler/types/task.go @@ -105,8 +105,6 @@ func (task *Task) GetPiece(pieceNum int32) *PieceInfo { } func (task *Task) AddPeer(peer *Peer) { - task.lock.Lock() - defer task.lock.Unlock() task.peers.UpdateOrAdd(peer) } From 034d776371194f47a6b1b62cba375e545ec365b0 Mon Sep 17 00:00:00 2001 From: santong Date: Wed, 28 Jul 2021 20:22:03 +0800 Subject: [PATCH 2/7] client log chaser and conn manager lock Signed-off-by: santong --- client/daemon/daemon.go | 4 +- client/daemon/peer/peertask_file.go | 2 + client/daemon/peer/peertask_file_callback.go | 8 +- client/daemon/peer/peertask_stream.go | 2 + .../daemon/peer/peertask_stream_callback.go | 8 +- pkg/rpc/client.go | 66 +++++++++------ pkg/rpc/client_util.go | 6 +- pkg/structure/sortedlist/sorted_list.go | 8 +- .../core/evaluator/basic/basic_evaluator.go | 2 +- scheduler/core/monitor.go | 25 +++--- .../core/scheduler/basic/basic_scheduler.go | 83 +++++++++++++++---- scheduler/core/scheduler_service.go | 4 +- scheduler/daemon/peer/manager.go | 1 + scheduler/types/peer.go | 14 ++-- scheduler/types/task.go | 4 +- 15 files changed, 155 insertions(+), 82 deletions(-) diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index eb45d66a0a4..1fb2edffe78 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -111,9 +111,9 @@ func New(opt *config.DaemonOption) (Daemon, error) { PeerId: request.PeerID, }) if er != nil { - logger.Errorf("leave task %s/%s, error: %v", request.TaskID, request.PeerID, er) + logger.Errorf("step4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er) } else { - logger.Infof("leave task %s/%s 
state ok", request.TaskID, request.PeerID) + logger.Infof("step4:leave task %s/%s state ok", request.TaskID, request.PeerID) } }) if err != nil { diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index 7f481963a3e..59488544ef5 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -89,6 +89,7 @@ func newFilePeerTask(ctx context.Context, // trace register _, regSpan := tracer.Start(ctx, config.SpanRegisterTask) result, err := schedulerClient.RegisterPeerTask(ctx, request) + logger.Infof("step1: peer %s start to register", request.PeerId) regSpan.RecordError(err) regSpan.End() @@ -147,6 +148,7 @@ func newFilePeerTask(ctx context.Context, } peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) + logger.Infof("step2: start report peer %s piece result", request.PeerId) if err != nil { defer span.End() span.RecordError(err) diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go index db94b9b9298..0963450bbd9 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ b/client/daemon/peer/peertask_file_callback.go @@ -106,9 +106,9 @@ func (p *filePeerTaskCallback) Done(pt Task) error { Code: dfcodes.Success, }) if err != nil { - pt.Log().Errorf("report successful peer result, error: %v", err) + pt.Log().Errorf("step3: report successful peer result, error: %v", err) } else { - pt.Log().Infof("report successful peer result ok") + pt.Log().Infof("step3: report successful peer result ok") } return nil } @@ -131,9 +131,9 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro Code: code, }) if err != nil { - pt.Log().Errorf("report fail peer result, error: %v", err) + pt.Log().Errorf("step3: report fail peer result, error: %v", err) } else { - pt.Log().Infof("report fail peer result ok") + pt.Log().Infof("step3: report fail peer result ok") } return nil } diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 216c777dd5d..40a9bc7bff8 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -72,6 +72,7 @@ func newStreamPeerTask(ctx context.Context, // trace register _, regSpan := tracer.Start(ctx, config.SpanRegisterTask) result, err := schedulerClient.RegisterPeerTask(ctx, request) + logger.Infof("step1: peer %s start to register", request.PeerId) regSpan.RecordError(err) regSpan.End() @@ -128,6 +129,7 @@ func newStreamPeerTask(ctx context.Context, } peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) + logger.Infof("step2: start report peer %s piece result", request.PeerId) if err != nil { defer span.End() span.RecordError(err) diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index 5455575cb90..00861516678 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -104,9 +104,9 @@ func (p *streamPeerTaskCallback) Done(pt Task) error { Code: dfcodes.Success, }) if err != nil { - pt.Log().Errorf("report successful peer result, error: %v", err) + pt.Log().Errorf("step3: report successful peer result, error: %v", err) } else { - pt.Log().Infof("report successful peer result ok") + pt.Log().Infof("step3: report successful peer result ok") } return nil } @@ -129,9 +129,9 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er Code: code, }) if err != nil { - 
pt.Log().Errorf("report fail peer result, error: %v", err) + pt.Log().Errorf("step3: report fail peer result, error: %v", err) } else { - pt.Log().Infof("report fail peer result ok") + pt.Log().Infof("step3: report fail peer result ok") } return nil } diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index a8663934a5b..505acd58f82 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -26,7 +26,6 @@ import ( "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/basic/dfnet" - "d7y.io/dragonfly/v2/pkg/synclock" "github.com/pkg/errors" "github.com/serialx/hashring" "google.golang.org/grpc" @@ -55,7 +54,7 @@ type ConnStatus string type Connection struct { ctx context.Context cancelFun context.CancelFunc - rwMutex *synclock.LockerPool + rwMutex sync.RWMutex dialOpts []grpc.DialOption key2NodeMap sync.Map // key -> node(many to one) node2ClientMap sync.Map // node -> clientConn(one to one) @@ -75,11 +74,7 @@ func newDefaultConnection(ctx context.Context) *Connection { return &Connection{ ctx: childCtx, cancelFun: cancel, - rwMutex: synclock.NewLockerPool(), dialOpts: defaultClientOpts, - key2NodeMap: sync.Map{}, - node2ClientMap: sync.Map{}, - accessNodeMap: sync.Map{}, connExpireTime: defaultConnExpireTime, gcConnTimeout: defaultGcConnTimeout, gcConnInterval: defaultGcConnInterval, @@ -169,10 +164,10 @@ func (conn *Connection) CorrectKey2NodeRelation(tmpHashKey, realHashKey string) if tmpHashKey == realHashKey { return } + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() key, _ := conn.key2NodeMap.Load(tmpHashKey) serverNode := key.(string) - conn.rwMutex.Lock(serverNode, false) - defer conn.rwMutex.UnLock(serverNode, false) conn.key2NodeMap.Store(realHashKey, serverNode) conn.key2NodeMap.Delete(tmpHashKey) } @@ -197,11 +192,11 @@ func (conn *Connection) UpdateAccessNodeMapByServerNode(serverNode string) { } func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error { + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() for _, addr := range addrs { serverNode := addr.GetEndpoint() - conn.rwMutex.Lock(serverNode, false) conn.hashRing = conn.hashRing.AddNode(serverNode) - conn.rwMutex.UnLock(serverNode, false) logger.With("conn", conn.name).Debugf("success add %s to server node list", addr) } return nil @@ -209,6 +204,21 @@ func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error { // findCandidateClientConn find candidate node client conn other than exclusiveNodes func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...string) (*candidateClient, error) { + if node, ok := conn.key2NodeMap.Load(key); ok { + candidateNode := node.(string) + selected := true + for _, exclusiveNode := range exclusiveNodes { + if exclusiveNode == candidateNode { + selected = false + } + } + if selected { + if client, ok := conn.node2ClientMap.Load(node); ok { + return client.(*candidateClient), nil + } + } + } + ringNodes, ok := conn.hashRing.GetNodes(key, conn.hashRing.Size()) if !ok { logger.Warnf("cannot obtain expected %d server nodes", conn.hashRing.Size()) @@ -231,12 +241,10 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st logger.With("conn", conn.name).Infof("candidate result for hash key %s: all server node list: %v, exclusiveNodes node list: %v, candidate node list: %v", key, ringNodes, exclusiveNodes, candidateNodes) for _, candidateNode := range candidateNodes { - conn.rwMutex.Lock(candidateNode, true) // Check whether there is a corresponding mapping client in the 
node2ClientMap // TODO 下面部分可以直接调用loadOrCreate方法,但是日志没有这么调用打印全 if client, ok := conn.node2ClientMap.Load(candidateNode); ok { logger.With("conn", conn.name).Infof("hit cache candidateNode %s for hash key %s", candidateNode, key) - conn.rwMutex.UnLock(candidateNode, true) return &candidateClient{ node: candidateNode, Ref: client, @@ -246,7 +254,6 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st clientConn, err := conn.createClient(candidateNode, append(defaultClientOpts, conn.dialOpts...)...) if err == nil { logger.With("conn", conn.name).Infof("success connect to candidateNode %s for hash key %s", candidateNode, key) - conn.rwMutex.UnLock(candidateNode, true) return &candidateClient{ node: candidateNode, Ref: clientConn, @@ -254,7 +261,6 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st } logger.With("conn", conn.name).Infof("failed to connect candidateNode %s for hash key %s: %v", candidateNode, key, err) - conn.rwMutex.UnLock(candidateNode, true) } return nil, dferrors.ErrNoCandidateNode } @@ -273,6 +279,8 @@ func (conn *Connection) createClient(target string, opts ...grpc.DialOption) (*g // GetServerNode func (conn *Connection) GetServerNode(hashKey string) (string, bool) { + conn.rwMutex.RLock() + defer conn.rwMutex.RUnlock() node, ok := conn.key2NodeMap.Load(hashKey) serverNode := node.(string) if ok { @@ -283,8 +291,8 @@ func (conn *Connection) GetServerNode(hashKey string) (string, bool) { func (conn *Connection) GetClientConnByTarget(node string) (*grpc.ClientConn, error) { logger.With("conn", conn.name).Debugf("start to get client conn by target %s", node) - conn.rwMutex.Lock(node, true) - defer conn.rwMutex.UnLock(node, true) + conn.rwMutex.RLock() + defer conn.rwMutex.RUnlock() clientConn, err := conn.loadOrCreateClientConnByNode(node) if err != nil { return nil, errors.Wrapf(err, "get client conn by conn %s", node) @@ -322,30 +330,32 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g // stick whether hash key need already associated with specify node func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) { logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick) + conn.rwMutex.RLock() node, ok := conn.key2NodeMap.Load(hashKey) if stick && !ok { + conn.rwMutex.RUnlock() // if request is stateful, hash key must exist in key2NodeMap return nil, fmt.Errorf("it is a stateful request, but cannot find hash key(%s) in key2NodeMap", hashKey) } if ok { // if exist serverNode := node.(string) - conn.rwMutex.Lock(serverNode, true) clientConn, err := conn.loadOrCreateClientConnByNode(serverNode) - conn.rwMutex.UnLock(serverNode, true) + conn.rwMutex.RUnlock() if err != nil { return nil, err } return clientConn, nil } logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate", hashKey) + conn.rwMutex.RUnlock() // if absence + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() client, err := conn.findCandidateClientConn(hashKey) if err != nil { return nil, errors.Wrapf(err, "prob candidate client conn for hash key %s", hashKey) } - conn.rwMutex.Lock(client.node, false) - defer conn.rwMutex.UnLock(client.node, false) conn.key2NodeMap.Store(hashKey, client.node) conn.node2ClientMap.Store(client.node, client.Ref) conn.accessNodeMap.Store(client.node, time.Now()) @@ -363,19 +373,21 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes 
[]str } } currentNode := "" + conn.rwMutex.RLock() if currentNode, ok := conn.key2NodeMap.Load(key); ok { preNode = currentNode.(string) exclusiveNodes = append(exclusiveNodes, currentNode.(string)) } else { logger.With("conn", conn.name).Warnf("failed to find server node for hash key %s", key) } + conn.rwMutex.RUnlock() + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() client, err := conn.findCandidateClientConn(key, exclusiveNodes...) if err != nil { return "", errors.Wrapf(err, "find candidate client conn for hash key %s", key) } logger.With("conn", conn.name).Infof("successfully migrate hash key %s from server node %s to %s", key, currentNode, client.node) - conn.rwMutex.Lock(client.node, false) - defer conn.rwMutex.UnLock(client.node, false) conn.key2NodeMap.Store(key, client.node) conn.node2ClientMap.Store(client.node, client.Ref) conn.accessNodeMap.Store(client.node, time.Now()) @@ -383,9 +395,10 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str } func (conn *Connection) Close() error { + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() for i := range conn.serverNodes { serverNode := conn.serverNodes[i].GetEndpoint() - conn.rwMutex.Lock(serverNode, false) conn.hashRing.RemoveNode(serverNode) value, ok := conn.node2ClientMap.Load(serverNode) if ok { @@ -406,19 +419,18 @@ func (conn *Connection) Close() error { return true }) conn.accessNodeMap.Delete(serverNode) - conn.rwMutex.UnLock(serverNode, false) } conn.cancelFun() return nil } func (conn *Connection) UpdateState(addrs []dfnet.NetAddr) { - // TODO lock - conn.serverNodes = addrs var addresses []string for _, addr := range addrs { addresses = append(addresses, addr.GetEndpoint()) } - + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() + conn.serverNodes = addrs conn.hashRing = hashring.New(addresses) } diff --git a/pkg/rpc/client_util.go b/pkg/rpc/client_util.go index 03324f497c6..d6ddf92e4b8 100644 --- a/pkg/rpc/client_util.go +++ b/pkg/rpc/client_util.go @@ -47,7 +47,7 @@ func (conn *Connection) startGC() { startTime := time.Now() // TODO use anther locker, @santong - //conn.rwMutex.Lock() + conn.rwMutex.Lock() // range all connections and determine whether they are expired conn.accessNodeMap.Range(func(node, accessTime interface{}) bool { serverNode := node.(string) @@ -61,7 +61,7 @@ func (conn *Connection) startGC() { return true }) // TODO use anther locker, @santong - //conn.rwMutex.Unlock() + conn.rwMutex.Unlock() // slow GC detected, report it with a log warning if timeElapse := time.Since(startTime); timeElapse > conn.gcConnTimeout { logger.GrpcLogger.With("conn", conn.name).Warnf("gc %d conns, cost: %.3f seconds", removedConnCount, timeElapse.Seconds()) @@ -81,8 +81,6 @@ func (conn *Connection) startGC() { // gcConn gc keys and clients associated with server node func (conn *Connection) gcConn(node string) { - conn.rwMutex.Lock(node, false) - defer conn.rwMutex.UnLock(node, false) logger.GrpcLogger.With("conn", conn.name).Infof("gc keys and clients associated with server node: %s starting", node) value, ok := conn.node2ClientMap.Load(node) if ok { diff --git a/pkg/structure/sortedlist/sorted_list.go b/pkg/structure/sortedlist/sorted_list.go index 80e168be235..efd8b828f11 100644 --- a/pkg/structure/sortedlist/sorted_list.go +++ b/pkg/structure/sortedlist/sorted_list.go @@ -132,7 +132,9 @@ func (l *SortedList) RangeLimit(limit int, fn func(Item) bool) { } l.l.RLock() defer l.l.RUnlock() - + if len(l.buckets) == 0 { + return + } count := 0 for i := l.left; i <= l.right; i++ { buc := 
l.buckets[i] @@ -160,7 +162,9 @@ func (l *SortedList) RangeReverseLimit(limit int, fn func(Item) bool) { } l.l.RLock() defer l.l.RUnlock() - + if len(l.buckets) == 0 { + return + } count := 0 for i := l.right; i >= l.left; i-- { for j := len(l.buckets[i].buckets) - 1; j >= 0; j-- { diff --git a/scheduler/core/evaluator/basic/basic_evaluator.go b/scheduler/core/evaluator/basic/basic_evaluator.go index 12f33e3bf1c..cf9e662620e 100644 --- a/scheduler/core/evaluator/basic/basic_evaluator.go +++ b/scheduler/core/evaluator/basic/basic_evaluator.go @@ -82,7 +82,7 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool { return true } - if peer.IsWaiting() { + if peer.IsBlocking() { return false } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index 2d982128cf1..7bf71b14aba 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -23,10 +23,10 @@ import ( "strings" "time" - logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/scheduler/daemon" "d7y.io/dragonfly/v2/scheduler/types" "github.com/olekukonko/tablewriter" + "go.uber.org/zap" "k8s.io/client-go/util/workqueue" ) @@ -38,14 +38,16 @@ const ( type monitor struct { downloadMonitorQueue workqueue.DelayingInterface peerManager daemon.PeerMgr - verbose bool done chan struct{} + log *zap.SugaredLogger } func newMonitor(peerManager daemon.PeerMgr) *monitor { + logger, _ := zap.NewDevelopment() return &monitor{ downloadMonitorQueue: workqueue.NewDelayingQueue(), peerManager: peerManager, + log: logger.Sugar(), } } @@ -54,7 +56,7 @@ func (m *monitor) start() { for { select { case <-ticker.C: - logger.Debugf(m.printDebugInfo()) + m.log.Info(m.printDebugInfo()) case <-m.done: return } @@ -71,7 +73,6 @@ func (m *monitor) printDebugInfo() string { buffer := bytes.NewBuffer([]byte{}) table := tablewriter.NewWriter(buffer) table.SetHeader([]string{"PeerID", "URL", "parent node", "status", "start time", "Finished Piece Num", "Finished", "Free Load"}) - m.peerManager.ListPeers().Range(func(key interface{}, value interface{}) (ok bool) { ok = true peer := value.(*types.Peer) @@ -85,7 +86,9 @@ func (m *monitor) printDebugInfo() string { parentNode = peer.GetParent().PeerID } - table.Append([]string{peer.PeerID, peer.Task.URL, parentNode, peer.GetStatus().String(), peer.CreateTime.String(), strconv.Itoa(int(peer.GetFinishedNum())), + table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)-5], parentNode, peer.GetStatus().String(), + peer.CreateTime.String(), + strconv.Itoa(int(peer.GetFinishedNum())), strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())}) return }) @@ -117,15 +120,15 @@ func (m *monitor) printDebugInfo() string { printTree(root, nil) } - msg := "============\n" + strings.Join(msgs, "\n") + "\n===============" + msg := "============\n" + strings.Join(append(msgs, strconv.Itoa(table.NumLines())), "\n") + "\n===============" return msg } func (m *monitor) RefreshDownloadMonitor(peer *types.Peer) { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("downloadMonitorWorkingLoop refresh ") + m.log.With("taskID", peer.Task.TaskID, "peerID", peer.PeerID).Debugf("downloadMonitorWorkingLoop refresh ") if !peer.IsRunning() { m.downloadMonitorQueue.AddAfter(peer, time.Second*2) - } else if peer.IsWaiting() { + } else if peer.IsBlocking() { m.downloadMonitorQueue.AddAfter(peer, time.Second*2) } else { delay := time.Millisecond * time.Duration(peer.GetCost()*10) @@ -141,13 +144,13 @@ func (m *monitor) 
downloadMonitorWorkingLoop() { for { v, shutdown := m.downloadMonitorQueue.Get() if shutdown { - logger.Infof("download monitor working loop closed") + m.log.Infof("download monitor working loop closed") break } //if m.downloadMonitorCallBack != nil { peer := v.(*types.Peer) if peer != nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("downloadMonitorWorkingLoop status[%d]", peer.GetStatus()) + m.log.With("taskID", peer.Task.TaskID, "peerID", peer.PeerID).Debugf("downloadMonitorWorkingLoop status[%d]", peer.GetStatus()) if peer.IsSuccess() || peer.Host.CDN { // clear from monitor } else { @@ -158,7 +161,7 @@ func (m *monitor) downloadMonitorWorkingLoop() { //pt.SendError(dferrors.New(dfcodes.SchedPeerGone, "report time out")) } //m.downloadMonitorCallBack(peer) - } else if !peer.IsWaiting() { + } else if !peer.IsBlocking() { //m.downloadMonitorCallBack(peer) } else { if time.Now().After(peer.GetLastAccessTime().Add(PeerForceGoneTimeout)) { diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index 1b952aa4b3c..644354e4e0c 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -145,31 +145,52 @@ func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list []*types.Peer) { return s.peerManager.Pick(peer.Task, limit, func(candidateNode *types.Peer) bool { if candidateNode == nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("*candidate child peer is not selected because it is nil") + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer is not selected because it is nil") return false } - if candidateNode.IsDone() || candidateNode.IsLeave() || candidateNode == peer { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("*candidate child peer %s is not selected because it is %+v", - candidateNode.PeerID, candidateNode) + if candidateNode.IsDone() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it has done", + candidateNode.PeerID) + return false + } + if candidateNode.IsLeave() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it has left", + candidateNode.PeerID) + return false + } + if candidateNode == peer { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it and peer are the same", + candidateNode.PeerID) + return false + } + if peer.GetParent() == candidateNode { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("******candidate child peer %s is not selected because peer's parent is candidate peer", candidateNode.PeerID) + return false + } + if candidateNode.GetFinishedNum() > peer.GetFinishedNum() { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("******candidate child peer %s is not selected because it finished number of download is more than peer's", + candidateNode.PeerID) return false } if candidateNode.Host != nil && candidateNode.Host.CDN { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is not selected because it is a cdn host", candidateNode.PeerID) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it is a 
cdn host", + candidateNode.PeerID) return false } if candidateNode.GetParent() == nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is selected because it has not parent", + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is selected because it has not parent", candidateNode.PeerID) return true } if candidateNode.GetParent() != nil && s.evaluator.IsBadNode(candidateNode.GetParent()) { logger.WithTaskAndPeerID(peer.Task.TaskID, - peer.PeerID).Debugf("candidate child peer %s is selected because it has parent and parent status is not health", candidateNode.PeerID) + peer.PeerID).Debugf("******candidate child peer %s is selected because it has parent and parent status is not health", candidateNode.PeerID) return true } - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("=candidate child peer %s is not selected because it is %+v", - candidateNode.PeerID, candidateNode) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected", candidateNode.PeerID) return false }) } @@ -177,17 +198,47 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []*types.Peer) { return s.peerManager.PickReverse(peer.Task, limit, func(candidateNode *types.Peer) bool { if candidateNode == nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer is not selected because it is nil") + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer is not selected because it is nil") + return false + } + if s.evaluator.IsBadNode(candidateNode) { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it is badNode", + candidateNode.PeerID) return false } - if s.evaluator.IsBadNode(candidateNode) || candidateNode.IsLeave() || candidateNode == peer || candidateNode.Host. 
- GetFreeUploadLoad() <= 0 { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is not selected because it is %+v", - candidateNode.PeerID, candidateNode) + if candidateNode.IsLeave() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it has already left", + candidateNode.PeerID) + return false + } + if candidateNode == peer { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it and peer are the same", + candidateNode.PeerID) + return false + } + if candidateNode.GetParent() == peer { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's parent is peer", + candidateNode.PeerID) + return false + } + if candidateNode.Host.GetFreeUploadLoad() <= 0 { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's free upload load equal to less than zero", + candidateNode.PeerID) + return false + } + if candidateNode.IsWaiting() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's status is waiting", + candidateNode.PeerID) + return false + } + if candidateNode.GetFinishedNum() < peer.GetFinishedNum() { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it finished number of download is smaller than peer's", + candidateNode.PeerID) return false } - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is selected because it is %+v", - candidateNode.PeerID, candidateNode) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is selected", candidateNode.PeerID) return true }) } diff --git a/scheduler/core/scheduler_service.go b/scheduler/core/scheduler_service.go index 3c16b500ae2..bd4ebeb3968 100644 --- a/scheduler/core/scheduler_service.go +++ b/scheduler/core/scheduler_service.go @@ -127,8 +127,8 @@ func (s *SchedulerService) GenerateTaskID(url string, filter string, meta *base. 
} func (s *SchedulerService) ScheduleParent(peer *types.Peer) (parent *types.Peer, err error) { - parent, candidates, hasParent := s.sched.ScheduleParent(peer) - logger.Debugf("schedule parent result: parent %v, candidates:%v", parent, candidates) + parent, _, hasParent := s.sched.ScheduleParent(peer) + //logger.Debugf("schedule parent result: parent %v, candidates:%v", parent, candidates) if !hasParent || parent == nil { return nil, errors.Errorf("no parent peer available for peer %v", peer.PeerID) } diff --git a/scheduler/daemon/peer/manager.go b/scheduler/daemon/peer/manager.go index f8e74177e5e..b6e77252df8 100644 --- a/scheduler/daemon/peer/manager.go +++ b/scheduler/daemon/peer/manager.go @@ -72,6 +72,7 @@ func (m *manager) Delete(peerID string) { peer.Task.DeletePeer(peer) if peer.PacketChan != nil { close(peer.PacketChan) + logger.Infof("close peer %s stream", peerID) peer.PacketChan = nil } m.peerMap.Delete(peerID) diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index b56b1efc85a..0ded10791a7 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -130,8 +130,8 @@ func (peer *Peer) disassociateChild(child *Peer) { } func (peer *Peer) ReplaceParent(parent *Peer) { - peer.lock.Lock() - defer peer.lock.Unlock() + //peer.lock.Lock() + //defer peer.lock.Unlock() oldParent := peer.parent if oldParent != nil { oldParent.disassociateChild(peer) @@ -219,7 +219,7 @@ func (peer *Peer) IsAncestor(ancestor *Peer) bool { return false } -func (peer *Peer) IsWaiting() bool { +func (peer *Peer) IsBlocking() bool { peer.lock.RLock() defer peer.lock.RUnlock() if peer.parent == nil { @@ -241,10 +241,6 @@ func (peer *Peer) getFreeLoad() int { return peer.Host.GetFreeUploadLoad() } -func (peer *Peer) GetFinishNum() int32 { - return peer.finishedNum.Load() -} - func GetDiffPieceNum(src *Peer, dst *Peer) int32 { diff := src.finishedNum.Load() - dst.finishedNum.Load() if diff > 0 { @@ -285,6 +281,10 @@ func (peer *Peer) IsRunning() bool { return peer.status == PeerStatusRunning } +func (peer *Peer) IsWaiting() bool { + return peer.status == PeerStatusWaiting +} + func (peer *Peer) IsSuccess() bool { return peer.status == PeerStatusSuccess } diff --git a/scheduler/types/task.go b/scheduler/types/task.go index 6e867c295bf..df37081f345 100644 --- a/scheduler/types/task.go +++ b/scheduler/types/task.go @@ -109,8 +109,8 @@ func (task *Task) AddPeer(peer *Peer) { } func (task *Task) DeletePeer(peer *Peer) { - task.lock.Lock() - defer task.lock.Unlock() + //task.lock.Lock() + //defer task.lock.Unlock() task.peers.Delete(peer) } From 9e89fb699194419a7c8f1fc9916dd6a3553fcc88 Mon Sep 17 00:00:00 2001 From: santong Date: Wed, 28 Jul 2021 23:29:44 +0800 Subject: [PATCH 3/7] chore: client logs Signed-off-by: santong --- client/daemon/peer/peertask_file.go | 7 ++++--- internal/dfcodes/rpc_code.go | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index 59488544ef5..cc83222cd94 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -95,6 +95,7 @@ func newFilePeerTask(ctx context.Context, var backSource bool if err != nil { + logger.Errorf("step1: peer %s register failed: err", request.PeerId, err) // check if it is back source error if de, ok := err.(*dferrors.DfError); ok && de.Code == dfcodes.SchedNeedBackSource { backSource = true @@ -103,18 +104,17 @@ func newFilePeerTask(ctx context.Context, if !backSource { span.RecordError(err) span.End() - 
logger.Errorf("register peer task failed: %s, peer id: %s", err, request.PeerId) return ctx, nil, nil, err } } if result == nil { defer span.End() span.RecordError(err) - err = errors.Errorf("empty schedule result") + err = errors.Errorf("step1: peer register result is nil") return ctx, nil, nil, err } span.SetAttributes(config.AttributeTaskID.String(result.TaskId)) - logger.Infof("register task success, task id: %s, peer id: %s, SizeScope: %s", + logger.Infof("step1: register task success, task id: %s, peer id: %s, SizeScope: %s", result.TaskId, request.PeerId, base.SizeScope_name[int32(result.SizeScope)]) var singlePiece *scheduler.SinglePiece @@ -150,6 +150,7 @@ func newFilePeerTask(ctx context.Context, peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) logger.Infof("step2: start report peer %s piece result", request.PeerId) if err != nil { + logger.Errorf("step2: peer %s report piece failed: err", request.PeerId, err) defer span.End() span.RecordError(err) return ctx, nil, nil, err diff --git a/internal/dfcodes/rpc_code.go b/internal/dfcodes/rpc_code.go index a7c3d928acd..531fcdff217 100644 --- a/internal/dfcodes/rpc_code.go +++ b/internal/dfcodes/rpc_code.go @@ -41,7 +41,8 @@ const ( ClientRequestLimitFail base.Code = 4006 // scheduler response error 5000-5999 - SchedError base.Code = 5000 + SchedError base.Code = 5000 + /** @deprecated */ SchedNeedBackSource base.Code = 5001 // client should try to download from source SchedPeerGone base.Code = 5002 // client should disconnect from scheduler SchedPeerRegisterFail base.Code = 5003 From 9dca4808894d7a918a940607c7d87540ef2901bd Mon Sep 17 00:00:00 2001 From: santong Date: Thu, 29 Jul 2021 14:10:14 +0800 Subject: [PATCH 4/7] fix concurrent download Signed-off-by: santong --- client/daemon/peer/peertask_base.go | 2 +- client/daemon/service/manager.go | 2 +- scheduler/core/events.go | 14 ++++++--- scheduler/core/monitor.go | 3 +- .../core/scheduler/basic/basic_scheduler.go | 14 ++++----- scheduler/types/peer.go | 29 +++++++++++++++++-- 6 files changed, 48 insertions(+), 16 deletions(-) diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index b439c4e22f1..ce1a68528be 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -715,7 +715,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer } span.AddEvent("retry due to empty pieces", trace.WithAttributes(config.AttributeGetPieceRetry.Int(count))) - pt.Warnf("peer %s returns success but with empty pieces, retry later", peer.PeerId) + pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId) return nil, false, dferrors.ErrEmptyValue } return pp, false, nil diff --git a/client/daemon/service/manager.go b/client/daemon/service/manager.go index 31f0380fb7b..69cc0eed0fd 100644 --- a/client/daemon/service/manager.go +++ b/client/daemon/service/manager.go @@ -105,7 +105,7 @@ func (m *manager) GetPieceTasks(ctx context.Context, request *base.PieceTaskRequ return nil, dferrors.New(code, err.Error()) } - logger.Warnf("try to get piece tasks, "+ + logger.Infof("try to get piece tasks, "+ "but target peer task is initializing, "+ "there is no available pieces, "+ "task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d", diff --git a/scheduler/core/events.go b/scheduler/core/events.go index 979abc1c350..fa74e370a81 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -162,10 +162,8 @@ var _ event = 
peerDownloadPieceFailEvent{} func (e peerDownloadPieceFailEvent) apply(s *state) { switch e.pr.Code { - case dfcodes.PeerTaskNotFound: - handlePeerLeave(e.peer, s) - return - case dfcodes.ClientPieceRequestFail, dfcodes.ClientPieceDownloadFail: + case dfcodes.PeerTaskNotFound, dfcodes.ClientPieceRequestFail, dfcodes.ClientPieceDownloadFail: + // TODO PeerTaskNotFound remove dest peer task, ClientPieceDownloadFail add blank list handleReplaceParent(e.peer, s) return case dfcodes.CdnTaskNotFound, dfcodes.CdnError, dfcodes.CdnTaskRegistryFail, dfcodes.CdnTaskDownloadFail: @@ -231,6 +229,10 @@ func (e peerDownloadSuccessEvent) apply(s *state) { } child.PacketChan <- constructSuccessPeerPacket(child, e.peer, nil) } + if e.peer.PacketChan != nil { + close(e.peer.PacketChan) + e.peer.PacketChan = nil + } } func (e peerDownloadSuccessEvent) hashKey() string { @@ -262,6 +264,10 @@ func (e peerDownloadFailEvent) apply(s *state) { child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) return true }) + if e.peer.PacketChan != nil { + close(e.peer.PacketChan) + e.peer.PacketChan = nil + } s.peerManager.Delete(e.peer.PeerID) } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index 7bf71b14aba..f6b5e6acb98 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -43,7 +43,8 @@ type monitor struct { } func newMonitor(peerManager daemon.PeerMgr) *monitor { - logger, _ := zap.NewDevelopment() + config := zap.NewDevelopmentConfig() + logger, _ := config.Build() return &monitor{ downloadMonitorQueue: workqueue.NewDelayingQueue(), peerManager: peerManager, diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index 644354e4e0c..a5f5ccd8f52 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -108,8 +108,8 @@ func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer) func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer, bool) { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start scheduler parent flow") if !s.evaluator.NeedAdjustParent(peer) { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, peer is %v and current parent is %v", - peer, peer.GetParent()) + //logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, peer is %v and current parent is %v", + // peer, peer.GetParent()) if peer.GetParent() == nil { return nil, nil, false } @@ -163,9 +163,9 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ candidateNode.PeerID) return false } - if peer.GetParent() == candidateNode { + if candidateNode.IsAncestorOf(peer) { logger.WithTaskAndPeerID(peer.Task.TaskID, - peer.PeerID).Debugf("******candidate child peer %s is not selected because peer's parent is candidate peer", candidateNode.PeerID) + peer.PeerID).Debugf("******candidate child peer %s is not selected because peer's ancestor is candidate peer", candidateNode.PeerID) return false } if candidateNode.GetFinishedNum() > peer.GetFinishedNum() { @@ -190,7 +190,7 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ peer.PeerID).Debugf("******candidate child peer %s is selected because it has parent and parent status is not health", candidateNode.PeerID) return true } - logger.WithTaskAndPeerID(peer.Task.TaskID, 
peer.PeerID).Debugf("******candidate child peer %s is not selected", candidateNode.PeerID) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is selected", candidateNode.PeerID) return false }) } @@ -216,8 +216,8 @@ func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list [] candidateNode.PeerID) return false } - if candidateNode.GetParent() == peer { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's parent is peer", + if candidateNode.IsDescendantOf(peer) { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's ancestor is peer", candidateNode.PeerID) return false } diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index 0ded10791a7..5d47538ef08 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -202,11 +202,16 @@ func (peer *Peer) GetTreeRoot() *Peer { return node } -// if ancestor is ancestor of peer -func (peer *Peer) IsAncestor(ancestor *Peer) bool { +// if peer is offspring of ancestor +func (peer *Peer) IsDescendantOf(ancestor *Peer) bool { if ancestor == nil { return false } + // TODO avoid circulation + peer.lock.RLock() + ancestor.lock.RLock() + defer ancestor.lock.RUnlock() + defer peer.lock.RUnlock() node := peer for node != nil { if node.parent == nil || node.Host.CDN { @@ -219,6 +224,26 @@ func (peer *Peer) IsAncestor(ancestor *Peer) bool { return false } +func (peer *Peer) IsAncestorOf(offspring *Peer) bool { + if offspring == nil { + return false + } + offspring.lock.RLock() + peer.lock.RLock() + defer peer.lock.RUnlock() + defer offspring.lock.RUnlock() + node := offspring + for node != nil { + if node.parent == nil || node.Host.CDN { + return false + } else if node.PeerID == peer.PeerID { + return true + } + node = node.parent + } + return false +} + func (peer *Peer) IsBlocking() bool { peer.lock.RLock() defer peer.lock.RUnlock() From fe2a9c2f5ead5ea8fe999a402e7b872dd012261e Mon Sep 17 00:00:00 2001 From: santong Date: Thu, 29 Jul 2021 16:17:20 +0800 Subject: [PATCH 5/7] chore: add log Signed-off-by: santong --- cdnsystem/server/service/cdn_seed_server.go | 1 - client/daemon/peer/peertask_base.go | 6 ++++-- scheduler/core/events.go | 17 ++++++++++------- .../core/scheduler/basic/basic_scheduler.go | 9 ++++++--- scheduler/core/scheduler_service.go | 6 ++++-- scheduler/daemon/cdn/d7y/manager.go | 1 + scheduler/server/service/scheduler_server.go | 4 +++- scheduler/types/peer.go | 2 -- scheduler/types/task.go | 5 +++++ 9 files changed, 33 insertions(+), 18 deletions(-) diff --git a/cdnsystem/server/service/cdn_seed_server.go b/cdnsystem/server/service/cdn_seed_server.go index f61824765f0..0d78553fc55 100644 --- a/cdnsystem/server/service/cdn_seed_server.go +++ b/cdnsystem/server/service/cdn_seed_server.go @@ -159,7 +159,6 @@ func (css *CdnSeedServer) GetPieceTasks(ctx context.Context, req *base.PieceTask return nil, dferrors.Newf(dfcodes.BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err) } task, err := css.taskMgr.Get(req.TaskId) - logger.Debugf("task: %+v", task) if err != nil { if cdnerrors.IsDataNotFound(err) { return nil, dferrors.Newf(dfcodes.CdnTaskNotFound, "failed to get task(%s) from cdn: %v", req.TaskId, err) diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index ce1a68528be..dd28df4b81d 100644 --- a/client/daemon/peer/peertask_base.go +++ 
b/client/daemon/peer/peertask_base.go @@ -685,7 +685,8 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer span.RecordError(getErr) // fast way to exit retry if curPeerPacket != pt.peerPacket { - pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet", getErr) + pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr, + curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true return nil, true, nil } @@ -709,7 +710,8 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer } // fast way to exit retry if curPeerPacket != pt.peerPacket { - pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet") + pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", + curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true return nil, true, nil } diff --git a/scheduler/core/events.go b/scheduler/core/events.go index fa74e370a81..aee72229c58 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -167,13 +167,16 @@ func (e peerDownloadPieceFailEvent) apply(s *state) { handleReplaceParent(e.peer, s) return case dfcodes.CdnTaskNotFound, dfcodes.CdnError, dfcodes.CdnTaskRegistryFail, dfcodes.CdnTaskDownloadFail: - if err := s.cdnManager.StartSeedTask(context.Background(), e.peer.Task); err != nil { - logger.Errorf("start seed task fail: %v", err) - e.peer.Task.SetStatus(types.TaskStatusFailed) - handleSeedTaskFail(e.peer.Task) - return - } - logger.Debugf("===== successfully obtain seeds from cdn, task: %+v =====", e.peer.Task) + go func(task *types.Task) { + task.SetStatus(types.TaskStatusRunning) + if err := s.cdnManager.StartSeedTask(context.Background(), task); err != nil { + logger.Errorf("start seed task fail: %v", err) + task.SetStatus(types.TaskStatusFailed) + handleSeedTaskFail(task) + return + } + logger.Debugf("===== successfully obtain seeds from cdn, task: %+v =====", e.peer.Task) + }(e.peer.Task) default: handleReplaceParent(e.peer, s) return diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index a5f5ccd8f52..a2309bb6831 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -123,9 +123,9 @@ func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer worth := s.evaluator.Evaluate(candidate, peer) // scheduler the same parent, worth reduce a half - if peer.GetParent() != nil && peer.GetParent().PeerID == candidate.PeerID { - worth = worth / 2.0 - } + //if peer.GetParent() != nil && peer.GetParent().PeerID == candidate.PeerID { + // worth = worth / 2.0 + //} if worth > value { value = worth @@ -196,6 +196,9 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ } func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []*types.Peer) { + if !peer.Task.CanSchedule() { + return nil + } return s.peerManager.PickReverse(peer.Task, limit, func(candidateNode *types.Peer) bool { if candidateNode == nil { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer is not selected because it is nil") diff --git a/scheduler/core/scheduler_service.go b/scheduler/core/scheduler_service.go index bd4ebeb3968..df634a59025 100644 --- 
a/scheduler/core/scheduler_service.go +++ b/scheduler/core/scheduler_service.go @@ -178,10 +178,12 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task // notify peer tasks synclock.Lock(task.TaskID, false) defer synclock.UnLock(task.TaskID, false) - if !task.IsHealth() { + if task.IsHealth() && task.GetLastTriggerTime().Add(s.config.AccessWindow).After(time.Now()) { + return task, nil + } + if task.IsFrozen() { task.SetStatus(types.TaskStatusRunning) } - go func() { if err := s.cdnManager.StartSeedTask(ctx, task); err != nil { if !task.IsSuccess() { diff --git a/scheduler/daemon/cdn/d7y/manager.go b/scheduler/daemon/cdn/d7y/manager.go index 1f67cdc7b45..56788ac9d7c 100644 --- a/scheduler/daemon/cdn/d7y/manager.go +++ b/scheduler/daemon/cdn/d7y/manager.go @@ -145,6 +145,7 @@ func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream if err != nil || cdnPeer == nil { return err } + task.SetStatus(types.TaskStatusSeeding) cdnPeer.Touch() if piece.Done { task.PieceTotal = piece.TotalPieceCount diff --git a/scheduler/server/service/scheduler_server.go b/scheduler/server/service/scheduler_server.go index f0bc7793565..1391e32c7de 100644 --- a/scheduler/server/service/scheduler_server.go +++ b/scheduler/server/service/scheduler_server.go @@ -86,7 +86,9 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul } parent, schErr := s.service.ScheduleParent(peer) if schErr != nil { - err = dferrors.Newf(dfcodes.SchedPeerScheduleFail, "failed to schedule peer %v: %v", peer.PeerID, schErr) + resp.SizeScope = base.SizeScope_NORMAL + resp.TaskId = taskID + //err = dferrors.Newf(dfcodes.SchedPeerScheduleFail, "failed to schedule peer %v: %v", peer.PeerID, schErr) return } singlePiece := task.GetPiece(0) diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index 5d47538ef08..0675ee429ec 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -130,8 +130,6 @@ func (peer *Peer) disassociateChild(child *Peer) { } func (peer *Peer) ReplaceParent(parent *Peer) { - //peer.lock.Lock() - //defer peer.lock.Unlock() oldParent := peer.parent if oldParent != nil { oldParent.disassociateChild(peer) diff --git a/scheduler/types/task.go b/scheduler/types/task.go index df37081f345..29637047f3d 100644 --- a/scheduler/types/task.go +++ b/scheduler/types/task.go @@ -48,6 +48,7 @@ func (status TaskStatus) String() string { const ( TaskStatusWaiting TaskStatus = iota TaskStatusRunning + TaskStatusSeeding TaskStatusSuccess TaskStatusCDNRegisterFail TaskStatusFailed @@ -171,6 +172,10 @@ func (task *Task) IsFrozen() bool { task.status == TaskStatusSourceError || task.status == TaskStatusCDNRegisterFail } +func (task *Task) CanSchedule() bool { + return task.status == TaskStatusSeeding || task.status == TaskStatusSuccess +} + func (task *Task) IsWaiting() bool { return task.status == TaskStatusWaiting } From a35854bd237da27c8b6069fd7f40a20377d41d46 Mon Sep 17 00:00:00 2001 From: santong Date: Thu, 29 Jul 2021 16:24:59 +0800 Subject: [PATCH 6/7] Extends the scheduler connection expiration time Signed-off-by: santong --- pkg/rpc/scheduler/client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index a715426c8aa..e1bbb6a434b 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -39,7 +39,7 @@ func GetClientByAddr(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (SchedulerC } sc := &schedulerClient{ 
rpc.NewConnection(context.Background(), "scheduler-static", addrs, []rpc.ConnOption{ - rpc.WithConnExpireTime(5 * time.Minute), + rpc.WithConnExpireTime(30 * time.Minute), rpc.WithDialOption(opts), }), } From 959674d901f4bf1f7a387b5c3dabd2ef82b15460 Mon Sep 17 00:00:00 2001 From: santong Date: Thu, 29 Jul 2021 19:59:20 +0800 Subject: [PATCH 7/7] chore: client step log Signed-off-by: santong --- client/daemon/daemon.go | 4 ++-- client/daemon/peer/peertask_file.go | 12 ++++++------ client/daemon/peer/peertask_file_callback.go | 8 ++++---- client/daemon/peer/peertask_stream.go | 4 ++-- client/daemon/peer/peertask_stream_callback.go | 8 ++++---- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 1fb2edffe78..60f27b1b9b8 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -111,9 +111,9 @@ func New(opt *config.DaemonOption) (Daemon, error) { PeerId: request.PeerID, }) if er != nil { - logger.Errorf("step4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er) + logger.Errorf("step 4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er) } else { - logger.Infof("step4:leave task %s/%s state ok", request.TaskID, request.PeerID) + logger.Infof("step 4:leave task %s/%s state ok", request.TaskID, request.PeerID) } }) if err != nil { diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index cc83222cd94..dc6c5e0e6ca 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -89,13 +89,13 @@ func newFilePeerTask(ctx context.Context, // trace register _, regSpan := tracer.Start(ctx, config.SpanRegisterTask) result, err := schedulerClient.RegisterPeerTask(ctx, request) - logger.Infof("step1: peer %s start to register", request.PeerId) + logger.Infof("step 1: peer %s start to register", request.PeerId) regSpan.RecordError(err) regSpan.End() var backSource bool if err != nil { - logger.Errorf("step1: peer %s register failed: err", request.PeerId, err) + logger.Errorf("step 1: peer %s register failed: err", request.PeerId, err) // check if it is back source error if de, ok := err.(*dferrors.DfError); ok && de.Code == dfcodes.SchedNeedBackSource { backSource = true @@ -110,11 +110,11 @@ func newFilePeerTask(ctx context.Context, if result == nil { defer span.End() span.RecordError(err) - err = errors.Errorf("step1: peer register result is nil") + err = errors.Errorf("step 1: peer register result is nil") return ctx, nil, nil, err } span.SetAttributes(config.AttributeTaskID.String(result.TaskId)) - logger.Infof("step1: register task success, task id: %s, peer id: %s, SizeScope: %s", + logger.Infof("step 1: register task success, task id: %s, peer id: %s, SizeScope: %s", result.TaskId, request.PeerId, base.SizeScope_name[int32(result.SizeScope)]) var singlePiece *scheduler.SinglePiece @@ -148,9 +148,9 @@ func newFilePeerTask(ctx context.Context, } peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) - logger.Infof("step2: start report peer %s piece result", request.PeerId) + logger.Infof("step 2: start report peer %s piece result", request.PeerId) if err != nil { - logger.Errorf("step2: peer %s report piece failed: err", request.PeerId, err) + logger.Errorf("step 2: peer %s report piece failed: err", request.PeerId, err) defer span.End() span.RecordError(err) return ctx, nil, nil, err diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go index 
0963450bbd9..ffadc4f64d6 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ b/client/daemon/peer/peertask_file_callback.go @@ -106,9 +106,9 @@ func (p *filePeerTaskCallback) Done(pt Task) error { Code: dfcodes.Success, }) if err != nil { - pt.Log().Errorf("step3: report successful peer result, error: %v", err) + pt.Log().Errorf("step 3: report successful peer result, error: %v", err) } else { - pt.Log().Infof("step3: report successful peer result ok") + pt.Log().Infof("step 3: report successful peer result ok") } return nil } @@ -131,9 +131,9 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro Code: code, }) if err != nil { - pt.Log().Errorf("step3: report fail peer result, error: %v", err) + pt.Log().Errorf("step 3: report fail peer result, error: %v", err) } else { - pt.Log().Infof("step3: report fail peer result ok") + pt.Log().Infof("step 3: report fail peer result ok") } return nil } diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 40a9bc7bff8..8574c5b8977 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -72,7 +72,7 @@ func newStreamPeerTask(ctx context.Context, // trace register _, regSpan := tracer.Start(ctx, config.SpanRegisterTask) result, err := schedulerClient.RegisterPeerTask(ctx, request) - logger.Infof("step1: peer %s start to register", request.PeerId) + logger.Infof("step 1: peer %s start to register", request.PeerId) regSpan.RecordError(err) regSpan.End() @@ -129,7 +129,7 @@ func newStreamPeerTask(ctx context.Context, } peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) - logger.Infof("step2: start report peer %s piece result", request.PeerId) + logger.Infof("step 2: start report peer %s piece result", request.PeerId) if err != nil { defer span.End() span.RecordError(err) diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index 00861516678..09dcf28fc83 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -104,9 +104,9 @@ func (p *streamPeerTaskCallback) Done(pt Task) error { Code: dfcodes.Success, }) if err != nil { - pt.Log().Errorf("step3: report successful peer result, error: %v", err) + pt.Log().Errorf("step 3: report successful peer result, error: %v", err) } else { - pt.Log().Infof("step3: report successful peer result ok") + pt.Log().Infof("step 3: report successful peer result ok") } return nil } @@ -129,9 +129,9 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er Code: code, }) if err != nil { - pt.Log().Errorf("step3: report fail peer result, error: %v", err) + pt.Log().Errorf("step 3: report fail peer result, error: %v", err) } else { - pt.Log().Infof("step3: report fail peer result ok") + pt.Log().Infof("step 3: report fail peer result ok") } return nil }
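
A note on the locking discipline these patches converge on, most visibly in Peer.AddPieceInfo and associateChild/disassociateChild from the first commit ("fix dead lock"): state is mutated while holding the peer's own lock, but that lock is released before calling into Task.peers.Update, whose sorted list takes its own lock internally. Below is a minimal, self-contained sketch of that pattern; the peer and peerList types, field names, and scores here are illustrative stand-ins, not the scheduler's real types.Peer or sortedlist structures.

package main

import (
	"fmt"
	"sync"
)

// peerList stands in for Task.peers: a shared structure guarded by its own lock.
type peerList struct {
	mu    sync.Mutex
	items map[string]int
}

func (l *peerList) Update(id string, score int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.items[id] = score
}

// peer stands in for types.Peer: it owns a lock and belongs to a shared list.
type peer struct {
	mu          sync.Mutex
	id          string
	finishedNum int
	list        *peerList
}

// AddPieceInfo mirrors the shape of the fixed method: mutate fields under p.mu,
// then release p.mu BEFORE crossing into the list's lock domain, so no goroutine
// on this path ever holds the peer lock and the list lock at the same time.
func (p *peer) AddPieceInfo(finished int) {
	p.mu.Lock()
	if finished <= p.finishedNum {
		p.mu.Unlock()
		return
	}
	p.finishedNum = finished
	p.mu.Unlock() // drop the peer lock before touching the list

	p.list.Update(p.id, finished)
}

func main() {
	l := &peerList{items: map[string]int{}}
	p := &peer{id: "peer-1", list: l}

	var wg sync.WaitGroup
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			p.AddPieceInfo(n)
		}(i)
	}
	wg.Wait()
	fmt.Println("recorded in list:", l.items[p.id], "peer finished:", p.finishedNum)
}

The pkg/rpc/client.go changes in the second commit follow the same idea with a single sync.RWMutex replacing the per-key LockerPool: GetClientConn and TryMigrate release the read lock and then acquire the write lock as two separate steps rather than upgrading while held.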