Skip to content

Commit

Permalink
feat: add metrics to service v2 (dragonflyoss#2153)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
Signed-off-by: 李龙峰 <[email protected]>
  • Loading branch information
gaius-qi authored and 李龙峰 committed May 15, 2023
1 parent 110d219 commit 21e03e2
Show file tree
Hide file tree
Showing 12 changed files with 401 additions and 188 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20

require (
d7y.io/api v1.7.8
d7y.io/api v1.7.9
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api v1.7.8 h1:49HLi64UX6w4SMJ+NOM7VUwznbVRygnCmO36SALWmSc=
d7y.io/api v1.7.8/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
d7y.io/api v1.7.9 h1:lYMPN4DZCigliNJAovwP+eBRvYWSy8up82kyCpVhil4=
d7y.io/api v1.7.9/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
3 changes: 1 addition & 2 deletions manager/rpcserver/manager_server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,9 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up

// List acitve schedulers configuration.
func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.ListSchedulersRequest) (*managerv1.ListSchedulersResponse, error) {
metrics.SearchSchedulerClusterCount.WithLabelValues(req.Version, req.Commit).Inc()

log := logger.WithHostnameAndIP(req.HostName, req.Ip)
log.Debugf("list schedulers, version %s, commit %s", req.Version, req.Commit)
metrics.SearchSchedulerClusterCount.WithLabelValues(req.Version, req.Commit).Inc()

// Count the number of the active peer.
if s.config.Metrics.EnablePeerGauge && req.SourceType == managerv1.SourceType_PEER_SOURCE {
Expand Down
3 changes: 1 addition & 2 deletions manager/rpcserver/manager_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,9 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up

// List acitve schedulers configuration.
func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.ListSchedulersRequest) (*managerv2.ListSchedulersResponse, error) {
metrics.SearchSchedulerClusterCount.WithLabelValues(req.Version, req.Commit).Inc()

log := logger.WithHostnameAndIP(req.HostName, req.Ip)
log.Debugf("list schedulers, version %s, commit %s", req.Version, req.Commit)
metrics.SearchSchedulerClusterCount.WithLabelValues(req.Version, req.Commit).Inc()

// Count the number of the active peer.
if s.config.Metrics.EnablePeerGauge && req.SourceType == managerv2.SourceType_PEER_SOURCE {
Expand Down
134 changes: 78 additions & 56 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ var (

// HostTrafficDownloadType is download traffic type for host traffic metrics.
HostTrafficDownloadType = "download"

// DownloadFailureBackToSourceType is back-to-source type for download failure count metrics.
DownloadFailureBackToSourceType = "back_to_source"

// DownloadFailureP2PType is p2p type for download failure count metrics.
DownloadFailureP2PType = "p2p"
)

// Variables declared for metrics.
Expand Down Expand Up @@ -102,60 +96,102 @@ var (
Help: "Counter of the number of failed of the leaving peer.",
})

RegisterTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{
RegisterPeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "register_task_total",
Help: "Counter of the number of the register task.",
}, []string{"tag", "app"})
Name: "register_peer_total",
Help: "Counter of the number of the register peer.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

RegisterTaskFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
RegisterPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "register_task_failure_total",
Help: "Counter of the number of failed of the register task.",
}, []string{"tag", "app"})
Name: "register_peer_failure_total",
Help: "Counter of the number of failed of the register peer.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

DownloadTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{
DownloadPeerStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_task_total",
Help: "Counter of the number of the task downloading.",
}, []string{"tag", "app", "host_type"})
Name: "download_peer_started_total",
Help: "Counter of the number of the download peer started.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

DownloadTaskFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
DownloadPeerStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_task_failure_total",
Help: "Counter of the number of failed of the task downloading.",
}, []string{"tag", "app", "type", "code", "host_type"})
Name: "download_peer_started_failure_total",
Help: "Counter of the number of failed of the download peer started.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

StatTaskCount = promauto.NewCounter(prometheus.CounterOpts{
DownloadPeerBackToSourceStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_task_total",
Help: "Counter of the number of the stat task.",
})
Name: "download_peer_back_to_source_started_total",
Help: "Counter of the number of the download peer back-to-source started.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

StatTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
DownloadPeerBackToSourceStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_task_failure_total",
Help: "Counter of the number of failed of the stat task.",
})
Name: "download_peer_back_to_source_started_failure_total",
Help: "Counter of the number of failed of the download peer back-to-source started.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

DownloadPeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_peer_finished_total",
Help: "Counter of the number of the download peer.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

DownloadPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_peer_finished_failure_total",
Help: "Counter of the number of failed of the download peer.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

DownloadPeerBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_peer_back_to_source_finished_failure_total",
Help: "Counter of the number of failed of the download peer back-to-source.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

DownloadPieceCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_piece_finished_total",
Help: "Counter of the number of the download piece.",
}, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"})

DownloadPieceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_piece_finished_failure_total",
Help: "Counter of the number of failed of the download piece.",
}, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"})

AnnounceTaskCount = promauto.NewCounter(prometheus.CounterOpts{
DownloadPieceBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_task_total",
Help: "Counter of the number of the announce task.",
Name: "download_piece_back_to_source_finished_failure_total",
Help: "Counter of the number of failed of the download piece back-to-source.",
}, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"})

StatTaskCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_task_total",
Help: "Counter of the number of the stat task.",
})

AnnounceTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
StatTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_task_failure_total",
Help: "Counter of the number of failed of the announce task.",
Name: "stat_task_failure_total",
Help: "Counter of the number of failed of the stat task.",
})

AnnounceHostCount = promauto.NewCounterVec(prometheus.CounterOpts{
Expand All @@ -174,20 +210,6 @@ var (
}, []string{"os", "platform", "platform_family", "platform_version",
"kernel_version", "git_version", "git_commit", "go_version", "build_platform"})

LeaveTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "leave_task_total",
Help: "Counter of the number of the leaving task.",
}, []string{"tag", "app", "host_type"})

LeaveTaskFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "leave_task_failure_total",
Help: "Counter of the number of failed of the leaving task.",
}, []string{"tag", "app", "host_type"})

LeaveHostCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand All @@ -207,22 +229,22 @@ var (
Subsystem: types.SchedulerMetricsName,
Name: "traffic",
Help: "Counter of the number of traffic.",
}, []string{"task_tag", "task_app", "type"})
}, []string{"type", "task_type", "task_tag", "task_app", "host_type"})

HostTraffic = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "host_traffic",
Help: "Counter of the number of per host traffic.",
}, []string{"task_tag", "task_app", "type", "host_id", "host_ip"})
}, []string{"type", "task_type", "task_tag", "task_app", "host_type", "host_id", "host_ip", "host_name"})

DownloadTaskDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
DownloadPeerDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_task_duration_milliseconds",
Help: "Histogram of the time each task downloading.",
Name: "download_peer_duration_milliseconds",
Help: "Histogram of the time each peer downloading.",
Buckets: []float64{100, 200, 500, 1000, 1500, 2 * 1000, 3 * 1000, 5 * 1000, 10 * 1000, 20 * 1000, 60 * 1000, 120 * 1000, 300 * 1000},
}, []string{"tag", "app", "host_type"})
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"})

ConcurrentScheduleGauge = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: types.MetricsNamespace,
Expand Down
7 changes: 4 additions & 3 deletions scheduler/resource/seed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,13 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task)
peer.PieceUpdatedAt.Store(time.Now())
task.StorePiece(piece)

// Statistical traffic metrics.
// Collect Traffic metrics.
trafficType := commonv2.TrafficType_BACK_TO_SOURCE
if pieceSeed.Reuse {
trafficType = commonv2.TrafficType_REMOTE_PEER
trafficType = commonv2.TrafficType_LOCAL_PEER
}
metrics.Traffic.WithLabelValues(peer.Task.Tag, peer.Task.Application, trafficType.String()).Add(float64(pieceSeed.PieceInfo.RangeSize))
metrics.Traffic.WithLabelValues(trafficType.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(pieceSeed.PieceInfo.RangeSize))
}

