Skip to content

Commit

Permalink
feat: update api to v2.0.29 with optional (dragonflyoss#2751)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Sep 22, 2023
1 parent 9eb15b2 commit b65c101
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 118 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20

require (
d7y.io/api/v2 v2.0.25
d7y.io/api/v2 v2.0.29
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.25 h1:GjjlcFVr3NrEeV6g+f5KbmYIhVUaDw4rP7aGzgWGM80=
d7y.io/api/v2 v2.0.25/go.mod h1:Wit7VpDkM+gP/eZf4DEJo+Mm+mT8fa/FpWQEeJWSTSk=
d7y.io/api/v2 v2.0.29 h1:aXuK0Iqxe9YeCVuFGhaI1Xl0le4Bu3s32v6Czj3osYY=
d7y.io/api/v2 v2.0.29/go.mod h1:Wit7VpDkM+gP/eZf4DEJo+Mm+mT8fa/FpWQEeJWSTSk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
22 changes: 12 additions & 10 deletions scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule

piece := &commonv2.Piece{
Number: candidateParentPiece.Number,
ParentId: candidateParentPiece.ParentID,
ParentId: &candidateParentPiece.ParentID,
Offset: candidateParentPiece.Offset,
Length: candidateParentPiece.Length,
TrafficType: candidateParentPiece.TrafficType,
Expand All @@ -584,8 +584,8 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule
Id: candidateParent.Task.ID,
Type: candidateParent.Task.Type,
Url: candidateParent.Task.URL,
Tag: candidateParent.Task.Tag,
Application: candidateParent.Task.Application,
Tag: &candidateParent.Task.Tag,
Application: &candidateParent.Task.Application,
Filters: candidateParent.Task.Filters,
Header: candidateParent.Task.Header,
PieceLength: candidateParent.Task.PieceLength,
Expand All @@ -600,7 +600,8 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule

// Set digest to parent task.
if candidateParent.Task.Digest != nil {
parent.Task.Digest = candidateParent.Task.Digest.String()
dgst := candidateParent.Task.Digest.String()
parent.Task.Digest = &dgst
}

// Set pieces to parent task.
Expand All @@ -613,7 +614,7 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule

piece := &commonv2.Piece{
Number: taskPiece.Number,
ParentId: taskPiece.ParentID,
ParentId: &taskPiece.ParentID,
Offset: taskPiece.Offset,
Length: taskPiece.Length,
TrafficType: taskPiece.TrafficType,
Expand Down Expand Up @@ -737,7 +738,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can

piece := &commonv2.Piece{
Number: candidateParentPiece.Number,
ParentId: candidateParentPiece.ParentID,
ParentId: &candidateParentPiece.ParentID,
Offset: candidateParentPiece.Offset,
Length: candidateParentPiece.Length,
TrafficType: candidateParentPiece.TrafficType,
Expand All @@ -758,8 +759,8 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can
Id: candidateParent.Task.ID,
Type: candidateParent.Task.Type,
Url: candidateParent.Task.URL,
Tag: candidateParent.Task.Tag,
Application: candidateParent.Task.Application,
Tag: &candidateParent.Task.Tag,
Application: &candidateParent.Task.Application,
Filters: candidateParent.Task.Filters,
Header: candidateParent.Task.Header,
PieceLength: candidateParent.Task.PieceLength,
Expand All @@ -774,7 +775,8 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can

// Set digest to parent task.
if candidateParent.Task.Digest != nil {
parent.Task.Digest = candidateParent.Task.Digest.String()
dgst := candidateParent.Task.Digest.String()
parent.Task.Digest = &dgst
}

// Set pieces to parent task.
Expand All @@ -787,7 +789,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can

piece := &commonv2.Piece{
Number: taskPiece.Number,
ParentId: taskPiece.ParentID,
ParentId: &taskPiece.ParentID,
Offset: taskPiece.Offset,
Length: taskPiece.Length,
TrafficType: taskPiece.TrafficType,
Expand Down
36 changes: 21 additions & 15 deletions scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,8 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
{
name: "construct success",
expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer) {
dgst := candidateParent.Task.Digest.String()

assert := assert.New(t)
assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{
SmallTaskResponse: &schedulerv2.SmallTaskResponse{
Expand All @@ -1280,7 +1282,7 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: mockPiece.ParentID,
ParentId: &mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
Expand All @@ -1295,9 +1297,9 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
Id: candidateParent.Task.ID,
Type: candidateParent.Task.Type,
Url: candidateParent.Task.URL,
Digest: candidateParent.Task.Digest.String(),
Tag: candidateParent.Task.Tag,
Application: candidateParent.Task.Application,
Digest: &dgst,
Tag: &candidateParent.Task.Tag,
Application: &candidateParent.Task.Application,
Filters: candidateParent.Task.Filters,
Header: candidateParent.Task.Header,
PieceLength: candidateParent.Task.PieceLength,
Expand All @@ -1307,7 +1309,7 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: mockPiece.ParentID,
ParentId: &mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
Expand Down Expand Up @@ -1424,6 +1426,8 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
}, nil).Times(1)
},
expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_NormalTaskResponse, candidateParents []*resource.Peer) {
dgst := candidateParents[0].Task.Digest.String()

assert := assert.New(t)
assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_NormalTaskResponse{
NormalTaskResponse: &schedulerv2.NormalTaskResponse{
Expand All @@ -1438,7 +1442,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: mockPiece.ParentID,
ParentId: &mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
Expand All @@ -1453,9 +1457,9 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Id: candidateParents[0].Task.ID,
Type: candidateParents[0].Task.Type,
Url: candidateParents[0].Task.URL,
Digest: candidateParents[0].Task.Digest.String(),
Tag: candidateParents[0].Task.Tag,
Application: candidateParents[0].Task.Application,
Digest: &dgst,
Tag: &candidateParents[0].Task.Tag,
Application: &candidateParents[0].Task.Application,
Filters: candidateParents[0].Task.Filters,
Header: candidateParents[0].Task.Header,
PieceLength: candidateParents[0].Task.PieceLength,
Expand All @@ -1465,7 +1469,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: mockPiece.ParentID,
ParentId: &mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
Expand Down Expand Up @@ -1556,6 +1560,8 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1)
},
expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_NormalTaskResponse, candidateParents []*resource.Peer) {
dgst := candidateParents[0].Task.Digest.String()

assert := assert.New(t)
assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_NormalTaskResponse{
NormalTaskResponse: &schedulerv2.NormalTaskResponse{
Expand All @@ -1570,7 +1576,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: mockPiece.ParentID,
ParentId: &mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
Expand All @@ -1585,9 +1591,9 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Id: candidateParents[0].Task.ID,
Type: candidateParents[0].Task.Type,
Url: candidateParents[0].Task.URL,
Digest: candidateParents[0].Task.Digest.String(),
Tag: candidateParents[0].Task.Tag,
Application: candidateParents[0].Task.Application,
Digest: &dgst,
Tag: &candidateParents[0].Task.Tag,
Application: &candidateParents[0].Task.Application,
Filters: candidateParents[0].Task.Filters,
Header: candidateParents[0].Task.Header,
PieceLength: candidateParents[0].Task.PieceLength,
Expand All @@ -1597,7 +1603,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: mockPiece.ParentID,
ParentId: &mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
Expand Down
40 changes: 21 additions & 19 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c

respPiece := &commonv2.Piece{
Number: piece.Number,
ParentId: piece.ParentID,
ParentId: &piece.ParentID,
Offset: piece.Offset,
Length: piece.Length,
TrafficType: piece.TrafficType,
Expand All @@ -249,8 +249,8 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
Id: peer.Task.ID,
Type: peer.Task.Type,
Url: peer.Task.URL,
Tag: peer.Task.Tag,
Application: peer.Task.Application,
Tag: &peer.Task.Tag,
Application: &peer.Task.Application,
Filters: peer.Task.Filters,
Header: peer.Task.Header,
PieceLength: peer.Task.PieceLength,
Expand All @@ -265,7 +265,8 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c

// Set digest to task response.
if peer.Task.Digest != nil {
resp.Task.Digest = peer.Task.Digest.String()
dgst := peer.Task.Digest.String()
resp.Task.Digest = &dgst
}

// Set pieces to task response.
Expand All @@ -278,7 +279,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c

respPiece := &commonv2.Piece{
Number: piece.Number,
ParentId: piece.ParentID,
ParentId: &piece.ParentID,
Offset: piece.Offset,
Length: piece.Length,
TrafficType: piece.TrafficType,
Expand Down Expand Up @@ -401,8 +402,8 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c
Id: task.ID,
Type: task.Type,
Url: task.URL,
Tag: task.Tag,
Application: task.Application,
Tag: &task.Tag,
Application: &task.Application,
Filters: task.Filters,
Header: task.Header,
PieceLength: task.PieceLength,
Expand All @@ -417,7 +418,8 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c

// Set digest to response.
if task.Digest != nil {
resp.Digest = task.Digest.String()
dgst := task.Digest.String()
resp.Digest = &dgst
}

// Set pieces to response.
Expand All @@ -430,7 +432,7 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c

respPiece := &commonv2.Piece{
Number: piece.Number,
ParentId: piece.ParentID,
ParentId: &piece.ParentID,
Offset: piece.Offset,
Length: piece.Length,
TrafficType: piece.TrafficType,
Expand Down Expand Up @@ -1264,9 +1266,9 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
// Store new task or update task.
task, loaded := v.resource.TaskManager().Load(taskID)
if !loaded {
options := []resource.TaskOption{resource.WithPieceLength(download.PieceLength)}
if download.Digest != "" {
d, err := digest.Parse(download.Digest)
options := []resource.TaskOption{resource.WithPieceLength(download.GetPieceLength())}
if download.Digest != nil {
d, err := digest.Parse(download.GetDigest())
if err != nil {
return nil, nil, nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand All @@ -1275,21 +1277,21 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
options = append(options, resource.WithDigest(d))
}

task = resource.NewTask(taskID, download.Url, download.Tag, download.Application, download.Type,
download.Filters, download.Header, int32(v.config.Scheduler.BackToSourceCount), options...)
task = resource.NewTask(taskID, download.GetUrl(), download.GetTag(), download.GetApplication(), download.GetType(),
download.GetFilters(), download.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...)
v.resource.TaskManager().Store(task)
} else {
task.URL = download.Url
task.Filters = download.Filters
task.Header = download.Header
task.URL = download.GetUrl()
task.Filters = download.GetFilters()
task.Header = download.GetHeader()
}

// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
options := []resource.PeerOption{resource.WithPriority(download.Priority), resource.WithAnnouncePeerStream(stream)}
options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)}
if download.Range != nil {
options = append(options, resource.WithRange(http.Range{Start: download.Range.Start, Length: download.Range.Length}))
options = append(options, resource.WithRange(http.Range{Start: download.Range.GetStart(), Length: download.Range.GetLength()}))
}

peer = resource.NewPeer(peerID, &v.config.Resource, task, host, options...)
Expand Down
Loading

0 comments on commit b65c101

Please sign in to comment.