From 38ba453e0852fba74f56db99452cbc5a6b35af61 Mon Sep 17 00:00:00 2001 From: Gaius <gaius.qi@gmail.com> Date: Fri, 31 Mar 2023 20:13:59 +0800 Subject: [PATCH] feat: if the scheduler feature is not in feature flags, then it will stop providing the featrue (#2234) Signed-off-by: Gaius <gaius.qi@gmail.com> --- go.mod | 9 +-- go.sum | 24 +++--- manager/database/database.go | 26 ++++++ manager/models/models.go | 69 ++++++++-------- manager/models/scheduler.go | 1 + manager/rpcserver/manager_server_v1.go | 66 +++++++++++++-- manager/rpcserver/manager_server_v2.go | 64 +++++++++++++-- manager/service/job.go | 108 ++++++++++++++++--------- manager/service/scheduler.go | 7 ++ manager/types/scheduler.go | 39 ++++++--- 10 files changed, 299 insertions(+), 114 deletions(-) diff --git a/go.mod b/go.mod index 4277365e145..1824acaa8be 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.20 require ( - d7y.io/api v1.8.3 + d7y.io/api v1.8.5 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 github.com/VividCortex/mysqlerr v1.0.0 @@ -78,7 +78,7 @@ require ( golang.org/x/sys v0.6.0 golang.org/x/time v0.3.0 google.golang.org/api v0.114.0 - google.golang.org/grpc v1.53.0 + google.golang.org/grpc v1.55.0-dev google.golang.org/protobuf v1.30.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v3 v3.0.1 @@ -108,7 +108,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/envoyproxy/protoc-gen-validate v0.9.1 // indirect + github.com/envoyproxy/protoc-gen-validate v0.10.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-echarts/go-echarts/v2 v2.2.4 // indirect @@ -204,12 +204,11 @@ require ( golang.org/x/net v0.8.0 // indirect golang.org/x/term v0.6.0 // indirect golang.org/x/text v0.8.0 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/tools v0.7.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gorm.io/driver/sqlite v1.4.3 // indirect gorm.io/driver/sqlserver v1.4.1 // indirect gorm.io/plugin/dbresolver v1.3.0 // indirect k8s.io/apimachinery v0.26.0 // indirect diff --git a/go.sum b/go.sum index 097d207c51c..2d50771ad87 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api v1.8.3 h1:D1WyaqjWv+Vvu04SvfXIXNbfi+W/Yx0dE4mC3o8XAWk= -d7y.io/api v1.8.3/go.mod h1:xMezpFrEljSfy/LINGkqg07BC2hzXqfTg7pzq5PIIZ8= +d7y.io/api v1.8.5 h1:8LnSRrXiEY6XODK+GTgkrgBZkmONhLcyfDlarWpDrfw= +d7y.io/api v1.8.5/go.mod h1:HIJMfhqiBHJ0yNVuOASQe6X0IVzOkxdLiWzcMM0xo2c= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= @@ -230,8 +230,8 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY= -github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= +github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= +github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= @@ -744,9 +744,8 @@ github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWV github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.14.3 h1:j7a/xn1U6TKA/PHHxqZuzh64CdtRc7rU9M+AvkOl5bA= github.com/mattn/go-sqlite3 v1.14.3/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI= -github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= -github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09JkG9WeqChjprR5s9bBZ+OM= github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -1253,7 +1252,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1-0.20210830214625-1b1db11ec8f4/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1539,8 +1538,8 @@ golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= +golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1649,8 +1648,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= -google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= -google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= +google.golang.org/grpc v1.55.0-dev h1:b3WG8LoyS+X/C5ZbIWsJGjt8Hhqq0wUVX8+rPF/BHZo= +google.golang.org/grpc v1.55.0-dev/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -1715,9 +1714,8 @@ gorm.io/driver/mysql v1.4.7/go.mod h1:SxzItlnT1cb6e1e4ZRpgJN2VYtcqJgqnHxWr4wsP8o gorm.io/driver/postgres v1.2.2/go.mod h1:Ik3tK+a3FMp8ORZl29v4b3M0RsgXsaeMXh9s9eVMXco= gorm.io/driver/postgres v1.5.0 h1:u2FXTy14l45qc3UeCJ7QaAXZmZfDDv0YrthvmRq1l0U= gorm.io/driver/postgres v1.5.0/go.mod h1:FUZXzO+5Uqg5zzwzv4KK49R8lvGIyscBOqYrtI1Ce9A= +gorm.io/driver/sqlite v1.1.3 h1:BYfdVuZB5He/u9dt4qDpZqiqDJ6KhPqs5QUqsr/Eeuc= gorm.io/driver/sqlite v1.1.3/go.mod h1:AKDgRWk8lcSQSw+9kxCJnX/yySj8G3rdwYlU57cB45c= -gorm.io/driver/sqlite v1.4.3 h1:HBBcZSDnWi5BW3B3rwvVTc510KGkBkexlOg0QrmLUuU= -gorm.io/driver/sqlite v1.4.3/go.mod h1:0Aq3iPO+v9ZKbcdiz8gLWRw5VOPcBOPUQJFLq5e2ecI= gorm.io/driver/sqlserver v1.2.1/go.mod h1:nixq0OB3iLXZDiPv6JSOjWuPgpyaRpOIIevYtA4Ulb4= gorm.io/driver/sqlserver v1.4.1 h1:t4r4r6Jam5E6ejqP7N82qAJIJAht27EGT41HyPfXRw0= gorm.io/driver/sqlserver v1.4.1/go.mod h1:DJ4P+MeZbc5rvY58PnmN1Lnyvb5gw5NPzGshHDnJLig= diff --git a/manager/database/database.go b/manager/database/database.go index 3ea659b064b..bf0e6c5bdae 100644 --- a/manager/database/database.go +++ b/manager/database/database.go @@ -17,6 +17,7 @@ package database import ( + "errors" "fmt" "github.com/go-redis/redis/v8" @@ -25,6 +26,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/manager/types" schedulerconfig "d7y.io/dragonfly/v2/scheduler/config" ) @@ -93,6 +95,7 @@ func migrate(db *gorm.DB) error { } func seed(cfg *config.Config, db *gorm.DB) error { + // Create default scheduler cluster. var schedulerClusterCount int64 if err := db.Model(models.SchedulerCluster{}).Count(&schedulerClusterCount).Error; err != nil { return err @@ -119,6 +122,7 @@ func seed(cfg *config.Config, db *gorm.DB) error { } } + // Create default seed peer cluster. var seedPeerClusterCount int64 if err := db.Model(models.SeedPeerCluster{}).Count(&seedPeerClusterCount).Error; err != nil { return err @@ -153,5 +157,27 @@ func seed(cfg *config.Config, db *gorm.DB) error { } } + // TODO Compatible with old version. + // Update scheduler features when features is NULL. + var schedulers []models.Scheduler + if err := db.Model(models.Scheduler{}).Find(&schedulers).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + + return err + } + + for _, scheduler := range schedulers { + if scheduler.Features == nil { + if err := db.Model(&scheduler).Update("features", models.Array(types.DefaultSchedulerFeatures)).Error; err != nil { + logger.Errorf("update scheduler %d features: %s", scheduler.ID, err.Error()) + continue + } + + logger.Infof("update scheduler %d default features", scheduler.ID) + } + } + return nil } diff --git a/manager/models/models.go b/manager/models/models.go index 231639b333e..45bf793f957 100644 --- a/manager/models/models.go +++ b/manager/models/models.go @@ -42,10 +42,7 @@ func Paginate(page, perPage int) func(db *gorm.DB) *gorm.DB { } } -type ( - JSONMap map[string]any - Array []string -) +type JSONMap map[string]any func (m JSONMap) Value() (driver.Value, error) { if m == nil { @@ -55,14 +52,6 @@ func (m JSONMap) Value() (driver.Value, error) { return string(ba), err } -func (a Array) Value() (driver.Value, error) { - if a == nil { - return nil, nil - } - ba, err := a.MarshalJSON() - return string(ba), err -} - func (m *JSONMap) Scan(val any) error { var ba []byte switch v := val.(type) { @@ -79,6 +68,39 @@ func (m *JSONMap) Scan(val any) error { return err } +func (m JSONMap) MarshalJSON() ([]byte, error) { + if m == nil { + return []byte("null"), nil + } + t := (map[string]any)(m) + return json.Marshal(t) +} + +func (m *JSONMap) UnmarshalJSON(b []byte) error { + t := map[string]any{} + err := json.Unmarshal(b, &t) + *m = JSONMap(t) + return err +} + +func (m JSONMap) GormDataType() string { + return "jsonmap" +} + +func (JSONMap) GormDBDataType(db *gorm.DB, field *schema.Field) string { + return "text" +} + +type Array []string + +func (a Array) Value() (driver.Value, error) { + if a == nil { + return nil, nil + } + ba, err := a.MarshalJSON() + return string(ba), err +} + func (a *Array) Scan(val any) error { var ba []byte switch v := val.(type) { @@ -95,14 +117,6 @@ func (a *Array) Scan(val any) error { return err } -func (m JSONMap) MarshalJSON() ([]byte, error) { - if m == nil { - return []byte("null"), nil - } - t := (map[string]any)(m) - return json.Marshal(t) -} - func (a Array) MarshalJSON() ([]byte, error) { if a == nil { return []byte("null"), nil @@ -111,13 +125,6 @@ func (a Array) MarshalJSON() ([]byte, error) { return json.Marshal(t) } -func (m *JSONMap) UnmarshalJSON(b []byte) error { - t := map[string]any{} - err := json.Unmarshal(b, &t) - *m = JSONMap(t) - return err -} - func (a *Array) UnmarshalJSON(b []byte) error { t := []string{} err := json.Unmarshal(b, &t) @@ -125,14 +132,6 @@ func (a *Array) UnmarshalJSON(b []byte) error { return err } -func (m JSONMap) GormDataType() string { - return "jsonmap" -} - -func (JSONMap) GormDBDataType(db *gorm.DB, field *schema.Field) string { - return "text" -} - func (Array) GormDataType() string { return "array" } diff --git a/manager/models/scheduler.go b/manager/models/scheduler.go index 15cf89bbd78..2d78c2ce62b 100644 --- a/manager/models/scheduler.go +++ b/manager/models/scheduler.go @@ -32,6 +32,7 @@ type Scheduler struct { IP string `gorm:"column:ip;type:varchar(256);not null;comment:ip address" json:"ip"` Port int32 `gorm:"column:port;not null;comment:grpc service listening port" json:"port"` State string `gorm:"column:state;type:varchar(256);default:'inactive';comment:service state" json:"state"` + Features Array `gorm:"column:features;comment:feature flags" json:"features"` SchedulerClusterID uint `gorm:"index:uk_scheduler,unique;not null;comment:scheduler cluster id"` SchedulerCluster SchedulerCluster `json:"-"` Models []Model `json:"-"` diff --git a/manager/rpcserver/manager_server_v1.go b/manager/rpcserver/manager_server_v1.go index 28f747c85a1..51c7a50f1b0 100644 --- a/manager/rpcserver/manager_server_v1.go +++ b/manager/rpcserver/manager_server_v1.go @@ -43,6 +43,7 @@ import ( "d7y.io/dragonfly/v2/manager/types" pkgcache "d7y.io/dragonfly/v2/pkg/cache" "d7y.io/dragonfly/v2/pkg/objectstorage" + "d7y.io/dragonfly/v2/pkg/slices" ) // managerServerV1 is v1 version of the manager grpc server. @@ -124,6 +125,12 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee var pbSchedulers []*managerv1.Scheduler for _, schedulerCluster := range seedPeer.SeedPeerCluster.SchedulerClusters { for _, scheduler := range schedulerCluster.Schedulers { + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + pbSchedulers = append(pbSchedulers, &managerv1.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -132,6 +139,7 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, }) } } @@ -328,6 +336,12 @@ func (s *managerServerV1) GetScheduler(ctx context.Context, req *managerv1.GetSc } } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + // Construct scheduler. pbScheduler = managerv1.Scheduler{ Id: uint64(scheduler.ID), @@ -337,6 +351,7 @@ func (s *managerServerV1) GetScheduler(ctx context.Context, req *managerv1.GetSc Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), SchedulerCluster: &managerv1.SchedulerCluster{ Id: uint64(scheduler.SchedulerCluster.ID), @@ -394,6 +409,12 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up log.Warn(err) } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + return &managerv1.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -401,8 +422,9 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up Location: scheduler.Location, Ip: scheduler.IP, Port: scheduler.Port, - SchedulerClusterId: uint64(scheduler.SchedulerClusterID), + Features: features, State: scheduler.State, + SchedulerClusterId: uint64(scheduler.SchedulerClusterID), }, nil } @@ -414,6 +436,7 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up Location: req.Location, IP: req.Ip, Port: req.Port, + Features: types.DefaultSchedulerFeatures, SchedulerClusterID: uint(req.SchedulerClusterId), } @@ -421,6 +444,12 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up return nil, status.Error(codes.Internal, err.Error()) } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + return &managerv1.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -429,6 +458,7 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), }, nil } @@ -463,13 +493,30 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis return &pbListSchedulersResponse, nil } - // Cache miss. + // Cache miss and search scheduler cluster. var schedulerClusters []models.SchedulerCluster - if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil { + if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active"). + Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil { return nil, status.Error(codes.Internal, err.Error()) } - log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo) + // Remove schedulers which not have scehdule featrue. As OceanBase does not support JSON type, + // it is not possible to use datatypes.JSONQuery for filtering. + var tmpSchedulerClusters []models.SchedulerCluster + for _, schedulerCluster := range schedulerClusters { + var tmpSchedulers []models.Scheduler + for _, scheduler := range schedulerCluster.Schedulers { + if slices.Contains(scheduler.Features, types.SchedulerFeatureSchedule) { + tmpSchedulers = append(tmpSchedulers, scheduler) + } + } + + if len(tmpSchedulers) != 0 { + schedulerCluster.Schedulers = tmpSchedulers + tmpSchedulerClusters = append(tmpSchedulerClusters, schedulerCluster) + } + } + log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(tmpSchedulerClusters), req.HostInfo) // Search optimal scheduler clusters. // If searcher can not found candidate scheduler cluster, @@ -478,13 +525,13 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis candidateSchedulerClusters []models.SchedulerCluster err error ) - candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, schedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger) + candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, tmpSchedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger) if err != nil { log.Error(err) metrics.SearchSchedulerClusterFailureCount.WithLabelValues(req.Version, req.Commit).Inc() candidateSchedulerClusters = schedulerClusters } - log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(schedulerClusters)) + log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(candidateSchedulerClusters)) schedulers := []models.Scheduler{} for _, candidateSchedulerCluster := range candidateSchedulerClusters { @@ -515,6 +562,12 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis } } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + pbListSchedulersResponse.Schedulers = append(pbListSchedulersResponse.Schedulers, &managerv1.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -523,6 +576,7 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), SeedPeers: seedPeers, }) diff --git a/manager/rpcserver/manager_server_v2.go b/manager/rpcserver/manager_server_v2.go index e8af1f2f845..be0209b891a 100644 --- a/manager/rpcserver/manager_server_v2.go +++ b/manager/rpcserver/manager_server_v2.go @@ -43,6 +43,7 @@ import ( "d7y.io/dragonfly/v2/manager/types" pkgcache "d7y.io/dragonfly/v2/pkg/cache" "d7y.io/dragonfly/v2/pkg/objectstorage" + "d7y.io/dragonfly/v2/pkg/slices" ) // managerServerV2 is v2 version of the manager grpc server. @@ -124,6 +125,12 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee var pbSchedulers []*managerv2.Scheduler for _, schedulerCluster := range seedPeer.SeedPeerCluster.SchedulerClusters { for _, scheduler := range schedulerCluster.Schedulers { + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + pbSchedulers = append(pbSchedulers, &managerv2.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -132,6 +139,7 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, }) } } @@ -328,6 +336,12 @@ func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSc } } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + // Construct scheduler. pbScheduler = managerv2.Scheduler{ Id: uint64(scheduler.ID), @@ -337,6 +351,7 @@ func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSc Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), SchedulerCluster: &managerv2.SchedulerCluster{ Id: uint64(scheduler.SchedulerCluster.ID), @@ -394,6 +409,12 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up log.Warn(err) } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + return &managerv2.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -401,8 +422,9 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up Location: scheduler.Location, Ip: scheduler.IP, Port: scheduler.Port, - SchedulerClusterId: uint64(scheduler.SchedulerClusterID), + Features: features, State: scheduler.State, + SchedulerClusterId: uint64(scheduler.SchedulerClusterID), }, nil } @@ -414,6 +436,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up Location: req.Location, IP: req.Ip, Port: req.Port, + Features: types.DefaultSchedulerFeatures, SchedulerClusterID: uint(req.SchedulerClusterId), } @@ -421,6 +444,12 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up return nil, status.Error(codes.Internal, err.Error()) } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + return &managerv2.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -429,6 +458,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), }, nil } @@ -463,12 +493,29 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis return &pbListSchedulersResponse, nil } - // Cache miss. + // Cache miss and search scheduler cluster. var schedulerClusters []models.SchedulerCluster if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil { return nil, status.Error(codes.Internal, err.Error()) } - log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo) + + // Remove schedulers which not have scehdule featrue. As OceanBase does not support JSON type, + // it is not possible to use datatypes.JSONQuery for filtering. + var tmpSchedulerClusters []models.SchedulerCluster + for _, schedulerCluster := range schedulerClusters { + var tmpSchedulers []models.Scheduler + for _, scheduler := range schedulerCluster.Schedulers { + if slices.Contains(scheduler.Features, types.SchedulerFeatureSchedule) { + tmpSchedulers = append(tmpSchedulers, scheduler) + } + } + + if len(tmpSchedulers) != 0 { + schedulerCluster.Schedulers = tmpSchedulers + tmpSchedulerClusters = append(tmpSchedulerClusters, schedulerCluster) + } + } + log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(tmpSchedulerClusters), req.HostInfo) // Search optimal scheduler clusters. // If searcher can not found candidate scheduler cluster, @@ -477,13 +524,13 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis candidateSchedulerClusters []models.SchedulerCluster err error ) - candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, schedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger) + candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, tmpSchedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger) if err != nil { log.Error(err) metrics.SearchSchedulerClusterFailureCount.WithLabelValues(req.Version, req.Commit).Inc() candidateSchedulerClusters = schedulerClusters } - log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(schedulerClusters)) + log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(candidateSchedulerClusters)) schedulers := []models.Scheduler{} for _, candidateSchedulerCluster := range candidateSchedulerClusters { @@ -514,6 +561,12 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis } } + // Marshal features of scheduler. + features, err := scheduler.Features.MarshalJSON() + if err != nil { + return nil, status.Error(codes.DataLoss, err.Error()) + } + pbListSchedulersResponse.Schedulers = append(pbListSchedulersResponse.Schedulers, &managerv2.Scheduler{ Id: uint64(scheduler.ID), Hostname: scheduler.Hostname, @@ -522,6 +575,7 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis Ip: scheduler.IP, Port: scheduler.Port, State: scheduler.State, + Features: features, SchedulerClusterId: uint64(scheduler.SchedulerClusterID), SeedPeers: seedPeers, }) diff --git a/manager/service/job.go b/manager/service/job.go index d2d8b7e598c..5b1d3125414 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -27,53 +27,26 @@ import ( "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" "d7y.io/dragonfly/v2/pkg/retry" + "d7y.io/dragonfly/v2/pkg/slices" "d7y.io/dragonfly/v2/pkg/structure" ) func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) { - var schedulers []models.Scheduler - var schedulerClusters []models.SchedulerCluster - - if len(json.SchedulerClusterIDs) != 0 { - for _, schedulerClusterID := range json.SchedulerClusterIDs { - schedulerCluster := models.SchedulerCluster{} - if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { - return nil, err - } - schedulerClusters = append(schedulerClusters, schedulerCluster) - - scheduler := models.Scheduler{} - if err := s.db.WithContext(ctx).First(&scheduler, models.Scheduler{ - SchedulerClusterID: schedulerCluster.ID, - State: models.SchedulerStateActive, - }).Error; err != nil { - return nil, err - } - schedulers = append(schedulers, scheduler) - } - } else { - if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil { - return nil, err - } - - for _, schedulerCluster := range schedulerClusters { - scheduler := models.Scheduler{} - if err := s.db.WithContext(ctx).First(&scheduler, models.Scheduler{ - SchedulerClusterID: schedulerCluster.ID, - State: models.SchedulerStateActive, - }).Error; err != nil { - continue - } - - schedulers = append(schedulers, scheduler) - } + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + if err != nil { + return nil, err } - groupJobState, err := s.job.CreatePreheat(ctx, schedulers, json.Args) + groupJobState, err := s.job.CreatePreheat(ctx, candidateSchedulers, json.Args) if err != nil { return nil, err } + var candidateSchedulerClusters []models.SchedulerCluster + for _, candidateScheduler := range candidateSchedulers { + candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster) + } + args, err := structure.StructToMap(json.Args) if err != nil { return nil, err @@ -86,7 +59,7 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat State: groupJobState.State, Args: args, UserID: json.UserID, - SchedulerClusters: schedulerClusters, + SchedulerClusters: candidateSchedulerClusters, } if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { @@ -98,6 +71,65 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat return &job, nil } +func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { + var candidateSchedulers []models.Scheduler + if len(schedulerClusterIDs) != 0 { + // Find the scheduler clusters by request. + for _, schedulerClusterID := range schedulerClusterIDs { + schedulerCluster := models.SchedulerCluster{} + if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { + return nil, err + } + + var schedulers []models.Scheduler + if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ + SchedulerClusterID: schedulerCluster.ID, + State: models.SchedulerStateActive, + }).Error; err != nil { + return nil, err + } + + // Scan the schedulers to find the first scheduler that supports preheat. + for _, scheduler := range schedulers { + if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) { + candidateSchedulers = append(candidateSchedulers, scheduler) + break + } + } + } + } else { + // Find all of the scheduler clusters that has active schedulers. + var candidateSchedulerClusters []models.SchedulerCluster + if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil { + return nil, err + } + + for _, candidateSchedulerCluster := range candidateSchedulerClusters { + var schedulers []models.Scheduler + if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ + SchedulerClusterID: candidateSchedulerCluster.ID, + State: models.SchedulerStateActive, + }).Error; err != nil { + continue + } + + // Scan the schedulers to find the first scheduler that supports preheat. + for _, scheduler := range schedulers { + if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) { + candidateSchedulers = append(candidateSchedulers, scheduler) + break + } + } + } + } + + if len(candidateSchedulers) == 0 { + return nil, errors.New("candidate schedulers not found") + } + + return candidateSchedulers, nil +} + func (s *service) pollingJob(ctx context.Context, id uint, groupID string) { var ( job models.Job diff --git a/manager/service/scheduler.go b/manager/service/scheduler.go index c2493851a9d..e872753f7b3 100644 --- a/manager/service/scheduler.go +++ b/manager/service/scheduler.go @@ -24,12 +24,18 @@ import ( ) func (s *service) CreateScheduler(ctx context.Context, json types.CreateSchedulerRequest) (*models.Scheduler, error) { + features := types.DefaultSchedulerFeatures + if json.Features != nil { + features = json.Features + } + scheduler := models.Scheduler{ Hostname: json.Hostname, IDC: json.IDC, Location: json.Location, IP: json.IP, Port: json.Port, + Features: features, SchedulerClusterID: json.SchedulerClusterID, } @@ -60,6 +66,7 @@ func (s *service) UpdateScheduler(ctx context.Context, id uint, json types.Updat Location: json.Location, IP: json.IP, Port: json.Port, + Features: json.Features, SchedulerClusterID: json.SchedulerClusterID, }).Error; err != nil { return nil, err diff --git a/manager/types/scheduler.go b/manager/types/scheduler.go index 10c07d99e1d..31f2b9e805d 100644 --- a/manager/types/scheduler.go +++ b/manager/types/scheduler.go @@ -16,26 +16,41 @@ package types +const ( + // SchedulerFeatureSchedule is the schedule feature of scheduler. + SchedulerFeatureSchedule = "schedule" + + // SchedulerFeaturePreheat is the preheat feature of scheduler. + SchedulerFeaturePreheat = "preheat" +) + +var ( + // DefaultSchedulerFeatures is the default features of scheduler. + DefaultSchedulerFeatures = []string{SchedulerFeatureSchedule, SchedulerFeaturePreheat} +) + type SchedulerParams struct { ID uint `uri:"id" binding:"required"` } type CreateSchedulerRequest struct { - Hostname string `json:"host_name" binding:"required"` - IDC string `json:"idc" binding:"omitempty"` - Location string `json:"location" binding:"omitempty"` - IP string `json:"ip" binding:"required"` - Port int32 `json:"port" binding:"required"` - SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"required"` + Hostname string `json:"host_name" binding:"required"` + IDC string `json:"idc" binding:"omitempty"` + Location string `json:"location" binding:"omitempty"` + IP string `json:"ip" binding:"required"` + Port int32 `json:"port" binding:"required"` + Features []string `json:"features" binding:"omitempty"` + SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"required"` } type UpdateSchedulerRequest struct { - IDC string `json:"idc" binding:"omitempty"` - Location string `json:"location" binding:"omitempty"` - IP string `json:"ip" binding:"omitempty"` - Port int32 `json:"port" binding:"omitempty"` - SchedulerID uint `json:"scheduler_id" binding:"omitempty"` - SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"omitempty"` + IDC string `json:"idc" binding:"omitempty"` + Location string `json:"location" binding:"omitempty"` + IP string `json:"ip" binding:"omitempty"` + Port int32 `json:"port" binding:"omitempty"` + SchedulerID uint `json:"scheduler_id" binding:"omitempty"` + Features []string `json:"features" binding:"omitempty"` + SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"omitempty"` } type GetSchedulersQuery struct {