// Handle end of piece.
Expand Down
34 changes: 29 additions & 5 deletions scheduler/rpcserver/scheduler_server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (

"google.golang.org/protobuf/types/known/emptypb"

commonv1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"

"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/resource"
Expand Down Expand Up @@ -58,18 +60,24 @@ func (s *schedulerServerV1) RegisterPeerTask(ctx context.Context, req *scheduler

tag := req.UrlMeta.Tag
application := req.UrlMeta.Application
priority := req.UrlMeta.Priority

metrics.RegisterTaskCount.WithLabelValues(tag, application).Inc()
// Collect RegisterPeerCount metrics.
metrics.RegisterPeerCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(),
tag, application, types.HostTypeNormalName).Inc()
resp, err := s.service.RegisterPeerTask(ctx, req)
if err != nil {
metrics.RegisterTaskFailureCount.WithLabelValues(tag, application).Inc()
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(),
tag, application, types.HostTypeNormalName).Inc()
}

return resp, err
}

// ReportPieceResult handles the piece information reported by dfdaemon.
func (s *schedulerServerV1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultServer) error {
// Collect ConcurrentScheduleGauge metrics.
metrics.ConcurrentScheduleGauge.Inc()
defer metrics.ConcurrentScheduleGauge.Dec()

Expand All @@ -83,9 +91,11 @@ func (s *schedulerServerV1) ReportPeerResult(ctx context.Context, req *scheduler

// AnnounceTask informs scheduler a peer has completed task.
func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequest) (*emptypb.Empty, error) {
metrics.AnnounceTaskCount.Inc()
// Collect AnnouncePeerCount metrics.
metrics.AnnouncePeerCount.Inc()
if err := s.service.AnnounceTask(ctx, req); err != nil {
metrics.AnnounceTaskFailureCount.Inc()
// Collect AnnouncePeerFailureCount metrics.
metrics.AnnouncePeerFailureCount.Inc()
return nil, err
}

Expand All @@ -94,9 +104,11 @@ func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.A

// StatTask checks if the given task exists.
func (s *schedulerServerV1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) {
// Collect StatTaskCount metrics.
metrics.StatTaskCount.Inc()
resp, err := s.service.StatTask(ctx, req)
if err != nil {
// Collect StatTaskFailureCount metrics.
metrics.StatTaskFailureCount.Inc()
return nil, err
}
Expand All @@ -106,14 +118,24 @@ func (s *schedulerServerV1) StatTask(ctx context.Context, req *schedulerv1.StatT

// LeaveTask makes the peer unschedulable.
func (s *schedulerServerV1) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) (*emptypb.Empty, error) {
return new(emptypb.Empty), s.service.LeaveTask(ctx, req)
// Collect LeavePeerCount metrics.
metrics.LeavePeerCount.Inc()
if err := s.service.LeaveTask(ctx, req); err != nil {
// Collect LeavePeerFailureCount metrics.
metrics.LeavePeerFailureCount.Inc()
return nil, err
}

return new(emptypb.Empty), nil
}

// AnnounceHost announces host to scheduler.
func (s *schedulerServerV1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequest) (*emptypb.Empty, error) {
// Collect AnnounceHostCount metrics.
metrics.AnnounceHostCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion,
req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc()
if err := s.service.AnnounceHost(ctx, req); err != nil {
// Collect AnnounceHostFailureCount metrics.
metrics.AnnounceHostFailureCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion,
req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc()
return nil, err
Expand All @@ -124,8 +146,10 @@ func (s *schedulerServerV1) AnnounceHost(ctx context.Context, req *schedulerv1.A

// LeaveHost releases host in scheduler.
func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) (*emptypb.Empty, error) {
// Collect LeaveHostCount metrics.
metrics.LeaveHostCount.Inc()
if err := s.service.LeaveHost(ctx, req); err != nil {
// Collect LeaveHostFailureCount metrics.
metrics.LeaveHostFailureCount.Inc()
return nil, err
}
Expand Down
Loading

0 comments on commit 21e03e2

Please sign in to comment.