From 12af7874fb00cac456ee667c524377e2fceae3a8 Mon Sep 17 00:00:00 2001
From: santong
Date: Tue, 27 Jul 2021 22:54:41 +0800
Subject: [PATCH 01/16] fix deadlock

Signed-off-by: santong
---
 scheduler/config/config.go | 2 +-
 scheduler/types/peer.go    | 8 +++++++-
 scheduler/types/task.go    | 2 --
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/scheduler/config/config.go b/scheduler/config/config.go
index a57b49bf7bd..4a85f30c2b7 100644
--- a/scheduler/config/config.go
+++ b/scheduler/config/config.go
@@ -113,7 +113,7 @@ func NewDefaultSchedulerConfig() *SchedulerConfig {
 		Scheduler:   "basic",
 		CDNLoad:     100,
 		ClientLoad:  10,
-		OpenMonitor: false,
+		OpenMonitor: true,
 		GC:          NewDefaultGCConfig(),
 	}
 }
diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go
index 32d1b2dc5d3..b56b1efc85a 100644
--- a/scheduler/types/peer.go
+++ b/scheduler/types/peer.go
@@ -114,14 +114,18 @@ func (peer *Peer) Touch() {
 }
 
 func (peer *Peer) associateChild(child *Peer) {
+	peer.lock.Lock()
 	peer.children.Store(child.PeerID, child)
 	peer.Host.IncUploadLoad()
+	peer.lock.Unlock()
 	peer.Task.peers.Update(peer)
 }
 
 func (peer *Peer) disassociateChild(child *Peer) {
+	peer.lock.Lock()
 	peer.children.Delete(child.PeerID)
 	peer.Host.DecUploadLoad()
+	peer.lock.Unlock()
 	peer.Task.peers.Update(peer)
 }
 
@@ -159,15 +163,17 @@ func (peer *Peer) GetCost() int {
 
 func (peer *Peer) AddPieceInfo(finishedCount int32, cost int) {
 	peer.lock.Lock()
-	defer peer.lock.Unlock()
 	if finishedCount > peer.finishedNum.Load() {
 		peer.finishedNum.Store(finishedCount)
 		peer.costHistory = append(peer.costHistory, cost)
 		if len(peer.costHistory) > 20 {
 			peer.costHistory = peer.costHistory[len(peer.costHistory)-20:]
 		}
+		peer.lock.Unlock()
 		peer.Task.peers.Update(peer)
+		return
 	}
+	peer.lock.Unlock()
 }
 
 func (peer *Peer) GetDepth() int {
diff --git a/scheduler/types/task.go b/scheduler/types/task.go
index 2c069ed0e32..6e867c295bf 100644
--- a/scheduler/types/task.go
+++ b/scheduler/types/task.go
@@ -105,8 +105,6 @@ func (task *Task) GetPiece(pieceNum int32) *PieceInfo {
 }
 
 func (task *Task) AddPeer(peer *Peer) {
-	task.lock.Lock()
-	defer task.lock.Unlock()
 	task.peers.UpdateOrAdd(peer)
 }
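The deadlock fixed by PATCH 01 is a lock-ordering problem: `associateChild`, `disassociateChild` and `AddPieceInfo` used to hold `peer.lock` across the call into `peer.Task.peers.Update`, which takes the sorted peer list's own lock, while list iteration paths take the list lock first and then call back into peer methods that need `peer.lock`. The patch releases `peer.lock` before crossing into the other structure. A minimal sketch of the two orderings, with illustrative types rather than the actual scheduler structs:

```go
package sketch

import "sync"

type list struct{ mu sync.Mutex }

type peer struct {
	mu sync.Mutex
	l  *list
}

// Deadlock-prone: goroutine A holds peer.mu and waits for list.mu,
// while goroutine B iterates under list.mu and calls a peer method
// that waits for peer.mu.
func (p *peer) updateNested() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.l.mu.Lock() // second lock acquired while the first is held
	p.l.mu.Unlock()
}

// The ordering after the fix: finish the peer-local critical section,
// release peer.mu, and only then touch the list lock.
func (p *peer) updateFlat() {
	p.mu.Lock()
	// ... mutate peer-local state ...
	p.mu.Unlock()

	p.l.mu.Lock() // no other lock is held here
	p.l.mu.Unlock()
}
```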
From 034d776371194f47a6b1b62cba375e545ec365b0 Mon Sep 17 00:00:00 2001
From: santong
Date: Wed, 28 Jul 2021 20:22:03 +0800
Subject: [PATCH 02/16] client log chaser and conn manager lock

Signed-off-by: santong
---
 client/daemon/daemon.go                        |  4 +-
 client/daemon/peer/peertask_file.go            |  2 +
 client/daemon/peer/peertask_file_callback.go   |  8 +-
 client/daemon/peer/peertask_stream.go          |  2 +
 .../daemon/peer/peertask_stream_callback.go    |  8 +-
 pkg/rpc/client.go                              | 66 +++++++++------
 pkg/rpc/client_util.go                         |  6 +-
 pkg/structure/sortedlist/sorted_list.go        |  8 +-
 .../core/evaluator/basic/basic_evaluator.go    |  2 +-
 scheduler/core/monitor.go                      | 25 +++---
 .../core/scheduler/basic/basic_scheduler.go    | 83 +++++++++++++++----
 scheduler/core/scheduler_service.go            |  4 +-
 scheduler/daemon/peer/manager.go               |  1 +
 scheduler/types/peer.go                        | 14 ++--
 scheduler/types/task.go                        |  4 +-
 15 files changed, 155 insertions(+), 82 deletions(-)

diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go
index eb45d66a0a4..1fb2edffe78 100644
--- a/client/daemon/daemon.go
+++ b/client/daemon/daemon.go
@@ -111,9 +111,9 @@ func New(opt *config.DaemonOption) (Daemon, error) {
 			PeerId: request.PeerID,
 		})
 		if er != nil {
-			logger.Errorf("leave task %s/%s, error: %v", request.TaskID, request.PeerID, er)
+			logger.Errorf("step4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er)
 		} else {
-			logger.Infof("leave task %s/%s state ok", request.TaskID, request.PeerID)
+			logger.Infof("step4:leave task %s/%s state ok", request.TaskID, request.PeerID)
 		}
 	})
 	if err != nil {
diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go
index 7f481963a3e..59488544ef5 100644
--- a/client/daemon/peer/peertask_file.go
+++ b/client/daemon/peer/peertask_file.go
@@ -89,6 +89,7 @@ func newFilePeerTask(ctx context.Context,
 	// trace register
 	_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
 	result, err := schedulerClient.RegisterPeerTask(ctx, request)
+	logger.Infof("step1: peer %s start to register", request.PeerId)
 	regSpan.RecordError(err)
 	regSpan.End()
 
@@ -147,6 +148,7 @@ func newFilePeerTask(ctx context.Context,
 	}
 
 	peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
+	logger.Infof("step2: start report peer %s piece result", request.PeerId)
 	if err != nil {
 		defer span.End()
 		span.RecordError(err)
diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go
index db94b9b9298..0963450bbd9 100644
--- a/client/daemon/peer/peertask_file_callback.go
+++ b/client/daemon/peer/peertask_file_callback.go
@@ -106,9 +106,9 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
 		Code:   dfcodes.Success,
 	})
 	if err != nil {
-		pt.Log().Errorf("report successful peer result, error: %v", err)
+		pt.Log().Errorf("step3: report successful peer result, error: %v", err)
 	} else {
-		pt.Log().Infof("report successful peer result ok")
+		pt.Log().Infof("step3: report successful peer result ok")
 	}
 	return nil
 }
@@ -131,9 +131,9 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
 		Code:   code,
 	})
 	if err != nil {
-		pt.Log().Errorf("report fail peer result, error: %v", err)
+		pt.Log().Errorf("step3: report fail peer result, error: %v", err)
 	} else {
-		pt.Log().Infof("report fail peer result ok")
+		pt.Log().Infof("step3: report fail peer result ok")
 	}
 	return nil
 }
diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go
index 216c777dd5d..40a9bc7bff8 100644
--- a/client/daemon/peer/peertask_stream.go
+++ b/client/daemon/peer/peertask_stream.go
@@ -72,6 +72,7 @@ func newStreamPeerTask(ctx context.Context,
 	// trace register
 	_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
 	result, err := schedulerClient.RegisterPeerTask(ctx, request)
+	logger.Infof("step1: peer %s start to register", request.PeerId)
 	regSpan.RecordError(err)
 	regSpan.End()
 
@@ -128,6 +129,7 @@ func newStreamPeerTask(ctx context.Context,
 	}
 
 	peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
+	logger.Infof("step2: start report peer %s piece result", request.PeerId)
 	if err != nil {
 		defer span.End()
 		span.RecordError(err)
diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go
index 5455575cb90..00861516678 100644
--- a/client/daemon/peer/peertask_stream_callback.go
+++ b/client/daemon/peer/peertask_stream_callback.go
@@ -104,9 +104,9 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
 		Code:   dfcodes.Success,
 	})
 	if err != nil {
-		pt.Log().Errorf("report successful peer result, error: %v", err)
+		pt.Log().Errorf("step3: report successful peer result, error: %v", err)
 	} else {
-		pt.Log().Infof("report successful peer result ok")
+		pt.Log().Infof("step3: report successful peer result ok")
 	}
 	return nil
 }
@@ -129,9 +129,9 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
 		Code:   code,
 	})
 	if err != nil {
-
pt.Log().Errorf("report fail peer result, error: %v", err) + pt.Log().Errorf("step3: report fail peer result, error: %v", err) } else { - pt.Log().Infof("report fail peer result ok") + pt.Log().Infof("step3: report fail peer result ok") } return nil } diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index a8663934a5b..505acd58f82 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -26,7 +26,6 @@ import ( "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/basic/dfnet" - "d7y.io/dragonfly/v2/pkg/synclock" "github.com/pkg/errors" "github.com/serialx/hashring" "google.golang.org/grpc" @@ -55,7 +54,7 @@ type ConnStatus string type Connection struct { ctx context.Context cancelFun context.CancelFunc - rwMutex *synclock.LockerPool + rwMutex sync.RWMutex dialOpts []grpc.DialOption key2NodeMap sync.Map // key -> node(many to one) node2ClientMap sync.Map // node -> clientConn(one to one) @@ -75,11 +74,7 @@ func newDefaultConnection(ctx context.Context) *Connection { return &Connection{ ctx: childCtx, cancelFun: cancel, - rwMutex: synclock.NewLockerPool(), dialOpts: defaultClientOpts, - key2NodeMap: sync.Map{}, - node2ClientMap: sync.Map{}, - accessNodeMap: sync.Map{}, connExpireTime: defaultConnExpireTime, gcConnTimeout: defaultGcConnTimeout, gcConnInterval: defaultGcConnInterval, @@ -169,10 +164,10 @@ func (conn *Connection) CorrectKey2NodeRelation(tmpHashKey, realHashKey string) if tmpHashKey == realHashKey { return } + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() key, _ := conn.key2NodeMap.Load(tmpHashKey) serverNode := key.(string) - conn.rwMutex.Lock(serverNode, false) - defer conn.rwMutex.UnLock(serverNode, false) conn.key2NodeMap.Store(realHashKey, serverNode) conn.key2NodeMap.Delete(tmpHashKey) } @@ -197,11 +192,11 @@ func (conn *Connection) UpdateAccessNodeMapByServerNode(serverNode string) { } func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error { + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() for _, addr := range addrs { serverNode := addr.GetEndpoint() - conn.rwMutex.Lock(serverNode, false) conn.hashRing = conn.hashRing.AddNode(serverNode) - conn.rwMutex.UnLock(serverNode, false) logger.With("conn", conn.name).Debugf("success add %s to server node list", addr) } return nil @@ -209,6 +204,21 @@ func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error { // findCandidateClientConn find candidate node client conn other than exclusiveNodes func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...string) (*candidateClient, error) { + if node, ok := conn.key2NodeMap.Load(key); ok { + candidateNode := node.(string) + selected := true + for _, exclusiveNode := range exclusiveNodes { + if exclusiveNode == candidateNode { + selected = false + } + } + if selected { + if client, ok := conn.node2ClientMap.Load(node); ok { + return client.(*candidateClient), nil + } + } + } + ringNodes, ok := conn.hashRing.GetNodes(key, conn.hashRing.Size()) if !ok { logger.Warnf("cannot obtain expected %d server nodes", conn.hashRing.Size()) @@ -231,12 +241,10 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st logger.With("conn", conn.name).Infof("candidate result for hash key %s: all server node list: %v, exclusiveNodes node list: %v, candidate node list: %v", key, ringNodes, exclusiveNodes, candidateNodes) for _, candidateNode := range candidateNodes { - conn.rwMutex.Lock(candidateNode, true) // Check whether there is a corresponding mapping client in the 
node2ClientMap
 		// TODO: the part below could call the loadOrCreate method directly, but then the logs would not print as completely
 		if client, ok := conn.node2ClientMap.Load(candidateNode); ok {
 			logger.With("conn", conn.name).Infof("hit cache candidateNode %s for hash key %s", candidateNode, key)
-			conn.rwMutex.UnLock(candidateNode, true)
 			return &candidateClient{
 				node: candidateNode,
 				Ref:  client,
@@ -246,7 +254,6 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
 		clientConn, err := conn.createClient(candidateNode, append(defaultClientOpts, conn.dialOpts...)...)
 		if err == nil {
 			logger.With("conn", conn.name).Infof("success connect to candidateNode %s for hash key %s", candidateNode, key)
-			conn.rwMutex.UnLock(candidateNode, true)
 			return &candidateClient{
 				node: candidateNode,
 				Ref:  clientConn,
@@ -254,7 +261,6 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
 		}
 		logger.With("conn", conn.name).Infof("failed to connect candidateNode %s for hash key %s: %v", candidateNode, key, err)
-		conn.rwMutex.UnLock(candidateNode, true)
 	}
 	return nil, dferrors.ErrNoCandidateNode
 }
@@ -273,6 +279,8 @@ func (conn *Connection) createClient(target string, opts ...grpc.DialOption) (*g
 
 // GetServerNode
 func (conn *Connection) GetServerNode(hashKey string) (string, bool) {
+	conn.rwMutex.RLock()
+	defer conn.rwMutex.RUnlock()
 	node, ok := conn.key2NodeMap.Load(hashKey)
 	serverNode := node.(string)
 	if ok {
@@ -283,8 +291,8 @@ func (conn *Connection) GetClientConnByTarget(node string) (*grpc.ClientConn, error) {
 	logger.With("conn", conn.name).Debugf("start to get client conn by target %s", node)
-	conn.rwMutex.Lock(node, true)
-	defer conn.rwMutex.UnLock(node, true)
+	conn.rwMutex.RLock()
+	defer conn.rwMutex.RUnlock()
 	clientConn, err := conn.loadOrCreateClientConnByNode(node)
 	if err != nil {
 		return nil, errors.Wrapf(err, "get client conn by conn %s", node)
@@ -322,30 +330,32 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g
 // stick whether hash key need already associated with specify node
 func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) {
 	logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick)
+	conn.rwMutex.RLock()
 	node, ok := conn.key2NodeMap.Load(hashKey)
 	if stick && !ok {
+		conn.rwMutex.RUnlock()
 		// if request is stateful, hash key must exist in key2NodeMap
 		return nil, fmt.Errorf("it is a stateful request, but cannot find hash key(%s) in key2NodeMap", hashKey)
 	}
 	if ok {
 		// if exist
 		serverNode := node.(string)
-		conn.rwMutex.Lock(serverNode, true)
 		clientConn, err := conn.loadOrCreateClientConnByNode(serverNode)
-		conn.rwMutex.UnLock(serverNode, true)
+		conn.rwMutex.RUnlock()
 		if err != nil {
 			return nil, err
 		}
 		return clientConn, nil
 	}
 	logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate", hashKey)
+	conn.rwMutex.RUnlock()
 	// if absence
+	conn.rwMutex.Lock()
+	defer conn.rwMutex.Unlock()
 	client, err := conn.findCandidateClientConn(hashKey)
 	if err != nil {
 		return nil, errors.Wrapf(err, "prob candidate client conn for hash key %s", hashKey)
 	}
-	conn.rwMutex.Lock(client.node, false)
-	defer conn.rwMutex.UnLock(client.node, false)
 	conn.key2NodeMap.Store(hashKey, client.node)
 	conn.node2ClientMap.Store(client.node, client.Ref)
 	conn.accessNodeMap.Store(client.node, time.Now())
@@ -363,19 +373,21 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes
[]str } } currentNode := "" + conn.rwMutex.RLock() if currentNode, ok := conn.key2NodeMap.Load(key); ok { preNode = currentNode.(string) exclusiveNodes = append(exclusiveNodes, currentNode.(string)) } else { logger.With("conn", conn.name).Warnf("failed to find server node for hash key %s", key) } + conn.rwMutex.RUnlock() + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() client, err := conn.findCandidateClientConn(key, exclusiveNodes...) if err != nil { return "", errors.Wrapf(err, "find candidate client conn for hash key %s", key) } logger.With("conn", conn.name).Infof("successfully migrate hash key %s from server node %s to %s", key, currentNode, client.node) - conn.rwMutex.Lock(client.node, false) - defer conn.rwMutex.UnLock(client.node, false) conn.key2NodeMap.Store(key, client.node) conn.node2ClientMap.Store(client.node, client.Ref) conn.accessNodeMap.Store(client.node, time.Now()) @@ -383,9 +395,10 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str } func (conn *Connection) Close() error { + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() for i := range conn.serverNodes { serverNode := conn.serverNodes[i].GetEndpoint() - conn.rwMutex.Lock(serverNode, false) conn.hashRing.RemoveNode(serverNode) value, ok := conn.node2ClientMap.Load(serverNode) if ok { @@ -406,19 +419,18 @@ func (conn *Connection) Close() error { return true }) conn.accessNodeMap.Delete(serverNode) - conn.rwMutex.UnLock(serverNode, false) } conn.cancelFun() return nil } func (conn *Connection) UpdateState(addrs []dfnet.NetAddr) { - // TODO lock - conn.serverNodes = addrs var addresses []string for _, addr := range addrs { addresses = append(addresses, addr.GetEndpoint()) } - + conn.rwMutex.Lock() + defer conn.rwMutex.Unlock() + conn.serverNodes = addrs conn.hashRing = hashring.New(addresses) } diff --git a/pkg/rpc/client_util.go b/pkg/rpc/client_util.go index 03324f497c6..d6ddf92e4b8 100644 --- a/pkg/rpc/client_util.go +++ b/pkg/rpc/client_util.go @@ -47,7 +47,7 @@ func (conn *Connection) startGC() { startTime := time.Now() // TODO use anther locker, @santong - //conn.rwMutex.Lock() + conn.rwMutex.Lock() // range all connections and determine whether they are expired conn.accessNodeMap.Range(func(node, accessTime interface{}) bool { serverNode := node.(string) @@ -61,7 +61,7 @@ func (conn *Connection) startGC() { return true }) // TODO use anther locker, @santong - //conn.rwMutex.Unlock() + conn.rwMutex.Unlock() // slow GC detected, report it with a log warning if timeElapse := time.Since(startTime); timeElapse > conn.gcConnTimeout { logger.GrpcLogger.With("conn", conn.name).Warnf("gc %d conns, cost: %.3f seconds", removedConnCount, timeElapse.Seconds()) @@ -81,8 +81,6 @@ func (conn *Connection) startGC() { // gcConn gc keys and clients associated with server node func (conn *Connection) gcConn(node string) { - conn.rwMutex.Lock(node, false) - defer conn.rwMutex.UnLock(node, false) logger.GrpcLogger.With("conn", conn.name).Infof("gc keys and clients associated with server node: %s starting", node) value, ok := conn.node2ClientMap.Load(node) if ok { diff --git a/pkg/structure/sortedlist/sorted_list.go b/pkg/structure/sortedlist/sorted_list.go index 80e168be235..efd8b828f11 100644 --- a/pkg/structure/sortedlist/sorted_list.go +++ b/pkg/structure/sortedlist/sorted_list.go @@ -132,7 +132,9 @@ func (l *SortedList) RangeLimit(limit int, fn func(Item) bool) { } l.l.RLock() defer l.l.RUnlock() - + if len(l.buckets) == 0 { + return + } count := 0 for i := l.left; i <= l.right; i++ { buc := 
l.buckets[i] @@ -160,7 +162,9 @@ func (l *SortedList) RangeReverseLimit(limit int, fn func(Item) bool) { } l.l.RLock() defer l.l.RUnlock() - + if len(l.buckets) == 0 { + return + } count := 0 for i := l.right; i >= l.left; i-- { for j := len(l.buckets[i].buckets) - 1; j >= 0; j-- { diff --git a/scheduler/core/evaluator/basic/basic_evaluator.go b/scheduler/core/evaluator/basic/basic_evaluator.go index 12f33e3bf1c..cf9e662620e 100644 --- a/scheduler/core/evaluator/basic/basic_evaluator.go +++ b/scheduler/core/evaluator/basic/basic_evaluator.go @@ -82,7 +82,7 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool { return true } - if peer.IsWaiting() { + if peer.IsBlocking() { return false } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index 2d982128cf1..7bf71b14aba 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -23,10 +23,10 @@ import ( "strings" "time" - logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/scheduler/daemon" "d7y.io/dragonfly/v2/scheduler/types" "github.com/olekukonko/tablewriter" + "go.uber.org/zap" "k8s.io/client-go/util/workqueue" ) @@ -38,14 +38,16 @@ const ( type monitor struct { downloadMonitorQueue workqueue.DelayingInterface peerManager daemon.PeerMgr - verbose bool done chan struct{} + log *zap.SugaredLogger } func newMonitor(peerManager daemon.PeerMgr) *monitor { + logger, _ := zap.NewDevelopment() return &monitor{ downloadMonitorQueue: workqueue.NewDelayingQueue(), peerManager: peerManager, + log: logger.Sugar(), } } @@ -54,7 +56,7 @@ func (m *monitor) start() { for { select { case <-ticker.C: - logger.Debugf(m.printDebugInfo()) + m.log.Info(m.printDebugInfo()) case <-m.done: return } @@ -71,7 +73,6 @@ func (m *monitor) printDebugInfo() string { buffer := bytes.NewBuffer([]byte{}) table := tablewriter.NewWriter(buffer) table.SetHeader([]string{"PeerID", "URL", "parent node", "status", "start time", "Finished Piece Num", "Finished", "Free Load"}) - m.peerManager.ListPeers().Range(func(key interface{}, value interface{}) (ok bool) { ok = true peer := value.(*types.Peer) @@ -85,7 +86,9 @@ func (m *monitor) printDebugInfo() string { parentNode = peer.GetParent().PeerID } - table.Append([]string{peer.PeerID, peer.Task.URL, parentNode, peer.GetStatus().String(), peer.CreateTime.String(), strconv.Itoa(int(peer.GetFinishedNum())), + table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)-5], parentNode, peer.GetStatus().String(), + peer.CreateTime.String(), + strconv.Itoa(int(peer.GetFinishedNum())), strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())}) return }) @@ -117,15 +120,15 @@ func (m *monitor) printDebugInfo() string { printTree(root, nil) } - msg := "============\n" + strings.Join(msgs, "\n") + "\n===============" + msg := "============\n" + strings.Join(append(msgs, strconv.Itoa(table.NumLines())), "\n") + "\n===============" return msg } func (m *monitor) RefreshDownloadMonitor(peer *types.Peer) { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("downloadMonitorWorkingLoop refresh ") + m.log.With("taskID", peer.Task.TaskID, "peerID", peer.PeerID).Debugf("downloadMonitorWorkingLoop refresh ") if !peer.IsRunning() { m.downloadMonitorQueue.AddAfter(peer, time.Second*2) - } else if peer.IsWaiting() { + } else if peer.IsBlocking() { m.downloadMonitorQueue.AddAfter(peer, time.Second*2) } else { delay := time.Millisecond * time.Duration(peer.GetCost()*10) @@ -141,13 +144,13 @@ func (m *monitor) 
downloadMonitorWorkingLoop() { for { v, shutdown := m.downloadMonitorQueue.Get() if shutdown { - logger.Infof("download monitor working loop closed") + m.log.Infof("download monitor working loop closed") break } //if m.downloadMonitorCallBack != nil { peer := v.(*types.Peer) if peer != nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("downloadMonitorWorkingLoop status[%d]", peer.GetStatus()) + m.log.With("taskID", peer.Task.TaskID, "peerID", peer.PeerID).Debugf("downloadMonitorWorkingLoop status[%d]", peer.GetStatus()) if peer.IsSuccess() || peer.Host.CDN { // clear from monitor } else { @@ -158,7 +161,7 @@ func (m *monitor) downloadMonitorWorkingLoop() { //pt.SendError(dferrors.New(dfcodes.SchedPeerGone, "report time out")) } //m.downloadMonitorCallBack(peer) - } else if !peer.IsWaiting() { + } else if !peer.IsBlocking() { //m.downloadMonitorCallBack(peer) } else { if time.Now().After(peer.GetLastAccessTime().Add(PeerForceGoneTimeout)) { diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index 1b952aa4b3c..644354e4e0c 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -145,31 +145,52 @@ func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list []*types.Peer) { return s.peerManager.Pick(peer.Task, limit, func(candidateNode *types.Peer) bool { if candidateNode == nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("*candidate child peer is not selected because it is nil") + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer is not selected because it is nil") return false } - if candidateNode.IsDone() || candidateNode.IsLeave() || candidateNode == peer { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("*candidate child peer %s is not selected because it is %+v", - candidateNode.PeerID, candidateNode) + if candidateNode.IsDone() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it has done", + candidateNode.PeerID) + return false + } + if candidateNode.IsLeave() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it has left", + candidateNode.PeerID) + return false + } + if candidateNode == peer { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it and peer are the same", + candidateNode.PeerID) + return false + } + if peer.GetParent() == candidateNode { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("******candidate child peer %s is not selected because peer's parent is candidate peer", candidateNode.PeerID) + return false + } + if candidateNode.GetFinishedNum() > peer.GetFinishedNum() { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("******candidate child peer %s is not selected because it finished number of download is more than peer's", + candidateNode.PeerID) return false } if candidateNode.Host != nil && candidateNode.Host.CDN { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is not selected because it is a cdn host", candidateNode.PeerID) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it is a 
cdn host", + candidateNode.PeerID) return false } if candidateNode.GetParent() == nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is selected because it has not parent", + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is selected because it has not parent", candidateNode.PeerID) return true } if candidateNode.GetParent() != nil && s.evaluator.IsBadNode(candidateNode.GetParent()) { logger.WithTaskAndPeerID(peer.Task.TaskID, - peer.PeerID).Debugf("candidate child peer %s is selected because it has parent and parent status is not health", candidateNode.PeerID) + peer.PeerID).Debugf("******candidate child peer %s is selected because it has parent and parent status is not health", candidateNode.PeerID) return true } - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("=candidate child peer %s is not selected because it is %+v", - candidateNode.PeerID, candidateNode) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected", candidateNode.PeerID) return false }) } @@ -177,17 +198,47 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []*types.Peer) { return s.peerManager.PickReverse(peer.Task, limit, func(candidateNode *types.Peer) bool { if candidateNode == nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer is not selected because it is nil") + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer is not selected because it is nil") + return false + } + if s.evaluator.IsBadNode(candidateNode) { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it is badNode", + candidateNode.PeerID) return false } - if s.evaluator.IsBadNode(candidateNode) || candidateNode.IsLeave() || candidateNode == peer || candidateNode.Host. 
- GetFreeUploadLoad() <= 0 { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is not selected because it is %+v", - candidateNode.PeerID, candidateNode) + if candidateNode.IsLeave() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it has already left", + candidateNode.PeerID) + return false + } + if candidateNode == peer { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it and peer are the same", + candidateNode.PeerID) + return false + } + if candidateNode.GetParent() == peer { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's parent is peer", + candidateNode.PeerID) + return false + } + if candidateNode.Host.GetFreeUploadLoad() <= 0 { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's free upload load equal to less than zero", + candidateNode.PeerID) + return false + } + if candidateNode.IsWaiting() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's status is waiting", + candidateNode.PeerID) + return false + } + if candidateNode.GetFinishedNum() < peer.GetFinishedNum() { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it finished number of download is smaller than peer's", + candidateNode.PeerID) return false } - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is selected because it is %+v", - candidateNode.PeerID, candidateNode) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is selected", candidateNode.PeerID) return true }) } diff --git a/scheduler/core/scheduler_service.go b/scheduler/core/scheduler_service.go index 3c16b500ae2..bd4ebeb3968 100644 --- a/scheduler/core/scheduler_service.go +++ b/scheduler/core/scheduler_service.go @@ -127,8 +127,8 @@ func (s *SchedulerService) GenerateTaskID(url string, filter string, meta *base. 
 }
 
 func (s *SchedulerService) ScheduleParent(peer *types.Peer) (parent *types.Peer, err error) {
-	parent, candidates, hasParent := s.sched.ScheduleParent(peer)
-	logger.Debugf("schedule parent result: parent %v, candidates:%v", parent, candidates)
+	parent, _, hasParent := s.sched.ScheduleParent(peer)
+	//logger.Debugf("schedule parent result: parent %v, candidates:%v", parent, candidates)
 	if !hasParent || parent == nil {
 		return nil, errors.Errorf("no parent peer available for peer %v", peer.PeerID)
 	}
diff --git a/scheduler/daemon/peer/manager.go b/scheduler/daemon/peer/manager.go
index f8e74177e5e..b6e77252df8 100644
--- a/scheduler/daemon/peer/manager.go
+++ b/scheduler/daemon/peer/manager.go
@@ -72,6 +72,7 @@ func (m *manager) Delete(peerID string) {
 		peer.Task.DeletePeer(peer)
 		if peer.PacketChan != nil {
 			close(peer.PacketChan)
+			logger.Infof("close peer %s stream", peerID)
 			peer.PacketChan = nil
 		}
 		m.peerMap.Delete(peerID)
diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go
index b56b1efc85a..0ded10791a7 100644
--- a/scheduler/types/peer.go
+++ b/scheduler/types/peer.go
@@ -130,8 +130,8 @@ func (peer *Peer) disassociateChild(child *Peer) {
 }
 
 func (peer *Peer) ReplaceParent(parent *Peer) {
-	peer.lock.Lock()
-	defer peer.lock.Unlock()
+	//peer.lock.Lock()
+	//defer peer.lock.Unlock()
 	oldParent := peer.parent
 	if oldParent != nil {
 		oldParent.disassociateChild(peer)
@@ -219,7 +219,7 @@ func (peer *Peer) IsAncestor(ancestor *Peer) bool {
 	return false
 }
 
-func (peer *Peer) IsWaiting() bool {
+func (peer *Peer) IsBlocking() bool {
 	peer.lock.RLock()
 	defer peer.lock.RUnlock()
 	if peer.parent == nil {
@@ -241,10 +241,6 @@ func (peer *Peer) getFreeLoad() int {
 	return peer.Host.GetFreeUploadLoad()
 }
 
-func (peer *Peer) GetFinishNum() int32 {
-	return peer.finishedNum.Load()
-}
-
 func GetDiffPieceNum(src *Peer, dst *Peer) int32 {
 	diff := src.finishedNum.Load() - dst.finishedNum.Load()
 	if diff > 0 {
@@ -285,6 +281,10 @@ func (peer *Peer) IsRunning() bool {
 	return peer.status == PeerStatusRunning
 }
 
+func (peer *Peer) IsWaiting() bool {
+	return peer.status == PeerStatusWaiting
+}
+
 func (peer *Peer) IsSuccess() bool {
 	return peer.status == PeerStatusSuccess
 }
diff --git a/scheduler/types/task.go b/scheduler/types/task.go
index 6e867c295bf..df37081f345 100644
--- a/scheduler/types/task.go
+++ b/scheduler/types/task.go
@@ -109,8 +109,8 @@ func (task *Task) AddPeer(peer *Peer) {
 }
 
 func (task *Task) DeletePeer(peer *Peer) {
-	task.lock.Lock()
-	defer task.lock.Unlock()
+	//task.lock.Lock()
+	//defer task.lock.Unlock()
 	task.peers.Delete(peer)
 }
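Before the next patch, note the shape of the `pkg/rpc` change in PATCH 02 above: the per-node `synclock.LockerPool` is gone, and a single `sync.RWMutex` now guards the hash ring together with the key and node maps, with read locks on lookup paths and the write lock wherever the ring or the bindings are mutated. A hedged sketch of that read/write split, with illustrative field names rather than the real `Connection` struct:

```go
package sketch

import "sync"

// One RWMutex protects the ring and the bindings as a unit,
// replacing a pool of per-node lockers.
type connState struct {
	mu       sync.RWMutex
	key2Node map[string]string
}

// Lookup path: a read lock, so concurrent readers do not serialize.
func (c *connState) serverNode(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	node, ok := c.key2Node[key]
	return node, ok
}

// Mutation path: the write lock; as in GetClientConn, a reader that
// misses must drop its read lock before taking this one.
func (c *connState) bind(key, node string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.key2Node[key] = node
}
```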
From 9e89fb699194419a7c8f1fc9916dd6a3553fcc88 Mon Sep 17 00:00:00 2001
From: santong
Date: Wed, 28 Jul 2021 23:29:44 +0800
Subject: [PATCH 03/16] chore: client logs

Signed-off-by: santong
---
 client/daemon/peer/peertask_file.go | 7 ++++---
 internal/dfcodes/rpc_code.go        | 3 ++-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go
index 59488544ef5..cc83222cd94 100644
--- a/client/daemon/peer/peertask_file.go
+++ b/client/daemon/peer/peertask_file.go
@@ -95,6 +95,7 @@ func newFilePeerTask(ctx context.Context,
 
 	var backSource bool
 	if err != nil {
+		logger.Errorf("step1: peer %s register failed: %v", request.PeerId, err)
 		// check if it is back source error
 		if de, ok := err.(*dferrors.DfError); ok && de.Code == dfcodes.SchedNeedBackSource {
 			backSource = true
@@ -103,18 +104,17 @@ func newFilePeerTask(ctx context.Context,
 		if !backSource {
 			span.RecordError(err)
 			span.End()
-			logger.Errorf("register peer task failed: %s, peer id: %s", err, request.PeerId)
 			return ctx, nil, nil, err
 		}
 	}
 	if result == nil {
 		defer span.End()
 		span.RecordError(err)
-		err = errors.Errorf("empty schedule result")
+		err = errors.Errorf("step1: peer register result is nil")
 		return ctx, nil, nil, err
 	}
 	span.SetAttributes(config.AttributeTaskID.String(result.TaskId))
-	logger.Infof("register task success, task id: %s, peer id: %s, SizeScope: %s",
+	logger.Infof("step1: register task success, task id: %s, peer id: %s, SizeScope: %s",
 		result.TaskId, request.PeerId, base.SizeScope_name[int32(result.SizeScope)])
 
 	var singlePiece *scheduler.SinglePiece
@@ -150,6 +150,7 @@ func newFilePeerTask(ctx context.Context,
 	peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
 	logger.Infof("step2: start report peer %s piece result", request.PeerId)
 	if err != nil {
+		logger.Errorf("step2: peer %s report piece failed: %v", request.PeerId, err)
 		defer span.End()
 		span.RecordError(err)
 		return ctx, nil, nil, err
diff --git a/internal/dfcodes/rpc_code.go b/internal/dfcodes/rpc_code.go
index a7c3d928acd..531fcdff217 100644
--- a/internal/dfcodes/rpc_code.go
+++ b/internal/dfcodes/rpc_code.go
@@ -41,7 +41,8 @@ const (
 	ClientRequestLimitFail base.Code = 4006
 
 	// scheduler response error 5000-5999
-	SchedError          base.Code = 5000
+	SchedError base.Code = 5000
+	/** @deprecated */
 	SchedNeedBackSource   base.Code = 5001 // client should try to download from source
 	SchedPeerGone         base.Code = 5002 // client should disconnect from scheduler
 	SchedPeerRegisterFail base.Code = 5003

From 9dca4808894d7a918a940607c7d87540ef2901bd Mon Sep 17 00:00:00 2001
From: santong
Date: Thu, 29 Jul 2021 14:10:14 +0800
Subject: [PATCH 04/16] fix concurrent download

Signed-off-by: santong
---
 client/daemon/peer/peertask_base.go             |  2 +-
 client/daemon/service/manager.go                |  2 +-
 scheduler/core/events.go                        | 14 ++++++---
 scheduler/core/monitor.go                       |  3 +-
 .../core/scheduler/basic/basic_scheduler.go     | 14 ++++-----
 scheduler/types/peer.go                         | 29 +++++++++++++++++--
 6 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go
index b439c4e22f1..ce1a68528be 100644
--- a/client/daemon/peer/peertask_base.go
+++ b/client/daemon/peer/peertask_base.go
@@ -715,7 +715,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
 		}
 		span.AddEvent("retry due to empty pieces",
 			trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
-		pt.Warnf("peer %s returns success but with empty pieces, retry later", peer.PeerId)
+		pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
 		return nil, false, dferrors.ErrEmptyValue
 	}
 	return pp, false, nil
diff --git a/client/daemon/service/manager.go b/client/daemon/service/manager.go
index 31f0380fb7b..69cc0eed0fd 100644
--- a/client/daemon/service/manager.go
+++ b/client/daemon/service/manager.go
@@ -105,7 +105,7 @@ func (m *manager) GetPieceTasks(ctx context.Context, request *base.PieceTaskRequ
 		return nil, dferrors.New(code, err.Error())
 	}
 
-	logger.Warnf("try to get piece tasks, "+
+	logger.Infof("try to get piece tasks, "+
 		"but target peer task is initializing, "+
 		"there is no available pieces, "+
 		"task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d",
diff --git a/scheduler/core/events.go b/scheduler/core/events.go
index 979abc1c350..fa74e370a81 100644
--- a/scheduler/core/events.go
+++ b/scheduler/core/events.go
@@ -162,10 +162,8 @@ var _ event =
peerDownloadPieceFailEvent{} func (e peerDownloadPieceFailEvent) apply(s *state) { switch e.pr.Code { - case dfcodes.PeerTaskNotFound: - handlePeerLeave(e.peer, s) - return - case dfcodes.ClientPieceRequestFail, dfcodes.ClientPieceDownloadFail: + case dfcodes.PeerTaskNotFound, dfcodes.ClientPieceRequestFail, dfcodes.ClientPieceDownloadFail: + // TODO PeerTaskNotFound remove dest peer task, ClientPieceDownloadFail add blank list handleReplaceParent(e.peer, s) return case dfcodes.CdnTaskNotFound, dfcodes.CdnError, dfcodes.CdnTaskRegistryFail, dfcodes.CdnTaskDownloadFail: @@ -231,6 +229,10 @@ func (e peerDownloadSuccessEvent) apply(s *state) { } child.PacketChan <- constructSuccessPeerPacket(child, e.peer, nil) } + if e.peer.PacketChan != nil { + close(e.peer.PacketChan) + e.peer.PacketChan = nil + } } func (e peerDownloadSuccessEvent) hashKey() string { @@ -262,6 +264,10 @@ func (e peerDownloadFailEvent) apply(s *state) { child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) return true }) + if e.peer.PacketChan != nil { + close(e.peer.PacketChan) + e.peer.PacketChan = nil + } s.peerManager.Delete(e.peer.PeerID) } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index 7bf71b14aba..f6b5e6acb98 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -43,7 +43,8 @@ type monitor struct { } func newMonitor(peerManager daemon.PeerMgr) *monitor { - logger, _ := zap.NewDevelopment() + config := zap.NewDevelopmentConfig() + logger, _ := config.Build() return &monitor{ downloadMonitorQueue: workqueue.NewDelayingQueue(), peerManager: peerManager, diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index 644354e4e0c..a5f5ccd8f52 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -108,8 +108,8 @@ func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer) func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer, bool) { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start scheduler parent flow") if !s.evaluator.NeedAdjustParent(peer) { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, peer is %v and current parent is %v", - peer, peer.GetParent()) + //logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, peer is %v and current parent is %v", + // peer, peer.GetParent()) if peer.GetParent() == nil { return nil, nil, false } @@ -163,9 +163,9 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ candidateNode.PeerID) return false } - if peer.GetParent() == candidateNode { + if candidateNode.IsAncestorOf(peer) { logger.WithTaskAndPeerID(peer.Task.TaskID, - peer.PeerID).Debugf("******candidate child peer %s is not selected because peer's parent is candidate peer", candidateNode.PeerID) + peer.PeerID).Debugf("******candidate child peer %s is not selected because peer's ancestor is candidate peer", candidateNode.PeerID) return false } if candidateNode.GetFinishedNum() > peer.GetFinishedNum() { @@ -190,7 +190,7 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ peer.PeerID).Debugf("******candidate child peer %s is selected because it has parent and parent status is not health", candidateNode.PeerID) return true } - logger.WithTaskAndPeerID(peer.Task.TaskID, 
peer.PeerID).Debugf("******candidate child peer %s is not selected", candidateNode.PeerID) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is selected", candidateNode.PeerID) return false }) } @@ -216,8 +216,8 @@ func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list [] candidateNode.PeerID) return false } - if candidateNode.GetParent() == peer { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's parent is peer", + if candidateNode.IsDescendantOf(peer) { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it's ancestor is peer", candidateNode.PeerID) return false } diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index 0ded10791a7..5d47538ef08 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -202,11 +202,16 @@ func (peer *Peer) GetTreeRoot() *Peer { return node } -// if ancestor is ancestor of peer -func (peer *Peer) IsAncestor(ancestor *Peer) bool { +// if peer is offspring of ancestor +func (peer *Peer) IsDescendantOf(ancestor *Peer) bool { if ancestor == nil { return false } + // TODO avoid circulation + peer.lock.RLock() + ancestor.lock.RLock() + defer ancestor.lock.RUnlock() + defer peer.lock.RUnlock() node := peer for node != nil { if node.parent == nil || node.Host.CDN { @@ -219,6 +224,26 @@ func (peer *Peer) IsAncestor(ancestor *Peer) bool { return false } +func (peer *Peer) IsAncestorOf(offspring *Peer) bool { + if offspring == nil { + return false + } + offspring.lock.RLock() + peer.lock.RLock() + defer peer.lock.RUnlock() + defer offspring.lock.RUnlock() + node := offspring + for node != nil { + if node.parent == nil || node.Host.CDN { + return false + } else if node.PeerID == peer.PeerID { + return true + } + node = node.parent + } + return false +} + func (peer *Peer) IsBlocking() bool { peer.lock.RLock() defer peer.lock.RUnlock() From fe2a9c2f5ead5ea8fe999a402e7b872dd012261e Mon Sep 17 00:00:00 2001 From: santong Date: Thu, 29 Jul 2021 16:17:20 +0800 Subject: [PATCH 05/16] chore: add log Signed-off-by: santong --- cdnsystem/server/service/cdn_seed_server.go | 1 - client/daemon/peer/peertask_base.go | 6 ++++-- scheduler/core/events.go | 17 ++++++++++------- .../core/scheduler/basic/basic_scheduler.go | 9 ++++++--- scheduler/core/scheduler_service.go | 6 ++++-- scheduler/daemon/cdn/d7y/manager.go | 1 + scheduler/server/service/scheduler_server.go | 4 +++- scheduler/types/peer.go | 2 -- scheduler/types/task.go | 5 +++++ 9 files changed, 33 insertions(+), 18 deletions(-) diff --git a/cdnsystem/server/service/cdn_seed_server.go b/cdnsystem/server/service/cdn_seed_server.go index f61824765f0..0d78553fc55 100644 --- a/cdnsystem/server/service/cdn_seed_server.go +++ b/cdnsystem/server/service/cdn_seed_server.go @@ -159,7 +159,6 @@ func (css *CdnSeedServer) GetPieceTasks(ctx context.Context, req *base.PieceTask return nil, dferrors.Newf(dfcodes.BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err) } task, err := css.taskMgr.Get(req.TaskId) - logger.Debugf("task: %+v", task) if err != nil { if cdnerrors.IsDataNotFound(err) { return nil, dferrors.Newf(dfcodes.CdnTaskNotFound, "failed to get task(%s) from cdn: %v", req.TaskId, err) diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index ce1a68528be..dd28df4b81d 100644 --- a/client/daemon/peer/peertask_base.go +++ 
b/client/daemon/peer/peertask_base.go @@ -685,7 +685,8 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer span.RecordError(getErr) // fast way to exit retry if curPeerPacket != pt.peerPacket { - pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet", getErr) + pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr, + curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true return nil, true, nil } @@ -709,7 +710,8 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer } // fast way to exit retry if curPeerPacket != pt.peerPacket { - pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet") + pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", + curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true return nil, true, nil } diff --git a/scheduler/core/events.go b/scheduler/core/events.go index fa74e370a81..aee72229c58 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -167,13 +167,16 @@ func (e peerDownloadPieceFailEvent) apply(s *state) { handleReplaceParent(e.peer, s) return case dfcodes.CdnTaskNotFound, dfcodes.CdnError, dfcodes.CdnTaskRegistryFail, dfcodes.CdnTaskDownloadFail: - if err := s.cdnManager.StartSeedTask(context.Background(), e.peer.Task); err != nil { - logger.Errorf("start seed task fail: %v", err) - e.peer.Task.SetStatus(types.TaskStatusFailed) - handleSeedTaskFail(e.peer.Task) - return - } - logger.Debugf("===== successfully obtain seeds from cdn, task: %+v =====", e.peer.Task) + go func(task *types.Task) { + task.SetStatus(types.TaskStatusRunning) + if err := s.cdnManager.StartSeedTask(context.Background(), task); err != nil { + logger.Errorf("start seed task fail: %v", err) + task.SetStatus(types.TaskStatusFailed) + handleSeedTaskFail(task) + return + } + logger.Debugf("===== successfully obtain seeds from cdn, task: %+v =====", e.peer.Task) + }(e.peer.Task) default: handleReplaceParent(e.peer, s) return diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index a5f5ccd8f52..a2309bb6831 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -123,9 +123,9 @@ func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer worth := s.evaluator.Evaluate(candidate, peer) // scheduler the same parent, worth reduce a half - if peer.GetParent() != nil && peer.GetParent().PeerID == candidate.PeerID { - worth = worth / 2.0 - } + //if peer.GetParent() != nil && peer.GetParent().PeerID == candidate.PeerID { + // worth = worth / 2.0 + //} if worth > value { value = worth @@ -196,6 +196,9 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ } func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []*types.Peer) { + if !peer.Task.CanSchedule() { + return nil + } return s.peerManager.PickReverse(peer.Task, limit, func(candidateNode *types.Peer) bool { if candidateNode == nil { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer is not selected because it is nil") diff --git a/scheduler/core/scheduler_service.go b/scheduler/core/scheduler_service.go index bd4ebeb3968..df634a59025 100644 --- 
a/scheduler/core/scheduler_service.go +++ b/scheduler/core/scheduler_service.go @@ -178,10 +178,12 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task // notify peer tasks synclock.Lock(task.TaskID, false) defer synclock.UnLock(task.TaskID, false) - if !task.IsHealth() { + if task.IsHealth() && task.GetLastTriggerTime().Add(s.config.AccessWindow).After(time.Now()) { + return task, nil + } + if task.IsFrozen() { task.SetStatus(types.TaskStatusRunning) } - go func() { if err := s.cdnManager.StartSeedTask(ctx, task); err != nil { if !task.IsSuccess() { diff --git a/scheduler/daemon/cdn/d7y/manager.go b/scheduler/daemon/cdn/d7y/manager.go index 1f67cdc7b45..56788ac9d7c 100644 --- a/scheduler/daemon/cdn/d7y/manager.go +++ b/scheduler/daemon/cdn/d7y/manager.go @@ -145,6 +145,7 @@ func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream if err != nil || cdnPeer == nil { return err } + task.SetStatus(types.TaskStatusSeeding) cdnPeer.Touch() if piece.Done { task.PieceTotal = piece.TotalPieceCount diff --git a/scheduler/server/service/scheduler_server.go b/scheduler/server/service/scheduler_server.go index f0bc7793565..1391e32c7de 100644 --- a/scheduler/server/service/scheduler_server.go +++ b/scheduler/server/service/scheduler_server.go @@ -86,7 +86,9 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul } parent, schErr := s.service.ScheduleParent(peer) if schErr != nil { - err = dferrors.Newf(dfcodes.SchedPeerScheduleFail, "failed to schedule peer %v: %v", peer.PeerID, schErr) + resp.SizeScope = base.SizeScope_NORMAL + resp.TaskId = taskID + //err = dferrors.Newf(dfcodes.SchedPeerScheduleFail, "failed to schedule peer %v: %v", peer.PeerID, schErr) return } singlePiece := task.GetPiece(0) diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index 5d47538ef08..0675ee429ec 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -130,8 +130,6 @@ func (peer *Peer) disassociateChild(child *Peer) { } func (peer *Peer) ReplaceParent(parent *Peer) { - //peer.lock.Lock() - //defer peer.lock.Unlock() oldParent := peer.parent if oldParent != nil { oldParent.disassociateChild(peer) diff --git a/scheduler/types/task.go b/scheduler/types/task.go index df37081f345..29637047f3d 100644 --- a/scheduler/types/task.go +++ b/scheduler/types/task.go @@ -48,6 +48,7 @@ func (status TaskStatus) String() string { const ( TaskStatusWaiting TaskStatus = iota TaskStatusRunning + TaskStatusSeeding TaskStatusSuccess TaskStatusCDNRegisterFail TaskStatusFailed @@ -171,6 +172,10 @@ func (task *Task) IsFrozen() bool { task.status == TaskStatusSourceError || task.status == TaskStatusCDNRegisterFail } +func (task *Task) CanSchedule() bool { + return task.status == TaskStatusSeeding || task.status == TaskStatusSuccess +} + func (task *Task) IsWaiting() bool { return task.status == TaskStatusWaiting } From a35854bd237da27c8b6069fd7f40a20377d41d46 Mon Sep 17 00:00:00 2001 From: santong Date: Thu, 29 Jul 2021 16:24:59 +0800 Subject: [PATCH 06/16] Extends the scheduler connection expiration time Signed-off-by: santong --- pkg/rpc/scheduler/client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index a715426c8aa..e1bbb6a434b 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -39,7 +39,7 @@ func GetClientByAddr(addrs []dfnet.NetAddr, opts ...grpc.DialOption) (SchedulerC } sc := 
&schedulerClient{
 		rpc.NewConnection(context.Background(), "scheduler-static", addrs, []rpc.ConnOption{
-			rpc.WithConnExpireTime(5 * time.Minute),
+			rpc.WithConnExpireTime(30 * time.Minute),
 			rpc.WithDialOption(opts),
 		}),
 	}

From 959674d901f4bf1f7a387b5c3dabd2ef82b15460 Mon Sep 17 00:00:00 2001
From: santong
Date: Thu, 29 Jul 2021 19:59:20 +0800
Subject: [PATCH 07/16] chore: client step log

Signed-off-by: santong
---
 client/daemon/daemon.go                        |  4 ++--
 client/daemon/peer/peertask_file.go            | 12 ++++++------
 client/daemon/peer/peertask_file_callback.go   |  8 ++++----
 client/daemon/peer/peertask_stream.go          |  4 ++--
 client/daemon/peer/peertask_stream_callback.go |  8 ++++----
 5 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go
index 1fb2edffe78..60f27b1b9b8 100644
--- a/client/daemon/daemon.go
+++ b/client/daemon/daemon.go
@@ -111,9 +111,9 @@ func New(opt *config.DaemonOption) (Daemon, error) {
 			PeerId: request.PeerID,
 		})
 		if er != nil {
-			logger.Errorf("step4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er)
+			logger.Errorf("step 4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er)
 		} else {
-			logger.Infof("step4:leave task %s/%s state ok", request.TaskID, request.PeerID)
+			logger.Infof("step 4:leave task %s/%s state ok", request.TaskID, request.PeerID)
 		}
 	})
 	if err != nil {
diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go
index cc83222cd94..dc6c5e0e6ca 100644
--- a/client/daemon/peer/peertask_file.go
+++ b/client/daemon/peer/peertask_file.go
@@ -89,13 +89,13 @@ func newFilePeerTask(ctx context.Context,
 	// trace register
 	_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
 	result, err := schedulerClient.RegisterPeerTask(ctx, request)
-	logger.Infof("step1: peer %s start to register", request.PeerId)
+	logger.Infof("step 1: peer %s start to register", request.PeerId)
 	regSpan.RecordError(err)
 	regSpan.End()
 
 	var backSource bool
 	if err != nil {
-		logger.Errorf("step1: peer %s register failed: %v", request.PeerId, err)
+		logger.Errorf("step 1: peer %s register failed: %v", request.PeerId, err)
 		// check if it is back source error
 		if de, ok := err.(*dferrors.DfError); ok && de.Code == dfcodes.SchedNeedBackSource {
 			backSource = true
@@ -110,11 +110,11 @@ func newFilePeerTask(ctx context.Context,
 	if result == nil {
 		defer span.End()
 		span.RecordError(err)
-		err = errors.Errorf("step1: peer register result is nil")
+		err = errors.Errorf("step 1: peer register result is nil")
 		return ctx, nil, nil, err
 	}
 	span.SetAttributes(config.AttributeTaskID.String(result.TaskId))
-	logger.Infof("step1: register task success, task id: %s, peer id: %s, SizeScope: %s",
+	logger.Infof("step 1: register task success, task id: %s, peer id: %s, SizeScope: %s",
 		result.TaskId, request.PeerId, base.SizeScope_name[int32(result.SizeScope)])
 
 	var singlePiece *scheduler.SinglePiece
@@ -148,9 +148,9 @@ func newFilePeerTask(ctx context.Context,
 	}
 
 	peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
-	logger.Infof("step2: start report peer %s piece result", request.PeerId)
+	logger.Infof("step 2: start report peer %s piece result", request.PeerId)
 	if err != nil {
-		logger.Errorf("step2: peer %s report piece failed: %v", request.PeerId, err)
+		logger.Errorf("step 2: peer %s report piece failed: %v", request.PeerId, err)
 		defer span.End()
 		span.RecordError(err)
 		return ctx, nil, nil, err
diff --git a/client/daemon/peer/peertask_file_callback.go
b/client/daemon/peer/peertask_file_callback.go index 0963450bbd9..ffadc4f64d6 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ b/client/daemon/peer/peertask_file_callback.go @@ -106,9 +106,9 @@ func (p *filePeerTaskCallback) Done(pt Task) error { Code: dfcodes.Success, }) if err != nil { - pt.Log().Errorf("step3: report successful peer result, error: %v", err) + pt.Log().Errorf("step 3: report successful peer result, error: %v", err) } else { - pt.Log().Infof("step3: report successful peer result ok") + pt.Log().Infof("step 3: report successful peer result ok") } return nil } @@ -131,9 +131,9 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro Code: code, }) if err != nil { - pt.Log().Errorf("step3: report fail peer result, error: %v", err) + pt.Log().Errorf("step 3: report fail peer result, error: %v", err) } else { - pt.Log().Infof("step3: report fail peer result ok") + pt.Log().Infof("step 3: report fail peer result ok") } return nil } diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 40a9bc7bff8..8574c5b8977 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -72,7 +72,7 @@ func newStreamPeerTask(ctx context.Context, // trace register _, regSpan := tracer.Start(ctx, config.SpanRegisterTask) result, err := schedulerClient.RegisterPeerTask(ctx, request) - logger.Infof("step1: peer %s start to register", request.PeerId) + logger.Infof("step 1: peer %s start to register", request.PeerId) regSpan.RecordError(err) regSpan.End() @@ -129,7 +129,7 @@ func newStreamPeerTask(ctx context.Context, } peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) - logger.Infof("step2: start report peer %s piece result", request.PeerId) + logger.Infof("step 2: start report peer %s piece result", request.PeerId) if err != nil { defer span.End() span.RecordError(err) diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index 00861516678..09dcf28fc83 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -104,9 +104,9 @@ func (p *streamPeerTaskCallback) Done(pt Task) error { Code: dfcodes.Success, }) if err != nil { - pt.Log().Errorf("step3: report successful peer result, error: %v", err) + pt.Log().Errorf("step 3: report successful peer result, error: %v", err) } else { - pt.Log().Infof("step3: report successful peer result ok") + pt.Log().Infof("step 3: report successful peer result ok") } return nil } @@ -129,9 +129,9 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er Code: code, }) if err != nil { - pt.Log().Errorf("step3: report fail peer result, error: %v", err) + pt.Log().Errorf("step 3: report fail peer result, error: %v", err) } else { - pt.Log().Infof("step3: report fail peer result ok") + pt.Log().Infof("step 3: report fail peer result ok") } return nil } From 08a74e9025fa96b2be1b4e8db151ffb4381a18bd Mon Sep 17 00:00:00 2001 From: santong Date: Fri, 30 Jul 2021 00:15:52 +0800 Subject: [PATCH 08/16] hashkey Signed-off-by: santong --- scheduler/core/events.go | 14 +++++++------- scheduler/core/scheduler_service.go | 3 +++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/scheduler/core/events.go b/scheduler/core/events.go index aee72229c58..5b5238bd4a5 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -112,7 +112,7 @@ func (e 
startReportPieceResultEvent) apply(s *state) { } func (e startReportPieceResultEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerDownloadPieceSuccessEvent struct { @@ -150,7 +150,7 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) { } func (e peerDownloadPieceSuccessEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerDownloadPieceFailEvent struct { @@ -183,7 +183,7 @@ func (e peerDownloadPieceFailEvent) apply(s *state) { } } func (e peerDownloadPieceFailEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerReplaceParentEvent struct { @@ -191,7 +191,7 @@ type peerReplaceParentEvent struct { } func (e peerReplaceParentEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } func (e peerReplaceParentEvent) apply(s *state) { @@ -239,7 +239,7 @@ func (e peerDownloadSuccessEvent) apply(s *state) { } func (e peerDownloadSuccessEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerDownloadFailEvent struct { @@ -275,7 +275,7 @@ func (e peerDownloadFailEvent) apply(s *state) { } func (e peerDownloadFailEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerLeaveEvent struct { @@ -289,7 +289,7 @@ func (e peerLeaveEvent) apply(s *state) { } func (e peerLeaveEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } func constructSuccessPeerPacket(peer *types.Peer, parent *types.Peer, candidates []*types.Peer) *schedulerRPC.PeerPacket { diff --git a/scheduler/core/scheduler_service.go b/scheduler/core/scheduler_service.go index df634a59025..15f4727ffd0 100644 --- a/scheduler/core/scheduler_service.go +++ b/scheduler/core/scheduler_service.go @@ -184,6 +184,9 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task if task.IsFrozen() { task.SetStatus(types.TaskStatusRunning) } + if s.config.DisableCDN { + // TODO NeedBackSource + } go func() { if err := s.cdnManager.StartSeedTask(ctx, task); err != nil { if !task.IsSuccess() { From 139312d65f6e7ab08280a077bc36a0f457941239 Mon Sep 17 00:00:00 2001 From: santong Date: Fri, 30 Jul 2021 00:20:33 +0800 Subject: [PATCH 09/16] hashkey Signed-off-by: santong --- scheduler/core/scheduler_service.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scheduler/core/scheduler_service.go b/scheduler/core/scheduler_service.go index 15f4727ffd0..eca0c82b851 100644 --- a/scheduler/core/scheduler_service.go +++ b/scheduler/core/scheduler_service.go @@ -184,9 +184,9 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task if task.IsFrozen() { task.SetStatus(types.TaskStatusRunning) } - if s.config.DisableCDN { - // TODO NeedBackSource - } + //if s.config.DisableCDN { + // TODO NeedBackSource + //} go func() { if err := s.cdnManager.StartSeedTask(ctx, task); err != nil { if !task.IsSuccess() { From 7cbcb43b8393e21450c04d694246dac774b8f869 Mon Sep 17 00:00:00 2001 From: santong Date: Sat, 31 Jul 2021 13:47:50 +0800 Subject: [PATCH 10/16] remove set packetChannel nil Signed-off-by: santong --- scheduler/core/events.go | 2 -- scheduler/daemon/peer/manager.go | 1 - 2 files changed, 3 deletions(-) diff --git a/scheduler/core/events.go b/scheduler/core/events.go index 5b5238bd4a5..3de9f96bc2b 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -234,7 +234,6 @@ func (e peerDownloadSuccessEvent) apply(s *state) { } if e.peer.PacketChan != nil { 
close(e.peer.PacketChan)
- e.peer.PacketChan = nil
 }
 }
@@ -269,7 +268,6 @@ func (e peerDownloadFailEvent) apply(s *state) {
 })
 if e.peer.PacketChan != nil {
 close(e.peer.PacketChan)
- e.peer.PacketChan = nil
 }
 s.peerManager.Delete(e.peer.PeerID)
 }
diff --git a/scheduler/daemon/peer/manager.go b/scheduler/daemon/peer/manager.go
index b6e77252df8..de041bc01be 100644
--- a/scheduler/daemon/peer/manager.go
+++ b/scheduler/daemon/peer/manager.go
@@ -73,7 +73,6 @@ func (m *manager) Delete(peerID string) {
 if peer.PacketChan != nil {
 close(peer.PacketChan)
 logger.Infof("close peer %s stream", peerID)
- peer.PacketChan = nil
 }
 m.peerMap.Delete(peerID)
 }

From db2bddb95bafd74ecea36e1984e78a7526ee6d11 Mon Sep 17 00:00:00 2001
From: santong
Date: Sat, 31 Jul 2021 14:22:06 +0800
Subject: [PATCH 11/16] fix conn convert fail

Signed-off-by: santong
---
 pkg/rpc/client.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go
index 505acd58f82..6bd642ab27a 100644
--- a/pkg/rpc/client.go
+++ b/pkg/rpc/client.go
@@ -214,7 +214,10 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
 }
 if selected {
 if client, ok := conn.node2ClientMap.Load(node); ok {
- return client.(*candidateClient), nil
+ return &candidateClient{
+ node: candidateNode,
+ Ref: client,
+ }, nil
 }
 }
 }

From 346c6caeaec17eac8c3cdab2ff6749e775a41327 Mon Sep 17 00:00:00 2001
From: santong
Date: Sat, 31 Jul 2021 14:44:50 +0800
Subject: [PATCH 12/16] chore: replace once with initialized

Signed-off-by: santong
---
 pkg/rpc/client.go | 2 +-
 scheduler/daemon/cdn/d7y/manager.go | 7 ++++---
 scheduler/server/service/scheduler_server.go | 8 ++++----
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go
index 6bd642ab27a..3456e6be1a6 100644
--- a/pkg/rpc/client.go
+++ b/pkg/rpc/client.go
@@ -229,7 +229,7 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
 if len(ringNodes) == 0 {
 return nil, dferrors.ErrNoCandidateNode
 }
- candidateNodes := make([]string, 0, 0)
+ candidateNodes := make([]string, 0)
 for _, ringNode := range ringNodes {
 candidate := true
 for _, exclusiveNode := range exclusiveNodes {
diff --git a/scheduler/daemon/cdn/d7y/manager.go b/scheduler/daemon/cdn/d7y/manager.go
index 083d0900ebd..5bedc2f860a 100644
--- a/scheduler/daemon/cdn/d7y/manager.go
+++ b/scheduler/daemon/cdn/d7y/manager.go
@@ -114,7 +114,7 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *types.Task) error {
 }
 func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream) error {
- var once sync.Once
+ var initialized bool
 var cdnPeer *types.Peer
 for {
 piece, err := stream.Recv()
@@ -138,9 +138,10 @@ func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream
 return errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err)
 }
 if piece != nil {
- once.Do(func() {
+ if !initialized {
 cdnPeer, err = cm.initCdnPeer(task, piece)
- })
+ initialized = true
+ }
 if err != nil || cdnPeer == nil {
 return err
 }
diff --git a/scheduler/server/service/scheduler_server.go b/scheduler/server/service/scheduler_server.go
index d190dca0ac8..e68a55a07fd 100644
--- a/scheduler/server/service/scheduler_server.go
+++ b/scheduler/server/service/scheduler_server.go
@@ -20,7 +20,6 @@ import (
 "context"
 "fmt"
 "io"
- "sync"
 "d7y.io/dragonfly/v2/internal/dfcodes"
 "d7y.io/dragonfly/v2/internal/dferrors"
@@ -118,7 +117,7 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context,
request *schedul func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultServer) error { peerPacketChan := make(chan *scheduler.PeerPacket, 1) - var once sync.Once + var initialized bool g, ctx := errgroup.WithContext(context.Background()) stopCh := make(chan struct{}) g.Go(func() error { @@ -143,10 +142,11 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie if !ok { return dferrors.Newf(dfcodes.SchedPeerNotFound, "peer %s not found", pieceResult.SrcPid) } - once.Do(func() { + if !initialized { peer.BindSendChannel(peerPacketChan) peer.SetStatus(types.PeerStatusRunning) - }) + initialized = true + } if err := s.service.HandlePieceResult(peer, pieceResult); err != nil { logger.Errorf("handle piece result %v fail: %v", pieceResult, err) } From 6a01766eafc4408073dca1781c2a4d78aaba2443 Mon Sep 17 00:00:00 2001 From: santong Date: Mon, 2 Aug 2021 17:39:01 +0800 Subject: [PATCH 13/16] fix concurrent download Signed-off-by: santong --- client/config/peerhost_darwin.go | 14 ++--- client/config/peerhost_linux.go | 8 ++- client/daemon/peer/peertask_base.go | 4 +- cmd/dependency/dependency.go | 2 +- .../scheduler/client/peer_packet_stream.go | 15 +++-- scheduler/core/events.go | 61 +++---------------- scheduler/core/monitor.go | 2 +- scheduler/core/worker.go | 2 +- scheduler/daemon/peer/manager.go | 5 +- scheduler/server/service/scheduler_server.go | 33 +++++++--- scheduler/types/peer.go | 26 ++++++-- test/stress/main.go | 2 +- 12 files changed, 83 insertions(+), 91 deletions(-) diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go index 1cb85c3b2f9..3c3f529df37 100644 --- a/client/config/peerhost_darwin.go +++ b/client/config/peerhost_darwin.go @@ -79,11 +79,8 @@ var peerHostConfig = DaemonOption{ Insecure: true, }, TCPListen: &TCPListenOption{ - Listen: net.IPv4zero.String(), - PortRange: TCPListenPortRange{ - Start: 65000, - End: 65535, - }, + Listen: net.IPv4zero.String(), + PortRange: TCPListenPortRange{}, }, }, }, @@ -110,8 +107,11 @@ var peerHostConfig = DaemonOption{ Insecure: true, }, TCPListen: &TCPListenOption{ - Listen: net.IPv4zero.String(), - PortRange: TCPListenPortRange{}, + Listen: net.IPv4zero.String(), + PortRange: TCPListenPortRange{ + 65001, + 65001, + }, }, }, }, diff --git a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go index cf5605b0c5f..2c799284683 100644 --- a/client/config/peerhost_linux.go +++ b/client/config/peerhost_linux.go @@ -21,6 +21,7 @@ package config import ( "net" + "d7y.io/dragonfly/v2/pkg/basic/dfnet" "golang.org/x/time/rate" "d7y.io/dragonfly/v2/client/clientutil" @@ -42,7 +43,12 @@ var peerHostConfig = DaemonOption{ GCInterval: clientutil.Duration{Duration: DefaultGCInterval}, KeepStorage: false, Scheduler: SchedulerOption{ - NetAddrs: nil, + NetAddrs: []dfnet.NetAddr{ + { + Type: dfnet.TCP, + Addr: "127.0.0.1:8002", + }, + }, ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout}, }, Host: HostOption{ diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index dd28df4b81d..2376aae07d4 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -684,7 +684,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer if getErr != nil { span.RecordError(getErr) // fast way to exit retry - if curPeerPacket != pt.peerPacket { + if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId { pt.Warnf("get piece tasks with error: %s, but peer 
packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr, curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true @@ -709,7 +709,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer pt.Errorf("send piece result error: %s, code: %d", peer.PeerId, er) } // fast way to exit retry - if curPeerPacket != pt.peerPacket { + if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId { pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true diff --git a/cmd/dependency/dependency.go b/cmd/dependency/dependency.go index 7a05b027f99..48d6edf7b14 100644 --- a/cmd/dependency/dependency.go +++ b/cmd/dependency/dependency.go @@ -99,7 +99,7 @@ func InitMonitor(verbose bool, pprofPort int, jaeger string) func() { pprofPort, _ = freeport.GetFreePort() } - debugAddr := fmt.Sprintf("localhost:%d", pprofPort) + debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, pprofPort) viewer.SetConfiguration(viewer.WithAddr(debugAddr)) logger.With("pprof", fmt.Sprintf("http://%s/debug/pprof", debugAddr), diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index a62bbf01f7c..d3983dba01f 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -19,7 +19,6 @@ package client import ( "context" "io" - "time" "github.com/pkg/errors" "google.golang.org/grpc" @@ -56,7 +55,7 @@ func newPeerPacketStream(ctx context.Context, sc *schedulerClient, hashKey strin pps := &peerPacketStream{ sc: sc, - ctx: ctx, + ctx: context.Background(), hashKey: hashKey, ptr: ptr, opts: opts, @@ -126,9 +125,9 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro if err != nil { return nil, err } - timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = client.RegisterPeerTask(timeCtx, pps.ptr) + //timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + //defer cancel() + _, err = client.RegisterPeerTask(pps.ctx, pps.ptr) if err != nil { return nil, err } @@ -198,9 +197,9 @@ func (pps *peerPacketStream) replaceClient(cause error) error { if err != nil { return nil, err } - timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = client.RegisterPeerTask(timeCtx, pps.ptr) + //timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + //defer cancel() + _, err = client.RegisterPeerTask(pps.ctx, pps.ptr) if err != nil { return nil, err } diff --git a/scheduler/core/events.go b/scheduler/core/events.go index 3de9f96bc2b..6e9b24f4348 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -74,11 +74,7 @@ func (s *state) start() { s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) continue } - if peer.PacketChan == nil { - logger.Errorf("waitScheduleParentPeerQueue: there is no packet chan associated with peer %s", peer.PeerID) - return - } - peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates) + peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)) logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because it has scheduled new parent %v", parent) s.waitScheduleParentPeerQueue.Done(v) @@ 
-104,11 +100,7 @@ func (e startReportPieceResultEvent) apply(s *state) { s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) return } - if e.peer.PacketChan == nil { - logger.Errorf("start report piece result: there is no packet chan associated with peer %s", e.peer.PeerID) - return - } - e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parent, candidates) + e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, candidates)) } func (e startReportPieceResultEvent) hashKey() string { @@ -140,12 +132,8 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) { if oldParent != nil { candidates = append(candidates, oldParent) } - if e.peer.PacketChan == nil { - logger.Errorf("peerDownloadPieceSuccessEvent: there is no packet chan with peer %s", e.peer.PeerID) - return - } // TODO if parentPeer is equal with oldParent, need schedule again ? - e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parentPeer, candidates) + e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parentPeer, candidates)) return } @@ -226,15 +214,9 @@ func (e peerDownloadSuccessEvent) apply(s *state) { removePeerFromCurrentTree(e.peer, s) children := s.sched.ScheduleChildren(e.peer) for _, child := range children { - if child.PacketChan == nil { - logger.Debugf("reportPeerSuccessResult: there is no packet chan with peer %s", e.peer.PeerID) - continue - } - child.PacketChan <- constructSuccessPeerPacket(child, e.peer, nil) - } - if e.peer.PacketChan != nil { - close(e.peer.PacketChan) + child.SendSchedulePacket(constructSuccessPeerPacket(child, e.peer, nil)) } + e.peer.UnBindSendChannel() } func (e peerDownloadSuccessEvent) hashKey() string { @@ -259,16 +241,9 @@ func (e peerDownloadFailEvent) apply(s *state) { s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) return true } - if child.PacketChan == nil { - logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID) - return true - } - child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) + child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) return true }) - if e.peer.PacketChan != nil { - close(e.peer.PacketChan) - } s.peerManager.Delete(e.peer.PeerID) } @@ -335,11 +310,7 @@ func handlePeerLeave(peer *types.Peer, s *state) { s.waitScheduleParentPeerQueue.AddAfter(child, time.Second) return true } - if child.PacketChan == nil { - logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID) - return true - } - child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) + child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) return true }) s.peerManager.Delete(peer.PeerID) @@ -353,22 +324,14 @@ func handleReplaceParent(peer *types.Peer, s *state) { s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) return } - if peer.PacketChan == nil { - logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID) - return - } - peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates) + peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)) } func handleSeedTaskFail(task *types.Task) { if task.IsFail() { task.ListPeers().Range(func(data sortedlist.Item) bool { peer := data.(*types.Peer) - if peer.PacketChan == nil { - logger.Debugf("taskSeedFailEvent: there is no packet chan with peer %s", peer.PeerID) - return true - } - peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.CdnError) + 
peer.SendSchedulePacket(constructFailPeerPacket(peer, dfcodes.CdnError)) return true }) } @@ -381,11 +344,7 @@ func removePeerFromCurrentTree(peer *types.Peer, s *state) { if parent != nil { children := s.sched.ScheduleChildren(parent) for _, child := range children { - if child.PacketChan == nil { - logger.Debugf("removePeerFromCurrentTree: there is no packet chan with peer %s", peer.PeerID) - continue - } - child.PacketChan <- constructSuccessPeerPacket(child, peer, nil) + child.SendSchedulePacket(constructSuccessPeerPacket(child, peer, nil)) } } } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index f6b5e6acb98..dd23de57473 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -87,7 +87,7 @@ func (m *monitor) printDebugInfo() string { parentNode = peer.GetParent().PeerID } - table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)-5], parentNode, peer.GetStatus().String(), + table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)], parentNode, peer.GetStatus().String(), peer.CreateTime.String(), strconv.Itoa(int(peer.GetFinishedNum())), strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())}) diff --git a/scheduler/core/worker.go b/scheduler/core/worker.go index 7f35c168732..2c14a16280a 100644 --- a/scheduler/core/worker.go +++ b/scheduler/core/worker.go @@ -64,7 +64,7 @@ func (wg *workerGroup) send(e event) bool { func (wg *workerGroup) stop() { close(wg.stopCh) - wg.s.start() + wg.s.stop() for _, worker := range wg.workerList { worker.stop() } diff --git a/scheduler/daemon/peer/manager.go b/scheduler/daemon/peer/manager.go index de041bc01be..d450f486b29 100644 --- a/scheduler/daemon/peer/manager.go +++ b/scheduler/daemon/peer/manager.go @@ -70,10 +70,7 @@ func (m *manager) Delete(peerID string) { if ok { peer.Host.DeletePeer(peerID) peer.Task.DeletePeer(peer) - if peer.PacketChan != nil { - close(peer.PacketChan) - logger.Infof("close peer %s stream", peerID) - } + peer.UnBindSendChannel() m.peerMap.Delete(peerID) } return diff --git a/scheduler/server/service/scheduler_server.go b/scheduler/server/service/scheduler_server.go index e68a55a07fd..9aa0c0eb881 100644 --- a/scheduler/server/service/scheduler_server.go +++ b/scheduler/server/service/scheduler_server.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "sync" "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" @@ -32,6 +33,8 @@ import ( "d7y.io/dragonfly/v2/scheduler/core" "d7y.io/dragonfly/v2/scheduler/types" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type SchedulerServer struct { @@ -117,24 +120,33 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultServer) error { peerPacketChan := make(chan *scheduler.PeerPacket, 1) + var peer *types.Peer var initialized bool - g, ctx := errgroup.WithContext(context.Background()) - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(stream.Context()) + g, ctx := errgroup.WithContext(ctx) + var once sync.Once g.Go(func() error { + defer func() { + cancel() + once.Do(peer.UnBindSendChannel) + }() for { - var peer *types.Peer select { case <-ctx.Done(): return nil - case <-stopCh: - return nil default: pieceResult, err := stream.Recv() if err == io.EOF { return nil } if err != nil { - return 
dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error")
+ if status.Code(err) == codes.Canceled {
+ if peer != nil {
+ logger.Infof("peer %s canceled", peer.PeerID)
+ return nil
+ }
+ }
+ return dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error: %v", err)
 }
 logger.Debugf("report piece result %v of peer %s", pieceResult, pieceResult.SrcPid)
 var ok bool
@@ -155,21 +167,22 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie
 })
 g.Go(func() error {
+ defer func() {
+ cancel()
+ once.Do(peer.UnBindSendChannel)
+ }()
 for {
 select {
 case <-ctx.Done():
 return nil
- case <-stopCh:
- return nil
 case pp, ok := <-peerPacketChan:
 if !ok {
- close(stopCh)
 return nil
 }
 err := stream.Send(pp)
 if err != nil {
 logger.Errorf("send peer %s schedule packet %v failed: %v", pp.SrcPid, pp, err)
- return err
+ return dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error: %v", err)
 }
 }
 }
diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go
index 0675ee429ec..28ce9712b3a 100644
--- a/scheduler/types/peer.go
+++ b/scheduler/types/peer.go
@@ -59,8 +59,10 @@ type Peer struct {
 Task *Task
 // Host specifies
 Host *PeerHost
+ // bindPacketChan indicates whether packetChan is currently bound
+ bindPacketChan bool
 // PacketChan send schedulerPacket to peer client
- PacketChan chan *scheduler.PeerPacket
+ packetChan chan *scheduler.PeerPacket
 // createTime
 CreateTime time.Time
 // finishedNum specifies downloaded finished piece number
@@ -293,11 +295,27 @@ func (peer *Peer) SetStatus(status PeerStatus) {
 func (peer *Peer) BindSendChannel(packetChan chan *scheduler.PeerPacket) {
 peer.lock.Lock()
 defer peer.lock.Unlock()
- peer.PacketChan = packetChan
+ peer.bindPacketChan = true
+ peer.packetChan = packetChan
 }
-func (peer *Peer) GetSendChannel() chan *scheduler.PeerPacket {
- return peer.PacketChan
+func (peer *Peer) UnBindSendChannel() {
+ peer.lock.Lock()
+ defer peer.lock.Unlock()
+ if peer.bindPacketChan {
+ if peer.packetChan != nil {
+ close(peer.packetChan)
+ }
+ peer.bindPacketChan = false
+ }
+}
+
+func (peer *Peer) SendSchedulePacket(packet *scheduler.PeerPacket) {
+ peer.lock.Lock()
+ defer peer.lock.Unlock()
+ if peer.bindPacketChan {
+ peer.packetChan <- packet
+ }
 }
 func (peer *Peer) IsRunning() bool {
diff --git a/test/stress/main.go b/test/stress/main.go
index 816e53b1bd8..73ac1c1d1df 100644
--- a/test/stress/main.go
+++ b/test/stress/main.go
@@ -129,7 +129,7 @@ loop:
 }
 func debug() {
- debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, 18066)
+ debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, 18070)
 viewer.SetConfiguration(viewer.WithAddr(debugAddr))
 statsview.New().Start()
 }

From b0ffe750656e62b95f3de4d4672272c1ea6374c6 Mon Sep 17 00:00:00 2001
From: santong
Date: Mon, 2 Aug 2021 17:48:41 +0800
Subject: [PATCH 14/16] reset config

Signed-off-by: santong
---
 client/config/peerhost_darwin.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go
index 3c3f529df37..9955797adca 100644
--- a/client/config/peerhost_darwin.go
+++ b/client/config/peerhost_darwin.go
@@ -79,8 +79,11 @@ var peerHostConfig = DaemonOption{
 Insecure: true,
 },
 TCPListen: &TCPListenOption{
- Listen: net.IPv4zero.String(),
- PortRange: TCPListenPortRange{},
+ Listen: net.IPv4zero.String(),
+ PortRange: TCPListenPortRange{
+ Start: 65000,
+ End: 65535,
+ },
 },
 },
 },

From e50ba1395c5c936634527cb62a8af9d400d479fd Mon Sep 17 00:00:00 2001
From: santong
Date: Mon, 2 Aug
2021 17:49:19 +0800 Subject: [PATCH 15/16] reset config Signed-off-by: santong --- client/config/peerhost_darwin.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go index 9955797adca..1cb85c3b2f9 100644 --- a/client/config/peerhost_darwin.go +++ b/client/config/peerhost_darwin.go @@ -110,11 +110,8 @@ var peerHostConfig = DaemonOption{ Insecure: true, }, TCPListen: &TCPListenOption{ - Listen: net.IPv4zero.String(), - PortRange: TCPListenPortRange{ - 65001, - 65001, - }, + Listen: net.IPv4zero.String(), + PortRange: TCPListenPortRange{}, }, }, }, From f26c3d0d37e2fb6762709f84222af66f3f6e8c1c Mon Sep 17 00:00:00 2001 From: santong Date: Mon, 2 Aug 2021 17:50:36 +0800 Subject: [PATCH 16/16] reset config Signed-off-by: santong --- test/stress/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/stress/main.go b/test/stress/main.go index 73ac1c1d1df..816e53b1bd8 100644 --- a/test/stress/main.go +++ b/test/stress/main.go @@ -129,7 +129,7 @@ loop: } func debug() { - debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, 18070) + debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, 18066) viewer.SetConfiguration(viewer.WithAddr(debugAddr)) statsview.New().Start() }
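
Patch 08 changes every scheduler event's hashKey() from the peer ID to the task ID, so all events that touch one task are routed to the same worker and applied serially; interleavings between sibling peers of a task (for example a peerLeaveEvent racing a peerDownloadPieceSuccessEvent on another worker) disappear by construction. A minimal sketch of that keyed-dispatch pattern, using stand-in event and worker types rather than the scheduler's real ones:

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// event carries the task it belongs to; hashKey decides which worker
// serializes it, mirroring the hashKey() methods in scheduler/core/events.go.
type event struct {
	taskID string
	name   string
}

func (e event) hashKey() string { return e.taskID }

// dispatch fans events out to a fixed pool; identical keys always map to the
// same channel, so events for one task never run concurrently.
func dispatch(events []event, workers int) {
	chans := make([]chan event, workers)
	var wg sync.WaitGroup
	for i := range chans {
		chans[i] = make(chan event, 1)
		wg.Add(1)
		go func(ch chan event) {
			defer wg.Done()
			for e := range ch {
				fmt.Printf("handling %s for task %s\n", e.name, e.taskID)
			}
		}(chans[i])
	}
	for _, e := range events {
		h := fnv.New32a()
		h.Write([]byte(e.hashKey()))
		chans[h.Sum32()%uint32(workers)] <- e
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
}

func main() {
	dispatch([]event{
		{"task-1", "pieceSuccess"},
		{"task-1", "peerLeave"}, // same key, same worker, strictly after the event above
		{"task-2", "pieceFail"},
	}, 4)
}

The trade-off is throughput: a hot task is now confined to one worker, which the series accepts in exchange for strict per-task ordering.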
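Patch 11's "conn convert fail" reads as a type-assertion problem: node2ClientMap stores the raw client value, so the previous single-value assertion client.(*candidateClient) could not succeed and would panic at runtime; the fix builds a fresh candidateClient around the loaded value instead of asserting. A simplified illustration with placeholder types (the real map holds gRPC client connections):

package main

import "fmt"

type candidateClient struct {
	node string
	Ref  interface{}
}

// lookup wraps the stored value instead of asserting its type, the same shape
// as the fixed findCandidateClientConn.
func lookup(m map[string]interface{}, node string) (*candidateClient, bool) {
	v, ok := m[node]
	if !ok {
		return nil, false
	}
	return &candidateClient{node: node, Ref: v}, true
}

func main() {
	m := map[string]interface{}{"n1": "raw-conn-placeholder"}
	if c, ok := lookup(m, "n1"); ok {
		fmt.Println(c.node, c.Ref)
	}
}

Had the old code used the comma-ok form (c, ok := client.(*candidateClient)), the mismatch would have failed gracefully; the single-value assertion turns it into a panic, which is presumably how the bug surfaced.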
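Patch 12 swaps sync.Once for a plain initialized flag in both receivePiece and ReportPieceResult. Once.Do takes a func() with no return value, so an error from the closure has to leak out through a captured variable, and a Once that has fired will never attempt initialization again even after a failure; a bool owned by the single goroutine running the receive loop says the same thing directly. A compilable sketch of the shape (initPeer is a stand-in, not the cdn manager's API):

package main

import "fmt"

func initPeer(piece int) error {
	return fmt.Errorf("init failed on piece %d", piece)
}

// receive mirrors the post-patch loop: initialization runs at most once per
// stream, and its error reaches the caller without a captured variable.
func receive(pieces []int) error {
	var initialized bool
	for _, p := range pieces {
		if !initialized {
			if err := initPeer(p); err != nil {
				return err
			}
			initialized = true
		}
		// ... handle piece p ...
	}
	return nil
}

func main() {
	fmt.Println(receive([]int{1, 2, 3}))
}

This is only safe because each loop is confined to one goroutine; if the initialization point could be reached concurrently, the flag would need the enclosing lock, and sync.Once would again be the right tool.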
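The heart of patch 13 is the new send-channel discipline in scheduler/types/peer.go: the channel becomes private, and bind, send, and close all take the peer lock and consult a bound flag, so a schedule packet can no longer be sent on a channel that another goroutine has just closed. The same pattern in isolation, with illustrative names (packet and sendChannel are not the scheduler's types):

package main

import (
	"fmt"
	"sync"
)

type packet struct{ seq int }

type sendChannel struct {
	mu    sync.Mutex
	bound bool
	ch    chan *packet
}

func (s *sendChannel) bind(ch chan *packet) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.bound = true
	s.ch = ch
}

// unbind closes the channel at most once; later sends become no-ops instead
// of panicking with "send on closed channel".
func (s *sendChannel) unbind() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.bound {
		close(s.ch)
		s.bound = false
	}
}

func (s *sendChannel) send(p *packet) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.bound {
		s.ch <- p
	}
}

func main() {
	sc := &sendChannel{}
	ch := make(chan *packet, 1)
	sc.bind(ch)
	sc.send(&packet{seq: 1})
	sc.unbind()
	sc.send(&packet{seq: 2}) // dropped silently rather than panicking
	fmt.Println((<-ch).seq)  // 1
}

One caveat carried over from the real code: the send happens while the mutex is held, so the pattern relies on the stream goroutine draining the channel promptly; a reader that stops receiving would block senders and UnBindSendChannel alike.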
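Patch 13 also rewrites the fast-exit check in peertask_base.go from pointer identity (curPeerPacket != pt.peerPacket) to comparing MainPeer.PeerId. Pointer inequality fires whenever the scheduler delivers a freshly allocated packet, even one naming the same destination, while comparing the id only treats a genuine re-target as "peer packet changed". Toy types make the difference visible:

package main

import "fmt"

type peer struct{ id string }
type peerPacket struct{ main *peer }

func main() {
	cur := &peerPacket{main: &peer{id: "p1"}}
	next := &peerPacket{main: &peer{id: "p1"}} // re-delivered, same main peer

	fmt.Println(cur != next)                 // true: different allocations
	fmt.Println(cur.main.id != next.main.id) // false: destination unchanged
}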