From 21e03e25bd045a017eac13e67ab49d0fc9a9468e Mon Sep 17 00:00:00 2001
From: Gaius
Date: Fri, 10 Mar 2023 16:27:18 +0800
Subject: [PATCH] feat: add metrics to service v2 (#2153)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Gaius
Signed-off-by: 李龙峰
---
 go.mod                                     |   2 +-
 go.sum                                     |   4 +-
 manager/rpcserver/manager_server_v1.go     |   3 +-
 manager/rpcserver/manager_server_v2.go     |   3 +-
 scheduler/metrics/metrics.go               | 134 ++++++++++--------
 scheduler/resource/seed_peer.go            |   7 +-
 scheduler/rpcserver/scheduler_server_v1.go |  34 ++++-
 scheduler/rpcserver/scheduler_server_v2.go |  18 +++
 scheduler/service/service_v1.go            |  39 ++++--
 scheduler/service/service_v1_test.go       |  56 +++-----
 scheduler/service/service_v2.go            | 139 ++++++++++++++++---
 scheduler/service/service_v2_test.go       | 150 +++++++++++++--------
 12 files changed, 401 insertions(+), 188 deletions(-)

diff --git a/go.mod b/go.mod
index 53a55e9e8fd..6c539640fec 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
 go 1.20
 
 require (
-	d7y.io/api v1.7.8
+	d7y.io/api v1.7.9
 	github.com/RichardKnop/machinery v1.10.6
 	github.com/Showmax/go-fqdn v1.0.0
 	github.com/VividCortex/mysqlerr v1.0.0
diff --git a/go.sum b/go.sum
index 7292a19d152..55294bbf38e 100644
--- a/go.sum
+++ b/go.sum
@@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
-d7y.io/api v1.7.8 h1:49HLi64UX6w4SMJ+NOM7VUwznbVRygnCmO36SALWmSc=
-d7y.io/api v1.7.8/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
+d7y.io/api v1.7.9 h1:lYMPN4DZCigliNJAovwP+eBRvYWSy8up82kyCpVhil4=
+d7y.io/api v1.7.9/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
diff --git a/manager/rpcserver/manager_server_v1.go b/manager/rpcserver/manager_server_v1.go
index 885d2ee3b10..b8d033a7b0f 100644
--- a/manager/rpcserver/manager_server_v1.go
+++ b/manager/rpcserver/manager_server_v1.go
@@ -436,10 +436,9 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up
 
 // List acitve schedulers configuration.
 func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.ListSchedulersRequest) (*managerv1.ListSchedulersResponse, error) {
-	metrics.SearchSchedulerClusterCount.WithLabelValues(req.Version, req.Commit).Inc()
-
 	log := logger.WithHostnameAndIP(req.HostName, req.Ip)
 	log.Debugf("list schedulers, version %s, commit %s", req.Version, req.Commit)
+	metrics.SearchSchedulerClusterCount.WithLabelValues(req.Version, req.Commit).Inc()
 
 	// Count the number of the active peer.
if s.config.Metrics.EnablePeerGauge && req.SourceType == managerv1.SourceType_PEER_SOURCE { diff --git a/manager/rpcserver/manager_server_v2.go b/manager/rpcserver/manager_server_v2.go index 114ea470c4f..bbe74223e51 100644 --- a/manager/rpcserver/manager_server_v2.go +++ b/manager/rpcserver/manager_server_v2.go @@ -436,10 +436,9 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up // List acitve schedulers configuration. func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.ListSchedulersRequest) (*managerv2.ListSchedulersResponse, error) { - metrics.SearchSchedulerClusterCount.WithLabelValues(req.Version, req.Commit).Inc() - log := logger.WithHostnameAndIP(req.HostName, req.Ip) log.Debugf("list schedulers, version %s, commit %s", req.Version, req.Commit) + metrics.SearchSchedulerClusterCount.WithLabelValues(req.Version, req.Commit).Inc() // Count the number of the active peer. if s.config.Metrics.EnablePeerGauge && req.SourceType == managerv2.SourceType_PEER_SOURCE { diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index b93d45dbf8c..fca25ca14d3 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -36,12 +36,6 @@ var ( // HostTrafficDownloadType is download traffic type for host traffic metrics. HostTrafficDownloadType = "download" - - // DownloadFailureBackToSourceType is back-to-source type for download failure count metrics. - DownloadFailureBackToSourceType = "back_to_source" - - // DownloadFailureP2PType is p2p type for download failure count metrics. - DownloadFailureP2PType = "p2p" ) // Variables declared for metrics. @@ -102,60 +96,102 @@ var ( Help: "Counter of the number of failed of the leaving peer.", }) - RegisterTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{ + RegisterPeerCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: "register_task_total", - Help: "Counter of the number of the register task.", - }, []string{"tag", "app"}) + Name: "register_peer_total", + Help: "Counter of the number of the register peer.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) - RegisterTaskFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ + RegisterPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: "register_task_failure_total", - Help: "Counter of the number of failed of the register task.", - }, []string{"tag", "app"}) + Name: "register_peer_failure_total", + Help: "Counter of the number of failed of the register peer.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) - DownloadTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{ + DownloadPeerStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: "download_task_total", - Help: "Counter of the number of the task downloading.", - }, []string{"tag", "app", "host_type"}) + Name: "download_peer_started_total", + Help: "Counter of the number of the download peer started.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) - DownloadTaskFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ + DownloadPeerStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: 
"download_task_failure_total", - Help: "Counter of the number of failed of the task downloading.", - }, []string{"tag", "app", "type", "code", "host_type"}) + Name: "download_peer_started_failure_total", + Help: "Counter of the number of failed of the download peer started.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) - StatTaskCount = promauto.NewCounter(prometheus.CounterOpts{ + DownloadPeerBackToSourceStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: "stat_task_total", - Help: "Counter of the number of the stat task.", - }) + Name: "download_peer_back_to_source_started_total", + Help: "Counter of the number of the download peer back-to-source started.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) - StatTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + DownloadPeerBackToSourceStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: "stat_task_failure_total", - Help: "Counter of the number of failed of the stat task.", - }) + Name: "download_peer_back_to_source_started_failure_total", + Help: "Counter of the number of failed of the download peer back-to-source started.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + + DownloadPeerCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "download_peer_finished_total", + Help: "Counter of the number of the download peer.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + + DownloadPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "download_peer_finished_failure_total", + Help: "Counter of the number of failed of the download peer.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + + DownloadPeerBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "download_peer_back_to_source_finished_failure_total", + Help: "Counter of the number of failed of the download peer back-to-source.", + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + + DownloadPieceCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "download_piece_finished_total", + Help: "Counter of the number of the download piece.", + }, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) + + DownloadPieceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "download_piece_finished_failure_total", + Help: "Counter of the number of failed of the download piece.", + }, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) - AnnounceTaskCount = promauto.NewCounter(prometheus.CounterOpts{ + DownloadPieceBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: "announce_task_total", - Help: "Counter of the number of the announce task.", + Name: "download_piece_back_to_source_finished_failure_total", + 
Help: "Counter of the number of failed of the download piece back-to-source.", + }, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) + + StatTaskCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "stat_task_total", + Help: "Counter of the number of the stat task.", }) - AnnounceTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + StatTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: "announce_task_failure_total", - Help: "Counter of the number of failed of the announce task.", + Name: "stat_task_failure_total", + Help: "Counter of the number of failed of the stat task.", }) AnnounceHostCount = promauto.NewCounterVec(prometheus.CounterOpts{ @@ -174,20 +210,6 @@ var ( }, []string{"os", "platform", "platform_family", "platform_version", "kernel_version", "git_version", "git_commit", "go_version", "build_platform"}) - LeaveTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: types.MetricsNamespace, - Subsystem: types.SchedulerMetricsName, - Name: "leave_task_total", - Help: "Counter of the number of the leaving task.", - }, []string{"tag", "app", "host_type"}) - - LeaveTaskFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: types.MetricsNamespace, - Subsystem: types.SchedulerMetricsName, - Name: "leave_task_failure_total", - Help: "Counter of the number of failed of the leaving task.", - }, []string{"tag", "app", "host_type"}) - LeaveHostCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, @@ -207,22 +229,22 @@ var ( Subsystem: types.SchedulerMetricsName, Name: "traffic", Help: "Counter of the number of traffic.", - }, []string{"task_tag", "task_app", "type"}) + }, []string{"type", "task_type", "task_tag", "task_app", "host_type"}) HostTraffic = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "host_traffic", Help: "Counter of the number of per host traffic.", - }, []string{"task_tag", "task_app", "type", "host_id", "host_ip"}) + }, []string{"type", "task_type", "task_tag", "task_app", "host_type", "host_id", "host_ip", "host_name"}) - DownloadTaskDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + DownloadPeerDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, - Name: "download_task_duration_milliseconds", - Help: "Histogram of the time each task downloading.", + Name: "download_peer_duration_milliseconds", + Help: "Histogram of the time each peer downloading.", Buckets: []float64{100, 200, 500, 1000, 1500, 2 * 1000, 3 * 1000, 5 * 1000, 10 * 1000, 20 * 1000, 60 * 1000, 120 * 1000, 300 * 1000}, - }, []string{"tag", "app", "host_type"}) + }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) ConcurrentScheduleGauge = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: types.MetricsNamespace, diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index 2471a36f0df..2be18961be2 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -188,12 +188,13 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task) peer.PieceUpdatedAt.Store(time.Now()) task.StorePiece(piece) - // Statistical traffic metrics. 
+ // Collect Traffic metrics. trafficType := commonv2.TrafficType_BACK_TO_SOURCE if pieceSeed.Reuse { - trafficType = commonv2.TrafficType_REMOTE_PEER + trafficType = commonv2.TrafficType_LOCAL_PEER } - metrics.Traffic.WithLabelValues(peer.Task.Tag, peer.Task.Application, trafficType.String()).Add(float64(pieceSeed.PieceInfo.RangeSize)) + metrics.Traffic.WithLabelValues(trafficType.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(pieceSeed.PieceInfo.RangeSize)) } // Handle end of piece. diff --git a/scheduler/rpcserver/scheduler_server_v1.go b/scheduler/rpcserver/scheduler_server_v1.go index a5477a1fc90..7530ff3046c 100644 --- a/scheduler/rpcserver/scheduler_server_v1.go +++ b/scheduler/rpcserver/scheduler_server_v1.go @@ -21,9 +21,11 @@ import ( "google.golang.org/protobuf/types/known/emptypb" + commonv1 "d7y.io/api/pkg/apis/common/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/metrics" "d7y.io/dragonfly/v2/scheduler/resource" @@ -58,11 +60,16 @@ func (s *schedulerServerV1) RegisterPeerTask(ctx context.Context, req *scheduler tag := req.UrlMeta.Tag application := req.UrlMeta.Application + priority := req.UrlMeta.Priority - metrics.RegisterTaskCount.WithLabelValues(tag, application).Inc() + // Collect RegisterPeerCount metrics. + metrics.RegisterPeerCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(), + tag, application, types.HostTypeNormalName).Inc() resp, err := s.service.RegisterPeerTask(ctx, req) if err != nil { - metrics.RegisterTaskFailureCount.WithLabelValues(tag, application).Inc() + // Collect RegisterPeerFailureCount metrics. + metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(), + tag, application, types.HostTypeNormalName).Inc() } return resp, err @@ -70,6 +77,7 @@ func (s *schedulerServerV1) RegisterPeerTask(ctx context.Context, req *scheduler // ReportPieceResult handles the piece information reported by dfdaemon. func (s *schedulerServerV1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultServer) error { + // Collect ConcurrentScheduleGauge metrics. metrics.ConcurrentScheduleGauge.Inc() defer metrics.ConcurrentScheduleGauge.Dec() @@ -83,9 +91,11 @@ func (s *schedulerServerV1) ReportPeerResult(ctx context.Context, req *scheduler // AnnounceTask informs scheduler a peer has completed task. func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequest) (*emptypb.Empty, error) { - metrics.AnnounceTaskCount.Inc() + // Collect AnnouncePeerCount metrics. + metrics.AnnouncePeerCount.Inc() if err := s.service.AnnounceTask(ctx, req); err != nil { - metrics.AnnounceTaskFailureCount.Inc() + // Collect AnnouncePeerFailureCount metrics. + metrics.AnnouncePeerFailureCount.Inc() return nil, err } @@ -94,9 +104,11 @@ func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.A // StatTask checks if the given task exists. func (s *schedulerServerV1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) { + // Collect StatTaskCount metrics. metrics.StatTaskCount.Inc() resp, err := s.service.StatTask(ctx, req) if err != nil { + // Collect StatTaskFailureCount metrics. 
metrics.StatTaskFailureCount.Inc() return nil, err } @@ -106,14 +118,24 @@ func (s *schedulerServerV1) StatTask(ctx context.Context, req *schedulerv1.StatT // LeaveTask makes the peer unschedulable. func (s *schedulerServerV1) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) (*emptypb.Empty, error) { - return new(emptypb.Empty), s.service.LeaveTask(ctx, req) + // Collect LeavePeerCount metrics. + metrics.LeavePeerCount.Inc() + if err := s.service.LeaveTask(ctx, req); err != nil { + // Collect LeavePeerFailureCount metrics. + metrics.LeavePeerFailureCount.Inc() + return nil, err + } + + return new(emptypb.Empty), nil } // AnnounceHost announces host to scheduler. func (s *schedulerServerV1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequest) (*emptypb.Empty, error) { + // Collect AnnounceHostCount metrics. metrics.AnnounceHostCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion, req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc() if err := s.service.AnnounceHost(ctx, req); err != nil { + // Collect AnnounceHostFailureCount metrics. metrics.AnnounceHostFailureCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion, req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc() return nil, err @@ -124,8 +146,10 @@ func (s *schedulerServerV1) AnnounceHost(ctx context.Context, req *schedulerv1.A // LeaveHost releases host in scheduler. func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) (*emptypb.Empty, error) { + // Collect LeaveHostCount metrics. metrics.LeaveHostCount.Inc() if err := s.service.LeaveHost(ctx, req); err != nil { + // Collect LeaveHostFailureCount metrics. metrics.LeaveHostFailureCount.Inc() return nil, err } diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index 2d4c2c7a292..7b08386ab5e 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -52,8 +52,14 @@ func newSchedulerServerV2( // AnnouncePeer announces peer to scheduler. func (s *schedulerServerV2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error { + // Collect ConcurrentScheduleGauge metrics. + metrics.ConcurrentScheduleGauge.Inc() + defer metrics.ConcurrentScheduleGauge.Dec() + + // Collect AnnouncePeerCount metrics. metrics.AnnouncePeerCount.Inc() if err := s.service.AnnouncePeer(stream); err != nil { + // Collect AnnouncePeerFailureCount metrics. metrics.AnnouncePeerFailureCount.Inc() return err } @@ -63,9 +69,11 @@ func (s *schedulerServerV2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePe // StatPeer checks information of peer. func (s *schedulerServerV2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) { + // Collect StatPeerCount metrics. metrics.StatPeerCount.Inc() resp, err := s.service.StatPeer(ctx, req) if err != nil { + // Collect StatPeerFailureCount metrics. metrics.StatPeerFailureCount.Inc() return nil, err } @@ -75,8 +83,10 @@ func (s *schedulerServerV2) StatPeer(ctx context.Context, req *schedulerv2.StatP // LeavePeer releases peer in scheduler. func (s *schedulerServerV2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) (*emptypb.Empty, error) { + // Collect LeavePeerCount metrics. 
metrics.LeavePeerCount.Inc() if err := s.service.LeavePeer(ctx, req); err != nil { + // Collect LeavePeerFailureCount metrics. metrics.LeavePeerFailureCount.Inc() return nil, err } @@ -86,9 +96,11 @@ func (s *schedulerServerV2) LeavePeer(ctx context.Context, req *schedulerv2.Leav // ExchangePeer exchanges peer information. func (s *schedulerServerV2) ExchangePeer(ctx context.Context, req *schedulerv2.ExchangePeerRequest) (*schedulerv2.ExchangePeerResponse, error) { + // Collect ExchangePeerCount metrics. metrics.ExchangePeerCount.Inc() resp, err := s.service.ExchangePeer(ctx, req) if err != nil { + // Collect ExchangePeerFailureCount metrics. metrics.ExchangePeerFailureCount.Inc() return nil, err } @@ -98,9 +110,11 @@ func (s *schedulerServerV2) ExchangePeer(ctx context.Context, req *schedulerv2.E // StatTask checks information of task. func (s *schedulerServerV2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*commonv2.Task, error) { + // Collect StatTaskCount metrics. metrics.StatTaskCount.Inc() resp, err := s.service.StatTask(ctx, req) if err != nil { + // Collect StatTaskFailureCount metrics. metrics.StatTaskFailureCount.Inc() return nil, err } @@ -110,9 +124,11 @@ func (s *schedulerServerV2) StatTask(ctx context.Context, req *schedulerv2.StatT // AnnounceHost announces host to scheduler. func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) (*emptypb.Empty, error) { + // Collect AnnounceHostCount metrics. metrics.AnnounceHostCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion, req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GoVersion, req.Host.Build.Platform).Inc() if err := s.service.AnnounceHost(ctx, req); err != nil { + // Collect AnnounceHostFailureCount metrics. metrics.AnnounceHostFailureCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion, req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GoVersion, req.Host.Build.Platform).Inc() return nil, err @@ -123,8 +139,10 @@ func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.A // LeaveHost releases host in scheduler. func (s *schedulerServerV2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) (*emptypb.Empty, error) { + // Collect LeaveHostCount metrics. metrics.LeaveHostCount.Inc() if err := s.service.LeaveHost(ctx, req); err != nil { + // Collect LeaveHostFailureCount metrics. metrics.LeaveHostFailureCount.Inc() return nil, err } diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 34e38653fda..d8cfc851633 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -232,11 +232,13 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer peer.Log.Infof("receive success piece: %#v %#v", piece, piece.PieceInfo) v.handlePieceSuccess(ctx, peer, piece) - // Collect peer host traffic metrics. + // Collect host traffic metrics. 
if v.config.Metrics.Enable && v.config.Metrics.EnableHost { - metrics.HostTraffic.WithLabelValues(peer.Task.Tag, peer.Task.Application, metrics.HostTrafficDownloadType, peer.Host.ID, peer.Host.IP).Add(float64(piece.PieceInfo.RangeSize)) + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.PieceInfo.RangeSize)) if parent, loaded := v.resource.PeerManager().Load(piece.DstPid); loaded { - metrics.HostTraffic.WithLabelValues(peer.Task.Tag, peer.Task.Application, metrics.HostTrafficUploadType, parent.Host.ID, parent.Host.IP).Add(float64(piece.PieceInfo.RangeSize)) + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.PieceInfo.RangeSize)) } else if !resource.IsPieceBackToSource(piece.DstPid) { peer.Log.Warnf("dst peer %s not found", piece.DstPid) } @@ -244,9 +246,11 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer // Collect traffic metrics. if !resource.IsPieceBackToSource(piece.DstPid) { - metrics.Traffic.WithLabelValues(peer.Task.Tag, peer.Task.Application, commonv2.TrafficType_REMOTE_PEER.String()).Add(float64(piece.PieceInfo.RangeSize)) + metrics.Traffic.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize)) } else { - metrics.Traffic.WithLabelValues(peer.Task.Tag, peer.Task.Application, commonv2.TrafficType_BACK_TO_SOURCE.String()).Add(float64(piece.PieceInfo.RangeSize)) + metrics.Traffic.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize)) } continue } @@ -278,27 +282,38 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) logger.Error(msg) return dferrors.New(commonv1.Code_SchedPeerNotFound, msg) } - metrics.DownloadTaskCount.WithLabelValues(peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + + // Collect DownloadPeerCount metrics. + priority := peer.CalculatePriority(v.dynconfig) + metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() parents := peer.Parents() if !req.Success { peer.Log.Error("report failed peer") if peer.FSM.Is(resource.PeerStateBackToSource) { - metrics.DownloadTaskFailureCount.WithLabelValues(peer.Task.Tag, peer.Task.Application, - metrics.DownloadFailureBackToSourceType, req.Code.String(), peer.Host.Type.Name()).Inc() + // Collect DownloadPeerBackToSourceFailureCount metrics. + metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + go v.createRecord(peer, parents, req) v.handleTaskFailure(ctx, peer.Task, req.GetSourceError(), nil) v.handlePeerFailure(ctx, peer) return nil } - metrics.DownloadTaskFailureCount.WithLabelValues(peer.Task.Tag, peer.Task.Application, - metrics.DownloadFailureP2PType, req.Code.String(), peer.Host.Type.Name()).Inc() + // Collect DownloadPeerFailureCount metrics. 
+ metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + go v.createRecord(peer, parents, req) v.handlePeerFailure(ctx, peer) return nil } - metrics.DownloadTaskDuration.WithLabelValues(peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Observe(float64(req.Cost)) + + // Collect DownloadPeerDuration metrics. + metrics.DownloadPeerDuration.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Observe(float64(req.Cost)) peer.Log.Info("report success peer") if peer.FSM.Is(resource.PeerStateBackToSource) { @@ -430,10 +445,8 @@ func (v *V1) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) error { logger.Error(msg) return dferrors.New(commonv1.Code_SchedPeerNotFound, msg) } - metrics.LeaveTaskCount.WithLabelValues(peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() if err := peer.FSM.Event(ctx, resource.PeerEventLeave); err != nil { - metrics.LeaveTaskFailureCount.WithLabelValues(peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) peer.Log.Error(msg) return dferrors.New(commonv1.Code_SchedTaskStatusError, msg) diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 9a57ef1761f..1ff4f74fb1f 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -1178,24 +1178,18 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { tests := []struct { name string req *schedulerv1.PeerResult - run func( - t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, - mockPeer *resource.Peer, - res resource.Resource, peerManager resource.PeerManager, + run func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - ) + md *configmocks.MockDynconfigInterfaceMockRecorder) }{ { name: "peer not found", req: &schedulerv1.PeerResult{ PeerId: mockPeerID, }, - run: func( - t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, - mockPeer *resource.Peer, - res resource.Resource, peerManager resource.PeerManager, + run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - ) { + md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(nil, false).Times(1), @@ -1214,12 +1208,9 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { Success: false, PeerId: mockPeerID, }, - run: func( - t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, - mockPeer *resource.Peer, - res resource.Resource, peerManager resource.PeerManager, + run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - ) { + md *configmocks.MockDynconfigInterfaceMockRecorder) { var 
wg sync.WaitGroup wg.Add(1) defer wg.Wait() @@ -1228,6 +1219,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), + md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), ms.Create(gomock.Any()).Do(func(record storage.Record) { wg.Done() }).Return(nil).Times(1), ) @@ -1242,12 +1234,9 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { Success: false, PeerId: mockPeerID, }, - run: func( - t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, - mockPeer *resource.Peer, - res resource.Resource, peerManager resource.PeerManager, + run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - ) { + md *configmocks.MockDynconfigInterfaceMockRecorder) { var wg sync.WaitGroup wg.Add(1) defer wg.Wait() @@ -1256,6 +1245,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), + md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), ms.Create(gomock.Any()).Do(func(record storage.Record) { wg.Done() }).Return(nil).Times(1), ) @@ -1270,12 +1260,9 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { Success: true, PeerId: mockPeerID, }, - run: func( - t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, - mockPeer *resource.Peer, - res resource.Resource, peerManager resource.PeerManager, + run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - ) { + md *configmocks.MockDynconfigInterfaceMockRecorder) { var wg sync.WaitGroup wg.Add(1) defer wg.Wait() @@ -1284,6 +1271,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), + md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), ms.Create(gomock.Any()).Do(func(record storage.Record) { wg.Done() }).Return(nil).Times(1), ) @@ -1298,12 +1286,9 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { Success: true, PeerId: mockPeerID, }, - run: func( - t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, - mockPeer *resource.Peer, - res resource.Resource, peerManager resource.PeerManager, + run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - ) { + md *configmocks.MockDynconfigInterfaceMockRecorder) { var wg sync.WaitGroup wg.Add(1) defer wg.Wait() @@ -1312,6 +1297,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), + md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), ms.Create(gomock.Any()).Do(func(record storage.Record) { wg.Done() 
}).Return(nil).Times(1), ) @@ -1326,12 +1312,9 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { Success: true, PeerId: mockPeerID, }, - run: func( - t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, - mockPeer *resource.Peer, - res resource.Resource, peerManager resource.PeerManager, + run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - ) { + md *configmocks.MockDynconfigInterfaceMockRecorder) { var wg sync.WaitGroup wg.Add(1) defer wg.Wait() @@ -1340,6 +1323,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), + md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), ms.Create(gomock.Any()).Do(func(record storage.Record) { wg.Done() }).Return(nil).Times(1), ) @@ -1366,7 +1350,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) - tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), storage.EXPECT()) + tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), storage.EXPECT(), dynconfig.EXPECT()) }) } } diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 75bccffb1e9..72849e707a6 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -36,6 +36,7 @@ import ( "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" + "d7y.io/dragonfly/v2/scheduler/metrics" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/storage" @@ -647,18 +648,33 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S return err } + // Collect RegisterPeerCount metrics. + priority := peer.CalculatePriority(v.dynconfig) + metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + // When there are no available peers for a task, the scheduler needs to trigger // the first task download in the p2p cluster. blocklist := set.NewSafeSet[string]() blocklist.Add(peer.ID) if task.FSM.Is(resource.TaskStateFailed) || !task.HasAvailablePeer(blocklist) { if err := v.downloadTaskBySeedPeer(ctx, peer); err != nil { + // Collect RegisterPeerFailureCount metrics. + metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() return err } } - // Scheduling parent for the peer.. - return v.schedule(ctx, peer) + // Scheduling parent for the peer. + if err := v.schedule(ctx, peer); err != nil { + // Collect RegisterPeerFailureCount metrics. 
+ metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + return err + } + + return nil } // handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest. @@ -669,6 +685,11 @@ func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, stream scheduler return err } + // Collect RegisterPeerCount metrics. + priority := peer.CalculatePriority(v.dynconfig) + metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + // When there are no available peers for a task, the scheduler needs to trigger // the first task download in the p2p cluster. blocklist := set.NewSafeSet[string]() @@ -679,8 +700,15 @@ func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, stream scheduler peer.NeedBackToSource.Store(true) } - // Scheduling parent for the peer.. - return v.schedule(ctx, peer) + // Scheduling parent for the peer. + if err := v.schedule(ctx, peer); err != nil { + // Collect RegisterPeerFailureCount metrics. + metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + return err + } + + return nil } // handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest. @@ -690,14 +718,25 @@ func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, peerID string return status.Errorf(codes.NotFound, "peer %s not found", peerID) } + // Collect DownloadPeerStartedCount metrics. + priority := peer.CalculatePriority(v.dynconfig) + metrics.DownloadPeerStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + // Handle peer with peer started request. if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil { + // Collect DownloadPeerStartedFailureCount metrics. + metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() return status.Error(codes.Internal, err.Error()) } // Handle task with peer started request. if !peer.Task.FSM.Is(resource.TaskStateRunning) { if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil { + // Collect DownloadPeerStartedFailureCount metrics. + metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() return status.Error(codes.Internal, err.Error()) } } else { @@ -714,14 +753,25 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p return status.Errorf(codes.NotFound, "peer %s not found", peerID) } + // Collect DownloadPeerBackToSourceStartedCount metrics. + priority := peer.CalculatePriority(v.dynconfig) + metrics.DownloadPeerBackToSourceStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + // Handle peer with peer back-to-source started request. if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil { + // Collect DownloadPeerBackToSourceStartedFailureCount metrics. 
+ metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() return status.Error(codes.Internal, err.Error()) } // Handle task with peer back-to-source started request. if !peer.Task.FSM.Is(resource.TaskStateRunning) { if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil { + // Collect DownloadPeerBackToSourceStartedFailureCount metrics. + metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() return status.Error(codes.Internal, err.Error()) } } else { @@ -744,6 +794,13 @@ func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID strin return status.Error(codes.Internal, err.Error()) } + // Collect DownloadPeerCount and DownloadPeerDuration metrics. + priority := peer.CalculatePriority(v.dynconfig) + metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + metrics.DownloadPeerDuration.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Observe(float64(peer.Cost.Load())) + return nil } @@ -787,6 +844,13 @@ func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context, } } + // Collect DownloadPeerCount and DownloadPeerDuration metrics. + priority := peer.CalculatePriority(v.dynconfig) + metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + metrics.DownloadPeerDuration.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Observe(float64(peer.Cost.Load())) + return nil } @@ -804,6 +868,11 @@ func (v *V2) handleDownloadPeerFailedRequest(ctx context.Context, peerID string) // Handle task with peer failed request. peer.Task.UpdatedAt.Store(time.Now()) + + // Collect DownloadPeerFailureCount and DownloadPeerDuration metrics. + metrics.DownloadPeerFailureCount.WithLabelValues(peer.CalculatePriority(v.dynconfig).String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + return nil } @@ -827,6 +896,10 @@ func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, pe return status.Error(codes.Internal, err.Error()) } + // Collect DownloadPeerBackToSourceFailureCount and DownloadPeerDuration metrics. + metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(peer.CalculatePriority(v.dynconfig).String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + return nil } @@ -838,7 +911,7 @@ func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID stri ParentID: req.Piece.ParentId, Offset: req.Piece.Offset, Length: req.Piece.Length, - TrafficType: commonv2.TrafficType_REMOTE_PEER, + TrafficType: req.Piece.TrafficType, Cost: req.Piece.Cost.AsDuration(), CreatedAt: req.Piece.CreatedAt.AsTime(), } @@ -867,13 +940,29 @@ func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID stri // When the piece is downloaded successfully, parent.UpdatedAt needs to be updated // to prevent the parent from being GC during the download process. 
- if parent, loaded := v.resource.PeerManager().Load(piece.ParentID); loaded { + parent, loadedParent := v.resource.PeerManager().Load(piece.ParentID) + if loadedParent { parent.UpdatedAt.Store(time.Now()) parent.Host.UpdatedAt.Store(time.Now()) } // Handle task with piece finished request. peer.Task.UpdatedAt.Store(time.Now()) + + // Collect piece and traffic metrics. + metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length)) + if v.config.Metrics.EnableHost { + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length)) + if loadedParent { + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.Length)) + } + } + return nil } @@ -885,7 +974,7 @@ func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, ParentID: req.Piece.ParentId, Offset: req.Piece.Offset, Length: req.Piece.Length, - TrafficType: commonv2.TrafficType_REMOTE_PEER, + TrafficType: req.Piece.TrafficType, Cost: req.Piece.Cost.AsDuration(), CreatedAt: req.Piece.CreatedAt.AsTime(), } @@ -915,25 +1004,40 @@ func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, // Handle task with piece back-to-source finished request. peer.Task.StorePiece(piece) peer.Task.UpdatedAt.Store(time.Now()) + + // Collect piece and traffic metrics. + metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length)) + if v.config.Metrics.EnableHost { + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length)) + } + return nil } // handleDownloadPieceFailedRequest handles DownloadPieceFailedRequest of AnnouncePeerRequest. func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFailedRequest) error { - if req.Temporary { - peer, loaded := v.resource.PeerManager().Load(peerID) - if !loaded { - return status.Errorf(codes.NotFound, "peer %s not found", peerID) - } + peer, loaded := v.resource.PeerManager().Load(peerID) + if !loaded { + return status.Errorf(codes.NotFound, "peer %s not found", peerID) + } + + // Collect DownloadPieceFailureCount metrics. + metrics.DownloadPieceFailureCount.WithLabelValues(req.Piece.TrafficType.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + if req.Temporary { // Handle peer with piece temporary failed request. 
peer.UpdatedAt.Store(time.Now()) - peer.BlockParents.Add(req.ParentId) + peer.BlockParents.Add(req.Piece.ParentId) if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil { return status.Error(codes.FailedPrecondition, err.Error()) } - if parent, loaded := v.resource.PeerManager().Load(req.ParentId); loaded { + if parent, loaded := v.resource.PeerManager().Load(req.Piece.ParentId); loaded { parent.Host.UploadFailedCount.Inc() } @@ -957,7 +1061,12 @@ func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, p // Handle task with piece back-to-source failed request. peer.Task.UpdatedAt.Store(time.Now()) - return status.Error(codes.Internal, req.Status) + + // Collect DownloadPieceFailureCount metrics. + metrics.DownloadPieceFailureCount.WithLabelValues(req.Piece.TrafficType.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + + return status.Error(codes.Internal, "download piece from source failed") } // TODO Implement function. diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 5a9889425ba..973a299406b 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -36,6 +36,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" commonv2 "d7y.io/api/pkg/apis/common/v2" + managerv2 "d7y.io/api/pkg/apis/manager/v2" schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" schedulerv2mocks "d7y.io/api/pkg/apis/scheduler/v2/mocks" @@ -1935,11 +1936,11 @@ func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) { func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { tests := []struct { name string - run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) + run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) }{ { name: "peer can not be loaded", - run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1), @@ -1951,10 +1952,11 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { }, { name: "peer state is PeerStateRunning", - run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), ) peer.FSM.SetState(resource.PeerStateRunning) @@ -1965,10 +1967,11 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { }, { name: "task 
state is TaskStateRunning", - run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), ) peer.FSM.SetState(resource.PeerStateReceivedNormal) @@ -1982,10 +1985,11 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { }, { name: "task state is TaskStatePending", - run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1), + md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), ) peer.FSM.SetState(resource.PeerStateReceivedNormal) @@ -2016,7 +2020,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { peer := resource.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) - tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT()) + tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) } } @@ -2024,11 +2028,11 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { tests := []struct { name string - run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) + run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) }{ { name: "peer can not be loaded", - run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1), @@ -2040,10 +2044,11 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { }, { name: "peer state is PeerStateRunning", - run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md 
*configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateBackToSource)
@@ -2054,10 +2059,11 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
 		},
 		{
 			name: "task state is TaskStateRunning",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateReceivedNormal)
