// Remove returns a copy of s without the element at index i.
//
// The input slice is never modified: the result is built in a freshly
// allocated slice, so a caller that still holds s (or that forgets to use the
// return value) does not observe shifted elements or a duplicated tail
// element in the shared backing array.
//
// If i is out of range (negative, or >= len(s)), there is nothing to remove
// and s is returned unchanged instead of panicking. This also covers nil and
// empty slices.
func Remove[T any](s []T, i int) []T {
	if i < 0 || i >= len(s) {
		return s
	}

	result := make([]T, 0, len(s)-1)
	result = append(result, s[:i]...)
	return append(result, s[i+1:]...)
}
+func (m *MockNetworkTopology) FindProbedHosts(arg0 string) ([]*resource.Host, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FindProbedHostIDs", arg0) - ret0, _ := ret[0].([]string) + ret := m.ctrl.Call(m, "FindProbedHosts", arg0) + ret0, _ := ret[0].([]*resource.Host) ret1, _ := ret[1].(error) return ret0, ret1 } -// FindProbedHostIDs indicates an expected call of FindProbedHostIDs. -func (mr *MockNetworkTopologyMockRecorder) FindProbedHostIDs(arg0 interface{}) *gomock.Call { +// FindProbedHosts indicates an expected call of FindProbedHosts. +func (mr *MockNetworkTopologyMockRecorder) FindProbedHosts(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHostIDs", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHostIDs), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHosts", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHosts), arg0) } // Has mocks base method. diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index c25d3df6df2..f56ffdb91d5 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -20,14 +20,18 @@ package networktopology import ( "context" + "errors" "math" + "sort" "time" "github.com/go-redis/redis/v8" "github.com/google/uuid" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/container/set" pkgredis "d7y.io/dragonfly/v2/pkg/redis" + "d7y.io/dragonfly/v2/pkg/slices" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/storage" @@ -39,6 +43,9 @@ const ( // snapshotContextTimeout is the timeout of snapshot network topology. snapshotContextTimeout = 20 * time.Minute + + // findProbedCandidateHostsLimit is the limit of find probed candidate hosts. + findProbedCandidateHostsLimit = 50 ) // NetworkTopology is an interface for network topology. 
@@ -55,9 +62,9 @@ type NetworkTopology interface { // Store stores source host and destination host. Store(string, string) error - // TODO Implement function. - // FindProbedHostIDs finds the most candidate destination host to be probed. - FindProbedHostIDs(string) ([]string, error) + // FindProbedHosts finds the most candidate destination host to be probed, randomly find a range of hosts, + // and then return the host with a smaller probed count. + FindProbedHosts(string) ([]*resource.Host, error) // DeleteHost deletes source host and all destination host connected to source host. DeleteHost(string) error @@ -161,10 +168,54 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error { return nil } -// TODO Implement function. -// FindProbedHostIDs finds the most candidate destination host to be probed. -func (nt *networkTopology) FindProbedHostIDs(hostID string) ([]string, error) { - return nil, nil +// FindProbedHosts finds the most candidate destination host to be probed, randomly find a range of hosts, +// and then return the host with a smaller probed count. +func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, error) { + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + + blocklist := set.NewSafeSet[string]() + blocklist.Add(hostID) + candidateHosts := nt.resource.HostManager().LoadRandomHosts(findProbedCandidateHostsLimit, blocklist) + if len(candidateHosts) == 0 { + return nil, errors.New("probed hosts not found") + } + + count := nt.config.Probe.Count + if len(candidateHosts) <= count { + return candidateHosts, nil + } + + var probedCountKeys []string + for _, candidateHost := range candidateHosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(candidateHost.ID)) + } + + rawProbedCounts, err := nt.rdb.MGet(ctx, probedCountKeys...).Result() + if err != nil { + return nil, err + } + + // Filter invalid probed count. 
If probed key not exist, the probed count is nil. + probedCounts := make([]interface{}, len(rawProbedCounts)) + for i, rawProbedCount := range rawProbedCounts { + probeCount, ok := rawProbedCount.(uint64) + if !ok { + slices.Remove(candidateHosts, i) + count-- + logger.Error("invalid probed count") + continue + } + + probedCounts = append(probedCounts, probeCount) + } + + // Sort candidate hosts by probed count. + sort.Slice(candidateHosts, func(i, j int) bool { + return probedCounts[i].(uint64) < probedCounts[j].(uint64) + }) + + return candidateHosts[:count], nil } // DeleteHost deletes source host and all destination host connected to source host. diff --git a/scheduler/resource/host_manager.go b/scheduler/resource/host_manager.go index ee066b42ecc..ecd4286897f 100644 --- a/scheduler/resource/host_manager.go +++ b/scheduler/resource/host_manager.go @@ -21,6 +21,7 @@ package resource import ( "sync" + "d7y.io/dragonfly/v2/pkg/container/set" pkggc "d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" @@ -47,6 +48,9 @@ type HostManager interface { // Delete deletes host for a key. Delete(string) + // LoadRandomHosts loads host randomly through the Range of sync.Map. + LoadRandomHosts(int, set.SafeSet[string]) []*Host + // Try to reclaim host. RunGC() error } @@ -103,6 +107,31 @@ func (h *hostManager) Delete(key string) { h.Map.Delete(key) } +// LoadRandomHosts loads host randomly through the Range of sync.Map. +func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*Host { + hosts := make([]*Host, 0, n) + h.Map.Range(func(key, value any) bool { + if len(hosts) >= n { + return false + } + + host, ok := value.(*Host) + if !ok { + host.Log.Error("invalid host") + return true + } + + if blocklist.Contains(host.ID) { + return true + } + + hosts = append(hosts, host) + return true + }) + + return hosts +} + // Try to reclaim host. 
func (h *hostManager) RunGC() error { h.Map.Range(func(_, value any) bool { diff --git a/scheduler/resource/host_manager_mock.go b/scheduler/resource/host_manager_mock.go index 16ce040559f..89948df8ecc 100644 --- a/scheduler/resource/host_manager_mock.go +++ b/scheduler/resource/host_manager_mock.go @@ -7,6 +7,7 @@ package resource import ( reflect "reflect" + set "d7y.io/dragonfly/v2/pkg/container/set" gomock "github.com/golang/mock/gomock" ) @@ -75,6 +76,20 @@ func (mr *MockHostManagerMockRecorder) LoadOrStore(arg0 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockHostManager)(nil).LoadOrStore), arg0) } +// LoadRandomHosts mocks base method. +func (m *MockHostManager) LoadRandomHosts(arg0 int, arg1 set.SafeSet[string]) []*Host { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadRandomHosts", arg0, arg1) + ret0, _ := ret[0].([]*Host) + return ret0 +} + +// LoadRandomHosts indicates an expected call of LoadRandomHosts. +func (mr *MockHostManagerMockRecorder) LoadRandomHosts(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadRandomHosts", reflect.TypeOf((*MockHostManager)(nil).LoadRandomHosts), arg0, arg1) +} + // RunGC mocks base method. func (m *MockHostManager) RunGC() error { m.ctrl.T.Helper() diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 29b6b14f553..8e31e6d571a 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -677,36 +677,25 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error { // Find probed hosts in network topology. Based on the source host information, // the most candidate hosts will be evaluated. 
logger.Info("receive SyncProbesRequest_ProbeStartedRequest") - probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id) + hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id) if err != nil { logger.Error(err) return status.Error(codes.FailedPrecondition, err.Error()) } var probedHosts []*commonv1.Host - for _, probedHostID := range probedHostIDs { - probedHost, loaded := v.resource.HostManager().Load(probedHostID) - if !loaded { - logger.Warnf("probed host %s not found", probedHostID) - continue - } - + for _, host := range hosts { probedHosts = append(probedHosts, &commonv1.Host{ - Id: probedHost.ID, - Ip: probedHost.IP, - Hostname: probedHost.Hostname, - Port: probedHost.Port, - DownloadPort: probedHost.DownloadPort, - Location: probedHost.Network.Location, - Idc: probedHost.Network.IDC, + Id: host.ID, + Ip: host.IP, + Hostname: host.Hostname, + Port: host.Port, + DownloadPort: host.DownloadPort, + Location: host.Network.Location, + Idc: host.Network.IDC, }) } - if len(probedHosts) == 0 { - logger.Error("probed host not found") - return status.Error(codes.NotFound, "probed host not found") - } - logger.Infof("probe started: %#v", probedHosts) if err := stream.Send(&schedulerv1.SyncProbesResponse{ Hosts: probedHosts, diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 282c6bcd24d..27f000e3289 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -666,88 +666,77 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error { // Find probed hosts in network topology. Based on the source host information, // the most candidate hosts will be evaluated. 
logger.Info("receive SyncProbesRequest_ProbeStartedRequest") - probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id) + hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id) if err != nil { logger.Error(err) return status.Error(codes.FailedPrecondition, err.Error()) } var probedHosts []*commonv2.Host - for _, probedHostID := range probedHostIDs { - probedHost, loaded := v.resource.HostManager().Load(probedHostID) - if !loaded { - logger.Warnf("probed host %s not found", probedHostID) - continue - } - + for _, host := range hosts { probedHosts = append(probedHosts, &commonv2.Host{ - Id: probedHost.ID, - Type: uint32(probedHost.Type), - Hostname: probedHost.Hostname, - Ip: probedHost.IP, - Port: probedHost.Port, - DownloadPort: probedHost.DownloadPort, - Os: probedHost.OS, - Platform: probedHost.Platform, - PlatformFamily: probedHost.PlatformFamily, - PlatformVersion: probedHost.PlatformVersion, - KernelVersion: probedHost.KernelVersion, + Id: host.ID, + Type: uint32(host.Type), + Hostname: host.Hostname, + Ip: host.IP, + Port: host.Port, + DownloadPort: host.DownloadPort, + Os: host.OS, + Platform: host.Platform, + PlatformFamily: host.PlatformFamily, + PlatformVersion: host.PlatformVersion, + KernelVersion: host.KernelVersion, Cpu: &commonv2.CPU{ - LogicalCount: probedHost.CPU.LogicalCount, - PhysicalCount: probedHost.CPU.PhysicalCount, - Percent: probedHost.CPU.Percent, - ProcessPercent: probedHost.CPU.ProcessPercent, + LogicalCount: host.CPU.LogicalCount, + PhysicalCount: host.CPU.PhysicalCount, + Percent: host.CPU.Percent, + ProcessPercent: host.CPU.ProcessPercent, Times: &commonv2.CPUTimes{ - User: probedHost.CPU.Times.User, - System: probedHost.CPU.Times.System, - Idle: probedHost.CPU.Times.Idle, - Nice: probedHost.CPU.Times.Nice, - Iowait: probedHost.CPU.Times.Iowait, - Irq: probedHost.CPU.Times.Irq, - Softirq: probedHost.CPU.Times.Softirq, - Steal: probedHost.CPU.Times.Steal, - Guest: probedHost.CPU.Times.Guest, - GuestNice: 
probedHost.CPU.Times.GuestNice, + User: host.CPU.Times.User, + System: host.CPU.Times.System, + Idle: host.CPU.Times.Idle, + Nice: host.CPU.Times.Nice, + Iowait: host.CPU.Times.Iowait, + Irq: host.CPU.Times.Irq, + Softirq: host.CPU.Times.Softirq, + Steal: host.CPU.Times.Steal, + Guest: host.CPU.Times.Guest, + GuestNice: host.CPU.Times.GuestNice, }, }, Memory: &commonv2.Memory{ - Total: probedHost.Memory.Total, - Available: probedHost.Memory.Available, - Used: probedHost.Memory.Used, - UsedPercent: probedHost.Memory.UsedPercent, - ProcessUsedPercent: probedHost.Memory.ProcessUsedPercent, - Free: probedHost.Memory.Free, + Total: host.Memory.Total, + Available: host.Memory.Available, + Used: host.Memory.Used, + UsedPercent: host.Memory.UsedPercent, + ProcessUsedPercent: host.Memory.ProcessUsedPercent, + Free: host.Memory.Free, }, Network: &commonv2.Network{ - TcpConnectionCount: probedHost.Network.TCPConnectionCount, - UploadTcpConnectionCount: probedHost.Network.UploadTCPConnectionCount, - Location: probedHost.Network.Location, - Idc: probedHost.Network.IDC, + TcpConnectionCount: host.Network.TCPConnectionCount, + UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount, + Location: host.Network.Location, + Idc: host.Network.IDC, }, Disk: &commonv2.Disk{ - Total: probedHost.Disk.Total, - Free: probedHost.Disk.Free, - Used: probedHost.Disk.Used, - UsedPercent: probedHost.Disk.UsedPercent, - InodesTotal: probedHost.Disk.InodesTotal, - InodesUsed: probedHost.Disk.InodesUsed, - InodesFree: probedHost.Disk.InodesFree, - InodesUsedPercent: probedHost.Disk.InodesUsedPercent, + Total: host.Disk.Total, + Free: host.Disk.Free, + Used: host.Disk.Used, + UsedPercent: host.Disk.UsedPercent, + InodesTotal: host.Disk.InodesTotal, + InodesUsed: host.Disk.InodesUsed, + InodesFree: host.Disk.InodesFree, + InodesUsedPercent: host.Disk.InodesUsedPercent, }, Build: &commonv2.Build{ - GitVersion: probedHost.Build.GitVersion, - GitCommit: probedHost.Build.GitCommit, - GoVersion: 
probedHost.Build.GoVersion, - Platform: probedHost.Build.Platform, + GitVersion: host.Build.GitVersion, + GitCommit: host.Build.GitCommit, + GoVersion: host.Build.GoVersion, + Platform: host.Build.Platform, }, }) } - if len(probedHosts) == 0 { - logger.Error("probed host not found") - return status.Error(codes.NotFound, "probed host not found") - } - logger.Infof("probe started: %#v", probedHosts) if err := stream.Send(&schedulerv2.SyncProbesResponse{ Hosts: probedHosts,