Skip to content

Commit

Permalink
feat: implement FindProbedHosts and add LoadRandomHosts to host manager
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jul 5, 2023
1 parent d74ef98 commit e7ca2a6
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 94 deletions.
5 changes: 5 additions & 0 deletions pkg/slices/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func RemoveDuplicates[T comparable](s []T) []T {
return result
}

// Remove removes an element from a collection.
func Remove[T comparable](s []T, i int) []T {
return append(s[:i], s[i+1:]...)
}

// Reverse reverses elements in a collection.
func Reverse[S ~[]T, T any](s S) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
Expand Down
15 changes: 8 additions & 7 deletions scheduler/networktopology/mocks/network_topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 58 additions & 7 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ package networktopology

import (
"context"
"errors"
"math"
"sort"
"time"

"github.com/go-redis/redis/v8"
"github.com/google/uuid"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/container/set"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/pkg/slices"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/storage"
Expand All @@ -39,6 +43,9 @@ const (

// snapshotContextTimeout is the timeout of snapshot network topology.
snapshotContextTimeout = 20 * time.Minute

// findProbedCandidateHostsLimit is the limit of find probed candidate hosts.
findProbedCandidateHostsLimit = 50
)

// NetworkTopology is an interface for network topology.
Expand All @@ -55,9 +62,9 @@ type NetworkTopology interface {
// Store stores source host and destination host.
Store(string, string) error

// TODO Implement function.
// FindProbedHostIDs finds the most candidate destination host to be probed.
FindProbedHostIDs(string) ([]string, error)
// FindProbedHosts finds the most candidate destination host to be probed, randomly find a range of hosts,
// and then return the host with a smaller probed count.
FindProbedHosts(string) ([]*resource.Host, error)

// DeleteHost deletes source host and all destination host connected to source host.
DeleteHost(string) error
Expand Down Expand Up @@ -161,10 +168,54 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error {
return nil
}

// TODO Implement function.
// FindProbedHostIDs finds the most candidate destination host to be probed.
func (nt *networkTopology) FindProbedHostIDs(hostID string) ([]string, error) {
return nil, nil
// FindProbedHosts finds the most candidate destination host to be probed, randomly find a range of hosts,
// and then return the host with a smaller probed count.
func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

blocklist := set.NewSafeSet[string]()
blocklist.Add(hostID)
candidateHosts := nt.resource.HostManager().LoadRandomHosts(findProbedCandidateHostsLimit, blocklist)
if len(candidateHosts) == 0 {
return nil, errors.New("probed hosts not found")
}

count := nt.config.Probe.Count
if len(candidateHosts) <= count {
return candidateHosts, nil
}

var probedCountKeys []string
for _, candidateHost := range candidateHosts {
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(candidateHost.ID))
}

rawProbedCounts, err := nt.rdb.MGet(ctx, probedCountKeys...).Result()
if err != nil {
return nil, err
}

// Filter invalid probed count. If probed key not exist, the probed count is nil.
probedCounts := make([]interface{}, len(rawProbedCounts))
for i, rawProbedCount := range rawProbedCounts {
probeCount, ok := rawProbedCount.(uint64)
if !ok {
slices.Remove(candidateHosts, i)
count--
logger.Error("invalid probed count")
continue
}

probedCounts = append(probedCounts, probeCount)
}

// Sort candidate hosts by probed count.
sort.Slice(candidateHosts, func(i, j int) bool {
return probedCounts[i].(uint64) < probedCounts[j].(uint64)
})

return candidateHosts[:count], nil
}

// DeleteHost deletes source host and all destination host connected to source host.
Expand Down
29 changes: 29 additions & 0 deletions scheduler/resource/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package resource
import (
"sync"

"d7y.io/dragonfly/v2/pkg/container/set"
pkggc "d7y.io/dragonfly/v2/pkg/gc"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
Expand All @@ -47,6 +48,9 @@ type HostManager interface {
// Delete deletes host for a key.
Delete(string)

// LoadRandomHosts loads host randomly through the Range of sync.Map.
LoadRandomHosts(int, set.SafeSet[string]) []*Host

// Try to reclaim host.
RunGC() error
}
Expand Down Expand Up @@ -103,6 +107,31 @@ func (h *hostManager) Delete(key string) {
h.Map.Delete(key)
}

// LoadRandomHosts loads host randomly through the Range of sync.Map.
func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*Host {
hosts := make([]*Host, 0, n)
h.Map.Range(func(key, value any) bool {
if len(hosts) >= n {
return false
}

host, ok := value.(*Host)
if !ok {
host.Log.Error("invalid host")
return true
}

if blocklist.Contains(host.ID) {
return true
}

hosts = append(hosts, host)
return true
})

return hosts
}

// Try to reclaim host.
func (h *hostManager) RunGC() error {
h.Map.Range(func(_, value any) bool {
Expand Down
15 changes: 15 additions & 0 deletions scheduler/resource/host_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 9 additions & 20 deletions scheduler/service/service_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,36 +677,25 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
// Find probed hosts in network topology. Based on the source host information,
// the most candidate hosts will be evaluated.
logger.Info("receive SyncProbesRequest_ProbeStartedRequest")
probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id)
hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id)
if err != nil {
logger.Error(err)
return status.Error(codes.FailedPrecondition, err.Error())
}

var probedHosts []*commonv1.Host
for _, probedHostID := range probedHostIDs {
probedHost, loaded := v.resource.HostManager().Load(probedHostID)
if !loaded {
logger.Warnf("probed host %s not found", probedHostID)
continue
}

for _, host := range hosts {
probedHosts = append(probedHosts, &commonv1.Host{
Id: probedHost.ID,
Ip: probedHost.IP,
Hostname: probedHost.Hostname,
Port: probedHost.Port,
DownloadPort: probedHost.DownloadPort,
Location: probedHost.Network.Location,
Idc: probedHost.Network.IDC,
Id: host.ID,
Ip: host.IP,
Hostname: host.Hostname,
Port: host.Port,
DownloadPort: host.DownloadPort,
Location: host.Network.Location,
Idc: host.Network.IDC,
})
}

if len(probedHosts) == 0 {
logger.Error("probed host not found")
return status.Error(codes.NotFound, "probed host not found")
}

logger.Infof("probe started: %#v", probedHosts)
if err := stream.Send(&schedulerv1.SyncProbesResponse{
Hosts: probedHosts,
Expand Down
Loading

0 comments on commit e7ca2a6

Please sign in to comment.