@@ -2071,10 +2077,11 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
 		},
 		{
 			name: "task state is TaskStatePending",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateReceivedNormal)
@@ -2105,7 +2112,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
 			peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
 			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
 
-			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
+			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
 		})
 	}
 }
@@ -2113,11 +2120,11 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
 func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
 	tests := []struct {
 		name string
-		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
+		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
 	}{
 		{
 			name: "peer can not be loaded",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
@@ -2129,7 +2136,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
 		},
 		{
 			name: "peer state is PeerStateSucceeded",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2144,10 +2151,11 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
 		},
 		{
 			name: "peer state is PeerStateRunning",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateRunning)
@@ -2177,7 +2185,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
 			peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
 			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
 
-			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
+			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
 		})
 	}
 }
@@ -2196,12 +2204,14 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 	tests := []struct {
 		name string
 		req  *schedulerv2.DownloadPeerBackToSourceFinishedRequest
-		run  func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
+		run  func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+			mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
 	}{
 		{
 			name: "peer can not be loaded",
 			req:  &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
@@ -2218,7 +2228,8 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 		{
 			name: "peer state is PeerStateSucceeded",
 			req:  &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2238,10 +2249,12 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 		{
 			name: "peer has range",
 			req:  &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateRunning)
@@ -2260,10 +2273,12 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 		{
 			name: "task state is TaskStateSucceeded",
 			req:  &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateRunning)
@@ -2285,7 +2300,8 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 				ContentLength: 1024,
 				PieceCount:    10,
 			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2310,10 +2326,12 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 				ContentLength: 1024,
 				PieceCount:    10,
 			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateRunning)
@@ -2335,7 +2353,8 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 				ContentLength: 127,
 				PieceCount:    1,
 			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2361,7 +2380,8 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 				ContentLength: 126,
 				PieceCount:    1,
 			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2386,10 +2406,12 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 				ContentLength: 1,
 				PieceCount:    1,
 			},
-			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
+				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateRunning)
@@ -2442,7 +2464,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 			peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
 			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
 
