diff --git a/client/daemon/announcer/announcer.go b/client/daemon/announcer/announcer.go index 51f8e532fa7..568c899bddc 100644 --- a/client/daemon/announcer/announcer.go +++ b/client/daemon/announcer/announcer.go @@ -52,14 +52,15 @@ type Announcer interface { // announcer provides announce function. type announcer struct { - config *config.DaemonOption - dynconfig config.Dynconfig - hostID string - daemonPort int32 - daemonDownloadPort int32 - schedulerClient schedulerclient.V1 - managerClient managerclient.V1 - done chan struct{} + config *config.DaemonOption + dynconfig config.Dynconfig + hostID string + daemonPort int32 + daemonDownloadPort int32 + daemonObjectStoragePort int32 + schedulerClient schedulerclient.V1 + managerClient managerclient.V1 + done chan struct{} } // Option is a functional option for configuring the announcer. @@ -72,6 +73,13 @@ func WithManagerClient(client managerclient.V1) Option { } } +// WithObjectStoragePort sets the daemonObjectStoragePort. +func WithObjectStoragePort(port int32) Option { + return func(a *announcer) { + a.daemonObjectStoragePort = port + } +} + // New returns a new Announcer interface. func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer { a := &announcer{ @@ -153,6 +161,11 @@ func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest, hostType = types.HostTypeSuperSeedName } + var objectStoragePort int32 + if a.config.ObjectStorage.Enable { + objectStoragePort = a.daemonObjectStoragePort + } + pid := os.Getpid() h, err := host.Info() @@ -223,17 +236,18 @@ func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest, } return &schedulerv1.AnnounceHostRequest{ - Id: a.hostID, - Type: hostType, - Hostname: a.config.Host.Hostname, - Ip: a.config.Host.AdvertiseIP.String(), - Port: a.daemonPort, - DownloadPort: a.daemonDownloadPort, - Os: h.OS, - Platform: h.Platform, - PlatformFamily: h.PlatformFamily, - PlatformVersion: h.PlatformVersion, - KernelVersion: h.KernelVersion, + Id: a.hostID, + Type: hostType, + Hostname: a.config.Host.Hostname, + Ip: a.config.Host.AdvertiseIP.String(), + Port: a.daemonPort, + DownloadPort: a.daemonDownloadPort, + ObjectStoragePort: objectStoragePort, + Os: h.OS, + Platform: h.Platform, + PlatformFamily: h.PlatformFamily, + PlatformVersion: h.PlatformVersion, + KernelVersion: h.KernelVersion, Cpu: &schedulerv1.CPU{ LogicalCount: uint32(cpuLogicalCount), PhysicalCount: uint32(cpuPhysicalCount), diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 9b5b8acb9c0..8b659c6b0bb 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -562,12 +562,15 @@ func (cd *clientDaemon) Serve() error { cd.schedPeerHost.DownPort = int32(uploadPort) // prepare object storage service listen - var objectStorageListener net.Listener + var ( + objectStorageListener net.Listener + objectStoragePort int + ) if cd.Option.ObjectStorage.Enable { if cd.Option.ObjectStorage.TCPListen == nil { return errors.New("object storage tcp listen option is empty") } - objectStorageListener, _, err = cd.prepareTCPListener(cd.Option.ObjectStorage.ListenOption, true) + objectStorageListener, objectStoragePort, err = cd.prepareTCPListener(cd.Option.ObjectStorage.ListenOption, true) if err != nil { logger.Errorf("failed to listen for object storage service: %v", err) return err @@ -678,6 +681,11 @@ func (cd *clientDaemon) Serve() error { if cd.managerClient != nil 
{ announcerOptions = append(announcerOptions, announcer.WithManagerClient(cd.managerClient)) } + + if cd.Option.ObjectStorage.Enable { + announcerOptions = append(announcerOptions, announcer.WithObjectStoragePort(int32(objectStoragePort))) + } + cd.announcer = announcer.New(&cd.Option, cd.dynconfig, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort, cd.schedPeerHost.DownPort, cd.schedulerClient, announcerOptions...) go func() { diff --git a/go.mod b/go.mod index d49ecb45ae2..2ae0e7e7ff6 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.20 require ( - d7y.io/api/v2 v2.0.21 + d7y.io/api/v2 v2.0.23 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 52726fdc0f4..9f8589ab3fe 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.21 h1:g/hiw1KhkroQjqsnetGdjutEJv4zsNY8LoM2jDM2UYg= -d7y.io/api/v2 v2.0.21/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY= +d7y.io/api/v2 v2.0.23 h1:s0vDhh5P1jfKO/dee2DhQiUG7eFORVF2/M9O9SLRNQI= +d7y.io/api/v2 v2.0.23/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/internal/dflog/logger.go b/internal/dflog/logger.go index bc49bb1f77b..cd8c797bca5 100644 --- a/internal/dflog/logger.go +++ b/internal/dflog/logger.go @@ -123,6 +123,12 @@ func With(args ...any) *SugaredLoggerOnWith { } } +func WithScheduler(hostname, ip string, clusterID uint64) *SugaredLoggerOnWith { + return &SugaredLoggerOnWith{ + withArgs: []any{"hostname", hostname, "ip", ip, "clusterID", clusterID}, + } +} + func WithPeer(hostID, taskID, peerID string) *SugaredLoggerOnWith { return &SugaredLoggerOnWith{ withArgs: []any{"hostID", hostID, "taskID", taskID, "peerID", peerID}, diff --git a/internal/job/job.go b/internal/job/job.go index e260d0e344e..58756f4ece3 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -149,6 +149,15 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) { }, nil } +func MarshalResponse(v any) (string, error) { + b, err := json.Marshal(v) + if err != nil { + return "", err + } + + return string(b), nil +} + func MarshalRequest(v any) ([]machineryv1tasks.Arg, error) { b, err := json.Marshal(v) if err != nil { @@ -169,6 +178,7 @@ func UnmarshalResponse(data []reflect.Value, v any) error { if err := json.Unmarshal([]byte(data[0].String()), v); err != nil { return err } + return nil } diff --git a/internal/job/job_test.go b/internal/job/job_test.go index 51203844e6e..6a0d997dbb4 100644 --- a/internal/job/job_test.go +++ b/internal/job/job_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestJobMarshal(t *testing.T) { +func TestJob_MarshalRequest(t *testing.T) { tests := []struct { name string value any @@ -110,7 +110,7 @@ func TestJobMarshal(t *testing.T) { } } -func 
TestJobUnmarshal(t *testing.T) { +func TestJob_UnmarshalResponse(t *testing.T) { tests := []struct { name string data []reflect.Value @@ -206,7 +206,7 @@ func TestJobUnmarshal(t *testing.T) { } } -func TestUnmarshalRequest(t *testing.T) { +func TestJob_UnmarshalRequest(t *testing.T) { tests := []struct { name string data string diff --git a/internal/job/queue_test.go b/internal/job/queue_test.go index 1a72d41eb4e..be3ec67a59d 100644 --- a/internal/job/queue_test.go +++ b/internal/job/queue_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestJobGetSchedulerQueue(t *testing.T) { +func TestJob_GetSchedulerQueue(t *testing.T) { tests := []struct { name string clusterID uint diff --git a/manager/config/config.go b/manager/config/config.go index c8a96e086ec..a27b0b683b5 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -441,6 +441,7 @@ func New() *Config { }, SyncPeers: SyncPeersConfig{ Interval: DefaultJobSyncPeersInterval, + Timeout: DefaultJobSyncPeersTimeout, }, }, ObjectStorage: ObjectStorageConfig{ @@ -626,6 +627,10 @@ func (cfg *Config) Validate() error { return errors.New("syncPeers requires parameter interval and it must be greater than 12 hours") } + if cfg.Job.SyncPeers.Timeout == 0 { + return errors.New("syncPeers requires parameter timeout") + } + if cfg.ObjectStorage.Enable { if cfg.ObjectStorage.Name == "" { return errors.New("objectStorage requires parameter name") diff --git a/manager/config/config_test.go b/manager/config/config_test.go index 5d7c948c869..04fbd2aef5f 100644 --- a/manager/config/config_test.go +++ b/manager/config/config_test.go @@ -196,6 +196,7 @@ func TestConfig_Load(t *testing.T) { }, SyncPeers: SyncPeersConfig{ Interval: 13 * time.Hour, + Timeout: 2 * time.Minute, }, }, ObjectStorage: ObjectStorageConfig{ @@ -759,6 +760,21 @@ func TestConfig_Validate(t *testing.T) { assert.EqualError(err, "syncPeers requires parameter interval and it must be greater than 12 hours") }, }, + { + name: "syncPeers requires parameter timeout", + config: New(), + mock: func(cfg *Config) { + cfg.Auth.JWT = mockJWTConfig + cfg.Database.Type = DatabaseTypeMysql + cfg.Database.Mysql = mockMysqlConfig + cfg.Database.Redis = mockRedisConfig + cfg.Job.SyncPeers.Timeout = 0 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "syncPeers requires parameter timeout") + }, + }, { name: "objectStorage requires parameter name", config: New(), diff --git a/manager/config/constants.go b/manager/config/constants.go index 3954069b180..352ced73739 100644 --- a/manager/config/constants.go +++ b/manager/config/constants.go @@ -98,6 +98,9 @@ const ( // MinJobSyncPeersInterval is the min interval for syncing all peers information from the scheduler. MinJobSyncPeersInterval = 12 * time.Hour + + // DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler. 
+ DefaultJobSyncPeersTimeout = 10 * time.Minute ) const ( diff --git a/manager/config/testdata/manager.yaml b/manager/config/testdata/manager.yaml index 7983a6611c7..2bfd1bc13cf 100644 --- a/manager/config/testdata/manager.yaml +++ b/manager/config/testdata/manager.yaml @@ -70,6 +70,7 @@ job: caCert: testdata/ca.crt syncPeers: interval: 13h + timeout: 2m objectStorage: enable: true diff --git a/manager/job/mocks/sync_peers_mock.go b/manager/job/mocks/sync_peers_mock.go index e25d2c2a567..6b50e8c4615 100644 --- a/manager/job/mocks/sync_peers_mock.go +++ b/manager/job/mocks/sync_peers_mock.go @@ -5,6 +5,7 @@ package mocks import ( + context "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -33,6 +34,20 @@ func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder { return m.recorder } +// Run mocks base method. +func (m *MockSyncPeers) Run(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run. +func (mr *MockSyncPeersMockRecorder) Run(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0) +} + // Serve mocks base method. func (m *MockSyncPeers) Serve() { m.ctrl.T.Helper() diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go index 349a045229f..4f987f076b3 100644 --- a/manager/job/sync_peers.go +++ b/manager/job/sync_peers.go @@ -32,10 +32,15 @@ import ( internaljob "d7y.io/dragonfly/v2/internal/job" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/scheduler/resource" ) // SyncPeers is an interface for sync peers. type SyncPeers interface { + // Run sync peers. + Run(context.Context) error + // Started sync peers server. Serve() @@ -61,37 +66,56 @@ func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncP }, nil } -// TODO Implement function. +// Run sync peers. +func (s *syncPeers) Run(ctx context.Context) error { + // Find all of the scheduler clusters that have active schedulers. + var candidateSchedulerClusters []models.SchedulerCluster + if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil { + return err + } + + // Find all of the schedulers that have an active scheduler cluster. + var candidateSchedulers []models.Scheduler + for _, candidateSchedulerCluster := range candidateSchedulerClusters { + var scheduler models.Scheduler + if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{ + SchedulerClusterID: candidateSchedulerCluster.ID, + State: models.SchedulerStateActive, + }).Error; err != nil { + continue + } + + candidateSchedulers = append(candidateSchedulers, scheduler) + } + + // Send sync peer requests to all available schedulers, + // and merge the sync peer results with the data in + // the peer table in the database. + for _, scheduler := range candidateSchedulers { + log := logger.WithScheduler(scheduler.Hostname, scheduler.IP, uint64(scheduler.SchedulerClusterID)) + + // Send sync peer request to scheduler. + results, err := s.createSyncPeers(ctx, scheduler) + if err != nil { + log.Error(err) + continue + } + log.Infof("sync peers count is %d", len(results)) + + // Merge sync peer results with the data in the peer table. + s.mergePeers(ctx, scheduler, results, log) + } + return nil +} + // Started sync peers server.
func (s *syncPeers) Serve() { tick := time.NewTicker(s.config.Job.SyncPeers.Interval) for { select { case <-tick.C: - // Find all of the scheduler clusters that has active schedulers. - var candidateSchedulerClusters []models.SchedulerCluster - if err := s.db.WithContext(context.Background()).Find(&candidateSchedulerClusters).Error; err != nil { - logger.Errorf("find candidate scheduler clusters failed: %v", err) - break - } - - var candidateSchedulers []models.Scheduler - for _, candidateSchedulerCluster := range candidateSchedulerClusters { - var schedulers []models.Scheduler - if err := s.db.WithContext(context.Background()).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ - SchedulerClusterID: candidateSchedulerCluster.ID, - State: models.SchedulerStateActive, - }).Error; err != nil { - continue - } - - candidateSchedulers = append(candidateSchedulers, schedulers...) - } - - for _, scheduler := range candidateSchedulers { - if _, err := s.createSyncPeers(context.Background(), scheduler); err != nil { - logger.Error(err) - } + if err := s.Run(context.Background()); err != nil { + logger.Errorf("sync peers failed: %v", err) } case <-s.done: return @@ -105,7 +129,7 @@ func (s *syncPeers) Stop() { } // createSyncPeers creates sync peers. -func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) (any, error) { +func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) ([]*resource.Host, error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanSyncPeers, trace.WithSpanKind(trace.SpanKindProducer)) defer span.End() @@ -123,6 +147,7 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu RoutingKey: queue.String(), } + // Send sync peer task to worker. logger.Infof("create sync peers in queue %v, task: %#v", queue, task) asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task) if err != nil { @@ -130,5 +155,104 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu return nil, err } - return asyncResult.GetWithTimeout(s.config.Job.SyncPeers.Timeout, DefaultTaskPollingInterval) + // Get sync peer task result. + results, err := asyncResult.GetWithTimeout(s.config.Job.SyncPeers.Timeout, DefaultTaskPollingInterval) + if err != nil { + return nil, err + } + + // Unmarshal sync peer task result. + var hosts []*resource.Host + if err := internaljob.UnmarshalResponse(results, &hosts); err != nil { + return nil, err + } + + if len(hosts) == 0 { + return nil, fmt.Errorf("cannot find peers") + } + + return hosts, nil +} + +// Merge sync peer results with the data in the peer table. +func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, results []*resource.Host, log *logger.SugaredLoggerOnWith) { + // Convert sync peer results from slice to map. + syncPeers := make(map[string]*resource.Host) + for _, result := range results { + syncPeers[result.ID] = result + } + + rows, err := s.db.Model(&models.Peer{}).Find(&models.Peer{SchedulerClusterID: scheduler.SchedulerClusterID}).Rows() + if err != nil { + log.Error(err) + return + } + defer rows.Close() + + for rows.Next() { + peer := models.Peer{} + if err := s.db.ScanRows(rows, &peer); err != nil { + log.Error(err) + continue + } + + // If the peer exists in the sync peer results, update the peer data in the database with + // the sync peer results and delete the sync peer from the sync peers map.
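+ // Note: this lookup assumes the scheduler derives resource.Host.ID from the announced + // IP and hostname in the same way (idgen.HostIDV2(ip, hostname)); if a host were + // announced under a different ID scheme, its row would never match and would be + // deleted below. This is an assumption, not verified within this diff.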
+ id := idgen.HostIDV2(peer.IP, peer.Hostname) + if syncPeer, ok := syncPeers[id]; ok { + if err := s.db.WithContext(ctx).Preload("User").First(&models.Peer{}, peer.ID).Updates(models.Peer{ + Type: syncPeer.Type.Name(), + IDC: syncPeer.Network.IDC, + Location: syncPeer.Network.Location, + Port: syncPeer.Port, + DownloadPort: syncPeer.DownloadPort, + ObjectStoragePort: syncPeer.ObjectStoragePort, + State: models.PeerStateActive, + OS: syncPeer.OS, + Platform: syncPeer.Platform, + PlatformFamily: syncPeer.PlatformFamily, + PlatformVersion: syncPeer.PlatformVersion, + KernelVersion: syncPeer.KernelVersion, + GitVersion: syncPeer.Build.GitVersion, + GitCommit: syncPeer.Build.GitCommit, + BuildPlatform: syncPeer.Build.Platform, + }).Error; err != nil { + log.Error(err) + } + + // Delete the sync peer from the sync peers map. + delete(syncPeers, id) + } else { + // If the peer does not exist in the sync peer results, delete the peer in the database. + if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Peer{}, peer.ID).Error; err != nil { + log.Error(err) + } + } + } + + // Insert the sync peers that do not exist in the database into the peer table. + for _, syncPeer := range syncPeers { + if err := s.db.WithContext(ctx).Create(&models.Peer{ + Hostname: syncPeer.Hostname, + Type: syncPeer.Type.Name(), + IDC: syncPeer.Network.IDC, + Location: syncPeer.Network.Location, + IP: syncPeer.IP, + Port: syncPeer.Port, + DownloadPort: syncPeer.DownloadPort, + ObjectStoragePort: syncPeer.ObjectStoragePort, + State: models.PeerStateActive, + OS: syncPeer.OS, + Platform: syncPeer.Platform, + PlatformFamily: syncPeer.PlatformFamily, + PlatformVersion: syncPeer.PlatformVersion, + KernelVersion: syncPeer.KernelVersion, + GitVersion: syncPeer.Build.GitVersion, + GitCommit: syncPeer.Build.GitCommit, + BuildPlatform: syncPeer.Build.Platform, + SchedulerClusterID: uint(syncPeer.SchedulerClusterID), + }).Error; err != nil { + log.Error(err) + } + } } diff --git a/manager/manager.go b/manager/manager.go index b4480a2b121..22f0a767cfb 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -86,16 +86,19 @@ func EmbedFolder(fsEmbed embed.FS, targetPath string) static.ServeFileSystem { // Server is the manager server. type Server struct { - // Server configuration + // Server configuration. config *config.Config - // GRPC server + // Job server. + job *job.Job + + // GRPC server. grpcServer *grpc.Server - // REST server + // REST server. restServer *http.Server - // Metrics server + // Metrics server. metricsServer *http.Server } @@ -103,34 +106,35 @@ type Server struct { func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { s := &Server{config: cfg} - // Initialize database + // Initialize database. db, err := database.New(cfg) if err != nil { return nil, err } - // Initialize enforcer + // Initialize enforcer. enforcer, err := rbac.NewEnforcer(db.DB) if err != nil { return nil, err } - // Initialize cache + // Initialize cache. cache, err := cache.New(cfg) if err != nil { return nil, err } - // Initialize searcher + // Initialize searcher. searcher := searcher.New(d.PluginDir()) - // Initialize job + // Initialize job. job, err := job.New(cfg, db.DB) if err != nil { return nil, err } + s.job = job - // Initialize object storage + // Initialize object storage. 
var objectStorage objectstorage.ObjectStorage if cfg.ObjectStorage.Enable { objectStorage, err = objectstorage.New( @@ -146,7 +150,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { } } - // Initialize REST server + // Initialize REST server. restService := service.New(cfg, db, cache, job, enforcer, objectStorage) router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, EmbedFolder(assets, assetsTargetPath)) if err != nil { @@ -157,7 +161,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { Handler: router, } - // Initialize roles and check roles + // Initialize roles and check roles. err = rbac.InitRBAC(enforcer, router, db.DB) if err != nil { return nil, err @@ -213,7 +217,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { ) } - // Initialize GRPC server + // Initialize GRPC server. _, grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, options...) if err != nil { return nil, err @@ -221,7 +225,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { s.grpcServer = grpcServer - // Initialize prometheus + // Initialize prometheus. if cfg.Metrics.Enable { s.metricsServer = metrics.New(&cfg.Metrics, grpcServer) } @@ -231,7 +235,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { // Serve starts the manager server. func (s *Server) Serve() error { - // Started REST server + // Started REST server. go func() { logger.Infof("started rest server at %s", s.restServer.Addr) if s.config.Server.REST.TLS != nil { @@ -251,7 +255,7 @@ func (s *Server) Serve() error { } }() - // Started metrics server + // Started metrics server. if s.metricsServer != nil { go func() { logger.Infof("started metrics server at %s", s.metricsServer.Addr) @@ -264,14 +268,20 @@ func (s *Server) Serve() error { }() } - // Generate GRPC listener + // Started job server. + go func() { + logger.Info("started job server") + s.job.Serve() + }() + + // Generate GRPC listener. lis, _, err := rpc.ListenWithPortRange(s.config.Server.GRPC.ListenIP.String(), s.config.Server.GRPC.PortRange.Start, s.config.Server.GRPC.PortRange.End) if err != nil { logger.Fatalf("net listener failed to start: %v", err) } defer lis.Close() - // Started GRPC server + // Started GRPC server. logger.Infof("started grpc server at %s://%s", lis.Addr().Network(), lis.Addr().String()) if err := s.grpcServer.Serve(lis); err != nil { logger.Errorf("stoped grpc server: %+v", err) @@ -283,14 +293,14 @@ func (s *Server) Serve() error { // Stop stops the manager server. func (s *Server) Stop() { - // Stop REST server + // Stop REST server. if err := s.restServer.Shutdown(context.Background()); err != nil { logger.Errorf("rest server failed to stop: %+v", err) } else { logger.Info("rest server closed under request") } - // Stop metrics server + // Stop metrics server. if s.metricsServer != nil { if err := s.metricsServer.Shutdown(context.Background()); err != nil { logger.Errorf("metrics server failed to stop: %+v", err) @@ -299,7 +309,10 @@ func (s *Server) Stop() { } } - // Stop GRPC server + // Stop job server. + s.job.Stop() + + // Stop GRPC server. 
stopped := make(chan struct{}) go func() { s.grpcServer.GracefulStop() diff --git a/pkg/digest/digest_reader_test.go b/pkg/digest/digest_reader_test.go index 6214a556a07..2bf5746697d 100644 --- a/pkg/digest/digest_reader_test.go +++ b/pkg/digest/digest_reader_test.go @@ -27,7 +27,7 @@ import ( ) func TestDigest_Reader(t *testing.T) { - logger := logger.With("test", "digest") + log := logger.With("test", "digest") tests := []struct { name string @@ -40,7 +40,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha1 reader", algorithm: AlgorithmSHA1, data: []byte("foo"), - options: []Option{WithLogger(logger)}, + options: []Option{WithLogger(log)}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -55,7 +55,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha256 reader", algorithm: AlgorithmSHA256, data: []byte("foo"), - options: []Option{WithLogger(logger)}, + options: []Option{WithLogger(log)}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -70,7 +70,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha512 reader", algorithm: AlgorithmSHA512, data: []byte("foo"), - options: []Option{WithLogger(logger)}, + options: []Option{WithLogger(log)}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -85,7 +85,7 @@ func TestDigest_Reader(t *testing.T) { name: "md5 reader", algorithm: AlgorithmMD5, data: []byte("foo"), - options: []Option{WithLogger(logger)}, + options: []Option{WithLogger(log)}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -100,7 +100,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha1 reader with encoded", algorithm: AlgorithmSHA1, data: []byte("foo"), - options: []Option{WithLogger(logger), WithEncoded("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")}, + options: []Option{WithLogger(log), WithEncoded("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -115,7 +115,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha256 reader with encoded", algorithm: AlgorithmSHA256, data: []byte("foo"), - options: []Option{WithLogger(logger), WithEncoded("2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")}, + options: []Option{WithLogger(log), WithEncoded("2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -130,7 +130,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha512 reader with encoded", algorithm: AlgorithmSHA512, data: []byte("foo"), - options: []Option{WithLogger(logger), WithEncoded("f7fbba6e0636f890e56fbbf3283e524c6fa3204ae298382d624741d0dc6638326e282c41be5e4254d8820772c5518a2c5a8c0c7f7eda19594a7eb539453e1ed7")}, + options: []Option{WithLogger(log), WithEncoded("f7fbba6e0636f890e56fbbf3283e524c6fa3204ae298382d624741d0dc6638326e282c41be5e4254d8820772c5518a2c5a8c0c7f7eda19594a7eb539453e1ed7")}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -145,7 +145,7 @@ func TestDigest_Reader(t *testing.T) { name: "md5 reader with encoded", algorithm: AlgorithmMD5, data: []byte("foo"), - options: []Option{WithLogger(logger), WithEncoded("acbd18db4cc2f85cedef654fccc4a4d8")}, + options: []Option{WithLogger(log), 
WithEncoded("acbd18db4cc2f85cedef654fccc4a4d8")}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -160,7 +160,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha1 reader with invalid encoded", algorithm: AlgorithmSHA1, data: []byte("foo"), - options: []Option{WithLogger(logger), WithEncoded("bar")}, + options: []Option{WithLogger(log), WithEncoded("bar")}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -173,7 +173,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha256 reader with invalid encoded", algorithm: AlgorithmSHA256, data: []byte("foo"), - options: []Option{WithLogger(logger), WithEncoded("bar")}, + options: []Option{WithLogger(log), WithEncoded("bar")}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -186,7 +186,7 @@ func TestDigest_Reader(t *testing.T) { name: "sha512 reader with invalid encoded", algorithm: AlgorithmSHA512, data: []byte("foo"), - options: []Option{WithLogger(logger), WithEncoded("bar")}, + options: []Option{WithLogger(log), WithEncoded("bar")}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -199,7 +199,7 @@ func TestDigest_Reader(t *testing.T) { name: "md5 reader with invalid encoded", algorithm: AlgorithmMD5, data: []byte("foo"), - options: []Option{WithLogger(logger), WithEncoded("bar")}, + options: []Option{WithLogger(log), WithEncoded("bar")}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.NoError(err) @@ -212,7 +212,7 @@ func TestDigest_Reader(t *testing.T) { name: "new reader with invalid algorithm", algorithm: "", data: []byte("foo"), - options: []Option{WithLogger(logger)}, + options: []Option{WithLogger(log)}, run: func(t *testing.T, data []byte, reader Reader, err error) { assert := assert.New(t) assert.Error(err) diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 947783fffed..ee0d72265d6 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -212,7 +212,7 @@ func (j *job) preheat(ctx context.Context, req string) error { } // syncPeers is a job to sync peers. -func (j *job) syncPeers() ([]*resource.Host, error) { +func (j *job) syncPeers() (string, error) { var hosts []*resource.Host j.resource.HostManager().Range(func(key, value any) bool { host, ok := value.(*resource.Host) @@ -225,5 +225,5 @@ func (j *job) syncPeers() ([]*resource.Host, error) { return true }) - return hosts, nil + return internaljob.MarshalResponse(hosts) } diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index 7ca493eda6c..b0e35bba397 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -38,6 +38,13 @@ func WithSchedulerClusterID(id uint64) HostOption { } } +// WithObjectStoragePort sets host's ObjectStoragePort. +func WithObjectStoragePort(port int32) HostOption { + return func(h *Host) { + h.ObjectStoragePort = port + } +} + // WithConcurrentUploadLimit sets host's ConcurrentUploadLimit. func WithConcurrentUploadLimit(limit int32) HostOption { return func(h *Host) { @@ -135,6 +142,9 @@ type Host struct { // DownloadPort is piece downloading port. DownloadPort int32 + // ObjectStoragePort is object storage port. + ObjectStoragePort int32 + // Host OS. 
OS string diff --git a/scheduler/resource/host_test.go b/scheduler/resource/host_test.go index 042619665ce..d95c1c8d67d 100644 --- a/scheduler/resource/host_test.go +++ b/scheduler/resource/host_test.go @@ -150,6 +150,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) @@ -173,6 +174,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawSeedHost.IP) assert.Equal(host.Port, mockRawSeedHost.Port) assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) @@ -209,6 +211,31 @@ func TestHost_NewHost(t *testing.T) { assert.NotNil(host.Log) }, }, + { + name: "new host and set object storage port", + rawHost: mockRawHost, + options: []HostOption{WithObjectStoragePort(1)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(1)) + assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, { name: "new host and set upload loadlimit", rawHost: mockRawHost, @@ -221,6 +248,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) @@ -245,6 +273,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.OS, "linux") assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) @@ -270,6 +299,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.Platform, "ubuntu") assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), 
int32(config.DefaultPeerConcurrentUploadLimit)) @@ -295,6 +325,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.PlatformFamily, "debian") assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) @@ -320,6 +351,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.PlatformVersion, "22.04") assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) @@ -344,6 +376,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.KernelVersion, "5.15.0-27-generic") assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) @@ -369,6 +402,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.CPU, mockCPU) assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) @@ -394,6 +428,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Memory, mockMemory) assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) @@ -419,6 +454,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Network, mockNetwork) assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) @@ -444,6 +480,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Disk, mockDisk) assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) @@ -469,6 +506,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Build, mockBuild) assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) diff --git 
a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index fa9863103aa..9db8336963b 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -552,6 +552,10 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ options = append(options, resource.WithSchedulerClusterID(req.GetSchedulerClusterId())) } + if req.GetObjectStoragePort() != 0 { + options = append(options, resource.WithObjectStoragePort(req.GetObjectStoragePort())) + } + host = resource.NewHost( req.GetId(), req.GetIp(), req.GetHostname(), req.GetPort(), req.GetDownloadPort(), types.ParseHostType(req.GetType()), options..., diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 45745e05843..9ed230e9ca8 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -540,6 +540,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ options = append(options, resource.WithSchedulerClusterID(req.Host.GetSchedulerClusterId())) } + if req.Host.GetObjectStoragePort() != 0 { + options = append(options, resource.WithObjectStoragePort(req.Host.GetObjectStoragePort())) + } + host = resource.NewHost( req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(), req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),
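
For readers tracing the manager-side merge above, the following is a minimal, self-contained Go sketch of the reconciliation pattern mergePeers implements: update rows that are still reported, delete rows that are no longer reported, and insert newly reported hosts. The Peer struct and reconcile function are illustrative stand-ins, using in-memory maps instead of the gorm-backed peer table and the scheduler's sync result; they are not code from this change.

package main

import "fmt"

// Peer is an illustrative stand-in for models.Peer.
type Peer struct {
	ID           string
	DownloadPort int32
	State        string
}

// reconcile mirrors the three-way merge in mergePeers: rows still reported
// by the scheduler are updated and marked active, rows no longer reported
// are dropped, and reported hosts without a row are inserted.
func reconcile(table, reported map[string]Peer) map[string]Peer {
	merged := make(map[string]Peer)

	// Walk the existing rows, like iterating rows.Next() over the peer table.
	for id, row := range table {
		syncPeer, ok := reported[id]
		if !ok {
			// Absent from the sync result: hard delete (row is not carried over).
			continue
		}

		// Present in the sync result: refresh the row and mark it active.
		row.DownloadPort = syncPeer.DownloadPort
		row.State = "active"
		merged[id] = row

		// Consume the result, like delete(syncPeers, id) in the diff.
		delete(reported, id)
	}

	// Whatever remains in the sync result has no row yet: insert it.
	for id, syncPeer := range reported {
		syncPeer.State = "active"
		merged[id] = syncPeer
	}

	return merged
}

func main() {
	table := map[string]Peer{
		"host-a": {ID: "host-a", DownloadPort: 65002, State: "inactive"},
		"host-b": {ID: "host-b", DownloadPort: 65002, State: "active"},
	}
	reported := map[string]Peer{
		"host-a": {ID: "host-a", DownloadPort: 65010}, // still announced, port changed
		"host-c": {ID: "host-c", DownloadPort: 65002}, // newly announced
	}

	// host-a is updated, host-b is deleted, host-c is inserted.
	fmt.Println(reconcile(table, reported))
}

Keying both sides by host ID keeps the merge linear in the number of rows plus results, and it makes the entries left over in the result map exactly the set of hosts that need to be inserted.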