From fea0ad6453f2c6da37bbf0058b1526aef35cb310 Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 31 Mar 2023 16:27:28 +0800 Subject: [PATCH] feat: database adds seed operation to compatible old version Signed-off-by: Gaius --- manager/database/database.go | 26 ++++++ manager/models/models.go | 69 ++++++++-------- manager/models/scheduler.go | 2 +- manager/rpcserver/manager_server_v1.go | 50 +++++++++--- manager/rpcserver/manager_server_v2.go | 64 +++++++++++++-- manager/service/job.go | 108 ++++++++++++++++--------- manager/service/scheduler.go | 23 +----- manager/types/scheduler.go | 46 +++++------ 8 files changed, 254 insertions(+), 134 deletions(-) diff --git a/manager/database/database.go b/manager/database/database.go index 3ea659b064b..bf0e6c5bdae 100644 --- a/manager/database/database.go +++ b/manager/database/database.go @@ -17,6 +17,7 @@ package database import ( + "errors" "fmt" "github.com/go-redis/redis/v8" @@ -25,6 +26,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/manager/types" schedulerconfig "d7y.io/dragonfly/v2/scheduler/config" ) @@ -93,6 +95,7 @@ func migrate(db *gorm.DB) error { } func seed(cfg *config.Config, db *gorm.DB) error { + // Create default scheduler cluster. var schedulerClusterCount int64 if err := db.Model(models.SchedulerCluster{}).Count(&schedulerClusterCount).Error; err != nil { return err @@ -119,6 +122,7 @@ func seed(cfg *config.Config, db *gorm.DB) error { } } + // Create default seed peer cluster. var seedPeerClusterCount int64 if err := db.Model(models.SeedPeerCluster{}).Count(&seedPeerClusterCount).Error; err != nil { return err @@ -153,5 +157,27 @@ func seed(cfg *config.Config, db *gorm.DB) error { } } + // TODO Compatible with old version. + // Update scheduler features when features is NULL. + var schedulers []models.Scheduler + if err := db.Model(models.Scheduler{}).Find(&schedulers).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + + return err + } + + for _, scheduler := range schedulers { + if scheduler.Features == nil { + if err := db.Model(&scheduler).Update("features", models.Array(types.DefaultSchedulerFeatures)).Error; err != nil { + logger.Errorf("update scheduler %d features: %s", scheduler.ID, err.Error()) + continue + } + + logger.Infof("update scheduler %d default features", scheduler.ID) + } + } + return nil } diff --git a/manager/models/models.go b/manager/models/models.go index 231639b333e..45bf793f957 100644 --- a/manager/models/models.go +++ b/manager/models/models.go @@ -42,10 +42,7 @@ func Paginate(page, perPage int) func(db *gorm.DB) *gorm.DB { } } -type ( - JSONMap map[string]any - Array []string -) +type JSONMap map[string]any func (m JSONMap) Value() (driver.Value, error) { if m == nil { @@ -55,14 +52,6 @@ func (m JSONMap) Value() (driver.Value, error) { return string(ba), err } -func (a Array) Value() (driver.Value, error) { - if a == nil { - return nil, nil - } - ba, err := a.MarshalJSON() - return string(ba), err -} - func (m *JSONMap) Scan(val any) error { var ba []byte switch v := val.(type) { @@ -79,6 +68,39 @@ func (m *JSONMap) Scan(val any) error { return err } +func (m JSONMap) MarshalJSON() ([]byte, error) { + if m == nil { + return []byte("null"), nil + } + t := (map[string]any)(m) + return json.Marshal(t) +} + +func (m *JSONMap) UnmarshalJSON(b []byte) error { + t := map[string]any{} + err := json.Unmarshal(b, &t) + *m = JSONMap(t) + return err +} + +func (m JSONMap) GormDataType() string { + return "jsonmap" +} + +func (JSONMap) GormDBDataType(db *gorm.DB, field *schema.Field) string { + return "text" +} + +type Array []string + +func (a Array) Value() (driver.Value, error) { + if a == nil { + return nil, nil + } + ba, err := a.MarshalJSON() + return string(ba), err +} + func (a *Array) Scan(val any) error { var ba []byte switch v := val.(type) { @@ -95,14 +117,6 @@ func (a *Array) Scan(val any) error { return err } -func (m JSONMap) MarshalJSON() ([]byte, error) { - if m == nil { - return []byte("null"), nil - } - t := (map[string]any)(m) - return json.Marshal(t) -} - func (a Array) MarshalJSON() ([]byte, error) { if a == nil { return []byte("null"), nil @@ -111,13 +125,6 @@ func (a Array) MarshalJSON() ([]byte, error) { return json.Marshal(t) } -func (m *JSONMap) UnmarshalJSON(b []byte) error { - t := map[string]any{} - err := json.Unmarshal(b, &t) - *m = JSONMap(t) - return err -} - func (a *Array) UnmarshalJSON(b []byte) error { t := []string{} err := json.Unmarshal(b, &t) @@ -125,14 +132,6 @@ func (a *Array) UnmarshalJSON(b []byte) error { return err } -func (m JSONMap) GormDataType() string { - return "jsonmap" -} - -func (JSONMap) GormDBDataType(db *gorm.DB, field *schema.Field) string { - return "text" -} - func (Array) GormDataType() string { return "array" } diff --git a/manager/models/scheduler.go b/manager/models/scheduler.go index ee7bd0e3284..2d78c2ce62b 100644 --- a/manager/models/scheduler.go +++ b/manager/models/scheduler.go @@ -32,7 +32,7 @@ type Scheduler struct { IP string `gorm:"column:ip;type:varchar(256);not null;comment:ip address" json:"ip"` Port int32 `gorm:"column:port;not null;comment:grpc service listening port" json:"port"` State string `gorm:"column:state;type:varchar(256);default:'inactive';comment:service state" json:"state"` - Features JSONMap `gorm:"column:features;comment:feature flags" json:"features"` + Features Array `gorm:"column:features;comment:feature flags" json:"features"` SchedulerClusterID uint `gorm:"index:uk_scheduler,unique;not null;comment:scheduler cluster id"` SchedulerCluster SchedulerCluster `json:"-"` Models []Model `json:"-"` diff --git a/manager/rpcserver/manager_server_v1.go b/manager/rpcserver/manager_server_v1.go index 191afeb96f1..51c7a50f1b0 100644 --- a/manager/rpcserver/manager_server_v1.go +++ b/manager/rpcserver/manager_server_v1.go @@ -43,7 +43,7 @@ import ( "d7y.io/dragonfly/v2/manager/types" pkgcache "d7y.io/dragonfly/v2/pkg/cache" "d7y.io/dragonfly/v2/pkg/objectstorage" - "d7y.io/dragonfly/v2/pkg/structure" + "d7y.io/dragonfly/v2/pkg/slices" ) // managerServerV1 is v1 version of the manager grpc server. @@ -125,6 +125,12 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee var pbSchedulers []*managerv1.Scheduler for _, schedulerCluster := range seedPeer.SeedPeerCluster.SchedulerClusters { for _, scheduler := range schedulerCluster.Schedulers { + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + pbSchedulers = append(pbSchedulers, &managerv1.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -133,6 +139,7 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, }) } } @@ -416,25 +423,20 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up Ip: scheduler.IP, Port: scheduler.Port, Features: features, - SchedulerClusterId: uint64(scheduler.SchedulerClusterID), State: scheduler.State, + SchedulerClusterId: uint64(scheduler.SchedulerClusterID), }, nil } // Create scheduler and associate cluster. func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.UpdateSchedulerRequest) (*managerv1.Scheduler, error) { - features, err := structure.StructToMap(types.DefaultSchedulerFeatures) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - scheduler := models.Scheduler{ Hostname: req.Hostname, IDC: req.Idc, Location: req.Location, IP: req.Ip, Port: req.Port, - Features: features, + Features: types.DefaultSchedulerFeatures, SchedulerClusterID: uint(req.SchedulerClusterId), } @@ -442,6 +444,12 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up return nil, status.Error(codes.Internal, err.Error()) } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + return &managerv1.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -450,6 +458,7 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), }, nil } @@ -484,13 +493,30 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis return &pbListSchedulersResponse, nil } - // Cache miss. + // Cache miss and search scheduler cluster. var schedulerClusters []models.SchedulerCluster if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active"). Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil { return nil, status.Error(codes.Internal, err.Error()) } - log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo) + + // Remove schedulers which not have scehdule featrue. As OceanBase does not support JSON type, + // it is not possible to use datatypes.JSONQuery for filtering. + var tmpSchedulerClusters []models.SchedulerCluster + for _, schedulerCluster := range schedulerClusters { + var tmpSchedulers []models.Scheduler + for _, scheduler := range schedulerCluster.Schedulers { + if slices.Contains(scheduler.Features, types.SchedulerFeatureSchedule) { + tmpSchedulers = append(tmpSchedulers, scheduler) + } + } + + if len(tmpSchedulers) != 0 { + schedulerCluster.Schedulers = tmpSchedulers + tmpSchedulerClusters = append(tmpSchedulerClusters, schedulerCluster) + } + } + log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(tmpSchedulerClusters), req.HostInfo) // Search optimal scheduler clusters. // If searcher can not found candidate scheduler cluster, @@ -499,13 +525,13 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis candidateSchedulerClusters []models.SchedulerCluster err error ) - candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, schedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger) + candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, tmpSchedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger) if err != nil { log.Error(err) metrics.SearchSchedulerClusterFailureCount.WithLabelValues(req.Version, req.Commit).Inc() candidateSchedulerClusters = schedulerClusters } - log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(schedulerClusters)) + log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(candidateSchedulerClusters)) schedulers := []models.Scheduler{} for _, candidateSchedulerCluster := range candidateSchedulerClusters { diff --git a/manager/rpcserver/manager_server_v2.go b/manager/rpcserver/manager_server_v2.go index e8af1f2f845..be0209b891a 100644 --- a/manager/rpcserver/manager_server_v2.go +++ b/manager/rpcserver/manager_server_v2.go @@ -43,6 +43,7 @@ import ( "d7y.io/dragonfly/v2/manager/types" pkgcache "d7y.io/dragonfly/v2/pkg/cache" "d7y.io/dragonfly/v2/pkg/objectstorage" + "d7y.io/dragonfly/v2/pkg/slices" ) // managerServerV2 is v2 version of the manager grpc server. @@ -124,6 +125,12 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee var pbSchedulers []*managerv2.Scheduler for _, schedulerCluster := range seedPeer.SeedPeerCluster.SchedulerClusters { for _, scheduler := range schedulerCluster.Schedulers { + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + pbSchedulers = append(pbSchedulers, &managerv2.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -132,6 +139,7 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, }) } } @@ -328,6 +336,12 @@ func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSc } } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + // Construct scheduler. pbScheduler = managerv2.Scheduler{ Id: uint64(scheduler.ID), @@ -337,6 +351,7 @@ func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSc Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), SchedulerCluster: &managerv2.SchedulerCluster{ Id: uint64(scheduler.SchedulerCluster.ID), @@ -394,6 +409,12 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up log.Warn(err) } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + return &managerv2.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -401,8 +422,9 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up Location: scheduler.Location, Ip: scheduler.IP, Port: scheduler.Port, - SchedulerClusterId: uint64(scheduler.SchedulerClusterID), + Features: features, State: scheduler.State, + SchedulerClusterId: uint64(scheduler.SchedulerClusterID), }, nil } @@ -414,6 +436,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up Location: req.Location, IP: req.Ip, Port: req.Port, + Features: types.DefaultSchedulerFeatures, SchedulerClusterID: uint(req.SchedulerClusterId), } @@ -421,6 +444,12 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up return nil, status.Error(codes.Internal, err.Error()) } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + return &managerv2.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -429,6 +458,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), }, nil } @@ -463,12 +493,29 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis return &pbListSchedulersResponse, nil } - // Cache miss. + // Cache miss and search scheduler cluster. var schedulerClusters []models.SchedulerCluster if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil { return nil, status.Error(codes.Internal, err.Error()) } - log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo) + + // Remove schedulers which not have scehdule featrue. As OceanBase does not support JSON type, + // it is not possible to use datatypes.JSONQuery for filtering. + var tmpSchedulerClusters []models.SchedulerCluster + for _, schedulerCluster := range schedulerClusters { + var tmpSchedulers []models.Scheduler + for _, scheduler := range schedulerCluster.Schedulers { + if slices.Contains(scheduler.Features, types.SchedulerFeatureSchedule) { + tmpSchedulers = append(tmpSchedulers, scheduler) + } + } + + if len(tmpSchedulers) != 0 { + schedulerCluster.Schedulers = tmpSchedulers + tmpSchedulerClusters = append(tmpSchedulerClusters, schedulerCluster) + } + } + log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(tmpSchedulerClusters), req.HostInfo) // Search optimal scheduler clusters. // If searcher can not found candidate scheduler cluster, @@ -477,13 +524,13 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis candidateSchedulerClusters []models.SchedulerCluster err error ) - candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, schedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger) + candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, tmpSchedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger) if err != nil { log.Error(err) metrics.SearchSchedulerClusterFailureCount.WithLabelValues(req.Version, req.Commit).Inc() candidateSchedulerClusters = schedulerClusters } - log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(schedulerClusters)) + log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(candidateSchedulerClusters)) schedulers := []models.Scheduler{} for _, candidateSchedulerCluster := range candidateSchedulerClusters { @@ -514,6 +561,12 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis } } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + pbListSchedulersResponse.Schedulers = append(pbListSchedulersResponse.Schedulers, &managerv2.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -522,6 +575,7 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), SeedPeers: seedPeers, }) diff --git a/manager/service/job.go b/manager/service/job.go index d2d8b7e598c..5b1d3125414 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -27,53 +27,26 @@ import ( "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" "d7y.io/dragonfly/v2/pkg/retry" + "d7y.io/dragonfly/v2/pkg/slices" "d7y.io/dragonfly/v2/pkg/structure" ) func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) { - var schedulers []models.Scheduler - var schedulerClusters []models.SchedulerCluster - - if len(json.SchedulerClusterIDs) != 0 { - for _, schedulerClusterID := range json.SchedulerClusterIDs { - schedulerCluster := models.SchedulerCluster{} - if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { - return nil, err - } - schedulerClusters = append(schedulerClusters, schedulerCluster) - - scheduler := models.Scheduler{} - if err := s.db.WithContext(ctx).First(&scheduler, models.Scheduler{ - SchedulerClusterID: schedulerCluster.ID, - State: models.SchedulerStateActive, - }).Error; err != nil { - return nil, err - } - schedulers = append(schedulers, scheduler) - } - } else { - if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil { - return nil, err - } - - for _, schedulerCluster := range schedulerClusters { - scheduler := models.Scheduler{} - if err := s.db.WithContext(ctx).First(&scheduler, models.Scheduler{ - SchedulerClusterID: schedulerCluster.ID, - State: models.SchedulerStateActive, - }).Error; err != nil { - continue - } - - schedulers = append(schedulers, scheduler) - } + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + if err != nil { + return nil, err } - groupJobState, err := s.job.CreatePreheat(ctx, schedulers, json.Args) + groupJobState, err := s.job.CreatePreheat(ctx, candidateSchedulers, json.Args) if err != nil { return nil, err } + var candidateSchedulerClusters []models.SchedulerCluster + for _, candidateScheduler := range candidateSchedulers { + candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster) + } + args, err := structure.StructToMap(json.Args) if err != nil { return nil, err @@ -86,7 +59,7 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat State: groupJobState.State, Args: args, UserID: json.UserID, - SchedulerClusters: schedulerClusters, + SchedulerClusters: candidateSchedulerClusters, } if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { @@ -98,6 +71,65 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat return &job, nil } +func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { + var candidateSchedulers []models.Scheduler + if len(schedulerClusterIDs) != 0 { + // Find the scheduler clusters by request. + for _, schedulerClusterID := range schedulerClusterIDs { + schedulerCluster := models.SchedulerCluster{} + if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { + return nil, err + } + + var schedulers []models.Scheduler + if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ + SchedulerClusterID: schedulerCluster.ID, + State: models.SchedulerStateActive, + }).Error; err != nil { + return nil, err + } + + // Scan the schedulers to find the first scheduler that supports preheat. + for _, scheduler := range schedulers { + if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) { + candidateSchedulers = append(candidateSchedulers, scheduler) + break + } + } + } + } else { + // Find all of the scheduler clusters that has active schedulers. + var candidateSchedulerClusters []models.SchedulerCluster + if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil { + return nil, err + } + + for _, candidateSchedulerCluster := range candidateSchedulerClusters { + var schedulers []models.Scheduler + if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ + SchedulerClusterID: candidateSchedulerCluster.ID, + State: models.SchedulerStateActive, + }).Error; err != nil { + continue + } + + // Scan the schedulers to find the first scheduler that supports preheat. + for _, scheduler := range schedulers { + if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) { + candidateSchedulers = append(candidateSchedulers, scheduler) + break + } + } + } + } + + if len(candidateSchedulers) == 0 { + return nil, errors.New("candidate schedulers not found") + } + + return candidateSchedulers, nil +} + func (s *service) pollingJob(ctx context.Context, id uint, groupID string) { var ( job models.Job diff --git a/manager/service/scheduler.go b/manager/service/scheduler.go index 873825c6f84..e872753f7b3 100644 --- a/manager/service/scheduler.go +++ b/manager/service/scheduler.go @@ -21,18 +21,12 @@ import ( "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" - "d7y.io/dragonfly/v2/pkg/structure" ) func (s *service) CreateScheduler(ctx context.Context, json types.CreateSchedulerRequest) (*models.Scheduler, error) { - rawFeatures := types.DefaultSchedulerFeatures + features := types.DefaultSchedulerFeatures if json.Features != nil { - rawFeatures = json.Features - } - - features, err := structure.StructToMap(rawFeatures) - if err != nil { - return nil, err + features = json.Features } scheduler := models.Scheduler{ @@ -66,24 +60,13 @@ func (s *service) DestroyScheduler(ctx context.Context, id uint) error { } func (s *service) UpdateScheduler(ctx context.Context, id uint, json types.UpdateSchedulerRequest) (*models.Scheduler, error) { - var ( - features map[string]any - err error - ) - if json.Features != nil { - features, err = structure.StructToMap(json.Features) - if err != nil { - return nil, err - } - } - scheduler := models.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, id).Updates(models.Scheduler{ IDC: json.IDC, Location: json.Location, IP: json.IP, Port: json.Port, - Features: features, + Features: json.Features, SchedulerClusterID: json.SchedulerClusterID, }).Error; err != nil { return nil, err diff --git a/manager/types/scheduler.go b/manager/types/scheduler.go index cda3f9080b2..31f2b9e805d 100644 --- a/manager/types/scheduler.go +++ b/manager/types/scheduler.go @@ -16,12 +16,17 @@ package types +const ( + // SchedulerFeatureSchedule is the schedule feature of scheduler. + SchedulerFeatureSchedule = "schedule" + + // SchedulerFeaturePreheat is the preheat feature of scheduler. + SchedulerFeaturePreheat = "preheat" +) + var ( // DefaultSchedulerFeatures is the default features of scheduler. - DefaultSchedulerFeatures = &SchedulerFeatures{ - Schedule: true, - Preheat: true, - } + DefaultSchedulerFeatures = []string{SchedulerFeatureSchedule, SchedulerFeaturePreheat} ) type SchedulerParams struct { @@ -29,23 +34,23 @@ type SchedulerParams struct { } type CreateSchedulerRequest struct { - Hostname string `json:"host_name" binding:"required"` - IDC string `json:"idc" binding:"omitempty"` - Location string `json:"location" binding:"omitempty"` - IP string `json:"ip" binding:"required"` - Port int32 `json:"port" binding:"required"` - Features *SchedulerFeatures `json:"features" binding:"omitempty"` - SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"required"` + Hostname string `json:"host_name" binding:"required"` + IDC string `json:"idc" binding:"omitempty"` + Location string `json:"location" binding:"omitempty"` + IP string `json:"ip" binding:"required"` + Port int32 `json:"port" binding:"required"` + Features []string `json:"features" binding:"omitempty"` + SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"required"` } type UpdateSchedulerRequest struct { - IDC string `json:"idc" binding:"omitempty"` - Location string `json:"location" binding:"omitempty"` - IP string `json:"ip" binding:"omitempty"` - Port int32 `json:"port" binding:"omitempty"` - SchedulerID uint `json:"scheduler_id" binding:"omitempty"` - Features *SchedulerFeatures `json:"features" binding:"omitempty"` - SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"omitempty"` + IDC string `json:"idc" binding:"omitempty"` + Location string `json:"location" binding:"omitempty"` + IP string `json:"ip" binding:"omitempty"` + Port int32 `json:"port" binding:"omitempty"` + SchedulerID uint `json:"scheduler_id" binding:"omitempty"` + Features []string `json:"features" binding:"omitempty"` + SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"omitempty"` } type GetSchedulersQuery struct { @@ -58,8 +63,3 @@ type GetSchedulersQuery struct { State string `form:"state" binding:"omitempty,oneof=active inactive"` SchedulerClusterID uint `form:"scheduler_cluster_id" binding:"omitempty"` } - -type SchedulerFeatures struct { - Schedule bool `yaml:"schedule" mapstructure:"schedule" json:"schedule" binding:"required"` - Preheat bool `yaml:"preheat" mapstructure:"preheat" json:"preheat" binding:"required"` -}