From 6a01766eafc4408073dca1781c2a4d78aaba2443 Mon Sep 17 00:00:00 2001 From: santong Date: Mon, 2 Aug 2021 17:39:01 +0800 Subject: [PATCH] fix concurrent download Signed-off-by: santong --- client/config/peerhost_darwin.go | 14 ++--- client/config/peerhost_linux.go | 8 ++- client/daemon/peer/peertask_base.go | 4 +- cmd/dependency/dependency.go | 2 +- .../scheduler/client/peer_packet_stream.go | 15 +++-- scheduler/core/events.go | 61 +++---------------- scheduler/core/monitor.go | 2 +- scheduler/core/worker.go | 2 +- scheduler/daemon/peer/manager.go | 5 +- scheduler/server/service/scheduler_server.go | 33 +++++++--- scheduler/types/peer.go | 26 ++++++-- test/stress/main.go | 2 +- 12 files changed, 83 insertions(+), 91 deletions(-) diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go index 1cb85c3b2f9..3c3f529df37 100644 --- a/client/config/peerhost_darwin.go +++ b/client/config/peerhost_darwin.go @@ -79,11 +79,8 @@ var peerHostConfig = DaemonOption{ Insecure: true, }, TCPListen: &TCPListenOption{ - Listen: net.IPv4zero.String(), - PortRange: TCPListenPortRange{ - Start: 65000, - End: 65535, - }, + Listen: net.IPv4zero.String(), + PortRange: TCPListenPortRange{}, }, }, }, @@ -110,8 +107,11 @@ var peerHostConfig = DaemonOption{ Insecure: true, }, TCPListen: &TCPListenOption{ - Listen: net.IPv4zero.String(), - PortRange: TCPListenPortRange{}, + Listen: net.IPv4zero.String(), + PortRange: TCPListenPortRange{ + 65001, + 65001, + }, }, }, }, diff --git a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go index cf5605b0c5f..2c799284683 100644 --- a/client/config/peerhost_linux.go +++ b/client/config/peerhost_linux.go @@ -21,6 +21,7 @@ package config import ( "net" + "d7y.io/dragonfly/v2/pkg/basic/dfnet" "golang.org/x/time/rate" "d7y.io/dragonfly/v2/client/clientutil" @@ -42,7 +43,12 @@ var peerHostConfig = DaemonOption{ GCInterval: clientutil.Duration{Duration: DefaultGCInterval}, KeepStorage: false, Scheduler: SchedulerOption{ - NetAddrs: nil, + NetAddrs: []dfnet.NetAddr{ + { + Type: dfnet.TCP, + Addr: "127.0.0.1:8002", + }, + }, ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout}, }, Host: HostOption{ diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index dd28df4b81d..2376aae07d4 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -684,7 +684,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer if getErr != nil { span.RecordError(getErr) // fast way to exit retry - if curPeerPacket != pt.peerPacket { + if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId { pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr, curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true @@ -709,7 +709,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer pt.Errorf("send piece result error: %s, code: %d", peer.PeerId, er) } // fast way to exit retry - if curPeerPacket != pt.peerPacket { + if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId { pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true diff --git a/cmd/dependency/dependency.go b/cmd/dependency/dependency.go index 7a05b027f99..48d6edf7b14 100644 --- a/cmd/dependency/dependency.go +++ b/cmd/dependency/dependency.go @@ -99,7 +99,7 @@ func InitMonitor(verbose bool, pprofPort int, jaeger string) func() { pprofPort, _ = freeport.GetFreePort() } - debugAddr := fmt.Sprintf("localhost:%d", pprofPort) + debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, pprofPort) viewer.SetConfiguration(viewer.WithAddr(debugAddr)) logger.With("pprof", fmt.Sprintf("http://%s/debug/pprof", debugAddr), diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index a62bbf01f7c..d3983dba01f 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -19,7 +19,6 @@ package client import ( "context" "io" - "time" "github.com/pkg/errors" "google.golang.org/grpc" @@ -56,7 +55,7 @@ func newPeerPacketStream(ctx context.Context, sc *schedulerClient, hashKey strin pps := &peerPacketStream{ sc: sc, - ctx: ctx, + ctx: context.Background(), hashKey: hashKey, ptr: ptr, opts: opts, @@ -126,9 +125,9 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro if err != nil { return nil, err } - timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = client.RegisterPeerTask(timeCtx, pps.ptr) + //timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + //defer cancel() + _, err = client.RegisterPeerTask(pps.ctx, pps.ptr) if err != nil { return nil, err } @@ -198,9 +197,9 @@ func (pps *peerPacketStream) replaceClient(cause error) error { if err != nil { return nil, err } - timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = client.RegisterPeerTask(timeCtx, pps.ptr) + //timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + //defer cancel() + _, err = client.RegisterPeerTask(pps.ctx, pps.ptr) if err != nil { return nil, err } diff --git a/scheduler/core/events.go b/scheduler/core/events.go index 3de9f96bc2b..6e9b24f4348 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -74,11 +74,7 @@ func (s *state) start() { s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) continue } - if peer.PacketChan == nil { - logger.Errorf("waitScheduleParentPeerQueue: there is no packet chan associated with peer %s", peer.PeerID) - return - } - peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates) + peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)) logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because it has scheduled new parent %v", parent) s.waitScheduleParentPeerQueue.Done(v) @@ -104,11 +100,7 @@ func (e startReportPieceResultEvent) apply(s *state) { s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) return } - if e.peer.PacketChan == nil { - logger.Errorf("start report piece result: there is no packet chan associated with peer %s", e.peer.PeerID) - return - } - e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parent, candidates) + e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, candidates)) } func (e startReportPieceResultEvent) hashKey() string { @@ -140,12 +132,8 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) { if oldParent != nil { candidates = append(candidates, oldParent) } - if e.peer.PacketChan == nil { - logger.Errorf("peerDownloadPieceSuccessEvent: there is no packet chan with peer %s", e.peer.PeerID) - return - } // TODO if parentPeer is equal with oldParent, need schedule again ? - e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parentPeer, candidates) + e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parentPeer, candidates)) return } @@ -226,15 +214,9 @@ func (e peerDownloadSuccessEvent) apply(s *state) { removePeerFromCurrentTree(e.peer, s) children := s.sched.ScheduleChildren(e.peer) for _, child := range children { - if child.PacketChan == nil { - logger.Debugf("reportPeerSuccessResult: there is no packet chan with peer %s", e.peer.PeerID) - continue - } - child.PacketChan <- constructSuccessPeerPacket(child, e.peer, nil) - } - if e.peer.PacketChan != nil { - close(e.peer.PacketChan) + child.SendSchedulePacket(constructSuccessPeerPacket(child, e.peer, nil)) } + e.peer.UnBindSendChannel() } func (e peerDownloadSuccessEvent) hashKey() string { @@ -259,16 +241,9 @@ func (e peerDownloadFailEvent) apply(s *state) { s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) return true } - if child.PacketChan == nil { - logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID) - return true - } - child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) + child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) return true }) - if e.peer.PacketChan != nil { - close(e.peer.PacketChan) - } s.peerManager.Delete(e.peer.PeerID) } @@ -335,11 +310,7 @@ func handlePeerLeave(peer *types.Peer, s *state) { s.waitScheduleParentPeerQueue.AddAfter(child, time.Second) return true } - if child.PacketChan == nil { - logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID) - return true - } - child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) + child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) return true }) s.peerManager.Delete(peer.PeerID) @@ -353,22 +324,14 @@ func handleReplaceParent(peer *types.Peer, s *state) { s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) return } - if peer.PacketChan == nil { - logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID) - return - } - peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates) + peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)) } func handleSeedTaskFail(task *types.Task) { if task.IsFail() { task.ListPeers().Range(func(data sortedlist.Item) bool { peer := data.(*types.Peer) - if peer.PacketChan == nil { - logger.Debugf("taskSeedFailEvent: there is no packet chan with peer %s", peer.PeerID) - return true - } - peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.CdnError) + peer.SendSchedulePacket(constructFailPeerPacket(peer, dfcodes.CdnError)) return true }) } @@ -381,11 +344,7 @@ func removePeerFromCurrentTree(peer *types.Peer, s *state) { if parent != nil { children := s.sched.ScheduleChildren(parent) for _, child := range children { - if child.PacketChan == nil { - logger.Debugf("removePeerFromCurrentTree: there is no packet chan with peer %s", peer.PeerID) - continue - } - child.PacketChan <- constructSuccessPeerPacket(child, peer, nil) + child.SendSchedulePacket(constructSuccessPeerPacket(child, peer, nil)) } } } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index f6b5e6acb98..dd23de57473 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -87,7 +87,7 @@ func (m *monitor) printDebugInfo() string { parentNode = peer.GetParent().PeerID } - table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)-5], parentNode, peer.GetStatus().String(), + table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)], parentNode, peer.GetStatus().String(), peer.CreateTime.String(), strconv.Itoa(int(peer.GetFinishedNum())), strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())}) diff --git a/scheduler/core/worker.go b/scheduler/core/worker.go index 7f35c168732..2c14a16280a 100644 --- a/scheduler/core/worker.go +++ b/scheduler/core/worker.go @@ -64,7 +64,7 @@ func (wg *workerGroup) send(e event) bool { func (wg *workerGroup) stop() { close(wg.stopCh) - wg.s.start() + wg.s.stop() for _, worker := range wg.workerList { worker.stop() } diff --git a/scheduler/daemon/peer/manager.go b/scheduler/daemon/peer/manager.go index de041bc01be..d450f486b29 100644 --- a/scheduler/daemon/peer/manager.go +++ b/scheduler/daemon/peer/manager.go @@ -70,10 +70,7 @@ func (m *manager) Delete(peerID string) { if ok { peer.Host.DeletePeer(peerID) peer.Task.DeletePeer(peer) - if peer.PacketChan != nil { - close(peer.PacketChan) - logger.Infof("close peer %s stream", peerID) - } + peer.UnBindSendChannel() m.peerMap.Delete(peerID) } return diff --git a/scheduler/server/service/scheduler_server.go b/scheduler/server/service/scheduler_server.go index e68a55a07fd..9aa0c0eb881 100644 --- a/scheduler/server/service/scheduler_server.go +++ b/scheduler/server/service/scheduler_server.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "sync" "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" @@ -32,6 +33,8 @@ import ( "d7y.io/dragonfly/v2/scheduler/core" "d7y.io/dragonfly/v2/scheduler/types" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type SchedulerServer struct { @@ -117,24 +120,33 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultServer) error { peerPacketChan := make(chan *scheduler.PeerPacket, 1) + var peer *types.Peer var initialized bool - g, ctx := errgroup.WithContext(context.Background()) - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(stream.Context()) + g, ctx := errgroup.WithContext(ctx) + var once sync.Once g.Go(func() error { + defer func() { + cancel() + once.Do(peer.UnBindSendChannel) + }() for { - var peer *types.Peer select { case <-ctx.Done(): return nil - case <-stopCh: - return nil default: pieceResult, err := stream.Recv() if err == io.EOF { return nil } if err != nil { - return dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error") + if status.Code(err) == codes.Canceled { + if peer != nil { + logger.Info("peer %s canceled", peer.PeerID) + return nil + } + } + return dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error: %v", err) } logger.Debugf("report piece result %v of peer %s", pieceResult, pieceResult.SrcPid) var ok bool @@ -155,21 +167,22 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie }) g.Go(func() error { + defer func() { + cancel() + once.Do(peer.UnBindSendChannel) + }() for { select { case <-ctx.Done(): return nil - case <-stopCh: - return nil case pp, ok := <-peerPacketChan: if !ok { - close(stopCh) return nil } err := stream.Send(pp) if err != nil { logger.Errorf("send peer %s schedule packet %v failed: %v", pp.SrcPid, pp, err) - return err + return dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error: %v", err) } } } diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index 0675ee429ec..28ce9712b3a 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -59,8 +59,10 @@ type Peer struct { Task *Task // Host specifies Host *PeerHost + // bindPacketChan + bindPacketChan bool // PacketChan send schedulerPacket to peer client - PacketChan chan *scheduler.PeerPacket + packetChan chan *scheduler.PeerPacket // createTime CreateTime time.Time // finishedNum specifies downloaded finished piece number @@ -293,11 +295,27 @@ func (peer *Peer) SetStatus(status PeerStatus) { func (peer *Peer) BindSendChannel(packetChan chan *scheduler.PeerPacket) { peer.lock.Lock() defer peer.lock.Unlock() - peer.PacketChan = packetChan + peer.bindPacketChan = true + peer.packetChan = packetChan } -func (peer *Peer) GetSendChannel() chan *scheduler.PeerPacket { - return peer.PacketChan +func (peer *Peer) UnBindSendChannel() { + peer.lock.Lock() + defer peer.lock.Unlock() + if peer.bindPacketChan { + if peer.packetChan != nil { + close(peer.packetChan) + } + peer.bindPacketChan = false + } +} + +func (peer *Peer) SendSchedulePacket(packet *scheduler.PeerPacket) { + peer.lock.Lock() + defer peer.lock.Unlock() + if peer.bindPacketChan { + peer.packetChan <- packet + } } func (peer *Peer) IsRunning() bool { diff --git a/test/stress/main.go b/test/stress/main.go index 816e53b1bd8..73ac1c1d1df 100644 --- a/test/stress/main.go +++ b/test/stress/main.go @@ -129,7 +129,7 @@ loop: } func debug() { - debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, 18066) + debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, 18070) viewer.SetConfiguration(viewer.WithAddr(debugAddr)) statsview.New().Start() }