Skip to content

Commit

Permalink
chore: optimize stream peer task (#763)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored and gaius-qi committed Jun 28, 2023
1 parent 6546115 commit 860af67
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 93 deletions.
15 changes: 9 additions & 6 deletions client/daemon/peer/peertask_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,10 @@ func (pt *peerTask) pullPiecesFromPeers(cleanUnfinishedFunc func()) {
defer func() {
cleanUnfinishedFunc()
}()

if !pt.waitFirstPeerPacket() {
// TODO 如果是客户端直接回源,这里不应该在输出错误日志
if ok, backSource := pt.waitFirstPeerPacket(); !ok {
if backSource {
return
}
pt.Errorf("wait first peer packet error")
return
}
Expand Down Expand Up @@ -487,7 +488,7 @@ func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize int32) (
return pieceRequestCh, true
}

func (pt *peerTask) waitFirstPeerPacket() bool {
func (pt *peerTask) waitFirstPeerPacket() (done bool, backSource bool) {
// wait first available peer
select {
case <-pt.ctx.Done():
Expand All @@ -502,13 +503,14 @@ func (pt *peerTask) waitFirstPeerPacket() bool {
// preparePieceTasksByPeer func already send piece result with error
pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
return true
return true, false
}
// when scheduler says dfcodes.SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady
pt.Infof("start download from source due to dfcodes.SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source")
pt.needBackSource = true
pt.backSource()
return false, true
case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonScheduleTimeout
Expand All @@ -521,9 +523,10 @@ func (pt *peerTask) waitFirstPeerPacket() bool {
pt.span.AddEvent("back source due to schedule timeout")
pt.needBackSource = true
pt.backSource()
return false, true
}
}
return false
return false, false
}

func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
Expand Down
175 changes: 88 additions & 87 deletions client/daemon/peer/peertask_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,7 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]s
firstPiece = first
}

pr, pw := io.Pipe()
attr := map[string]string{}
var readCloser io.ReadCloser = pr
var writer io.Writer = pw
if s.contentLength.Load() != -1 {
attr[headers.ContentLength] = fmt.Sprintf("%d", s.contentLength.Load())
} else {
Expand All @@ -289,89 +286,9 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]s
attr[config.HeaderDragonflyTask] = s.taskID
attr[config.HeaderDragonflyPeer] = s.peerID

go func(first int32) {
defer func() {
s.cancel()
s.span.End()
}()
var (
desired int32
cur int32
wrote int64
err error
//ok bool
cache = make(map[int32]bool)
)
// update first piece to cache and check cur with desired
cache[first] = true
cur = first
for {
if desired == cur {
for {
delete(cache, desired)
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeTo(writer, desired)
if err != nil {
span.RecordError(err)
span.End()
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
span.End()
desired++
cached := cache[desired]
if !cached {
break
}
}
} else {
// not desired piece, cache it
cache[cur] = true
if cur < desired {
s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur)
}
}

select {
case <-s.ctx.Done():
s.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err())
s.span.RecordError(s.ctx.Err())
if err := pw.CloseWithError(s.ctx.Err()); err != nil {
s.Errorf("CloseWithError failed: %s", err)
}
return
case <-s.streamDone:
for {
// all data wrote to local storage, and all data wrote to pipe write
if s.readyPieces.Settled() == desired {
pw.Close()
return
}
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeTo(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.span.RecordError(err)
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
span.End()
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
desired++
}
case cur = <-s.successPieceCh:
continue
}
}
}(firstPiece)
pr, pw := io.Pipe()
var readCloser io.ReadCloser = pr
go s.writeToPipe(firstPiece, pw)

return readCloser, attr, nil
}
Expand Down Expand Up @@ -428,7 +345,7 @@ func (s *streamPeerTask) SetTotalPieces(i int32) {
s.totalPiece = i
}

func (s *streamPeerTask) writeTo(w io.Writer, pieceNum int32) (int64, error) {
func (s *streamPeerTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
pr, pc, err := s.pieceManager.ReadPiece(s.ctx, &storage.ReadPieceRequest{
PeerTaskMetaData: storage.PeerTaskMetaData{
PeerID: s.peerID,
Expand Down Expand Up @@ -476,3 +393,87 @@ func (s *streamPeerTask) backSource() {
_ = s.finish()
return
}

func (s *streamPeerTask) writeToPipe(firstPiece int32, pw *io.PipeWriter) {
defer func() {
s.cancel()
s.span.End()
}()
var (
desired int32
cur int32
wrote int64
err error
cache = make(map[int32]bool)
)
// update first piece to cache and check cur with desired
cache[firstPiece] = true
cur = firstPiece
for {
if desired == cur {
for {
delete(cache, desired)
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeOnePiece(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
span.End()
desired++
cached := cache[desired]
if !cached {
break
}
}
} else {
// not desired piece, cache it
cache[cur] = true
if cur < desired {
s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur)
}
}

select {
case <-s.ctx.Done():
s.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err())
s.span.RecordError(s.ctx.Err())
if err := pw.CloseWithError(s.ctx.Err()); err != nil {
s.Errorf("CloseWithError failed: %s", err)
}
return
case <-s.streamDone:
for {
// all data wrote to local storage, and all data wrote to pipe write
if s.readyPieces.Settled() == desired {
s.Debugf("all %d pieces wrote to pipe", desired)
pw.Close()
return
}
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeOnePiece(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.span.RecordError(err)
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
span.End()
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
desired++
}
case cur = <-s.successPieceCh:
continue
}
}
}

0 comments on commit 860af67

Please sign in to comment.