Skip to content

Commit

Permalink
feat: add handleRegisterSeedPeerRequest to AnnouncePeer in service v2 (
Browse files Browse the repository at this point in the history
…#2147)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Mar 8, 2023
1 parent 9777569 commit b310588
Show file tree
Hide file tree
Showing 14 changed files with 1,984 additions and 68 deletions.
1 change: 0 additions & 1 deletion build/images/dfdaemon/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,3 @@ COPY --from=health /bin/grpc_health_probe /bin/grpc_health_probe
EXPOSE 65001

ENTRYPOINT ["/opt/dragonfly/bin/dfget", "daemon"]

13 changes: 13 additions & 0 deletions client/daemon/rpcserver/mocks/rpcserver_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20

require (
d7y.io/api v1.6.8
d7y.io/api v1.7.5
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api v1.6.8 h1:/oNEZC8FC8P1vPHlzgtJbBQzh5lnf0mZ+9VBx/Nq+iU=
d7y.io/api v1.6.8/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
d7y.io/api v1.7.5 h1:JLtbTLAiNom+qT/sQHgzqKApw/tG5MQaTBcsH/Lb2wE=
d7y.io/api v1.7.5/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
17 changes: 12 additions & 5 deletions scheduler/resource/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ const (
// PeerOption is a functional option for peer.
type PeerOption func(peer *Peer)

// WithAnnouncePeerStream set AnnouncePeerStream for peer.
func WithAnnouncePeerStream(stream schedulerv2.Scheduler_AnnouncePeerServer) PeerOption {
return func(p *Peer) {
p.StoreAnnouncePeerStream(stream)
}
}

// WithPriority set Priority for peer.
func WithPriority(priority commonv2.Priority) PeerOption {
return func(p *Peer) {
Expand Down Expand Up @@ -354,7 +361,7 @@ func (p *Peer) DeleteReportPieceResultStream() {
// LoadAnnouncePeerStream return the grpc stream of Scheduler_AnnouncePeerServer,
// Used only in v2 version of the grpc.
func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServer, bool) {
rawStream := p.ReportPieceResultStream.Load()
rawStream := p.AnnouncePeerStream.Load()
if rawStream == nil {
return nil, false
}
Expand All @@ -365,13 +372,13 @@ func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServe
// StoreAnnouncePeerStream set the grpc stream of Scheduler_AnnouncePeerServer,
// Used only in v2 version of the grpc.
func (p *Peer) StoreAnnouncePeerStream(stream schedulerv2.Scheduler_AnnouncePeerServer) {
p.ReportPieceResultStream.Store(stream)
p.AnnouncePeerStream.Store(stream)
}

// DeleteAnnouncePeerStream deletes the grpc stream of Scheduler_AnnouncePeerServer,
// Used only in v2 version of the grpc.
func (p *Peer) DeleteAnnouncePeerStream() {
p.ReportPieceResultStream = &atomic.Value{}
p.AnnouncePeerStream = &atomic.Value{}
}

// LoadPiece return piece for a key.
Expand Down Expand Up @@ -515,8 +522,8 @@ func (p *Peer) DownloadFile() ([]byte, error) {
return io.ReadAll(resp.Body)
}

// GetPriority returns priority of peer.
func (p *Peer) GetPriority(dynconfig config.DynconfigInterface) commonv2.Priority {
// CalculatePriority returns priority of peer.
func (p *Peer) CalculatePriority(dynconfig config.DynconfigInterface) commonv2.Priority {
if p.Priority != commonv2.Priority_LEVEL0 {
return p.Priority
}
Expand Down
34 changes: 32 additions & 2 deletions scheduler/resource/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ var (
)

func TestPeer_NewPeer(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)

tests := []struct {
name string
id string
Expand Down Expand Up @@ -136,6 +140,32 @@ func TestPeer_NewPeer(t *testing.T) {
assert.NotNil(peer.Log)
},
},
{
name: "new peer with AnnouncePeerStream",
id: mockPeerID,
options: []PeerOption{WithAnnouncePeerStream(stream)},
expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
assert := assert.New(t)
assert.Equal(peer.ID, mockPeerID)
assert.Nil(peer.Range)
assert.Equal(peer.Priority, commonv2.Priority_LEVEL0)
assert.Empty(peer.Pieces)
assert.Empty(peer.FinishedPieces)
assert.Equal(len(peer.PieceCosts()), 0)
assert.Empty(peer.ReportPieceResultStream)
assert.NotEmpty(peer.AnnouncePeerStream)
assert.Equal(peer.FSM.Current(), PeerStatePending)
assert.EqualValues(peer.Task, mockTask)
assert.EqualValues(peer.Host, mockHost)
assert.Equal(peer.BlockParents.Len(), uint(0))
assert.Equal(peer.NeedBackToSource.Load(), false)
assert.Equal(peer.IsBackToSource.Load(), false)
assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
assert.NotEqual(peer.CreatedAt.Load(), 0)
assert.NotEqual(peer.UpdatedAt.Load(), 0)
assert.NotNil(peer.Log)
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -886,7 +916,7 @@ func TestPeer_DownloadFile(t *testing.T) {
}
}

func TestPeer_GetPriority(t *testing.T) {
func TestPeer_CalculatePriority(t *testing.T) {
tests := []struct {
name string
mock func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder)
Expand Down Expand Up @@ -1009,7 +1039,7 @@ func TestPeer_GetPriority(t *testing.T) {
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
peer := NewPeer(mockPeerID, mockTask, mockHost)
tc.mock(peer, dynconfig.EXPECT())
tc.expect(t, peer.GetPriority(dynconfig))
tc.expect(t, peer.CalculatePriority(dynconfig))
})
}
}
12 changes: 9 additions & 3 deletions scheduler/resource/seed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"time"

"go.opentelemetry.io/otel/trace"

cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1"
commonv1 "d7y.io/api/pkg/apis/common/v1"
commonv2 "d7y.io/api/pkg/apis/common/v2"
Expand All @@ -33,6 +35,7 @@ import (
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/rpc/common"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/metrics"
)

Expand All @@ -45,7 +48,7 @@ const (
type SeedPeer interface {
// DownloadTask downloads task back-to-source.
// Used only in v2 version of the grpc.
DownloadTask(context.Context, *Task) error
DownloadTask(context.Context, *Task, types.HostType) error

// TriggerTask triggers the seed peer to download task.
// Used only in v1 version of the grpc.
Expand Down Expand Up @@ -80,14 +83,17 @@ func newSeedPeer(client SeedPeerClient, peerManager PeerManager, hostManager Hos
// TODO Implement DownloadTask
// DownloadTask downloads task back-to-source.
// Used only in v2 version of the grpc.
func (s *seedPeer) DownloadTask(ctx context.Context, task *Task) error {
func (s *seedPeer) DownloadTask(ctx context.Context, task *Task, hostType types.HostType) error {
// ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
// defer cancel()

return nil
}

// TriggerTask triggers the seed peer to download task.
// Used only in v1 version of the grpc.
func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task) (*Peer, *schedulerv1.PeerResult, error) {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
defer cancel()

urlMeta := &commonv1.UrlMeta{
Expand Down
9 changes: 5 additions & 4 deletions scheduler/resource/seed_peer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions scheduler/scheduling/mocks/scheduling_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b310588

Please sign in to comment.