diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index df9c17ee0a4..eb8f6d58785 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -371,9 +371,10 @@ func (pt *peerTask) pullPiecesFromPeers(cleanUnfinishedFunc func()) { defer func() { cleanUnfinishedFunc() }() - - if !pt.waitFirstPeerPacket() { - // TODO 如果是客户端直接回源,这里不应该在输出错误日志 + if ok, backSource := pt.waitFirstPeerPacket(); !ok { + if backSource { + return + } pt.Errorf("wait first peer packet error") return } @@ -487,7 +488,7 @@ func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize int32) ( return pieceRequestCh, true } -func (pt *peerTask) waitFirstPeerPacket() bool { +func (pt *peerTask) waitFirstPeerPacket() (done bool, backSource bool) { // wait first available peer select { case <-pt.ctx.Done(): @@ -502,13 +503,14 @@ func (pt *peerTask) waitFirstPeerPacket() bool { // preparePieceTasksByPeer func already send piece result with error pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s", time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer) - return true + return true, false } // when scheduler says dfcodes.SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady pt.Infof("start download from source due to dfcodes.SchedNeedBackSource") pt.span.AddEvent("back source due to scheduler says need back source") pt.needBackSource = true pt.backSource() + return false, true case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration): if pt.schedulerOption.DisableAutoBackSource { pt.failedReason = reasonScheduleTimeout @@ -521,9 +523,10 @@ func (pt *peerTask) waitFirstPeerPacket() bool { pt.span.AddEvent("back source due to schedule timeout") pt.needBackSource = true pt.backSource() + return false, true } } - return false + return false, false } func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) { diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index ea33bdbc19e..36757f4e5d3 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -277,10 +277,7 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]s firstPiece = first } - pr, pw := io.Pipe() attr := map[string]string{} - var readCloser io.ReadCloser = pr - var writer io.Writer = pw if s.contentLength.Load() != -1 { attr[headers.ContentLength] = fmt.Sprintf("%d", s.contentLength.Load()) } else { @@ -289,89 +286,9 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]s attr[config.HeaderDragonflyTask] = s.taskID attr[config.HeaderDragonflyPeer] = s.peerID - go func(first int32) { - defer func() { - s.cancel() - s.span.End() - }() - var ( - desired int32 - cur int32 - wrote int64 - err error - //ok bool - cache = make(map[int32]bool) - ) - // update first piece to cache and check cur with desired - cache[first] = true - cur = first - for { - if desired == cur { - for { - delete(cache, desired) - _, span := tracer.Start(s.ctx, config.SpanWriteBackPiece) - span.SetAttributes(config.AttributePiece.Int(int(desired))) - wrote, err = s.writeTo(writer, desired) - if err != nil { - span.RecordError(err) - span.End() - s.Errorf("write to pipe error: %s", err) - _ = pw.CloseWithError(err) - return - } - span.SetAttributes(config.AttributePieceSize.Int(int(wrote))) - s.Debugf("wrote piece %d to pipe, size %d", desired, wrote) - span.End() - desired++ - cached := cache[desired] - if !cached { - break - } - } - } else { - // not desired piece, cache it - cache[cur] = true - if cur < desired { - s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur) - } - } - - select { - case <-s.ctx.Done(): - s.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err()) - s.span.RecordError(s.ctx.Err()) - if err := pw.CloseWithError(s.ctx.Err()); err != nil { - s.Errorf("CloseWithError failed: %s", err) - } - return - case <-s.streamDone: - for { - // all data wrote to local storage, and all data wrote to pipe write - if s.readyPieces.Settled() == desired { - pw.Close() - return - } - _, span := tracer.Start(s.ctx, config.SpanWriteBackPiece) - span.SetAttributes(config.AttributePiece.Int(int(desired))) - wrote, err = s.writeTo(pw, desired) - if err != nil { - span.RecordError(err) - span.End() - s.span.RecordError(err) - s.Errorf("write to pipe error: %s", err) - _ = pw.CloseWithError(err) - return - } - span.SetAttributes(config.AttributePieceSize.Int(int(wrote))) - span.End() - s.Debugf("wrote piece %d to pipe, size %d", desired, wrote) - desired++ - } - case cur = <-s.successPieceCh: - continue - } - } - }(firstPiece) + pr, pw := io.Pipe() + var readCloser io.ReadCloser = pr + go s.writeToPipe(firstPiece, pw) return readCloser, attr, nil } @@ -428,7 +345,7 @@ func (s *streamPeerTask) SetTotalPieces(i int32) { s.totalPiece = i } -func (s *streamPeerTask) writeTo(w io.Writer, pieceNum int32) (int64, error) { +func (s *streamPeerTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) { pr, pc, err := s.pieceManager.ReadPiece(s.ctx, &storage.ReadPieceRequest{ PeerTaskMetaData: storage.PeerTaskMetaData{ PeerID: s.peerID, @@ -476,3 +393,87 @@ func (s *streamPeerTask) backSource() { _ = s.finish() return } + +func (s *streamPeerTask) writeToPipe(firstPiece int32, pw *io.PipeWriter) { + defer func() { + s.cancel() + s.span.End() + }() + var ( + desired int32 + cur int32 + wrote int64 + err error + cache = make(map[int32]bool) + ) + // update first piece to cache and check cur with desired + cache[firstPiece] = true + cur = firstPiece + for { + if desired == cur { + for { + delete(cache, desired) + _, span := tracer.Start(s.ctx, config.SpanWriteBackPiece) + span.SetAttributes(config.AttributePiece.Int(int(desired))) + wrote, err = s.writeOnePiece(pw, desired) + if err != nil { + span.RecordError(err) + span.End() + s.Errorf("write to pipe error: %s", err) + _ = pw.CloseWithError(err) + return + } + span.SetAttributes(config.AttributePieceSize.Int(int(wrote))) + s.Debugf("wrote piece %d to pipe, size %d", desired, wrote) + span.End() + desired++ + cached := cache[desired] + if !cached { + break + } + } + } else { + // not desired piece, cache it + cache[cur] = true + if cur < desired { + s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur) + } + } + + select { + case <-s.ctx.Done(): + s.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err()) + s.span.RecordError(s.ctx.Err()) + if err := pw.CloseWithError(s.ctx.Err()); err != nil { + s.Errorf("CloseWithError failed: %s", err) + } + return + case <-s.streamDone: + for { + // all data wrote to local storage, and all data wrote to pipe write + if s.readyPieces.Settled() == desired { + s.Debugf("all %d pieces wrote to pipe", desired) + pw.Close() + return + } + _, span := tracer.Start(s.ctx, config.SpanWriteBackPiece) + span.SetAttributes(config.AttributePiece.Int(int(desired))) + wrote, err = s.writeOnePiece(pw, desired) + if err != nil { + span.RecordError(err) + span.End() + s.span.RecordError(err) + s.Errorf("write to pipe error: %s", err) + _ = pw.CloseWithError(err) + return + } + span.SetAttributes(config.AttributePieceSize.Int(int(wrote))) + span.End() + s.Debugf("wrote piece %d to pipe, size %d", desired, wrote) + desired++ + } + case cur = <-s.successPieceCh: + continue + } + } +}