Skip to content

Commit

Permalink
feat: add handleRegisterSeedPeerRequest to service v2 in scheduler (#…
Browse files Browse the repository at this point in the history
…2148)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Mar 8, 2023
1 parent b310588 commit a2fcca6
Show file tree
Hide file tree
Showing 2 changed files with 688 additions and 196 deletions.
267 changes: 146 additions & 121 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
}
case *schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest:
logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %#v", announcePeerRequest.RegisterSeedPeerRequest.Download)
v.handleRegisterSeedPeerRequest(ctx, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest)
if err := v.handleRegisterSeedPeerRequest(ctx, stream, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest)
v.handleDownloadPeerStartedRequest(ctx, announcePeerRequest.DownloadPeerStartedRequest)
Expand Down Expand Up @@ -151,11 +154,6 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
}
}

// TODO Implement function.
// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) {
}

// TODO Implement function.
// handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, req *schedulerv2.DownloadPeerStartedRequest) {
Expand Down Expand Up @@ -669,9 +667,9 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) e
// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req)
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.Download)
if err != nil {
return status.Error(codes.FailedPrecondition, err.Error())
return err
}

// When there are no available peers for a task, the scheduler needs to trigger
Expand All @@ -684,8 +682,145 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
}
}

// Provide different scheduling strategies for different task type.
sizeScope := task.SizeScope()
// Scheduling parent for the peer..
return v.schedule(ctx, peer)
}

// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) error {
// Handle resource included host, task, and peer.
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.Download)
if err != nil {
return err
}

// When there are no available peers for a task, the scheduler needs to trigger
// the first task download in the p2p cluster.
blocklist := set.NewSafeSet[string]()
blocklist.Add(peer.ID)
if !task.HasAvailablePeer(blocklist) {
// When the task has no available peer,
// the seed peer will download back-to-source directly.
peer.NeedBackToSource.Store(true)
}

// Scheduling parent for the peer..
return v.schedule(ctx, peer)
}

// handleResource handles resource included host, task, and peer.
func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, download *commonv2.Download) (*resource.Host, *resource.Task, *resource.Peer, error) {
// If the host does not exist and the host address cannot be found,
// it may cause an exception.
host, loaded := v.resource.HostManager().Load(hostID)
if !loaded {
return nil, nil, nil, status.Errorf(codes.NotFound, "host %s not found", hostID)
}

// Store new task or update task.
task, loaded := v.resource.TaskManager().Load(taskID)
if !loaded {
options := []resource.TaskOption{resource.WithPieceLength(download.PieceLength)}
if download.Digest != "" {
d, err := digest.Parse(download.Digest)
if err != nil {
return nil, nil, nil, status.Error(codes.InvalidArgument, err.Error())
}

// If request has invalid digest, then new task with the nil digest.
options = append(options, resource.WithDigest(d))
}

task = resource.NewTask(taskID, download.Url, download.Tag, download.Application, download.Type,
download.Filters, download.Header, int32(v.config.Scheduler.BackToSourceCount), options...)
v.resource.TaskManager().Store(task)
} else {
task.URL = download.Url
task.Filters = download.Filters
task.Header = download.Header
}

// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
options := []resource.PeerOption{resource.WithPriority(download.Priority), resource.WithAnnouncePeerStream(stream)}
if download.Range != nil {
options = append(options, resource.WithRange(http.Range{Start: download.Range.Start, Length: download.Range.Length}))
}

peer = resource.NewPeer(peerID, task, host, options...)
v.resource.PeerManager().Store(peer)
}

return host, task, peer, nil
}

// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
peer.Log.Infof("peer priority is %s", priority.String())
switch priority {
case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:
// Super peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeSuperSeed)
break
}

fallthrough
case commonv2.Priority_LEVEL5:
// Strong peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeStrongSeed)
break
}

fallthrough
case commonv2.Priority_LEVEL4:
// Weak peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeWeakSeed)
break
}

fallthrough
case commonv2.Priority_LEVEL3:
// When the task has no available peer,
// the peer is first to download back-to-source.
peer.NeedBackToSource.Store(true)
case commonv2.Priority_LEVEL2:
// Peer is first to download back-to-source.
return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String())
case commonv2.Priority_LEVEL1:
// Download task is forbidden.
return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())
default:
return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority)
}

return nil
}

// schedule provides different scheduling strategies for different task type.
func (v *V2) schedule(ctx context.Context, peer *resource.Peer) error {
sizeScope := peer.Task.SizeScope()
switch sizeScope {
case commonv2.SizeScope_EMPTY:
// Return an EmptyTaskResponse directly.
Expand Down Expand Up @@ -716,7 +851,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
// it will be scheduled as a Normal Task.
peer.Log.Info("scheduling as SizeScope_TINY")
if !peer.Task.CanReuseDirectPiece() {
peer.Log.Warnf("can not reuse direct piece %d %d", len(task.DirectPiece), task.ContentLength.Load())
peer.Log.Warnf("can not reuse direct piece %d %d", len(peer.Task.DirectPiece), peer.Task.ContentLength.Load())
break
}

Expand Down Expand Up @@ -796,113 +931,3 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S

return nil
}

// handleResource handles resource included host, task, and peer.
func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) (*resource.Host, *resource.Task, *resource.Peer, error) {
// If the host does not exist and the host address cannot be found,
// it may cause an exception.
host, loaded := v.resource.HostManager().Load(hostID)
if !loaded {
return nil, nil, nil, fmt.Errorf("host %s not found", hostID)
}

// Store new task or update task.
task, loaded := v.resource.TaskManager().Load(taskID)
if !loaded {
options := []resource.TaskOption{resource.WithPieceLength(req.Download.PieceLength)}
if req.Download.Digest != "" {
d, err := digest.Parse(req.Download.Digest)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid digest %s", req.Download.Digest)
}

// If request has invalid digest, then new task with the nil digest.
options = append(options, resource.WithDigest(d))
}

task = resource.NewTask(taskID, req.Download.Url, req.Download.Tag, req.Download.Application, req.Download.Type,
req.Download.Filters, req.Download.Header, int32(v.config.Scheduler.BackToSourceCount), options...)
v.resource.TaskManager().Store(task)
} else {
task.URL = req.Download.Url
task.Filters = req.Download.Filters
task.Header = req.Download.Header
}

// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
options := []resource.PeerOption{resource.WithPriority(req.Download.Priority), resource.WithAnnouncePeerStream(stream)}
if req.Download.Range != nil {
options = append(options, resource.WithRange(http.Range{Start: req.Download.Range.Start, Length: req.Download.Range.Length}))
}

peer = resource.NewPeer(peerID, task, host, options...)
v.resource.PeerManager().Store(peer)
}

return host, task, peer, nil
}

// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
peer.Log.Infof("peer priority is %s", priority.String())
switch priority {
case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:
// Super peer is first triggered to back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeSuperSeed)
break
}

fallthrough
case commonv2.Priority_LEVEL5:
// Strong peer is first triggered to back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeStrongSeed)
break
}

fallthrough
case commonv2.Priority_LEVEL4:
// Weak peer is first triggered to back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeWeakSeed)
break
}

fallthrough
case commonv2.Priority_LEVEL3:
// When the task is downloaded for the first time,
// the normal peer is first to download back-to-source.
peer.NeedBackToSource.Store(true)
case commonv2.Priority_LEVEL2:
// Peer is first to download back-to-source.
return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String())
case commonv2.Priority_LEVEL1:
// Download task is forbidden.
return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())
default:
return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority)
}

return nil
}
Loading

0 comments on commit a2fcca6

Please sign in to comment.