Commit

Merge branch 'dragonflyoss:main' into fix/cdn-gc

zzy987 authored Aug 31, 2021
2 parents 8093cb1 + 4464dae commit 89716cc
Showing 36 changed files with 1,495 additions and 522 deletions.
80 changes: 54 additions & 26 deletions client/daemon/peer/peertask_base.go
@@ -104,16 +104,16 @@ type peerTask struct {
 
     // done channel will be close when peer task is finished
     done chan struct{}
-    // peerTaskDone will be true after peer task done
-    peerTaskDone bool
+    // success will be true after peer task done
+    success bool
     // span stands open telemetry trace span
     span trace.Span
 
     // same actions must be done only once, like close done channel and so on
     once sync.Once
 
     // failedPieceCh will hold all pieces which download failed,
-    // those pieces will be retry later
+    // those pieces will be retried later
    failedPieceCh chan int32
     // failedReason will be set when peer task failed
     failedReason string
@@ -226,6 +226,10 @@ loop:
             break loop
         }
         if err != nil {
+            // when success, context will be cancelled, check if pt.success is true
+            if pt.success {
+                return
+            }
             pt.failedCode = dfcodes.UnknownError
             if de, ok := err.(*dferrors.DfError); ok {
                 if de.Code == dfcodes.SchedNeedBackSource {
@@ -388,7 +392,7 @@ loop:
             break loop
         case <-pt.ctx.Done():
             pt.Debugf("context done due to %s", pt.ctx.Err())
-            if !pt.peerTaskDone {
+            if !pt.success {
                 if pt.failedCode == failedCodeNotSet {
                     pt.failedReason = reasonContextCanceled
                     pt.failedCode = dfcodes.ClientContextCanceled
@@ -488,20 +492,38 @@ func (pt *peerTask) waitFirstPeerPacket() bool {
            pt.failedReason = err.Error()
         }
         pt.span.AddEvent(fmt.Sprintf("pulling pieces end due to %s", err))
-    case <-pt.peerPacketReady:
-        // preparePieceTasksByPeer func already send piece result with error
-        pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
-            time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
-        return true
+    case _, ok := <-pt.peerPacketReady:
+        if ok {
+            // preparePieceTasksByPeer func already send piece result with error
+            pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
+                time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
+            return true
+        }
+        // when schedule timeout, receivePeerPacket will close pt.peerPacketReady
+        if pt.schedulerOption.DisableAutoBackSource {
+            pt.failedReason = reasonBackSourceDisabled
+            err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
+            pt.span.RecordError(err)
+            pt.Errorf(err.Error())
+        } else {
+            pt.Warnf("start download from source due to dfcodes.SchedNeedBackSource")
+            pt.span.AddEvent("back source due to scheduler says need back source")
+            pt.needBackSource = true
+            pt.backSource()
+        }
     case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
         if pt.schedulerOption.DisableAutoBackSource {
             pt.failedReason = reasonScheduleTimeout
             pt.failedCode = dfcodes.ClientScheduleTimeout
-            logger.Errorf("%s, auto back source disabled", pt.failedReason)
+            err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
+            pt.span.RecordError(err)
+            pt.Errorf(err.Error())
+        } else {
+            pt.Warnf("start download from source due to %s", reasonScheduleTimeout)
+            pt.span.AddEvent("back source due to schedule timeout")
+            pt.needBackSource = true
+            pt.backSource()
         }
-        pt.Errorf("start download from source due to %s", reasonScheduleTimeout)
-        pt.needBackSource = true
-        pt.backSource()
     }
     return false
 }
@@ -511,10 +533,10 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
     select {
     // when peer task without content length or total pieces count, match here
     case <-pt.done:
-        pt.Infof("peer task done, stop get pieces from peer")
+        pt.Infof("peer task done, stop wait available peer packet")
     case <-pt.ctx.Done():
         pt.Debugf("context done due to %s", pt.ctx.Err())
-        if !pt.peerTaskDone {
+        if !pt.success {
             if pt.failedCode == failedCodeNotSet {
                 pt.failedReason = reasonContextCanceled
                 pt.failedCode = dfcodes.ClientContextCanceled
@@ -529,23 +551,29 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
         }
         // when schedule timeout, receivePeerPacket will close pt.peerPacketReady
         if pt.schedulerOption.DisableAutoBackSource {
-            pt.failedReason = reasonReScheduleTimeout
-            pt.failedCode = dfcodes.ClientScheduleTimeout
-            logger.Errorf("%s, auto back source disabled", pt.failedReason)
+            pt.failedReason = reasonBackSourceDisabled
+            err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
+            pt.span.RecordError(err)
+            pt.Errorf(err.Error())
         } else {
-            pt.Errorf("start download from source due to dfcodes.SchedNeedBackSource")
+            pt.Warnf("start download from source due to dfcodes.SchedNeedBackSource")
             pt.span.AddEvent("back source due to scheduler says need back source ")
             pt.needBackSource = true
             pt.backSource()
         }
     case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
         if pt.schedulerOption.DisableAutoBackSource {
             pt.failedReason = reasonReScheduleTimeout
             pt.failedCode = dfcodes.ClientScheduleTimeout
-            logger.Errorf("%s, auto back source disabled", pt.failedReason)
+            err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
+            pt.span.RecordError(err)
+            pt.Errorf(err.Error())
+        } else {
+            pt.Warnf("start download from source due to %s", reasonReScheduleTimeout)
+            pt.span.AddEvent("back source due to schedule timeout")
+            pt.needBackSource = true
+            pt.backSource()
         }
-        pt.Errorf("start download from source due to %s", reasonReScheduleTimeout)
-        pt.needBackSource = true
-        pt.backSource()
     }
     return -1, false
 }
@@ -568,7 +596,7 @@ func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceReque
         pt.Warnf("peer task done, but still some piece request not process")
     case <-pt.ctx.Done():
         pt.Warnf("context done due to %s", pt.ctx.Err())
-        if !pt.peerTaskDone {
+        if !pt.success {
             if pt.failedCode == failedCodeNotSet {
                 pt.failedReason = reasonContextCanceled
                 pt.failedCode = dfcodes.ClientContextCanceled
@@ -585,10 +613,10 @@ func (pt *peerTask) waitFailedPiece() (int32, bool) {
     // use no default branch select to wait failed piece or exit
     select {
     case <-pt.done:
-        pt.Infof("peer task done, stop get pieces from peer")
+        pt.Infof("peer task done, stop wait failed piece")
         return -1, false
     case <-pt.ctx.Done():
-        if !pt.peerTaskDone {
+        if !pt.success {
             if pt.failedCode == failedCodeNotSet {
                 pt.failedReason = reasonContextCanceled
                 pt.failedCode = dfcodes.ClientContextCanceled
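Note on the waitFirstPeerPacket change above: switching from `case <-pt.peerPacketReady:` to the two-value form `case _, ok := <-pt.peerPacketReady:` lets one channel carry two distinct signals. A send means a new peer packet is ready; a close (ok == false) means receivePeerPacket has given up and the client should fall back to the source. A minimal, runnable sketch of the idiom follows; the names are illustrative, not from this patch.

    package main

    import "fmt"

    // waitReady mimics the two-value receive: ok is true when a value was
    // sent on the channel, false when the channel was closed.
    func waitReady(ready chan struct{}) bool {
        if _, ok := <-ready; ok {
            fmt.Println("peer packet ready: keep downloading from peers")
            return true
        }
        // the sender closed the channel: no peer will ever arrive,
        // so fall back to downloading from the source
        fmt.Println("channel closed: back to source")
        return false
    }

    func main() {
        ready := make(chan struct{})
        go close(ready) // simulate the scheduler side giving up
        waitReady(ready)
    }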
17 changes: 10 additions & 7 deletions client/daemon/peer/peertask_file.go
@@ -296,9 +296,10 @@ func (pt *filePeerTask) finish() error {
     pt.Debugf("finish end piece result sent")
 
     var (
-        success = true
-        code    = dfcodes.Success
-        message = "Success"
+        success      = true
+        code         = dfcodes.Success
+        message      = "Success"
+        progressDone bool
     )
 
     // callback to store data to output
@@ -322,7 +323,7 @@ func (pt *filePeerTask) finish() error {
         CompletedLength: pt.completedLength.Load(),
         PeerTaskDone:    true,
         DoneCallback: func() {
-            pt.peerTaskDone = true
+            progressDone = true
             close(pt.progressStopCh)
         },
     }
@@ -341,13 +342,14 @@ func (pt *filePeerTask) finish() error {
     case <-pt.progressStopCh:
         pt.Infof("progress stopped")
     case <-pt.ctx.Done():
-        if pt.peerTaskDone {
+        if progressDone {
             pt.Debugf("progress stopped and context done")
         } else {
             pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
         }
     }
     pt.Debugf("finished: close channel")
+    pt.success = true
     close(pt.done)
     pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
     pt.span.End()
@@ -365,6 +367,7 @@ func (pt *filePeerTask) cleanUnfinished() {
         scheduler.NewEndPieceResult(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
     pt.Debugf("clean up end piece result sent")
 
+    var progressDone bool
     pg := &FilePeerTaskProgress{
         State: &ProgressState{
             Success: false,
@@ -377,7 +380,7 @@ func (pt *filePeerTask) cleanUnfinished() {
         CompletedLength: pt.completedLength.Load(),
         PeerTaskDone:    true,
         DoneCallback: func() {
-            pt.peerTaskDone = true
+            progressDone = true
             close(pt.progressStopCh)
         },
     }
@@ -396,7 +399,7 @@ func (pt *filePeerTask) cleanUnfinished() {
     case <-pt.progressStopCh:
         pt.Infof("progress stopped")
     case <-pt.ctx.Done():
-        if pt.peerTaskDone {
+        if progressDone {
             pt.Debugf("progress stopped and context done")
         } else {
             pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
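Note on the file above: the shared pt.peerTaskDone field used to mean both "task succeeded" and "final progress was delivered". The patch splits the two: pt.success (set in finish()) records success, while a function-local progressDone, captured by the DoneCallback closure, only tracks delivery of the last progress event. A self-contained sketch of the closure-captured flag, with hypothetical names:

    package main

    import "fmt"

    type progress struct {
        doneCallback func() // invoked by the consumer of the final progress event
    }

    func finish() {
        var progressDone bool // local: visible only to this function and its closure
        progressStopCh := make(chan struct{})

        pg := &progress{doneCallback: func() {
            progressDone = true
            close(progressStopCh)
        }}

        go pg.doneCallback() // the consumer acknowledges the final progress
        <-progressStopCh     // closing the channel publishes the flag safely
        fmt.Println("final progress delivered:", progressDone)
    }

    func main() { finish() }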
5 changes: 3 additions & 2 deletions client/daemon/peer/peertask_file_callback.go
@@ -17,6 +17,7 @@
 package peer
 
 import (
+    "context"
     "time"
 
     "d7y.io/dragonfly/v2/client/daemon/storage"
@@ -91,7 +92,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
         return e
     }
     p.ptm.PeerTaskDone(p.req.PeerId)
-    err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
+    err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
         TaskId: pt.GetTaskID(),
         PeerId: pt.GetPeerID(),
         SrcIp:  p.ptm.host.Ip,
@@ -116,7 +117,7 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
     p.ptm.PeerTaskDone(p.req.PeerId)
     var end = time.Now()
     pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason)
-    err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
+    err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
         TaskId: pt.GetTaskID(),
         PeerId: pt.GetPeerID(),
         SrcIp:  p.ptm.host.Ip,
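The context.Background() switch above is the substantive fix in this file: p.pt.ctx is cancelled as soon as the peer task finishes, so a final ReportPeerResult issued with it can fail spuriously with context.Canceled. A runnable sketch of the failure mode; report() is a stand-in for the RPC, and its simulated latency is invented for the example:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // report stands in for schedulerClient.ReportPeerResult: it fails as soon
    // as its context is done, otherwise succeeds after some simulated latency.
    func report(ctx context.Context) error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(10 * time.Millisecond):
            return nil
        }
    }

    func main() {
        taskCtx, cancel := context.WithCancel(context.Background())
        cancel() // the task just finished, so its context is already cancelled

        fmt.Println("with task ctx:  ", report(taskCtx))              // context canceled
        fmt.Println("with background:", report(context.Background())) // <nil>
    }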
15 changes: 10 additions & 5 deletions client/daemon/peer/peertask_stream.go
@@ -49,6 +49,7 @@ type StreamPeerTask interface {
 
 type streamPeerTask struct {
     peerTask
+    streamDone     chan struct{}
     successPieceCh chan int32
 }

@@ -149,6 +150,8 @@ func newStreamPeerTask(ctx context.Context,
         limiter = rate.NewLimiter(perPeerRateLimit, int(perPeerRateLimit))
     }
     pt := &streamPeerTask{
+        successPieceCh: make(chan int32),
+        streamDone:     make(chan struct{}),
         peerTask: peerTask{
             ctx:  ctx,
             host: host,
@@ -177,7 +180,6 @@ func newStreamPeerTask(ctx context.Context,
             usedTraffic:         atomic.NewInt64(0),
             SugaredLoggerOnWith: logger.With("peer", request.PeerId, "task", result.TaskId, "component", "streamPeerTask"),
         },
-        successPieceCh: make(chan int32),
     }
     // bind func that base peer task did not implement
     pt.backSourceFunc = pt.backSource
@@ -334,9 +336,9 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
                 s.Errorf("CloseWithError failed: %s", err)
             }
             return
-        case <-s.done:
+        case <-s.streamDone:
             for {
-                // all data is wrote to local storage, and all data is wrote to pipe write
+                // all data wrote to local storage, and all data wrote to pipe write
                 if s.readyPieces.Settled() == desired {
                     pw.Close()
                     return
@@ -369,17 +371,19 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
 func (s *streamPeerTask) finish() error {
     // send last progress
     s.once.Do(func() {
+        s.success = true
+        // let stream return immediately
+        close(s.streamDone)
         // send EOF piece result to scheduler
         _ = s.peerPacketStream.Send(
             scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled()))
         s.Debugf("end piece result sent, peer task finished")
-        close(s.done)
-        //close(s.successPieceCh)
         if err := s.callback.Done(s); err != nil {
             s.span.RecordError(err)
             s.Errorf("done callback error: %s", err)
         }
         s.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
+        close(s.done)
     })
     return nil
 }
@@ -391,6 +395,7 @@ func (s *streamPeerTask) cleanUnfinished() {
         _ = s.peerPacketStream.Send(
             scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled()))
         s.Errorf("end piece result sent, peer task failed")
+        close(s.streamDone)
         close(s.done)
         //close(s.successPieceCh)
         if err := s.callback.Fail(s, s.failedCode, s.failedReason); err != nil {
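The new streamDone channel above decouples "the stream may return" from "the task is fully done": finish() closes streamDone first so Start()'s reader unblocks immediately, and closes done only after the Done callback has run, with the existing sync.Once guaranteeing each channel is closed at most once even if finish() and cleanUnfinished() race. A compact sketch of that shutdown ordering, with illustrative names:

    package main

    import (
        "fmt"
        "sync"
    )

    type finisher struct {
        once       sync.Once
        streamDone chan struct{} // readers of the stream wait on this
        done       chan struct{} // everything else waits on this
    }

    func (f *finisher) finish() {
        f.once.Do(func() {
            close(f.streamDone)              // let the stream return immediately
            fmt.Println("callback finished") // stands in for callback.Done
            close(f.done)                    // full completion comes last
        })
    }

    func main() {
        f := &finisher{streamDone: make(chan struct{}), done: make(chan struct{})}
        go f.finish()
        f.finish() // safe under sync.Once: the second call is a no-op
        <-f.done
    }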
5 changes: 3 additions & 2 deletions client/daemon/peer/peertask_stream_callback.go
@@ -17,6 +17,7 @@
 package peer
 
 import (
+    "context"
     "time"
 
     "d7y.io/dragonfly/v2/client/daemon/storage"
@@ -89,7 +90,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
         return e
     }
     p.ptm.PeerTaskDone(p.req.PeerId)
-    err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
+    err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
         TaskId: pt.GetTaskID(),
         PeerId: pt.GetPeerID(),
         SrcIp:  p.ptm.host.Ip,
@@ -114,7 +115,7 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
     p.ptm.PeerTaskDone(p.req.PeerId)
     var end = time.Now()
     pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason)
-    err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
+    err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
         TaskId: pt.GetTaskID(),
         PeerId: pt.GetPeerID(),
         SrcIp:  p.ptm.host.Ip,
1 change: 1 addition & 0 deletions go.mod
@@ -67,6 +67,7 @@ require (
     go.uber.org/atomic v1.6.0
     go.uber.org/zap v1.16.0
     golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
+    golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a
     golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
     golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
     golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
3 changes: 2 additions & 1 deletion go.sum
@@ -977,8 +977,9 @@ golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ
 golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210113205817-d3ed898aa8a3/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
-golang.org/x/oauth2 v0.0.0-20210201163806-010130855d6c h1:HiAZXo96zOhVhtFHchj/ojzoxCFiPrp9/j0GtS38V3g=
 golang.org/x/oauth2 v0.0.0-20210201163806-010130855d6c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
+golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a h1:4Kd8OPUx1xgUwrHDaviWZO8MsgoZTZYC3g+8m16RBww=
+golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=