Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize trigger download task return empty #2958

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat: optimize trigger download task return empty
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Dec 19, 2023
commit 9df25c578c16005e4c619fa89e5e6a104709673e
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.67
d7y.io/api/v2 v2.0.70
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.67 h1:4fiGXT1WHWgRXSTmnP53MU83Zbf+7i1jYeGNEJWrM7Q=
d7y.io/api/v2 v2.0.67/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
d7y.io/api/v2 v2.0.70 h1:qZ1HslBwgI24VlgtyA1K3GpS3Mm7wWtgMujtcMCl2TY=
d7y.io/api/v2 v2.0.70/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
7 changes: 4 additions & 3 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type V2 interface {
DownloadPiece(context.Context, *dfdaemonv2.DownloadPieceRequest, ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error)

// TriggerDownloadTask triggers download task from the other peer.
TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest, ...grpc.CallOption) (*dfdaemonv2.TriggerDownloadTaskResponse, error)
TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest, ...grpc.CallOption) error

// Close tears down the ClientConn and all underlying connections.
Close() error
Expand Down Expand Up @@ -125,13 +125,14 @@ func (v *v2) DownloadPiece(ctx context.Context, req *dfdaemonv2.DownloadPieceReq
}

// TriggerDownloadTask triggers download task from the other peer.
func (v *v2) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest, opts ...grpc.CallOption) (*dfdaemonv2.TriggerDownloadTaskResponse, error) {
func (v *v2) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.DfdaemonUploadClient.TriggerDownloadTask(
_, err := v.DfdaemonUploadClient.TriggerDownloadTask(
ctx,
req,
opts...,
)
return err
}
7 changes: 3 additions & 4 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions scheduler/resource/seed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
type SeedPeer interface {
// TriggerDownloadTask triggers the seed peer to download task.
// Used only in v2 version of the grpc.
TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) (*dfdaemonv2.TriggerDownloadTaskResponse, error)
TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) error

// TriggerTask triggers the seed peer to download task.
// Used only in v1 version of the grpc.
Expand Down Expand Up @@ -89,7 +89,7 @@ func newSeedPeer(cfg *config.Config, client SeedPeerClient, peerManager PeerMana

// TriggerDownloadTask triggers the seed peer to download task.
// Used only in v2 version of the grpc.
func (s *seedPeer) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest) (*dfdaemonv2.TriggerDownloadTaskResponse, error) {
func (s *seedPeer) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest) error {
ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
defer cancel()

Expand Down
7 changes: 3 additions & 4 deletions scheduler/resource/seed_peer_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions scheduler/resource/seed_peer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 6 additions & 10 deletions scheduler/resource/seed_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,26 @@ func TestSeedPeer_TriggerDownloadTask(t *testing.T) {
tests := []struct {
name string
mock func(mc *MockSeedPeerClientMockRecorder)
expect func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error)
expect func(t *testing.T, err error)
}{
{
name: "trigger download task failed",
mock: func(mc *MockSeedPeerClientMockRecorder) {
mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(nil, errors.New("foo")).Times(1)
mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1)
},
expect: func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error) {
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "foo")
},
},
{
name: "trigger download task scuccess",
mock: func(mc *MockSeedPeerClientMockRecorder) {
mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(&dfdaemonv2.TriggerDownloadTaskResponse{HostId: mockHostID, TaskId: mockTaskID, PeerId: mockPeerID}, nil).Times(1)
mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(nil).Times(1)
},
expect: func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error) {
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.Equal(mockHostID, resp.HostId)
assert.Equal(mockTaskID, resp.TaskId)
assert.Equal(mockPeerID, resp.PeerId)
},
},
}
Expand All @@ -98,8 +95,7 @@ func TestSeedPeer_TriggerDownloadTask(t *testing.T) {
tc.mock(client.EXPECT())

seedPeer := newSeedPeer(mockConfig, client, peerManager, hostManager)
resp, err := seedPeer.TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{})
tc.expect(t, resp, err)
tc.expect(t, seedPeer.TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{}))
})
}
}
Expand Down
15 changes: 6 additions & 9 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,13 +1313,12 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
// Super peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download})
if err != nil {
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}

peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId())
peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
}(ctx, download, types.HostTypeSuperSeed)

break
Expand All @@ -1330,13 +1329,12 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
// Strong peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download})
if err != nil {
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}

peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId())
peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
}(ctx, download, types.HostTypeSuperSeed)

break
Expand All @@ -1347,13 +1345,12 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
// Weak peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download})
if err != nil {
if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}

peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId())
peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
}(ctx, download, types.HostTypeSuperSeed)

break
Expand Down
12 changes: 6 additions & 6 deletions scheduler/service/service_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3364,7 +3364,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {

gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
)

peer.Priority = commonv2.Priority_LEVEL6
Expand All @@ -3388,7 +3388,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {

gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
)

peer.Priority = commonv2.Priority_LEVEL6
Expand Down Expand Up @@ -3427,7 +3427,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {

gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
)

peer.Priority = commonv2.Priority_LEVEL5
Expand All @@ -3451,7 +3451,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {

gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
)

peer.Priority = commonv2.Priority_LEVEL5
Expand Down Expand Up @@ -3490,7 +3490,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {

gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
)

peer.Priority = commonv2.Priority_LEVEL4
Expand All @@ -3514,7 +3514,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {

gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
)

peer.Priority = commonv2.Priority_LEVEL4
Expand Down