Skip to content

Commit

Permalink
feat: implement SyncProbes api in scheduler grpc service (#2449)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
Signed-off-by: XZ <[email protected]>
Co-authored-by: dlut_xz <[email protected]>
  • Loading branch information
gaius-qi and fcgxz2003 authored Jun 14, 2023
1 parent f53d966 commit 47e5427
Show file tree
Hide file tree
Showing 15 changed files with 1,543 additions and 103 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20

require (
d7y.io/api v1.9.0
d7y.io/api v1.9.2
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api v1.9.0 h1:AzAtFVHXCUM+L5r82IuGp9FfiexfWAYBP7WQ7CYmMaw=
d7y.io/api v1.9.0/go.mod h1:6bn5Z+OyjyvlB1UMxUZsFbyx47qjkpNEvC25hq5Qxy0=
d7y.io/api v1.9.2 h1:JB7sSKY4P9y2J0xsMRVxVpWkgnLPUEAYW2aIB2mEA/4=
d7y.io/api v1.9.2/go.mod h1:6bn5Z+OyjyvlB1UMxUZsFbyx47qjkpNEvC25hq5Qxy0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
14 changes: 14 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,20 @@ var (
Help: "Counter of the number of failed of the leaving host.",
})

SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "sync_probes_total",
Help: "Counter of the number of the synchronizing probes.",
})

SyncProbesFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "sync_probes_failure_total",
Help: "Counter of the number of failed of the synchronizing probes.",
})

Traffic = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
27 changes: 19 additions & 8 deletions scheduler/networktopology/mocks/network_topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 15 additions & 6 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,21 @@ const (
// NetworkTopology is an interface for network topology.
type NetworkTopology interface {
// Started network topology server.
Serve() error
Serve()

// Stop network topology server.
Stop() error
Stop()

// Has to check if there is a connection between source host and destination host.
Has(string, string) bool

// Store stores source host and destination host.
Store(string, string) error

// TODO Implement function.
// FindProbedHostIDs finds the most candidate destination host to be probed.
FindProbedHostIDs(string) ([]string, error)

// DeleteHost deletes source host and all destination host connected to source host.
DeleteHost(string) error

Expand Down Expand Up @@ -100,7 +104,7 @@ func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalCli
}

// Started network topology server.
func (nt *networkTopology) Serve() error {
func (nt *networkTopology) Serve() {
logger.Info("collect network topology records")
tick := time.NewTicker(nt.config.CollectInterval)
for {
Expand All @@ -111,15 +115,14 @@ func (nt *networkTopology) Serve() error {
break
}
case <-nt.done:
return nil
return
}
}
}

// Stop network topology server.
func (nt *networkTopology) Stop() error {
func (nt *networkTopology) Stop() {
close(nt.done)
return nil
}

// Has to check if there is a connection between source host and destination host.
Expand Down Expand Up @@ -157,6 +160,12 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error {
return nil
}

// TODO Implement function.
// FindProbedHostIDs finds the most candidate destination host to be probed.
func (nt *networkTopology) FindProbedHostIDs(hostID string) ([]string, error) {
return nil, nil
}

// DeleteHost deletes source host and all destination host connected to source host.
func (nt *networkTopology) DeleteHost(hostID string) error {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
Expand Down
12 changes: 3 additions & 9 deletions scheduler/networktopology/network_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ func TestNetworkTopology_Serve(t *testing.T) {
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
go func() {
assert.NoError(networkTopology.Serve())
}()
go networkTopology.Serve()
},
},
{
Expand Down Expand Up @@ -135,9 +133,7 @@ func TestNetworkTopology_Serve(t *testing.T) {
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
go func() {
assert.NoError(networkTopology.Serve())
}()
go networkTopology.Serve()
},
},
}
Expand All @@ -157,9 +153,7 @@ func TestNetworkTopology_Serve(t *testing.T) {
networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage)
tc.expect(t, networkTopology, err)
tc.sleep()
if err := networkTopology.Stop(); err != nil {
t.Fatal(err)
}
networkTopology.Stop()
})
}
}
Expand Down
6 changes: 4 additions & 2 deletions scheduler/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"d7y.io/dragonfly/v2/pkg/rpc/scheduler/server"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
Expand All @@ -33,10 +34,11 @@ func New(
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
networkTopology networktopology.NetworkTopology,
opts ...grpc.ServerOption,
) *grpc.Server {
return server.New(
newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage, networkTopology),
newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage, networkTopology),
opts...)
}
4 changes: 3 additions & 1 deletion scheduler/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
Expand Down Expand Up @@ -62,8 +63,9 @@ func TestRPCServer_New(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)

svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.expect(t, svr)
})
}
Expand Down
13 changes: 11 additions & 2 deletions scheduler/rpcserver/scheduler_server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
Expand All @@ -47,8 +48,9 @@ func newSchedulerServerV1(
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
networkTopology networktopology.NetworkTopology,
) schedulerv1.SchedulerServer {
return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage)}
return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage, networkTopology)}
}

// RegisterPeerTask registers peer and triggers seed peer download task.
Expand Down Expand Up @@ -157,8 +159,15 @@ func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.Leav
return new(emptypb.Empty), nil
}

// TODO Implement SyncProbes
// SyncProbes sync probes of the host.
func (s *schedulerServerV1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
// Collect SyncProbesCount metrics.
metrics.SyncProbesCount.Inc()
if err := s.service.SyncProbes(stream); err != nil {
// Collect SyncProbesFailureCount metrics.
metrics.SyncProbesFailureCount.Inc()
return err
}

return nil
}
12 changes: 11 additions & 1 deletion scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
Expand All @@ -46,8 +47,9 @@ func newSchedulerServerV2(
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
networkTopology networktopology.NetworkTopology,
) schedulerv2.SchedulerServer {
return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage)}
return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage, networkTopology)}
}

// AnnouncePeer announces peer to scheduler.
Expand Down Expand Up @@ -152,5 +154,13 @@ func (s *schedulerServerV2) LeaveHost(ctx context.Context, req *schedulerv2.Leav

// SyncProbes sync probes of the host.
func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
// Collect SyncProbesCount metrics.
metrics.SyncProbesCount.Inc()
if err := s.service.SyncProbes(stream); err != nil {
// Collect SyncProbesFailureCount metrics.
metrics.SyncProbesFailureCount.Inc()
return err
}

return nil
}
Loading

0 comments on commit 47e5427

Please sign in to comment.