-			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
+			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
 		})
 	}
 }
@@ -2450,11 +2472,11 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
 func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
 	tests := []struct {
 		name string
-		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
+		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
 	}{
 		{
 			name: "peer can not be loaded",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
@@ -2466,7 +2488,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
 		},
 		{
 			name: "peer state is PeerEventDownloadFailed",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2480,10 +2502,11 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
 		},
 		{
 			name: "peer state is PeerStateRunning",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateRunning)
@@ -2513,7 +2536,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
 			peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
 			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
 
-			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
+			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
 		})
 	}
 }
@@ -2521,11 +2544,11 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
 func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
 	tests := []struct {
 		name string
-		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
+		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
 	}{
 		{
 			name: "peer can not be loaded",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
@@ -2546,7 +2569,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
 		},
 		{
 			name: "peer state is PeerStateFailed",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2568,7 +2591,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
 		},
 		{
 			name: "task state is TaskStateFailed",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
@@ -2591,10 +2614,11 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
 		},
 		{
 			name: "task state is TaskStateRunning",
-			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
+					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
 				)
 
 				peer.FSM.SetState(resource.PeerStateRunning)
@@ -2631,7 +2655,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
 			peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
 			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
 
-			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
+			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
 		})
 	}
 }
@@ -2927,35 +2951,49 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 			mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
 	}{
 		{
-			name: "temporary is false",
+			name: "peer can not be loaded",
 			req: &schedulerv2.DownloadPieceFailedRequest{
-				Temporary: false,
+				Piece: &commonv2.Piece{
+					ParentId: mockSeedPeerID,
+				},
+				Temporary: true,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
 				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
+				gomock.InOrder(
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
+				)
+
 				assert := assert.New(t)
-				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "download piece failed"))
+				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
 			},
 		},
 		{
-			name: "temporary is true and peer can not be loaded",
+			name: "temporary is false",
 			req: &schedulerv2.DownloadPieceFailedRequest{
-				Temporary: true,
+				Piece: &commonv2.Piece{
+					ParentId: mockSeedPeerID,
+				},
+				Temporary: false,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
 				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
+					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
 				)
 
 				assert := assert.New(t)
-				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
+				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "download piece failed"))
 			},
 		},
 		{
 			name: "schedule failed",
 			req: &schedulerv2.DownloadPieceFailedRequest{
+				Piece: &commonv2.Piece{
+					ParentId: mockSeedPeerID,
+				},
 				Temporary: true,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
@@ -2969,12 +3007,15 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 				assert := assert.New(t)
 				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "foo"))
 				assert.NotEqual(peer.UpdatedAt.Load(), 0)
