feat: implement client seed mode #1247

Merged · 6 commits · May 6, 2022
2 changes: 2 additions & 0 deletions client/config/constants_otel.go
@@ -44,9 +44,11 @@ const (
AttributeGetPieceCount = attribute.Key("d7y.peer.piece.count")
AttributeGetPieceRetry = attribute.Key("d7y.peer.piece.retry")
AttributeWritePieceSuccess = attribute.Key("d7y.peer.piece.write.success")
AttributeSeedTaskSuccess = attribute.Key("d7y.seed.task.success")

SpanFileTask = "file-task"
SpanStreamTask = "stream-task"
SpanSeedTask = "seed-task"
SpanPeerTask = "peer-task"
SpanReusePeerTask = "reuse-peer-task"
SpanRegisterTask = "register"
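
The new SpanSeedTask and AttributeSeedTaskSuccess constants follow the existing file-task/stream-task pattern. Below is a minimal sketch of how they might be wired up; the package tracer, the wrapping function, and the call site are assumptions and are not part of this diff.

// Sketch only: start a seed-task span and record its outcome, mirroring the
// existing file/stream task spans. "tracer" is assumed to be the package tracer.
func traceSeedTask(ctx context.Context, run func(context.Context) error) error {
	ctx, span := tracer.Start(ctx, config.SpanSeedTask, trace.WithSpanKind(trace.SpanKindClient))
	defer span.End()

	err := run(ctx)
	// AttributeSeedTaskSuccess is an attribute.Key, so Bool() yields a KeyValue.
	span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(err == nil))
	return err
}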
7 changes: 7 additions & 0 deletions client/daemon/metrics/metrics.go
@@ -109,6 +109,13 @@ var (
Help: "Counter of the total stream tasks.",
})

SeedTaskCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: constants.MetricsNamespace,
Subsystem: constants.DfdaemonMetricsName,
Name: "seed_task_total",
Help: "Counter of the total seed tasks.",
})

PeerTaskCacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: constants.MetricsNamespace,
Subsystem: constants.DfdaemonMetricsName,
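
SeedTaskCount is a plain promauto counter. As with the existing FileTaskCount and StreamTaskCount metrics, the natural place to bump it is at the entry point of a seed task; the call site below is an assumption, not shown in this hunk.

// Sketch only: increment the new counter when a seed task is started,
// symmetric with FileTaskCount / StreamTaskCount.
metrics.SeedTaskCount.Add(1)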
19 changes: 15 additions & 4 deletions client/daemon/peer/peertask_conductor.go
@@ -77,6 +77,7 @@ type peerTaskConductor struct {

// needBackSource indicates downloading the resource from the source instead of other peers
needBackSource *atomic.Bool
seed bool

// pieceManager will be used for downloading piece
pieceManager PieceManager
@@ -165,7 +166,8 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
request *scheduler.PeerTaskRequest,
limit rate.Limit,
parent *peerTaskConductor,
rg *clientutil.Range) *peerTaskConductor {
rg *clientutil.Range,
seed bool) *peerTaskConductor {
// use a new context with span info
ctx = trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))
ctx, span := tracer.Start(ctx, config.SpanPeerTask, trace.WithSpanKind(trace.SpanKindClient))
@@ -226,6 +228,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
completedLength: atomic.NewInt64(0),
usedTraffic: atomic.NewUint64(0),
SugaredLoggerOnWith: log,
seed: seed,

parent: parent,
rg: rg,
@@ -329,9 +332,17 @@ func (pt *peerTaskConductor) register() error {
}

func (pt *peerTaskConductor) start() error {
// register to scheduler
if err := pt.register(); err != nil {
return err
// when this is a seed task, skip registration and set up back source directly
if pt.seed {
pt.peerPacketStream = &dummyPeerPacketStream{}
pt.schedulerClient = &dummySchedulerClient{}
pt.sizeScope = base.SizeScope_NORMAL
pt.needBackSource = atomic.NewBool(true)
} else {
// register to scheduler
if err := pt.register(); err != nil {
return err
}
}

go pt.broker.Start()
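
In seed mode the conductor never registers with a scheduler: the scheduler-facing fields are filled with no-op stand-ins and back-source downloading is forced. dummyPeerPacketStream and dummySchedulerClient are not shown in this hunk; the sketch below only illustrates the no-op idea, and the method set is an assumption that may not match the real scheduler client interfaces.

// Illustrative no-op stream; the real interface and message types are assumed.
type dummyPeerPacketStream struct{}

func (s *dummyPeerPacketStream) Send(*scheduler.PieceResult) error { return nil }

func (s *dummyPeerPacketStream) Recv() (*scheduler.PeerPacket, error) {
	// Nothing ever arrives: seed tasks do not talk to a scheduler.
	return nil, io.EOF
}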
5 changes: 4 additions & 1 deletion client/daemon/peer/peertask_file.go
@@ -18,6 +18,7 @@ package peer

import (
"context"
"fmt"

"go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"
@@ -95,7 +96,7 @@ func (ptm *peerTaskManager) newFileTask(
}

taskID := idgen.TaskID(request.Url, request.UrlMeta)
ptc, err := ptm.getPeerTaskConductor(ctx, taskID, &request.PeerTaskRequest, limit, parent, request.Range, request.Output)
ptc, err := ptm.getPeerTaskConductor(ctx, taskID, &request.PeerTaskRequest, limit, parent, request.Range, request.Output, false)
if err != nil {
return nil, nil, err
}
@@ -125,12 +126,14 @@ func (f *fileTask) Start(ctx context.Context) (chan *FileTaskProgress, error) {
}

func (f *fileTask) syncProgress() {
defer f.span.End()
for {
select {
case <-f.peerTaskConductor.successCh:
f.storeToOutput()
return
case <-f.peerTaskConductor.failCh:
f.span.RecordError(fmt.Errorf(f.peerTaskConductor.failedReason))
f.sendFailProgress(f.peerTaskConductor.failedCode, f.peerTaskConductor.failedReason)
return
case <-f.ctx.Done():
44 changes: 33 additions & 11 deletions client/daemon/peer/peertask_manager.go
@@ -47,15 +47,17 @@ type TaskManager interface {
StartFileTask(ctx context.Context, req *FileTaskRequest) (
progress chan *FileTaskProgress, tiny *TinyData, err error)
// StartStreamTask starts a peer task with stream io
// tiny indicates that the task file is tiny and the task is already done
StartStreamTask(ctx context.Context, req *StreamTaskRequest) (
readCloser io.ReadCloser, attribute map[string]string, err error)
// StartSeedTask starts a seed peer task
StartSeedTask(ctx context.Context, req *SeedTaskRequest) (
seedTaskResult *SeedTaskResponse, err error)

Subscribe(request *base.PieceTaskRequest) (*SubscribeResult, bool)
Subscribe(request *base.PieceTaskRequest) (*SubscribeResponse, bool)

IsPeerTaskRunning(id string) bool

// Check if the given task exists in P2P network
// StatTask checks whether the given task exists in P2P network
StatTask(ctx context.Context, taskID string) (*scheduler.Task, error)

// AnnouncePeerTask announces peer task info to P2P network
@@ -179,8 +181,9 @@ func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context,
limit rate.Limit,
parent *peerTaskConductor,
rg *clientutil.Range,
desiredLocation string) (*peerTaskConductor, error) {
ptc, created, err := ptm.getOrCreatePeerTaskConductor(ctx, taskID, request, limit, parent, rg, desiredLocation)
desiredLocation string,
seed bool) (*peerTaskConductor, error) {
ptc, created, err := ptm.getOrCreatePeerTaskConductor(ctx, taskID, request, limit, parent, rg, desiredLocation, seed)
if err != nil {
return nil, err
}
@@ -201,12 +204,13 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
limit rate.Limit,
parent *peerTaskConductor,
rg *clientutil.Range,
desiredLocation string) (*peerTaskConductor, bool, error) {
desiredLocation string,
seed bool) (*peerTaskConductor, bool, error) {
if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID)
return ptc, false, nil
}
ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg)
ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg, seed)

ptm.conductorLock.Lock()
// double check
@@ -256,7 +260,7 @@ func (ptm *peerTaskManager) prefetchParentTask(request *scheduler.PeerTaskReques
}

logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit, nil, nil, desiredLocation)
prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit, nil, nil, desiredLocation, false)
if err != nil {
logger.Errorf("prefetch peer task %s/%s error: %s", prefetch.taskID, prefetch.peerID, err)
return nil
@@ -325,20 +329,38 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask
return readCloser, attribute, err
}

type SubscribeResult struct {
func (ptm *peerTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (response *SeedTaskResponse, err error) {
response, ok := ptm.tryReuseSeedPeerTask(ctx, req)
if ok {
metrics.PeerTaskCacheHitCount.Add(1)
return response, nil
}

var limit = rate.Inf
if ptm.perPeerRateLimit > 0 {
limit = ptm.perPeerRateLimit
}
if req.Limit > 0 {
limit = rate.Limit(req.Limit)
}

return ptm.newSeedTask(ctx, req, limit)
}

type SubscribeResponse struct {
Storage storage.TaskStorageDriver
PieceInfoChannel chan *PieceInfo
Success chan struct{}
Fail chan struct{}
}

func (ptm *peerTaskManager) Subscribe(request *base.PieceTaskRequest) (*SubscribeResult, bool) {
func (ptm *peerTaskManager) Subscribe(request *base.PieceTaskRequest) (*SubscribeResponse, bool) {
ptc, ok := ptm.findPeerTaskConductor(request.TaskId)
if !ok {
return nil, false
}

result := &SubscribeResult{
result := &SubscribeResponse{
Storage: ptc.storage,
PieceInfoChannel: ptc.broker.Subscribe(),
Success: ptc.successCh,
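
SubscribeResponse (renamed from SubscribeResult) bundles the task's storage driver with the piece broker channel and the two terminal channels. A typical consumer loops over these channels in a select; the snippet below is only a sketch of such a caller, and the surrounding variables (taskID, the upload/RPC plumbing) are assumptions.

// Sketch only: consume a SubscribeResponse until the task finishes or fails.
resp, ok := ptm.Subscribe(&base.PieceTaskRequest{TaskId: taskID})
if !ok {
	return fmt.Errorf("task %s is not running", taskID)
}
for {
	select {
	case piece := <-resp.PieceInfoChannel:
		// A new piece is available in resp.Storage; hand it to the requester.
		_ = piece
	case <-resp.Success:
		return nil
	case <-resp.Fail:
		return fmt.Errorf("task %s failed", taskID)
	}
}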
19 changes: 17 additions & 2 deletions client/daemon/peer/peertask_manager_mock_test.go

Some generated files are not rendered by default.
