From dfde8bdce65a2077f35ff96cfbedd8e561767cd7 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 22 Aug 2023 19:58:27 +0800 Subject: [PATCH] feat: add sync peer job for scheduler (#2663) Signed-off-by: Gaius --- internal/job/constants.go | 4 + manager/config/config.go | 16 ++++ manager/config/config_test.go | 18 ++++ manager/config/constant_otel.go | 1 + manager/config/constants.go | 6 ++ manager/config/testdata/manager.yaml | 3 + manager/job/job.go | 36 +++++++- manager/job/mocks/sync_peers_mock.go | 58 +++++++++++++ manager/job/preheat.go | 32 ++++---- manager/job/sync_peers.go | 104 ++++++++++++++++++++++++ manager/service/mocks/service_mock.go | 42 ++++++++-- scheduler/job/job.go | 25 +++++- scheduler/resource/host_manager.go | 10 +++ scheduler/resource/host_manager_mock.go | 12 +++ scheduler/resource/peer_manager.go | 10 +++ scheduler/resource/peer_manager_mock.go | 12 +++ scheduler/resource/task_manager.go | 10 +++ scheduler/resource/task_manager_mock.go | 12 +++ 18 files changed, 384 insertions(+), 27 deletions(-) create mode 100644 manager/job/mocks/sync_peers_mock.go create mode 100644 manager/job/sync_peers.go diff --git a/internal/job/constants.go b/internal/job/constants.go index 21973539374..d977f443638 100644 --- a/internal/job/constants.go +++ b/internal/job/constants.go @@ -24,7 +24,11 @@ const ( // Job Name. const ( + // PreheatJob is the name of preheat job. PreheatJob = "preheat" + + // SyncPeersJob is the name of syncing peers job. + SyncPeersJob = "sync_peers" ) // Machinery server configuration. diff --git a/manager/config/config.go b/manager/config/config.go index feca20aa5b1..d85520fc7c6 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -284,6 +284,9 @@ type TCPListenPortRange struct { type JobConfig struct { // Preheat configuration. Preheat PreheatConfig `yaml:"preheat" mapstructure:"preheat"` + + // Sync peers configuration. + SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"` } type PreheatConfig struct { @@ -294,6 +297,12 @@ type PreheatConfig struct { TLS *PreheatTLSClientConfig `yaml:"tls" mapstructure:"tls"` } +type SyncPeersConfig struct { + // Interval is the interval for syncing all peers information from the scheduler and + // display peers information in the manager console. + Interval time.Duration `yaml:"interval" mapstructure:"interval"` +} + type PreheatTLSClientConfig struct { // CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string. CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"` @@ -427,6 +436,9 @@ func New() *Config { Preheat: PreheatConfig{ RegistryTimeout: DefaultJobPreheatRegistryTimeout, }, + SyncPeers: SyncPeersConfig{ + Interval: DefaultJobSyncPeersInterval, + }, }, ObjectStorage: ObjectStorageConfig{ Enable: false, @@ -607,6 +619,10 @@ func (cfg *Config) Validate() error { return errors.New("preheat requires parameter registryTimeout") } + if cfg.Job.SyncPeers.Interval <= MinJobSyncPeersInterval { + return errors.New("syncPeers requires parameter interval and it must be greater than 12 hours") + } + if cfg.ObjectStorage.Enable { if cfg.ObjectStorage.Name == "" { return errors.New("objectStorage requires parameter name") diff --git a/manager/config/config_test.go b/manager/config/config_test.go index 9170a49af1e..5d7c948c869 100644 --- a/manager/config/config_test.go +++ b/manager/config/config_test.go @@ -194,6 +194,9 @@ func TestConfig_Load(t *testing.T) { CACert: "foo", }, }, + SyncPeers: SyncPeersConfig{ + Interval: 13 * time.Hour, + }, }, ObjectStorage: ObjectStorageConfig{ Enable: true, @@ -741,6 +744,21 @@ func TestConfig_Validate(t *testing.T) { assert.EqualError(err, "preheat requires parameter registryTimeout") }, }, + { + name: "syncPeers requires parameter interval", + config: New(), + mock: func(cfg *Config) { + cfg.Auth.JWT = mockJWTConfig + cfg.Database.Type = DatabaseTypeMysql + cfg.Database.Mysql = mockMysqlConfig + cfg.Database.Redis = mockRedisConfig + cfg.Job.SyncPeers.Interval = 11 * time.Hour + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "syncPeers requires parameter interval and it must be greater than 12 hours") + }, + }, { name: "objectStorage requires parameter name", config: New(), diff --git a/manager/config/constant_otel.go b/manager/config/constant_otel.go index 086c38fdcde..071c4f9bf92 100644 --- a/manager/config/constant_otel.go +++ b/manager/config/constant_otel.go @@ -26,6 +26,7 @@ const ( const ( SpanPreheat = "preheat" + SpanSyncPeers = "sync-peers" SpanGetLayers = "get-layers" SpanAuthWithRegistry = "auth-with-registry" ) diff --git a/manager/config/constants.go b/manager/config/constants.go index 81fa53604a8..3954069b180 100644 --- a/manager/config/constants.go +++ b/manager/config/constants.go @@ -92,6 +92,12 @@ const ( const ( // DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest. DefaultJobPreheatRegistryTimeout = 1 * time.Minute + + // DefaultJobSyncPeersInterval is the default interval for syncing all peers information from the scheduler. + DefaultJobSyncPeersInterval = 24 * time.Hour + + // MinJobSyncPeersInterval is the min interval for syncing all peers information from the scheduler. + MinJobSyncPeersInterval = 12 * time.Hour ) const ( diff --git a/manager/config/testdata/manager.yaml b/manager/config/testdata/manager.yaml index 79f01ad161a..7983a6611c7 100644 --- a/manager/config/testdata/manager.yaml +++ b/manager/config/testdata/manager.yaml @@ -68,6 +68,9 @@ job: registryTimeout: 1m tls: caCert: testdata/ca.crt + syncPeers: + interval: 13h + objectStorage: enable: true name: s3 diff --git a/manager/job/job.go b/manager/job/job.go index 252832b3759..8b19d8cb0fa 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -20,15 +20,24 @@ import ( "crypto/x509" "errors" + "go.opentelemetry.io/otel" + internaljob "d7y.io/dragonfly/v2/internal/job" "d7y.io/dragonfly/v2/manager/config" + "d7y.io/dragonfly/v2/manager/models" ) +// tracer is a global tracer for job. +var tracer = otel.Tracer("manager") + +// Job is an implementation of job. type Job struct { *internaljob.Job Preheat + SyncPeers } +// New returns a new Job. func New(cfg *config.Config) (*Job, error) { j, err := internaljob.New(&internaljob.Config{ Addrs: cfg.Database.Redis.Addrs, @@ -50,13 +59,34 @@ func New(cfg *config.Config) (*Job, error) { } } - p, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool) + preheat, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool) + if err != nil { + return nil, err + } + + syncPeers, err := newSyncPeers(j) if err != nil { return nil, err } return &Job{ - Job: j, - Preheat: p, + Job: j, + Preheat: preheat, + SyncPeers: syncPeers, }, nil } + +// getSchedulerQueues gets scheduler queues. +func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue { + var queues []internaljob.Queue + for _, scheduler := range schedulers { + queue, err := internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname) + if err != nil { + continue + } + + queues = append(queues, queue) + } + + return queues +} diff --git a/manager/job/mocks/sync_peers_mock.go b/manager/job/mocks/sync_peers_mock.go new file mode 100644 index 00000000000..e25d2c2a567 --- /dev/null +++ b/manager/job/mocks/sync_peers_mock.go @@ -0,0 +1,58 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: sync_peers.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockSyncPeers is a mock of SyncPeers interface. +type MockSyncPeers struct { + ctrl *gomock.Controller + recorder *MockSyncPeersMockRecorder +} + +// MockSyncPeersMockRecorder is the mock recorder for MockSyncPeers. +type MockSyncPeersMockRecorder struct { + mock *MockSyncPeers +} + +// NewMockSyncPeers creates a new mock instance. +func NewMockSyncPeers(ctrl *gomock.Controller) *MockSyncPeers { + mock := &MockSyncPeers{ctrl: ctrl} + mock.recorder = &MockSyncPeersMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder { + return m.recorder +} + +// Serve mocks base method. +func (m *MockSyncPeers) Serve() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Serve") +} + +// Serve indicates an expected call of Serve. +func (mr *MockSyncPeersMockRecorder) Serve() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockSyncPeers)(nil).Serve)) +} + +// Stop mocks base method. +func (m *MockSyncPeers) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockSyncPeersMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockSyncPeers)(nil).Stop)) +} diff --git a/manager/job/preheat.go b/manager/job/preheat.go index 0084f03fd89..55e744e1ec1 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -36,7 +36,6 @@ import ( "github.com/distribution/distribution/v3/manifest/schema2" "github.com/go-http-utils/headers" "github.com/google/uuid" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -47,8 +46,6 @@ import ( nethttp "d7y.io/dragonfly/v2/pkg/net/http" ) -var tracer = otel.Tracer("manager") - type PreheatType string const ( @@ -59,18 +56,23 @@ const ( PreheatFileType PreheatType = "file" ) +// accessURLPattern is the pattern of access url. var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)") +// Preheat is an interface for preheat job. type Preheat interface { + // CreatePreheat creates a preheat job. CreatePreheat(context.Context, []models.Scheduler, types.PreheatArgs) (*internaljob.GroupJobState, error) } +// preheat is an implementation of Preheat. type preheat struct { job *internaljob.Job httpRequestTimeout time.Duration rootCAs *x509.CertPool } +// preheatImage is image information for preheat. type preheatImage struct { protocol string domain string @@ -78,10 +80,12 @@ type preheatImage struct { tag string } +// newPreheat creates a new Preheat. func newPreheat(job *internaljob.Job, httpRequestTimeout time.Duration, rootCAs *x509.CertPool) (Preheat, error) { return &preheat{job, httpRequestTimeout, rootCAs}, nil } +// CreatePreheat creates a preheat job. func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Scheduler, json types.PreheatArgs) (*internaljob.GroupJobState, error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindProducer)) @@ -127,6 +131,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul return p.createGroupJob(ctx, files, queues) } +// createGroupJob creates a group job. func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) { var signatures []*machineryv1tasks.Signature for _, queue := range queues { @@ -169,6 +174,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.Prehea }, nil } +// getLayers gets layers of image. func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) { ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer)) defer span.End() @@ -204,6 +210,7 @@ func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header return layers, nil } +// getManifests gets manifests of image. func (p *preheat) getManifests(ctx context.Context, url string, header http.Header, timeout time.Duration) (*http.Response, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { @@ -229,6 +236,7 @@ func (p *preheat) getManifests(ctx context.Context, url string, header http.Head return resp, nil } +// parseLayers parses layers of image. func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) { body, err := io.ReadAll(resp.Body) if err != nil { @@ -255,6 +263,7 @@ func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, head return layers, nil } +// getAuthToken gets auth token from registry. func getAuthToken(ctx context.Context, header http.Header, timeout time.Duration, rootCAs *x509.CertPool) (string, error) { ctx, span := tracer.Start(ctx, config.SpanAuthWithRegistry, trace.WithSpanKind(trace.SpanKindProducer)) defer span.End() @@ -298,6 +307,7 @@ func getAuthToken(ctx context.Context, header http.Header, timeout time.Duration return token, nil } +// authURL gets auth url from www-authenticate header. func authURL(wwwAuth []string) string { // Bearer realm="",service="",scope="repository::pull" if len(wwwAuth) == 0 { @@ -315,10 +325,12 @@ func authURL(wwwAuth []string) string { return fmt.Sprintf("%s?%s", host, query) } +// layerURL gets layer url. func layerURL(protocol string, domain string, name string, digest string) string { return fmt.Sprintf("%s://%s/v2/%s/blobs/%s", protocol, domain, name, digest) } +// parseAccessURL parses access url. func parseAccessURL(url string) (*preheatImage, error) { r := accessURLPattern.FindStringSubmatch(url) if len(r) != 5 { @@ -332,17 +344,3 @@ func parseAccessURL(url string) (*preheatImage, error) { tag: r[4], }, nil } - -func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue { - var queues []internaljob.Queue - for _, scheduler := range schedulers { - queue, err := internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname) - if err != nil { - continue - } - - queues = append(queues, queue) - } - - return queues -} diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go new file mode 100644 index 00000000000..5086a8d5ef5 --- /dev/null +++ b/manager/job/sync_peers.go @@ -0,0 +1,104 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:generate mockgen -destination mocks/sync_peers_mock.go -source sync_peers.go -package mocks + +package job + +import ( + "context" + "fmt" + "time" + + machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" + "github.com/google/uuid" + "go.opentelemetry.io/otel/trace" + + logger "d7y.io/dragonfly/v2/internal/dflog" + internaljob "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/config" + "d7y.io/dragonfly/v2/manager/models" +) + +// SyncPeers is an interface for sync peers. +type SyncPeers interface { + // Started sync peers server. + Serve() + + // Stop sync peers server. + Stop() +} + +// syncPeers is an implementation of SyncPeers. +type syncPeers struct { + job *internaljob.Job +} + +// newSyncPeers returns a new SyncPeers. +func newSyncPeers(job *internaljob.Job) (SyncPeers, error) { + return &syncPeers{job}, nil +} + +// TODO Implement function. +// Started sync peers server. +func (s *syncPeers) Serve() { +} + +// TODO Implement function. +// Stop sync peers server. +func (s *syncPeers) Stop() { +} + +// createSyncPeers creates sync peers. +func (s *syncPeers) createSyncPeers(ctx context.Context, schedulers []models.Scheduler) (*internaljob.GroupJobState, error) { + var span trace.Span + ctx, span = tracer.Start(ctx, config.SpanSyncPeers, trace.WithSpanKind(trace.SpanKindProducer)) + defer span.End() + + // Initialize queues + queues := getSchedulerQueues(schedulers) + + var signatures []*machineryv1tasks.Signature + for _, queue := range queues { + signatures = append(signatures, &machineryv1tasks.Signature{ + UUID: fmt.Sprintf("task_%s", uuid.New().String()), + Name: internaljob.SyncPeersJob, + RoutingKey: queue.String(), + }) + } + + group, err := machineryv1tasks.NewGroup(signatures...) + if err != nil { + return nil, err + } + + var tasks []machineryv1tasks.Signature + for _, signature := range signatures { + tasks = append(tasks, *signature) + } + + logger.Infof("create sync peers group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) + if _, err := s.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { + logger.Errorf("create sync peers group %s failed", group.GroupUUID, err) + return nil, err + } + + return &internaljob.GroupJobState{ + GroupUUID: group.GroupUUID, + State: machineryv1tasks.StatePending, + CreatedAt: time.Now(), + }, nil +} diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go index 6e44f02edb4..244a9953344 100644 --- a/manager/service/mocks/service_mock.go +++ b/manager/service/mocks/service_mock.go @@ -432,6 +432,20 @@ func (mr *MockServiceMockRecorder) DestroyOauth(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DestroyOauth", reflect.TypeOf((*MockService)(nil).DestroyOauth), arg0, arg1) } +// DestroyPeer mocks base method. +func (m *MockService) DestroyPeer(arg0 context.Context, arg1 uint) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DestroyPeer", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DestroyPeer indicates an expected call of DestroyPeer. +func (mr *MockServiceMockRecorder) DestroyPeer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DestroyPeer", reflect.TypeOf((*MockService)(nil).DestroyPeer), arg0, arg1) +} + // DestroyPersonalAccessToken mocks base method. func (m *MockService) DestroyPersonalAccessToken(arg0 context.Context, arg1 uint) error { m.ctrl.T.Helper() @@ -733,19 +747,35 @@ func (mr *MockServiceMockRecorder) GetOauths(arg0, arg1 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOauths", reflect.TypeOf((*MockService)(nil).GetOauths), arg0, arg1) } -// GetPeers mocks base method. -func (m *MockService) GetPeers(arg0 context.Context) ([]string, error) { +// GetPeer mocks base method. +func (m *MockService) GetPeer(arg0 context.Context, arg1 uint) (*models.Peer, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPeers", arg0) - ret0, _ := ret[0].([]string) + ret := m.ctrl.Call(m, "GetPeer", arg0, arg1) + ret0, _ := ret[0].(*models.Peer) ret1, _ := ret[1].(error) return ret0, ret1 } +// GetPeer indicates an expected call of GetPeer. +func (mr *MockServiceMockRecorder) GetPeer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeer", reflect.TypeOf((*MockService)(nil).GetPeer), arg0, arg1) +} + +// GetPeers mocks base method. +func (m *MockService) GetPeers(arg0 context.Context, arg1 types.GetPeersQuery) ([]models.Peer, int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPeers", arg0, arg1) + ret0, _ := ret[0].([]models.Peer) + ret1, _ := ret[1].(int64) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + // GetPeers indicates an expected call of GetPeers. -func (mr *MockServiceMockRecorder) GetPeers(arg0 interface{}) *gomock.Call { +func (mr *MockServiceMockRecorder) GetPeers(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeers", reflect.TypeOf((*MockService)(nil).GetPeers), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeers", reflect.TypeOf((*MockService)(nil).GetPeers), arg0, arg1) } // GetPermissions mocks base method. diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 04bc14df782..947783fffed 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -44,10 +44,12 @@ const ( preheatTimeout = 20 * time.Minute ) +// Job is an interface for job. type Job interface { Serve() } +// job is an implementation of Job. type job struct { globalJob *internaljob.Job schedulerJob *internaljob.Job @@ -56,6 +58,7 @@ type job struct { config *config.Config } +// New creates a new Job. func New(cfg *config.Config, resource resource.Resource) (Job, error) { redisConfig := &internaljob.Config{ Addrs: cfg.Database.Redis.Addrs, @@ -102,7 +105,8 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) { } namedJobFuncs := map[string]any{ - internaljob.PreheatJob: t.preheat, + internaljob.PreheatJob: t.preheat, + internaljob.SyncPeersJob: t.syncPeers, } if err := localJob.RegisterJob(namedJobFuncs); err != nil { @@ -113,6 +117,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) { return t, nil } +// Serve starts the job. func (j *job) Serve() { go func() { logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum) @@ -142,6 +147,7 @@ func (j *job) Serve() { }() } +// preheat is a job to preheat. func (j *job) preheat(ctx context.Context, req string) error { ctx, cancel := context.WithTimeout(ctx, preheatTimeout) defer cancel() @@ -204,3 +210,20 @@ func (j *job) preheat(ctx context.Context, req string) error { } } } + +// syncPeers is a job to sync peers. +func (j *job) syncPeers() ([]*resource.Host, error) { + var hosts []*resource.Host + j.resource.HostManager().Range(func(key, value any) bool { + host, ok := value.(*resource.Host) + if !ok { + logger.Errorf("invalid host %v %v", key, value) + return true + } + + hosts = append(hosts, host) + return true + }) + + return hosts, nil +} diff --git a/scheduler/resource/host_manager.go b/scheduler/resource/host_manager.go index ecd4286897f..344240c909d 100644 --- a/scheduler/resource/host_manager.go +++ b/scheduler/resource/host_manager.go @@ -48,6 +48,10 @@ type HostManager interface { // Delete deletes host for a key. Delete(string) + // Range calls f sequentially for each key and value present in the map. + // If f returns false, range stops the iteration. + Range(f func(any, any) bool) + // LoadRandomHosts loads host randomly through the Range of sync.Map. LoadRandomHosts(int, set.SafeSet[string]) []*Host @@ -107,6 +111,12 @@ func (h *hostManager) Delete(key string) { h.Map.Delete(key) } +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +func (h *hostManager) Range(f func(key, value any) bool) { + h.Map.Range(f) +} + // LoadRandomHosts loads host randomly through the Range of sync.Map. func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*Host { hosts := make([]*Host, 0, n) diff --git a/scheduler/resource/host_manager_mock.go b/scheduler/resource/host_manager_mock.go index 89948df8ecc..a22a8b05c3a 100644 --- a/scheduler/resource/host_manager_mock.go +++ b/scheduler/resource/host_manager_mock.go @@ -90,6 +90,18 @@ func (mr *MockHostManagerMockRecorder) LoadRandomHosts(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadRandomHosts", reflect.TypeOf((*MockHostManager)(nil).LoadRandomHosts), arg0, arg1) } +// Range mocks base method. +func (m *MockHostManager) Range(f func(any, any) bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Range", f) +} + +// Range indicates an expected call of Range. +func (mr *MockHostManagerMockRecorder) Range(f interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Range", reflect.TypeOf((*MockHostManager)(nil).Range), f) +} + // RunGC mocks base method. func (m *MockHostManager) RunGC() error { m.ctrl.T.Helper() diff --git a/scheduler/resource/peer_manager.go b/scheduler/resource/peer_manager.go index e84405dedec..2976b165b04 100644 --- a/scheduler/resource/peer_manager.go +++ b/scheduler/resource/peer_manager.go @@ -48,6 +48,10 @@ type PeerManager interface { // Delete deletes peer for a key. Delete(string) + // Range calls f sequentially for each key and value present in the map. + // If f returns false, range stops the iteration. + Range(f func(any, any) bool) + // Try to reclaim peer. RunGC() error } @@ -140,6 +144,12 @@ func (p *peerManager) Delete(key string) { } } +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +func (p *peerManager) Range(f func(key, value any) bool) { + p.Map.Range(f) +} + // Try to reclaim peer. func (p *peerManager) RunGC() error { p.Map.Range(func(_, value any) bool { diff --git a/scheduler/resource/peer_manager_mock.go b/scheduler/resource/peer_manager_mock.go index b9a1e74f9ed..568670b7c1e 100644 --- a/scheduler/resource/peer_manager_mock.go +++ b/scheduler/resource/peer_manager_mock.go @@ -75,6 +75,18 @@ func (mr *MockPeerManagerMockRecorder) LoadOrStore(arg0 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockPeerManager)(nil).LoadOrStore), arg0) } +// Range mocks base method. +func (m *MockPeerManager) Range(f func(any, any) bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Range", f) +} + +// Range indicates an expected call of Range. +func (mr *MockPeerManagerMockRecorder) Range(f interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Range", reflect.TypeOf((*MockPeerManager)(nil).Range), f) +} + // RunGC mocks base method. func (m *MockPeerManager) RunGC() error { m.ctrl.T.Helper() diff --git a/scheduler/resource/task_manager.go b/scheduler/resource/task_manager.go index 0bf1386dd17..5b78e46151b 100644 --- a/scheduler/resource/task_manager.go +++ b/scheduler/resource/task_manager.go @@ -46,6 +46,10 @@ type TaskManager interface { // Delete deletes task for a key. Delete(string) + // Range calls f sequentially for each key and value present in the map. + // If f returns false, range stops the iteration. + Range(f func(any, any) bool) + // Try to reclaim task. RunGC() error } @@ -102,6 +106,12 @@ func (t *taskManager) Delete(key string) { t.Map.Delete(key) } +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +func (t *taskManager) Range(f func(key, value any) bool) { + t.Map.Range(f) +} + // Try to reclaim task. func (t *taskManager) RunGC() error { t.Map.Range(func(_, value any) bool { diff --git a/scheduler/resource/task_manager_mock.go b/scheduler/resource/task_manager_mock.go index 39cc45f23b7..f2021f89059 100644 --- a/scheduler/resource/task_manager_mock.go +++ b/scheduler/resource/task_manager_mock.go @@ -75,6 +75,18 @@ func (mr *MockTaskManagerMockRecorder) LoadOrStore(arg0 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockTaskManager)(nil).LoadOrStore), arg0) } +// Range mocks base method. +func (m *MockTaskManager) Range(f func(any, any) bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Range", f) +} + +// Range indicates an expected call of Range. +func (mr *MockTaskManagerMockRecorder) Range(f interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Range", reflect.TypeOf((*MockTaskManager)(nil).Range), f) +} + // RunGC mocks base method. func (m *MockTaskManager) RunGC() error { m.ctrl.T.Helper()