-				assert.True(peer.BlockParents.Contains(req.ParentId))
+				assert.True(peer.BlockParents.Contains(req.Piece.ParentId))
 			},
 		},
 		{
 			name: "parent can not be loaded",
 			req: &schedulerv2.DownloadPieceFailedRequest{
+				Piece: &commonv2.Piece{
+					ParentId: mockSeedPeerID,
+				},
 				Temporary: true,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
@@ -2984,19 +3025,22 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
 					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
 					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(req.ParentId)).Return(nil, false).Times(1),
+					mp.Load(gomock.Eq(req.Piece.ParentId)).Return(nil, false).Times(1),
 				)
 
 				assert := assert.New(t)
 				assert.NoError(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req))
 				assert.NotEqual(peer.UpdatedAt.Load(), 0)
-				assert.True(peer.BlockParents.Contains(req.ParentId))
+				assert.True(peer.BlockParents.Contains(req.Piece.ParentId))
 				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
 			},
 		},
 		{
 			name: "parent can be loaded",
 			req: &schedulerv2.DownloadPieceFailedRequest{
+				Piece: &commonv2.Piece{
+					ParentId: mockSeedPeerID,
+				},
 				Temporary: true,
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
@@ -3006,13 +3050,13 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
 					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
 					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
 					mr.PeerManager().Return(peerManager).Times(1),
-					mp.Load(gomock.Eq(req.ParentId)).Return(peer, true).Times(1),
+					mp.Load(gomock.Eq(req.Piece.ParentId)).Return(peer, true).Times(1),
 				)
 
 				assert := assert.New(t)
 				assert.NoError(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req))
 				assert.NotEqual(peer.UpdatedAt.Load(), 0)
-				assert.True(peer.BlockParents.Contains(req.ParentId))
+				assert.True(peer.BlockParents.Contains(req.Piece.ParentId))
 				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
 				assert.Equal(peer.Host.UploadFailedCount.Load(), int64(1))
 			},
@@ -3065,7 +3109,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
 		{
 			name: "peer can be loaded",
 			req: &schedulerv2.DownloadPieceBackToSourceFailedRequest{
-				Status: "foo",
+				Piece: &commonv2.Piece{},
 			},
 			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
 				mp *resource.MockPeerManagerMockRecorder) {
@@ -3075,7 +3119,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
 				)
 
 				assert := assert.New(t)
-				assert.ErrorIs(svc.handleDownloadPieceBackToSourceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.Internal, "foo"))
+				assert.ErrorIs(svc.handleDownloadPieceBackToSourceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.Internal, "download piece from source failed"))
 				assert.NotEqual(peer.UpdatedAt.Load(), 0)
 				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
 			},