Skip to content

Commit

Permalink
feat: implement v2 version of scheduler service
Browse files Browse the repository at this point in the history
Implement StatTask, AnnounceHost and LeaveHost api in
scheduler service v2.

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Feb 28, 2023
1 parent de2a7c9 commit c0a11ef
Show file tree
Hide file tree
Showing 11 changed files with 1,373 additions and 59 deletions.
22 changes: 11 additions & 11 deletions client/daemon/peer/peertask_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions client/daemon/peer/piece_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ const (
)

// Name returns the name of host type.
func (h *HostType) Name() string {
switch *h {
func (h HostType) Name() string {
switch h {
case HostTypeSuperSeed:
return HostTypeSuperSeedName
case HostTypeStrongSeed:
Expand Down
19 changes: 17 additions & 2 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ var (
Help: "Counter of the number of failed of the leaving peer.",
})

ExchangePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "exchange_peer_total",
Help: "Counter of the number of the leaving peer.",
})

ExchangePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "exchange_peer_failure_total",
Help: "Counter of the number of failed of the leaving peer.",
})

RegisterTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down Expand Up @@ -152,12 +166,13 @@ var (
}, []string{"os", "platform", "platform_family", "platform_version",
"kernel_version", "git_version", "git_commit", "go_version", "build_platform"})

AnnounceHostFailureCount = promauto.NewCounter(prometheus.CounterOpts{
AnnounceHostFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_host_failure_total",
Help: "Counter of the number of failed of the announce host.",
})
}, []string{"os", "platform", "platform_family", "platform_version",
"kernel_version", "git_version", "git_commit", "go_version", "build_platform"})

LeaveTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Expand Down
6 changes: 3 additions & 3 deletions scheduler/resource/seed_peer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions scheduler/rpcserver/scheduler_server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.A
metrics.AnnounceTaskCount.Inc()
if err := s.service.AnnounceTask(ctx, req); err != nil {
metrics.AnnounceTaskFailureCount.Inc()
return new(emptypb.Empty), err
return nil, err
}

return new(emptypb.Empty), nil
Expand All @@ -95,13 +95,13 @@ func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.A
// StatTask checks if the given task exists.
func (s *schedulerServerV1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) {
metrics.StatTaskCount.Inc()
task, err := s.service.StatTask(ctx, req)
resp, err := s.service.StatTask(ctx, req)
if err != nil {
metrics.StatTaskFailureCount.Inc()
return nil, err
}

return task, nil
return resp, nil
}

// LeaveTask makes the peer unschedulable.
Expand All @@ -114,8 +114,9 @@ func (s *schedulerServerV1) AnnounceHost(ctx context.Context, req *schedulerv1.A
metrics.AnnounceHostCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion,
req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc()
if err := s.service.AnnounceHost(ctx, req); err != nil {
metrics.AnnounceHostFailureCount.Inc()
return new(emptypb.Empty), err
metrics.AnnounceHostFailureCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion,
req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc()
return nil, err
}

return new(emptypb.Empty), nil
Expand All @@ -126,7 +127,7 @@ func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.Leav
metrics.LeaveHostCount.Inc()
if err := s.service.LeaveHost(ctx, req); err != nil {
metrics.LeaveHostFailureCount.Inc()
return new(emptypb.Empty), err
return nil, err
}

return new(emptypb.Empty), nil
Expand Down
45 changes: 36 additions & 9 deletions scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,45 +58,72 @@ func (s *schedulerServerV2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePe
// StatPeer checks information of peer.
func (s *schedulerServerV2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) {
metrics.StatPeerCount.Inc()
peer, err := s.service.StatPeer(ctx, req)
resp, err := s.service.StatPeer(ctx, req)
if err != nil {
metrics.StatPeerFailureCount.Inc()
return nil, err
}

return peer, nil
return resp, nil
}

// LeavePeer releases peer in scheduler.
func (s *schedulerServerV2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) (*emptypb.Empty, error) {
metrics.LeavePeerCount.Inc()
if err := s.service.LeavePeer(ctx, req); err != nil {
metrics.LeavePeerFailureCount.Inc()
return new(emptypb.Empty), err
return nil, err
}

return new(emptypb.Empty), nil
}

// TODO exchange peer api definition.
// ExchangePeer exchanges peer information.
func (s *schedulerServerV2) ExchangePeer(ctx context.Context, req *schedulerv2.ExchangePeerRequest) (*schedulerv2.ExchangePeerResponse, error) {
return nil, nil
metrics.ExchangePeerCount.Inc()
resp, err := s.service.ExchangePeer(ctx, req)
if err != nil {
metrics.ExchangePeerFailureCount.Inc()
return nil, err
}

return resp, nil
}

// Checks information of task.
// StatTask checks information of task.
func (s *schedulerServerV2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*commonv2.Task, error) {
return nil, nil
metrics.StatTaskCount.Inc()
resp, err := s.service.StatTask(ctx, req)
if err != nil {
metrics.StatTaskFailureCount.Inc()
return nil, err
}

return resp, nil
}

// AnnounceHost announces host to scheduler.
func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) (*emptypb.Empty, error) {
return nil, nil
metrics.AnnounceHostCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion,
req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GoVersion, req.Host.Build.Platform).Inc()
if err := s.service.AnnounceHost(ctx, req); err != nil {
metrics.AnnounceHostFailureCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion,
req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GoVersion, req.Host.Build.Platform).Inc()
return nil, err
}

return new(emptypb.Empty), nil
}

// LeaveHost releases host in scheduler.
func (s *schedulerServerV2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) (*emptypb.Empty, error) {
return nil, nil
metrics.LeaveHostCount.Inc()
if err := s.service.LeaveHost(ctx, req); err != nil {
metrics.LeaveHostFailureCount.Inc()
return nil, err
}

return new(emptypb.Empty), nil
}

// SyncProbes sync probes of the host.
Expand Down
9 changes: 5 additions & 4 deletions scheduler/service/service_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
return nil
}

// Host already exists and updates properties.
host.Port = req.Port
host.DownloadPort = req.DownloadPort
host.Type = types.ParseHostType(req.Type)
Expand All @@ -548,6 +549,10 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
host.KernelVersion = req.KernelVersion
host.UpdatedAt.Store(time.Now())

if concurrentUploadLimit > 0 {
host.ConcurrentUploadLimit.Store(concurrentUploadLimit)
}

if req.Cpu != nil {
host.CPU = resource.CPU{
LogicalCount: req.Cpu.LogicalCount,
Expand Down Expand Up @@ -612,10 +617,6 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
}
}

if concurrentUploadLimit > 0 {
host.ConcurrentUploadLimit.Store(concurrentUploadLimit)
}

return nil
}

Expand Down
Loading

0 comments on commit c0a11ef

Please sign in to comment.